Skip to content
This repository was archived by the owner on Jan 10, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,3 @@ Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

1 change: 1 addition & 0 deletions aws-flow/lib/aws/decider/generic_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def _retry_with_options(lambda_to_execute, retry_function, retry_options, args =
if failure.nil?
output.set(result)
else
raise failure if retry_options.return_on_start
output.set(nil)
end
end
Expand Down
17 changes: 15 additions & 2 deletions aws-flow/lib/aws/decider/task_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_decision_task
@domain.decision_tasks.poll_for_single_task(@task_list)
end

def poll_and_process_single_task
def poll_and_process_single_task(opts={})
# TODO waitIfSuspended
begin
@logger.debug "Polling for a new decision task of type #{@handler.workflow_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
Expand Down Expand Up @@ -96,6 +96,9 @@ def poll_and_process_single_task
@logger.info Utilities.workflow_task_to_debug_string("Finished executing task", task, @task_list)
rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e
@logger.error "Error in the poller, #{e.inspect}"
rescue Interrupt => e
@logger.error "Error in the poller, #{e.inspect}"
raise Interrupt
rescue Exception => e
@logger.error "Error in the poller, #{e.inspect}"
end
Expand Down Expand Up @@ -356,7 +359,13 @@ def process_single_task(task)
# *Optional*. Whether to use forking to execute the task. On Windows,
# you should set this to `false`.
#
def poll_and_process_single_task(use_forking = true)
def poll_and_process_single_task(opts = {})
# Support older style where the argument passed was a boolean
use_forking = if [true, false].include? opts
opts
else
opts[:use_forking] || true
end
@poll_semaphore ||= SuspendableSemaphore.new
@poll_semaphore.acquire
semaphore_needs_release = true
Expand All @@ -369,6 +378,10 @@ def poll_and_process_single_task(use_forking = true)
if task
@logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
end
rescue Interrupt => e
@poll_semaphore.release
@logger.error "Error in the poller, #{e.inspect}"
raise Interrupt
rescue Exception => e
@logger.error "Error in the poller, #{e.inspect}"
@poll_semaphore.release
Expand Down
99 changes: 63 additions & 36 deletions aws-flow/lib/aws/decider/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,37 @@ def initialize(service, domain, task_list_to_poll, *args, &block)
if args
args.each { |klass_or_instance| add_implementation(klass_or_instance) }
end
@aws_flow_signals = []
@shutting_down = false
%w{ TERM INT }.each do |signal|
Signal.trap(signal) do
if @shutting_down
@executor.shutdown(0) if @executor
Kernel.exit! 1
else
@shutting_down = true
@shutdown_first_time_function.call if @shutdown_first_time_function
end
@aws_flow_signals << signal
raise Interrupt
end
end
end

def run_once(should_register = false, poller = nil)
register if should_register
poller ||= generate_poller
Kernel.exit if @shutting_down
poller.poll_and_process_single_task(@options)
end

def handle_signals
# This function itself needs to be able to handle interrupts, in case we get them in close succession
begin
return if @aws_flow_signals.empty?
if @shutting_down
@executor.shutdown(0) if @executor
Kernel.exit! 1
else
@shutting_down = true
@shutdown_first_time_function.call if @shutdown_first_time_function
end
rescue Interrupt
@executor.shutdown(0) if @executor
Kernel.exit! 1
end
end

Expand Down Expand Up @@ -207,15 +227,15 @@ def register
# first. If {#register} was already called
# for this workflow worker, specify `false`.
#
def start(should_register = true)
def start(should_register = true, poller = nil)
# TODO check to make sure that the correct properties are set
# TODO Register the domain if not already registered
# TODO register types to poll
# TODO Set up throttler
# TODO Set up a timeout on the throttler correctly,
# TODO Make this a generic poller, go to the right kind correctly

poller = WorkflowTaskPoller.new(
poller ||= WorkflowTaskPoller.new(
@service,
@domain,
DecisionTaskHandler.new(@workflow_definition_map, @options),
Expand All @@ -226,10 +246,24 @@ def start(should_register = true)
register if should_register
@logger.debug "Starting an infinite loop to poll and process workflow tasks."
loop do
run_once(false, poller)
begin
run_once(false, poller)
rescue Interrupt
handle_signals
end
end
end


def generate_poller
WorkflowTaskPoller.new(
@service,
@domain,
DecisionTaskHandler.new(@workflow_definition_map, @options),
@task_list,
@options
)
end
# Starts the workflow and runs it once, with an optional
# {WorkflowTaskPoller}.
#
Expand All @@ -239,18 +273,7 @@ def start(should_register = true)
# An optional {WorkflowTaskPoller} to use.
#
def run_once(should_register = false, poller = nil)
register if should_register

poller = WorkflowTaskPoller.new(
@service,
@domain,
DecisionTaskHandler.new(@workflow_definition_map, @options),
@task_list,
@options
) if poller.nil?

Kernel.exit if @shutting_down
poller.poll_and_process_single_task
super
end
end

Expand Down Expand Up @@ -391,7 +414,6 @@ def add_activities_implementation(class_or_instance)
end
end


# Starts the activity that was added to the `ActivityWorker`.
#
# @param [true, false] should_register
Expand All @@ -412,10 +434,26 @@ def start(should_register = true)

@logger.debug "Starting an infinite loop to poll and process activity tasks."
loop do
run_once(false, poller)
begin
run_once(false, poller)
rescue Interrupt
handle_signals
end

end
end

def generate_poller
ActivityTaskPoller.new(
@service,
@domain,
@task_list,
@activity_definition_map,
@executor,
@options
)
end

# Starts the activity that was added to the `ActivityWorker` and,
# optionally, sets the {ActivityTaskPoller}.
#
Expand All @@ -428,18 +466,7 @@ def start(should_register = true)
# {ActivityTaskPoller} will be created.
#
def run_once(should_register = true, poller = nil)
register if should_register
poller = ActivityTaskPoller.new(
@service,
@domain,
@task_list,
@activity_definition_map,
@executor,
@options
) if poller.nil?

Kernel.exit if @shutting_down
poller.poll_and_process_single_task(@options.use_forking)
super
end
end

Expand Down
Loading