diff --git a/CHANGELOG.md b/CHANGELOG.md index 12b3e04..9488eac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,16 @@ v?.?.? - ?? ??? ???? --- +v0.6.0 - ?? ??? 2024 +--- + +* Add `before_started`, `after_completed` and `after_failed` functionality. +* Add `logger` helper method, available within task methods. +* Use `SecureRandom.hex(10)` instead of `uuid` for shorter process and tasks IDs. +* Bug fix for options on `sequential`, `concurrent`, `for_each` and `sub_process` methods. +* Bug fix instrumentation payload. +* Documentation updates. + v0.5.2 - 04 Oct 2024 --- * Time arguments fix for Redis 5.0. Fixes #28 diff --git a/Gemfile.lock b/Gemfile.lock index b097af4..eb63330 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - taskinator (0.5.2) + taskinator (0.6.0) builder (>= 3.2.2) connection_pool (>= 2.2.0) globalid (>= 0.3) @@ -115,10 +115,10 @@ GEM rspec-mocks (~> 3.12.0) rspec-core (3.12.0) rspec-support (~> 3.12.0) - rspec-expectations (3.12.1) + rspec-expectations (3.12.2) diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.12.0) - rspec-mocks (3.12.1) + rspec-mocks (3.12.2) diff-lcs (>= 1.2.0, < 2.0) rspec-support (~> 3.12.0) rspec-rails (5.1.2) diff --git a/LICENSE.txt b/LICENSE similarity index 100% rename from LICENSE.txt rename to LICENSE diff --git a/README.md b/README.md index e212fb4..7bcdc97 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ end process = MyProcess.create_process Date.today, :option_1 => true ``` -_NOTE:_ The current implementation performs a naive check on the count of arguments. +_NOTE:_ The current implementation performs a naïve check on the count of arguments. Next, specify the tasks with their corresponding implementation methods, that make up the process, using the `task` method and providing the `method` to execute for the task. @@ -291,6 +291,51 @@ module MyProcess end ``` +#### Before Process Started and After Process Completion or Failure + +You may want to run further tasks asynchrously before or after a process has completed +or failed. These tasks provide a way to execute logic independently of the process. + +Specify these tasks using the `before_started`, `after_completed` or `after_failed` methods. + +For example, using `after_completed` to set off another business process or `after_failed` to +send an email to an operator. + +```ruby +module MyProcess + extend Taskinator::Definition + + # defines a process + define_process do + + # define task to execute on before + before_started :slack_notification + + # usual tasks, sub-process, etc. + + # define task to execute on completion + after_completed :further_process + + # define task to execute on failure + after_failed :email_operations + + end + + def slack_notification + # ... + end + + def further_process + # ... + end + + def email_operations + # ... + end + +end +``` + #### Complex Process Definitions Any combination or nesting of `task`, `sequential`, `concurrent` and `for_each` steps are @@ -363,12 +408,12 @@ MyProcess.create_process(1, 2, 3, :send_notification => true) ``` -#### Reusing ActiveJob jobs +#### Reusing `ActiveJob` jobs It is likely that you already have one or more [jobs](https://guides.rubyonrails.org/active_job_basics.html) and want to reuse them within the process definition. -Define a `job` step, providing the class of the Active Job to run and then taskinator will +Define a `job` step, providing the class of the `ActiveJob` to run and then taskinator will invoke that job as part of the process. The `job` step will be queued and executed on same queue as @@ -425,7 +470,7 @@ _This may be something that gets refactored down the line_. To best understand how arguments are handled, you need to break it down into 3 phases. Namely: * Definition, - * Creation and + * Creation, and * Execution Firstly, a process definition is declarative in that the `define_process` and a mix of diff --git a/lib/taskinator.rb b/lib/taskinator.rb index 3f8f171..30d30c9 100644 --- a/lib/taskinator.rb +++ b/lib/taskinator.rb @@ -3,6 +3,7 @@ require 'securerandom' require 'benchmark' require 'delegate' +require 'builder' require 'taskinator/version' @@ -10,6 +11,7 @@ require 'taskinator/redis_connection' require 'taskinator/logger' +require 'taskinator/builder' require 'taskinator/definition' require 'taskinator/workflow' @@ -33,7 +35,7 @@ module Taskinator NAME = "Taskinator" - LICENSE = 'See LICENSE.txt for licensing details.' + LICENSE = 'See LICENSE for licensing details.' DEFAULTS = { # none for now... @@ -48,7 +50,7 @@ def options=(opts) end def generate_uuid - SecureRandom.uuid + SecureRandom.hex(10) end ## diff --git a/lib/taskinator/builder.rb b/lib/taskinator/builder.rb index 619dd16..d27e106 100644 --- a/lib/taskinator/builder.rb +++ b/lib/taskinator/builder.rb @@ -15,6 +15,9 @@ def initialize(process, definition, *args) end def option?(key, &block) + # instead of LocalJumpError + raise ArgumentError, 'block' unless block_given? + yield if builder_options[key] end @@ -24,7 +27,7 @@ def sequential(options={}, &block) sub_process = Process.define_sequential_process_for(@definition, options) task = define_sub_process_task(@process, sub_process, options) - Builder.new(sub_process, @definition, *@args).instance_eval(&block) + Builder.new(sub_process, @definition, *@args, @builder_options).instance_eval(&block) @process.tasks << task if sub_process.tasks.any? nil end @@ -35,7 +38,7 @@ def concurrent(complete_on=CompleteOn::Default, options={}, &block) sub_process = Process.define_concurrent_process_for(@definition, complete_on, options) task = define_sub_process_task(@process, sub_process, options) - Builder.new(sub_process, @definition, *@args).instance_eval(&block) + Builder.new(sub_process, @definition, *@args, @builder_options).instance_eval(&block) @process.tasks << task if sub_process.tasks.any? nil end @@ -54,7 +57,7 @@ def for_each(method, options={}, &block) # method_args = options.any? ? [*@args, options] : @args @executor.send(method, *method_args) do |*args| - Builder.new(@process, @definition, *args).instance_eval(&block) + Builder.new(@process, @definition, *args, @builder_options).instance_eval(&block) end nil end @@ -80,9 +83,32 @@ def job(job, options={}) nil end - # TODO: add mailer - # TODO: add complete! - # TODO: add fail! + # defines a task which executes the given @method before the process has started + def before_started(method, options={}) + raise ArgumentError, 'method' if method.nil? + raise NoMethodError, method unless @executor.respond_to?(method) + + define_before_started_task(@process, method, @args, options) + nil + end + + # defines a task which executes the given @method after the process has completed + def after_completed(method, options={}) + raise ArgumentError, 'method' if method.nil? + raise NoMethodError, method unless @executor.respond_to?(method) + + define_after_completed_task(@process, method, @args, options) + nil + end + + # defines a task which executes the given @method after the process has failed + def after_failed(method, options={}) + raise ArgumentError, 'method' if method.nil? + raise NoMethodError, method unless @executor.respond_to?(method) + + define_after_failed_task(@process, method, @args, options) + nil + end # defines a sub process task, for the given @definition # the definition specified must have input compatible arguments @@ -101,13 +127,31 @@ def sub_process(definition, options={}) private def define_step_task(process, method, args, options={}) - define_task(process) { + add_task(process.tasks) { Task.define_step_task(process, method, args, combine_options(options)) } end + def define_before_started_task(process, method, args, options={}) + add_task(process.before_started_tasks) { + Task.define_hook_task(process, method, args, combine_options(options)) + } + end + + def define_after_completed_task(process, method, args, options={}) + add_task(process.after_completed_tasks) { + Task.define_hook_task(process, method, args, combine_options(options)) + } + end + + def define_after_failed_task(process, method, args, options={}) + add_task(process.after_failed_tasks) { + Task.define_hook_task(process, method, args, combine_options(options)) + } + end + def define_job_task(process, job, args, options={}) - define_task(process) { + add_task(process.tasks) { Task.define_job_task(process, job, args, combine_options(options)) } end @@ -116,8 +160,8 @@ def define_sub_process_task(process, sub_process, options={}) Task.define_sub_process_task(process, sub_process, combine_options(options)) end - def define_task(process) - process.tasks << task = yield + def add_task(list) + list << task = yield task end diff --git a/lib/taskinator/create_process_worker.rb b/lib/taskinator/create_process_worker.rb index 2684774..55b6229 100644 --- a/lib/taskinator/create_process_worker.rb +++ b/lib/taskinator/create_process_worker.rb @@ -31,6 +31,10 @@ def perform process_args << { :uuid => uuid } end + # generate the process for the given definition and arguments + # and enqueue the processes tasks + # -> sequential processes - enqueues the first task + # -> concurrent processes - enqueues all the tasks @definition._create_process_(false, *process_args).enqueue! end diff --git a/lib/taskinator/definition.rb b/lib/taskinator/definition.rb index 3db418b..a16e069 100644 --- a/lib/taskinator/definition.rb +++ b/lib/taskinator/definition.rb @@ -1,5 +1,3 @@ -require 'taskinator/builder' - module Taskinator module Definition diff --git a/lib/taskinator/executor.rb b/lib/taskinator/executor.rb index aece2b1..e419777 100644 --- a/lib/taskinator/executor.rb +++ b/lib/taskinator/executor.rb @@ -25,5 +25,16 @@ def options task.options if task end + # helpers + + def logger + Taskinator.logger + end + + def error + # task.process.error + raise NoMethodError + end + end end diff --git a/lib/taskinator/instrumentation.rb b/lib/taskinator/instrumentation.rb index 6e78a59..1085a84 100644 --- a/lib/taskinator/instrumentation.rb +++ b/lib/taskinator/instrumentation.rb @@ -44,7 +44,7 @@ def payload_for(state, additional={}) # need to cache here, since this method hits redis, so can't be part of multi statement following process_key = self.process_key - tasks_count, processing_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn| + count, processing, completed, cancelled, failed = Taskinator.redis do |conn| conn.hmget process_key, :tasks_count, :tasks_processing, @@ -53,7 +53,7 @@ def payload_for(state, additional={}) :tasks_failed end - tasks_count = tasks_count.to_f + count = count.to_f return OpenStruct.new( { @@ -64,12 +64,12 @@ def payload_for(state, additional={}) :uuid => uuid, :options => options.dup, :state => state, - :percentage_failed => (tasks_count > 0) ? (failed_count.to_i / tasks_count) * 100.0 : 0.0, - :percentage_cancelled => (tasks_count > 0) ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0, - :percentage_processing => (tasks_count > 0) ? (processing_count.to_i / tasks_count) * 100.0 : 0.0, - :percentage_completed => (tasks_count > 0) ? (completed_count.to_i / tasks_count) * 100.0 : 0.0, + :percentage_failed => (count > 0) ? (failed.to_i / count) * 100.0 : 0.0, + :percentage_cancelled => (count > 0) ? (cancelled.to_i / count) * 100.0 : 0.0, + :percentage_processing => (count > 0) ? (processing.to_i / count) * 100.0 : 0.0, + :percentage_completed => (count > 0) ? (completed.to_i / count) * 100.0 : 0.0, }.merge(additional) - ).freeze + ) end diff --git a/lib/taskinator/persistence.rb b/lib/taskinator/persistence.rb index 0eb16e0..3ed923b 100644 --- a/lib/taskinator/persistence.rb +++ b/lib/taskinator/persistence.rb @@ -1,5 +1,3 @@ -require 'builder' - module Taskinator module Persistence @@ -224,7 +222,6 @@ def cleanup(expire_in=EXPIRE_IN) end end - end class RedisSerializationVisitor < Taskinator::Visitor::Base @@ -276,16 +273,19 @@ def visit_process(attribute) end def visit_tasks(tasks) - tasks.each do |task| - RedisSerializationVisitor.new(@conn, task, @base_visitor).visit - @conn.rpush "#{@key}:tasks", task.uuid - unless task.is_a?(Task::SubProcess) - incr_task_count unless self == @base_visitor - @base_visitor.incr_task_count - end - end - @conn.set("#{@key}.count", tasks.count) - @conn.set("#{@key}.pending", tasks.count) + _visit_tasks(tasks) + end + + def visit_before_started_tasks(tasks) + _visit_tasks(tasks, ':before_started') + end + + def visit_after_completed_tasks(tasks) + _visit_tasks(tasks, ':after_completed') + end + + def visit_after_failed_tasks(tasks) + _visit_tasks(tasks, ':after_failed') end def visit_attribute(attribute) @@ -336,6 +336,21 @@ def task_count def incr_task_count @task_count += 1 end + + private + + def _visit_tasks(tasks, list='') + tasks.each do |task| + RedisSerializationVisitor.new(@conn, task, @base_visitor).visit + @conn.rpush "#{@key}#{list}:tasks", task.uuid + unless task.is_a?(Task::SubProcess) + incr_task_count unless self == @base_visitor + @base_visitor.incr_task_count + end + end + @conn.set("#{@key}#{list}.count", tasks.count) + @conn.set("#{@key}#{list}.pending", tasks.count) + end end class XmlSerializationVisitor < Taskinator::Visitor::Base @@ -367,7 +382,7 @@ def visit @instance.accept(self) - @attributes << [:task_count, @task_count] + @attributes << [:task_count, task_count] @attributes.each do |(name, value)| builder.tag!('attribute', name => value) @@ -388,17 +403,19 @@ def visit_process(attribute) end def visit_tasks(tasks) - builder.tag!('tasks', :count => tasks.count) do |xml| - tasks.each do |task| - xml.tag!('task', :key => task.key) do |xml2| - XmlSerializationVisitor.new(xml2, task, @base_visitor).visit - unless task.is_a?(Task::SubProcess) - incr_task_count unless self == @base_visitor - @base_visitor.incr_task_count - end - end - end - end + _visit_tasks(tasks) + end + + def visit_before_started_tasks(tasks) + _visit_tasks(tasks, 'before_started') + end + + def visit_after_completed_tasks(tasks) + _visit_tasks(tasks, 'after_completed') + end + + def visit_after_failed_tasks(tasks) + _visit_tasks(tasks, 'after_failed') end def visit_attribute(attribute) @@ -448,6 +465,22 @@ def task_count def incr_task_count @task_count += 1 end + + private + + def _visit_tasks(tasks, list='tasks') + builder.tag!(list, :count => tasks.count) do |xml| + tasks.each do |task| + xml.tag!('task', :key => task.key) do |xml2| + XmlSerializationVisitor.new(xml2, task, @base_visitor).visit + unless task.is_a?(Task::SubProcess) + incr_task_count unless self == @base_visitor + @base_visitor.incr_task_count + end + end + end + end + end end class UnknownTypeError < StandardError @@ -543,11 +576,19 @@ def visit_process(attribute) end def visit_tasks(tasks) - # tasks are a linked list, so just get the first one - Taskinator.redis do |conn| - uuid = conn.lindex("#{@key}:tasks", 0) - tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}.count").to_i) if uuid - end + _visit_tasks(tasks) + end + + def visit_before_started_tasks(tasks) + _visit_tasks(tasks, ':before_started') + end + + def visit_after_completed_tasks(tasks) + _visit_tasks(tasks, ':after_completed') + end + + def visit_after_failed_tasks(tasks) + _visit_tasks(tasks, ':after_failed') end def visit_process_reference(attribute) @@ -609,6 +650,14 @@ def visit_args(attribute) private + def _visit_tasks(tasks, list='') + # tasks are a linked list, so just get the first one + Taskinator.redis do |conn| + uuid = conn.lindex("#{@key}#{list}:tasks", 0) + tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}#{list}.count").to_i) if uuid + end + end + # # creates a proxy for the instance which # will only fetch the instance when used @@ -651,14 +700,31 @@ def visit_process(attribute) end def visit_tasks(tasks) - @conn.expire "#{@key}:tasks", expire_in - @conn.expire "#{@key}.count", expire_in - @conn.expire "#{@key}.pending", expire_in + _visit_tasks(tasks) + end + + def visit_before_started_tasks(tasks) + _visit_tasks(tasks, ':before_started') + end + + def visit_after_completed_tasks(tasks) + _visit_tasks(tasks, ':after_completed') + end + + def visit_after_failed_tasks(tasks) + _visit_tasks(tasks, ':after_failed') + end + + private + + def _visit_tasks(tasks, list='') + @conn.expire "#{@key}#{list}:tasks", expire_in + @conn.expire "#{@key}#{list}.count", expire_in + @conn.expire "#{@key}#{list}.pending", expire_in tasks.each do |task| RedisCleanupVisitor.new(@conn, task, expire_in).visit end end - end # lazily loads the object specified by the type and uuid @@ -683,7 +749,6 @@ def __getobj__ # and memoize for subsequent calls @instance ||= @type.fetch(@uuid, @instance_cache) end - end class << self diff --git a/lib/taskinator/process.rb b/lib/taskinator/process.rb index 421a617..e075736 100644 --- a/lib/taskinator/process.rb +++ b/lib/taskinator/process.rb @@ -51,10 +51,26 @@ def parent=(value) @key = nil # NB: invalidate memoized key end + def sub_process? + defined?(@parent) + end + def tasks @tasks ||= Tasks.new end + def before_started_tasks + @before_started_tasks ||= Tasks.new + end + + def after_completed_tasks + @after_completed_tasks ||= Tasks.new + end + + def after_failed_tasks + @after_failed_tasks ||= Tasks.new + end + def no_tasks_defined? tasks.empty? end @@ -64,6 +80,9 @@ def accept(visitor) visitor.visit_task_reference(:parent) visitor.visit_type(:definition) visitor.visit_tasks(tasks) + visitor.visit_before_started_tasks(before_started_tasks) + visitor.visit_after_completed_tasks(after_completed_tasks) + visitor.visit_after_failed_tasks(after_failed_tasks) visitor.visit_args(:options) visitor.visit_attribute(:scope) visitor.visit_attribute(:queue) @@ -92,6 +111,9 @@ def enqueue! def start! return if paused? || cancelled? + # enqueue before started tasks independently + before_started_tasks.each(&:enqueue!) + transition(:processing) do instrument('taskinator.process.processing', processing_payload) do start @@ -132,10 +154,10 @@ def complete! end end end - end - # TODO: add retry method - to pick up from a failed task - # e.g. like retrying a failed job in Resque Web + # enqueue completion tasks independently + after_completed_tasks.each(&:enqueue!) + end def tasks_completed? # TODO: optimize this @@ -159,10 +181,30 @@ def fail!(error) parent.fail!(error) unless parent.nil? end end + + # enqueue completion tasks independently + after_failed_tasks.each(&:enqueue!) + end + + #-------------------------------------------------- + + # TODO: add retry method - to pick up from a failed task + # e.g. like retrying a failed job in Resque Web + + def task_started(task) + return if processing? || sub_process? + + transition(:processing) do + # enqueue before started tasks independently + before_started_tasks.each(&:enqueue!) + end + end + + def task_cancelled(task) + cancel! end def task_failed(task, error) - # for now, fail this process fail!(error) end @@ -247,7 +289,6 @@ def enqueue end end - # this method only called in-process (usually from the console) def start if tasks.empty? complete! # weren't any tasks to start with diff --git a/lib/taskinator/task.rb b/lib/taskinator/task.rb index 109f913..65e3487 100644 --- a/lib/taskinator/task.rb +++ b/lib/taskinator/task.rb @@ -18,6 +18,10 @@ def define_job_task(process, job, args, options={}) def define_sub_process_task(process, sub_process, options={}) SubProcess.new(process, sub_process, options) end + + def define_hook_task(process, method, args, options={}) + Hook.new(process, method, args, options) + end end attr_reader :process @@ -79,6 +83,9 @@ def start! transition(:processing) do instrument('taskinator.task.processing', processing_payload) do + # notify the process that this task has started + process.task_started(self) if notify_process? + start end end @@ -95,21 +102,27 @@ def paused? end def complete! + self.incr_completed if incr_count? + transition(:completed) do - self.incr_completed if incr_count? instrument('taskinator.task.completed', completed_payload) do complete if respond_to?(:complete) + # notify the process that this task has completed - process.task_completed(self) + process.task_completed(self) if notify_process? end end end def cancel! + self.incr_cancelled if incr_count? + transition(:cancelled) do - self.incr_cancelled if incr_count? instrument('taskinator.task.cancelled', cancelled_payload) do cancel if respond_to?(:cancel) + + # notify the process that this task has cancelled + process.task_cancelled(self) if notify_process? end end end @@ -119,12 +132,14 @@ def cancelled? end def fail!(error) + self.incr_failed if incr_count? + transition(:failed) do - self.incr_failed if incr_count? instrument('taskinator.task.failed', failed_payload(error)) do fail(error) if respond_to?(:fail) + # notify the process that this task has failed - process.task_failed(self, error) + process.task_failed(self, error) if notify_process? end end end @@ -133,6 +148,10 @@ def incr_count? true end + def notify_process? + true + end + #-------------------------------------------------- # subclasses must implement the following methods #-------------------------------------------------- @@ -303,5 +322,64 @@ def inspect %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", definition=:#{definition}, sub_process=#{sub_process.inspect}, current_state=:#{current_state}>) end end + + #-------------------------------------------------- + + # a task which invokes the specified method on the definition + # the task is executed independently of the process, so there isn't any further + # processing once it completes (or fails) + # the args must be intrinsic types, since they are serialized to YAML + class Hook < Task + attr_reader :method + attr_reader :args + + def initialize(process, method, args, options={}) + super(process, options) + + raise ArgumentError, 'method' if method.nil? + raise NoMethodError, method unless executor.respond_to?(method) + + @method = method + @args = args + end + + def enqueue + Taskinator.queue.enqueue_task(self) + end + + def start + executor.send(method, *args) + # ASSUMPTION: when the method returns, the task is considered to be complete + complete! + + rescue => e + Taskinator.logger.error(e) + Taskinator.logger.debug(e.backtrace) + fail!(e) + raise e + end + + def accept(visitor) + super + visitor.visit_attribute(:method) + visitor.visit_args(:args) + end + + def executor + @executor ||= Taskinator::Executor.new(definition, self) + end + + def incr_count? + false + end + + def notify_process? + false + end + + def inspect + %(#<#{self.class.name}:0x#{self.__id__.to_s(16)} uuid="#{uuid}", definition=:#{definition}, method=:#{method}, args=#{args}, current_state=:#{current_state}>) + end + end end end diff --git a/lib/taskinator/version.rb b/lib/taskinator/version.rb index cb9213f..3f56e67 100644 --- a/lib/taskinator/version.rb +++ b/lib/taskinator/version.rb @@ -1,3 +1,3 @@ module Taskinator - VERSION = "0.5.2" + VERSION = "0.6.0" end diff --git a/lib/taskinator/visitor.rb b/lib/taskinator/visitor.rb index 85dfa59..619b667 100644 --- a/lib/taskinator/visitor.rb +++ b/lib/taskinator/visitor.rb @@ -7,6 +7,15 @@ def visit_process(attribute) def visit_tasks(tasks) end + def visit_before_started_tasks(tasks) + end + + def visit_after_completed_tasks(tasks) + end + + def visit_after_failed_tasks(tasks) + end + def visit_attribute(attribute) end diff --git a/lib/taskinator/workflow.rb b/lib/taskinator/workflow.rb index 0308607..3744184 100644 --- a/lib/taskinator/workflow.rb +++ b/lib/taskinator/workflow.rb @@ -21,7 +21,6 @@ def transition(new_state) enqueued processing paused - resumed completed cancelled failed diff --git a/processes_workflow.png b/processes_workflow.png deleted file mode 100644 index 346ad79..0000000 Binary files a/processes_workflow.png and /dev/null differ diff --git a/sequence.txt b/sequence.txt deleted file mode 100644 index a5afd8f..0000000 --- a/sequence.txt +++ /dev/null @@ -1,70 +0,0 @@ -# https://www.websequencediagrams.com -title Taskinator Sequence - -User->+Web: Request process -Web-->+Process(seq): Create sequential process -Process(seq)-->-Web: -Web-->+Queue: Enqueue process -Queue-->-Web: -Web-->-User: - -opt Sequential process - Queue->+Worker: Dequeue process - note right of Worker: Start sequential process - Worker->+Queue: Enqueue task - Queue-->-Worker: - Worker-->-Queue: - - loop Sequential Tasks - Queue->+Worker: Dequeue task - note right of Worker: Start task - Worker->+Queue: Enqueue task - Queue-->-Worker: - Worker->Process(seq): Task completed - note left of Process(seq): All tasks complete? - Worker-->-Queue: - end - - opt Sub Process Task - Queue->+Worker: Dequeue task - note right of Worker: Start task - Worker-->+Process(con): Create concurrent process - Process(con)-->-Worker: - Worker-->+Queue: Enqueue process - Queue-->-Worker: - Worker->Process(seq): Task completed - note left of Process(seq): All tasks complete? - Worker-->-Queue: - end - - opt Concurrent process - Queue->+Worker: Dequeue process - note right of Worker: Start concurrent process - Worker->+Queue: Enqueue task - Queue-->-Worker: - Worker->+Queue: Enqueue task - Queue-->-Worker: - Worker->Process(seq): Task completed - note left of Process(seq): All tasks complete? - Worker-->-Queue: - - opt Concurrent Tasks - Queue->+Worker: Dequeue task - Queue->+Worker: Dequeue task - Queue->+Worker: Dequeue task - Worker->Process(con): Task completed - note right of Process(con): All tasks complete? - Worker->Process(con): Task completed - note right of Process(con): All tasks complete? - Worker-->-Queue: - Worker-->-Queue: - Worker->Process(con): Task completed - Worker-->-Queue: - end - - note right of Process(con): All tasks complete? - Process(con)->Process(seq): Process completed - note left of Process(seq): All tasks complete? - - end -end diff --git a/spec/examples/process_examples.rb b/spec/examples/process_examples.rb index c39f26a..29edc84 100644 --- a/spec/examples/process_examples.rb +++ b/spec/examples/process_examples.rb @@ -9,5 +9,8 @@ it { expect(subject.to_s).to match(/#{subject.uuid}/) } it { expect(subject.options).to_not be_nil } it { expect(subject.tasks).to_not be_nil } + it { expect(subject.before_started_tasks).to_not be_nil } + it { expect(subject.after_completed_tasks).to_not be_nil } + it { expect(subject.after_failed_tasks).to_not be_nil } end diff --git a/spec/examples/visitor_examples.rb b/spec/examples/visitor_examples.rb new file mode 100644 index 0000000..8ae7330 --- /dev/null +++ b/spec/examples/visitor_examples.rb @@ -0,0 +1,46 @@ +require 'spec_helper' + +shared_examples_for "a visitor" do |visitor| + + visitor_methods = visitor.instance_methods + + # visit_process(attribute) + it { expect(visitor_methods.include?(:visit_process)).to be } + + # visit_tasks(tasks) + it { expect(visitor_methods.include?(:visit_tasks)).to be } + + # visit_before_started_tasks(tasks) + it { expect(visitor_methods.include?(:visit_before_started_tasks)).to be } + + # visit_after_completed_tasks(tasks) + it { expect(visitor_methods.include?(:visit_after_completed_tasks)).to be } + + # visit_after_failed_tasks(tasks) + it { expect(visitor_methods.include?(:visit_after_failed_tasks)).to be } + + # visit_attribute(attribute) + it { expect(visitor_methods.include?(:visit_attribute)).to be } + + # visit_attribute_time(attribute) + it { expect(visitor_methods.include?(:visit_attribute_time)).to be } + + # visit_attribute_enum(attribute, type) + it { expect(visitor_methods.include?(:visit_attribute_enum)).to be } + + # visit_process_reference(attribute) + it { expect(visitor_methods.include?(:visit_process_reference)).to be } + + # visit_task_reference(attribute) + it { expect(visitor_methods.include?(:visit_task_reference)).to be } + + # visit_type(attribute) + it { expect(visitor_methods.include?(:visit_type)).to be } + + # visit_args(attribute) + it { expect(visitor_methods.include?(:visit_args)).to be } + + # task_count + it { expect(visitor_methods.include?(:task_count)).to be } + +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 135c5d1..404af58 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -74,7 +74,7 @@ class ApplicationJob < ActiveJob::Base end config.before(:each, :redis => true) do - Taskinator.redis = { :namespace => "taskinator:test:#{SecureRandom.uuid}" } + Taskinator.redis = { :namespace => "taskinator:test:#{SecureRandom.hex(4)}" } end config.before(:each, :sidekiq => true) do diff --git a/spec/support/mock_definition.rb b/spec/support/mock_definition.rb index 2b95e14..29050dd 100644 --- a/spec/support/mock_definition.rb +++ b/spec/support/mock_definition.rb @@ -14,7 +14,7 @@ def create(queue=nil) definition.queue = queue # create a constant, so that the mock definition isn't anonymous - Object.const_set("Mock#{SecureRandom.hex}Definition", definition) + Object.const_set("Mock#{SecureRandom.hex(4)}Definition", definition) end end diff --git a/spec/support/test_definition.rb b/spec/support/test_definition.rb deleted file mode 100644 index 81ab422..0000000 --- a/spec/support/test_definition.rb +++ /dev/null @@ -1,7 +0,0 @@ -module TestDefinition - extend Taskinator::Definition - - def do_task(*args) - end - -end diff --git a/spec/support/test_definitions.rb b/spec/support/test_definitions.rb new file mode 100644 index 0000000..1a3cc51 --- /dev/null +++ b/spec/support/test_definitions.rb @@ -0,0 +1,259 @@ +module TestDefinitions + + module Worker + def self.perform(*args) + # nop + end + end + + class TestTaskFailed < StandardError + end + + module Support + + def iterator(task_count, *args) + task_count.times do |i| + yield i, *args + end + end + + # generate task methods so it's easy to see which task + # corresponds with each method when debugging specs + 20.times do |i| + define_method "task#{i}" do |*args| + Taskinator.logger.info(">>> Executing task #{__method__} [#{uuid}]...") + end + end + + def task_fail(*args) + raise TestTaskFailed + end + + def task_before_started(*args) + Taskinator.logger.info(">>> Executing before started task #{__method__} [#{uuid}]...") + end + + def task_after_completed(*args) + Taskinator.logger.info(">>> Executing after completed task #{__method__} [#{uuid}]...") + end + + def task_after_failed(*args) + Taskinator.logger.info(">>> Executing after failed task #{__method__} [#{uuid}]...") + end + + end + + module Definition + extend Taskinator::Definition + include Support + + end + + module Task + extend Taskinator::Definition + include Support + + define_process :task_count do + for_each :iterator do + task :task1, :queue => :foo + end + end + + end + + module TaskBeforeStarted + extend Taskinator::Definition + include Support + + define_process do + before_started :task_before_started + + task :task1 + end + end + + module TaskBeforeStartedSubProcess + extend Taskinator::Definition + include Support + + define_process do + sub_process TaskBeforeStarted + end + + end + + module TaskAfterCompleted + extend Taskinator::Definition + include Support + + define_process do + task :task1 + + after_completed :task_after_completed + end + + end + + module TaskAfterCompletedSubProcess + extend Taskinator::Definition + include Support + + define_process do + sub_process TaskAfterCompleted + end + + end + + module TaskAfterFailed + extend Taskinator::Definition + include Support + + define_process do + task :task_fail + + after_failed :task_after_failed + end + + end + + module TaskAfterFailedSubProcess + extend Taskinator::Definition + include Support + + define_process do + sub_process TaskAfterFailed + end + + end + + module Job + extend Taskinator::Definition + include Support + + define_process :task_count do + for_each :iterator do + job Worker + end + end + + end + + module SubProcess + extend Taskinator::Definition + include Support + + define_process :task_count do + sub_process Task + end + + end + + module Sequential + extend Taskinator::Definition + include Support + + define_process :task_count do + sequential do + for_each :iterator do + task :task1 + end + end + end + + end + + module Concurrent + extend Taskinator::Definition + include Support + + define_process :task_count do + concurrent do + for_each :iterator do + task :task1 + end + end + end + + end + + module EmptySequentialProcessTest + extend Taskinator::Definition + include Support + + define_process do + + task :task0 + + sequential do + # NB: empty! + end + + sequential do + task :task1 + end + + task :task2 + + end + end + + module EmptyConcurrentProcessTest + extend Taskinator::Definition + include Support + + define_process do + + task :task0 + + concurrent do + # NB: empty! + end + + concurrent do + task :task1 + end + + task :task2 + + end + end + + module NestedTask + extend Taskinator::Definition + include Support + + define_process :task_count do + task :task1 + + concurrent do + task :task2 + task :task3 + + sequential do + task :task4 + task :task5 + + concurrent do + task :task6 + task :task7 + + sequential do + task :task8 + task :task9 + + end + + task :task10 + end + + task :task11 + end + + task :task12 + end + + task :task13 + end + end + +end diff --git a/spec/support/test_flow.rb b/spec/support/test_flow.rb deleted file mode 100644 index c2fd114..0000000 --- a/spec/support/test_flow.rb +++ /dev/null @@ -1,84 +0,0 @@ -module TestFlow - extend Taskinator::Definition - - define_process :some_arg1, :some_arg2 do - - # TODO: add support for "continue_on_error" - task :error_task, :continue_on_error => true - - task :the_task - - for_each :iterator do - task :the_task - end - - for_each :iterator, :sub_option => 1 do - task :the_task - end - - sequential do - task :the_task - task :the_task - task :the_task - end - - task :the_task - - concurrent do - 20.times do |i| - task :the_task - end - task :the_task - end - - task :the_task - - # invoke the specified sub process - sub_process TestSubFlow - - job TestWorkerJob - end - - def error_task(*args) - raise "It's a huge problem!" - end - - # note: arg1 and arg2 are passed in all the way from the - # definition#create_process method - def iterator(arg1, arg2, options={}) - 3.times do |i| - yield [arg1, arg2, i] - end - end - - def the_task(*args) - t = rand(1..11) - Taskinator.logger.info "Executing task '#{task}' with [#{args}] for #{t} secs..." - sleep 1 # 1 - end - - module TestSubFlow - extend Taskinator::Definition - - define_process :some_arg1, :some_arg2 do - task :the_task - task :the_task - task :the_task - end - - def the_task(*args) - t = rand(1..11) - Taskinator.logger.info "Executing sub task '#{task}' with [#{args}] for #{t} secs..." - sleep 1 # t - end - end - - module TestWorkerJob - def self.perform(*args) - end - - def perform(*args) - end - end - -end diff --git a/spec/support/test_flows.rb b/spec/support/test_flows.rb deleted file mode 100644 index 739c00a..0000000 --- a/spec/support/test_flows.rb +++ /dev/null @@ -1,173 +0,0 @@ -module TestFlows - - module Worker - def self.perform(*args) - # nop - end - end - - module Support - - def iterator(task_count, *args) - task_count.times do |i| - yield i, *args - end - end - - def do_task(*args) - Taskinator.logger.info(">>> Executing task do_task [#{uuid}]...") - end - - # just create lots of these, so it's easy to see which task - # corresponds with each method when debugging specs - 20.times do |i| - define_method "task_#{i}" do |*args| - Taskinator.logger.info(">>> Executing task #{__method__} [#{uuid}]...") - end - end - - end - - module Task - extend Taskinator::Definition - include Support - - define_process :task_count do - for_each :iterator do - task :do_task, :queue => :foo - end - end - - end - - module Job - extend Taskinator::Definition - include Support - - define_process :task_count do - for_each :iterator do - job Worker - end - end - - end - - module SubProcess - extend Taskinator::Definition - include Support - - define_process :task_count do - sub_process Task - end - - end - - module Sequential - extend Taskinator::Definition - include Support - - define_process :task_count do - sequential do - for_each :iterator do - task :do_task - end - end - end - - end - - module Concurrent - extend Taskinator::Definition - include Support - - define_process :task_count do - concurrent do - for_each :iterator do - task :do_task - end - end - end - - end - - module EmptySequentialProcessTest - extend Taskinator::Definition - include Support - - define_process do - - task :task_0 - - sequential do - # NB: empty! - end - - sequential do - task :task_1 - end - - task :task_2 - - end - end - - module EmptyConcurrentProcessTest - extend Taskinator::Definition - include Support - - define_process do - - task :task_0 - - concurrent do - # NB: empty! - end - - concurrent do - task :task_1 - end - - task :task_2 - - end - end - - module NestedTask - extend Taskinator::Definition - include Support - - define_process :task_count do - task :task_1 - - concurrent do - task :task_2 - task :task_3 - - sequential do - task :task_4 - task :task_5 - - concurrent do - task :task_6 - task :task_7 - - sequential do - task :task_8 - task :task_9 - - end - - task :task_10 - end - - task :task_11 - end - - task :task_12 - end - - task :task_13 - end - end - -end diff --git a/spec/taskinator/builder_spec.rb b/spec/taskinator/builder_spec.rb index 6bece90..fc84742 100644 --- a/spec/taskinator/builder_spec.rb +++ b/spec/taskinator/builder_spec.rb @@ -6,7 +6,10 @@ Module.new do extend Taskinator::Definition - def iterator_method(*); end + def iterator_method(*args) + yield *args + end + def task_method(*); end end end @@ -50,6 +53,110 @@ def task_method(*); end expect(block).to_not receive(:call) subject.option?(:unspecified, &define_block) end + + describe "scopes" do + it "base" do + expect(block).to receive(:call) + blk = define_block + + definition.define_process :a, :b do + option?(:option1, &blk) + end + + definition.create_process(*args, builder_options) + end + + it "sequential" do + expect(block).to receive(:call) + blk = define_block + + definition.define_process :a, :b do + sequential do + option?(:option1, &blk) + end + end + + definition.create_process(*args, builder_options) + end + + it "concurrent" do + expect(block).to receive(:call) + blk = define_block + + definition.define_process :a, :b do + concurrent do + option?(:option1, &blk) + end + end + + definition.create_process(*args, builder_options) + end + + it "for_each" do + expect(block).to receive(:call) + blk = define_block + + definition.define_process :a, :b do + for_each :iterator_method do + option?(:option1, &blk) + end + end + + definition.create_process(*args, builder_options) + end + + it "nested" do + expect(block).to receive(:call) + blk = define_block + + definition.define_process :a, :b do + concurrent do + sequential do + for_each :iterator_method do + option?(:option1, &blk) + end + end + end + end + + definition.create_process(*args, builder_options) + end + + it "sub-process" do + expect(block).to receive(:call).exactly(4).times + blk = define_block + + sub_definition = Module.new do + extend Taskinator::Definition + + define_process do + option?(:option1, &blk) #1 + + sequential do + option?(:option1, &blk) #2 + end + + concurrent do + option?(:option1, &blk) #3 + end + + for_each :iterator_method do + option?(:option1, &blk) #4 + end + end + + def iterator_method(*args) + yield *args + end + end + + definition.define_process :a, :b do + sub_process sub_definition + end + + definition.create_process(*args, builder_options) + end + end end describe "#sequential" do @@ -212,6 +319,12 @@ def task_method(*); end expect(Taskinator::Task).to receive(:define_step_task).with(process, :task_method, args, builder_options.merge(options)) subject.task(:task_method, options) end + + it "adds task to process" do + expect { + subject.task(:task_method) + }.to change { process.tasks.count }.by(1) + end end describe "#job" do @@ -239,6 +352,102 @@ def task_method(*); end expect(Taskinator::Task).to receive(:define_job_task).with(process, job, args, builder_options.merge(options)) subject.job(job, options) end + + it "adds job to process" do + expect { + subject.task(:task_method) + }.to change { process.tasks.count }.by(1) + end + end + + describe "#before_started" do + it "creates a task" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options) + subject.before_started(:task_method) + end + + it "fails if task method is nil" do + expect { + subject.before_started(nil) + }.to raise_error(ArgumentError) + end + + it "fails if task method is not defined" do + expect { + subject.before_started(:undefined) + }.to raise_error(NoMethodError) + end + + it "includes options" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options.merge(options)) + subject.before_started(:task_method, options) + end + + it "adds task to process" do + expect { + subject.before_started(:task_method) + }.to change { process.before_started_tasks.count }.by(1) + end + end + + describe "#after_completed" do + it "creates a task" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options) + subject.after_completed(:task_method) + end + + it "fails if task method is nil" do + expect { + subject.after_completed(nil) + }.to raise_error(ArgumentError) + end + + it "fails if task method is not defined" do + expect { + subject.after_completed(:undefined) + }.to raise_error(NoMethodError) + end + + it "includes options" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options.merge(options)) + subject.after_completed(:task_method, options) + end + + it "adds task to process" do + expect { + subject.after_completed(:task_method) + }.to change { process.after_completed_tasks.count }.by(1) + end + end + + describe "#after_failed" do + it "creates a task" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options) + subject.after_failed(:task_method) + end + + it "fails if task method is nil" do + expect { + subject.after_failed(nil) + }.to raise_error(ArgumentError) + end + + it "fails if method is not defined" do + expect { + subject.after_failed(:undefined) + }.to raise_error(NoMethodError) + end + + it "includes options" do + expect(Taskinator::Task).to receive(:define_hook_task).with(process, :task_method, args, builder_options.merge(options)) + subject.after_failed(:task_method, options) + end + + it "adds task to process" do + expect { + subject.after_failed(:task_method) + }.to change { process.after_failed_tasks.count }.by(1) + end end describe "#sub_process" do @@ -272,16 +481,16 @@ def task_method(*); end block = Proc.new {|p| p.task :task_method } - expect(process.tasks).to be_empty - subject.sequential(options, &block) - expect(process.tasks).to_not be_empty + expect { + subject.sequential(options, &block) + }.to change { process.tasks.count }.by(1) end it "ignores sub-processes without tasks" do allow(block).to receive(:call) - expect(process.tasks).to be_empty - subject.sequential(options, &define_block) - expect(process.tasks).to be_empty + expect { + subject.sequential(options, &define_block) + }.to_not change { process.tasks.count } end end diff --git a/spec/taskinator/complex_process_spec.rb b/spec/taskinator/complex_process_spec.rb deleted file mode 100644 index 4843275..0000000 --- a/spec/taskinator/complex_process_spec.rb +++ /dev/null @@ -1,23 +0,0 @@ -require 'spec_helper' - -describe TestFlow, :redis => true do - it "should persist and retrieve" do - processA = TestFlow.create_process(:arg1, :arg2) - - processB = Taskinator::Process.fetch(processA.uuid) - - expect(processB.uuid).to eq(processA.uuid) - expect(processB.definition).to eq(processA.definition) - expect(processB.options).to eq(processA.options) - - expect(processB.tasks.count).to eq(processA.tasks.count) - - tasks = processA.tasks.zip(processB.tasks) - - tasks.each do |(taskB, taskA)| - expect(taskA.process).to eq(taskB.process) - expect(taskA.uuid).to eq(taskB.uuid) - expect(taskA.options).to eq(taskB.options) - end - end -end diff --git a/spec/taskinator/instrumentation_spec.rb b/spec/taskinator/instrumentation_spec.rb index 67cac51..0e9fe02 100644 --- a/spec/taskinator/instrumentation_spec.rb +++ b/spec/taskinator/instrumentation_spec.rb @@ -18,7 +18,7 @@ def self.base_key def initialize @uuid = Taskinator.generate_uuid @options = { :bar => :baz } - @definition = TestDefinition + @definition = TestDefinitions::Definition end end @@ -49,54 +49,137 @@ def initialize } end - describe "#enqueued_payload" do - pending - end + describe "#payload_for" do + [ + [ 1, 100, 0, 0, 0, 0, 0 ], + [ 2, 100, 10, 10, 0, 0, 20 ], + [ 3, 100, 20, 30, 0, 0, 50 ], + [ 4, 100, 25, 40, 0, 0, 65 ], + [ 5, 100, 0, 100, 0, 0, 100 ], + [ 6, 100, 0, 90, 1, 0, 91 ], + ].each do |(s, count, processing, completed, cancelled, failed, check)| + + it "scenario ##{s}" do + Taskinator.redis do |conn| + conn.hset(subject.key, :process_uuid, subject.uuid) + conn.hmset( + subject.process_key, + [:options, YAML.dump({:foo => :bar})], + [:tasks_count, count], + [:tasks_processing, processing], + [:tasks_completed, completed], + [:tasks_cancelled, cancelled], + [:tasks_failed, failed] + ) + end + + # private method, so use "send" + payload = subject.send(:payload_for, "baz", {:qux => :quuz}) + + expect(payload.instance_eval { + percentage_failed + + percentage_cancelled + + percentage_processing + + percentage_completed + }).to eq(check) + + expect(payload).to eq( + OpenStruct.new({ + :type => subject.class.name, + :definition => subject.definition.name, + :process_uuid => subject.uuid, + :process_options => {:foo => :bar}, + :uuid => subject.uuid, + :options => subject.options, + :state => "baz", + :percentage_failed => (failed / count.to_f) * 100.0, + :percentage_cancelled => (cancelled / count.to_f) * 100.0, + :percentage_processing => (processing / count.to_f) * 100.0, + :percentage_completed => (completed / count.to_f) * 100.0, + :qux => :quuz + }) + ) - describe "#processing_payload" do - pending + end + end end - describe "#completed_payload" do - it { + describe "payloads per state" do + let(:additional) { {:qux => :quuz} } + + def payload_for(state, moar={}) + OpenStruct.new({ + :type => subject.class.name, + :definition => subject.definition.name, + :process_uuid => subject.uuid, + :process_options => {:foo => :bar}, + :uuid => subject.uuid, + :options => subject.options, + :state => state, + :percentage_failed => 40.0, + :percentage_cancelled => 30.0, + :percentage_processing => 10.0, + :percentage_completed => 20.0, + }.merge(additional).merge(moar)) + end + + before do Taskinator.redis do |conn| conn.hset(subject.key, :process_uuid, subject.uuid) conn.hmset( subject.process_key, [:options, YAML.dump({:foo => :bar})], [:tasks_count, 100], - [:tasks_processing, 1], - [:tasks_completed, 2], - [:tasks_cancelled, 3], - [:tasks_failed, 4] + [:tasks_processing, 10], + [:tasks_completed, 20], + [:tasks_cancelled, 30], + [:tasks_failed, 40] ) end + end - expect(subject.completed_payload(:baz => :qux)).to eq( - OpenStruct.new({ - :type => subject.class.name, - :definition => subject.definition.name, - :process_uuid => subject.uuid, - :process_options => {:foo => :bar}, - :uuid => subject.uuid, - :state => :completed, - :options => subject.options, - :percentage_processing => 1.0, - :percentage_completed => 2.0, - :percentage_cancelled => 3.0, - :percentage_failed => 4.0, - :baz => :qux - }) - ) - } - end + describe "#enqueued_payload" do + it { + expect(subject.enqueued_payload(additional)).to eq(payload_for(:enqueued)) + } + end - describe "#cancelled_payload" do - pending - end + describe "#processing_payload" do + it { + expect(subject.processing_payload(additional)).to eq(payload_for(:processing)) + } + end - describe "#failed_payload" do - pending - end + describe "#paused_payload" do + it { + expect(subject.paused_payload(additional)).to eq(payload_for(:paused)) + } + end + describe "#resumed_payload" do + it { + expect(subject.resumed_payload(additional)).to eq(payload_for(:resumed)) + } + end + + describe "#completed_payload" do + it { + expect(subject.completed_payload(additional)).to eq(payload_for(:completed)) + } + end + + describe "#cancelled_payload" do + it { + expect(subject.cancelled_payload(additional)).to eq(payload_for(:cancelled)) + } + end + + describe "#failed_payload" do + it { + err = StandardError.new + expect(subject.failed_payload(err, additional)).to eq( + payload_for(:failed, :exception => err.to_s, :backtrace => err.backtrace)) + } + end + end end diff --git a/spec/taskinator/persistence_spec.rb b/spec/taskinator/persistence_spec.rb index 2b2ac08..32e2e2f 100644 --- a/spec/taskinator/persistence_spec.rb +++ b/spec/taskinator/persistence_spec.rb @@ -2,7 +2,7 @@ describe Taskinator::Persistence, :redis => true do - let(:definition) { TestDefinition } + let(:definition) { TestDefinitions::Definition } describe "class methods" do subject { @@ -336,7 +336,7 @@ def initialize describe "#to_xml" do it { - process = TestFlows::Task.create_process(1) + process = TestDefinitions::NestedTask.create_process(1) expect(process.to_xml).to match(/xml/) } end @@ -493,14 +493,17 @@ def initialize describe "#cleanup" do [ - TestFlows::Task, - TestFlows::Job, - TestFlows::SubProcess, - TestFlows::Sequential, - TestFlows::Concurrent, - TestFlows::EmptySequentialProcessTest, - TestFlows::EmptyConcurrentProcessTest, - TestFlows::NestedTask, + TestDefinitions::Task, + TestDefinitions::Job, + TestDefinitions::SubProcess, + TestDefinitions::Sequential, + TestDefinitions::Concurrent, + TestDefinitions::EmptySequentialProcessTest, + TestDefinitions::EmptyConcurrentProcessTest, + TestDefinitions::NestedTask, + TestDefinitions::TaskBeforeStarted, + TestDefinitions::TaskAfterCompleted, + TestDefinitions::TaskAfterFailed, ].each do |definition| describe "#{definition.name} expire immediately" do @@ -531,7 +534,7 @@ def initialize # sanity check expect(conn.keys).to be_empty - process = TestFlows::Task.create_process(1) + process = TestDefinitions::Task.create_process(1) # sanity check expect(conn.hget(process.key, :uuid)).to eq(process.uuid) @@ -554,4 +557,5 @@ def initialize end end + end diff --git a/spec/taskinator/process_spec.rb b/spec/taskinator/process_spec.rb index 84ffec9..ee7953a 100644 --- a/spec/taskinator/process_spec.rb +++ b/spec/taskinator/process_spec.rb @@ -2,7 +2,7 @@ describe Taskinator::Process do - let(:definition) { TestDefinition } + let(:definition) { TestDefinitions::Definition } describe "Base" do @@ -35,6 +35,18 @@ it { expect(subject.tasks).to be_a(Taskinator::Tasks) } end + describe "#before_started_tasks" do + it { expect(subject.before_started_tasks).to be_a(Taskinator::Tasks) } + end + + describe "#after_completed_tasks" do + it { expect(subject.after_completed_tasks).to be_a(Taskinator::Tasks) } + end + + describe "#after_failed_tasks" do + it { expect(subject.after_failed_tasks).to be_a(Taskinator::Tasks) } + end + describe "#no_tasks_defined?" do it { expect(subject.no_tasks_defined?).to be } it { @@ -91,6 +103,14 @@ subject.start! expect(subject.current_state).to eq(:processing) } + + it "enqueues before_started_tasks" do + task = Class.new(Taskinator::Task).new(subject) + expect(task).to receive(:enqueue!) + subject.before_started_tasks << task + + subject.start! + end end describe "#cancel!" do @@ -149,6 +169,14 @@ subject.complete! expect(subject.current_state).to eq(:completed) } + + it "enqueues after_completed_tasks" do + task = Class.new(Taskinator::Task).new(subject) + expect(task).to receive(:enqueue!) + subject.after_completed_tasks << task + + subject.complete! + end end describe "#fail!" do @@ -164,6 +192,14 @@ subject.fail!(StandardError.new) expect(subject.current_state).to eq(:failed) } + + it "enqueues after_failed_tasks" do + task = Class.new(Taskinator::Task).new(subject) + expect(task).to receive(:enqueue!) + subject.after_failed_tasks << task + + subject.fail!(StandardError.new) + end end end @@ -199,7 +235,10 @@ expect(visitor).to receive(:visit_attribute).with(:uuid) expect(visitor).to receive(:visit_args).with(:options) expect(visitor).to receive(:visit_task_reference).with(:parent) - expect(visitor).to receive(:visit_tasks) + expect(visitor).to receive(:visit_tasks).with(subject.tasks) + expect(visitor).to receive(:visit_before_started_tasks).with(subject.before_started_tasks) + expect(visitor).to receive(:visit_after_completed_tasks).with(subject.after_completed_tasks) + expect(visitor).to receive(:visit_after_failed_tasks).with(subject.after_failed_tasks) expect(visitor).to receive(:visit_attribute).with(:scope) expect(visitor).to receive(:visit_attribute).with(:queue) expect(visitor).to receive(:visit_attribute_time).with(:created_at) @@ -208,13 +247,6 @@ subject.accept(visitor) } end - - describe "#tasks_count" do - it { - expect(subject.tasks_count).to eq(0) - } - end - end describe Taskinator::Process::Sequential do @@ -400,7 +432,10 @@ expect(visitor).to receive(:visit_attribute).with(:uuid) expect(visitor).to receive(:visit_args).with(:options) expect(visitor).to receive(:visit_task_reference).with(:parent) - expect(visitor).to receive(:visit_tasks) + expect(visitor).to receive(:visit_tasks).with(subject.tasks) + expect(visitor).to receive(:visit_before_started_tasks).with(subject.before_started_tasks) + expect(visitor).to receive(:visit_after_completed_tasks).with(subject.after_completed_tasks) + expect(visitor).to receive(:visit_after_failed_tasks).with(subject.after_failed_tasks) expect(visitor).to receive(:visit_attribute).with(:scope) expect(visitor).to receive(:visit_attribute).with(:queue) expect(visitor).to receive(:visit_attribute_time).with(:created_at) @@ -685,7 +720,10 @@ expect(visitor).to receive(:visit_attribute_enum).with(:complete_on, Taskinator::CompleteOn) expect(visitor).to receive(:visit_args).with(:options) expect(visitor).to receive(:visit_task_reference).with(:parent) - expect(visitor).to receive(:visit_tasks) + expect(visitor).to receive(:visit_tasks).with(subject.tasks) + expect(visitor).to receive(:visit_before_started_tasks).with(subject.before_started_tasks) + expect(visitor).to receive(:visit_after_completed_tasks).with(subject.after_completed_tasks) + expect(visitor).to receive(:visit_after_failed_tasks).with(subject.after_failed_tasks) expect(visitor).to receive(:visit_attribute).with(:scope) expect(visitor).to receive(:visit_attribute).with(:queue) expect(visitor).to receive(:visit_attribute_time).with(:created_at) diff --git a/spec/taskinator/task_spec.rb b/spec/taskinator/task_spec.rb index c9f8cd2..f89af36 100644 --- a/spec/taskinator/task_spec.rb +++ b/spec/taskinator/task_spec.rb @@ -2,7 +2,7 @@ describe Taskinator::Task do - let(:definition) { TestDefinition } + let(:definition) { TestDefinitions::Definition } let(:process) do Class.new(Taskinator::Process) do @@ -164,25 +164,17 @@ subject.accept(visitor) } end - - describe "#tasks_count" do - it { - process_uuid = SecureRandom.hex - allow(subject).to receive(:process_uuid) { process_uuid } - expect(subject.tasks_count).to eq(0) - } - end end describe Taskinator::Task::Step do - subject { Taskinator::Task.define_step_task(process, :do_task, {:a => 1, :b => 2}) } + subject { Taskinator::Task.define_step_task(process, :task1, {:a => 1, :b => 2}) } it_should_behave_like "a task", Taskinator::Task::Step describe ".define_step_task" do it "sets the queue to use" do - task = Taskinator::Task.define_step_task(process, :do_task, {:a => 1, :b => 2}, :queue => :foo) + task = Taskinator::Task.define_step_task(process, :task1, {:a => 1, :b => 2}, :queue => :foo) expect(task.queue).to eq(:foo) end end @@ -290,7 +282,7 @@ end end - describe "#complete" do + describe "#complete!" do it "notifies parent process" do expect(process).to receive(:task_completed).with(subject) @@ -312,6 +304,206 @@ end end + describe "#fail!" do + it "notifies parent process" do + err = StandardError.new + expect(process).to receive(:task_failed).with(subject, err) + + subject.fail!(err) + end + + it "is instrumented" do + err = StandardError.new + allow(process).to receive(:task_failed).with(subject, err) + + instrumentation_block = SpecSupport::Block.new + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.failed') + end + + TestInstrumenter.subscribe(instrumentation_block, /taskinator.task/) do + subject.fail!(err) + end + end + end + + describe "#accept" do + it { + expect(subject).to receive(:accept) + subject.save + } + + it { + visitor = double('visitor') + expect(visitor).to receive(:visit_type).with(:definition) + expect(visitor).to receive(:visit_attribute).with(:uuid) + expect(visitor).to receive(:visit_process_reference).with(:process) + expect(visitor).to receive(:visit_task_reference).with(:next) + expect(visitor).to receive(:visit_args).with(:options) + expect(visitor).to receive(:visit_attribute).with(:method) + expect(visitor).to receive(:visit_args).with(:args) + expect(visitor).to receive(:visit_attribute).with(:queue) + expect(visitor).to receive(:visit_attribute_time).with(:created_at) + expect(visitor).to receive(:visit_attribute_time).with(:updated_at) + + subject.accept(visitor) + } + end + + describe "#inspect" do + it { expect(subject.inspect).to_not be_nil } + it { expect(subject.inspect).to include(definition.name) } + end + end + + describe Taskinator::Task::Hook do + + subject { Taskinator::Task.define_hook_task(process, :task1, {:a => 1, :b => 2}) } + + it_should_behave_like "a task", Taskinator::Task::Hook + + describe ".define_hook_task" do + it "sets the queue to use" do + task = Taskinator::Task.define_hook_task(process, :task1, {:a => 1, :b => 2}, :queue => :foo) + expect(task.queue).to eq(:foo) + end + end + + describe "#executor" do + it { expect(subject.executor).to_not be_nil } + it { expect(subject.executor).to be_a(definition) } + + it "handles failure" do + error = StandardError.new + allow(subject.executor).to receive(subject.method).with(*subject.args).and_raise(error) + expect(subject).to receive(:fail!).with(error) + expect { + subject.start! + }.to raise_error(error) + end + end + + describe "#enqueue!" do + it { + expect { + subject.enqueue! + }.to change { Taskinator.queue.tasks.length }.by(1) + } + + it "is instrumented" do + allow(subject.executor).to receive(subject.method).with(*subject.args) + + instrumentation_block = SpecSupport::Block.new + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.enqueued') + end + + TestInstrumenter.subscribe(instrumentation_block, /taskinator.task/) do + subject.enqueue! + end + end + end + + describe "#start!" do + before do + allow(process).to receive(:task_completed).with(subject) + end + + it "invokes executor" do + expect(subject.executor).to receive(subject.method).with(*subject.args) + subject.start! + end + + it "provides execution context" do + executor = Taskinator::Executor.new(definition, subject) + + method = subject.method + + executor.singleton_class.class_eval do + define_method method do |*args| + # this method executes in the scope of the executor + # store the context in an instance variable + @exec_context = self + end + end + + # replace the internal executor instance for the task + # with this one, so we can hook into the methods + subject.instance_eval { @executor = executor } + + # task start will invoke the method on the executor + subject.start! + + # extract the instance variable + exec_context = executor.instance_eval { @exec_context } + + expect(exec_context).to eq(executor) + expect(exec_context.uuid).to eq(subject.uuid) + expect(exec_context.options).to eq(subject.options) + end + + it "is instrumented" do + instrumentation_block = SpecSupport::Block.new + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.processing') + end + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.completed') + end + + TestInstrumenter.subscribe(instrumentation_block) do + subject.start! + end + end + end + + describe "#complete!" do + it "does not notify parent process" do + expect(process).to_not receive(:task_completed).with(subject) + + subject.complete! + end + + it "is instrumented" do + allow(process).to receive(:task_completed).with(subject) + + instrumentation_block = SpecSupport::Block.new + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.completed') + end + + TestInstrumenter.subscribe(instrumentation_block, /taskinator.task/) do + subject.complete! + end + end + end + + describe "#fail!" do + it "does not notify parent process" do + err = StandardError.new + expect(process).to_not receive(:task_failed).with(subject, err) + + subject.fail!(err) + end + + it "is instrumented" do + instrumentation_block = SpecSupport::Block.new + + expect(instrumentation_block).to receive(:call) do |*args| + expect(args.first).to eq('taskinator.task.failed') + end + + TestInstrumenter.subscribe(instrumentation_block, /taskinator.task/) do + subject.fail!(StandardError.new) + end + end + end + describe "#accept" do it { expect(subject).to receive(:accept) diff --git a/spec/taskinator/test_flows_spec.rb b/spec/taskinator/test_definitions_spec.rb similarity index 75% rename from spec/taskinator/test_flows_spec.rb rename to spec/taskinator/test_definitions_spec.rb index 5366ec4..7e9676f 100644 --- a/spec/taskinator/test_flows_spec.rb +++ b/spec/taskinator/test_definitions_spec.rb @@ -1,12 +1,12 @@ require 'spec_helper' -describe TestFlows do +describe TestDefinitions do [ - TestFlows::Task, - TestFlows::Job, - TestFlows::SubProcess, - TestFlows::Sequential + TestDefinitions::Task, + TestDefinitions::Job, + TestDefinitions::SubProcess, + TestDefinitions::Sequential ].each do |definition| describe definition.name do @@ -92,10 +92,56 @@ Taskinator.queue_adapter = :test_queue_worker end + context "before_started" do + let(:definition) { TestDefinitions::TaskBeforeStarted } + subject { definition.create_process } + + it "invokes before_started task" do + expect(subject.before_started_tasks.count).to eq(1) + expect_any_instance_of(definition).to receive(:task_before_started) + + expect { + subject.enqueue! + }.to change { Taskinator.queue.tasks.length }.by(2) + end + end + + context "after_completed" do + let(:definition) { TestDefinitions::TaskAfterCompleted } + subject { definition.create_process } + + it "invokes after_completed task" do + expect(subject.after_completed_tasks.count).to eq(1) + expect_any_instance_of(definition).to receive(:task_after_completed) + + expect { + subject.enqueue! + }.to change { Taskinator.queue.tasks.length }.by(2) + end + end + + context "after_failed" do + let(:definition) { TestDefinitions::TaskAfterFailed } + subject { definition.create_process } + + it "invokes after_failed task" do + expect(subject.after_failed_tasks.count).to eq(1) + expect_any_instance_of(definition).to receive(:task_after_failed) + + expect { + begin + subject.enqueue! + rescue TestDefinitions::TestTaskFailed + # ignore error + end + }.to change { Taskinator.queue.tasks.length }.by(2) + end + end + context "empty subprocesses" do context "sequential" do - let(:definition) { TestFlows::EmptySequentialProcessTest } + let(:definition) { TestDefinitions::EmptySequentialProcessTest } subject { definition.create_process } it "contains 3 tasks" do @@ -103,9 +149,9 @@ end it "invokes each task" do - expect_any_instance_of(definition).to receive(:task_0) - expect_any_instance_of(definition).to receive(:task_1) - expect_any_instance_of(definition).to receive(:task_2) + expect_any_instance_of(definition).to receive(:task0) + expect_any_instance_of(definition).to receive(:task1) + expect_any_instance_of(definition).to receive(:task2) expect { subject.enqueue! @@ -114,7 +160,7 @@ end context "concurrent" do - let(:definition) { TestFlows::EmptyConcurrentProcessTest } + let(:definition) { TestDefinitions::EmptyConcurrentProcessTest } subject { definition.create_process } it "contains 3 tasks" do @@ -122,9 +168,9 @@ end it "invokes each task" do - expect_any_instance_of(definition).to receive(:task_0) - expect_any_instance_of(definition).to receive(:task_1) - expect_any_instance_of(definition).to receive(:task_2) + expect_any_instance_of(definition).to receive(:task0) + expect_any_instance_of(definition).to receive(:task1) + expect_any_instance_of(definition).to receive(:task2) expect { subject.enqueue! @@ -133,6 +179,54 @@ end end + + context "subprocesses" do + + context "before_started" do + let(:definition) { TestDefinitions::TaskBeforeStartedSubProcess } + subject { definition.create_process } + + it "invokes before_started task" do + expect_any_instance_of(definition).to receive(:task_before_started) + + expect { + subject.enqueue! + }.to change { Taskinator.queue.tasks.length }.by(1) + end + end + + context "after_completed" do + let(:definition) { TestDefinitions::TaskAfterCompletedSubProcess } + subject { definition.create_process } + + it "invokes after_completed task" do + expect_any_instance_of(definition).to receive(:task_after_completed) + + expect { + subject.enqueue! + }.to change { Taskinator.queue.tasks.length }.by(2) + end + end + + context "after_failed" do + let(:definition) { TestDefinitions::TaskAfterFailedSubProcess } + subject { definition.create_process } + + it "invokes after_failed task" do + expect_any_instance_of(definition).to receive(:task_after_failed) + + expect { + begin + subject.enqueue! + rescue TestDefinitions::TestTaskFailed + # ignore error + end + }.to change { Taskinator.queue.tasks.length }.by(2) + end + end + + end + end describe "statuses" do @@ -146,7 +240,7 @@ end let(:task_count) { 2 } - let(:definition) { TestFlows::Task } + let(:definition) { TestDefinitions::Task } subject { definition.create_process(task_count) } it "reports process and task state" do @@ -184,7 +278,7 @@ pending end - describe "subprocess" do + describe "sub_process" do pending end end @@ -200,7 +294,7 @@ end let(:task_count) { 10 } - let(:definition) { TestFlows::Task } + let(:definition) { TestDefinitions::Task } subject { definition.create_process(task_count) } it "reports task completed" do @@ -266,7 +360,7 @@ end let(:task_count) { 10 } - let(:definition) { TestFlows::Job } + let(:definition) { TestDefinitions::Job } subject { definition.create_process(task_count) } it "reports task completed" do @@ -322,7 +416,7 @@ end - describe "sub process" do + describe "sub_process" do before do # override enqueue allow_any_instance_of(Taskinator::Task::Step).to receive(:enqueue!) { |task| @@ -338,7 +432,7 @@ end let(:task_count) { 10 } - let(:definition) { TestFlows::SubProcess } + let(:definition) { TestDefinitions::SubProcess } subject { definition.create_process(task_count) } it "reports task completed" do diff --git a/spec/taskinator/visitor_spec.rb b/spec/taskinator/visitor_spec.rb index f8f3d1a..6934910 100644 --- a/spec/taskinator/visitor_spec.rb +++ b/spec/taskinator/visitor_spec.rb @@ -1,14 +1,11 @@ require 'spec_helper' -describe Taskinator::Visitor::Base do +describe "Visitors" do - it { respond_to(:visit_process) } - it { respond_to(:visit_tasks) } - it { respond_to(:visit_attribute) } - it { respond_to(:visit_process_reference) } - it { respond_to(:visit_task_reference) } - it { respond_to(:visit_type) } - it { respond_to(:visit_args) } - it { respond_to(:task_count) } + it_should_behave_like "a visitor", Taskinator::Visitor::Base + it_should_behave_like "a visitor", Taskinator::Persistence::RedisSerializationVisitor + it_should_behave_like "a visitor", Taskinator::Persistence::XmlSerializationVisitor + it_should_behave_like "a visitor", Taskinator::Persistence::RedisDeserializationVisitor + it_should_behave_like "a visitor", Taskinator::Persistence::RedisCleanupVisitor end diff --git a/tasks_workflow.png b/tasks_workflow.png deleted file mode 100644 index d92d1c5..0000000 Binary files a/tasks_workflow.png and /dev/null differ