-
Jonas Höppner authoreddaa4cbee
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
common.py 9.92 KiB
#!/usr/bin/env python3
import logging
import requests
import sys
import time
from furl import furl
from git import Actor, GitCommandError
from git.repo.base import Repo
from gitlab import GitlabAuthenticationError, GitlabGetError, GitlabMRRebaseError
from gitlab.v4.objects import Project
from gitlab.v4.objects import MergeRequest
# Shared file names. NOTE(review): presumably the repo manifest and the SRCREV
# configuration looked up by the scripts importing this module -- confirm
# against the callers; nothing in this section reads them.
manifest_file = "default.xml"
srcrev_file = "SRCREV.conf"
# NOTE(review): these look like GitLab pipeline/job statuses meaning
# "not finished yet" -- confirm against the callers.
pending_states = ["created", "waiting_for_resource", "preparing", "pending", "running"]
def integration_branch_name(project_name, branch_name):
    """Build the integration branch name for a project branch.

    The result has the form
    ``integrate/<lower-cased project name>/<branch name>``.
    """
    parts = ("integrate", project_name.lower(), branch_name)
    return "/".join(parts)
def find_gitlab_ci_integration_branch(repo: Repo, branch_name):
    """Find an already existing integration branch for the gitlab-ci integration.

    Special handling for the gitlab-ci integration: when ``branch_name``
    (the merge request's source branch) already starts with
    ``integrate/gitlab-ci``, the new commit is meant to be added to that
    branch; otherwise (normal behaviour) a new integration branch is created.

    Returns:
        ``branch_name`` if it starts with ``integrate/gitlab-ci`` and a
        reference named ``origin/<branch_name>`` exists in ``repo``,
        otherwise ``None``.
    """
    if branch_name.startswith("integrate/gitlab-ci"):
        logging.debug("Integration of gitlab-ci: %s", branch_name)
        for reference in repo.references:
            full_name = reference.name
            logging.debug("Found ref: %s", full_name)
            if full_name.startswith("origin/"):
                # remove 'origin/' from the ref before compare
                short_name = full_name.split("/", 1)[1]
                logging.debug("Splitted refname: %s", short_name)
                if short_name == branch_name:
                    logging.debug(
                        "Found integration branch for gitlab-ci: %s", short_name
                    )
                    return short_name
    return None
def get_project(gitlab, project_name):
    """Look up a GitLab project by full path or by plain name.

    The name is first used for a direct lookup (full path including
    namespace). If that yields nothing, the project search is queried and
    the last result whose ``name`` equals ``project_name`` is taken.

    Exits the program with an error message when the project cannot be
    found, the server cannot be reached, or authentication fails.
    """
    try:
        # Direct access, assuming the name contains the full namespace path
        try:
            found = gitlab.projects.get(project_name, retry_transient_errors=True)
        except GitlabGetError:
            found = None

        # Fall back to a search, assuming only the bare name was given
        if not found:
            search_results = gitlab.projects.list(
                search=project_name, retry_transient_errors=True
            )
            exact = [
                candidate
                for candidate in search_results
                if candidate.name == project_name
            ]
            if exact:
                found = exact[-1]
    except requests.ConnectionError:
        sys.exit("ERROR: could not connect to GitLab server")
    except GitlabAuthenticationError:
        sys.exit("ERROR: authentication failed")

    if not found:
        sys.exit("ERROR: project '%s' not found" % project_name)
    return found
def get_latest_commit(project, branch_name):
    """Return the ``commit`` attribute of the given branch of ``project``.

    Exits the program with an error message if the branch lookup fails or
    yields nothing.
    """
    try:
        head = project.branches.get(branch_name, retry_transient_errors=True)
    except GitlabGetError as error:
        details = (branch_name, project.name, error)
        sys.exit("ERROR: could not get branch '%s' for project '%s': %s" % details)

    if head:
        return head.commit

    sys.exit(
        "ERROR: branch '%s' not found in project %s" % (branch_name, project.name)
    )
def rebase_merge_request(project, merge_request, poll_interval=1, timeout=None):
    """Attempt to rebase a merge request and return the updated merge request.

    Rebasing takes more than one API call, see:
    https://docs.gitlab.com/ce/api/merge_requests.html#rebase-a-merge-request
    The rebase is triggered first, then the merge request is polled until
    its ``rebase_in_progress`` attribute is no longer set.

    Args:
        project: Project owning the merge request.
        merge_request: Merge request to rebase.
        poll_interval: Seconds to sleep before each poll (default: 1).
        timeout: Maximum number of seconds to keep polling. ``None``
            (default) waits until the rebase has finished.

    Returns:
        The freshly fetched merge request once the rebase has finished.
        If triggering the rebase, fetching the update, or the timeout
        fails, the original ``merge_request`` is returned with
        ``merge_error`` set to a description of the problem.
    """
    try:
        merge_request.rebase()
    except GitlabMRRebaseError as e:
        merge_request.merge_error = "Could not rebase merge request: %s" % e
        return merge_request

    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        time.sleep(poll_interval)
        try:
            updated_merge_request = project.mergerequests.get(
                id=merge_request.iid,
                query_parameters={"include_rebase_in_progress": "True"},
                retry_transient_errors=True,
            )
        except GitlabGetError as e:
            merge_request.merge_error = "Could not get updated merge request: %s" % e
            return merge_request

        if not updated_merge_request.rebase_in_progress:
            return updated_merge_request

        # Give up instead of polling forever when a time limit was requested
        if deadline is not None and time.monotonic() >= deadline:
            merge_request.merge_error = (
                "Timed out waiting for rebase of merge request to finish"
            )
            return merge_request
def crosslink_merge_requests(source_mr: MergeRequest, integration_mr: MergeRequest):
    """Add a note to each merge request that links to the other one.

    The integration merge request receives the link to the source merge
    request first, then the source merge request receives the link back.
    """
    link_to_source = {"body": "Source merge request: %s" % source_mr.web_url}
    integration_mr.notes.create(link_to_source)

    link_to_integration = {
        "body": "Integration merge request: %s" % integration_mr.web_url
    }
    source_mr.notes.create(link_to_integration)
def wait_until_merge_status_is_set(project: Project, mr: MergeRequest):
    """Poll a merge request until its merge status has been determined.

    The merge request is re-fetched once per second for as long as its
    ``merge_status`` is one of the "not checked yet" values. Progress dots
    and the final status are printed to stdout. Nothing is returned.
    """
    unchecked_states = ("unchecked", "checking", "cannot_be_merged_recheck")

    print("Waiting until merge status has been checked", end="", flush=True)
    while True:
        mr = project.mergerequests.get(mr.iid, retry_transient_errors=True)
        if mr.merge_status not in unchecked_states:
            break
        print(".", end="", flush=True)
        time.sleep(1)
    print(" -> %s" % mr.merge_status)
def list_commits(commits):
    """Render commits as one text block of links and commit messages.

    Every commit contributes a ``--`` separator, its ``web_url`` and its
    ``message``. An empty iterable yields an empty string.
    """
    entry_template = "\n--\n\nCommit: %s\n\n%s"
    return "".join(
        entry_template % (commit.web_url, commit.message) for commit in commits
    )
def commit_and_push(project: Project, repo: Repo, branch, message, name, email):
    """Commit the current index and force-push ``branch`` to ``origin``.

    Args:
        project: Project whose ``web_url`` is used to print the commit link.
        repo: Local repository holding the changes to commit.
        branch: Refspec/branch name handed to the push.
        message: Commit message.
        name: Name used for both author and committer.
        email: E-mail address used for both author and committer.

    Returns:
        The hex SHA of the commit at ``HEAD`` after the push.

    Exits the program with an error message if the push raises a
    ``GitCommandError`` or if the push result reports a failure.
    """
    author = Actor(name, email)
    repo.index.commit(message, author=author, committer=author)
    print(repo.git.log("--oneline", "-n", "5"))

    # Push commit
    try:
        origin = repo.remote("origin")
        logging.debug("Push branch %s to %s", branch, origin)
        push_results = origin.push(branch, force=True)
    except GitCommandError as e:
        sys.exit("ERROR: could not commit changes\n" + str(e))

    # GitPython does not raise for rejected heads: they are reported via the
    # ERROR bit in the flags of the returned PushInfo objects, and a push that
    # failed completely returns an empty result.
    rejected = [info for info in push_results if info.flags & info.ERROR]
    if rejected or not push_results:
        details = "\n".join(str(info.summary).strip() for info in rejected)
        sys.exit("ERROR: could not push changes\n" + details)

    # Print commit information
    revision = repo.head.commit.hexsha
    print("Pushed new commit:")
    print(project.web_url + "/-/commit/" + revision)
    print(repo.git.show("--summary", "--decorate"))

    return revision
def get_submodule(repo: Repo, submodule_name):
    """Return the submodule of ``repo`` whose name equals ``submodule_name``.

    If several submodules carry that name, the last one is returned. Exits
    the program with an error message when there is no such submodule.
    """
    matching = [sm for sm in repo.submodules if sm.name == submodule_name]
    if not matching:
        sys.exit("ERROR: submodule '%s' not found" % submodule_name)
    return matching[-1]
def extract_message_body(msg):
    """Extract the message body out of a commit message.

    Everything up to and including the first newline (the headline) is
    dropped, then leading newlines, spaces and hyphens are stripped. A
    message without any newline is treated as its own body. A message with
    an empty body (e.g. ``"Title\\n"`` or ``""``) yields an empty string
    instead of raising ``IndexError``.
    """
    # Remove headline
    body = msg.split("\n", 1)[-1]

    # Remove all newlines, whitespaces and hyphens from the beginning;
    # lstrip is safe on an empty string, unlike indexing body[0].
    return body.lstrip("\n -")
def get_merge_request(project: Project, merge_request):
    """Return a GitLab merge request specified either by id or by link.

    Args:
        project: Project the merge request belongs to.
        merge_request: Merge request id (``int`` or ``str``), or a reference
            of the form ``<project path>!<id>``, e.g.
            ``SECO-Northern-Europe/yocto/infrastructure/ci-test/minimal-bar!115``.

    Returns:
        The merge request, or ``None`` if the lookup raises
        ``GitlabGetError``.
    """
    # Only strings can carry the "<path>!<id>" form; integer ids pass through
    if isinstance(merge_request, str) and "!" in merge_request:
        merge_request = int(merge_request.rsplit("!", 1)[-1])
    # %s instead of %d: the id may still be a string at this point
    logging.debug("Number of MR: %s", merge_request)

    try:
        mr = project.mergerequests.get(merge_request, retry_transient_errors=True)
    except GitlabGetError:
        return None
    return mr
def clone_project(project: Project, into, branch=None):
    """Clone a GitLab project with depth 1 via its HTTP URL.

    Args:
        project: Project to clone; its ``http_url_to_repo`` is used.
        into: Target path handed to ``Repo.clone_from``.
        branch: Branch to check out. Defaults to the project's default
            branch.

    Returns:
        The cloned repository.

    Raises:
        Exception: If the clone fails with ``GitCommandError`` or
            ``IndexError``; the original error is chained as the cause.
    """
    gitlab = project.manager.gitlab

    # If no branch is given, use project's default branch
    if branch is None:
        branch = project.default_branch

    # Construct clone url containing access token
    # NOTE(review): the token is part of the URL, so it may show up in git
    # error output -- confirm that logs of the raised exception are safe.
    clone_url = furl(project.http_url_to_repo)
    clone_url.username = "gitlab-ci"
    clone_url.password = gitlab.private_token

    # Checkout project
    try:
        repo = Repo.clone_from(clone_url.url, into, branch=branch, depth=1)
    except GitCommandError as e:
        raise Exception("could not clone repository\n" + str(e)) from e
    except IndexError as e:
        # Bind the exception here: without "as e" the chained raise below
        # failed with a NameError instead of reporting the missing branch.
        raise Exception("branch '%s' not found" % branch) from e
    return repo
def get_repository_file_raw(project: Project, filename, ref=None):
    """Return the decoded content of a file from the project's repository.

    The file is located via ``get_repository_file_obj`` and its blob is
    fetched by id and decoded to ``str``.

    Returns:
        The file content, or ``None`` when ``get_repository_file_obj``
        cannot find the file.
    """
    # TODO tree objects are not supported
    fileobj = get_repository_file_obj(project, filename, ref)
    if fileobj is None:
        # The lookup already logged the problem; avoid subscripting None
        return None
    return project.repository_raw_blob(
        fileobj["id"], retry_transient_errors=True
    ).decode()
def get_repository_file_obj(project: Project, filename, ref=None):
    """Return the repository tree entry whose name equals ``filename``.

    The tree is listed for ``ref`` (the project's default branch when no
    ref is given). The first entry with a matching ``"name"`` is returned;
    when there is none, an error is logged and ``None`` is returned.
    """
    # TODO tree objects are not supported
    if ref is None:
        ref = project.default_branch
        logging.debug("Using default branch %s", ref)

    tree_entries = project.repository_tree(
        ref=ref, all=True, retry_transient_errors=True
    )
    logging.debug(tree_entries)

    matches = [entry for entry in tree_entries if entry["name"] == filename]
    if matches:
        return matches[0]

    logging.error("Could not find file %s", filename)
    for entry in tree_entries:
        logging.debug(entry["name"])
    return None
def is_commit_parent_of_project_commit(project: Project, project_commit, commit):
    """Check whether ``commit`` is ``project_commit`` or one of its ancestors.

    Walks through the commits of ``project``, starting with
    ``project_commit`` and following the first parent of each commit, and
    compares each id with the given commit. Both commits are specified as
    sha. ``commit`` is resolved once up front, so a value the server
    resolves to a commit (e.g. an abbreviated sha) is matched by its full id.

    Returns:
        ``True`` if the commit is found on the first-parent chain, ``False``
        once a commit without parents is reached.

    Raises:
        Exception: If ``commit`` or any commit on the chain cannot be
            fetched; the ``GitlabGetError`` is chained as the cause.
    """
    try:
        target = project.commits.get(commit, retry_transient_errors=True)
    except GitlabGetError as e:
        # Report the commit that was actually looked up here
        raise Exception(
            "Failed to find commit {} in {}".format(commit, project.name)
        ) from e
    wanted_ids = (commit, target.id)

    # Loop over the parent commits until commit is found
    # NOTE: this issues one API request per visited commit.
    parent_id = project_commit
    while True:
        try:
            parent = project.commits.get(parent_id, retry_transient_errors=True)
        except GitlabGetError as e:
            raise Exception(
                "Failed to find commit {} in {}".format(parent_id, project.name)
            ) from e

        # The integration branch is up to date if its parent is the integration base
        logging.debug(parent.id)
        if parent.id in wanted_ids:
            return True
        if not parent.parent_ids:
            return False
        parent_id = parent.parent_ids[0]  # Assume linear history