#!/usr/bin/env python3
import argparse
import logging
import os
import shutil
import subprocess
import sys
import time

# NOTE(review): these two third-party imports were missing from the file but
# are required by the code below (git.Repo.clone_from / junitparser.JUnitXml).
import git
import junitparser

from lava_create_testreport import lava_create_testreport

TESTS_GIT_URL = "git@gitlab.com:garz-fricke/tests/development-tests.git"


    logging.debug(f"Call: {cmd}")
    try:
        if stdout is None:
            result = subprocess.run(cmd, capture_output=True, check=True)
        else:
            result = subprocess.run(cmd, stdout=stdout, check=True)
    except subprocess.CalledProcessError as e:
        out = e.stdout.decode() if e.stdout is not None else ""
        err = e.stderr.decode() if e.stderr is not None else ""
        logging.error(f"Command failed {cmd}: {out} {err}")
        exit(1)
    if result is not None and result.stdout is not None:
        res = result.stdout.decode().strip()
        logging.debug(f"Command returned: {res}")
    # FIXME: Remove the sourcery check deactivation below and refactor this method in
    # order to enhance code quality and make the check pass.
    # sourcery skip: low-code-quality
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--verbose",
        help="""More verbose output.""",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--fng-install",
        help="""Url to the download link of the install script.""",
        dest="fnginstall",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--name",
        help="""Base name for the submitted job.""",
        default="Testjob for {platform}",
    )
    parser.add_argument(
        "--dry",
        help="""Don't really submit anything""",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--test-repo",
        help="""Repository where the tests and access scripts are stored.""",
        default=TESTS_GIT_URL,
        dest="testrepo",
    )
    parser.add_argument(
        "--test-repo-branch",
        help="""Branch of the repository to check out.""",
        default=None,
        dest="testrepo_branch",
    )
        help="""When set the script expects the tests to be checkout out in the given directory.""",
        default=None,
        dest="checkout_path",
    )
    parser.add_argument(
        "--results-path",
        help="""Subfolder where the result files are stored.""",
        default="results",
        dest="results_path",
    )
    parser.add_argument(
        "--all-devices",
        help="""Submit the test to all devices of a given device type, platforms parameter is treated as devicetype in this case.""",
        default=False,
        action="store_true",
        dest="all_devices",
    )
    parser.add_argument(
        "--test-plan",
        help="""Name pattern of the test plan to use, file needs to be
                found in the 'tests' subfolder of the test repo and may use
                {platform} to be replaced by the actual platform name.""",
        default="{platform}.jinja2",
        dest="test_suite",
    )
    parser.add_argument(
        "--report-name",
        help="""Name of the generated markdown result. If omnitted, report is generation is skipped.""",
        default=None,
        dest="report_name",
    )
    # TODO add parameters to specify branch or revision,
    # but this is needed to be implemented in the tests also.

    parser.add_argument(
        "platforms",
        help="""Platforms to submit the tests for.""",
    )

    args, _ = parser.parse_known_args()

    if args.verbose:
        logging.basicConfig(level=10)
    else:
        logging.basicConfig(level=40)

    if args.checkout_path is None:
        checkout_path = "test"
        if os.path.exists(checkout_path):
            shutil.rmtree(checkout_path)

        logging.debug(f"Cloning {args.testrepo}")
            if args.testrepo_branch is not None:
                testrepo = git.Repo.clone_from(
                    args.testrepo, checkout_path, branch=args.testrepo_branch
                )
            else:
                testrepo = git.Repo.clone_from(args.testrepo, checkout_path)
        except git.GitError as error:
            print("Failed to clone the test repo:", error)
            sys.exit(1)
    else:
        checkout_path = args.checkout_path
        logging.debug(f"Using test repo at {checkout_path}")
    testrepo_revision = testrepo.head.commit.hexsha

    if os.path.exists(args.results_path):
        shutil.rmtree(args.results_path)
    os.mkdir(args.results_path)

    jobs = {}

    cmd_submit = os.path.join(checkout_path, "scripts", "submit.py")
    cmd_query = os.path.join(checkout_path, "scripts", "query.py")
    cmd_submitall = os.path.join(checkout_path, "scripts", "submit_all.py")
    cmd_generate = os.path.join(checkout_path, "scripts", "generate_lava_job.py")

    logging.debug(f"Test suite {args.test_suite}")

        test_suite = os.path.join(
            checkout_path, "tests", args.test_suite.format(platform=platform)
        )
        logging.debug(f"Test suite {test_suite}")

        if os.path.splitext(test_suite)[1] == ".jinja2":
            cmd = [
                cmd_generate,
                "--test-plan",
                test_suite,
                "--test-repo",
                args.testrepo,
                "--testrepo-revision",
                testrepo_revision,
Tim Jaacks's avatar
Tim Jaacks committed
                "--job-name",
                args.name.format(platform=platform),
            ]
            logging.debug(f"Generate job: {cmd}")
            jobfile = os.path.join(args.results_path, f"{platform}.yaml")
            with open(jobfile, "w", encoding="utf-8") as jobfile_handle:
                call(cmd, stdout=jobfile_handle)
        else:
            jobfile = test_suite

        if args.all_devices:
            cmd = [cmd_submitall, "--device-type", platform, jobfile]
        else:
            cmd = [cmd_submit, jobfile]

            print(f"Skipping submit because of dry run: {cmd}")
        result = call(cmd)

        for line in result.splitlines():
            url = line.strip().replace("\n", "")
            print(f"Started testjob {platform}: {url}")
            jobid = url.split("/")[-1]
            jobs[jobid] = [platform, ""]

    logging.debug(f"Queued jobs: {jobs}")
        logging.info("No jobs queued.")
        return 0

    # Wait for the results
    pending = True
    while pending:
        time.sleep(10)
        print(".", end="", flush=True)
        pending = False
        for jobid in jobs:
            result = call([cmd_query, jobid]).replace("\n", "")
            jobs[jobid][1] = result
            if result == "Pending":
                pending = True

    # Check the results
    passed = True
    print("")
    for jobid in jobs:
        jobplatform = jobs[jobid][0]
        jobresult = jobs[jobid][1]

        # Get results as yaml
        resultfile = os.path.join(
            args.results_path, f"results-{jobid}-{jobplatform}.yaml"
        with open(resultfile, "w", encoding="utf-8") as resultfile_handle:
            call([cmd_query, "--get-results", jobid], stdout=resultfile_handle)

        # Get results as junit xml
        resultfile = os.path.join(
            args.results_path, f"results-{jobid}-{jobplatform}.xml"
        with open(resultfile, "w", encoding="utf-8") as resultfile_handle:
            call(
                [
                    cmd_query,
                    "--get-results",
                    "--result-format",
                    "junit",
                    jobid,
                ],
                stdout=resultfile_handle,
            )
        # Read the number of errors from the results
        resultsxml = junitparser.JUnitXml.fromfile(resultfile)
        joberrors = resultsxml.errors

        if jobresult != "Complete" or joberrors > 0:
            passed = False
        print(f"Test result for {jobplatform}: {joberrors} Errors, {jobresult}")
    # Create report as MarkDown
    if args.report_name is not None:
        filename = os.path.join(args.results_path, args.report_name)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, "w", encoding="utf-8") as f:
            f.write(lava_create_testreport(jobs))