Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/sidekiq/grouping.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ module Grouping
autoload :Middleware, "sidekiq/grouping/middleware"
autoload :Flusher, "sidekiq/grouping/flusher"
autoload :FlusherObserver, "sidekiq/grouping/flusher_observer"
autoload :SingletonBase, "sidekiq/grouping/singleton_base"
autoload :SingletonWorker, "sidekiq/grouping/singleton_worker"
autoload :SingletonWorkerConcern, "sidekiq/grouping/singleton_worker_concern"
autoload :SingletonFlusher, "sidekiq/grouping/singleton_flusher"
autoload :SingletonFlusherConcern, "sidekiq/grouping/singleton_flusher_concern"

class << self
attr_writer :logger
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq/grouping/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module Sidekiq
module Grouping
class Batch
include Sidekiq::Grouping::SingletonFlusherConcern

def initialize(worker_class, queue, _redis_pool = nil)
@worker_class = worker_class
@queue = queue
Expand Down Expand Up @@ -50,6 +52,8 @@ def pluck
end

def flush
lock_singleton_flusher

chunk = pluck
return unless chunk

Expand All @@ -74,7 +78,7 @@ def worker_class_options
end

def could_flush?
could_flush_on_overflow? || could_flush_on_time?
could_flush_on_overflow? || could_flush_on_time? || could_flush_on_singleton_worker?
end

def last_execution_time
Expand Down
40 changes: 40 additions & 0 deletions lib/sidekiq/grouping/singleton_base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
class SingletonBase
attr_reader :queue, :worker_class

def initialize(queue, worker_class)
@queue = queue
@worker_class = worker_class
end

def singleton_flusher_locked?
redis_client.get(flusher_key).present?
end

def singleton_worker_locked?
redis_client.get(worker_key).present?
end

def unlock_singleton_flusher
redis_client.del(flusher_key)
end

private

def flusher_key
@flusher_key ||= "#{worker_key}:flusher"
end

def worker_key
@worker_key ||= "#{worker_class.to_s.underscore}:#{queue}"
end

def redis_client
@redis_client ||= Sidekiq.redis { _1 }
end
end
end
end
21 changes: 21 additions & 0 deletions lib/sidekiq/grouping/singleton_flusher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
class SingletonFlusher < SingletonBase
attr_reader :flusher_lock_ttl

def initialize(queue, worker_class, flusher_lock_ttl: 5)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 seconds?

super(queue, worker_class)

@flusher_lock_ttl = flusher_lock_ttl
end

def lock_singleton_flusher
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exclamation?

raise "Singleton flusher lock for '#{flusher_key}' already exists" if singleton_flusher_locked?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why raise?


redis_client.set(flusher_key, Time.current.to_s, ex: flusher_lock_ttl)
end
end
end
end
42 changes: 42 additions & 0 deletions lib/sidekiq/grouping/singleton_flusher_concern.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
module SingletonFlusherConcern
def could_flush_on_singleton_worker?
return true unless singleton_worker?

!(singleton_worker_running? || singleton_flusher_locked?)
end

def lock_singleton_flusher
# This lock is released within Sidekiq::Grouping::SingletonWorker#with_singleton_worker_lock.
# It prevents plucking again between the time when data is plucked and when the worker starts.
singleton_flusher.lock_singleton_flusher if singleton_worker?
end

private

def flusher_lock_ttl
worker_class_options["singleton_flusher_lock_ttl"]
end

def singleton_flusher
@singleton_flusher ||=
Sidekiq::Grouping::SingletonFlusher.new(queue, worker_class, flusher_lock_ttl:)
end

def singleton_flusher_locked?
@singleton_flusher.singleton_flusher_locked?
end

def singleton_worker?
!!worker_class_options["singleton_worker"]
end

def singleton_worker_running?
@singleton_flusher.singleton_worker_locked?
end
end
end
end
41 changes: 41 additions & 0 deletions lib/sidekiq/grouping/singleton_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
class SingletonWorker < SingletonBase
attr_reader :worker_lock_ttl

def initialize(queue, worker_class, worker_lock_ttl: 3_600)
super(queue, worker_class)

@worker_lock_ttl = worker_lock_ttl
end

def with_singleton_worker_lock(&)
lock_singleton_worker

yield
ensure
unlock_singleton_worker
end

def lock_singleton_worker
raise "Singleton lock for '#{worker_key}' already exists" if singleton_worker_locked?

redis_client.set(worker_key, Time.current.to_s, ex: worker_lock_ttl)
ensure
unlock_singleton_flusher
end

def unlock_singleton_worker
redis_client.del(worker_key)
end

# Method handy for unlocking from the console in case of need.
def release_locks
unlock_singleton_worker
unlock_singleton_flusher
end
end
end
end
29 changes: 29 additions & 0 deletions lib/sidekiq/grouping/singleton_worker_concern.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
module SingletonWorkerConcern
def with_singleton_worker_lock(&)
if singleton_worker?
singleton_worker.with_singleton_worker_lock(&)
else
yield
end
end

private

def singleton_worker
Sidekiq::Grouping::SingletonWorker.new(
sidekiq_options_hash['queue'],
self,
worker_lock_ttl: sidekiq_options_hash['singleton_worker_lock_ttl']
)
end

def singleton_worker?
sidekiq_options_hash['singleton_worker']
end
end
end
end