import collections
import datetime
import dateutil
import github
import json
import logging
import os
import requests
import sys
import time

from dataclasses import dataclass
from github import Auth
from github import Github

GRAFANA_URL = (
    "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
SCRAPE_INTERVAL_SECONDS = 5 * 60

# Lists the Github workflows we want to track. Maps the Github workflow name
# to the metric name prefix in Grafana.
# This metric name is also used as a key in GITHUB_JOB_TO_TRACK.
GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}

# Lists the Github jobs to track for a given workflow. The key is the stable
# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
# Each value maps the Github job name to the corresponding metric name.
GITHUB_JOB_TO_TRACK = {
    "github_llvm_premerge_checks": {
        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
        "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
    }
}

# The number of workflows to pull when sampling Github workflows.
# - Github API filtering is broken: we cannot apply any filtering.
#   - See https://github.com/orgs/community/discussions/86766
# - A workflow can complete before another workflow, even when starting later.
# - We don't want to sample the same workflow twice.
#
# This means we essentially have a list of workflows sorted by creation date,
# and that's all we can deduce from it. So for each iteration, we'll blindly
# process the last N workflows.
GITHUB_WORKFLOWS_MAX_PROCESS_COUNT = 2000
# Second reason for the cut: reaching a workflow older than X.
# This means we will miss long tails (exceptional jobs running for more than
# X hours), but that's also the case with the count cutoff above.
# The only way to avoid missing any workflow would be to process the complete
# list, which is not feasible.
GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS = 8

# Grafana will fail to insert any metric older than ~2 hours (value determined
# by trial and error).
GRAFANA_METRIC_MAX_AGE_MN = 120

# Lists the BuildKite jobs we want to track. Maps the BuildKite job name to
# the metric name in Grafana, so that metrics history is not lost when a
# workflow is renamed.
BUILDKITE_WORKFLOW_TO_TRACK = {
    ":linux: Linux x64": "buildkite_linux",
    ":windows: Windows x64": "buildkite_windows",
}

# Number of builds to fetch per page. Since we scrape regularly, this can
# remain small.
BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 50


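# A sample for one completed CI job: its queue time, its run time, and whether
# it passed (status is truthy on success).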
@dataclass
class JobMetrics:
    job_name: str
    queue_time: int
    run_time: int
    status: int
    completed_at_ns: int
    workflow_id: int
    workflow_name: str


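# An instantaneous gauge sample (e.g. a queue depth) taken at time_ns.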
@dataclass
class GaugeMetric:
    name: str
    value: int
    time_ns: int


def buildkite_fetch_page_build_list(
    buildkite_token: str, after_cursor: str = None
) -> list[dict[str, str]]:
    """Fetches a page of the build list using the GraphQL BuildKite API.

    Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE running/queued builds,
    or the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE running/queued builds older than
    the one pointed to by |after_cursor| if it is provided.
    The |after_cursor| value is taken from the previous page returned by the
    API.

    Args:
      buildkite_token: the secret token to authenticate GraphQL requests.
      after_cursor: cursor after which to start the page fetch.

    Returns:
      The most recent builds after cursor (if set) with the following format:
      [
        {
          "cursor": <value>,
          "number": <build-number>,
        }
      ]
    """

    BUILDKITE_GRAPHQL_QUERY = """
    query OrganizationShowQuery {{
      organization(slug: "llvm-project") {{
        pipelines(search: "Github pull requests", first: 1) {{
          edges {{
            node {{
              builds (state: [CANCELING, CREATING, FAILING, RUNNING], first: {PAGE_SIZE}, after: {AFTER}) {{
                edges {{
                  cursor
                  node {{
                    number
                  }}
                }}
              }}
            }}
          }}
        }}
      }}
    }}
    """
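    # Note: the doubled braces ({{ and }}) are literal braces escaped for
    # str.format() below; only {PAGE_SIZE} and {AFTER} are substituted.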
    query = BUILDKITE_GRAPHQL_QUERY.format(
        PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE,
        AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor),
    )
    query = json.dumps({"query": query})
    url = "https://graphql.buildkite.com/v1"
    headers = {
        "Authorization": "Bearer " + buildkite_token,
        "Content-Type": "application/json",
    }
    data = requests.post(url, data=query, headers=headers).json()
    if "errors" in data:
        logging.info("Failed to fetch BuildKite jobs: {}".format(data["errors"]))
        return []
    # De-nest the build list.
    builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][
        "edges"
    ]
    # Fold cursor info into the node dictionary.
    return [{**x["node"], "cursor": x["cursor"]} for x in builds]


def buildkite_get_build_info(build_number: str) -> dict:
    """Returns all the info associated with the provided build number.

    Note: for unknown reasons, the GraphQL API returns no jobs for a given
    build while this endpoint does, hence why we use this endpoint instead
    of GraphQL.

    Args:
      build_number: which build number to fetch info for.

    Returns:
      The info for the target build, as a JSON dictionary.
    """

    URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json"
    return requests.get(URL.format(build_number)).json()


def buildkite_get_incomplete_tasks(buildkite_token: str) -> list:
    """Returns all the running/pending BuildKite builds.

    Args:
      buildkite_token: the secret token to authenticate GraphQL requests.
    """
    output = []
    cursor = None
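    # Keep following the pagination cursor until an empty page signals that
    # we've seen every running/queued build.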
    while True:
        page = buildkite_fetch_page_build_list(buildkite_token, cursor)
        if len(page) == 0:
            break
        cursor = page[-1]["cursor"]
        output += page
    return output


def buildkite_get_metrics(
    buildkite_token: str, previously_incomplete: set[int]
) -> tuple[list[JobMetrics], set[int]]:
    """Returns a tuple with:

    - the metrics recorded for newly completed workflow jobs.
    - the set of workflows still running now.

    Args:
      buildkite_token: the secret token to authenticate GraphQL requests.
      previously_incomplete: the set of running workflows the last time this
        function was called.
    """

    running_builds = buildkite_get_incomplete_tasks(buildkite_token)
    incomplete_now = set([x["number"] for x in running_builds])
    output = []

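    # Builds that were incomplete on the previous iteration but are no longer
    # incomplete have finished since then: those are the only ones we report,
    # so each build is recorded exactly once.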
    for build_id in previously_incomplete:
        if build_id in incomplete_now:
            continue

        info = buildkite_get_build_info(build_id)
        metric_timestamp = dateutil.parser.isoparse(info["finished_at"])
        for job in info["jobs"]:
            # This workflow is not interesting to us.
            if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK:
                continue

            # Don't count canceled jobs.
            if job["canceled_at"]:
                continue

            created_at = dateutil.parser.isoparse(job["created_at"])
            scheduled_at = dateutil.parser.isoparse(job["scheduled_at"])
            started_at = dateutil.parser.isoparse(job["started_at"])
            finished_at = dateutil.parser.isoparse(job["finished_at"])

            job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]]
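            # Note: .seconds is the sub-day component of the timedelta; this
            # is fine here since these jobs run for well under 24 hours.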
            queue_time = (started_at - scheduled_at).seconds
            run_time = (finished_at - started_at).seconds
            status = bool(job["passed"])

            # Grafana will refuse to ingest metrics older than ~2 hours, so we
            # should avoid sending historical data.
            metric_age_mn = (
                datetime.datetime.now(datetime.timezone.utc) - metric_timestamp
            ).total_seconds() / 60
            if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
                logging.warning(
                    f"Job {job['name']} from workflow {build_id} dropped due"
                    + f" to staleness: {metric_age_mn}mn old."
                )
                continue

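            # Grafana expects the event timestamp in nanoseconds.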
            metric_timestamp_ns = int(metric_timestamp.timestamp()) * 10**9
            workflow_id = build_id
            workflow_name = "Github pull requests"
            output.append(
                JobMetrics(
                    job_name,
                    queue_time,
                    run_time,
                    status,
                    metric_timestamp_ns,
                    workflow_id,
                    workflow_name,
                )
            )

    return output, incomplete_now


def github_get_metrics(
    github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
) -> tuple[list[JobMetrics], set[int]]:
    """Gets the metrics for the tracked Github workflows.

    This function takes the set of workflow run IDs that were already seen
    as completed on the previous iteration, grabs the relevant data from
    Github, and returns the new metrics to the caller. Runs listed in
    |last_workflows_seen_as_completed| are skipped, so each job is reported
    exactly once.

    Args:
      github_repo: A github repo object to use to query the relevant information.
      last_workflows_seen_as_completed: the set of workflow run IDs that were
        seen as completed on the previous iteration.

    Returns:
      Returns a tuple with 2 elements:
        - a list of metrics (JobMetrics and GaugeMetric objects) to upload.
        - the set of workflow run IDs seen as completed on this iteration.
    """
    workflow_metrics = []
    queued_count = collections.Counter()
    running_count = collections.Counter()

    # Initialize all the counters to 0 so we report 0 when no job is queued
    # or running.
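    # A value is uploaded even when it stays at zero; Grafana alerting is
    # much simpler when every scrape reports a data point.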
    for wf_name, wf_metric_name in GITHUB_WORKFLOW_TO_TRACK.items():
        for job_name, job_metric_name in GITHUB_JOB_TO_TRACK[wf_metric_name].items():
            queued_count[wf_metric_name + "_" + job_metric_name] = 0
            running_count[wf_metric_name + "_" + job_metric_name] = 0

    # The workflows seen as completed during this iteration.
    # Max size: GITHUB_WORKFLOWS_MAX_PROCESS_COUNT.
    workflow_seen_as_completed = set()

    # Since we process a fixed count of workflows, we want to know when
    # the depth is too small and we miss workflows.
    # E.g.: were there more than N workflows in the last 2 hours?
    # To monitor this, we log the age of the oldest workflow processed,
    # and set up alerting in Grafana to help us adjust this depth.
    oldest_seen_workflow_age_mn = None

    # Do not apply any filters to this query.
    # See https://github.com/orgs/community/discussions/86766
    # Applying filters like `status=completed` will break pagination, and
    # return a non-sorted and incomplete list of workflows.
    i = 0
    for task in iter(github_repo.get_workflow_runs()):
        # Max depth reached, stopping.
        if i >= GITHUB_WORKFLOWS_MAX_PROCESS_COUNT:
            break
        i += 1

        workflow_age_mn = (
            datetime.datetime.now(datetime.timezone.utc) - task.created_at
        ).total_seconds() / 60
        oldest_seen_workflow_age_mn = workflow_age_mn
        # If we reach a workflow older than X, stop.
        if workflow_age_mn > GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS * 60:
            break

        # This workflow is not interesting to us.
        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
            continue

        if task.status == "completed":
            workflow_seen_as_completed.add(task.id)

        # This workflow has already been seen completed in the previous run.
        if task.id in last_workflows_seen_as_completed:
            continue

        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
        for job in task.jobs():
            # This job is not interesting to us.
            if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
                continue

            name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
            metric_name = name_prefix + "_" + name_suffix

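            # The workflow is still running: count this job toward the
            # queued/running gauges instead of emitting timing metrics.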
            if task.status != "completed":
                if job.status == "queued":
                    queued_count[metric_name] += 1
                elif job.status == "in_progress":
                    running_count[metric_name] += 1
                continue

            job_result = int(job.conclusion == "success")
            if job_result:
                # We still might want to mark the job as a failure if one of the steps
                # failed. This is required due to us setting continue-on-error in
                # the premerge pipeline to prevent sending emails while we are
                # testing the infrastructure.
                # TODO(boomanaiden154): Remove this once the premerge pipeline is no
                # longer in a testing state and we can directly assert the workflow
                # result.
                for step in job.steps:
                    if step.conclusion != "success" and step.conclusion != "skipped":
                        job_result = 0
                        break

            created_at = job.created_at
            started_at = job.started_at
            completed_at = job.completed_at

            # The GitHub API can return results where started_at is slightly
            # earlier than created_at (or completed_at earlier than started_at).
            # A slightly negative delta is normalized by timedelta to
            # (-1 day + ~23h59mn), which would show up as +24h of queue/run
            # time on Grafana.
            if started_at < created_at:
                logging.info(
                    "Workflow {} started before being created.".format(task.id)
                )
                queue_time = datetime.timedelta(seconds=0)
            else:
                queue_time = started_at - created_at
            if completed_at < started_at:
                logging.info("Workflow {} finished before starting.".format(task.id))
                run_time = datetime.timedelta(seconds=0)
            else:
                run_time = completed_at - started_at

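            # Don't report jobs with no measurable run time.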
            if run_time.seconds == 0:
                continue

            # Grafana will refuse to ingest metrics older than ~2 hours, so we
            # should avoid sending historical data.
            metric_age_mn = (
                datetime.datetime.now(datetime.timezone.utc) - completed_at
            ).total_seconds() / 60
            if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
                logging.warning(
                    f"Job {job.id} from workflow {task.id} dropped due"
                    + f" to staleness: {metric_age_mn}mn old."
                )
                continue

logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
|
|
# The timestamp associated with the event is expected by Grafana to be
|
|
# in nanoseconds.
|
|
completed_at_ns = int(completed_at.timestamp()) * 10**9
|
|
workflow_metrics.append(
|
|
JobMetrics(
|
|
metric_name,
|
|
queue_time.seconds,
|
|
run_time.seconds,
|
|
job_result,
|
|
completed_at_ns,
|
|
task.id,
|
|
task.name,
|
|
)
|
|
)
|
|
|
|
for name, value in queued_count.items():
|
|
workflow_metrics.append(
|
|
GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
|
|
)
|
|
for name, value in running_count.items():
|
|
workflow_metrics.append(
|
|
GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
|
|
)
|
|
|
|
    # Always send a heartbeat metric so we can monitor whether this container
    # is still able to log to Grafana.
    workflow_metrics.append(
        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
    )

    # Log the oldest workflow we saw, allowing us to monitor if the processing
    # depth is correctly set up.
    if oldest_seen_workflow_age_mn is not None:
        workflow_metrics.append(
            GaugeMetric(
                "github_oldest_processed_workflow_mn",
                oldest_seen_workflow_age_mn,
                time.time_ns(),
            )
        )
    return workflow_metrics, workflow_seen_as_completed


def upload_metrics(workflow_metrics, metrics_userid, api_key):
    """Upload metrics to Grafana.

    Takes in a list of workflow metrics and then uploads them to Grafana
    through a REST request.

    Args:
      workflow_metrics: A list of metrics to upload to Grafana.
      metrics_userid: The userid to use for the upload.
      api_key: The API key to use for the upload.
    """

    if len(workflow_metrics) == 0:
        logging.info("No metrics found to upload.")
        return

    metrics_batch = []
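    # Each batch entry is an InfluxDB line-protocol line:
    #   <measurement> <field>=<value>[,<field>=<value>...] <timestamp_ns>
    # e.g. (illustrative values):
    #   premerge_linux queue_time=42,run_time=1337,status=1 1700000000000000000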
    for workflow_metric in workflow_metrics:
        if isinstance(workflow_metric, GaugeMetric):
            name = workflow_metric.name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
            )
        elif isinstance(workflow_metric, JobMetrics):
            name = workflow_metric.job_name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.completed_at_ns}"
            )
        else:
            raise ValueError(
                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
            )

    request_data = "\n".join(metrics_batch)
    response = requests.post(
        GRAFANA_URL,
        headers={"Content-Type": "text/plain"},
        data=request_data,
        auth=(metrics_userid, api_key),
    )

    if response.status_code < 200 or response.status_code >= 300:
        logging.info(f"Failed to submit data to Grafana: {response.status_code}")


def main():
    # Authenticate with Github.
    github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
    buildkite_token = os.environ["BUILDKITE_TOKEN"]
    grafana_api_key = os.environ["GRAFANA_API_KEY"]
    grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

    # The set of workflows this script has already seen as completed.
    # Because the Github queries are broken, we simply record a 'processed'
    # bit for the last GITHUB_WORKFLOWS_MAX_PROCESS_COUNT workflows.
    gh_last_workflows_seen_as_completed = set()
    # Stores the list of pending/running builds in BuildKite we need to check
    # at the next iteration.
    bk_incomplete = set()

    # Enter the main loop. Every five minutes we wake up and dump metrics for
    # the relevant jobs.
    while True:
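        # Create a fresh Github client for every scrape so each iteration
        # starts from a clean session.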
        github_object = Github(auth=github_auth)
        github_repo = github_object.get_repo("llvm/llvm-project")

        gh_metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
            github_repo, gh_last_workflows_seen_as_completed
        )

        bk_metrics, bk_incomplete = buildkite_get_metrics(
            buildkite_token, bk_incomplete
        )

        metrics = gh_metrics + bk_metrics
        upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
        logging.info(f"Uploaded {len(metrics)} metrics")

        time.sleep(SCRAPE_INTERVAL_SECONDS)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()