style: refactor static labels feat to be more compact #340

Open · wants to merge 1 commit into master

183 changes: 149 additions & 34 deletions src/exporter.py
@@ -3,20 +3,120 @@
import re
import sys
import time

from collections import defaultdict
from typing import Callable, Optional

import prometheus_client

from celery import Celery
from celery.events.state import State # type: ignore
from celery.utils import nodesplit # type: ignore
from celery.utils.time import utcoffset # type: ignore
from kombu.exceptions import ChannelError # type: ignore
from loguru import logger
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import CollectorRegistry

from .http_server import start_http_server


class Counter(prometheus_client.Counter):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
name,
documentation,
labelnames=...,
namespace="",
subsystem="",
unit="",
registry=...,
_labelvalues=None,
static_labels=None,
):
self.static_labels = static_labels or {}
static_labels_keys = self.static_labels.keys() or []
super().__init__(
name,
documentation,
[*labelnames, *static_labels_keys],
namespace,
subsystem,
unit,
registry,
_labelvalues,
)

def labels(self, *labelvalues, **labelkwargs):
return super().labels(*labelvalues, **labelkwargs, **self.static_labels)


class Gauge(prometheus_client.Gauge):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
name,
documentation,
labelnames=...,
namespace="",
subsystem="",
unit="",
registry=...,
_labelvalues=None,
multiprocess_mode="all",
static_labels=None,
):
self.static_labels = static_labels or {}
static_labels_keys = self.static_labels.keys() or []
super().__init__(
name,
documentation,
[*labelnames, *static_labels_keys],
namespace,
subsystem,
unit,
registry,
_labelvalues,
multiprocess_mode,
)

def labels(self, *labelvalues, **labelkwargs):
return super().labels(*labelvalues, **labelkwargs, **self.static_labels)


class Histogram(prometheus_client.Histogram):
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
name,
documentation,
labelnames=...,
namespace="",
subsystem="",
unit="",
registry=...,
_labelvalues=None,
buckets=...,
static_labels=None,
):
self.static_labels = static_labels or {}
static_labels_keys = self.static_labels.keys() or []
super().__init__(
name,
documentation,
[*labelnames, *static_labels_keys],
namespace,
subsystem,
unit,
registry,
_labelvalues,
buckets,
)

def labels(self, *labelvalues, **labelkwargs):
return super().labels(*labelvalues, **labelkwargs, **self.static_labels)


class Exporter: # pylint: disable=too-many-instance-attributes,too-many-branches
state: State = None

@@ -40,34 +140,34 @@ def __init__(
)
self.generic_hostname_task_sent_metric = generic_hostname_task_sent_metric

# Static labels
self.static_label = static_label or {}
self.static_label_keys = self.static_label.keys()

self.state_counters = {
"task-sent": Counter(
f"{metric_prefix}task_sent",
"Sent when a task message is published.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-received": Counter(
f"{metric_prefix}task_received",
"Sent when the worker receives a task.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-started": Counter(
f"{metric_prefix}task_started",
"Sent just before the worker executes the task.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-succeeded": Counter(
f"{metric_prefix}task_succeeded",
"Sent if the task executed successfully.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-failed": Counter(
f"{metric_prefix}task_failed",
@@ -77,72 +177,82 @@ def __init__(
"hostname",
"exception",
"queue_name",
*self.static_label_keys,
],
registry=self.registry,
static_labels=static_label,
),
"task-rejected": Counter(
f"{metric_prefix}task_rejected",
# pylint: disable=line-too-long
"The task was rejected by the worker, possibly to be re-queued or moved to a dead letter queue.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-revoked": Counter(
f"{metric_prefix}task_revoked",
"Sent if the task has been revoked.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
"task-retried": Counter(
f"{metric_prefix}task_retried",
"Sent if the task failed, but will be retried in the future.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
),
}
self.celery_worker_up = Gauge(
f"{metric_prefix}worker_up",
"Indicates if a worker has recently sent a heartbeat.",
["hostname", *self.static_label_keys],
["hostname"],
registry=self.registry,
static_labels=static_label,
)
self.worker_tasks_active = Gauge(
f"{metric_prefix}worker_tasks_active",
"The number of tasks the worker is currently processing",
["hostname", *self.static_label_keys],
["hostname"],
registry=self.registry,
static_labels=static_label,
)
self.celery_task_runtime = Histogram(
f"{metric_prefix}task_runtime",
"Histogram of task runtime measurements.",
["name", "hostname", "queue_name", *self.static_label_keys],
["name", "hostname", "queue_name"],
registry=self.registry,
static_labels=static_label,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
f"{metric_prefix}queue_length",
"The number of message in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_consumer_count = Gauge(
f"{metric_prefix}active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_worker_count = Gauge(
f"{metric_prefix}active_worker_count",
"The number of active workers in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)
self.celery_active_process_count = Gauge(
f"{metric_prefix}active_process_count",
"The number of active processes in broker queue.",
["queue_name", *self.static_label_keys],
["queue_name"],
registry=self.registry,
static_labels=static_label,
)

def scrape(self):
@@ -155,10 +265,12 @@ def scrape(self):

def forget_worker(self, hostname):
if hostname in self.worker_last_seen:
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(0)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
0
)
self.celery_worker_up.labels(
hostname=hostname,
).set(0)
self.worker_tasks_active.labels(
hostname=hostname,
).set(0)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, 0
)
@@ -253,19 +365,19 @@ def track_queue_metrics(self):
if transport in ["amqp", "amqps", "memory"]:
consumer_count = rabbitmq_queue_consumer_count(connection, queue)
self.celery_active_consumer_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(consumer_count)

self.celery_active_process_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(processes_per_queue[queue])
self.celery_active_worker_count.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(workers_per_queue[queue])
length = queue_length(transport, connection, queue)
if length is not None:
self.celery_queue_length.labels(
queue_name=queue, **self.static_label
queue_name=queue,
).set(length)

def track_task_event(self, event):
@@ -280,7 +392,6 @@ def track_task_event(self, event):
"name": task.name,
"hostname": get_hostname(task.hostname),
"queue_name": getattr(task, "queue", "celery"),
**self.static_label,
}
if event["type"] == "task-sent" and self.generic_hostname_task_sent_metric:
labels["hostname"] = "generic"
@@ -320,7 +431,9 @@ def track_worker_status(self, event, is_online):
event_name = "worker-online" if is_online else "worker-offline"
hostname = get_hostname(event["hostname"])
logger.debug("Received event='{}' for hostname='{}'", event_name, hostname)
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(value)
self.celery_worker_up.labels(
hostname=hostname,
).set(value)

if is_online:
self.worker_last_seen[hostname] = {
@@ -343,10 +456,12 @@ def track_worker_heartbeat(self):
worker_state = self.state.event(event)[0][0]
active = worker_state.active or 0
up = 1 if worker_state.alive else 0
self.celery_worker_up.labels(hostname=hostname, **self.static_label).set(up)
self.worker_tasks_active.labels(hostname=hostname, **self.static_label).set(
active
)
self.celery_worker_up.labels(
hostname=hostname,
).set(up)
self.worker_tasks_active.labels(
hostname=hostname,
).set(active)
logger.debug(
"Updated gauge='{}' value='{}'", self.worker_tasks_active._name, active
)
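
For reference, the pattern this diff introduces is a thin wrapper around the prometheus_client metric classes: the static label names are appended to the labelnames list at construction time, and their values are merged into every labels() call, so call sites only pass the dynamic labels. The snippet below is a minimal, self-contained sketch of that idea, not code from the PR; the metric name, label names, and the "environment" static label are illustrative assumptions.

# Minimal sketch of the static-label wrapper pattern shown in this diff.
# Assumes prometheus_client is installed; all concrete names are made up.
import prometheus_client
from prometheus_client import CollectorRegistry, generate_latest


class StaticLabelCounter(prometheus_client.Counter):
    """Counter that bakes a fixed set of label values into every labels() call."""

    def __init__(self, name, documentation, labelnames=(), static_labels=None, **kwargs):
        self.static_labels = static_labels or {}
        # Register the static label names alongside the dynamic ones.
        super().__init__(
            name,
            documentation,
            [*labelnames, *self.static_labels.keys()],
            **kwargs,
        )

    def labels(self, *labelvalues, **labelkwargs):
        # Merge the static values in; callers supply only the dynamic labels.
        # Note: prometheus_client rejects mixing positional and keyword label
        # values, so callers must pass their dynamic labels as keywords.
        return super().labels(*labelvalues, **labelkwargs, **self.static_labels)


registry = CollectorRegistry()
task_sent = StaticLabelCounter(
    "demo_task_sent",
    "Sent when a task message is published.",
    ["name", "queue_name"],
    static_labels={"environment": "staging"},  # hypothetical static label
    registry=registry,
)

# Call sites no longer repeat the static labels; the wrapper fills them in.
task_sent.labels(name="tasks.add", queue_name="celery").inc()
print(generate_latest(registry).decode())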