[CI] Extend metrics container to log BuildKite metrics (#130996)

The current container focuses on GitHub metrics. Before deprecating
BuildKite, we want to make sure the quality of the new infrastructure is
at least as good.

Being able to compare BuildKite metrics with GitHub metrics in Grafana
will make that comparison easy to present.

The BuildKite API allows filtering, but does not allow changing the result
ordering, so we are left with builds ordered by ID. This means a completed
job can appear before a running job in the list. Two solutions from there:
 - keep the cursor on the oldest running workflow
 - keep a list of running workflows to compare.

Because there are no guarantees about workflow ordering, waiting for the
oldest build to complete before reporting any newer builds could delay
reporting a more recent build's completion by a few hours. And because
Grafana cannot ingest metrics older than 2 hours, this is not an option.

Thus we are left with the second solution: remember which jobs were running
during the last iteration, and record them as soon as they have completed.
BuildKite has at most ~100 pending jobs, so keeping all those IDs should be
fine.
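
A minimal sketch of that bookkeeping (the helper names below are illustrative
placeholders, not the actual functions added in this commit):

    from typing import Callable, Iterable

    def poll_iteration(
        fetch_incomplete_build_ids: Callable[[], Iterable[int]],
        report_build: Callable[[int], None],
        previously_incomplete: set[int],
    ) -> set[int]:
        """One scrape iteration: report builds that completed since last time."""
        incomplete_now = set(fetch_incomplete_build_ids())
        # A build that was incomplete last iteration and is no longer listed
        # has completed; report it now.
        for build_id in previously_incomplete - incomplete_now:
            report_build(build_id)
        # Remember the currently running/queued builds for the next iteration.
        return incomplete_now
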
Nathan Gauër 2025-03-14 11:44:39 +01:00 committed by GitHub
parent 0a5847f1c1
commit 44f4e43b4f

@@ -1,6 +1,8 @@
import collections
import datetime
import dateutil
import github
import json
import logging
import os
import requests
@@ -53,6 +55,18 @@ GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS = 8
# by trial and error).
GRAFANA_METRIC_MAX_AGE_MN = 120
# Lists the BuildKite jobs we want to track. Maps the BuildKite job name to
# the metric name in Grafana. This is important so we don't lose metric
# history if the workflow name changes.
BUILDKITE_WORKFLOW_TO_TRACK = {
":linux: Linux x64": "buildkite_linux",
":windows: Windows x64": "buildkite_windows",
}
# Number of builds to fetch per page. Since we scrape regularly, this can
# remain small.
BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 50
@dataclass
class JobMetrics:
job_name: str
@@ -70,6 +84,191 @@ class GaugeMetric:
time_ns: int
def buildkite_fetch_page_build_list(
buildkite_token: str, after_cursor: str = None
) -> list[dict[str, str]]:
"""Fetches a page of the build list using the GraphQL BuildKite API.
Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE running/queued builds,
or the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE running/queued builds older than
the one pointed to by |after_cursor| if provided.
The |after_cursor| value is taken from the previous page returned by the
API.
Args:
buildkite_token: the secret token to authenticate GraphQL requests.
after_cursor: cursor after which to start the page fetch.
Returns:
The most recent builds after cursor (if set) with the following format:
[
{
"cursor": <value>,
"number": <build-number>,
}
]
"""
BUILDKITE_GRAPHQL_QUERY = """
query OrganizationShowQuery {{
organization(slug: "llvm-project") {{
pipelines(search: "Github pull requests", first: 1) {{
edges {{
node {{
builds (state: [CANCELING, CREATING, FAILING, RUNNING], first: {PAGE_SIZE}, after: {AFTER}) {{
edges {{
cursor
node {{
number
}}
}}
}}
}}
}}
}}
}}
}}
"""
query = BUILDKITE_GRAPHQL_QUERY.format(
PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE,
AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor),
)
query = json.dumps({"query": query})
url = "https://graphql.buildkite.com/v1"
headers = {
"Authorization": "Bearer " + buildkite_token,
"Content-Type": "application/json",
}
data = requests.post(url, data=query, headers=headers).json()
# De-nest the build list.
if "errors" in data:
logging.info("Failed to fetch BuildKite jobs: {}".format(data["errors"]))
return []
builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][
"edges"
]
# Fold cursor info into the node dictionary.
return [{**x["node"], "cursor": x["cursor"]} for x in builds]
def buildkite_get_build_info(build_number: str) -> dict:
"""Returns all the info associated with the provided build number.
Note: for unknown reasons, the GraphQL API returns no jobs for a given build
while this endpoint does, which is why we use it instead of GraphQL.
Args:
build_number: which build number to fetch info for.
Returns:
The info for the target build, a JSON dictionary.
"""
URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json"
return requests.get(URL.format(build_number)).json()
def buildkite_get_incomplete_tasks(buildkite_token: str) -> list:
"""Returns all the running/pending BuildKite builds.
Args:
buildkite_token: the secret token to authenticate GraphQL requests.
"""
output = []
cursor = None
while True:
page = buildkite_fetch_page_build_list(buildkite_token, cursor)
if len(page) == 0:
break
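# Advance the cursor to the last build on this page so the next request
# fetches the following, older page.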
cursor = page[-1]["cursor"]
output += page
return output
def buildkite_get_metrics(
buildkite_token: str, previously_incomplete: set[int]
) -> tuple[list[JobMetrics], set[int]]:
"""Returns a tuple with:
- the metrics recorded for newly completed workflow jobs.
- the set of workflows still running now.
Args:
buildkite_token: the secret token to authenticate GraphQL requests.
previously_incomplete: the set of running workflows the last time this
function was called.
"""
running_builds = buildkite_get_incomplete_tasks(buildkite_token)
incomplete_now = set([x["number"] for x in running_builds])
output = []
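# Any build that was running during the last iteration but is no longer
# listed as incomplete has finished; record its metrics now.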
for build_id in previously_incomplete:
if build_id in incomplete_now:
continue
info = buildkite_get_build_info(build_id)
metric_timestamp = dateutil.parser.isoparse(info["finished_at"])
for job in info["jobs"]:
# This job is not one we want to track.
if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK:
continue
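# Some of these timestamps can be missing; fall back to the previous
# stage's timestamp so the computed durations stay non-negative.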
created_at = dateutil.parser.isoparse(job["created_at"])
scheduled_at = (
created_at
if job["scheduled_at"] is None
else dateutil.parser.isoparse(job["scheduled_at"])
)
started_at = (
scheduled_at
if job["started_at"] is None
else dateutil.parser.isoparse(job["started_at"])
)
if job["canceled_at"] is None:
finished_at = (
started_at
if job["finished_at"] is None
else dateutil.parser.isoparse(job["finished_at"])
)
else:
finished_at = dateutil.parser.isoparse(job["canceled_at"])
job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]]
queue_time = (started_at - scheduled_at).seconds
run_time = (finished_at - started_at).seconds
status = bool(job["passed"])
# Grafana will refuse to ingest metrics older than ~2 hours, so we
# should avoid sending historical data.
metric_age_mn = (
datetime.datetime.now(datetime.timezone.utc) - metric_timestamp
).total_seconds() / 60
if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
logging.warning(
f"Job {job['name']} from workflow {build_id} dropped due"
+ f" to staleness: {metric_age_mn}mn old."
)
continue
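# Convert the completion timestamp to nanoseconds for the metrics upload.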
metric_timestamp_ns = int(metric_timestamp.timestamp()) * 10**9
workflow_id = build_id
workflow_name = "Github pull requests"
output.append(
JobMetrics(
job_name,
queue_time,
run_time,
status,
metric_timestamp_ns,
workflow_id,
workflow_name,
)
)
return output, incomplete_now
def github_get_metrics(
github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
) -> tuple[list[JobMetrics], int]:
@@ -195,7 +394,7 @@ def github_get_metrics(
datetime.datetime.now(datetime.timezone.utc) - completed_at
).total_seconds() / 60
if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
logging.info(
logging.warning(
f"Job {job.id} from workflow {task.id} dropped due"
+ f" to staleness: {metric_age_mn}mn old."
)
@@ -292,6 +491,7 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
def main():
# Authenticate with Github
github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
buildkite_token = os.environ["BUILDKITE_TOKEN"]
grafana_api_key = os.environ["GRAFANA_API_KEY"]
grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
@@ -299,6 +499,9 @@ def main():
# Because the Github queries are broken, we'll simply log a 'processed'
# bit for the last COUNT_TO_PROCESS workflows.
gh_last_workflows_seen_as_completed = set()
# Stores the list of pending/running builds in BuildKite we need to check
# at the next iteration.
bk_incomplete = set()
# Enter the main loop. Every five minutes we wake up and dump metrics for
# the relevant jobs.
@@ -306,9 +509,15 @@ def main():
github_object = Github(auth=github_auth)
github_repo = github_object.get_repo("llvm/llvm-project")
metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
gh_metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
github_repo, gh_last_workflows_seen_as_completed
)
bk_metrics, bk_incomplete = buildkite_get_metrics(
buildkite_token, bk_incomplete
)
metrics = gh_metrics + bk_metrics
upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
logging.info(f"Uploaded {len(metrics)} metrics")