diff --git a/.env.example b/.env.example index 7c2cd9f0..c376c8c2 100644 --- a/.env.example +++ b/.env.example @@ -86,3 +86,9 @@ CALLBACK_READ_TIMEOUT = 10 # require as a env if you want to use doc transformation OPENAI_API_KEY="" + +# Resource Monitoring Configuration +RESOURCE_MONITORING_ENABLED=True +RESOURCE_CHECK_INTERVAL=5 +CPU_THRESHOLD_PERCENT=80.0 +MEMORY_THRESHOLD_PERCENT=85.0 diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 81fba8cb..69246042 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -1,7 +1,13 @@ +import logging from celery import Celery +from celery.app.control import Control +from celery.signals import worker_ready, worker_shutdown, task_prerun, task_postrun from kombu import Exchange, Queue from app.core.config import settings +from app.celery.resource_monitor import resource_monitor + +logger = logging.getLogger(__name__) # Create Celery instance celery_app = Celery( @@ -16,7 +22,7 @@ # Define exchanges and queues with priority default_exchange = Exchange("default", type="direct") -# Celery configuration using environment variables +# Celery configuration celery_app.conf.update( # Queue configuration with priority support task_queues=( @@ -52,23 +58,23 @@ # Enable priority support task_inherit_parent_priority=True, worker_prefetch_multiplier=settings.CELERY_WORKER_PREFETCH_MULTIPLIER, - # Worker configuration from environment + # Worker configuration worker_concurrency=settings.COMPUTED_CELERY_WORKER_CONCURRENCY, worker_max_tasks_per_child=settings.CELERY_WORKER_MAX_TASKS_PER_CHILD, worker_max_memory_per_child=settings.CELERY_WORKER_MAX_MEMORY_PER_CHILD, # Security worker_hijack_root_logger=False, worker_log_color=False, - # Task execution from environment + # Task execution task_soft_time_limit=settings.CELERY_TASK_SOFT_TIME_LIMIT, task_time_limit=settings.CELERY_TASK_TIME_LIMIT, task_reject_on_worker_lost=True, task_ignore_result=False, task_acks_late=True, - # 
Retry configuration from environment + # Retry configuration task_default_retry_delay=settings.CELERY_TASK_DEFAULT_RETRY_DELAY, task_max_retries=settings.CELERY_TASK_MAX_RETRIES, - # Task configuration from environment + # Task configuration task_serializer="json", accept_content=["json"], result_serializer="json", @@ -76,16 +82,74 @@ enable_utc=settings.CELERY_ENABLE_UTC, task_track_started=True, task_always_eager=False, - # Result backend settings from environment + # Result backend settings result_expires=settings.CELERY_RESULT_EXPIRES, result_compression="gzip", # Monitoring worker_send_task_events=True, task_send_sent_event=True, - # Connection settings from environment + # Connection settings broker_connection_retry_on_startup=True, broker_pool_limit=settings.CELERY_BROKER_POOL_LIMIT, + worker_pool_restarts=True, ) + +@worker_ready.connect +def start_resource_monitoring(sender, **kwargs): + """Start resource monitoring when worker is ready.""" + if not settings.RESOURCE_MONITORING_ENABLED: + logger.info("Resource monitoring is disabled") + return + + try: + # Create Control instance + control = Control(app=celery_app) + + # Get worker hostname from the sender (consumer) + worker_hostname = sender.hostname + + # Extract queue names from configuration + queue_names = [queue.name for queue in celery_app.conf.task_queues] + + # Inject into resource monitor + resource_monitor.control = control + resource_monitor.worker_hostname = worker_hostname + resource_monitor.queue_names = queue_names + + logger.info( + f"Resource monitor initialized - " f"Queues: {', '.join(queue_names)}" + ) + + # Start monitoring + resource_monitor.start_monitoring() + + except Exception as e: + logger.error(f"Failed to start resource monitoring: {e}", exc_info=True) + + +@worker_shutdown.connect +def stop_resource_monitoring(**kwargs): + """Stop resource monitoring on worker shutdown.""" + if not settings.RESOURCE_MONITORING_ENABLED: + return + + resource_monitor.stop_monitoring() + + 
+@task_prerun.connect +def track_task_start(**kwargs): + """Track when a task starts executing.""" + if settings.RESOURCE_MONITORING_ENABLED: + resource_monitor.increment_active_tasks() + + +@task_postrun.connect +def track_task_end(**kwargs): + """Track when a task finishes executing.""" + if settings.RESOURCE_MONITORING_ENABLED: + resource_monitor.decrement_active_tasks() + + # Auto-discover tasks celery_app.autodiscover_tasks() diff --git a/backend/app/celery/resource_monitor.py b/backend/app/celery/resource_monitor.py new file mode 100644 index 00000000..4dc8c338 --- /dev/null +++ b/backend/app/celery/resource_monitor.py @@ -0,0 +1,194 @@ +# app/celery/resource_monitor.py +import logging +import psutil +import threading +import time +from typing import List + +from app.core.config import settings +from celery.app.control import Control + +logger = logging.getLogger(__name__) + + +class ResourceMonitor: + """ + Monitor system resources and control task consumption. + Uses Celery Control API to pause/resume queue consumption. 
+ """ + + def __init__( + self, + cpu_threshold: float = settings.CPU_THRESHOLD_PERCENT, + memory_threshold: float = settings.MEMORY_THRESHOLD_PERCENT, + check_interval: int = settings.RESOURCE_CHECK_INTERVAL, + ): + self.cpu_threshold = cpu_threshold + self.memory_threshold = memory_threshold + self.check_interval = check_interval + self.is_paused = False + self.active_tasks = 0 + self.lock = threading.Lock() + self._should_stop = False + + # Will be set by signal + self.control: Control = None + self.worker_hostname = None + self.queue_names: List[str] = [] + + def get_cpu_usage(self) -> float: + """Get current CPU usage percentage.""" + return psutil.cpu_percent(interval=1) + + def get_memory_usage(self) -> float: + """Get current memory usage percentage.""" + return psutil.virtual_memory().percent + + def should_pause(self, cpu: float, memory: float) -> bool: + """Determine if worker should pause based on thresholds.""" + return cpu > self.cpu_threshold or memory > self.memory_threshold + + def pause_consumer(self): + """Stop consuming tasks from all queues.""" + if not self.control or not self.worker_hostname: + logger.error("Control or worker hostname not initialized") + return + + if not self.queue_names: + logger.error("No queue names configured") + return + + try: + # Cancel consumption for each queue + for queue_name in self.queue_names: + self.control.cancel_consumer( + queue=queue_name, destination=[self.worker_hostname] + ) + logger.info(f"Cancelled consumer for queue: {queue_name}") + + self.is_paused = True + logger.warning( + f"Worker PAUSED - stopped consuming from queues: {', '.join(self.queue_names)}" + ) + except Exception as e: + logger.error(f"Error pausing consumer: {e}", exc_info=True) + + def resume_consumer(self): + """Resume consuming tasks from all queues.""" + if not self.control or not self.worker_hostname: + logger.error("Control or worker hostname not initialized") + return + + if not self.queue_names: + logger.error("No queue names 
configured") + return + + try: + # Add consumers back for each queue + for queue_name in self.queue_names: + self.control.add_consumer( + queue=queue_name, destination=[self.worker_hostname] + ) + logger.info(f"Added consumer for queue: {queue_name}") + + self.is_paused = False + logger.info( + f"Worker RESUMED - started consuming from queues: {', '.join(self.queue_names)}" + ) + except Exception as e: + logger.error(f"Error resuming consumer: {e}", exc_info=True) + + def monitor_loop(self): + """Main monitoring loop - runs in separate thread.""" + logger.info( + f"Resource monitoring started - " + f"CPU threshold: {self.cpu_threshold}%, " + f"Memory threshold: {self.memory_threshold}%, " + f"Check interval: {self.check_interval}s, " + f"Monitoring queues: {', '.join(self.queue_names)}" + ) + + while not self._should_stop: + try: + cpu = self.get_cpu_usage() + memory = self.get_memory_usage() + should_pause_now = self.should_pause(cpu, memory) + # Remove this line later + logger.info( + "Memory Usage: {:.2f}%, CPU Usage: {:.2f}%".format(memory, cpu) + ) + with self.lock: + # Pause if resources exceeded and not already paused + if should_pause_now and not self.is_paused: + logger.warning( + f"Resource threshold exceeded! " + f"CPU: {cpu:.1f}% (limit: {self.cpu_threshold}%), " + f"Memory: {memory:.1f}% (limit: {self.memory_threshold}%), " + f"Active tasks: {self.active_tasks}. " + f"Pausing task consumption..." + ) + self.pause_consumer() + + # Resume if resources OK and currently paused + elif not should_pause_now and self.is_paused: + logger.info( + f"Resources within limits - " + f"CPU: {cpu:.1f}%, Memory: {memory:.1f}%, " + f"Active tasks: {self.active_tasks}. " + f"Resuming task consumption..." 
+ ) + self.resume_consumer() + + elif not self.is_paused: + logger.debug( + f"Status - CPU: {cpu:.1f}%, Memory: {memory:.1f}%, " + f"Active tasks: {self.active_tasks}, Paused: {self.is_paused}" + ) + + except Exception as e: + logger.error(f"Error in resource monitor loop: {e}", exc_info=True) + + time.sleep(self.check_interval) + + logger.info("Resource monitoring loop ended") + + def start_monitoring(self): + """Start the monitoring thread.""" + if not self.control or not self.worker_hostname: + logger.error("Cannot start monitoring: control or worker hostname not set") + return + + if not self.queue_names: + logger.error("Cannot start monitoring: no queues configured") + return + + self._should_stop = False + monitor_thread = threading.Thread( + target=self.monitor_loop, daemon=True, name="ResourceMonitor" + ) + monitor_thread.start() + logger.info("Resource monitoring thread started") + + def stop_monitoring(self): + """Stop the monitoring thread.""" + logger.info("Stopping resource monitoring...") + self._should_stop = True + + # Ensure consumer is consuming on shutdown + with self.lock: + if self.is_paused: + logger.info("Resuming consumer before shutdown...") + self.resume_consumer() + + def increment_active_tasks(self): + """Track task start.""" + with self.lock: + self.active_tasks += 1 + + def decrement_active_tasks(self): + """Track task end.""" + with self.lock: + self.active_tasks = max(0, self.active_tasks - 1) + + +resource_monitor = ResourceMonitor() diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 515874af..feb5c29c 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -122,6 +122,12 @@ def AWS_S3_BUCKET(self) -> str: CALLBACK_CONNECT_TIMEOUT: int = 3 CALLBACK_READ_TIMEOUT: int = 10 + # Resources Monitoring + RESOURCE_MONITORING_ENABLED: bool = True + RESOURCE_CHECK_INTERVAL: int = 5 + CPU_THRESHOLD_PERCENT: float = 75.0 + MEMORY_THRESHOLD_PERCENT: float = 75.0 + @computed_field # type: 
ignore[prop-decorator] @property def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int: diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 98e87d7c..2b8354b6 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "celery>=5.3.0,<6.0.0", "redis>=5.0.0,<6.0.0", "flower>=2.0.1", + "psutil>=7.1.3", ] [tool.uv] diff --git a/backend/uv.lock b/backend/uv.lock index b1da7424..9c768f59 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -200,6 +200,7 @@ dependencies = [ { name = "pandas" }, { name = "passlib", extra = ["bcrypt"] }, { name = "pre-commit" }, + { name = "psutil" }, { name = "psycopg", extra = ["binary"] }, { name = "py-zerox" }, { name = "pydantic" }, @@ -245,6 +246,7 @@ requires-dist = [ { name = "pandas", specifier = ">=2.3.2" }, { name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4,<2.0.0" }, { name = "pre-commit", specifier = ">=3.8.0" }, + { name = "psutil", specifier = ">=7.1.3" }, { name = "psycopg", extras = ["binary"], specifier = ">=3.1.13,<4.0.0" }, { name = "py-zerox", specifier = ">=0.0.7,<1.0.0" }, { name = "pydantic", specifier = ">2.0" }, @@ -2154,6 +2156,32 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5b/5a/bc7b4a4ef808fa59a816c17b20c4bef6884daebbdf627ff2a161da67da19/propcache-0.4.1-py3-none-any.whl", hash = "sha256:af2a6052aeb6cf17d3e46ee169099044fd8224cbaf75c76a2ef596e8163e2237", size = 13305, upload-time = "2025-10-08T19:49:00.792Z" }, ] +[[package]] +name = "psutil" +version = "7.1.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e1/88/bdd0a41e5857d5d703287598cbf08dad90aed56774ea52ae071bae9071b6/psutil-7.1.3.tar.gz", hash = "sha256:6c86281738d77335af7aec228328e944b30930899ea760ecf33a4dba66be5e74", size = 489059, upload-time = "2025-11-02T12:25:54.619Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/bd/93/0c49e776b8734fef56ec9c5c57f923922f2cf0497d62e0f419465f28f3d0/psutil-7.1.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:0005da714eee687b4b8decd3d6cc7c6db36215c9e74e5ad2264b90c3df7d92dc", size = 239751, upload-time = "2025-11-02T12:25:58.161Z" }, + { url = "https://files.pythonhosted.org/packages/6f/8d/b31e39c769e70780f007969815195a55c81a63efebdd4dbe9e7a113adb2f/psutil-7.1.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:19644c85dcb987e35eeeaefdc3915d059dac7bd1167cdcdbf27e0ce2df0c08c0", size = 240368, upload-time = "2025-11-02T12:26:00.491Z" }, + { url = "https://files.pythonhosted.org/packages/62/61/23fd4acc3c9eebbf6b6c78bcd89e5d020cfde4acf0a9233e9d4e3fa698b4/psutil-7.1.3-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:95ef04cf2e5ba0ab9eaafc4a11eaae91b44f4ef5541acd2ee91d9108d00d59a7", size = 287134, upload-time = "2025-11-02T12:26:02.613Z" }, + { url = "https://files.pythonhosted.org/packages/30/1c/f921a009ea9ceb51aa355cb0cc118f68d354db36eae18174bab63affb3e6/psutil-7.1.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1068c303be3a72f8e18e412c5b2a8f6d31750fb152f9cb106b54090296c9d251", size = 289904, upload-time = "2025-11-02T12:26:05.207Z" }, + { url = "https://files.pythonhosted.org/packages/a6/82/62d68066e13e46a5116df187d319d1724b3f437ddd0f958756fc052677f4/psutil-7.1.3-cp313-cp313t-win_amd64.whl", hash = "sha256:18349c5c24b06ac5612c0428ec2a0331c26443d259e2a0144a9b24b4395b58fa", size = 249642, upload-time = "2025-11-02T12:26:07.447Z" }, + { url = "https://files.pythonhosted.org/packages/df/ad/c1cd5fe965c14a0392112f68362cfceb5230819dbb5b1888950d18a11d9f/psutil-7.1.3-cp313-cp313t-win_arm64.whl", hash = "sha256:c525ffa774fe4496282fb0b1187725793de3e7c6b29e41562733cae9ada151ee", size = 245518, upload-time = "2025-11-02T12:26:09.719Z" }, + { url = 
"https://files.pythonhosted.org/packages/2e/bb/6670bded3e3236eb4287c7bcdc167e9fae6e1e9286e437f7111caed2f909/psutil-7.1.3-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:b403da1df4d6d43973dc004d19cee3b848e998ae3154cc8097d139b77156c353", size = 239843, upload-time = "2025-11-02T12:26:11.968Z" }, + { url = "https://files.pythonhosted.org/packages/b8/66/853d50e75a38c9a7370ddbeefabdd3d3116b9c31ef94dc92c6729bc36bec/psutil-7.1.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:ad81425efc5e75da3f39b3e636293360ad8d0b49bed7df824c79764fb4ba9b8b", size = 240369, upload-time = "2025-11-02T12:26:14.358Z" }, + { url = "https://files.pythonhosted.org/packages/41/bd/313aba97cb5bfb26916dc29cf0646cbe4dd6a89ca69e8c6edce654876d39/psutil-7.1.3-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f33a3702e167783a9213db10ad29650ebf383946e91bc77f28a5eb083496bc9", size = 288210, upload-time = "2025-11-02T12:26:16.699Z" }, + { url = "https://files.pythonhosted.org/packages/c2/fa/76e3c06e760927a0cfb5705eb38164254de34e9bd86db656d4dbaa228b04/psutil-7.1.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:fac9cd332c67f4422504297889da5ab7e05fd11e3c4392140f7370f4208ded1f", size = 291182, upload-time = "2025-11-02T12:26:18.848Z" }, + { url = "https://files.pythonhosted.org/packages/0f/1d/5774a91607035ee5078b8fd747686ebec28a962f178712de100d00b78a32/psutil-7.1.3-cp314-cp314t-win_amd64.whl", hash = "sha256:3792983e23b69843aea49c8f5b8f115572c5ab64c153bada5270086a2123c7e7", size = 250466, upload-time = "2025-11-02T12:26:21.183Z" }, + { url = "https://files.pythonhosted.org/packages/00/ca/e426584bacb43a5cb1ac91fae1937f478cd8fbe5e4ff96574e698a2c77cd/psutil-7.1.3-cp314-cp314t-win_arm64.whl", hash = "sha256:31d77fcedb7529f27bb3a0472bea9334349f9a04160e8e6e5020f22c59893264", size = 245756, upload-time = "2025-11-02T12:26:23.148Z" }, + { url = 
"https://files.pythonhosted.org/packages/ef/94/46b9154a800253e7ecff5aaacdf8ebf43db99de4a2dfa18575b02548654e/psutil-7.1.3-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:2bdbcd0e58ca14996a42adf3621a6244f1bb2e2e528886959c72cf1e326677ab", size = 238359, upload-time = "2025-11-02T12:26:25.284Z" }, + { url = "https://files.pythonhosted.org/packages/68/3a/9f93cff5c025029a36d9a92fef47220ab4692ee7f2be0fba9f92813d0cb8/psutil-7.1.3-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:bc31fa00f1fbc3c3802141eede66f3a2d51d89716a194bf2cd6fc68310a19880", size = 239171, upload-time = "2025-11-02T12:26:27.23Z" }, + { url = "https://files.pythonhosted.org/packages/ce/b1/5f49af514f76431ba4eea935b8ad3725cdeb397e9245ab919dbc1d1dc20f/psutil-7.1.3-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3bb428f9f05c1225a558f53e30ccbad9930b11c3fc206836242de1091d3e7dd3", size = 263261, upload-time = "2025-11-02T12:26:29.48Z" }, + { url = "https://files.pythonhosted.org/packages/e0/95/992c8816a74016eb095e73585d747e0a8ea21a061ed3689474fabb29a395/psutil-7.1.3-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:56d974e02ca2c8eb4812c3f76c30e28836fffc311d55d979f1465c1feeb2b68b", size = 264635, upload-time = "2025-11-02T12:26:31.74Z" }, + { url = "https://files.pythonhosted.org/packages/55/4c/c3ed1a622b6ae2fd3c945a366e64eb35247a31e4db16cf5095e269e8eb3c/psutil-7.1.3-cp37-abi3-win_amd64.whl", hash = "sha256:f39c2c19fe824b47484b96f9692932248a54c43799a84282cfe58d05a6449efd", size = 247633, upload-time = "2025-11-02T12:26:33.887Z" }, + { url = "https://files.pythonhosted.org/packages/c9/ad/33b2ccec09bf96c2b2ef3f9a6f66baac8253d7565d8839e024a6b905d45d/psutil-7.1.3-cp37-abi3-win_arm64.whl", hash = "sha256:bd0d69cee829226a761e92f28140bec9a5ee9d5b4fb4b0cc589068dbfff559b1", size = 244608, upload-time = "2025-11-02T12:26:36.136Z" }, +] + [[package]] name = "psycopg" version = "3.2.12"