diff --git a/lib/sidekiq/grouping.rb b/lib/sidekiq/grouping.rb index a0e1235..d70fcc8 100644 --- a/lib/sidekiq/grouping.rb +++ b/lib/sidekiq/grouping.rb @@ -16,6 +16,11 @@ module Grouping autoload :Middleware, "sidekiq/grouping/middleware" autoload :Flusher, "sidekiq/grouping/flusher" autoload :FlusherObserver, "sidekiq/grouping/flusher_observer" + autoload :SingletonBase, "sidekiq/grouping/singleton_base" + autoload :SingletonWorker, "sidekiq/grouping/singleton_worker" + autoload :SingletonWorkerConcern, "sidekiq/grouping/singleton_worker_concern" + autoload :SingletonFlusher, "sidekiq/grouping/singleton_flusher" + autoload :SingletonFlusherConcern, "sidekiq/grouping/singleton_flusher_concern" class << self attr_writer :logger diff --git a/lib/sidekiq/grouping/batch.rb b/lib/sidekiq/grouping/batch.rb index 4171ec3..e2b98a4 100644 --- a/lib/sidekiq/grouping/batch.rb +++ b/lib/sidekiq/grouping/batch.rb @@ -3,6 +3,8 @@ module Sidekiq module Grouping class Batch + include Sidekiq::Grouping::SingletonFlusherConcern + def initialize(worker_class, queue, _redis_pool = nil) @worker_class = worker_class @queue = queue @@ -50,6 +52,8 @@ def pluck end def flush + lock_singleton_flusher + chunk = pluck return unless chunk @@ -74,7 +78,7 @@ def worker_class_options end def could_flush? - could_flush_on_overflow? || could_flush_on_time? + could_flush_on_overflow? || could_flush_on_time? || could_flush_on_singleton_worker? end def last_execution_time diff --git a/lib/sidekiq/grouping/singleton_base.rb b/lib/sidekiq/grouping/singleton_base.rb new file mode 100644 index 0000000..f2399f1 --- /dev/null +++ b/lib/sidekiq/grouping/singleton_base.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + class SingletonBase + attr_reader :queue, :worker_class + + def initialize(queue, worker_class) + @queue = queue + @worker_class = worker_class + end + + def singleton_flusher_locked? + redis_client.get(flusher_key).present? + end + + def singleton_worker_locked? + redis_client.get(worker_key).present? + end + + def unlock_singleton_flusher + redis_client.del(flusher_key) + end + + private + + def flusher_key + @flusher_key ||= "#{worker_key}:flusher" + end + + def worker_key + @worker_key ||= "#{worker_class.to_s.underscore}:#{queue}" + end + + def redis_client + @redis_client ||= Sidekiq.redis { _1 } + end + end + end +end diff --git a/lib/sidekiq/grouping/singleton_flusher.rb b/lib/sidekiq/grouping/singleton_flusher.rb new file mode 100644 index 0000000..4fc3a75 --- /dev/null +++ b/lib/sidekiq/grouping/singleton_flusher.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + class SingletonFlusher < SingletonBase + attr_reader :flusher_lock_ttl + + def initialize(queue, worker_class, flusher_lock_ttl: 5) + super(queue, worker_class) + + @flusher_lock_ttl = flusher_lock_ttl + end + + def lock_singleton_flusher + raise "Singleton flusher lock for '#{flusher_key}' already exists" if singleton_flusher_locked? + + redis_client.set(flusher_key, Time.current.to_s, ex: flusher_lock_ttl) + end + end + end +end diff --git a/lib/sidekiq/grouping/singleton_flusher_concern.rb b/lib/sidekiq/grouping/singleton_flusher_concern.rb new file mode 100644 index 0000000..13fce4a --- /dev/null +++ b/lib/sidekiq/grouping/singleton_flusher_concern.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + module SingletonFlusherConcern + def could_flush_on_singleton_worker? + return true unless singleton_worker? + + !(singleton_worker_running? || singleton_flusher_locked?) + end + + def lock_singleton_flusher + # This lock is released within Sidekiq::Grouping::SingletonWorker#with_singleton_worker_lock. + # It prevents plucking again between the time when data is plucked and when the worker starts. + singleton_flusher.lock_singleton_flusher if singleton_worker? + end + + private + + def flusher_lock_ttl + worker_class_options["singleton_flusher_lock_ttl"] + end + + def singleton_flusher + @singleton_flusher ||= + Sidekiq::Grouping::SingletonFlusher.new(queue, worker_class, flusher_lock_ttl:) + end + + def singleton_flusher_locked? + @singleton_flusher.singleton_flusher_locked? + end + + def singleton_worker? + !!worker_class_options["singleton_worker"] + end + + def singleton_worker_running? + @singleton_flusher.singleton_worker_locked? + end + end + end +end diff --git a/lib/sidekiq/grouping/singleton_worker.rb b/lib/sidekiq/grouping/singleton_worker.rb new file mode 100644 index 0000000..1f68c74 --- /dev/null +++ b/lib/sidekiq/grouping/singleton_worker.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + class SingletonWorker < SingletonBase + attr_reader :worker_lock_ttl + + def initialize(queue, worker_class, worker_lock_ttl: 3_600) + super(queue, worker_class) + + @worker_lock_ttl = worker_lock_ttl + end + + def with_singleton_worker_lock(&) + lock_singleton_worker + + yield + ensure + unlock_singleton_worker + end + + def lock_singleton_worker + raise "Singleton lock for '#{worker_key}' already exists" if singleton_worker_locked? + + redis_client.set(worker_key, Time.current.to_s, ex: worker_lock_ttl) + ensure + unlock_singleton_flusher + end + + def unlock_singleton_worker + redis_client.del(worker_key) + end + + # Method handy for unlocking from the console in case of need. + def release_locks + unlock_singleton_worker + unlock_singleton_flusher + end + end + end +end diff --git a/lib/sidekiq/grouping/singleton_worker_concern.rb b/lib/sidekiq/grouping/singleton_worker_concern.rb new file mode 100644 index 0000000..af0f312 --- /dev/null +++ b/lib/sidekiq/grouping/singleton_worker_concern.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Sidekiq + module Grouping + module SingletonWorkerConcern + def with_singleton_worker_lock(&) + if singleton_worker? + singleton_worker.with_singleton_worker_lock(&) + else + yield + end + end + + private + + def singleton_worker + Sidekiq::Grouping::SingletonWorker.new( + sidekiq_options_hash['queue'], + self, + worker_lock_ttl: sidekiq_options_hash['singleton_worker_lock_ttl'] + ) + end + + def singleton_worker? + sidekiq_options_hash['singleton_worker'] + end + end + end +end