llvm-project/.ci/metrics/metrics.py
Aiden Grossman 4aeb2f1c79
[CI] Remove Duplicate Heartbeat in Metrics Script
This patch removes an extra heartbeat metric in the metrics python file. Before
it was performed twice, once in the main function, and once in the
get_sampled_workflow_metrics function. We only need one to keep everything
happy, and I've chosen to keep the one in get_sampled_workflow_metrics as it
seems a more appropriate place to keep it.

Reviewers: Keenuts, lnihlen

Reviewed By: lnihlen

Pull Request: https://github.com/llvm/llvm-project/pull/127275
2025-02-14 19:10:51 -08:00

288 lines
9.8 KiB
Python

import os
import sys
import time
from collections import Counter
from dataclasses import dataclass

import github
import requests
from github import Auth
from github import Github
# Grafana endpoint that upload_metrics() POSTs the collected metrics to.
GRAFANA_URL = (
    "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
# GitHub repository slug, in "owner/name" form.
GITHUB_PROJECT = "llvm/llvm-project"
# Names of the GitHub workflows whose runs and jobs are collected.
WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
# Time main() sleeps between two scrapes (five minutes).
SCRAPE_INTERVAL_SECONDS = 5 * 60
@dataclass
class JobMetrics:
    """Timing and result of a single job of a completed workflow run."""

    # Workflow run name and job name, joined with "-".
    job_name: str
    # Seconds between job creation and job start.
    queue_time: int
    # Seconds between job start and job completion.
    run_time: int
    # 1 if the job and all of its steps concluded with "success", 0 otherwise.
    status: int
    # Job creation time in nanoseconds since the epoch; used as the timestamp
    # of the uploaded data point.
    created_at_ns: int
    # ID of the workflow run the job belongs to.
    workflow_id: int
@dataclass
class GaugeMetric:
    """A single named integer sample taken at a point in time."""

    # Metric name; upload_metrics() lowercases it and replaces spaces with "_".
    name: str
    # Sampled value.
    value: int
    # Sample time in nanoseconds (producers in this file use time.time_ns()).
    time_ns: int
def get_sampled_workflow_metrics(github_repo: github.Repository.Repository):
    """Gets global statistics about the Github workflow queue.

    Counts, per job name, how many jobs of the tracked workflows are currently
    queued and how many are currently in progress.

    Args:
      github_repo: A github repo object to use to query the relevant information.

    Returns:
      Returns a list of GaugeMetric objects, containing the relevant metrics about
      the workflow. The last entry is always the container heartbeat metric.
    """
    queued_job_counts = Counter()
    running_job_counts = Counter()

    # Other states are available (pending, waiting, etc), but the meaning
    # is not documented (See #70540).
    # "queued" seems to be the info we want.
    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for queued_workflow_job in queued_workflow.jobs():
            # Workflows marked as queued can potentially only have some jobs
            # queued, so make sure to also count jobs currently in progress.
            if queued_workflow_job.status == "queued":
                queued_job_counts[queued_workflow_job.name] += 1
            elif queued_workflow_job.status == "in_progress":
                running_job_counts[queued_workflow_job.name] += 1

    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
        if running_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for running_workflow_job in running_workflow.jobs():
            if running_workflow_job.status == "in_progress":
                running_job_counts[running_workflow_job.name] += 1

    # Only job names that were actually observed get a gauge; a job with no
    # queued/running instances produces no data point for this sample.
    workflow_metrics = []
    for job_name, job_count in queued_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(
                f"workflow_queue_size_{job_name}",
                job_count,
                time.time_ns(),
            )
        )
    for job_name, job_count in running_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(
                f"running_workflow_count_{job_name}",
                job_count,
                time.time_ns(),
            )
        )

    # Always send a heartbeat metric so we can monitor whether this container
    # is still able to log to Grafana.
    workflow_metrics.append(
        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
    )
    return workflow_metrics
def get_per_workflow_metrics(
    github_repo: github.Repository.Repository, workflows_to_track: dict[str, int]
):
    """Gets the metrics for specified Github workflows.

    This function takes in a list of workflows to track, and optionally the
    workflow ID of the last tracked invocation. It grabs the relevant data
    from Github, returning it to the caller.

    Args:
      github_repo: A github repo object to use to query the relevant information.
      workflows_to_track: A dictionary mapping workflow names to the last
        invocation ID where metrics have been collected, or None to collect
        only the most recent completed invocation that has jobs.

    Returns:
      Returns a list of JobMetrics objects, containing the relevant metrics about
      the workflow.
    """
    workflow_metrics = []
    workflows_to_include = set(workflows_to_track)

    for workflow_run in github_repo.get_workflow_runs():
        if not workflows_to_include:
            break

        if workflow_run.status != "completed":
            continue

        # This workflow was already sampled for this run, or is not tracked at
        # all. Ignoring.
        if workflow_run.name not in workflows_to_include:
            continue

        last_collected_id = workflows_to_track[workflow_run.name]

        # There were no new workflow invocations since the previous scrape.
        # The API returns a sorted list with the most recent invocations first,
        # so we can stop looking for this particular workflow. Continue to grab
        # information on the other workflows of interest, if present.
        if last_collected_id == workflow_run.id:
            workflows_to_include.remove(workflow_run.name)
            continue

        workflow_jobs = workflow_run.jobs()
        if workflow_jobs.totalCount == 0:
            continue

        # With no previously collected invocation there is no stopping point,
        # so only this (most recent) invocation is sampled for the workflow.
        if last_collected_id is None:
            workflows_to_include.remove(workflow_run.name)

        for workflow_job in workflow_jobs:
            created_at = workflow_job.created_at
            started_at = workflow_job.started_at
            completed_at = workflow_job.completed_at

            # We still might want to mark the job as a failure if one of the steps
            # failed. This is required due to use setting continue-on-error in
            # the premerge pipeline to prevent sending emails while we are
            # testing the infrastructure.
            # TODO(boomanaiden154): Remove this once the premerge pipeline is no
            # longer in a testing state and we can directly assert the workflow
            # result.
            job_result = int(
                workflow_job.conclusion == "success"
                and all(step.conclusion == "success" for step in workflow_job.steps)
            )

            # total_seconds() is used instead of timedelta.seconds: the latter
            # drops the days component, so long durations would wrap around and
            # a slightly negative delta would be reported as almost a full day.
            queue_time = int((started_at - created_at).total_seconds())
            run_time = int((completed_at - started_at).total_seconds())

            # Jobs that took no time at all carry no useful timing data.
            if run_time == 0:
                continue

            # The timestamp associated with the event is expected by Grafana to be
            # in nanoseconds.
            created_at_ns = int(created_at.timestamp()) * 10**9

            workflow_metrics.append(
                JobMetrics(
                    workflow_run.name + "-" + workflow_job.name,
                    queue_time,
                    run_time,
                    job_result,
                    created_at_ns,
                    workflow_run.id,
                )
            )

    return workflow_metrics
def upload_metrics(workflow_metrics, metrics_userid, api_key, timeout_seconds=60):
    """Upload metrics to Grafana.

    Takes in a list of workflow metrics and then uploads them to Grafana
    through a REST request. Each metric becomes one line of the request body;
    metric names are lowercased and have spaces replaced by underscores.

    Args:
      workflow_metrics: A list of metrics to upload to Grafana.
      metrics_userid: The userid to use for the upload.
      api_key: The API key to use for the upload.
      timeout_seconds: How long to wait for Grafana before giving up, so that
        a stalled connection cannot block the caller indefinitely.

    Raises:
      ValueError: If an entry is neither a GaugeMetric nor a JobMetrics.
      Any exception raised by requests.post (including a timeout) propagates
      to the caller.
    """
    if not workflow_metrics:
        print("No metrics found to upload.", file=sys.stderr)
        return

    metrics_batch = []
    for workflow_metric in workflow_metrics:
        if isinstance(workflow_metric, GaugeMetric):
            name = workflow_metric.name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
            )
        elif isinstance(workflow_metric, JobMetrics):
            name = workflow_metric.job_name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
            )
        else:
            raise ValueError(
                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
            )

    request_data = "\n".join(metrics_batch)
    response = requests.post(
        GRAFANA_URL,
        headers={"Content-Type": "text/plain"},
        data=request_data,
        auth=(metrics_userid, api_key),
        timeout=timeout_seconds,
    )

    # Anything outside the 2xx range is reported as a failed submission.
    if not 200 <= response.status_code < 300:
        print(
            f"Failed to submit data to Grafana: {response.status_code}", file=sys.stderr
        )
def _tracked_workflow_for_job(job_name, tracked_workflow_names):
    """Returns the tracked workflow that a JobMetrics.job_name belongs to.

    get_per_workflow_metrics builds job names as "<workflow name>-<job name>",
    so the owning workflow is the tracked name that prefixes job_name. The
    longest match wins in case one tracked name is a prefix of another.
    Returns None when no tracked workflow matches.
    """
    matches = [
        name for name in tracked_workflow_names if job_name.startswith(name + "-")
    ]
    return max(matches, key=len, default=None)


def main():
    """Scrapes workflow metrics from Github and uploads them to Grafana forever.

    Reads GITHUB_TOKEN, GRAFANA_API_KEY and GRAFANA_METRICS_USERID from the
    environment (a missing variable raises KeyError), then loops: collect
    per-workflow and sampled metrics, upload them, remember the most recent
    workflow run seen for each tracked workflow, and sleep until the next
    scrape.
    """
    # Authenticate with Github
    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
    github_object = Github(auth=auth)
    github_repo = github_object.get_repo(GITHUB_PROJECT)

    grafana_api_key = os.environ["GRAFANA_API_KEY"]
    grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

    # No invocation has been collected yet for any tracked workflow.
    workflows_to_track = dict.fromkeys(WORKFLOWS_TO_TRACK)

    # Enter the main loop. Every five minutes we wake up and dump metrics for
    # the relevant jobs.
    while True:
        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
        current_metrics += get_sampled_workflow_metrics(github_repo)

        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)

        # Job metrics are listed most recent first, so walking them in reverse
        # leaves the newest run ID recorded for each workflow. The ID must be
        # stored under the workflow name: job_name also carries the job's own
        # name, and using it as the key would leave the workflow entry at None
        # and add keys that never match any workflow run.
        for workflow_metric in reversed(current_metrics):
            if not isinstance(workflow_metric, JobMetrics):
                continue
            workflow_name = _tracked_workflow_for_job(
                workflow_metric.job_name, workflows_to_track
            )
            if workflow_name is not None:
                workflows_to_track[workflow_name] = workflow_metric.workflow_id

        time.sleep(SCRAPE_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()