Skip to content

Conversation

xaibot
Copy link
Collaborator

@xaibot xaibot commented May 30, 2024

When the worker passes the singleton_worker: true option, flush will not happen if there is a worker of this class/queue already running.

The worker is expected to include Sidekiq::Grouping::SingletonWorkerConcern and use with_singleton_worker_lock to wrap the critical operations it wants to prevent concurrency issues for.

By not launching another worker while there is already one running, we achieve 2 goals:

  1. improved data deduplication: since the data is left in redis deduplication continues happening while the active job is running.
  2. avoid losing data processing due to timeouts: if jobs were launched and blocked on another form of lock (ie: advisory lock), there is a risk of timeouts causing the data plucked for that job to not be processed.

@xaibot xaibot self-assigned this May 30, 2024
@xaibot xaibot requested a review from vitalinfo May 30, 2024 11:59
@xaibot
Copy link
Collaborator Author

xaibot commented May 30, 2024

@vitalinfo Please let me know your thoughts?

Notice that test coverage is pending, as well as manual testing.

I just want to run the concept (see PR description above) by you guys before proceeding any further.

@xaibot xaibot requested a review from jolim May 31, 2024 12:22
Copy link
Collaborator

@jolim jolim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The theory is great. I'm having trouble reading it through right now - it's definitely a big lift, but conceptually seems valid.

Copy link
Owner

@vitalinfo vitalinfo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not big fan to implement such big changes in the third-party gem

@flusher_lock_ttl = flusher_lock_ttl
end

def lock_singleton_flusher
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exclamation?

end

def lock_singleton_flusher
raise "Singleton flusher lock for '#{flusher_key}' already exists" if singleton_flusher_locked?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why raise?

class SingletonFlusher < SingletonBase
attr_reader :flusher_lock_ttl

def initialize(queue, worker_class, flusher_lock_ttl: 5)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 seconds?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants