Conversation

@v1r3n
Contributor

@v1r3n v1r3n commented Nov 9, 2025

Worker Configuration, Event-Driven Observability & Metrics

Overview

Introduces event-driven observability with Prometheus metrics, hierarchical worker configuration, runtime pausing, and startup logging.

Key Features

1. Event-Driven Observability

Zero-coupling architecture for metrics and monitoring:

class CustomListener:
    def on_task_execution_completed(self, event: TaskExecutionCompleted):
        statsd.timing(f'task.{event.task_type}', event.duration_ms)

TaskHandler(event_listeners=[CustomListener(), MetricsCollector()])

Benefits: Multiple backends (Prometheus, DataDog, custom), protocol-based, non-blocking
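The listener pattern above can be sketched in plain, self-contained Python. Note this is a hypothetical stand-in for the SDK's internal dispatcher, not its actual implementation; the event and handler names follow the snippet above:

```python
from dataclasses import dataclass

@dataclass
class TaskExecutionCompleted:
    task_type: str
    duration_ms: float

class EventDispatcher:
    """Hypothetical stand-in for the SDK's internal dispatcher."""
    def __init__(self, listeners):
        self.listeners = listeners

    def publish(self, event):
        # Protocol-based (duck-typed): only listeners that implement the
        # matching handler are invoked, so listeners stay fully decoupled
        # from each other and from the task runner.
        for listener in self.listeners:
            handler = getattr(listener, 'on_task_execution_completed', None)
            if handler is not None:
                handler(event)

class TimingListener:
    def __init__(self):
        self.timings = []

    def on_task_execution_completed(self, event):
        self.timings.append((event.task_type, event.duration_ms))

listener = TimingListener()
EventDispatcher([listener]).publish(TaskExecutionCompleted('greet', 12.5))
print(listener.timings)  # → [('greet', 12.5)]
```

Because dispatch is by method presence rather than a base class, a listener only implements the events it cares about.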

2. Built-in Prometheus Metrics

HTTP server with automatic multiprocess aggregation:

metrics_settings = MetricsSettings(http_port=8000)
# Access: curl http://localhost:8000/metrics

Metrics: API latency (p50-p99), task execution time, error rates, queue saturation
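To illustrate what the `/metrics` endpoint pattern looks like, here is a minimal stdlib-only sketch of a metrics HTTP server serving the Prometheus text exposition format. This is an illustration of the concept, not the SDK's actual server (which uses multiprocess aggregation):

```python
import http.server
import threading
import urllib.request

# A toy metric store; the real SDK aggregates across worker processes.
counter = {"tasks_completed_total": 0}

class MetricsHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        # Prometheus text exposition format: one "name value" line per metric.
        body = "".join(f"{k} {v}\n" for k, v in counter.items()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; version=0.0.4")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence per-request logging

# Port 0 asks the OS for a free ephemeral port.
server = http.server.HTTPServer(("127.0.0.1", 0), MetricsHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

counter["tasks_completed_total"] += 3
port = server.server_address[1]
text = urllib.request.urlopen(f"http://127.0.0.1:{port}/metrics").read().decode()
server.shutdown()
print(text.strip())  # → tasks_completed_total 3
```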

3. Worker Configuration

Single-line startup logging + hierarchical env overrides:

# Log output
INFO - Conductor Worker[name=task, status=active, poll_interval=500ms, domain=prod]

# Environment config (overrides code)
export conductor.worker.all.domain=production
export conductor.worker.critical_task.thread_count=50
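The hierarchical override above can be sketched as a lookup function. The precedence shown (worker-specific env over `all` env over the value set in code) is my reading of the snippet, and `resolve_worker_prop` is a hypothetical name, not the SDK's API:

```python
import os

def resolve_worker_prop(task_name, prop, code_default=None):
    """Hierarchical lookup sketch: specific worker env > 'all' env > code."""
    specific = os.environ.get(f'conductor.worker.{task_name}.{prop}')
    if specific is not None:
        return specific
    shared = os.environ.get(f'conductor.worker.all.{prop}')
    if shared is not None:
        return shared
    return code_default

# Mirror the exports from the example above.
os.environ['conductor.worker.all.domain'] = 'production'
os.environ['conductor.worker.critical_task.thread_count'] = '50'

print(resolve_worker_prop('critical_task', 'domain', 'dev'))       # → production
print(resolve_worker_prop('critical_task', 'thread_count', '10'))  # → 50
print(resolve_worker_prop('other_task', 'thread_count', '10'))     # → 10
```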

4. Runtime Worker Pausing

Environment-only control (no code changes):

export conductor.worker.all.paused=true  # Maintenance mode
export conductor.worker.task_name.paused=true  # Specific worker

5. Async functions can now be used with @worker_task annotations

When a function is declared async, it is executed on a background asyncio event loop.
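The mechanism can be sketched with stdlib primitives: detect whether the decorated function is a coroutine function and route it to an event loop, otherwise call it directly. This is a simplified illustration of the dispatch idea, not the SDK's `@worker_task` internals:

```python
import asyncio
import inspect

def run_worker(fn, *args):
    """Sketch: route coroutine functions to an event loop, call sync ones directly."""
    if inspect.iscoroutinefunction(fn):
        return asyncio.run(fn(*args))   # async worker: event-loop execution
    return fn(*args)                    # sync worker: direct call

async def async_greet(name: str) -> str:
    await asyncio.sleep(0)              # simulated async I/O
    return f'hello {name}'

def sync_greet(name: str) -> str:
    return f'hi {name}'

print(run_worker(async_greet, 'async'))  # → hello async
print(run_worker(sync_greet, 'sync'))    # → hi sync
```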

Related Documentation

@codecov

codecov bot commented Nov 9, 2025

Codecov Report

❌ Patch coverage is 80.09479% with 294 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
.../conductor/client/automator/task_runner_asyncio.py 80.90% 105 Missing ⚠️
src/conductor/client/worker/worker_loader.py 0.00% 94 Missing ⚠️
...conductor/client/automator/task_handler_asyncio.py 78.45% 39 Missing ⚠️
...rc/conductor/client/telemetry/metrics_collector.py 82.50% 21 Missing ⚠️
src/conductor/client/event/listeners.py 67.64% 11 Missing ⚠️
src/conductor/client/event/listener_register.py 66.66% 10 Missing ⚠️
src/conductor/client/worker/worker_config.py 89.55% 7 Missing ⚠️
src/conductor/client/context/task_context.py 95.00% 3 Missing ⚠️
src/conductor/client/event/event_dispatcher.py 95.55% 2 Missing ⚠️
src/conductor/client/workflow/task/task.py 0.00% 2 Missing ⚠️
Files with missing lines Coverage Δ
src/conductor/client/automator/task_handler.py 97.56% <100.00%> (+32.22%) ⬆️
src/conductor/client/automator/task_runner.py 100.00% <100.00%> (+20.27%) ⬆️
src/conductor/client/event/conductor_event.py 100.00% <100.00%> (ø)
src/conductor/client/event/task_events.py 100.00% <100.00%> (ø)
src/conductor/client/event/task_runner_events.py 100.00% <100.00%> (ø)
src/conductor/client/event/workflow_events.py 100.00% <100.00%> (ø)
src/conductor/client/http/api_client.py 98.97% <100.00%> (+44.17%) ⬆️
...rc/conductor/client/http/models/integration_api.py 97.79% <ø> (-0.14%) ⬇️
src/conductor/client/http/models/schema_def.py 91.81% <ø> (-0.15%) ⬇️
src/conductor/client/http/models/workflow_def.py 85.19% <ø> (-0.38%) ⬇️
... and 16 more

... and 4 files with indirect coverage changes


@v1r3n v1r3n marked this pull request as draft November 9, 2025 08:54
@v1r3n v1r3n requested review from am-orkes and nthmost-orkes and removed request for am-orkes November 24, 2025 05:59
@IgorChvyrov-sm
Contributor

@v1r3n Overall looks good, though could you tell us why we continue using threads for async workers instead of one thread with an event loop handling everything?
From what we are seeing currently, Async workers use BackgroundEventLoop (separate thread) for coroutine execution, while worker threads handle task submission and polling.
As a suggestion, for pure async workers we could use a single-threaded event loop architecture (one process = one thread + event loop) which should be faster for I/O-bound async workloads by eliminating thread switching overhead and GIL contention.

@v1r3n
Contributor Author

v1r3n commented Nov 26, 2025

https://github.com/conductor-oss/python-sdk/blob/async_io_workers/docs/design/WORKER_ARCHITECTURE.md
Each worker runs in its own process and has its own poll/execute/update loop. This avoids GIL contention across threads for multiple workers (at the expense of having a separate process per worker). The actual execution of the task can be in an async loop (if marked as async) or a thread (otherwise). So that is the only difference.

One thing to consider is that marking a task async makes it run in an asyncio pool -- alternatively we can make it explicit, but I don't really see much benefit, and it introduces another flag that you have to maintain and creates the complication that you now need an all-or-nothing implementation for workers.

@IgorChvyrov-sm
Contributor

Totally agree that the current design (one process per worker, threads inside for poll/execute/update) solves the “multi-worker GIL contention” problem well. My question was a bit narrower: within a single worker that’s already marked async def, we still keep the per-task threads (ThreadPoolExecutor.submit(...) + BackgroundEventLoop) even though the work after polling is 100% asyncio. That thread handoff is what adds context-switch overhead and keeps the poll loop in a different thread from the async execution.
I’m not proposing another global flag that forces all workers into async mode. I’m suggesting an optional AsyncTaskRunner that’s picked automatically when the decorated function is async def. It would still be one process per worker (no change there) but switch from:

Main thread: poll → submit to ThreadPoolExecutor
ThreadPool thread: start coroutine → hand to BackgroundEventLoop thread
BackgroundEventLoop thread: actually run coroutine

to:

Single thread/event loop in that process:
async poll() → async execute() → async update()

So the initial suggestion is about sync workers to keep the current code path (polling thread + worker threads) and Async workers to get “one process = one thread + asyncio loop”, which removes the extra thread switches and BackgroundEventLoop plumbing while still keeping process isolation. That was the motivation behind the question.
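The proposed single-threaded async runner can be sketched as one coroutine chain on one event loop. All names here (`poll`, `execute`, `update`, `run_once`) are hypothetical stand-ins for the real HTTP calls and the user's worker function; the point is only that no thread handoff is needed:

```python
import asyncio

async def poll():
    """Stand-in for the async HTTP poll call."""
    await asyncio.sleep(0)
    return {"task_id": "t1", "input": 21}

async def execute(task):
    """Stand-in for the user's async def worker function."""
    await asyncio.sleep(0)
    return task["input"] * 2

async def update(task, result):
    """Stand-in for the async task-update call."""
    await asyncio.sleep(0)
    return result

async def run_once():
    # poll → execute → update on a single event loop, in a single thread:
    # no ThreadPoolExecutor submission, no BackgroundEventLoop handoff.
    task = await poll()
    result = await execute(task)
    return await update(task, result)

print(asyncio.run(run_once()))  # → 42
```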

@v1r3n
Contributor Author

v1r3n commented Nov 30, 2025

These are good points. I updated the PR to take care of this, the new architecture
https://github.com/conductor-oss/python-sdk/blob/async_io_workers/docs/design/WORKER_DESIGN.md

@nthmost-orkes nthmost-orkes dismissed their stale review December 1, 2025 20:19

concerns were addressed

@v1r3n v1r3n merged commit 6f4cc4e into main Dec 2, 2025
1 check passed
@v1r3n v1r3n deleted the async_io_workers branch December 2, 2025 05:54
DOCKER_BUILDKIT=1 docker build . \
--target python_test_base \
-t conductor-sdk-test:latest
python -m pip install --upgrade pip
Collaborator


Here be dragons :)

"""
# Simulate async I/O operation
# Print execution info to verify parallel execution
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # milliseconds
Collaborator


round(time.time() * 1000) will get you ms

full_command = [command]
full_command = full_command + args
result = subprocess.run(full_command, stdout=subprocess.PIPE)

Collaborator


How do we handle errors here? Ideally, unless there are hard API requirements, we probably want to return a tuple of (exit_code, str(result.stdout), str(result.stderr)). If the requirement is for the output to be a string only, using sentinel characters may help.

Contributor Author


This is an example created in response to someone asking for an example shell worker in the past. Not really meant to be used in a production env.

else:
# Job still running - poll again in 30 seconds
ctx.add_log(f"Job {job_id} still running, will check again in 30s")
ctx.set_callback_after(30)
Collaborator


Perhaps it's my lack of familiarity with the naming convention, but how it does this is unclear to me. Perhaps a reference to the event loop would be helpful?

Contributor Author


this part is not related to the async event loop, but rather to Conductor's ability to schedule the task to run again later, for long-running tasks.

return {"status": "success", "operation": operation}
else:
ctx.add_log("Operation failed, will retry")
raise Exception("Operation failed")
Collaborator


It is generally recommended to throw specific exceptions instead of the base class. Easier to catch the ones you want.

Contributor Author


Yes, this is an example only. I would expect users to actually create exceptions and use them in production code.

results = []

try:
import httpx
Collaborator


This should be at the top.

ctx.add_log(f"✗ {url} - Error: {e}")

except Exception as e:
ctx.add_log(f"Fatal error: {e}")
Collaborator


Unless there is a possibility of AsyncClient object creation failing, I am not sure you will ever get here.

"""
if n <= 1:
return n
return await calculate_fibonacci(n - 1) + await calculate_fibonacci(n - 2)
Collaborator


While you could make CPU-bound work async, I am not sure what this gets us. Also, I think this may not be correct :) f(n<0) should be 0, and f(1)/f(2) should be 1.

Contributor Author


Nothing, just an example.


7 participants