From 5d8240159d109ef8459ee9bfe916e470ce378d1a Mon Sep 17 00:00:00 2001 From: "Hamacek, Jan" Date: Wed, 19 Mar 2025 16:34:56 +0100 Subject: [PATCH 1/3] Add redbeat config options --- .python-version | 2 +- conftest.py | 6 ++++ poetry.lock | 38 +++++++++++++++++++-- pyproject.toml | 1 + src/cli.py | 37 ++++++++++++++++++++ src/exporter.py | 90 +++++++++++++++++++++++++++++++++++++++++-------- 6 files changed, 157 insertions(+), 17 deletions(-) diff --git a/.python-version b/.python-version index d9506ce..04e2079 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.12.5 +3.12.8 diff --git a/conftest.py b/conftest.py index f8dd7a4..4c54f81 100644 --- a/conftest.py +++ b/conftest.py @@ -97,6 +97,12 @@ def exporter_cfg_defaults(find_free_port, celery_config, log_level): "worker_timeout": 1, "purge_offline_worker_metrics": 10, "initial_queues": ["queue_from_command_line"], + # New RedBeat parameters with default values + "redbeat_redis_url": None, + "redbeat_redis_use_ssl": None, + "redbeat_key_prefix": None, + "redbeat_sentinel": None, + "redbeat_redis_option": [], } yield cfg diff --git a/poetry.lock b/poetry.lock index 821d31c..af48b70 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. [[package]] name = "altgraph" @@ -210,6 +210,24 @@ yaml = ["PyYAML (>=3.10)"] zookeeper = ["kazoo (>=1.3.1)"] zstd = ["zstandard (==0.22.0)"] +[[package]] +name = "celery-redbeat" +version = "2.3.2" +description = "A Celery Beat Scheduler using Redis for persistent storage" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "celery_redbeat-2.3.2-py2.py3-none-any.whl", hash = "sha256:9d61ab6e29f59d676621a846c778a347d5acad22e90fb4386343a4852d4f7ac3"}, + {file = "celery_redbeat-2.3.2.tar.gz", hash = "sha256:7d6e41e4b807fafba5d88b98e3d3863f0df95ac3ed1692d70b134c079b936b8b"}, +] + +[package.dependencies] +celery = ">=5.0" +python-dateutil = "*" +redis = ">=3.2" +tenacity = "*" + [[package]] name = "celery-types" version = "0.11.0" @@ -1438,6 +1456,22 @@ files = [ {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, ] +[[package]] +name = "tenacity" +version = "9.0.0" +description = "Retry code until it succeeds" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, + {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, +] + +[package.extras] +doc = ["reno", "sphinx"] +test = ["pytest", "tornado (>=4.5)", "typeguard"] + [[package]] name = "timy" version = "0.4.2" @@ -1684,4 +1718,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.1" python-versions = ">=3.11,<3.13" -content-hash = "974df3d0bf38a8e447ab0750d36eec8c7311f8b79047327f9b3eb51f5128a18d" +content-hash = "47021ab939af01449d8ebe02bcdd362887aa85feaeca2823094e418e2cb3b6ca" diff --git a/pyproject.toml b/pyproject.toml index f518fa6..320cc5e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ Flask = "^3.1.0" waitress = "^3.0.2" arrow = "^1.3.0" timy = "^0.4.2" +celery-redbeat = "^2.3.2" [tool.poetry.group.dev.dependencies] pytest = "^8.2.2" diff --git a/src/cli.py b/src/cli.py index c0614b1..d21b23e 100644 --- a/src/cli.py +++ b/src/cli.py @@ -135,6 +135,38 @@ def _eq_sign_separated_argument_to_dict(_ctx, _param, value): callback=_eq_sign_separated_argument_to_dict, help="Add label with static value to all metrics", ) +@click.option( + "--redbeat-redis-url", + required=False, + default=None, + help="The Redis URL for RedBeat scheduler, e.g redis-sentinel://redis-sentinel:26379/0", +) +@click.option( + "--redbeat-redis-use-ssl", + required=False, + default=None, + help="SSL options for RedBeat Redis", +) +@click.option( + "--redbeat-key-prefix", + required=False, + default=None, + help="A prefix for all keys created by RedBeat, defaults to 'redbeat'", +) +@click.option( + "--redbeat-sentinel", + required=False, + default=None, + multiple=True, + help="RedBeat sentinel address in format 'host:port', can be specified multiple times", +) +@click.option( + "--redbeat-redis-option", + required=False, + default=[None], + multiple=True, + help="RedBeat Redis options, e.g password=secret or service_name=master", +) def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals broker_url, broker_transport_option, @@ -151,6 +183,11 @@ def cli( # pylint: disable=too-many-arguments,too-many-positional-arguments,too queues, metric_prefix, static_label, + redbeat_redis_url, + redbeat_redis_use_ssl, + redbeat_key_prefix, + redbeat_sentinel, + redbeat_redis_option, ): # pylint: disable=unused-argument formatted_buckets = list(map(float, buckets.split(","))) ctx = click.get_current_context() diff --git a/src/exporter.py b/src/exporter.py index 609227a..f8726ca 100644 --- a/src/exporter.py +++ b/src/exporter.py @@ -352,14 +352,7 @@ def track_worker_heartbeat(self, event): ) logger.debug("Updated gauge='{}' value='{}'", self.celery_worker_up._name, up) - def run(self, click_params): - logger.remove() - logger.add(sys.stdout, level=click_params["log_level"]) - self.app = Celery(broker=click_params["broker_url"]) - if click_params["accept_content"] is not None: - accept_content_list = click_params["accept_content"].split(",") - logger.info("Setting celery accept_content {}", accept_content_list) - self.app.config_from_object(dict(accept_content=accept_content_list)) + def _configure_broker_transport(self, click_params): transport_options = {} for transport_option in click_params["broker_transport_option"]: if transport_option is not None: @@ -370,9 +363,10 @@ def run(self, click_params): ) transport_options[option] = transform_option_value(value) - if transport_options is not None: + if transport_options: self.app.conf["broker_transport_options"] = transport_options + def _configure_ssl_options(self, click_params): ssl_options = {} for ssl_option in click_params["broker_ssl_option"]: if ssl_option is not None: @@ -384,14 +378,62 @@ def run(self, click_params): else: ssl_options[option] = value - if ssl_options is not None: + if ssl_options: self.app.conf["broker_use_ssl"] = ssl_options - self.state = self.app.events.State() # type: ignore - self.retry_interval = click_params["retry_interval"] - if self.retry_interval: - logger.debug("Using retry_interval of {} seconds", self.retry_interval) + def _configure_redbeat(self, click_params): + if not click_params["redbeat_redis_url"]: + return + + logger.debug("Setting redbeat_redis_url={}", click_params["redbeat_redis_url"]) + self.app.conf["redbeat_redis_url"] = click_params["redbeat_redis_url"] + + # Set redbeat configuration options if provided + if click_params["redbeat_redis_use_ssl"]: + self.app.conf["redbeat_redis_use_ssl"] = click_params[ + "redbeat_redis_use_ssl" + ] + + if click_params["redbeat_key_prefix"]: + self.app.conf["redbeat_key_prefix"] = click_params["redbeat_key_prefix"] + + # Process redbeat redis options + redbeat_options = self._process_redbeat_options(click_params) + if redbeat_options: + self.app.conf["redbeat_redis_options"] = redbeat_options + logger.debug("RedBeat Redis options: {}", redbeat_options) + + def _process_redbeat_options(self, click_params): + redbeat_options = {} + + # Process sentinel addresses if provided (convert to list of tuples) + if click_params["redbeat_sentinel"]: + sentinels = [] + for sentinel in click_params["redbeat_sentinel"]: + if ":" in sentinel: + host, port_str = sentinel.split(":") + port = int(port_str) + sentinels.append((host, port)) + else: + logger.warning( + f"Invalid sentinel format: {sentinel}, expected host:port" + ) + if sentinels: + redbeat_options["sentinels"] = sentinels + + # Process other redis options + for option in click_params["redbeat_redis_option"]: + if option is not None: + option_name, value = option.split("=", 1) + if option_name is not None: + logger.debug( + "Setting redbeat_redis_option {}={}", option_name, value + ) + redbeat_options[option_name] = transform_option_value(value) + + return redbeat_options + def _setup_event_handlers(self): handlers = { "worker-heartbeat": self.track_worker_heartbeat, "worker-online": lambda event: self.track_worker_status(event, True), @@ -399,6 +441,26 @@ def run(self, click_params): } for key in self.state_counters: handlers[key] = self.track_task_event + return handlers + + def run(self, click_params): + logger.remove() + logger.add(sys.stdout, level=click_params["log_level"]) + self.app = Celery(broker=click_params["broker_url"]) + if click_params["accept_content"] is not None: + accept_content_list = click_params["accept_content"].split(",") + logger.info("Setting celery accept_content {}", accept_content_list) + self.app.config_from_object(dict(accept_content=accept_content_list)) + self._configure_broker_transport(click_params) + self._configure_ssl_options(click_params) + self._configure_redbeat(click_params) + + self.state = self.app.events.State() # type: ignore + self.retry_interval = click_params["retry_interval"] + if self.retry_interval: + logger.debug("Using retry_interval of {} seconds", self.retry_interval) + + handlers = self._setup_event_handlers() with self.app.connection() as connection: # type: ignore start_http_server( From b44a5f9f0176379a811042255877240b5553ae08 Mon Sep 17 00:00:00 2001 From: "Hamacek, Jan" Date: Thu, 20 Mar 2025 14:19:21 +0100 Subject: [PATCH 2/3] docs: add RedBeat configuration parameters to README --- README.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/README.md b/README.md index ddf2cfc..7f20a46 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,36 @@ docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.s --retry-interval=5 ``` +###### Configuring RedBeat scheduler monitoring + +When using RedBeat as your Celery scheduler, you can configure the exporter to monitor +RedBeat schedules. The following parameters are available: + +```sh +docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \ + --redbeat-redis-url=redis://redis.service.consul/0 \ + --redbeat-key-prefix=redbeat +``` + +For Redis Sentinel configurations: + +```sh +docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \ + --redbeat-redis-url=redis-sentinel://redis-sentinel:26379/0 \ + --redbeat-sentinel=redis-sentinel1:26379 \ + --redbeat-sentinel=redis-sentinel2:26379 \ + --redbeat-redis-option=password=secret \ + --redbeat-redis-option=service_name=master +``` + +You can also specify SSL options for RedBeat Redis: + +```sh +docker run -p 9808:9808 danihodovic/celery-exporter --broker-url=redis://redis.service.consul/1 \ + --redbeat-redis-url=rediss://secure-redis:6379/0 \ + --redbeat-redis-use-ssl=true +``` + ##### Grafana Dashboards & Prometheus Alerts Head over to the [Celery-mixin in this subdirectory](https://github.com/danihodovic/celery-exporter/tree/master/celery-mixin) to generate rules and dashboards suited to your Prometheus setup. From 4ba33a6ba1d2b24b221210ed51e2025efa41aa7f Mon Sep 17 00:00:00 2001 From: "Hamacek, Jan" Date: Thu, 20 Mar 2025 14:27:25 +0100 Subject: [PATCH 3/3] Cleanup --- .python-version | 2 +- conftest.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.python-version b/.python-version index 04e2079..d9506ce 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.12.8 +3.12.5 diff --git a/conftest.py b/conftest.py index 4c54f81..2adf218 100644 --- a/conftest.py +++ b/conftest.py @@ -97,7 +97,6 @@ def exporter_cfg_defaults(find_free_port, celery_config, log_level): "worker_timeout": 1, "purge_offline_worker_metrics": 10, "initial_queues": ["queue_from_command_line"], - # New RedBeat parameters with default values "redbeat_redis_url": None, "redbeat_redis_use_ssl": None, "redbeat_key_prefix": None,