diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index 99e14150..d10997c7 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -37,13 +37,5 @@ class Engine < ::Rails::Engine include ActiveJob::ConcurrencyControls end end - - initializer "solid_queue.include_interruptible_concern" do - if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new("3.2") - SolidQueue::Processes::Base.include SolidQueue::Processes::Interruptible - else - SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible - end - end end end diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 59ec9f1a..6069a90d 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -4,7 +4,7 @@ module SolidQueue module Processes class Base include Callbacks # Defines callbacks needed by other concerns - include AppExecutor, Registrable, Procline + include AppExecutor, Registrable, Interruptible, Procline attr_reader :name diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index b7755f1f..67173aeb 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -2,36 +2,36 @@ module SolidQueue::Processes module Interruptible - include SolidQueue::AppExecutor - def wake_up interrupt end private + SELF_PIPE_BLOCK_SIZE = 11 def interrupt - queue << true + self_pipe[:writer].write_nonblock(".") + rescue Errno::EAGAIN, Errno::EINTR + # Ignore writes that would block and retry + # if another signal arrived while writing + retry end - # Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up. - # @param time [Numeric, Duration] the time to sleep. 0 returns immediately. def interruptible_sleep(time) - # Invoking this from the main thread may result in significant slowdown. - # Utilizing asynchronous execution (Futures) addresses this performance issue. - Concurrent::Promises.future(time) do |timeout| - queue.clear unless queue.pop(timeout:).nil? - end.on_rejection! do |e| - wrapped_exception = RuntimeError.new("Interruptible#interruptible_sleep - #{e.class}: #{e.message}") - wrapped_exception.set_backtrace(e.backtrace) - handle_thread_error(wrapped_exception) - end.value + if time > 0 && self_pipe[:reader].wait_readable(time) + loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } + end + rescue Errno::EAGAIN, Errno::EINTR + end - nil + # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) + def self_pipe + @self_pipe ||= create_self_pipe end - def queue - @queue ||= Queue.new + def create_self_pipe + reader, writer = IO.pipe + { reader: reader, writer: writer } end end end diff --git a/lib/solid_queue/processes/og_interruptible.rb b/lib/solid_queue/processes/og_interruptible.rb deleted file mode 100644 index 7eff46f1..00000000 --- a/lib/solid_queue/processes/og_interruptible.rb +++ /dev/null @@ -1,39 +0,0 @@ -# frozen_string_literal: true - -module SolidQueue::Processes - # The original implementation of Interruptible that works - # with Ruby 3.1 and earlier - module OgInterruptible - def wake_up - interrupt - end - - private - SELF_PIPE_BLOCK_SIZE = 11 - - def interrupt - self_pipe[:writer].write_nonblock(".") - rescue Errno::EAGAIN, Errno::EINTR - # Ignore writes that would block and retry - # if another signal arrived while writing - retry - end - - def interruptible_sleep(time) - if time > 0 && self_pipe[:reader].wait_readable(time) - loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } - end - rescue Errno::EAGAIN, Errno::EINTR - end - - # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) - def self_pipe - @self_pipe ||= create_self_pipe - end - - def create_self_pipe - reader, writer = IO.pipe - { reader: reader, writer: writer } - end - end -end