Skip to content
Snippets Groups Projects
Commit 1a4a5ab1 authored by Dmitry Petrov's avatar Dmitry Petrov
Browse files

Refactoring: move common code to separate files

parent bf6de920
No related branches found
No related tags found
1 merge request!395CI: handle_artifacts,image_diff: various fixes for pipeline in charge of preserving build artifacts
from typing import Optional
import lxml.html
import requests
from gitlab.v4.objects import Project
from gitlab.v4.objects.pipelines import ProjectPipelineJob
class BuildArtifacts:
    """Helpers for browsing and downloading the artifacts of a single pipeline job."""

    def __init__(self, project: Project, pipelinejob: ProjectPipelineJob):
        self.project = project
        self.pipelinejob = pipelinejob

    def list_dir(self, path: str) -> dict[str, str]:
        """
        Get a list of the files and directories for the specified path.

        Args:
            path: str — relative path in the job artifacts.

        Returns:
            A dictionary containing name of files as a key and file size in
            human-readable form as value ("dir" for directories). Empty dict
            if the artifacts page could not be fetched.
        """
        url = f"{self.project.web_url}/-/jobs/{self.pipelinejob.id}/artifacts/browse/{path}"
        # BUGFIX: a finite timeout so a stalled GitLab instance cannot hang
        # the CI job forever (requests.get blocks indefinitely by default).
        r = requests.get(url, timeout=60)
        if r.status_code != 200:
            return {}
        ret = {}
        root = lxml.html.fromstring(r.text)
        for tree_item in root.find_class("tree-item"):
            name = tree_item.find_class("tree-item-file-name")[0].text_content().strip()
            # The last table cell holds the human-readable size; it is empty
            # for directories.
            size = tree_item.xpath(".//td/text()")[-1].strip()
            if not size:
                size = "dir"
            ret[name] = size
        return ret

    def get_artifact(self, path: str, outfile: Optional[str] = None) -> Optional[bytes]:
        """
        Get a single artifact file from GitLab.
        Save it to the file if "outfile" arg is specified.

        Args:
            path: str — relative path to artifact file.
            outfile: str — path to save the output file.

        Returns:
            None if "outfile" arg is given. Otherwise, returns the artifact file as bytes.
        """
        # lazy=True skips an extra API round trip; only the job id is needed.
        job = self.project.jobs.get(self.pipelinejob.id, lazy=True)
        if not outfile:
            return job.artifact(path)
        with open(outfile, "wb") as f:
            # Stream the download straight to disk instead of buffering it in memory.
            job.artifact(path, streamed=True, action=f.write)
        return None

    def get_archive_size(self) -> int:
        """
        Get the size of compressed artifacts (artifacts.zip).

        Returns:
            An integer containing the size of archive in bytes.
        """
        return self.pipelinejob.artifacts_file["size"]
import fnmatch
import sys
import time
from gitlab.v4.objects import Project
from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob
class FullBuildPipeline:
    """Discovers the upstream pipeline for a commit and its downstream build pipelines."""

    def __init__(self, project: Project, commit_sha: str):
        self.project = project
        self.commit_sha = commit_sha
        # Resolved once at construction time; __get_build_pipelines() may
        # block while the upstream pipeline is still running.
        self.upstream_pipeline = self.__get_upstream_pipeline()
        self.build_pipelines = self.__get_build_pipelines()

    def __get_upstream_pipeline(self) -> "ProjectPipeline | None":
        """
        Get upstream (main) pipeline for the specified commit in the repository.

        Returns:
            A ProjectPipeline object if succeed, None otherwise.
        """
        pipelines_for_commit = self.project.pipelines.list(
            all=False, sha=self.commit_sha, order_by="id", sort="desc"
        )
        if not pipelines_for_commit:
            # BUGFIX: previously returned {}, which matched neither the
            # docstring nor the annotation; callers expect None here.
            return None
        # For the main branch we have two types of pipelines: short and full.
        # The short one just retriggers the full pipeline and does not contain any artifacts.
        # The source of the short pipeline is "push". So skip it here.
        # This can be done earlier when calling project.pipelines.list().
        # However, the currently installed version of python-gitlab does not support the "source" filter parameter.
        # TODO: use self.project.pipelines.list(…, source="push") instead
        build_pipeline = None
        for p in pipelines_for_commit:
            if p.source != "push":
                # Pipelines are sorted by id descending, so the first
                # non-push pipeline is the latest full build.
                # BUGFIX: without the break the loop kept overwriting and
                # ended up with the OLDEST matching pipeline.
                build_pipeline = p
                break
        return build_pipeline

    def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]:
        """
        Get the latest pipeline for the specified commit in the repository.
        Then extract the downstream build pipelines with their jobs and return
        them as a dictionary.

        Returns:
            A dictionary where the key is the build pipeline name and
            the value is a tuple of downstream jobs. Empty if no upstream
            pipeline was found.
        """
        if self.upstream_pipeline is None:
            # No pipeline exists for this commit — nothing to collect.
            return {}
        timeout = 3000  # 50 min
        check_interval = 30
        not_rdy_status = ["created", "pending", "running"]
        if self.upstream_pipeline.status in not_rdy_status:
            print(
                f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready."
            )
            print("Wait for it to complete", end="", flush=True)
            while self.upstream_pipeline.status in not_rdy_status:
                print(".", end="", flush=True)
                time.sleep(check_interval)
                # BUGFIX: re-fetch the pipeline state from the server.
                # Without refresh() the cached status never changed, so the
                # loop always ran into the timeout.
                self.upstream_pipeline.refresh()
                timeout -= check_interval
                if timeout < 0:
                    sys.exit("timeout")
        ret = {}
        for bridge in self.upstream_pipeline.bridges.list():
            if not bridge.downstream_pipeline:
                # Bridge without a downstream pipeline (e.g. not triggered) — skip.
                continue
            downstream_pipeline = self.project.pipelines.get(
                bridge.downstream_pipeline["id"]
            )
            ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True))
        return ret

    def get_jobs(
        self, pipeline_name: str = "*", job_filter: str = "*"
    ) -> "tuple[ProjectPipelineJob, ...] | None":
        """
        Get build jobs for the specified pipeline.
        The result can also be filtered by name.

        Args:
            pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline").
            job_filter: str — fnmatch pattern to select jobs by name.

        Returns:
            A tuple of pipeline jobs, or None if pipeline_name is unknown.
        """
        if pipeline_name == "*":
            # Aggregate the jobs of every downstream pipeline.
            jobs = [job for job_tuple in self.build_pipelines.values() for job in job_tuple]
        else:
            try:
                jobs = self.build_pipelines[pipeline_name]
            except KeyError:
                return None
        return tuple(job for job in jobs if fnmatch.fnmatch(job.name, job_filter))
#!/usr/bin/env python3
import argparse
import fnmatch
import logging
import sys
import time
from gitlab import Gitlab
from gitlab.v4.objects import Project
from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob
import common
class FullBuildPipeline:
    """Discovers the upstream pipeline for a commit and its downstream build pipelines."""

    def __init__(self, project: Project, commit_sha: str):
        self.project = project
        self.commit_sha = commit_sha
        # Resolved once at construction time; __get_build_pipelines() may
        # block while the upstream pipeline is still running.
        self.upstream_pipeline = self.__get_upstream_pipeline()
        self.build_pipelines = self.__get_build_pipelines()

    def __get_upstream_pipeline(self) -> "ProjectPipeline | None":
        """
        Get upstream (main) pipeline for the specified commit in the repository.

        Returns:
            A ProjectPipeline object if succeed, None otherwise.
        """
        pipelines_for_commit = self.project.pipelines.list(
            all=False, sha=self.commit_sha, order_by="id", sort="desc"
        )
        if not pipelines_for_commit:
            # BUGFIX: previously returned {}, which matched neither the
            # docstring nor the annotation; callers expect None here.
            return None
        # For the main branch we have two types of pipelines: short and full.
        # The short one just retriggers the full pipeline and does not contain any artifacts.
        # The source of the short pipeline is "push". So skip it here.
        # This can be done earlier when calling project.pipelines.list().
        # However, the currently installed version of python-gitlab does not support the "source" filter parameter.
        # TODO: use self.project.pipelines.list(…, source="push") instead
        build_pipeline = None
        for p in pipelines_for_commit:
            if p.source != "push":
                # Pipelines are sorted by id descending, so the first
                # non-push pipeline is the latest full build.
                # BUGFIX: without the break the loop kept overwriting and
                # ended up with the OLDEST matching pipeline.
                build_pipeline = p
                break
        return build_pipeline

    def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]:
        """
        Get the latest pipeline for the specified commit in the repository.
        Then extract the downstream build pipelines with their jobs and return
        them as a dictionary.

        Returns:
            A dictionary where the key is the build pipeline name and
            the value is a tuple of downstream jobs. Empty if no upstream
            pipeline was found.
        """
        if self.upstream_pipeline is None:
            # No pipeline exists for this commit — nothing to collect.
            return {}
        timeout = 3000  # 50 min
        check_interval = 30
        not_rdy_status = ["created", "pending", "running"]
        if self.upstream_pipeline.status in not_rdy_status:
            print(
                f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready."
            )
            print("Wait for it to complete", end="", flush=True)
            while self.upstream_pipeline.status in not_rdy_status:
                print(".", end="", flush=True)
                time.sleep(check_interval)
                # BUGFIX: re-fetch the pipeline state from the server.
                # Without refresh() the cached status never changed, so the
                # loop always ran into the timeout.
                self.upstream_pipeline.refresh()
                timeout -= check_interval
                if timeout < 0:
                    sys.exit("timeout")
        ret = {}
        for bridge in self.upstream_pipeline.bridges.list():
            if not bridge.downstream_pipeline:
                # Bridge without a downstream pipeline (e.g. not triggered) — skip.
                continue
            downstream_pipeline = self.project.pipelines.get(
                bridge.downstream_pipeline["id"]
            )
            ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True))
        return ret

    def get_jobs(
        self, pipeline_name: str = "*", job_filter: str = "*"
    ) -> "tuple[ProjectPipelineJob, ...] | None":
        """
        Get build jobs for the specified pipeline.
        The result can also be filtered by name.

        Args:
            pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline").
            job_filter: str — fnmatch pattern to select jobs by name.

        Returns:
            A tuple of pipeline jobs, or None if pipeline_name is unknown.
        """
        if pipeline_name == "*":
            # Aggregate the jobs of every downstream pipeline.
            jobs = [job for job_tuple in self.build_pipelines.values() for job in job_tuple]
        else:
            try:
                jobs = self.build_pipelines[pipeline_name]
            except KeyError:
                return None
        return tuple(job for job in jobs if fnmatch.fnmatch(job.name, job_filter))
from fullbuildpipeline import FullBuildPipeline
def main():
......
......@@ -2,195 +2,13 @@
import argparse
import fnmatch
import logging
import sys
import time
from difflib import unified_diff
from typing import Optional
import lxml.html
import requests
from gitlab import Gitlab
from gitlab.v4.objects import Project
from gitlab.v4.objects.pipelines import ProjectPipeline, ProjectPipelineJob
import common
class FullBuildPipeline:
    """Discovers the upstream pipeline for a commit and its downstream build pipelines."""

    def __init__(self, project: Project, commit_sha: str):
        self.project = project
        self.commit_sha = commit_sha
        # Resolved once at construction time; __get_build_pipelines() may
        # block while the upstream pipeline is still running.
        self.upstream_pipeline = self.__get_upstream_pipeline()
        self.build_pipelines = self.__get_build_pipelines()

    def __get_upstream_pipeline(self) -> "ProjectPipeline | None":
        """
        Get upstream (main) pipeline for the specified commit in the repository.

        Returns:
            A ProjectPipeline object if succeed, None otherwise.
        """
        pipelines_for_commit = self.project.pipelines.list(
            all=False, sha=self.commit_sha, order_by="id", sort="desc"
        )
        if not pipelines_for_commit:
            # BUGFIX: previously returned {}, which matched neither the
            # docstring nor the annotation; callers expect None here.
            return None
        # For the main branch we have two types of pipelines: short and full.
        # The short one just retriggers the full pipeline and does not contain any artifacts.
        # The source of the short pipeline is "push". So skip it here.
        # This can be done earlier when calling project.pipelines.list().
        # However, the currently installed version of python-gitlab does not support the "source" filter parameter.
        # TODO: use self.project.pipelines.list(…, source="push") instead
        build_pipeline = None
        for p in pipelines_for_commit:
            if p.source != "push":
                # Pipelines are sorted by id descending, so the first
                # non-push pipeline is the latest full build.
                # BUGFIX: without the break the loop kept overwriting and
                # ended up with the OLDEST matching pipeline.
                build_pipeline = p
                break
        return build_pipeline

    def __get_build_pipelines(self) -> dict[str, tuple[ProjectPipelineJob]]:
        """
        Get the latest pipeline for the specified commit in the repository.
        Then extract the downstream build pipelines with their jobs and return
        them as a dictionary.

        Returns:
            A dictionary where the key is the build pipeline name and
            the value is a tuple of downstream jobs. Empty if no upstream
            pipeline was found.
        """
        if self.upstream_pipeline is None:
            # No pipeline exists for this commit — nothing to collect.
            return {}
        timeout = 3000  # 50 min
        check_interval = 30
        not_rdy_status = ["created", "pending", "running"]
        if self.upstream_pipeline.status in not_rdy_status:
            print(
                f"The build pipeline ({self.upstream_pipeline.web_url}) is not ready."
            )
            print("Wait for it to complete", end="", flush=True)
            while self.upstream_pipeline.status in not_rdy_status:
                print(".", end="", flush=True)
                time.sleep(check_interval)
                # BUGFIX: re-fetch the pipeline state from the server.
                # Without refresh() the cached status never changed, so the
                # loop always ran into the timeout.
                self.upstream_pipeline.refresh()
                timeout -= check_interval
                if timeout < 0:
                    sys.exit("timeout")
        ret = {}
        for bridge in self.upstream_pipeline.bridges.list():
            if not bridge.downstream_pipeline:
                # Bridge without a downstream pipeline (e.g. not triggered) — skip.
                continue
            downstream_pipeline = self.project.pipelines.get(
                bridge.downstream_pipeline["id"]
            )
            ret[bridge.name] = tuple(downstream_pipeline.jobs.list(all=True))
        return ret

    def get_jobs(
        self, pipeline_name: str = "*", job_filter: str = "*"
    ) -> "tuple[ProjectPipelineJob, ...] | None":
        """
        Get build jobs for the specified pipeline.
        The result can also be filtered by name.

        Args:
            pipeline_name: str — name of build pipeline (e.g. "fngsystem-pipeline", "sdk-pipeline").
            job_filter: str — fnmatch pattern to select jobs by name.

        Returns:
            A tuple of pipeline jobs, or None if pipeline_name is unknown.
        """
        if pipeline_name == "*":
            # Aggregate the jobs of every downstream pipeline.
            jobs = [job for job_tuple in self.build_pipelines.values() for job in job_tuple]
        else:
            try:
                jobs = self.build_pipelines[pipeline_name]
            except KeyError:
                return None
        return tuple(job for job in jobs if fnmatch.fnmatch(job.name, job_filter))
class BuildArtifacts:
    """Helpers for browsing and downloading the artifacts of a single pipeline job."""

    def __init__(self, project: Project, pipelinejob: ProjectPipelineJob):
        self.project = project
        self.pipelinejob = pipelinejob

    def list_dir(self, path: str) -> dict[str, str]:
        """
        Get a list of the files and directories for the specified path.

        Args:
            path: str — relative path in the job artifacts.

        Returns:
            A dictionary containing name of files as a key and file size in
            human-readable form as value ("dir" for directories). Empty dict
            if the artifacts page could not be fetched.
        """
        url = f"{self.project.web_url}/-/jobs/{self.pipelinejob.id}/artifacts/browse/{path}"
        # BUGFIX: a finite timeout so a stalled GitLab instance cannot hang
        # the CI job forever (requests.get blocks indefinitely by default).
        r = requests.get(url, timeout=60)
        if r.status_code != 200:
            return {}
        ret = {}
        root = lxml.html.fromstring(r.text)
        for tree_item in root.find_class("tree-item"):
            name = tree_item.find_class("tree-item-file-name")[0].text_content().strip()
            # The last table cell holds the human-readable size; it is empty
            # for directories.
            size = tree_item.xpath(".//td/text()")[-1].strip()
            if not size:
                size = "dir"
            ret[name] = size
        return ret

    def get_artifact(self, path: str, outfile: Optional[str] = None) -> Optional[bytes]:
        """
        Get a single artifact file from GitLab.
        Save it to the file if "outfile" arg is specified.

        Args:
            path: str — relative path to artifact file.
            outfile: str — path to save the output file.

        Returns:
            None if "outfile" arg is given. Otherwise, returns the artifact file as bytes.
        """
        # lazy=True skips an extra API round trip; only the job id is needed.
        job = self.project.jobs.get(self.pipelinejob.id, lazy=True)
        if not outfile:
            return job.artifact(path)
        with open(outfile, "wb") as f:
            # Stream the download straight to disk instead of buffering it in memory.
            job.artifact(path, streamed=True, action=f.write)
        return None

    def get_archive_size(self) -> int:
        """
        Get the size of compressed artifacts (artifacts.zip).

        Returns:
            An integer containing the size of archive in bytes.
        """
        return self.pipelinejob.artifacts_file["size"]
from buildartifacts import BuildArtifacts
from fullbuildpipeline import FullBuildPipeline
def sizeof_fmt(num: int, p: int = 2) -> str:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment