From 3df8be3ee906374b992268683b59c9dd4780aaef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= Date: Tue, 11 Mar 2025 14:11:07 +0100 Subject: [PATCH] [CI] Extend metrics container to log BuildKite metrics (#129699) The current container focuses on Github metrics. Before deprecating BuildKite, we want to make sure the new infra quality is better, or at least the same. Being able to compare buildkite metrics with github metrics on grafana will allow us to easily present the comparison. This PR requires https://github.com/llvm/llvm-zorg/pull/400 to be merged first. --- .ci/metrics/metrics.py | 402 ++++++++++++++++++++++++++++++----------- 1 file changed, 295 insertions(+), 107 deletions(-) diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py index bd2b51154768..a5f76428cb3c 100644 --- a/.ci/metrics/metrics.py +++ b/.ci/metrics/metrics.py @@ -1,8 +1,11 @@ import requests +import dateutil +import json import time import os from dataclasses import dataclass import sys +import collections import logging import github @@ -12,10 +15,35 @@ from github import Auth GRAFANA_URL = ( "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write" ) -GITHUB_PROJECT = "llvm/llvm-project" -WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"] SCRAPE_INTERVAL_SECONDS = 5 * 60 +# Number of builds to fetch per page. Since we scrape regularly, this can +# remain small. +BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 10 + +# Lists the Github workflows we want to track. Maps the Github job name to +# the metric name prefix in grafana. +# This metric name is also used as a key in the job->name map. +GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"} + +# Lists the Github jobs to track for a given workflow. The key is the stable +# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK). +# Each value is a map to link the github job name to the corresponding metric +# name. +GITHUB_JOB_TO_TRACK = { + "github_llvm_premerge_checks": { + "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux", + "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows", + } +} + +# Lists the BuildKite jobs we want to track. Maps the BuildKite job name to +# the metric name in Grafana. This is important not to lose metrics history +# if the workflow name changes. +BUILDKITE_WORKFLOW_TO_TRACK = { + ":linux: Linux x64": "buildkite_linux", + ":windows: Windows x64": "buildkite_windows", +} @dataclass class JobMetrics: @@ -35,6 +63,214 @@ class GaugeMetric: time_ns: int +def buildkite_fetch_page_build_list( + buildkite_token: str, after_cursor: str = None +) -> list[dict[str, str]]: + """Fetches a page of the build list using the GraphQL BuildKite API. + Returns the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE last **finished** builds by + default, or the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE **finished** builds + older than the one pointer by |cursor| if provided. + The |cursor| value is taken from the previous page returned by the API. + + The returned data had the following format: + + Args: + buildkite_token: the secret token to authenticate GraphQL requests. + after_cursor: cursor after which to start the page fetch. + + Returns: + The most recent builds after cursor (if set) with the following format: + [ + { + "cursor": , + "number": , + } + ] + """ + + BUILDKITE_GRAPHQL_QUERY = """ + query OrganizationShowQuery {{ + organization(slug: "llvm-project") {{ + pipelines(search: "Github pull requests", first: 1) {{ + edges {{ + node {{ + builds (state: [FAILED, PASSED], first: {PAGE_SIZE}, after: {AFTER}) {{ + edges {{ + cursor + node {{ + number + }} + }} + }} + }} + }} + }} + }} + }} + """ + data = BUILDKITE_GRAPHQL_QUERY.format( + PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE, + AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor), + ) + data = data.replace("\n", "").replace('"', '\\"') + data = '{ "query": "' + data + '" }' + url = "https://graphql.buildkite.com/v1" + headers = { + "Authorization": "Bearer " + buildkite_token, + "Content-Type": "application/json", + } + r = requests.post(url, data=data, headers=headers) + data = r.json() + # De-nest the build list. + builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][ + "edges" + ] + # Fold cursor info into the node dictionnary. + return [{**x["node"], "cursor": x["cursor"]} for x in builds] + + +def buildkite_get_build_info(build_number: str) -> dict: + """Returns all the info associated with the provided build number. + Note: for unknown reasons, graphql returns no jobs for a given build, + while this endpoint does, hence why this uses this API instead of graphql. + + Args: + build_number: which build number to fetch info for. + + Returns: + The info for the target build, a JSON dictionnary. + """ + + URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json" + return requests.get(URL.format(build_number)).json() + + +def buildkite_get_builds_up_to(buildkite_token: str, last_cursor: str = None) -> list: + """Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE builds by default, or + until the build pointed by |last_cursor| is found. + + Args: + buildkite_token: the secret token to authenticate GraphQL requests. + last_cursor: the cursor to stop at if set. If None, a full page is fetched. + """ + output = [] + cursor = None + + while True: + page = buildkite_fetch_page_build_list(buildkite_token, cursor) + # No cursor provided, return the first page. + if last_cursor is None: + return page + + # Cursor has been provided, check if present in this page. + match_index = None + for index, item in enumerate(page): + if item["cursor"] == last_cursor: + match_index = index + break + + # Not present, continue loading more pages. + if match_index is None: + output += page + cursor = page[-1]["cursor"] + continue + # Cursor found, keep results up to cursor + output += page[:match_index] + return output + + +def buildkite_get_metrics( + buildkite_token: str, last_cursor: str = None +) -> (list[JobMetrics], str): + """Returns a tuple with: + - the metrics to record until |last_cursor| is reached, or none if last cursor is None. + - the cursor of the most recent build processed. + + Args: + buildkite_token: the secret token to authenticate GraphQL requests. + last_cursor: the cursor to stop at if set. If None, a full page is fetched. + """ + builds = buildkite_get_builds_up_to(buildkite_token, last_cursor) + # Don't return any metrics if last_cursor is None. + # This happens when the program starts. + if last_cursor is None: + return [], builds[0]["cursor"] + + last_recorded_build = last_cursor + output = [] + for build in reversed(builds): + info = buildkite_get_build_info(build["number"]) + last_recorded_build = build["cursor"] + for job in info["jobs"]: + # Skip this job. + if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK: + continue + + created_at = dateutil.parser.isoparse(job["created_at"]) + scheduled_at = dateutil.parser.isoparse(job["scheduled_at"]) + started_at = dateutil.parser.isoparse(job["started_at"]) + finished_at = dateutil.parser.isoparse(job["finished_at"]) + + job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]] + queue_time = (started_at - scheduled_at).seconds + run_time = (finished_at - started_at).seconds + status = bool(job["passed"]) + finished_at_ns = int(finished_at.timestamp()) * 10**9 + workflow_id = build["number"] + workflow_name = "Github pull requests" + output.append( + JobMetrics( + job_name, + queue_time, + run_time, + status, + finished_at_ns, + workflow_id, + workflow_name, + ) + ) + + return output, last_recorded_build + + +def github_job_name_to_metric_name(workflow_name, job_name): + workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_name] + job_key = GITHUB_JOB_TO_TRACK[workflow_key][job_name] + return f"{workflow_key}_{job_key}" + + +def github_count_queued_running_workflows(workflow_list): + """Returns the per-job count of running & queued jobs in the passed + workflow list. + + Args: + workflow_list: an iterable of workflows. + + Returns: + A tuple, (per-job-queue-size, per-job-running-count). The key + is the pretty job name, and the value the count of jobs. + """ + queued_count = collections.Counter() + running_count = collections.Counter() + + for workflow in workflow_list: + if workflow.name not in GITHUB_WORKFLOW_TO_TRACK: + continue + + workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow.name] + for job in workflow.jobs(): + if job.name not in GITHUB_JOB_TO_TRACK[workflow_key]: + continue + job_key = GITHUB_JOB_TO_TRACK[workflow_key][job.name] + metric_name = f"{workflow_key}_{job_key}" + + if job.status == "queued": + queued_count[metric_name] += 1 + elif job.status == "in_progress": + running_count[metric_name] += 1 + return queued_count, running_count + + def get_sampled_workflow_metrics(github_repo: github.Repository): """Gets global statistics about the Github workflow queue @@ -45,131 +281,83 @@ def get_sampled_workflow_metrics(github_repo: github.Repository): Returns a list of GaugeMetric objects, containing the relevant metrics about the workflow """ - queued_job_counts = {} - running_job_counts = {} - # Other states are available (pending, waiting, etc), but the meaning # is not documented (See #70540). # "queued" seems to be the info we want. - for queued_workflow in github_repo.get_workflow_runs(status="queued"): - if queued_workflow.name not in WORKFLOWS_TO_TRACK: - continue - for queued_workflow_job in queued_workflow.jobs(): - job_name = queued_workflow_job.name - # Workflows marked as queued can potentially only have some jobs - # queued, so make sure to also count jobs currently in progress. - if queued_workflow_job.status == "queued": - if job_name not in queued_job_counts: - queued_job_counts[job_name] = 1 - else: - queued_job_counts[job_name] += 1 - elif queued_workflow_job.status == "in_progress": - if job_name not in running_job_counts: - running_job_counts[job_name] = 1 - else: - running_job_counts[job_name] += 1 - - for running_workflow in github_repo.get_workflow_runs(status="in_progress"): - if running_workflow.name not in WORKFLOWS_TO_TRACK: - continue - for running_workflow_job in running_workflow.jobs(): - job_name = running_workflow_job.name - if running_workflow_job.status != "in_progress": - continue - - if job_name not in running_job_counts: - running_job_counts[job_name] = 1 - else: - running_job_counts[job_name] += 1 + queued_1, running_1 = github_count_queued_running_workflows( + github_repo.get_workflow_runs(status="queued") + ) + queued_2, running_2 = github_count_queued_running_workflows( + github_repo.get_workflow_runs(status="in_progress") + ) workflow_metrics = [] - for queued_job in queued_job_counts: + for key, value in (queued_1 + queued_2).items(): workflow_metrics.append( - GaugeMetric( - f"workflow_queue_size_{queued_job}", - queued_job_counts[queued_job], - time.time_ns(), - ) + GaugeMetric(f"workflow_queue_size_{key}", value, time.time_ns()) ) - for running_job in running_job_counts: + for key, value in (running_1 + running_2).items(): workflow_metrics.append( - GaugeMetric( - f"running_workflow_count_{running_job}", - running_job_counts[running_job], - time.time_ns(), - ) + GaugeMetric(f"running_workflow_count_{key}", value, time.time_ns()) ) - # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana. + + # Always send a hearbeat metric so we can monitor is this container is + # still able to log to Grafana. workflow_metrics.append( GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()) ) return workflow_metrics -def get_per_workflow_metrics( - github_repo: github.Repository, workflows_to_track: dict[str, int] -): +def get_per_workflow_metrics(github_repo: github.Repository, last_workflow_id: str): """Gets the metrics for specified Github workflows. - This function takes in a list of workflows to track, and optionally the - workflow ID of the last tracked invocation. It grabs the relevant data - from Github, returning it to the caller. + This function loads the last workflows from GitHub up to + `last_workflow_id` and logs their metrics if they are referenced in + GITHUB_WORKFLOW_TO_TRACK. + The function returns a list of metrics, and the most recent processed + workflow. + If `last_workflow_id` is None, no metrics are returned, and the last + completed github workflow ID is returned. This is used once when the + program starts. Args: github_repo: A github repo object to use to query the relevant information. - workflows_to_track: A dictionary mapping workflow names to the last - invocation ID where metrics have been collected, or None to collect the - last five results. + last_workflow_id: the last workflow we checked. Returns: Returns a list of JobMetrics objects, containing the relevant metrics about the workflow. """ workflow_metrics = [] + last_recorded_workflow = None + for workflow_run in iter(github_repo.get_workflow_runs(status="completed")): + # Record the first workflow of this list as the most recent one. + if last_recorded_workflow is None: + last_recorded_workflow = workflow_run.id - workflows_to_include = set(workflows_to_track.keys()) - - for workflow_run in iter(github_repo.get_workflow_runs()): - if len(workflows_to_include) == 0: + # If we saw this workflow already, break. We also break if no + # workflow has been seen, as this means the script just started. + if last_workflow_id == workflow_run.id or last_workflow_id is None: break - if workflow_run.status != "completed": + # This workflow is not interesting to us. Skipping. + if workflow_run.name not in GITHUB_WORKFLOW_TO_TRACK: continue - # This workflow was already sampled for this run, or is not tracked at - # all. Ignoring. - if workflow_run.name not in workflows_to_include: - continue + workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_run.name] - # There were no new workflow invocations since the previous scrape. - # The API returns a sorted list with the most recent invocations first, - # so we can stop looking for this particular workflow. Continue to grab - # information on the other workflows of interest, if present. - if workflows_to_track[workflow_run.name] == workflow_run.id: - workflows_to_include.remove(workflow_run.name) - continue + for workflow_job in workflow_run.jobs(): + # This job is not interesting, skipping. + if workflow_job.name not in GITHUB_JOB_TO_TRACK[workflow_key]: + continue - workflow_jobs = workflow_run.jobs() - if workflow_jobs.totalCount == 0: - continue - - if ( - workflows_to_track[workflow_run.name] is None - or workflows_to_track[workflow_run.name] == workflow_run.id - ): - workflows_to_include.remove(workflow_run.name) - if ( - workflows_to_track[workflow_run.name] is not None - and len(workflows_to_include) == 0 - ): - break - - for workflow_job in workflow_jobs: created_at = workflow_job.created_at started_at = workflow_job.started_at completed_at = workflow_job.completed_at - job_result = int(workflow_job.conclusion == "success") + job_key = GITHUB_JOB_TO_TRACK[workflow_key][workflow_job.name] + if job_result: # We still might want to mark the job as a failure if one of the steps # failed. This is required due to use setting continue-on-error in @@ -199,7 +387,7 @@ def get_per_workflow_metrics( workflow_metrics.append( JobMetrics( - workflow_run.name + "-" + workflow_job.name, + workflow_key + "_" + job_key, queue_time.seconds, run_time.seconds, job_result, @@ -209,8 +397,7 @@ def get_per_workflow_metrics( ) ) - return workflow_metrics - + return workflow_metrics, last_recorded_workflow def upload_metrics(workflow_metrics, metrics_userid, api_key): """Upload metrics to Grafana. @@ -260,13 +447,14 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key): def main(): # Authenticate with Github auth = Auth.Token(os.environ["GITHUB_TOKEN"]) - grafana_api_key = os.environ["GRAFANA_API_KEY"] grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"] + buildkite_token = os.environ["BUILDKITE_TOKEN"] - workflows_to_track = {} - for workflow_to_track in WORKFLOWS_TO_TRACK: - workflows_to_track[workflow_to_track] = None + # This script only records workflows/jobs/builds finished after it + # started. So we need to keep track of the last known build. + buildkite_last_cursor = None + github_last_workflow_id = None # Enter the main loop. Every five minutes we wake up and dump metrics for # the relevant jobs. @@ -274,17 +462,17 @@ def main(): github_object = Github(auth=auth) github_repo = github_object.get_repo("llvm/llvm-project") - current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track) - current_metrics += get_sampled_workflow_metrics(github_repo) + buildkite_metrics, buildkite_last_cursor = buildkite_get_metrics( + buildkite_token, buildkite_last_cursor + ) + github_metrics, github_last_workflow_id = get_per_workflow_metrics( + github_repo, github_last_workflow_id + ) + sampled_metrics = get_sampled_workflow_metrics(github_repo) - upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key) - logging.info(f"Uploaded {len(current_metrics)} metrics") - - for workflow_metric in reversed(current_metrics): - if isinstance(workflow_metric, JobMetrics): - workflows_to_track[ - workflow_metric.workflow_name - ] = workflow_metric.workflow_id + metrics = buildkite_metrics + github_metrics + sampled_metrics + upload_metrics(metrics, grafana_metrics_userid, grafana_api_key) + logging.info(f"Uploaded {len(metrics)} metrics") time.sleep(SCRAPE_INTERVAL_SECONDS)