38 changes: 38 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,38 @@
# Pre-commit configuration for durabletask-python
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      - id: check-merge-conflict

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.2.2
    hooks:
      - id: ruff
        args: [--fix, --exit-non-zero-on-fix]
      - id: ruff-format

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.8.0
    hooks:
      - id: mypy
        additional_dependencies: [types-protobuf]
        args: [--config-file=mypy.ini]
        files: ^durabletask/
        exclude: ^durabletask/internal/.*_pb2\.py$

  - repo: local
    hooks:
      - id: pytest-asyncio
        name: Run asyncio tests
        entry: python -m pytest tests/aio/ -q
        language: system
        pass_filenames: false
        stages: [pre-push]  # Only run on git push, not every commit



263 changes: 261 additions & 2 deletions README.md
@@ -6,6 +6,32 @@

This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.

> **🚀 Enhanced Async Features**: This fork includes comprehensive async workflow enhancements with advanced error handling, non-determinism detection, timeout support, and debugging tools. See [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md) for details.

## Quick Start - Async Workflows

For async workflow development, use the new `durabletask.aio` package:

```python
from durabletask.aio import AsyncWorkflowContext
from durabletask.worker import TaskHubGrpcWorker

async def my_workflow(ctx: AsyncWorkflowContext, name: str) -> str:
    result = await ctx.call_activity(say_hello, input=name)
    await ctx.sleep(1.0)
    return f"Workflow completed: {result}"

def say_hello(ctx, name: str) -> str:
    return f"Hello, {name}!"

# Register and run
with TaskHubGrpcWorker() as worker:
    worker.add_activity(say_hello)
    worker.add_orchestrator(my_workflow)
    worker.start()
    # ... schedule workflows with client
```
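To schedule and await the workflow from the client side, a minimal sketch (the default sidecar endpoint is assumed; see the Configuration section below):

```python
from durabletask.client import TaskHubGrpcClient

c = TaskHubGrpcClient()  # connects to the default sidecar endpoint
instance_id = c.schedule_new_orchestration(my_workflow, input="World")
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
print(state.serialized_output if state else "no result")
```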

⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️

> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
@@ -118,15 +144,15 @@ Orchestrations can start child orchestrations using the `call_sub_orchestrator`

Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
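For instance, a minimal sketch with the generator API (`place_order` and the payload shape of the `approval` event are illustrative):

```python
from durabletask import task

def order_workflow(ctx: task.OrchestrationContext, order_id: str):
    # Blocks until a client raises the "approval" event for this instance
    approval = yield ctx.wait_for_external_event("approval")
    if approval.get("approved"):
        yield ctx.call_activity(place_order, input=order_id)
        return "ordered"
    return "rejected"
```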

### Continue-as-new

Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
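A minimal sketch of an eternal workflow that restarts itself each hour (`do_cleanup` is illustrative):

```python
from datetime import timedelta
from durabletask import task

def periodic_cleanup(ctx: task.OrchestrationContext, run_count: int):
    yield ctx.call_activity(do_cleanup, input=run_count)
    yield ctx.create_timer(timedelta(hours=1))
    # Restart with fresh history so it never grows unboundedly
    ctx.continue_as_new(run_count + 1)
```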

### Suspend, resume, and terminate

Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
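A hedged client-side sketch (the endpoint and `instance_id` are illustrative):

```python
from durabletask.client import TaskHubGrpcClient

c = TaskHubGrpcClient(host_address="localhost:4001")
c.suspend_orchestration(instance_id)    # new events are buffered
c.resume_orchestration(instance_id)     # buffered events are delivered
c.terminate_orchestration(instance_id, output="cancelled")  # buffered events are discarded
```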

### Retry policies

Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
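A minimal sketch of attaching a retry policy (the parameter values and `flaky_activity` are illustrative):

```python
from datetime import timedelta
from durabletask import task

retry = task.RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2.0,
)

def resilient_workflow(ctx: task.OrchestrationContext, data):
    # Each transient failure is retried per the policy before the task fails
    result = yield ctx.call_activity(flaky_activity, input=data, retry_policy=retry)
    return result
```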

@@ -155,6 +181,13 @@ python3 -m pip install .

See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them.

**Enhanced Async Examples:**
- `async_activity_sequence.py` - Updated to use new `durabletask.aio` package
- `async_fanout_fanin.py` - Updated to use new `durabletask.aio` package
- `async_enhanced_features.py` - Comprehensive demo of all enhanced features
- `async_non_determinism_demo.py` - Non-determinism detection demonstration
- See [ASYNCIO_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for detailed examples and usage patterns

## Development

The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL.
@@ -191,6 +224,232 @@ To run the E2E tests, run the following command from the project root:
make test-e2e
```

### Configuration

The SDK connects to a Durable Task sidecar. By default it uses `localhost:4001`. You can override via environment variables (checked in order):
> _Review comment from the author on this section: "this is actually only in tests."_

- `DURABLETASK_GRPC_ENDPOINT` (e.g., `localhost:4001`, `grpcs://host:443`)
- `DURABLETASK_GRPC_HOST` and `DURABLETASK_GRPC_PORT`
- `TASKHUB_GRPC_ENDPOINT` (legacy)

Example (common ports: 4001 for DurableTask-Go emulator, 50001 for Dapr sidecar):

```sh
export DURABLETASK_GRPC_ENDPOINT=localhost:4001
# or
export DURABLETASK_GRPC_ENDPOINT=localhost:50001
```

### Async workflow authoring

For a deeper tour of the async authoring surface (determinism helpers, sandbox modes, timeouts, concurrency patterns), see the Async Enhancements guide: [ASYNC_ENHANCEMENTS.md](./ASYNC_ENHANCEMENTS.md). The developer-facing migration notes are in [DEVELOPER_TRANSITION_GUIDE.md](./DEVELOPER_TRANSITION_GUIDE.md).

You can author orchestrators with `async def` using the new `durabletask.aio` package, which provides a comprehensive async workflow API:

```python
from durabletask.worker import TaskHubGrpcWorker
from durabletask.aio import AsyncWorkflowContext

async def my_orch(ctx: AsyncWorkflowContext, input) -> str:
    r1 = await ctx.call_activity(act1, input=input)
    await ctx.sleep(1.0)
    r2 = await ctx.call_activity(act2, input=r1)
    return r2

with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
```

Optional sandbox mode (`best_effort` or `strict`) patches `asyncio.sleep`, `random`, `uuid.uuid4`, and `time.time` within the workflow step to deterministic equivalents. This is best-effort and not a correctness guarantee.

In `strict` mode, `asyncio.create_task` is blocked inside workflows to preserve determinism and will raise a `SandboxViolationError` if used.
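Workflow code that sticks to the context's deterministic helpers needs no patching at all; a minimal sketch:

```python
async def sandbox_friendly(ctx: AsyncWorkflowContext, _input) -> str:
    await ctx.sleep(1.0)       # instead of asyncio.sleep(1.0)
    n = ctx.random().random()  # instead of random.random()
    uid = ctx.uuid4()          # instead of uuid.uuid4()
    now = ctx.now()            # instead of time.time() / datetime.now()
    return f"{uid}:{n}:{now.isoformat()}"
```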

> **Enhanced Sandbox Features**: The enhanced version includes comprehensive non-determinism detection, timeout support, enhanced concurrency primitives, and debugging tools. See [ASYNCIO_ENHANCEMENTS.md](./durabletask/aio/ASYNCIO_ENHANCEMENTS.md) for complete documentation.

#### Async patterns

- Activities and sub-orchestrations can be referenced by function object (preferred for IDE and type support) or by their registered string name (useful across modules and languages). Both forms are supported.

- Activities:
```python
result = await ctx.call_activity("process", input={"x": 1})
# or: result = await ctx.call_activity(process, input={"x": 1})
```

- Timers:
```python
await ctx.sleep(1.5) # seconds or timedelta
```

- External events:
```python
val = await ctx.wait_for_external_event("approval")
```

- Concurrency:
```python
t1 = ctx.call_activity("a")
t2 = ctx.call_activity("b")
await ctx.when_all([t1, t2])
winner = await ctx.when_any([ctx.wait_for_external_event("x"), ctx.sleep(5)])

# gather combines awaitables and preserves order
results = await ctx.gather(t1, t2)
# gather with exception capture
results_or_errors = await ctx.gather(t1, t2, return_exceptions=True)
```

#### Async vs. generator API differences

- Async authoring (`durabletask.aio`): awaiting returns the operation's value. Exceptions are raised on `await` (no `is_failed`).
- Generator authoring (`durabletask.task`): yielding returns `Task` objects. Use `get_result()` to read values; failures surface via `is_failed()` or by raising on `get_result()`.

Examples:

```python
# Async authoring (await returns value)
# when_any returns a proxy that compares equal to the original awaitable
# and exposes get_result() for the completed item.
approval = ctx.wait_for_external_event("approval")
winner = await ctx.when_any([approval, ctx.sleep(60)])
if winner == approval:
    details = winner.get_result()
```

```python
# Async authoring (index + result)
idx, result = await ctx.when_any_with_result([approval, ctx.sleep(60)])
if idx == 0:  # approval won
    details = result
```

```python
# Generator authoring (yield returns Task)
approval = ctx.wait_for_external_event("approval")
winner = yield task.when_any([approval, ctx.create_timer(timedelta(seconds=60))])
if winner == approval:
    details = approval.get_result()
```

Failure handling in async:

```python
try:
    val = await ctx.call_activity("might_fail")
except Exception as e:
    # handle the failure branch
    ...
```

Or capture with gather:

```python
res = await ctx.gather(ctx.call_activity("a"), return_exceptions=True)
if isinstance(res[0], Exception):
    ...
```

- Sub-orchestrations (function reference or registered name):
```python
out = await ctx.call_sub_orchestrator(child_fn, input=payload)
# or: out = await ctx.call_sub_orchestrator("child", input=payload)
```

- Deterministic utilities:
```python
now = ctx.now(); rid = ctx.random().random(); uid = ctx.uuid4()
```

- Workflow metadata and info:
```python
# Read-only info snapshot (Temporal-style convenience)
info = ctx.info
print(f"Workflow: {info.workflow_name}, Instance: {info.instance_id}")
print(f"Replaying: {info.is_replaying}, Suspended: {info.is_suspended}")

# Or access properties directly
instance_id = ctx.instance_id
is_replaying = ctx.is_replaying
is_suspended = ctx.is_suspended
workflow_name = ctx.workflow_name
parent_instance_id = ctx.parent_instance_id # for sub-orchestrators
workflow_attempt = ctx.workflow_attempt # retry attempt number (if available)

# Execution info (internal metadata if provided by sidecar)
exec_info = ctx.execution_info

# Tracing span IDs
span_id = ctx.orchestration_span_id # or ctx.workflow_span_id (alias)
```

- Workflow metadata/headers (async only for now):
```python
# Attach contextual metadata (e.g., tracing, tenant, app info)
ctx.set_metadata({"x-trace": trace_id, "tenant": "acme"})
md = ctx.get_metadata()

# Header aliases (same data)
ctx.set_headers({"region": "us-east"})
headers = ctx.get_headers()
```
Notes:
- Useful for routing, observability, and cross-cutting concerns passed along activity/sub-orchestrator calls via the sidecar.
- In the upstream python-sdk, metadata is available for both async and generator orchestrators; in this repo it is currently implemented only on `durabletask.aio`, with generator parity planned.

- Cross-app activity/sub-orchestrator routing (async only for now):
```python
# Route activity to a different app via app_id
result = await ctx.call_activity("process", input=data, app_id="worker-app-2")

# Route sub-orchestrator to a different app
child_result = await ctx.call_sub_orchestrator("child_workflow", input=data, app_id="orchestrator-app-2")
```
Notes:
- The `app_id` parameter enables multi-app orchestrations where activities or child workflows run in different application instances.
- Requires sidecar support for cross-app invocation.

#### Worker readiness

When starting a worker and scheduling immediately, wait for the connection to the sidecar to be established:

```python
with TaskHubGrpcWorker() as worker:
    worker.add_orchestrator(my_orch)
    worker.start()
    worker.wait_for_ready(timeout=5)
    # Now safe to schedule
```

#### Suspension & termination

- `ctx.is_suspended` reflects suspension state during replay/processing.
- Suspend pauses progress without raising inside async orchestrators.
- Terminate completes the orchestration with `TERMINATED` status; use the client APIs to suspend, resume, or terminate.
- Only new events are buffered while suspended; replay events continue to apply to rebuild local state deterministically.

### Tracing and context propagation

The SDK surfaces W3C tracing context provided by the sidecar:

- Orchestrations: `ctx.trace_parent`, `ctx.trace_state`, and `ctx.orchestration_span_id` are available on `OrchestrationContext` (and on `AsyncWorkflowContext`).
- Activities: `ctx.trace_parent` and `ctx.trace_state` are available on `ActivityContext`.

Propagate tracing to external systems (e.g., HTTP):

```python
def activity(ctx, payload):
    headers = {
        "traceparent": ctx.trace_parent or "",
        "tracestate": ctx.trace_state or "",
    }
    # requests.post(url, headers=headers, json=payload)
    return "ok"
```

Notes:
- The sidecar controls inbound `traceparent`/`tracestate`. App code can append vendor entries to `tracestate` for outbound calls (see the sketch after these notes) but cannot currently alter the sidecar’s propagation for downstream Durable operations.
- Configure the sidecar endpoint with `DURABLETASK_GRPC_ENDPOINT` (e.g., `127.0.0.1:56178`).
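For example, a hedged sketch of prepending a vendor entry for an outbound call, per the W3C `tracestate` format (`myvendor` and the value are illustrative):

```python
def traced_activity(ctx, payload):
    tracestate = ctx.trace_state or ""
    # W3C tracestate is a comma-separated key=value list; new entries go first
    outbound = f"myvendor=abc123,{tracestate}" if tracestate else "myvendor=abc123"
    headers = {"traceparent": ctx.trace_parent or "", "tracestate": outbound}
    # requests.post(url, headers=headers, json=payload)
    return "ok"
```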

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution.
2 changes: 2 additions & 0 deletions codegen-requirements.txt
@@ -0,0 +1,2 @@
grpcio-tools==1.62.3 # codegen only; conflicts with protobuf>=6 at runtime

7 changes: 6 additions & 1 deletion dev-requirements.txt
@@ -1 +1,6 @@
pytest-asyncio>=0.23
pytest
pytest-cov
autopep8
grpcio>=1.74.0
protobuf>=6.31.1
8 changes: 7 additions & 1 deletion durabletask/__init__.py
@@ -4,4 +4,10 @@
"""Durable Task SDK for Python"""


PACKAGE_NAME = "durabletask"
PACKAGE_NAME = 'durabletask'

# Public async exports (import directly from durabletask.aio)
from durabletask.aio import (  # noqa: F401
    AsyncWorkflowContext,
    CoroutineOrchestratorRunner,
)