Skip to content

Add RedBeat scheduler monitoring support #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.s
--retry-interval=5
```

###### Configuring RedBeat scheduler monitoring

When using RedBeat as your Celery scheduler, you can configure the exporter to monitor
RedBeat schedules. The following parameters are available:

```sh
docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \
--redbeat-redis-url=redis://redis.service.consul/0 \
--redbeat-key-prefix=redbeat
```

For Redis Sentinel configurations:

```sh
docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \
--redbeat-redis-url=redis-sentinel://redis-sentinel:26379/0 \
--redbeat-sentinel=redis-sentinel1:26379 \
--redbeat-sentinel=redis-sentinel2:26379 \
--redbeat-redis-option=password=secret \
--redbeat-redis-option=service_name=master
```

You can also specify SSL options for RedBeat Redis:

```sh
docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \
--redbeat-redis-url=rediss://secure-redis:6379/0 \
--redbeat-redis-use-ssl=true
```

##### Grafana Dashboards & Prometheus Alerts

Head over to the [Celery-mixin in this subdirectory](https://github.com/danihodovic/celery-exporter/tree/master/celery-mixin) to generate rules and dashboards suited to your Prometheus setup.
Expand Down
5 changes: 5 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def exporter_cfg_defaults(find_free_port, celery_config, log_level):
"worker_timeout": 1,
"purge_offline_worker_metrics": 10,
"initial_queues": ["queue_from_command_line"],
"redbeat_redis_url": None,
"redbeat_redis_use_ssl": None,
"redbeat_key_prefix": None,
"redbeat_sentinel": None,
"redbeat_redis_option": [],
}
yield cfg

Expand Down
38 changes: 36 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Flask = "^3.1.0"
waitress = "^3.0.2"
arrow = "^1.3.0"
timy = "^0.4.2"
celery-redbeat = "^2.3.2"

[tool.poetry.group.dev.dependencies]
pytest = "^8.2.2"
Expand Down
37 changes: 37 additions & 0 deletions src/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,38 @@ def _eq_sign_separated_argument_to_dict(_ctx, _param, value):
callback=_eq_sign_separated_argument_to_dict,
help="Add label with static value to all metrics",
)
@click.option(
"--redbeat-redis-url",
required=False,
default=None,
help="The Redis URL for RedBeat scheduler, e.g redis-sentinel://redis-sentinel:26379/0",
)
@click.option(
"--redbeat-redis-use-ssl",
required=False,
default=None,
help="SSL options for RedBeat Redis",
)
@click.option(
"--redbeat-key-prefix",
required=False,
default=None,
help="A prefix for all keys created by RedBeat, defaults to 'redbeat'",
)
@click.option(
"--redbeat-sentinel",
required=False,
default=None,
multiple=True,
help="RedBeat sentinel address in format 'host:port', can be specified multiple times",
)
@click.option(
"--redbeat-redis-option",
required=False,
default=[None],
multiple=True,
help="RedBeat Redis options, e.g password=secret or service_name=master",
)
def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
broker_url,
broker_transport_option,
Expand All @@ -151,6 +183,11 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too
queues,
metric_prefix,
static_label,
redbeat_redis_url,
redbeat_redis_use_ssl,
redbeat_key_prefix,
redbeat_sentinel,
redbeat_redis_option,
): # pylint: disable=unused-argument
formatted_buckets = list(map(float, buckets.split(",")))
ctx = click.get_current_context()
Expand Down
90 changes: 76 additions & 14 deletions src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,7 @@ def track_worker_heartbeat(self, event):
)
logger.debug("Updated gauge='{}' value='{}'", self.celery_worker_up._name, up)

def run(self, click_params):
logger.remove()
logger.add(sys.stdout, level=click_params["log_level"])
self.app = Celery(broker=click_params["broker_url"])
if click_params["accept_content"] is not None:
accept_content_list = click_params["accept_content"].split(",")
logger.info("Setting celery accept_content {}", accept_content_list)
self.app.config_from_object(dict(accept_content=accept_content_list))
def _configure_broker_transport(self, click_params):
transport_options = {}
for transport_option in click_params["broker_transport_option"]:
if transport_option is not None:
Expand All @@ -370,9 +363,10 @@ def run(self, click_params):
)
transport_options[option] = transform_option_value(value)

if transport_options is not None:
if transport_options:
self.app.conf["broker_transport_options"] = transport_options

def _configure_ssl_options(self, click_params):
ssl_options = {}
for ssl_option in click_params["broker_ssl_option"]:
if ssl_option is not None:
Expand All @@ -384,21 +378,89 @@ def run(self, click_params):
else:
ssl_options[option] = value

if ssl_options is not None:
if ssl_options:
self.app.conf["broker_use_ssl"] = ssl_options

self.state = self.app.events.State() # type: ignore
self.retry_interval = click_params["retry_interval"]
if self.retry_interval:
logger.debug("Using retry_interval of {} seconds", self.retry_interval)
def _configure_redbeat(self, click_params):
if not click_params["redbeat_redis_url"]:
return

logger.debug("Setting redbeat_redis_url={}", click_params["redbeat_redis_url"])
self.app.conf["redbeat_redis_url"] = click_params["redbeat_redis_url"]

# Set redbeat configuration options if provided
if click_params["redbeat_redis_use_ssl"]:
self.app.conf["redbeat_redis_use_ssl"] = click_params[
"redbeat_redis_use_ssl"
]

if click_params["redbeat_key_prefix"]:
self.app.conf["redbeat_key_prefix"] = click_params["redbeat_key_prefix"]

# Process redbeat redis options
redbeat_options = self._process_redbeat_options(click_params)
if redbeat_options:
self.app.conf["redbeat_redis_options"] = redbeat_options
logger.debug("RedBeat Redis options: {}", redbeat_options)

def _process_redbeat_options(self, click_params):
redbeat_options = {}

# Process sentinel addresses if provided (convert to list of tuples)
if click_params["redbeat_sentinel"]:
sentinels = []
for sentinel in click_params["redbeat_sentinel"]:
if ":" in sentinel:
host, port_str = sentinel.split(":")
port = int(port_str)
sentinels.append((host, port))
else:
logger.warning(
f"Invalid sentinel format: {sentinel}, expected host:port"
)
if sentinels:
redbeat_options["sentinels"] = sentinels

# Process other redis options
for option in click_params["redbeat_redis_option"]:
if option is not None:
option_name, value = option.split("=", 1)
if option_name is not None:
logger.debug(
"Setting redbeat_redis_option {}={}", option_name, value
)
redbeat_options[option_name] = transform_option_value(value)

return redbeat_options

def _setup_event_handlers(self):
handlers = {
"worker-heartbeat": self.track_worker_heartbeat,
"worker-online": lambda event: self.track_worker_status(event, True),
"worker-offline": lambda event: self.track_worker_status(event, False),
}
for key in self.state_counters:
handlers[key] = self.track_task_event
return handlers

def run(self, click_params):
logger.remove()
logger.add(sys.stdout, level=click_params["log_level"])
self.app = Celery(broker=click_params["broker_url"])
if click_params["accept_content"] is not None:
accept_content_list = click_params["accept_content"].split(",")
logger.info("Setting celery accept_content {}", accept_content_list)
self.app.config_from_object(dict(accept_content=accept_content_list))
self._configure_broker_transport(click_params)
self._configure_ssl_options(click_params)
self._configure_redbeat(click_params)

self.state = self.app.events.State() # type: ignore
self.retry_interval = click_params["retry_interval"]
if self.retry_interval:
logger.debug("Using retry_interval of {} seconds", self.retry_interval)

handlers = self._setup_event_handlers()

with self.app.connection() as connection: # type: ignore
start_http_server(
Expand Down