diff --git a/scripts/buildartifacts.py b/scripts/buildartifacts.py new file mode 100755 index 0000000000000000000000000000000000000000..1f6ee86826017707d45f3954ea0cee773cd020b9 --- /dev/null +++ b/scripts/buildartifacts.py @@ -0,0 +1,74 @@ +from typing import Optional + +import lxml.html +import requests +from gitlab.v4.objects import Project +from gitlab.v4.objects.pipelines import ProjectPipelineJob + + +class BuildArtifacts: + def __init__(self, project: Project, pipelinejob: ProjectPipelineJob): + self.project = project + self.pipelinejob = pipelinejob + + def list_dir(self, path: str) -> dict[str, str]: + """ + Get a list of the files and directories for the specified path. + + Args: + path: str — relative path in the job artifacts. + + Returns: + A dictionary containing name of files as a key and file size in human-readable form as value. + """ + + url = f"{self.project.web_url}/-/jobs/{self.pipelinejob.id}/artifacts/browse/{path}" + r = requests.get(url) + if r.status_code != 200: + return {} + + ret = {} + + root = lxml.html.fromstring(r.text) + for tree_item in root.find_class("tree-item"): + name = tree_item.find_class("tree-item-file-name")[0].text_content().strip() + size = tree_item.xpath(".//td/text()")[-1].strip() + + if not size: + size = "dir" + + ret[name] = size + + return ret + + def get_artifact(self, path: str, outfile: Optional[str] = None) -> Optional[bytes]: + """ + Get a single artifact file from GitLab. + Save it to the file if "outfile" arg is specified. + + Args: + path: str — relative path to artifact file. + outfile: str — path to save the output file. + + Returns: + None if "outfile" arg is given. Otherwise, returns the artifact file as bytes. + """ + + job = self.project.jobs.get(self.pipelinejob.id, lazy=True) + + if not outfile: + return job.artifact(path) + + with open(outfile, "wb") as f: + job.artifact(path, streamed=True, action=f.write) + return None + + def get_archive_size(self) -> int: + """ + Get the size of compressed artifacts (artifacts.zip). + + Returns: + An integer containing the size of archive in bytes. + """ + + return self.pipelinejob.artifacts_file["size"] diff --git a/scripts/fullbuildpipeline.py b/scripts/fullbuildpipeline.py new file mode 100755 index 0000000000000000000000000000000000000000..2c0737cebbb644317999458c2a168aec7a605280 --- /dev/null +++ b/scripts/fullbuildpipeline.py @@ -0,0 +1,112 @@ +import fnmatch +import sys +import time + +from gitlab.v4.objects import Project +from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob + + +class FullBuildPipeline: + def __init__(self, project: Project, commit_sha: str): + self.project = project + self.commit_sha = commit_sha + self.upstream_pipeline = self.__get_upstream_pipeline() + self.build_pipelines = self.__get_build_pipelines() + + def __get_upstream_pipeline(self) -> ProjectPipeline: + """ + Get upstream (main) pipeline for the specified commit in the repository. + + Returns: + A ProjectPipeline object if succeed, None otherwise. + """ + + pipelines_for_commit = self.project.pipelines.list( + all=False, sha=self.commit_sha, order_by="id", sort="desc" + ) + + if not pipelines_for_commit: + return {} + + # For the main branch we have two types of pipelines: short and full. + # The short one just retriggers the full pipeline and does not contain any artifacts. + # The source of the short pipeline is "push". So skip it here. + # This can be done earlier when calling project.pipelines.list(). + # However, the currently installed version of python-gitlab does not support the "source" filter parameter. + # TODO: use self.project.pipelines.list(…, source="push") insted + build_pipeline = None + for p in pipelines_for_commit: + if p.source != "push": + build_pipeline = p + + return build_pipeline + + def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]: + """ + Get the latest pipeline for the specified commit in the repository. + Then extract the downstream build pipelines with their jobs and return + them as a dictionary. + + Returns: + A dictionary where the key is the build pipeline name and + the value is a tuple of downstream jobs. + """ + + timeout = 3000 # 50 min + check_interval = 30 + + not_rdy_status = ["created", "pending", "running"] + if self.upstream_pipeline.status in not_rdy_status: + print( + f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready." + ) + print("Wait for it to complete", end="", flush=True) + + while self.upstream_pipeline.status in not_rdy_status: + print(".", end="", flush=True) + time.sleep(check_interval) + timeout -= check_interval + if timeout < 0: + sys.exit("timeout") + + ret = {} + for bridge in self.upstream_pipeline.bridges.list(): + if not bridge.downstream_pipeline: + continue + downstream_pipeline = self.project.pipelines.get( + bridge.downstream_pipeline["id"] + ) + ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True)) + return ret + + def get_jobs( + self, pipeline_name: str = "*", job_filter: str = "*" + ) -> tuple[ProjectPipelineJob]: + """ + Get build jobs for the specified pipeline. + The result can also be filtered by name. + + Args: + pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline"). + job_filter: str — fnmatch pattern to select jobs by name. + + Returns: + A tuple of pipeline jobs. + """ + + ret = [] + + if pipeline_name == "*": + jobs = [] + for v in self.build_pipelines.values(): + jobs.extend(list(v)) + else: + try: + jobs = self.build_pipelines[pipeline_name] + except KeyError: + return None + + for job in jobs: + if fnmatch.fnmatch(job.name, job_filter): + ret.append(job) + return tuple(ret) diff --git a/scripts/handle_artifacts.py b/scripts/handle_artifacts.py index 29cf1fc64368cdac3315de15ce2e58a70a70ac57..dea6ea456c1d1e4eaecbf373da0181227b90063f 100755 --- a/scripts/handle_artifacts.py +++ b/scripts/handle_artifacts.py @@ -1,124 +1,12 @@ #!/usr/bin/env python3 import argparse -import fnmatch import logging import sys -import time from gitlab import Gitlab -from gitlab.v4.objects import Project -from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob import common - - -class FullBuildPipeline: - def __init__(self, project: Project, commit_sha: str): - self.project = project - self.commit_sha = commit_sha - self.upstream_pipeline = self.__get_upstream_pipeline() - self.build_pipelines = self.__get_build_pipelines() - - def __get_upstream_pipeline(self) -> ProjectPipeline: - """ - Get upstream (main) pipeline for the specified commit in the repository. - - Returns: - A ProjectPipeline object if succeed, None otherwise. - """ - - pipelines_for_commit = self.project.pipelines.list( - all=False, sha=self.commit_sha, order_by="id", sort="desc" - ) - - if not pipelines_for_commit: - return {} - - # For the main branch we have two types of pipelines: short and full. - # The short one just retriggers the full pipeline and does not contain any artifacts. - # The source of the short pipeline is "push". So skip it here. - # This can be done earlier when calling project.pipelines.list(). - # However, the currently installed version of python-gitlab does not support the "source" filter parameter. - # TODO: use self.project.pipelines.list(…, source="push") insted - build_pipeline = None - for p in pipelines_for_commit: - if p.source != "push": - build_pipeline = p - - if not build_pipeline: - return None - - return build_pipeline - - def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]: - """ - Get the latest pipeline for the specified commit in the repository. - Then extract the downstream build pipelines with their jobs and return - them as a dictionary. - - Returns: - A dictionary where the key is the build pipeline name and - the value is a tuple of downstream jobs. - """ - - timeout = 3000 # 50 min - check_interval = 30 - - not_rdy_status = ["created", "pending", "running"] - if self.upstream_pipeline.status in not_rdy_status: - print( - f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready." - ) - print("Wait for it to complete", end="", flush=True) - - while self.upstream_pipeline.status in not_rdy_status: - print(".", end="", flush=True) - time.sleep(check_interval) - timeout -= check_interval - if timeout < 0: - sys.exit("timeout") - - ret = {} - for bridge in self.upstream_pipeline.bridges.list(): - if not bridge.downstream_pipeline: - continue - downstream_pipeline = self.project.pipelines.get( - bridge.downstream_pipeline["id"] - ) - ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True)) - return ret - - def get_jobs( - self, pipeline_name: str = "*", job_filter: str = "*" - ) -> tuple[ProjectPipelineJob]: - """ - Get build jobs for the specified pipeline. - The result can also be filtered by name. - - Args: - pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline"). - job_filter: str — fnmatch pattern to select jobs by name. - - Returns: - A tuple of pipeline jobs. - """ - - ret = [] - - if pipeline_name == "*": - jobs = [] - for v in self.build_pipelines.values(): - jobs.extend(list(v)) - else: - try: - jobs = self.build_pipelines[pipeline_name] - except KeyError: - return None - - for job in jobs: - if fnmatch.fnmatch(job.name, job_filter): - ret.append(job) - return tuple(ret) +from fullbuildpipeline import FullBuildPipeline def main(): diff --git a/scripts/report_image_diff.py b/scripts/report_image_diff.py index e6623efce4984dbdfbb83cc31dee97ba66bf5e38..a26d449f9e8581a1d7aa7a8aa30dc8d17b3321e5 100755 --- a/scripts/report_image_diff.py +++ b/scripts/report_image_diff.py @@ -2,195 +2,13 @@ import argparse import fnmatch import logging -import sys -import time from difflib import unified_diff -from typing import Optional -import lxml.html -import requests from gitlab import Gitlab -from gitlab.v4.objects import Project -from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob import common - - -class FullBuildPipeline: - def __init__(self, project: Project, commit_sha: str): - self.project = project - self.commit_sha = commit_sha - self.upstream_pipeline = self.__get_upstream_pipeline() - self.build_pipelines = self.__get_build_pipelines() - - def __get_upstream_pipeline(self) -> ProjectPipeline: - """ - Get upstream (main) pipeline for the specified commit in the repository. - - Returns: - A ProjectPipeline object if succeed, None otherwise. - """ - - pipelines_for_commit = self.project.pipelines.list( - all=False, sha=self.commit_sha, order_by="id", sort="desc" - ) - - if not pipelines_for_commit: - return {} - - # For the main branch we have two types of pipelines: short and full. - # The short one just retriggers the full pipeline and does not contain any artifacts. - # The source of the short pipeline is "push". So skip it here. - # This can be done earlier when calling project.pipelines.list(). - # However, the currently installed version of python-gitlab does not support the "source" filter parameter. - # TODO: use self.project.pipelines.list(…, source="push") insted - build_pipeline = None - for p in pipelines_for_commit: - if p.source != "push": - build_pipeline = p - - if not build_pipeline: - return None - - return build_pipeline - - def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]: - """ - Get the latest pipeline for the specified commit in the repository. - Then extract the downstream build pipelines with their jobs and return - them as a dictionary. - - Returns: - A dictionary where the key is the build pipeline name and - the value is a tuple of downstream jobs. - """ - - timeout = 3000 # 50 min - check_interval = 30 - - not_rdy_status = ["created", "pending", "running"] - if self.upstream_pipeline.status in not_rdy_status: - print( - f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready." - ) - print("Wait for it to complete", end="", flush=True) - - while self.upstream_pipeline.status in not_rdy_status: - print(".", end="", flush=True) - time.sleep(check_interval) - timeout -= check_interval - if timeout < 0: - sys.exit("timeout") - - ret = {} - for bridge in self.upstream_pipeline.bridges.list(): - if not bridge.downstream_pipeline: - continue - downstream_pipeline = self.project.pipelines.get( - bridge.downstream_pipeline["id"] - ) - ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True)) - return ret - - def get_jobs( - self, pipeline_name: str = "*", job_filter: str = "*" - ) -> tuple[ProjectPipelineJob]: - """ - Get build jobs for the specified pipeline. - The result can also be filtered by name. - - Args: - pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline"). - job_filter: str — fnmatch pattern to select jobs by name. - - Returns: - A tuple of pipeline jobs. - """ - - ret = [] - - if pipeline_name == "*": - jobs = [] - for v in self.build_pipelines.values(): - jobs.extend(list(v)) - else: - try: - jobs = self.build_pipelines[pipeline_name] - except KeyError: - return None - - for job in jobs: - if fnmatch.fnmatch(job.name, job_filter): - ret.append(job) - return tuple(ret) - - -class BuildArtifacts: - def __init__(self, project: Project, pipelinejob: ProjectPipelineJob): - self.project = project - self.pipelinejob = pipelinejob - - def list_dir(self, path: str) -> dict[str, str]: - """ - Get a list of the files and directories for the specified path. - - Args: - path: str — relative path in the job artifacts. - - Returns: - A dictionary containing name of files as a key and file size in human-readable form as value. - """ - - url = f"{self.project.web_url}/-/jobs/{self.pipelinejob.id}/artifacts/browse/{path}" - r = requests.get(url) - if r.status_code != 200: - return {} - - ret = {} - - root = lxml.html.fromstring(r.text) - for tree_item in root.find_class("tree-item"): - name = tree_item.find_class("tree-item-file-name")[0].text_content().strip() - size = tree_item.xpath(".//td/text()")[-1].strip() - - if not size: - size = "dir" - - ret[name] = size - - return ret - - def get_artifact(self, path: str, outfile: Optional[str] = None) -> Optional[bytes]: - """ - Get a single artifact file from GitLab. - Save it to the file if "outfile" arg is specified. - - Args: - path: str — relative path to artifact file. - outfile: str — path to save the output file. - - Returns: - None if "outfile" arg is given. Otherwise, returns the artifact file as bytes. - """ - - job = self.project.jobs.get(self.pipelinejob.id, lazy=True) - - if not outfile: - return job.artifact(path) - - with open(outfile, "wb") as f: - job.artifact(path, streamed=True, action=f.write) - return None - - def get_archive_size(self) -> int: - """ - Get the size of compressed artifacts (artifacts.zip). - - Returns: - An integer containing the size of archive in bytes. - """ - - return self.pipelinejob.artifacts_file["size"] +from buildartifacts import BuildArtifacts +from fullbuildpipeline import FullBuildPipeline def sizeof_fmt(num: int, p: int = 2) -> str: