#!/usr/bin/env python3
"""Update a git submodule and the matching '.gitlab-ci.yml' include ref in a
GitLab project, optionally committing and pushing an integration branch."""
import argparse
import logging
import os
import posixpath
import sys
import tempfile
from configparser import ConfigParser

from furl import furl
from git import GitCommandError, Repo, Submodule
from gitlab import Gitlab, GitlabGetError
from gitlab.v4.objects import Project
from ruamel.yaml import YAML

import common


def _resolve_submodule_project_path(project_path, submodule_url):
    """Resolve a relative submodule URL against a GitLab project path.

    This cannot be done with gitpython directly due to issue:
    https://github.com/gitpython-developers/GitPython/issues/730

    Parameters:
        project_path (string): 'path_with_namespace' of the superproject
        submodule_url (string): relative submodule URL, e.g. '../lib.git'

    Returns:
        The project path of the submodule, e.g. 'group/lib' for the project
        'group/proj' and the URL '../lib.git'.
    """
    relative_path = submodule_url
    # Only drop a trailing '.git'; a generic splitext() would also truncate
    # project names that merely contain a dot (e.g. '../my.project').
    if relative_path.endswith(".git"):
        relative_path = relative_path[: -len(".git")]
    while relative_path.startswith(".."):
        relative_path = relative_path[3:]  # strip off '../'
        project_path = posixpath.dirname(project_path)  # remove last part
    # GitLab paths always use '/', independent of the host OS
    return posixpath.join(project_path, relative_path)


def get_submodule_project_path_and_revision(project: Project, submodule, branch=None):
    """Look up the project path and the current revision of a submodule.

    The information is read via the GitLab API ('.gitmodules' and the submodule
    entry of the repository), no clone is needed.

    Parameters:
        project (gitlab project): The project containing the submodule
        submodule (string): The name of the submodule
        branch (string): ref to read from, passed on to the API helpers

    Returns:
        tuple of (submodule project path, submodule revision), or (None, None)
        if the submodule could not be resolved (an error is logged).
    """
    gitmodules = common.get_repository_file_raw(project, ".gitmodules", ref=branch)
    if gitmodules is None:
        logging.error(f"Submodule {submodule} not found in {project.name}.")
        return None, None

    cfgparse = ConfigParser()
    cfgparse.read_string(gitmodules)
    try:
        section = cfgparse[f'submodule "{submodule}"']
    except KeyError:
        logging.error(f"Submodule {submodule} not found in {project.name}.")
        return None, None

    submodule_url = section["url"]

    # Only relative submodule URLs can be mapped to a project path
    if not submodule_url.startswith(".."):
        logging.error(f"absolute submodule paths are not supported ({submodule_url})")
        return None, None

    # Get absolute project path
    submodule_project_path = _resolve_submodule_project_path(
        project.path_with_namespace, submodule_url
    )

    # Get current revision
    gitmodule_rev = common.get_repository_file_obj(project, submodule, ref=branch)
    # NOTE(review): presumably the helper returns None when the entry is
    # missing, like get_repository_file_raw above - confirm.
    if gitmodule_rev is None:
        logging.error(f"Submodule {submodule} not found in {project.name}.")
        return None, None

    return submodule_project_path, gitmodule_rev["id"]


def get_submodule_integration_branch_suffix(submodule_project: Project, revision):
    """Return the suffix to use for the integration branch name.

    Parameters:
        submodule_project (gitlab project): The submodule's project
        revision (hex string): The commit to integrate

    Returns:
        The source branch of the first merge request of the commit that targets
        the project's default branch, otherwise the revision itself.
    """
    # Find out if top commit is part of a merge request
    # If so, use source branch of this MR as integration branch name
    # Else use commit sha instead
    # sourcery skip: use-assigned-variable
    integration_branch_suffix = revision
    for mr in submodule_project.commits.get(revision).merge_requests():
        if mr["target_branch"] == submodule_project.default_branch:
            integration_branch_suffix = mr["source_branch"]
            break

    return integration_branch_suffix


def clone_project_and_submodule(project: Project, submodule_name, branch=None):
    """Create a shallow clone of the given project including the submodule.

    Parameters:
        project (gitlab project): The project to clone
        submodule_name (string): The name of the submodule to initialize
        branch (string): branch to clone, if None, the project's default branch
            is used

    Returns:
        tuple of:
            repo (Repo): GitPython repo of the cloned project
            submodule_project (gitlab project): The submodule's project
            project_dir (TemporaryDirectory): directory holding the clone; the
                caller has to keep this reference as long as the repo is used

    Exits the process if the clone fails, the branch does not exist or the
    submodule uses an absolute URL.
    """
    gitlab = project.manager.gitlab

    # If no branch is given, use project's default branch
    if branch is None:
        branch = project.default_branch

    project_dir = tempfile.TemporaryDirectory()

    # Construct clone url containing access token
    clone_url = furl(project.http_url_to_repo)
    clone_url.username = "gitlab-ci"
    clone_url.password = gitlab.private_token

    # Checkout project
    try:
        repo = Repo.clone_from(clone_url.url, project_dir.name, branch=branch, depth=1)
    except GitCommandError as e:
        sys.exit(f"ERROR: could not clone repository\n{e}")
    except IndexError:
        sys.exit(f"ERROR: branch '{branch}' not found")

    # Find submodule
    submodule = common.get_submodule(repo, submodule_name)

    # Check for relative path
    if not submodule.url.startswith(".."):
        sys.exit(f"ERROR: absolute submodule paths are not supported ({submodule.url})")

    # Get absolute project path
    submodule_project_path = _resolve_submodule_project_path(
        project.path_with_namespace, submodule.url
    )

    # Get submodule project
    submodule_project = common.get_project(gitlab, submodule_project_path)

    # Initialize submodule
    # Hack due to the GitPython issue with relative URLs: change to absolute
    # path and switch back afterwards
    submodule_clone_url = furl(submodule_project.http_url_to_repo)
    submodule_clone_url.username = "gitlab-ci"
    submodule_clone_url.password = gitlab.private_token

    submodule_relative_url = submodule.url
    with submodule.config_writer() as writer:
        writer.set("url", submodule_clone_url.url)
    try:
        submodule.update(init=True)
    except GitCommandError:
        # This seems to happen when a not existing commit is referenced
        logging.error(f"Failed to initialize submodule {submodule_name}")
    finally:
        # Always restore the relative URL so the token-bearing URL can never
        # stay in the submodule configuration, whatever update() raised.
        with submodule.config_writer() as writer:
            writer.set("url", submodule_relative_url)

    # We need to keep the TemporaryDirectory object reference project_dir because we
    # need the cloned repo later, otherwise the directory will be immediately deleted.
    return repo, submodule_project, project_dir


def update_submodule_in_repo(repo: Repo, submodule_project: Submodule, new_revision):
    """Update the given submodule to the given revision and stage it in repo.

    Parameters:
        repo (Repo): GitPython repo of the superproject
        submodule_project (Submodule): GitPython submodule object (despite the
            name this is not a GitLab project)
        new_revision (hex string): The commit to check out in the submodule

    Exits the process if the revision cannot be checked out.
    """
    # Update submodule
    try:
        submodule_project.module().git.checkout(new_revision)
    except GitCommandError as e:
        sys.exit(f"ERROR: could not checkout commit\n{e}")
    repo.git.add(submodule_project.path)


def update_gitlab_ci_include(content, include_project, new_revision):
    """Replace the ref of an include of include_project in a '.gitlab-ci.yml'.

    Parameters:
        content (string): content of the '.gitlab-ci.yml' file
        include_project (string): project path including the root group
        new_revision (string): the new ref to insert

    Returns:
        The updated content, or None if no matching include with a usable ref
        was found.
    """
    # Remove the group part from the project filter
    # as it is normally specified by $CI_PROJECT_ROOT_NAMESPACE
    _, separator, remainder = include_project.partition("/")
    if separator:
        include_project = remainder

    yaml = YAML()
    data = yaml.load(content)
    logging.debug(f"Yaml: {data}")
    try:
        includes = data["include"]
    except (KeyError, TypeError):
        # TypeError: empty file (None) or a top level that is not a mapping
        logging.debug("No include statement found")
        return None

    # 'include' may be a single entry instead of a list of entries
    if not isinstance(includes, list):
        includes = [includes]

    current_revision = None
    for entry in includes:
        try:
            entry_project = entry["project"]
        except (KeyError, TypeError):
            # Not a project include (e.g. 'local:', 'remote:' or a plain
            # string); keep searching instead of giving up.
            continue
        if include_project not in entry_project:
            continue
        try:
            current_revision = entry["ref"]
        except KeyError:
            logging.debug("Failed to parse include statement")
            return None
        break

    if current_revision is None:
        logging.debug(f"Failed to find {include_project} in include statement")
        return None
    if not isinstance(current_revision, str) or not current_revision:
        # An empty or non-string ref cannot be replaced textually
        logging.debug("Failed to parse include statement")
        return None

    # Use plain replacement to keep the content of the file
    # Yes, this may fail if the 'current_revision' is used multiple
    # times in this file. But probably this will not ever happen
    logging.debug(f"Replace {current_revision} with {new_revision}")
    return content.replace(current_revision, new_revision)


def update_submodule_and_include_ref(
    project,
    submodule_name,
    new_revision,
    branch=None,
    commit_and_push=True,
    force_clone=False,
):
    # FIXME: Remove the sourcery check deactivation below and refactor this method in
    # order to enhance code quality and make the check pass.
    # sourcery skip: low-code-quality
    """Update the submodule and include refs to the submodule in the given project.
        Create mergerequest if needed.

    Parameters:
        project ( gitlab project): The project which's submodule should be updated
        submodule_name (string): The name of the submodule to pull
        new_revision (hex string): The sha hash of the commit to update the submodule to
        branch (string): branch to update, if None, the projects default branch is used
        commit_and_push: Set to false if no commit should be created. Changes are left
                         in staging.
        force_clone: Checkout repo and setup integration branch even if no update is
                     needed

    Returns: tuple of:
        project_repo (Repo): GitPython repo with the cloned project, None if no
                             clone was needed
        project_dir (TemporaryDirectory): directory holding the clone, None if
                                          no clone was needed
        integration_branch (string): Name of the integration branch
        integration_commit (hexsha): Hash of the integration commit, if any
        message: Commit message based on the integrated changes.

        All five entries are None if there are no commits between the current
        and the new submodule revision.

    Exits the process if the submodule cannot be resolved in the project.
    """
    gitlab = project.manager.gitlab

    submodule_update_needed = True
    project_repo = None
    project_dir = None
    integration_commit = None
    existing_branch = None

    if branch is None:
        branch = project.default_branch
    logging.debug(f"Branch: {branch}")

    (
        submodule_project_path,
        submodule_current_rev,
    ) = get_submodule_project_path_and_revision(project, submodule_name, branch)
    if submodule_project_path is None:
        # The lookup already logged the reason; without a path neither the
        # submodule project nor the revision range can be determined.
        sys.exit(
            f"ERROR: could not resolve submodule '{submodule_name}' "
            f"in project '{project.name}'"
        )

    # Get submodule project
    submodule_project = common.get_project(gitlab, submodule_project_path)

    # Get commits between current and new revision
    revision_range = f"{submodule_current_rev}..{new_revision}"
    commits = submodule_project.commits.list(
        ref_name=revision_range, retry_transient_errors=True
    )
    if not commits:
        logging.info(
            f"No commits found in range {revision_range}, probably submodule already "
            f"up-to-date."
        )
        return None, None, None, None, None
    logging.debug(f"New commits: {commits}")

    # Find out if top commit is top commit of a merge request
    # If so, use source branch of this MR as integration branch name
    # Else use commit sha instead
    integration_branch_suffix = new_revision
    for mr in commits[0].merge_requests():
        if (
            mr["target_branch"] == submodule_project.default_branch
            and mr["sha"] == new_revision
        ):
            integration_branch_suffix = mr["source_branch"]
            break
    logging.debug(f"Integration branch suffix: {integration_branch_suffix}")

    # Construct integration branch name
    integration_branch_name = common.integration_branch_name(
        submodule_project.path, integration_branch_suffix, branch
    )

    # Construct commit message
    more_commits = f" and {len(commits) - 1} more" if len(commits) > 1 else ""
    message = (
        f"Integrate {submodule_project.name}/{integration_branch_suffix}"
        f"{more_commits}\n{common.list_commits(commits)}"
    )

    # Check if revisions are different
    if submodule_current_rev == new_revision:
        print(f"Submodule is already at {new_revision}")
        submodule_update_needed = False

    # Check if we already have an integration branch (before we actually do the checkout)
    # Check if integration branch already exists and if it is up to date
    # This is needed for one use case:
    # It is possible to amend changes to the integration branch manually
    # outside the pipeline.
    # When the submodule revision has not changed and the pipeline is run
    # again (due to merge or manually triggered) the manual change persists
    # in the final commit
    # For example rename a file in gitlab-ci repo that is included in a
    # subproject, requires an adapted include statement here.
    # To get this change 'atomic' the updated include statement should be
    # in the same commit as the update of the submodule
    if submodule_update_needed or force_clone:
        try:
            existing_branch = project.branches.get(integration_branch_name)
        except GitlabGetError:
            # Branch not found
            existing_branch = None

        if existing_branch:
            # Check if the integration branch is on top of the integration
            # base or if it is outdated
            integration_base_branch = project.branches.get(branch)
            integration_base_id = integration_base_branch.commit["id"]
            logging.debug(f"Head of {branch} points to {integration_base_id}")

            # Loop over the commits until the integration_branch head id is found
            # NOTE(review): the 'outdated' branch below is paired with this
            # check (not with the revision comparison) - confirm nesting.
            if common.is_commit_parent_of_project_commit(
                project, existing_branch.commit["id"], integration_base_id, limit=5
            ):
                # Check the submodule revision on the integration branch
                (
                    _,
                    integration_branch_submodule_rev,
                ) = get_submodule_project_path_and_revision(
                    project, submodule_name, integration_branch_name
                )
                logging.debug(
                    f"Revision in integration branch "
                    f"'{integration_branch_submodule_rev}', "
                    f"new_revision '{new_revision}'"
                )

                if integration_branch_submodule_rev == new_revision:
                    print(
                        f"Submodule is already at {new_revision} on branch "
                        f"{integration_branch_name}"
                    )
                    integration_commit = existing_branch.commit["id"]
                    submodule_update_needed = False
            else:
                logging.debug("Integration branch is outdated, delete it.")
                project.branches.delete(existing_branch.name)
                existing_branch = None

    # Clone the project, we need to do changes
    if submodule_update_needed or force_clone:
        clone_branch = integration_branch_name if existing_branch else branch

        # Actually clone
        project_repo, submodule_project, project_dir = clone_project_and_submodule(
            project, submodule_name, clone_branch
        )

        if existing_branch:
            print(f"Using existing integration branch {integration_branch_name}")
        else:
            # Create branch
            print(f"Creating integration branch {integration_branch_name}")
            project_repo.head.set_reference(
                project_repo.create_head(integration_branch_name)
            )

        if submodule_update_needed:
            # Update submodule to new revision
            submodule_repo = common.get_submodule(project_repo, submodule_name)
            update_submodule_in_repo(project_repo, submodule_repo, new_revision)

            # Now also update the project '.gitlab-ci.yml' file to the new revision
            gitlab_ci_yml_filename = os.path.join(
                project_repo.working_tree_dir, ".gitlab-ci.yml"
            )

            with open(gitlab_ci_yml_filename, "r", encoding="utf8") as fp:
                gitlab_ci_yml = fp.read()
            logging.debug(gitlab_ci_yml)

            # The include filter is the project path taken from the web URL
            # (scheme and host stripped)
            new_gitlab_ci_yml = update_gitlab_ci_include(
                gitlab_ci_yml,
                submodule_project.web_url.split("//")[1].split("/", 1)[1],
                new_revision,
            )
            if new_gitlab_ci_yml is None:
                print("Failed to update the include revision in '.gitlab-ci.yml'")
            else:
                logging.debug(new_gitlab_ci_yml)
                with open(gitlab_ci_yml_filename, "w", encoding="utf8") as fp:
                    fp.write(new_gitlab_ci_yml)
                project_repo.git.add(os.path.basename(gitlab_ci_yml_filename))

            # Commit the changes
            # NOTE(review): the commit step is kept inside the update branch,
            # so force_clone alone never pushes - confirm this nesting.
            if commit_and_push:
                # ========================================================
                # Squash all commits on the integration branch to one
                # ========================================================
                project_repo.remotes.origin.fetch(branch, depth=5)
                gitlab_branch = project.branches.get(branch)
                project_repo.git.reset("--soft", gitlab_branch.commit["id"])

                # Make an API request to create the gitlab.user object
                gitlab.auth()

                # Push the changes
                integration_commit = common.commit_and_push(
                    project,
                    project_repo,
                    message,
                    gitlab.user.username,
                    gitlab.user.email,
                    less_verbose=True,
                )

    return (
        project_repo,
        project_dir,
        integration_branch_name,
        integration_commit,
        message,
    )


def main():
    """Parse the command line and update the submodule of the given project."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--gitlab-url",
        help="""URL to the GitLab instance""",
        dest="gitlab_url",
        default=common.GITLAB_URL,
    )
    parser.add_argument(
        "--token",
        help="""GitLab REST API private access token""",
        dest="token",
        required=True,
    )
    parser.add_argument(
        "--project",
        help="""name of the GitLab project""",
        dest="project",
        required=True,
    )
    parser.add_argument(
        "--submodule",
        help="""submodule to update""",
        dest="submodule",
        required=True,
    )
    parser.add_argument(
        "--revision",
        help="""new revision for submodule""",
        dest="revision",
        required=True,
    )
    parser.add_argument(
        "--branch",
        help="""project branch (if not default branch)""",
        dest="branch",
        required=False,
        default=None,
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="""Increase verbosity.""",
    )

    # Unknown arguments are ignored on purpose (parse_known_args)
    args, _ = parser.parse_known_args()
    if args.verbose:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s %(levelname)-8s %(message)s",
            datefmt="%H:%M:%S",
        )

    gitlab = Gitlab(args.gitlab_url, private_token=args.token)

    project = common.get_project(gitlab, args.project)

    update_submodule_and_include_ref(
        project, args.submodule, args.revision, args.branch
    )


if __name__ == "__main__":
    main()