mirror of
https://github.com/llvm/llvm-project.git
synced 2025-04-16 15:36:40 +00:00

This patch makes the metrics container count the number of in-progress and queued jobs at the job level rather than at the workflow level. This helps us distinguish Windows versus Linux load and also lets us filter out the macOS jobs that only run in the release branch.

Reviewers: Keenuts, lnihlen

Reviewed By: lnihlen

Pull Request: https://github.com/llvm/llvm-project/pull/127274
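Concretely, the per-job counts land in Grafana as gauges named after each job (roughly workflow_queue_size_<job name> and running_workflow_count_<job name> in the script below), which is what makes it possible to tell the Windows and Linux jobs apart.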
292 lines
10 KiB
Python
import requests
import time
import os
from dataclasses import dataclass
import sys

import github
from github import Github
from github import Auth

GRAFANA_URL = (
    "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
GITHUB_PROJECT = "llvm/llvm-project"
WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
SCRAPE_INTERVAL_SECONDS = 5 * 60


@dataclass
class JobMetrics:
    job_name: str
    queue_time: int
    run_time: int
    status: int
    created_at_ns: int
    workflow_id: int


@dataclass
class GaugeMetric:
    name: str
    value: int
    time_ns: int


def get_sampled_workflow_metrics(github_repo: github.Repository):
    """Gets global statistics about the Github workflow queue.

    Args:
      github_repo: A github repo object to use to query the relevant information.

    Returns:
      Returns a list of GaugeMetric objects, containing the relevant metrics about
      the workflow.
    """
    queued_job_counts = {}
    running_job_counts = {}

    # Other states are available (pending, waiting, etc), but the meaning
    # is not documented (See #70540).
    # "queued" seems to be the info we want.
    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for queued_workflow_job in queued_workflow.jobs():
            job_name = queued_workflow_job.name
            # Workflows marked as queued can potentially only have some jobs
            # queued, so make sure to also count jobs currently in progress.
            if queued_workflow_job.status == "queued":
                if job_name not in queued_job_counts:
                    queued_job_counts[job_name] = 1
                else:
                    queued_job_counts[job_name] += 1
            elif queued_workflow_job.status == "in_progress":
                if job_name not in running_job_counts:
                    running_job_counts[job_name] = 1
                else:
                    running_job_counts[job_name] += 1

    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
        if running_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for running_workflow_job in running_workflow.jobs():
            job_name = running_workflow_job.name
            if running_workflow_job.status != "in_progress":
                continue

            if job_name not in running_job_counts:
                running_job_counts[job_name] = 1
            else:
                running_job_counts[job_name] += 1

    workflow_metrics = []
    for queued_job in queued_job_counts:
        workflow_metrics.append(
            GaugeMetric(
                f"workflow_queue_size_{queued_job}",
                queued_job_counts[queued_job],
                time.time_ns(),
            )
        )
    for running_job in running_job_counts:
        workflow_metrics.append(
            GaugeMetric(
                f"running_workflow_count_{running_job}",
                running_job_counts[running_job],
                time.time_ns(),
            )
        )
    # Always send a heartbeat metric so we can monitor if this container is
    # still able to log to Grafana.
    workflow_metrics.append(
        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
    )
    return workflow_metrics


def get_per_workflow_metrics(
    github_repo: github.Repository, workflows_to_track: dict[str, int]
):
    """Gets the metrics for specified Github workflows.

    This function takes in a list of workflows to track, and optionally the
    workflow ID of the last tracked invocation. It grabs the relevant data
    from Github, returning it to the caller.

    Args:
      github_repo: A github repo object to use to query the relevant information.
      workflows_to_track: A dictionary mapping workflow names to the last
        invocation ID where metrics have been collected, or None to collect the
        last five results.

    Returns:
      Returns a list of JobMetrics objects, containing the relevant metrics about
      the workflow.
    """
    workflow_metrics = []

    workflows_to_include = set(workflows_to_track.keys())

    for workflow_run in iter(github_repo.get_workflow_runs()):
        if len(workflows_to_include) == 0:
            break

        if workflow_run.status != "completed":
            continue

        # This workflow was already sampled for this run, or is not tracked at
        # all. Ignoring.
        if workflow_run.name not in workflows_to_include:
            continue

        # There were no new workflow invocations since the previous scrape.
        # The API returns a sorted list with the most recent invocations first,
        # so we can stop looking for this particular workflow. Continue to grab
        # information on the other workflows of interest, if present.
        if workflows_to_track[workflow_run.name] == workflow_run.id:
            workflows_to_include.remove(workflow_run.name)
            continue

        workflow_jobs = workflow_run.jobs()
        if workflow_jobs.totalCount == 0:
            continue

        if (
            workflows_to_track[workflow_run.name] is None
            or workflows_to_track[workflow_run.name] == workflow_run.id
        ):
            workflows_to_include.remove(workflow_run.name)
        if (
            workflows_to_track[workflow_run.name] is not None
            and len(workflows_to_include) == 0
        ):
            break

        for workflow_job in workflow_jobs:
            created_at = workflow_job.created_at
            started_at = workflow_job.started_at
            completed_at = workflow_job.completed_at

            job_result = int(workflow_job.conclusion == "success")
            if job_result:
                # We still might want to mark the job as a failure if one of the
                # steps failed. This is required due to us setting
                # continue-on-error in the premerge pipeline to prevent sending
                # emails while we are testing the infrastructure.
                # TODO(boomanaiden154): Remove this once the premerge pipeline is
                # no longer in a testing state and we can directly assert the
                # workflow result.
                for step in workflow_job.steps:
                    if step.conclusion != "success":
                        job_result = 0
                        break

            queue_time = started_at - created_at
            run_time = completed_at - started_at

            if run_time.seconds == 0:
                continue

            # The timestamp associated with the event is expected by Grafana to
            # be in nanoseconds.
            created_at_ns = int(created_at.timestamp()) * 10**9

            workflow_metrics.append(
                JobMetrics(
                    workflow_run.name + "-" + workflow_job.name,
                    queue_time.seconds,
                    run_time.seconds,
                    job_result,
                    created_at_ns,
                    workflow_run.id,
                )
            )

    return workflow_metrics


def upload_metrics(workflow_metrics, metrics_userid, api_key):
    """Upload metrics to Grafana.

    Takes in a list of workflow metrics and then uploads them to Grafana
    through a REST request.

    Args:
      workflow_metrics: A list of metrics to upload to Grafana.
      metrics_userid: The userid to use for the upload.
      api_key: The API key to use for the upload.
    """

    if len(workflow_metrics) == 0:
        print("No metrics found to upload.", file=sys.stderr)
        return

    metrics_batch = []
    for workflow_metric in workflow_metrics:
        if isinstance(workflow_metric, GaugeMetric):
            name = workflow_metric.name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
            )
        elif isinstance(workflow_metric, JobMetrics):
            name = workflow_metric.job_name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
            )
        else:
            raise ValueError(
                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
            )

    request_data = "\n".join(metrics_batch)
    response = requests.post(
        GRAFANA_URL,
        headers={"Content-Type": "text/plain"},
        data=request_data,
        auth=(metrics_userid, api_key),
    )

    if response.status_code < 200 or response.status_code >= 300:
        print(
            f"Failed to submit data to Grafana: {response.status_code}", file=sys.stderr
        )


def main():
    # Authenticate with Github
    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
    github_object = Github(auth=auth)
    github_repo = github_object.get_repo("llvm/llvm-project")

    grafana_api_key = os.environ["GRAFANA_API_KEY"]
    grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

    workflows_to_track = {}
    for workflow_to_track in WORKFLOWS_TO_TRACK:
        workflows_to_track[workflow_to_track] = None

    # Enter the main loop. Every five minutes we wake up and dump metrics for
    # the relevant jobs.
    while True:
        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
        current_metrics += get_sampled_workflow_metrics(github_repo)
        # Always send a heartbeat metric so we can monitor if this container is
        # still able to log to Grafana.
        current_metrics.append(
            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
        )

        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)

        for workflow_metric in reversed(current_metrics):
            if isinstance(workflow_metric, JobMetrics):
                workflows_to_track[
                    workflow_metric.job_name
                ] = workflow_metric.workflow_id

        time.sleep(SCRAPE_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()