diff --git a/CHANGELOG.md b/CHANGELOG.md index 71d0d424..df44343a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ +- [Unreleased](#unreleased) - [2.4.1 \(2024-10-28\)](#241-2024-10-28) - [2.4.0 \(2024-08-21\)](#240-2024-08-21) - [2.3.0 \(2023-10-16\)](#230-2023-10-16) @@ -60,6 +61,25 @@ +## Unreleased + +- **Changed**: + + Replaced `md5()` with `hashtext()` in `que_job_notify()` function for PostgreSQL server running in FIPS mode. + +This release contains a database migration. You will need to migrate Que to the latest database schema version (8): + +```ruby +class UpdateQueTablesToVersion8 < ActiveRecord::Migration[7.0] + def up + Que.migrate!(version: 8) + end + + def down + Que.migrate!(version: 7) + end +end +``` + ## 2.4.1 (2024-10-28) - **Fixed**: diff --git a/README.md b/README.md index 91261691..08a107c1 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ class CreateQueSchema < ActiveRecord::Migration[6.0] # Whenever you use Que in a migration, always specify the version you're # migrating to. If you're unsure what the current version is, check the # changelog. - Que.migrate!(version: 7) + Que.migrate!(version: 8) end def down diff --git a/lib/que/migrations.rb b/lib/que/migrations.rb index 255ae6da..967a0291 100644 --- a/lib/que/migrations.rb +++ b/lib/que/migrations.rb @@ -4,7 +4,7 @@ module Que module Migrations # In order to ship a schema change, add the relevant up and down sql files # to the migrations directory, and bump the version here. - CURRENT_VERSION = 7 + CURRENT_VERSION = 8 class << self def migrate!(version:) diff --git a/lib/que/migrations/8/down.sql b/lib/que/migrations/8/down.sql new file mode 100644 index 00000000..d977595b --- /dev/null +++ b/lib/que/migrations/8/down.sql @@ -0,0 +1,59 @@ +-- Revert hashtext() back to md5() +CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$ + DECLARE + locker_pid integer; + sort_key json; + BEGIN + -- Don't do anything if the job is scheduled for a future time.
+ IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN + RETURN null; + END IF; + + -- Pick a locker to notify of the job's insertion, weighted by their number + -- of workers. Should bounce pseudorandomly between lockers on each + -- invocation, hence the md5-ordering, but still touch each one equally, + -- hence the modulo using the job_id. + SELECT pid + INTO locker_pid + FROM ( + SELECT *, last_value(row_number) OVER () + 1 AS count + FROM ( + SELECT *, row_number() OVER () - 1 AS row_number + FROM ( + SELECT * + FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id + WHERE + listening AND + queues @> ARRAY[NEW.queue] AND + ql.job_schema_version = NEW.job_schema_version + ORDER BY md5(pid::text || id::text) + ) t1 + ) t2 + ) t3 + WHERE NEW.id % count = row_number; + + IF locker_pid IS NOT NULL THEN + -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so + -- rather than throw errors when someone enqueues a big job, just + -- broadcast the most pertinent information, and let the locker query for + -- the record after it's taken the lock. The worker will have to hit the + -- DB in order to make sure the job is still visible anyway.
+ SELECT row_to_json(t) + INTO sort_key + FROM ( + SELECT + 'job_available' AS message_type, + NEW.queue AS queue, + NEW.priority AS priority, + NEW.id AS id, + -- Make sure we output timestamps as UTC ISO 8601 + to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at + ) t; + + PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text); + END IF; + + RETURN null; + END +$$ +LANGUAGE plpgsql; diff --git a/lib/que/migrations/8/up.sql b/lib/que/migrations/8/up.sql new file mode 100644 index 00000000..ec2cfd0c --- /dev/null +++ b/lib/que/migrations/8/up.sql @@ -0,0 +1,59 @@ +-- Replace md5() with hashtext() for FIPS compliance +CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$ + DECLARE + locker_pid integer; + sort_key json; + BEGIN + -- Don't do anything if the job is scheduled for a future time. + IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN + RETURN null; + END IF; + + -- Pick a locker to notify of the job's insertion, weighted by their number + -- of workers. Should bounce pseudorandomly between lockers on each + -- invocation, hence the hashtext-ordering, but still touch each one equally, + -- hence the modulo using the job_id. + SELECT pid + INTO locker_pid + FROM ( + SELECT *, last_value(row_number) OVER () + 1 AS count + FROM ( + SELECT *, row_number() OVER () - 1 AS row_number + FROM ( + SELECT * + FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id + WHERE + listening AND + queues @> ARRAY[NEW.queue] AND + ql.job_schema_version = NEW.job_schema_version + ORDER BY hashtext(pid::text || id::text) + ) t1 + ) t2 + ) t3 + WHERE NEW.id % count = row_number; + + IF locker_pid IS NOT NULL THEN + -- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so + -- rather than throw errors when someone enqueues a big job, just + -- broadcast the most pertinent information, and let the locker query for + -- the record after it's taken the lock. The worker will have to hit the + -- DB in order to make sure the job is still visible anyway. + SELECT row_to_json(t) + INTO sort_key + FROM ( + SELECT + 'job_available' AS message_type, + NEW.queue AS queue, + NEW.priority AS priority, + NEW.id AS id, + -- Make sure we output timestamps as UTC ISO 8601 + to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at + ) t; + + PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text); + END IF; + + RETURN null; + END +$$ +LANGUAGE plpgsql; diff --git a/spec/que/command_line_interface_spec.rb b/spec/que/command_line_interface_spec.rb index 7ee69c31..91260eb9 100644 --- a/spec/que/command_line_interface_spec.rb +++ b/spec/que/command_line_interface_spec.rb @@ -2,7 +2,6 @@ require 'spec_helper' -require 'digest/md5' require 'que/command_line_interface' describe Que::CommandLineInterface do @@ -93,7 +92,7 @@ def write_file # same files will result in spec failures. So instead just generate a new # file name for each spec to write/delete. - name = "spec/temp/file_#{Digest::MD5.hexdigest(rand.to_s)}" + name = "spec/temp/file_#{rand}" written_files << name File.open("#{name}.rb", 'w') { |f| f.puts %(LOADED_FILES["#{name}"] = true) } name