Skip to content

Commit 06357df

Browse files
Merge pull request dapr#15 from acroca/cross-app
Adds support for multi app calls
2 parents 3ba64c5 + 849eddc commit 06357df

File tree

4 files changed

+191
-18
lines changed

4 files changed

+191
-18
lines changed

durabletask/internal/helpers.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,16 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
178178
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))
179179

180180

181-
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
182-
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
183-
name=name,
184-
input=get_string_value(encoded_input)
185-
))
181+
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None) -> pb.OrchestratorAction:
182+
return pb.OrchestratorAction(
183+
id=id,
184+
scheduleTask=pb.ScheduleTaskAction(
185+
name=name,
186+
input=get_string_value(encoded_input),
187+
router=router,
188+
),
189+
router=router,
190+
)
186191

187192

188193
def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
@@ -195,12 +200,18 @@ def new_create_sub_orchestration_action(
195200
id: int,
196201
name: str,
197202
instance_id: Optional[str],
198-
encoded_input: Optional[str]) -> pb.OrchestratorAction:
199-
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
200-
name=name,
201-
instanceId=instance_id,
202-
input=get_string_value(encoded_input)
203-
))
203+
encoded_input: Optional[str],
204+
router: Optional[pb.TaskRouter] = None) -> pb.OrchestratorAction:
205+
return pb.OrchestratorAction(
206+
id=id,
207+
createSubOrchestration=pb.CreateSubOrchestrationAction(
208+
name=name,
209+
instanceId=instance_id,
210+
input=get_string_value(encoded_input),
211+
router=router,
212+
),
213+
router=router,
214+
)
204215

205216

206217
def is_empty(v: wrappers_pb2.StringValue):

durabletask/task.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
100100
@abstractmethod
101101
def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
102102
input: Optional[TInput] = None,
103-
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
103+
retry_policy: Optional[RetryPolicy] = None,
104+
app_id: Optional[str] = None) -> Task[TOutput]:
104105
"""Schedule an activity for execution.
105106
106107
Parameters
@@ -111,6 +112,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
111112
The JSON-serializable input (or None) to pass to the activity.
112113
retry_policy: Optional[RetryPolicy]
113114
The retry policy to use for this activity call.
115+
app_id: Optional[str]
116+
The app ID that will execute the activity. If not specified, the activity will be executed by the same app as the orchestrator.
114117
115118
Returns
116119
-------
@@ -123,7 +126,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
123126
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
124127
input: Optional[TInput] = None,
125128
instance_id: Optional[str] = None,
126-
retry_policy: Optional[RetryPolicy] = None) -> Task[TOutput]:
129+
retry_policy: Optional[RetryPolicy] = None,
130+
app_id: Optional[str] = None) -> Task[TOutput]:
127131
"""Schedule sub-orchestrator function for execution.
128132
129133
Parameters
@@ -137,6 +141,8 @@ def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
137141
random UUID will be used.
138142
retry_policy: Optional[RetryPolicy]
139143
The retry policy to use for this sub-orchestrator call.
144+
app_id: Optional[str]
145+
The app ID that will execute the sub-orchestrator. If not specified, the sub-orchestrator will be executed by the same app as the orchestrator.
140146
141147
Returns
142148
-------

durabletask/worker.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ def __init__(self, instance_id: str):
584584
self._sequence_number = 0
585585
self._current_utc_datetime = datetime(1000, 1, 1)
586586
self._instance_id = instance_id
587+
self._app_id = None
587588
self._completion_status: Optional[pb.OrchestrationStatus] = None
588589
self._received_events: dict[str, list[Any]] = {}
589590
self._pending_events: dict[str, list[task.CompletableTask]] = {}
@@ -705,6 +706,10 @@ def next_sequence_number(self) -> int:
705706
self._sequence_number += 1
706707
return self._sequence_number
707708

709+
@property
710+
def app_id(self) -> str:
711+
return self._app_id
712+
708713
@property
709714
def instance_id(self) -> str:
710715
return self._instance_id
@@ -752,31 +757,37 @@ def call_activity(
752757
*,
753758
input: Optional[TInput] = None,
754759
retry_policy: Optional[task.RetryPolicy] = None,
760+
app_id: Optional[str] = None,
755761
) -> task.Task[TOutput]:
756762
id = self.next_sequence_number()
757763

758764
self.call_activity_function_helper(
759-
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False
765+
id, activity, input=input, retry_policy=retry_policy, is_sub_orch=False, app_id=app_id
760766
)
761767
return self._pending_tasks.get(id, task.CompletableTask())
762768

763769
def call_sub_orchestrator(
764770
self,
765-
orchestrator: task.Orchestrator[TInput, TOutput],
771+
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
766772
*,
767773
input: Optional[TInput] = None,
768774
instance_id: Optional[str] = None,
769775
retry_policy: Optional[task.RetryPolicy] = None,
776+
app_id: Optional[str] = None,
770777
) -> task.Task[TOutput]:
771778
id = self.next_sequence_number()
772-
orchestrator_name = task.get_name(orchestrator)
779+
if isinstance(orchestrator, str):
780+
orchestrator_name = orchestrator
781+
else:
782+
orchestrator_name = task.get_name(orchestrator)
773783
self.call_activity_function_helper(
774784
id,
775785
orchestrator_name,
776786
input=input,
777787
retry_policy=retry_policy,
778788
is_sub_orch=True,
779789
instance_id=instance_id,
790+
app_id=app_id,
780791
)
781792
return self._pending_tasks.get(id, task.CompletableTask())
782793

@@ -790,10 +801,16 @@ def call_activity_function_helper(
790801
is_sub_orch: bool = False,
791802
instance_id: Optional[str] = None,
792803
fn_task: Optional[task.CompletableTask[TOutput]] = None,
804+
app_id: Optional[str] = None,
793805
):
794806
if id is None:
795807
id = self.next_sequence_number()
796808

809+
router = pb.TaskRouter()
810+
router.sourceAppID = self._app_id
811+
if app_id is not None:
812+
router.targetAppID = app_id
813+
797814
if fn_task is None:
798815
encoded_input = shared.to_json(input) if input is not None else None
799816
else:
@@ -806,15 +823,15 @@ def call_activity_function_helper(
806823
if isinstance(activity_function, str)
807824
else task.get_name(activity_function)
808825
)
809-
action = ph.new_schedule_task_action(id, name, encoded_input)
826+
action = ph.new_schedule_task_action(id, name, encoded_input, router)
810827
else:
811828
if instance_id is None:
812829
# Create a deteministic instance ID based on the parent instance ID
813830
instance_id = f"{self.instance_id}:{id:04x}"
814831
if not isinstance(activity_function, str):
815832
raise ValueError("Orchestrator function name must be a string")
816833
action = ph.new_create_sub_orchestration_action(
817-
id, activity_function, instance_id, encoded_input
834+
id, activity_function, instance_id, encoded_input, router
818835
)
819836
self._pending_actions[id] = action
820837

@@ -953,6 +970,11 @@ def process_event(
953970
if event.HasField("orchestratorStarted"):
954971
ctx.current_utc_datetime = event.timestamp.ToDatetime()
955972
elif event.HasField("executionStarted"):
973+
if event.router.targetAppID:
974+
ctx._app_id = event.router.targetAppID
975+
else:
976+
ctx._app_id = event.router.sourceAppID
977+
956978
# TODO: Check if we already started the orchestration
957979
fn = self._registry.get_orchestrator(event.executionStarted.name)
958980
if fn is None:
@@ -1010,6 +1032,11 @@ def process_event(
10101032
else:
10111033
cur_task = activity_action.createSubOrchestration
10121034
instance_id = cur_task.instanceId
1035+
if cur_task.router and cur_task.router.targetAppID:
1036+
target_app_id = cur_task.router.targetAppID
1037+
else:
1038+
target_app_id = None
1039+
10131040
ctx.call_activity_function_helper(
10141041
id=activity_action.id,
10151042
activity_function=cur_task.name,
@@ -1018,6 +1045,7 @@ def process_event(
10181045
is_sub_orch=timer_task._retryable_parent._is_sub_orch,
10191046
instance_id=instance_id,
10201047
fn_task=timer_task._retryable_parent,
1048+
app_id=target_app_id,
10211049
)
10221050
else:
10231051
ctx.resume()

tests/durabletask/test_orchestration_executor.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,70 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input):
171171
assert actions[0].scheduleTask.input.value == encoded_input
172172

173173

174+
def test_schedule_activity_actions_router_without_app_id():
175+
"""Tests that scheduleTask action contains correct router fields when app_id is specified"""
176+
def dummy_activity(ctx, _):
177+
pass
178+
179+
def orchestrator(ctx: task.OrchestrationContext, _):
180+
yield ctx.call_activity(dummy_activity, input=42)
181+
182+
registry = worker._Registry()
183+
name = registry.add_orchestrator(orchestrator)
184+
185+
# Prepare execution started event with source app set on router
186+
exec_evt = helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
187+
exec_evt.router.sourceAppID = "source-app"
188+
189+
new_events = [
190+
helpers.new_orchestrator_started_event(),
191+
exec_evt,
192+
]
193+
194+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
195+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
196+
actions = result.actions
197+
198+
assert len(actions) == 1
199+
action = actions[0]
200+
assert action.router.sourceAppID == "source-app"
201+
assert action.router.targetAppID == ''
202+
assert action.scheduleTask.router.sourceAppID == "source-app"
203+
assert action.scheduleTask.router.targetAppID == ''
204+
205+
206+
def test_schedule_activity_actions_router_with_app_id():
207+
"""Tests that scheduleTask action contains correct router fields when app_id is specified"""
208+
def dummy_activity(ctx, _):
209+
pass
210+
211+
def orchestrator(ctx: task.OrchestrationContext, _):
212+
yield ctx.call_activity(dummy_activity, input=42, app_id="target-app")
213+
214+
registry = worker._Registry()
215+
name = registry.add_orchestrator(orchestrator)
216+
217+
# Prepare execution started event with source app set on router
218+
exec_evt = helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)
219+
exec_evt.router.sourceAppID = "source-app"
220+
221+
new_events = [
222+
helpers.new_orchestrator_started_event(),
223+
exec_evt,
224+
]
225+
226+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
227+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
228+
actions = result.actions
229+
230+
assert len(actions) == 1
231+
action = actions[0]
232+
assert action.router.sourceAppID == "source-app"
233+
assert action.router.targetAppID == "target-app"
234+
assert action.scheduleTask.router.sourceAppID == "source-app"
235+
assert action.scheduleTask.router.targetAppID == "target-app"
236+
237+
174238
def test_activity_task_completion():
175239
"""Tests the successful completion of an activity task"""
176240

@@ -561,6 +625,70 @@ def orchestrator(ctx: task.OrchestrationContext, _):
561625
assert complete_action.result.value == "42"
562626

563627

628+
def test_create_sub_orchestration_actions_router_without_app_id():
629+
"""Tests that createSubOrchestration action contains correct router fields when app_id is specified"""
630+
def suborchestrator(ctx: task.OrchestrationContext, _):
631+
pass
632+
633+
def orchestrator(ctx: task.OrchestrationContext, _):
634+
yield ctx.call_sub_orchestrator(suborchestrator, input=None)
635+
636+
registry = worker._Registry()
637+
suborchestrator_name = registry.add_orchestrator(suborchestrator)
638+
orchestrator_name = registry.add_orchestrator(orchestrator)
639+
640+
exec_evt = helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None)
641+
exec_evt.router.sourceAppID = "source-app"
642+
643+
new_events = [
644+
helpers.new_orchestrator_started_event(),
645+
exec_evt,
646+
]
647+
648+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
649+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
650+
actions = result.actions
651+
652+
assert len(actions) == 1
653+
action = actions[0]
654+
assert action.router.sourceAppID == "source-app"
655+
assert action.router.targetAppID == ''
656+
assert action.createSubOrchestration.router.sourceAppID == "source-app"
657+
assert action.createSubOrchestration.router.targetAppID == ''
658+
659+
660+
def test_create_sub_orchestration_actions_router_with_app_id():
661+
"""Tests that createSubOrchestration action contains correct router fields when app_id is specified"""
662+
def suborchestrator(ctx: task.OrchestrationContext, _):
663+
pass
664+
665+
def orchestrator(ctx: task.OrchestrationContext, _):
666+
yield ctx.call_sub_orchestrator(suborchestrator, input=None, app_id="target-app")
667+
668+
registry = worker._Registry()
669+
suborchestrator_name = registry.add_orchestrator(suborchestrator)
670+
orchestrator_name = registry.add_orchestrator(orchestrator)
671+
672+
exec_evt = helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None)
673+
exec_evt.router.sourceAppID = "source-app"
674+
675+
new_events = [
676+
helpers.new_orchestrator_started_event(),
677+
exec_evt,
678+
]
679+
680+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
681+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
682+
actions = result.actions
683+
684+
assert len(actions) == 1
685+
action = actions[0]
686+
assert action.router.sourceAppID == "source-app"
687+
assert action.router.targetAppID == "target-app"
688+
assert action.createSubOrchestration.router.sourceAppID == "source-app"
689+
assert action.createSubOrchestration.router.targetAppID == "target-app"
690+
691+
564692
def test_sub_orchestration_task_failed():
565693
"""Tests that a sub-orchestration task is completed when the sub-orchestration fails"""
566694
def suborchestrator(ctx: task.OrchestrationContext, _):

0 commit comments

Comments
 (0)