From 13b44283e90f357ea31c553445527953facccdbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= Date: Thu, 16 Jan 2025 11:41:49 +0100 Subject: [PATCH] [CI] Add queue size, running count metrics (#122714) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit allows the container to report 3 additional metrics at every sampling event: - a heartbeat - the size of the workflow queue (filtered) - the number of running workflows (filtered) The heartbeat is a simple metric allowing us to monitor the metrics health. Before this commit, a new metric was pushed only when a workflow was completed. This meant we had to wait a few hours before noticing if the metrics container was unable to push metrics. In addition to this, this commit adds a sampling of the workflow queue size and running count. This should allow us to better understand the load, and improve the autoscale values we pick for the cluster. --------- Signed-off-by: Nathan Gauër --- .ci/metrics/metrics.py | 108 +++++++++++++++++++++++++++++++++++------ 1 file changed, 94 insertions(+), 14 deletions(-) diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py index 8edc00bc6bd3..48d2aa2f330e 100644 --- a/.ci/metrics/metrics.py +++ b/.ci/metrics/metrics.py @@ -26,7 +26,67 @@ class JobMetrics: workflow_id: int -def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]): +@dataclass +class GaugeMetric: + name: str + value: int + time_ns: int + + +def get_sampled_workflow_metrics(github_repo: github.Repository): + """Gets global statistics about the Github workflow queue + + Args: + github_repo: A github repo object to use to query the relevant information. + + Returns: + Returns a list of GaugeMetric objects, containing the relevant metrics about + the workflow + """ + + # Other states are available (pending, waiting, etc), but the meaning + # is not documented (See #70540). + # "queued" seems to be the info we want.
+ queued_workflow_count = len( + [ + x + for x in github_repo.get_workflow_runs(status="queued") + if x.name in WORKFLOWS_TO_TRACK + ] + ) + running_workflow_count = len( + [ + x + for x in github_repo.get_workflow_runs(status="in_progress") + if x.name in WORKFLOWS_TO_TRACK + ] + ) + + workflow_metrics = [] + workflow_metrics.append( + GaugeMetric( + "workflow_queue_size", + queued_workflow_count, + time.time_ns(), + ) + ) + workflow_metrics.append( + GaugeMetric( + "running_workflow_count", + running_workflow_count, + time.time_ns(), + ) + ) + # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana. + workflow_metrics.append( + GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()) + ) + return workflow_metrics + + +def get_per_workflow_metrics( + github_repo: github.Repository, workflows_to_track: dict[str, int] +): """Gets the metrics for specified Github workflows. This function takes in a list of workflows to track, and optionally the @@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in Returns a list of JobMetrics objects, containing the relevant metrics about the workflow. """ - workflow_runs = iter(github_repo.get_workflow_runs()) - workflow_metrics = [] workflows_to_include = set(workflows_to_track.keys()) - while len(workflows_to_include) > 0: - workflow_run = next(workflow_runs) + for workflow_run in iter(github_repo.get_workflow_runs()): + if len(workflows_to_include) == 0: + break + if workflow_run.status != "completed": continue @@ -139,12 +199,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key): metrics_userid: The userid to use for the upload. api_key: The API key to use for the upload.
""" + + if len(workflow_metrics) == 0: + print("No metrics found to upload.", file=sys.stderr) + return + metrics_batch = [] for workflow_metric in workflow_metrics: - workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_") - metrics_batch.append( - f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}" - ) + if isinstance(workflow_metric, GaugeMetric): + name = workflow_metric.name.lower().replace(" ", "_") + metrics_batch.append( + f"{name} value={workflow_metric.value} {workflow_metric.time_ns}" + ) + elif isinstance(workflow_metric, JobMetrics): + name = workflow_metric.job_name.lower().replace(" ", "_") + metrics_batch.append( + f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}" + ) + else: + raise ValueError( + f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}" + ) request_data = "\n".join(metrics_batch) response = requests.post( @@ -176,16 +251,21 @@ def main(): # Enter the main loop. Every five minutes we wake up and dump metrics for # the relevant jobs. while True: - current_metrics = get_metrics(github_repo, workflows_to_track) - if len(current_metrics) == 0: - print("No metrics found to upload.", file=sys.stderr) - continue + current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track) + current_metrics += get_sampled_workflow_metrics(github_repo) + # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana. 
+ current_metrics.append( + GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()) + ) upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key) print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr) for workflow_metric in reversed(current_metrics): - workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id + if isinstance(workflow_metric, JobMetrics): + workflows_to_track[ + workflow_metric.job_name + ] = workflow_metric.workflow_id time.sleep(SCRAPE_INTERVAL_SECONDS)