[CI] Add queue size, running count metrics (#122714)

This commit allows the container to report three additional metrics at
every sampling event:
- a heartbeat
- the size of the workflow queue (filtered to the tracked workflows)
- the number of running workflows (filtered to the tracked workflows)

The heartbeat is a simple metric that lets us monitor the health of the
metrics pipeline itself. Before this commit, a new metric was pushed only
when a workflow completed, which meant we could wait a few hours before
noticing that the metrics container was unable to push metrics.
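
For illustration, the heartbeat is pushed on every sampling pass as a
single gauge, using the same "name value=<value> <time_ns>" line format
that upload_metrics emits for gauges (the timestamp below is an invented
example):

    metrics_container_heartbeat value=1 1737021709000000000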

In addition, this commit samples the size of the workflow queue and the
number of running workflows. This should give us a better picture of the
load and help us tune the autoscaling values we pick for the cluster.
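
As a rough sketch of how the sampled gauges fit together (the names mirror
this patch, but the counts below are invented for the example; in the
container they come from querying the GitHub API):

    import time
    from dataclasses import dataclass

    @dataclass
    class GaugeMetric:
        name: str
        value: int
        time_ns: int

    # Invented sample counts; get_sampled_workflow_metrics() computes these
    # by filtering queued / in-progress workflow runs by name.
    samples = [
        GaugeMetric("workflow_queue_size", 4, time.time_ns()),
        GaugeMetric("running_workflow_count", 2, time.time_ns()),
    ]
    for gauge in samples:
        # Same gauge line format upload_metrics() builds before posting to Grafana.
        print(f"{gauge.name} value={gauge.value} {gauge.time_ns}")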

---------

Signed-off-by: Nathan Gauër <brioche@google.com>

@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue.
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow.
+    """
+
+    # Other states are available (pending, waiting, etc), but the meaning
+    # is not documented (See #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
       Returns a list of JobMetrics objects, containing the relevant metrics about
       the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
@@ -139,12 +199,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
       metrics_userid: The userid to use for the upload.
       api_key: The API key to use for the upload.
     """
+
+    if len(workflow_metrics) == 0:
+        print("No metrics found to upload.", file=sys.stderr)
+        return
+
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        elif isinstance(workflow_metric, JobMetrics):
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
+        else:
+            raise ValueError(
+                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -176,16 +251,21 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
-        if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
+        # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+        current_metrics.append(
+            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+        )
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)