6 changes: 6 additions & 0 deletions .env.example
@@ -86,3 +86,9 @@ CALLBACK_READ_TIMEOUT = 10

# Required as an env var if you want to use doc transformation
OPENAI_API_KEY=""

# Resource Monitoring Configuration
RESOURCE_MONITORING_ENABLED=True
RESOURCE_CHECK_INTERVAL=5
CPU_THRESHOLD_PERCENT=80.0
MEMORY_THRESHOLD_PERCENT=85.0
78 changes: 71 additions & 7 deletions backend/app/celery/celery_app.py
@@ -1,7 +1,13 @@
import logging
from celery import Celery
from celery.app.control import Control
from celery.signals import worker_ready, worker_shutdown, task_prerun, task_postrun
from kombu import Exchange, Queue

from app.core.config import settings
from app.celery.resource_monitor import resource_monitor

logger = logging.getLogger(__name__)

# Create Celery instance
celery_app = Celery(
@@ -16,7 +22,7 @@
# Define exchanges and queues with priority
default_exchange = Exchange("default", type="direct")

# Celery configuration using environment variables
# Celery configuration
celery_app.conf.update(
# Queue configuration with priority support
task_queues=(
@@ -52,40 +58,98 @@
# Enable priority support
task_inherit_parent_priority=True,
worker_prefetch_multiplier=settings.CELERY_WORKER_PREFETCH_MULTIPLIER,
# Worker configuration from environment
# Worker configuration
worker_concurrency=settings.COMPUTED_CELERY_WORKER_CONCURRENCY,
worker_max_tasks_per_child=settings.CELERY_WORKER_MAX_TASKS_PER_CHILD,
worker_max_memory_per_child=settings.CELERY_WORKER_MAX_MEMORY_PER_CHILD,
# Security
worker_hijack_root_logger=False,
worker_log_color=False,
# Task execution from environment
# Task execution
task_soft_time_limit=settings.CELERY_TASK_SOFT_TIME_LIMIT,
task_time_limit=settings.CELERY_TASK_TIME_LIMIT,
task_reject_on_worker_lost=True,
task_ignore_result=False,
task_acks_late=True,
# Retry configuration from environment
# Retry configuration
task_default_retry_delay=settings.CELERY_TASK_DEFAULT_RETRY_DELAY,
task_max_retries=settings.CELERY_TASK_MAX_RETRIES,
# Task configuration from environment
# Task configuration
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone=settings.CELERY_TIMEZONE,
enable_utc=settings.CELERY_ENABLE_UTC,
task_track_started=True,
task_always_eager=False,
# Result backend settings from environment
# Result backend settings
result_expires=settings.CELERY_RESULT_EXPIRES,
result_compression="gzip",
# Monitoring
worker_send_task_events=True,
task_send_sent_event=True,
# Connection settings from environment
# Connection settings
broker_connection_retry_on_startup=True,
broker_pool_limit=settings.CELERY_BROKER_POOL_LIMIT,
worker_pool_restarts=True,
)


@worker_ready.connect
def start_resource_monitoring(sender, **kwargs):
"""Start resource monitoring when worker is ready."""
if not settings.RESOURCE_MONITORING_ENABLED:
logger.info("Resource monitoring is disabled")
return

try:
# Create Control instance
control = Control(app=celery_app)

# Get worker hostname from the sender (consumer)
worker_hostname = sender.hostname

# Extract queue names from configuration
queue_names = [queue.name for queue in celery_app.conf.task_queues]

# Inject into resource monitor
resource_monitor.control = control
resource_monitor.worker_hostname = worker_hostname
resource_monitor.queue_names = queue_names

logger.info(
f"Resource monitor initialized - " f"Queues: {', '.join(queue_names)}"
)

# Start monitoring
resource_monitor.start_monitoring()

except Exception as e:
logger.error(f"Failed to start resource monitoring: {e}", exc_info=True)


@worker_shutdown.connect
def stop_resource_monitoring(**kwargs):
"""Stop resource monitoring on worker shutdown."""
if not settings.RESOURCE_MONITORING_ENABLED:
return

resource_monitor.stop_monitoring()


@task_prerun.connect
def track_task_start(**kwargs):
"""Track when a task starts executing."""
if settings.RESOURCE_MONITORING_ENABLED:
resource_monitor.increment_active_tasks()


@task_postrun.connect
def track_task_end(**kwargs):
"""Track when a task finishes executing."""
if settings.RESOURCE_MONITORING_ENABLED:
resource_monitor.decrement_active_tasks()


# Auto-discover tasks
celery_app.autodiscover_tasks()
194 changes: 194 additions & 0 deletions backend/app/celery/resource_monitor.py
@@ -0,0 +1,194 @@
# app/celery/resource_monitor.py
import logging
import psutil
import threading
import time
from typing import List

from app.core.config import settings
from celery.app.control import Control

logger = logging.getLogger(__name__)


class ResourceMonitor:
"""
Monitor system resources and control task consumption.
Uses Celery Control API to pause/resume queue consumption.
"""

def __init__(
self,
cpu_threshold: float = settings.CPU_THRESHOLD_PERCENT,
memory_threshold: float = settings.MEMORY_THRESHOLD_PERCENT,
check_interval: int = settings.RESOURCE_CHECK_INTERVAL,
):
self.cpu_threshold = cpu_threshold
self.memory_threshold = memory_threshold
self.check_interval = check_interval
self.is_paused = False
self.active_tasks = 0
self.lock = threading.Lock()
self._should_stop = False

# Will be set by signal
self.control: Control = None
self.worker_hostname = None
Comment on lines +35 to +36
🛠️ Refactor suggestion | 🟠 Major

Fix type annotations for optional attributes.

Lines 35-36 have type annotation issues:

  • Line 35: Annotated as Control but initialized with None—should be Control | None
  • Line 36: Missing type annotation entirely
-        # Will be set by signal
-        self.control: Control = None
-        self.worker_hostname = None
+        # Will be set by signal
+        self.control: Control | None = None
+        self.worker_hostname: str | None = None

As per coding guidelines, type hints are required for Python 3.11+ projects.

🤖 Prompt for AI Agents
In backend/app/celery/resource_monitor.py around lines 35-36, fix the type
annotations for optional attributes: change self.control: Control = None to
self.control: Control | None = None and add a type for worker_hostname, e.g.
self.worker_hostname: str | None = None (ensure Control is imported or
referenced and adjust imports if needed).

self.queue_names: List[str] = []

def get_cpu_usage(self) -> float:
"""Get current CPU usage percentage."""
return psutil.cpu_percent(interval=1)
Comment on lines +39 to +41

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

psutil cpu_percent interval parameter behavior blocking

💡 Result:

Short summary — psutil.cpu_percent interval behavior:

  • If interval > 0: the call blocks for that many seconds and measures CPU between start and end (blocking). [1]
  • If interval == 0.0 or interval is None: it returns immediately and reports CPU usage since the last call (or since module import on first call). The very first call with 0.0/None returns a meaningless 0.0 and should be ignored / followed by a subsequent call (>= ~0.1s later for accuracy). [1][2]
  • Note: newer psutil versions maintain per-thread last-sample state so calls from different threads are independent; cpu_percent became thread-safe in recent releases. [1]

Sources: psutil docs. [1] psutil (latest) cpu_percent; [2] psutil historical docs.


🏁 Script executed:

cat -n backend/app/celery/resource_monitor.py



🏁 Script executed:

fd -t f "config.py" --path-separator / | head -20



🏁 Script executed:

grep -n "RESOURCE_CHECK_INTERVAL\|CPU_THRESHOLD\|MEMORY_THRESHOLD" backend/app/core/config.py



The cpu_percent(interval=1) call adds measurable latency to the monitoring cycle.

With check_interval set to 5 seconds, each monitoring iteration blocks for 1 second, making the effective cycle 6 seconds instead of 5. While this runs in a separate daemon thread and doesn't block Celery's main worker, it reduces monitoring frequency by ~17%. If tighter monitoring cadence is needed, consider interval=None with cached state (handling the first call returning 0.0), or accept the current behavior if the accuracy trade-off is acceptable.

🤖 Prompt for AI Agents
In backend/app/celery/resource_monitor.py around lines 39-41, the use of
psutil.cpu_percent(interval=1) blocks for 1s per call and lengthens the
monitoring loop; change to a non-blocking pattern by calling
psutil.cpu_percent(interval=None) and handle the initial 0.0 result (either by
priming once at startup and discarding that first value, or by returning the
last cached value until a real reading is available) so the monitoring cadence
stays close to check_interval without the added 1s delay.
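
A minimal sketch of the non-blocking sampling pattern suggested above; the class name and priming step are illustrative and not part of this PR:

import psutil


class NonBlockingCpuSampler:
    """Illustrative helper: sample CPU usage without the 1-second blocking window."""

    def __init__(self) -> None:
        # Prime psutil's internal counters; the very first interval=None call
        # always returns a meaningless 0.0, so it is discarded here.
        psutil.cpu_percent(interval=None)

    def get_cpu_usage(self) -> float:
        # Reports CPU usage since the previous call and returns immediately.
        # With the ~5s check interval the sampling window is long enough for
        # a stable reading, so the 1s blocking sleep is no longer needed.
        return psutil.cpu_percent(interval=None)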


def get_memory_usage(self) -> float:
"""Get current memory usage percentage."""
return psutil.virtual_memory().percent

def should_pause(self, cpu: float, memory: float) -> bool:
"""Determine if worker should pause based on thresholds."""
return cpu > self.cpu_threshold or memory > self.memory_threshold

def pause_consumer(self):
"""Stop consuming tasks from all queues."""
if not self.control or not self.worker_hostname:
logger.error("Control or worker hostname not initialized")
return

if not self.queue_names:
logger.error("No queue names configured")
return

try:
# Cancel consumption for each queue
for queue_name in self.queue_names:
self.control.cancel_consumer(
queue=queue_name, destination=[self.worker_hostname]
)
logger.info(f"Cancelled consumer for queue: {queue_name}")

self.is_paused = True
logger.warning(
f"🛑 Worker PAUSED - stopped consuming from queues: {', '.join(self.queue_names)}"
)
except Exception as e:
logger.error(f"Error pausing consumer: {e}", exc_info=True)

def resume_consumer(self):
"""Resume consuming tasks from all queues."""
if not self.control or not self.worker_hostname:
logger.error("Control or worker hostname not initialized")
return

if not self.queue_names:
logger.error("No queue names configured")
return

try:
# Add consumers back for each queue
for queue_name in self.queue_names:
self.control.add_consumer(
queue=queue_name, destination=[self.worker_hostname]
)
logger.info(f"Added consumer for queue: {queue_name}")

self.is_paused = False
logger.info(
f"✅ Worker RESUMED - started consuming from queues: {', '.join(self.queue_names)}"
)
except Exception as e:
logger.error(f"Error resuming consumer: {e}", exc_info=True)

def monitor_loop(self):
"""Main monitoring loop - runs in separate thread."""
logger.info(
f"🔍 Resource monitoring started - "
f"CPU threshold: {self.cpu_threshold}%, "
f"Memory threshold: {self.memory_threshold}%, "
f"Check interval: {self.check_interval}s, "
f"Monitoring queues: {', '.join(self.queue_names)}"
)

while not self._should_stop:
try:
cpu = self.get_cpu_usage()
memory = self.get_memory_usage()
should_pause_now = self.should_pause(cpu, memory)
# Remove this line later
logger.info(
"Memmory Usage: {:.2f}%, CPU Usage: {:.2f}%".format(memory, cpu)
)
with self.lock:
# Pause if resources exceeded and not already paused
if should_pause_now and not self.is_paused:
logger.warning(
f"⚠️ Resource threshold exceeded! "
f"CPU: {cpu:.1f}% (limit: {self.cpu_threshold}%), "
f"Memory: {memory:.1f}% (limit: {self.memory_threshold}%), "
f"Active tasks: {self.active_tasks}. "
f"Pausing task consumption..."
)
self.pause_consumer()

# Resume if resources OK and currently paused
elif not should_pause_now and self.is_paused:
logger.info(
f"✓ Resources within limits - "
f"CPU: {cpu:.1f}%, Memory: {memory:.1f}%, "
f"Active tasks: {self.active_tasks}. "
f"Resuming task consumption..."
)
self.resume_consumer()

elif not self.is_paused:
logger.debug(
f"📊 Status - CPU: {cpu:.1f}%, Memory: {memory:.1f}%, "
f"Active tasks: {self.active_tasks}, Paused: {self.is_paused}"
)

except Exception as e:
logger.error(f"Error in resource monitor loop: {e}", exc_info=True)

time.sleep(self.check_interval)

logger.info("Resource monitoring loop ended")

def start_monitoring(self):
"""Start the monitoring thread."""
if not self.control or not self.worker_hostname:
logger.error("Cannot start monitoring: control or worker hostname not set")
return

if not self.queue_names:
logger.error("Cannot start monitoring: no queues configured")
return

self._should_stop = False
monitor_thread = threading.Thread(
target=self.monitor_loop, daemon=True, name="ResourceMonitor"
)
monitor_thread.start()
logger.info("✨ Resource monitoring thread started")

def stop_monitoring(self):
"""Stop the monitoring thread."""
logger.info("Stopping resource monitoring...")
self._should_stop = True

# Ensure consumer is consuming on shutdown
with self.lock:
if self.is_paused:
logger.info("Resuming consumer before shutdown...")
self.resume_consumer()

def increment_active_tasks(self):
"""Track task start."""
with self.lock:
self.active_tasks += 1

def decrement_active_tasks(self):
"""Track task end."""
with self.lock:
self.active_tasks = max(0, self.active_tasks - 1)


resource_monitor = ResourceMonitor()
6 changes: 6 additions & 0 deletions backend/app/core/config.py
@@ -122,6 +122,12 @@ def AWS_S3_BUCKET(self) -> str:
CALLBACK_CONNECT_TIMEOUT: int = 3
CALLBACK_READ_TIMEOUT: int = 10

# Resources Monitoring
RESOURCE_MONITORING_ENABLED: bool = True
RESOURCE_CHECK_INTERVAL: int = 5
CPU_THRESHOLD_PERCENT: float = 75.0
MEMORY_THRESHOLD_PERCENT: float = 75.0

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
1 change: 1 addition & 0 deletions backend/pyproject.toml
@@ -36,6 +36,7 @@ dependencies = [
"celery>=5.3.0,<6.0.0",
"redis>=5.0.0,<6.0.0",
"flower>=2.0.1",
"psutil>=7.1.3",
]

[tool.uv]