Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

<!-- MarkdownTOC autolink=true -->

- [Unreleased](#unreleased)
- [2.4.1 \(2024-10-28\)](#241-2024-10-28)
- [2.4.0 \(2024-08-21\)](#240-2024-08-21)
- [2.3.0 \(2023-10-16\)](#230-2023-10-16)
Expand Down Expand Up @@ -60,6 +61,25 @@

<!-- /MarkdownTOC -->

## Unreleased

- **Changed**:
+ Replaced `md5()` with `hashtext()` in the `que_job_notify()` function, for PostgreSQL servers running in FIPS mode (where `md5()` is unavailable).

This release contains a database migration. You will need to migrate Que to the latest database schema version (8):

```ruby
class UpdateQueTablesToVersion8 < ActiveRecord::Migration[7.0]
def up
Que.migrate!(version: 8)
end

def down
Que.migrate!(version: 7)
end
end
```

## 2.4.1 (2024-10-28)

- **Fixed**:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class CreateQueSchema < ActiveRecord::Migration[6.0]
# Whenever you use Que in a migration, always specify the version you're
# migrating to. If you're unsure what the current version is, check the
# changelog.
Que.migrate!(version: 7)
Que.migrate!(version: 8)
end

def down
Expand Down
2 changes: 1 addition & 1 deletion lib/que/migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Que
module Migrations
# In order to ship a schema change, add the relevant up and down sql files
# to the migrations directory, and bump the version here.
CURRENT_VERSION = 7
CURRENT_VERSION = 8

class << self
def migrate!(version:)
Expand Down
59 changes: 59 additions & 0 deletions lib/que/migrations/8/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Revert hashtext() back to md5()
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
DECLARE
locker_pid integer;
sort_key json;
BEGIN
-- Don't do anything if the job is scheduled for a future time.
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
RETURN null;
END IF;

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the md5-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
FROM (
SELECT *, last_value(row_number) OVER () + 1 AS count
FROM (
SELECT *, row_number() OVER () - 1 AS row_number
FROM (
SELECT *
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
WHERE
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY md5(pid::text || id::text)
) t1
) t2
) t3
WHERE NEW.id % count = row_number;

IF locker_pid IS NOT NULL THEN
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
-- rather than throw errors when someone enqueues a big job, just
-- broadcast the most pertinent information, and let the locker query for
-- the record after it's taken the lock. The worker will have to hit the
-- DB in order to make sure the job is still visible anyway.
SELECT row_to_json(t)
INTO sort_key
FROM (
SELECT
'job_available' AS message_type,
NEW.queue AS queue,
NEW.priority AS priority,
NEW.id AS id,
-- Make sure we output timestamps as UTC ISO 8601
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
) t;

PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
END IF;

RETURN null;
END
$$
LANGUAGE plpgsql;
59 changes: 59 additions & 0 deletions lib/que/migrations/8/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Replace md5() with hashtext() for FIPS compliance
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
DECLARE
locker_pid integer;
sort_key json;
BEGIN
-- Don't do anything if the job is scheduled for a future time.
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
RETURN null;
END IF;

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the hashtext-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
FROM (
SELECT *, last_value(row_number) OVER () + 1 AS count
FROM (
SELECT *, row_number() OVER () - 1 AS row_number
FROM (
SELECT *
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
WHERE
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY hashtext(pid::text || id::text)
) t1
) t2
) t3
WHERE NEW.id % count = row_number;

IF locker_pid IS NOT NULL THEN
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
-- rather than throw errors when someone enqueues a big job, just
-- broadcast the most pertinent information, and let the locker query for
-- the record after it's taken the lock. The worker will have to hit the
-- DB in order to make sure the job is still visible anyway.
SELECT row_to_json(t)
INTO sort_key
FROM (
SELECT
'job_available' AS message_type,
NEW.queue AS queue,
NEW.priority AS priority,
NEW.id AS id,
-- Make sure we output timestamps as UTC ISO 8601
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
) t;

PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
END IF;

RETURN null;
END
$$
LANGUAGE plpgsql;
3 changes: 1 addition & 2 deletions spec/que/command_line_interface_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

require 'spec_helper'

require 'digest/md5'
require 'que/command_line_interface'

describe Que::CommandLineInterface do
Expand Down Expand Up @@ -93,7 +92,7 @@ def write_file
# same files will result in spec failures. So instead just generate a new
# file name for each spec to write/delete.

name = "spec/temp/file_#{Digest::MD5.hexdigest(rand.to_s)}"
name = "spec/temp/file_#{rand}"
written_files << name
File.open("#{name}.rb", 'w') { |f| f.puts %(LOADED_FILES["#{name}"] = true) }
name
Expand Down