Newer
Older
#!/usr/bin/env python3
import argparse
import sys
from gitlab.client import Gitlab
from gitlab.v4.objects import Project, ProjectPipeline
import common
def cancel_pipeline_including_children(
    project: Project,
    pipeline: ProjectPipeline,
):
    """Cancel pipeline including downstream pipelines (parent/child or multi-project).

    Downstream pipelines are cancelled first (depth-first recursion over the
    pipeline's bridges), then the given pipeline itself is cancelled.

    Args:
        project: Project containing the pipeline
        pipeline: Pipeline to cancel

    Returns:
        Nothing
    """
    # Browse through all downstream pipelines
    # NOTE(review): depending on the python-gitlab pagination defaults, list()
    # may return only the first page of bridges - confirm for large pipelines.
    for bridge in pipeline.bridges.list():
        downstream = bridge.downstream_pipeline
        if not downstream:
            # No downstream pipeline attached to this bridge
            continue

        # Resolve the project for *this* bridge only. The `project` argument
        # must not be overwritten here: a later bridge may point back to a
        # pipeline in the original project, and looking that one up in a
        # previously fetched foreign project would fail.
        if downstream["project_id"] != pipeline.project_id:
            downstream_project = pipeline.manager.gitlab.projects.get(
                downstream["project_id"]
            )
        else:
            downstream_project = project

        # Recurse to downstream pipeline
        downstream_pipeline = downstream_project.pipelines.get(downstream["id"])
        cancel_pipeline_including_children(downstream_project, downstream_pipeline)

    pipeline.cancel()
def cancel_pipelines(
    project: Project,
    ref: str = "",
    below_pipeline_id: int = sys.maxsize,
) -> list[ProjectPipeline]:
    """Cancel the running pipelines of a project, downstream pipelines included.

    The project is queried for pipelines with status "running" on the given
    ref. Every returned pipeline whose ID is lower than ``below_pipeline_id``
    is cancelled together with its downstream pipelines.

    Args:
        project: GitLab project the pipelines belong to
        ref: Git ref (branch or tag) the pipelines are running on
        below_pipeline_id: only pipelines with an ID below this are cancelled

    Returns:
        List of the pipelines that were cancelled
    """
    running = project.pipelines.list(
        ref=ref,
        status="running",
        retry_transient_errors=True,
    )

    cancelled = []
    for candidate in running:
        # Leave pipelines at or above the ID threshold untouched
        if candidate.id >= below_pipeline_id:
            continue
        cancel_pipeline_including_children(project, candidate)
        cancelled.append(candidate)
    return cancelled
def main():
    """Command-line entry point: cancel running pipelines of a GitLab project.

    Parses the command line, connects to the GitLab instance, cancels the
    matching pipelines via cancel_pipelines() and prints the web URL of every
    cancelled pipeline. Exits with status 0 when no pipeline was cancelled.
    """
    cli = argparse.ArgumentParser()

    # One entry per command-line option: (flag, add_argument keywords)
    option_table = (
        (
            "--gitlab-url",
            {
                "help": "URL to the GitLab instance",
                "dest": "gitlab_url",
                "default": common.GITLAB_URL,
            },
        ),
        (
            "--token",
            {
                "help": "GitLab REST API private access token",
                "dest": "token",
                "required": True,
            },
        ),
        (
            "--project",
            {
                "help": "name of the GitLab project",
                "dest": "project",
                "required": True,
            },
        ),
        (
            "--ref",
            {
                "help": "Git reference (branch or tag)",
                "dest": "ref",
                "default": "",
            },
        ),
        (
            "--below-pipeline-id",
            {
                "help": "Cancel only pipelines with IDs lower than this",
                "dest": "below_pipeline_id",
                "type": int,
                "default": sys.maxsize,
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    # Unrecognized arguments are accepted and discarded
    options, _unknown = cli.parse_known_args()

    client = Gitlab(options.gitlab_url, private_token=options.token)
    target_project = common.get_project(client, options.project)

    search_message = (
        "Searching for pipelines in project '%s' on ref '%s' with IDs below %d"
        % (options.project, options.ref, options.below_pipeline_id)
    )
    print(search_message)

    cancelled = cancel_pipelines(
        target_project, options.ref, options.below_pipeline_id
    )

    if cancelled:
        print("Cancelled pipelines:")
        for entry in cancelled:
            print(entry.web_url)
    else:
        print("No running pipelines found.")
        sys.exit(0)


# Run the command-line interface only when executed as a script
if __name__ == "__main__":
    main()