Commit 901db50

[Feature] Implement Worker cancellation on signals and graceful shutdown timeout (#103)
* Add Signal handling to Temporalio::Worker
* Implement activity cancellation on a shutdown after a graceful timeout
* Switch to using ActivityCancelled error for Cancellations
* Rename stop_on_signals to shutdown_signals
1 parent b54ff8c commit 901db50

File tree

21 files changed

+221
-64
lines changed

.rubocop.yml

Lines changed: 1 addition & 1 deletion
@@ -76,5 +76,5 @@ Metrics/PerceivedComplexity:
7676
Naming/MemoizedInstanceVariableName:
7777
Enabled: false
7878

79-
Name/VariableNumber:
79+
Naming/VariableNumber:
8080
EnforcedStyle: snake_case

README.md

Lines changed: 15 additions & 1 deletion
@@ -154,12 +154,23 @@ Temporal::Worker.run(worker_1, worker_2, worker_3)
154154
Please note that similar to `Temporal::Worker#run`, `Temporal::Worker.run` accepts a block that
155155
behaves the same way — the workers will be shut down when the block finishes.
156156

157+
You can also configure your worker to listen on process signals to initiate a shutdown:
158+
159+
```ruby
160+
Temporal::Worker.run(worker_1, worker_2, worker_3, shutdown_signals: %w[INT TERM])
161+
```
162+
157163
#### Worker Shutdown
158164

159165
The `Temporal::Worker#run` (as well as `Temporal::Worker#shutdown`) invocation will wait on all
160166
activities to complete, so if a long-running activity does not at least respect cancellation, the
161167
shutdown may never complete.
162168

169+
If a worker was initialized with a `graceful_shutdown_timeout` option then a cancellation will be
170+
issued for every running activity after the set timeout. The behaviour is mostly identical to a
171+
server requested cancellation and should be handled accordingly. More on this in [Heartbeating and
172+
Cancellation](#heartbeating-and-cancellation).
173+
163174
### Activities
164175

165176
#### Definition
@@ -206,7 +217,7 @@ persisted on the server for retrieval during activity retry. If an activity call
206217
return an array containing `123` and `456` on the next run.
207218

208219
A cancellation is implemented using the `Thread#raise` method, which will raise a
209-
`Temporal::Error::CancelledError` during the execution of an activity. This means that your code
220+
`Temporal::Error::ActivityCancelled` during the execution of an activity. This means that your code
210221
might get interrupted at any point and never complete. In order to protect critical parts of your
211222
activities wrap them in `activity.shield`:
212223

@@ -245,6 +256,9 @@ end
245256
For any long-running activity using this approach it is recommended to periodically check
246257
`activity.cancelled?` flag and respond accordingly.
247258

259+
Please note that your activities can also get cancelled during a worker shutdown process ([if
260+
configured accordingly](#worker-shutdown)).
261+
248262

249263
## Dev Setup
250264

lib/temporalio/activity/context.rb

Lines changed: 14 additions & 8 deletions
@@ -1,4 +1,5 @@
11
require 'temporalio/error/failure'
2+
require 'temporalio/errors'
23

34
module Temporalio
45
class Activity
@@ -14,7 +15,7 @@ def initialize(info, heartbeat_proc, shielded: false)
1415
@thread = Thread.current
1516
@info = info
1617
@heartbeat_proc = heartbeat_proc
17-
@cancelled = false
18+
@pending_cancellation = nil
1819
@shielded = shielded
1920
@mutex = Mutex.new
2021
end
@@ -41,7 +42,7 @@ def heartbeat(*details)
4142
def shield(&block)
4243
# The whole activity is shielded, called from a nested shield
4344
# or while handling a CancelledError (in a rescue or ensure blocks)
44-
return block.call if @shielded || @cancelled
45+
return block.call if @shielded || cancelled?
4546

4647
if Thread.current != thread
4748
# TODO: Use logger instead when implemented
@@ -52,7 +53,8 @@ def shield(&block)
5253
mutex.synchronize do
5354
@shielded = true
5455
result = block.call
55-
raise Temporalio::Error::CancelledError, 'Unhandled cancellation' if @cancelled
56+
# RBS: StandardError fallback is only needed to satisfy steep - https://github.com/soutaro/steep/issues/477
57+
raise @pending_cancellation || StandardError if cancelled?
5658

5759
result
5860
ensure # runs while still holding the lock
@@ -64,19 +66,23 @@ def shield(&block)
6466
#
6567
# @return [Boolean] true if the activity has had a cancellation request, false otherwise.
6668
def cancelled?
67-
@cancelled
69+
!!@pending_cancellation
6870
end
6971

70-
# Cancel the running activity.
72+
# Cancel the running activity by raising a provided error.
73+
#
74+
# @param reason [String] Reason for cancellation.
75+
# @param by_request [Boolean] Cancellation requested by the server or not.
7176
#
7277
# @api private
73-
def cancel
74-
@cancelled = true
78+
def cancel(reason, by_request: false)
79+
error = Temporalio::Error::ActivityCancelled.new(reason, by_request)
80+
@pending_cancellation = error
7581

7682
locked = mutex.try_lock
7783
# @shielded inside the lock means the whole activity is shielded
7884
if locked && !@shielded
79-
thread.raise(Temporalio::Error::CancelledError.new('Unhandled cancellation'))
85+
thread.raise(error)
8086
end
8187
ensure
8288
# only release the lock if we locked it
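
The interplay between `shield`, `cancel` and the new `@pending_cancellation` can be easier to follow in isolation. Below is a minimal sketch, assuming a stripped-down stand-in for the context class: `SketchContext` and `SketchCancelled` are hypothetical names, not SDK classes, and heartbeating and thread checks are left out. It shows a cancellation arriving while a shielded block holds the lock, so the error is deferred until the block finishes.

```ruby
# Hypothetical stand-in for the SDK's cancellation error.
class SketchCancelled < StandardError; end

# Hypothetical, reduced version of the activity context from the diff above.
class SketchContext
  def initialize(shielded: false)
    @thread = Thread.current # the thread running the activity
    @mutex = Mutex.new
    @pending_cancellation = nil
    @shielded = shielded
  end

  def cancelled?
    !@pending_cancellation.nil?
  end

  def shield
    # Whole activity shielded, nested shield, or already cancelled: just run.
    return yield if @shielded || cancelled?

    @mutex.synchronize do
      begin
        @shielded = true
        result = yield
        # A cancellation that arrived while shielded is raised only now.
        raise @pending_cancellation if cancelled?

        result
      ensure
        @shielded = false
      end
    end
  end

  def cancel(reason)
    error = SketchCancelled.new(reason)
    @pending_cancellation = error

    # try_lock fails while a shield block is running, so nothing is raised yet.
    locked = @mutex.try_lock
    @thread.raise(error) if locked && !@shielded
  ensure
    @mutex.unlock if locked
  end
end

steps = []
ready = Queue.new
go = Queue.new

activity = Thread.new do
  ctx = SketchContext.new
  begin
    ctx.shield do
      steps << :critical_start
      ready << ctx # hand the context to the cancelling thread
      go.pop       # wait until the cancellation has been issued
      steps << :critical_end
    end
    steps << :after_shield # skipped, the deferred error is raised first
  rescue SketchCancelled => e
    steps << e.message
  end
end

ctx = ready.pop
ctx.cancel('Worker is shutting down')
go << true
activity.join
```

The critical section runs to completion and the activity sees the error only once the shield exits, which matches the "might get interrupted at any point" caveat in the README.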

lib/temporalio/errors.rb

Lines changed: 13 additions & 0 deletions
@@ -23,5 +23,18 @@ def initialize(status)
2323
class Internal < Error; end
2424

2525
class WorkerShutdown < Internal; end
26+
27+
# This error is used within the SDK to communicate Activity cancellations
28+
# (and whether it was requested by server or not)
29+
class ActivityCancelled < Internal
30+
def initialize(reason, by_request)
31+
super(reason)
32+
@by_request = by_request
33+
end
34+
35+
def by_request?
36+
@by_request
37+
end
38+
end
2639
end
2740
end

lib/temporalio/worker.rb

Lines changed: 33 additions & 16 deletions
@@ -17,11 +17,26 @@ class Worker
1717
# finished) and will raise if any of the workers raises a fatal error.
1818
#
1919
# @param workers [Array<Temporalio::Worker>] A list of the workers to be run.
20+
# @param shutdown_signals [Array<String>] A list of process signals for the worker to stop on.
21+
# This argument cannot be used with a custom block.
2022
#
2123
# @yield Optionally you can provide a block by the end of which all the workers will be shut
2224
# down. Any errors raised from this block will be re-raised by this method.
23-
def self.run(*workers, &block)
24-
# TODO: Add signal handling
25+
def self.run(*workers, shutdown_signals: [], &block)
26+
unless shutdown_signals.empty?
27+
if block
28+
raise ArgumentError, 'Temporalio::Worker.run accepts :shutdown_signals or a block, but not both'
29+
end
30+
31+
signal_queue = Queue.new
32+
33+
shutdown_signals.each do |signal|
34+
Signal.trap(signal) { signal_queue.close }
35+
end
36+
37+
block = -> { signal_queue.pop }
38+
end
39+
2540
Runner.new(*workers).run(&block)
2641
end
2742

@@ -36,6 +51,9 @@ def self.run(*workers, &block)
3651
# @param activity_executor [ThreadPoolExecutor] Concurrent executor for all activities. Defaults
3752
# to a {ThreadPoolExecutor} with `:max_concurrent_activities` available threads.
3853
# @param max_concurrent_activities [Integer] Number of concurrently running activities.
54+
# @param graceful_shutdown_timeout [Integer] Amount of time (in seconds) activities are given
55+
# after a shutdown to complete before they are cancelled. A default value of `nil` means that
56+
# activities are never cancelled when handling a shutdown.
3957
#
4058
# @raise [ArgumentError] When no activities or workflows have been provided.
4159
def initialize(
@@ -45,7 +63,8 @@ def initialize(
4563
activities: [],
4664
data_converter: Temporalio::DataConverter.new,
4765
activity_executor: nil,
48-
max_concurrent_activities: 100
66+
max_concurrent_activities: 100,
67+
graceful_shutdown_timeout: nil
4968
)
5069
# TODO: Add worker interceptors
5170
@started = false
@@ -59,13 +78,17 @@ def initialize(
5978
namespace,
6079
task_queue,
6180
)
62-
@activity_worker = init_activity_worker(
63-
task_queue,
64-
@core_worker,
65-
activities,
66-
data_converter,
67-
@activity_executor,
68-
)
81+
@activity_worker =
82+
unless activities.empty?
83+
Worker::ActivityWorker.new(
84+
task_queue,
85+
@core_worker,
86+
activities,
87+
data_converter,
88+
@activity_executor,
89+
graceful_shutdown_timeout,
90+
)
91+
end
6992
@workflow_worker = nil
7093

7194
if !@activity_worker && !@workflow_worker
@@ -171,11 +194,5 @@ def running?
171194

172195
attr_reader :mutex, :runtime, :activity_executor, :core_worker, :activity_worker,
173196
:workflow_worker, :runner
174-
175-
def init_activity_worker(task_queue, core_worker, activities, data_converter, executor)
176-
return if activities.empty?
177-
178-
Worker::ActivityWorker.new(task_queue, core_worker, activities, data_converter, executor)
179-
end
180197
end
181198
end
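
The `shutdown_signals` handling above relies on a closed `Queue` releasing a blocked `pop`. As a minimal sketch of that mechanism outside the SDK, assuming a POSIX system and using `USR1` (chosen here only so the demo does not interfere with `INT` or `TERM`), the process below sends itself the signal and then unblocks:

```ruby
signal_queue = Queue.new
shutdown_signals = %w[USR1] # assumption: a demo signal, not the README's INT/TERM

shutdown_signals.each do |signal|
  # Closing the queue is enough; no value needs to be pushed from the handler.
  Signal.trap(signal) { signal_queue.close }
end

# Simulate an external `kill -USR1 <pid>`.
Process.kill('USR1', Process.pid)

# Blocks until the queue is closed; pop on an empty closed queue returns nil.
result = signal_queue.pop
```

In the commit this `pop` becomes the block passed to `Runner#run`, so returning from it is what triggers the shutdown of all workers.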

lib/temporalio/worker/activity_runner.rb

Lines changed: 14 additions & 2 deletions
@@ -1,6 +1,8 @@
11
require 'google/protobuf/well_known_types'
22
require 'temporalio/activity/context'
33
require 'temporalio/activity/info'
4+
require 'temporalio/error/failure'
5+
require 'temporalio/errors'
46

57
module Temporalio
68
class Worker
@@ -27,11 +29,21 @@ def run
2729

2830
converter.to_payload(result)
2931
rescue StandardError => e
32+
# Temporal server ignores cancellation failures that were not requested by the server.
33+
# However within the SDK cancellations are also used during the worker shutdown. In order
34+
# to provide a seamless handling experience (same error raised within the Activity) we are
35+
# using the ActivityCancelled error and then swapping it with a CancelledError here.
36+
#
37+
# In the future this will be handled by the SDK Core — https://github.com/temporalio/sdk-core/issues/461
38+
if e.is_a?(Temporalio::Error::ActivityCancelled) && e.by_request?
39+
e = Temporalio::Error::CancelledError.new(e.message)
40+
end
41+
3042
converter.to_failure(e)
3143
end
3244

33-
def cancel
34-
context.cancel
45+
def cancel(reason, by_request:)
46+
context.cancel(reason, by_request: by_request)
3547
end
3648

3749
private
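
The error swap in the `rescue` branch above depends on the `by_request?` flag carried by `ActivityCancelled`. The following sketch mirrors that logic with hypothetical stand-in classes (`SketchActivityCancelled`, `SketchCancelledError`) and a hypothetical helper `to_reported_error`; the real code passes the result to `converter.to_failure`, which is omitted here.

```ruby
# Hypothetical stand-ins for the SDK error classes touched by this commit.
class SketchError < StandardError; end

class SketchActivityCancelled < SketchError
  def initialize(reason, by_request)
    super(reason)
    @by_request = by_request
  end

  def by_request?
    @by_request
  end
end

# Stand-in for the failure type reported for server-requested cancellations.
class SketchCancelledError < SketchError; end

# Hypothetical helper: only server-requested cancellations are swapped.
def to_reported_error(error)
  if error.is_a?(SketchActivityCancelled) && error.by_request?
    SketchCancelledError.new(error.message)
  else
    error
  end
end

requested = to_reported_error(
  SketchActivityCancelled.new('Activity cancellation requested', true)
)
shutdown = to_reported_error(
  SketchActivityCancelled.new('Worker is shutting down', false)
)
```

Activity code rescues the same error class in both cases; only the value reported back differs.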

lib/temporalio/worker/activity_worker.rb

Lines changed: 17 additions & 5 deletions
@@ -7,12 +7,13 @@ module Temporalio
77
class Worker
88
# @api private
99
class ActivityWorker
10-
def initialize(task_queue, core_worker, activities, converter, executor)
10+
def initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout)
1111
@task_queue = task_queue
1212
@worker = SyncWorker.new(core_worker)
1313
@activities = prepare_activities(activities)
1414
@converter = converter
1515
@executor = executor
16+
@graceful_timeout = graceful_timeout
1617
@running_activities = {}
1718
@cancellations = []
1819
@drain_queue = Queue.new
@@ -37,8 +38,19 @@ def run(reactor)
3738
rescue Temporalio::Bridge::Error::WorkerShutdown
3839
# No need to re-raise this error, it's a part of a normal shutdown
3940
ensure
40-
reactor.async do
41+
reactor.async do |async_task|
42+
cancelation_task =
43+
if graceful_timeout
44+
async_task.async do
45+
sleep graceful_timeout
46+
running_activities.each_value do |activity_runner|
47+
activity_runner.cancel('Worker is shutting down', by_request: false)
48+
end
49+
end
50+
end
51+
4152
outstanding_tasks.each(&:wait)
53+
cancelation_task&.stop # all tasks completed, stop cancellations
4254
drain_queue.close
4355
end
4456
end
@@ -49,8 +61,8 @@ def drain
4961

5062
private
5163

52-
attr_reader :task_queue, :worker, :activities, :converter, :executor, :running_activities,
53-
:cancellations, :drain_queue
64+
attr_reader :task_queue, :worker, :activities, :converter, :executor, :graceful_timeout,
65+
:running_activities, :cancellations, :drain_queue
5466

5567
def prepare_activities(activities)
5668
activities.each_with_object({}) do |activity, result|
@@ -119,7 +131,7 @@ def handle_cancel_activity(task_token, _cancel)
119131
end
120132

121133
cancellations << task_token
122-
runner&.cancel
134+
runner&.cancel('Activity cancellation requested', by_request: true)
123135
end
124136
end
125137
end
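
The graceful timeout in `ActivityWorker#run` is a timer racing against the outstanding tasks: if the tasks finish first the timer is stopped, otherwise every running activity is cancelled. Here is a minimal sketch of that pattern, with plain threads swapped in for the `Async` tasks used in the commit; `drain_with_grace` is a hypothetical helper name.

```ruby
# Waits for all tasks; if graceful_timeout (seconds) elapses first,
# calls the cancel block once. A nil timeout means never cancel.
def drain_with_grace(tasks, graceful_timeout, &cancel)
  canceller =
    if graceful_timeout
      Thread.new do
        sleep graceful_timeout
        cancel.call
      end
    end

  tasks.each(&:join)
  canceller&.kill # all tasks completed, stop the pending cancellation
  canceller&.join
end

# A slow task that only finishes once it is told to stop.
events = Queue.new
stop = Queue.new
slow = Thread.new do
  stop.pop
  events << :slow_finished
end

drain_with_grace([slow], 0.05) do
  events << :cancel_issued
  stop << true
end

log = []
log << events.pop until events.empty?

# A fast task finishes well before the timeout, so no cancellation happens.
fast_cancelled = false
fast = Thread.new { :done }
drain_with_grace([fast], 5) { fast_cancelled = true }
```

Killing the timer once the tasks are done corresponds to `cancelation_task&.stop` in the diff.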

lib/temporalio/worker/runner.rb

Lines changed: 2 additions & 0 deletions
@@ -1,3 +1,5 @@
1+
require 'temporalio/errors'
2+
13
module Temporalio
24
class Worker
35
# A class used to manage the lifecycle of running any number of workers.

sig/async.rbs

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ module Async
88
class Task
99
def async: { (Async::Task) -> void } -> Async::Task
1010
def wait: -> void
11+
def stop: -> void
1112
end
1213
end
1314

sig/temporalio/activity/context.rbs

Lines changed: 2 additions & 2 deletions
@@ -7,12 +7,12 @@ module Temporalio
77
def heartbeat: (*untyped details) -> void
88
def shield: { -> untyped } -> untyped
99
def cancelled?: -> bool
10-
def cancel: -> void
10+
def cancel: (String reason, ?by_request: bool) -> void
1111

1212
private
1313

1414
@shielded: bool
15-
@cancelled: bool
15+
@pending_cancellation: Exception?
1616

1717
attr_reader thread: Thread
1818
attr_reader heartbeat_proc: ^(*untyped) -> void
