Closed
25 changes: 25 additions & 0 deletions README.md
@@ -28,6 +28,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Failed jobs and retries](#failed-jobs-and-retries)
- [Error reporting on jobs](#error-reporting-on-jobs)
- [Puma plugin](#puma-plugin)
- [Health-check HTTP server](#health-check-http-server)
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
- [Recurring tasks](#recurring-tasks)
- [Inspiration](#inspiration)
@@ -603,6 +604,30 @@ that you set in production only. This is what Rails 8's default Puma config look

**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.

## Health-check HTTP server

Solid Queue provides a tiny HTTP health-check server that runs as a supervised process.

- Endpoints:
  - `/` and `/health`:
    - Return `200 OK` with body `OK` when the supervisor and all supervised processes (workers, dispatchers, scheduler, and the health server itself) have fresh heartbeats.
    - Return `503 Service Unavailable` with body `Unhealthy` if the supervisor or any supervised process has a stale heartbeat, or if the check itself raises an error. A heartbeat counts as stale when the process record matches `SolidQueue::Process.prunable`.
  - Any other path returns `404 Not Found` with body `Not Found`.
- Configure via `config/queue.yml` under `health_server:`. Both `host` and `port` are required.
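
The endpoint rules above can be sketched as a pure function. This is only an illustration of the path-to-response mapping: `health_response` and its `healthy` argument are hypothetical names introduced here, not part of Solid Queue's API.

```ruby
# Hypothetical helper: builds the raw HTTP response for a request path.
# `healthy` stands in for the result of the heartbeat check.
def health_response(path, healthy)
  status_line, body =
    if path == "/" || path == "/health"
      if healthy
        ["HTTP/1.1 200 OK", "OK"]
      else
        ["HTTP/1.1 503 Service Unavailable", "Unhealthy"]
      end
    else
      ["HTTP/1.1 404 Not Found", "Not Found"]
    end

  headers = [
    "Content-Type: text/plain",
    "Content-Length: #{body.bytesize}",
    "Connection: close"
  ].join("\r\n")

  # Status line, headers, a blank line, then the body.
  "#{status_line}\r\n#{headers}\r\n\r\n#{body}"
end

puts health_response("/health", true).lines.first
```

Keeping the mapping free of socket code makes it easy to check each branch in isolation.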

For example, to bind the health server to all interfaces on port 9393 in production:

```yml
production:
  health_server:
    host: 0.0.0.0
    port: 9393
```

Note:
- This runs under the supervisor just like workers/dispatchers.
- When the Puma plugin is active (`plugin :solid_queue` in `puma.rb`), the configured health server is skipped to avoid running multiple HTTP servers in the same process tree. A warning is logged. If you need the health server, run Solid Queue outside Puma (for example, via `bin/jobs`) or disable the plugin on that instance.
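
A probe for this endpoint could look like the sketch below. `solid_queue_healthy?` is a hypothetical helper, and the stub `TCPServer` only stands in for the real health server so the example runs on its own. In practice you would point the probe at the configured `host` and `port`.

```ruby
require "net/http"
require "socket"

# Stand-in for the health server so the example is self-contained:
# answers a single request with 200 OK, then stops.
stub = TCPServer.new("127.0.0.1", 0) # port 0 lets the OS pick a free port
port = stub.addr[1]
server_thread = Thread.new do
  client = stub.accept
  # Read the request line and headers up to the blank line.
  loop do
    line = client.gets
    break if line.nil? || line == "\r\n"
  end
  client.write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n" \
               "Content-Length: 2\r\nConnection: close\r\n\r\nOK")
  client.close
end

# Hypothetical probe: true only when the endpoint answers with a 2xx
# status within the timeout. Any error counts as unhealthy.
def solid_queue_healthy?(host, port, timeout: 2)
  http = Net::HTTP.new(host, port)
  http.open_timeout = timeout
  http.read_timeout = timeout
  http.get("/health").is_a?(Net::HTTPSuccess)
rescue StandardError
  false
end

healthy = solid_queue_healthy?("127.0.0.1", port)
server_thread.join
stub.close
puts(healthy ? "healthy" : "unhealthy")
```

Treating timeouts and connection errors as unhealthy matches how an orchestrator's liveness probe would interpret them.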

## Jobs and transactional integrity
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another Active Job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database from the main app.

2 changes: 2 additions & 0 deletions lib/puma/plugin/solid_queue.rb
@@ -13,6 +13,7 @@ def start(launcher)

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
SolidQueue.puma_plugin = true
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
@@ -23,6 +24,7 @@ def start(launcher)
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
SolidQueue.puma_plugin = true
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
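
Setting the flag before `fork` matters because the forked supervisor starts with a copy of the parent's memory. The sketch below shows that inheritance in isolation. `FlagDemo` is a hypothetical stand-in for the `SolidQueue` module, and the example assumes a platform where `fork` is available.

```ruby
# Hypothetical stand-in for the module-level SolidQueue.puma_plugin flag.
module FlagDemo
  class << self
    attr_accessor :puma_plugin
  end
  self.puma_plugin = false
end

FlagDemo.puma_plugin = true # set in the parent, before forking

reader, writer = IO.pipe
pid = fork do
  reader.close
  # The child sees the value the parent had at fork time.
  writer.write(FlagDemo.puma_plugin.to_s)
  writer.close
  exit!(0) # leave without running at_exit handlers inherited from the parent
end

writer.close
child_saw = reader.read
reader.close
Process.wait(pid)

puts "child saw puma_plugin=#{child_saw}"
```

A flag set after the `fork` call would stay invisible to the child, which is why the assignment sits on the line before it in both branches.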
4 changes: 3 additions & 1 deletion lib/solid_queue.rb
@@ -41,9 +41,11 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

mattr_accessor :puma_plugin, default: false

delegate :on_start, :on_stop, :on_exit, to: Supervisor

- [ Dispatcher, Scheduler, Worker ].each do |process|
+ [ Dispatcher, Scheduler, Worker, HealthServer ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start(&block)
end
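
The hook names come from `process.name.demodulize.downcase`, so it is worth seeing what that yields for a two-word class name. The sketch below uses a plain-Ruby stand-in for ActiveSupport's `demodulize` (an assumption made so it runs without Rails) and only covers the `_start` hook visible in this hunk.

```ruby
# Plain-Ruby stand-in for ActiveSupport's String#demodulize.
def demodulize(name)
  name.split("::").last
end

process_names = %w[
  SolidQueue::Dispatcher
  SolidQueue::Scheduler
  SolidQueue::Worker
  SolidQueue::HealthServer
]

# Same derivation as the loop above: demodulize, then downcase.
hook_names = process_names.map do |name|
  "on_#{demodulize(name).downcase}_start"
end

puts hook_names
```

With `downcase`, `HealthServer` produces `on_healthserver_start`, with no underscore between the two words. If `on_health_server_start` is the intended name, ActiveSupport's `underscore` would produce it while leaving the single-word names unchanged. That is a suggestion, not something this diff does.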
42 changes: 38 additions & 4 deletions lib/solid_queue/configuration.rb
@@ -7,10 +7,11 @@ class Configuration
validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool
validate :ensure_valid_health_server

class Process < Struct.new(:kind, :attributes)
def instantiate
- "SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
+ "SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes)
end
end
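
The switch from `titleize` to `camelize` in `instantiate` is needed for two-word kinds such as `:health_server`: `titleize` yields `"Health Server"`, which is not a valid constant name, while `camelize` yields `"HealthServer"`. The sketch below reproduces this with simplified plain-Ruby stand-ins for the ActiveSupport inflections and for `safe_constantize`. All helper names and `LookupDemo` are hypothetical.

```ruby
# Simplified stand-ins for ActiveSupport's String#titleize and
# String#camelize. They only handle snake_case input.
def simple_titleize(name)
  name.split("_").map(&:capitalize).join(" ")
end

def simple_camelize(name)
  name.split("_").map(&:capitalize).join
end

module LookupDemo
  class Worker; end
  class HealthServer; end
end

# Mimics safe_constantize: returns nil instead of raising NameError.
def lookup(name)
  LookupDemo.const_get(name)
rescue NameError
  nil
end

%w[worker health_server].each do |kind|
  via_titleize = lookup(simple_titleize(kind))
  via_camelize = lookup(simple_camelize(kind))
  puts "#{kind}: titleize=#{via_titleize.inspect} camelize=#{via_camelize.inspect}"
end
```

Single-word kinds resolve either way, which is why the old code worked for workers, dispatchers and schedulers.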

@@ -38,7 +39,7 @@ def initialize(**options)
def configured_processes
if only_work? then workers
else
- dispatchers + workers + schedulers
+ dispatchers + workers + schedulers + health_server
end
end

@@ -129,6 +130,31 @@ def schedulers
end
end

def health_server
options = health_server_options
return [] unless options

# Only warn when a health server is actually configured.
if SolidQueue.puma_plugin
SolidQueue.logger&.warn("SolidQueue health server is configured but Puma plugin is active; skipping starting health server to avoid duplicate servers")
return []
end

[ Process.new(:health_server, options) ]
end

def ensure_valid_health_server
server_options = health_server_options
return unless server_options

unless server_options[:host].present?
errors.add(:base, "Health server: host is required")
end

unless server_options.key?(:port) && server_options[:port].present?
errors.add(:base, "Health server: port is required")
end
end
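
The validation rule can be read as: no `health_server` entry means nothing to check, while a configured entry needs both `host` and `port`. The sketch below restates it in plain Ruby. `health_server_errors` and the simplified `present?` stand-in for ActiveSupport's `Object#present?` are hypothetical names used only for illustration.

```ruby
# Simplified stand-in for ActiveSupport's Object#present?.
def present?(value)
  return false if value.nil? || value == false
  return !value.strip.empty? if value.is_a?(String)

  value.respond_to?(:empty?) ? !value.empty? : true
end

# Hypothetical function mirroring the two checks in the validation.
# Returns the list of error messages for a health_server options hash.
def health_server_errors(options)
  # An absent or empty entry means the health server is not configured.
  return [] if options.nil? || options.empty?

  errors = []
  errors << "Health server: host is required" unless present?(options[:host])
  errors << "Health server: port is required" unless present?(options[:port])
  errors
end

puts health_server_errors({ host: "0.0.0.0" }).inspect
```

Both checks run, so a hash missing both keys reports two errors rather than stopping at the first.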

def workers_options
@workers_options ||= processes_config.fetch(:workers, [])
.map { |options| options.dup.symbolize_keys }
@@ -139,6 +165,14 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def health_server_options
@health_server_options ||= begin
options = processes_config[:health_server]
options = options.dup.symbolize_keys if options
options.present? ? options : nil
end
end

def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
@@ -147,8 +181,8 @@ def recurring_tasks

def processes_config
@processes_config ||= config_from \
- options.slice(:workers, :dispatchers).presence || options[:config_file],
- keys: [ :workers, :dispatchers ],
+ options.slice(:workers, :dispatchers, :health_server).presence || options[:config_file],
+ keys: [ :workers, :dispatchers, :health_server ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
end

142 changes: 142 additions & 0 deletions lib/solid_queue/health_server.rb
@@ -0,0 +1,142 @@
# frozen_string_literal: true

require "socket"
require "logger"

module SolidQueue
  class HealthServer < Processes::Base
    include Processes::Runnable

    attr_reader :host, :port, :logger

    def initialize(host:, port:, logger: nil, **options)
      @host = host
      @port = port
      @logger = logger || default_logger
      @server = nil

      super(**options)
    end

    def metadata
      super.merge(host: host, port: port)
    end

    def running?
      @thread&.alive?
    end

    private
      def run
        begin
          @server = TCPServer.new(host, port)
          log_info("listening on #{host}:#{port}")

          loop do
            break if shutting_down?

            readables, = IO.select([ @server, self_pipe[:reader] ].compact, nil, nil, 1)
            next unless readables

            if readables.include?(self_pipe[:reader])
              drain_self_pipe
            end

            if readables.include?(@server)
              handle_connection
            end
          end
        rescue Exception => exception
          handle_thread_error(exception)
        ensure
          SolidQueue.instrument(:shutdown_process, process: self) do
            run_callbacks(:shutdown) { shutdown }
          end
        end
      end

      def handle_connection
        socket = @server.accept_nonblock(exception: false)
        return unless socket.is_a?(::TCPSocket)

        begin
          request_line = socket.gets
          path = request_line&.split(" ")&.at(1) || "/"

          if path == "/" || path == "/health"
            if system_healthy?
              body = "OK"
              status_line = "HTTP/1.1 200 OK"
            else
              body = "Unhealthy"
              status_line = "HTTP/1.1 503 Service Unavailable"
            end
          else
            body = "Not Found"
            status_line = "HTTP/1.1 404 Not Found"
          end

          headers = [
            "Content-Type: text/plain",
            "Content-Length: #{body.bytesize}",
            "Connection: close"
          ].join("\r\n")

          socket.write("#{status_line}\r\n#{headers}\r\n\r\n#{body}")
        ensure
          begin
            socket.close
          rescue StandardError
          end
        end
      end

      def shutdown
        begin
          @server&.close
        rescue StandardError
        ensure
          @server = nil
        end
      end

      def set_procline
        procline "http #{host}:#{port}"
      end

      def default_logger
        logger = Logger.new($stdout)
        logger.level = Logger::INFO
        logger.progname = "SolidQueueHTTP"
        logger
      end

      def log_info(message)
        logger&.info(message)
      end

      def drain_self_pipe
        loop { self_pipe[:reader].read_nonblock(11) }
      rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable
      end

      def system_healthy?
        wrap_in_app_executor do
          # If not supervised (e.g., unit tests), consider healthy
          supervisor_record = process&.supervisor
          return true unless supervisor_record

          # Supervisor must be alive
          supervisor_alive = SolidQueue::Process.where(id: supervisor_record.id).merge(SolidQueue::Process.prunable).none?

          # All supervisees must be alive (including this health server)
          supervisees_alive = supervisor_record.supervisees.merge(SolidQueue::Process.prunable).none?

          supervisor_alive && supervisees_alive
        end
      rescue StandardError => error
        log_info("health check error: #{error.class}: #{error.message}")
        false
      end
  end
end
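
The `run` loop above waits on two file descriptors at once: the listening socket and a self-pipe that lets another thread wake the loop so it can notice a shutdown request without waiting for the one-second timeout. The sketch below isolates that pattern with plain `IO.pipe` and `TCPServer`. The local `stopping` flag is a hypothetical stand-in for `shutting_down?`, and nothing here uses Solid Queue's own classes.

```ruby
require "socket"

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port
reader, writer = IO.pipe               # the "self-pipe"
stopping = false                       # stand-in for shutting_down?

loop_thread = Thread.new do
  loop do
    break if stopping

    # Block for up to 1 second until a client connects or a wake-up byte arrives.
    readables, = IO.select([server, reader], nil, nil, 1)
    next unless readables

    if readables.include?(reader)
      # Drain the wake-up byte. The next iteration re-checks the flag.
      reader.read_nonblock(16, exception: false)
    end

    if readables.include?(server)
      socket = server.accept_nonblock(exception: false)
      socket.close if socket.is_a?(TCPSocket)
    end
  end
end

# Request a stop and wake the loop immediately through the pipe.
stopping = true
writer.write(".")

stopped = !loop_thread.join(5).nil? # join returns nil if the thread is still running
[server, reader, writer].each(&:close)
puts "loop stopped: #{stopped}"
```

Without the pipe the loop would still stop, but only after the current `IO.select` call timed out.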