Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
class ReplaceMd5WithHashtextInQueJobNotify < ActiveRecord::Migration[7.0]
# This fixes https://github.com/que-rb/que/pull/437
# Be careful on Que upgrade in case the final fix differed.

def up
Que.transaction do
Que.execute <<~SQL
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
DECLARE
locker_pid integer;
sort_key json;
BEGIN
-- Don't do anything if the job is scheduled for a future time.
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
RETURN null;
END IF;

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the hashtext-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
FROM (
SELECT *, last_value(row_number) OVER () + 1 AS count
FROM (
SELECT *, row_number() OVER () - 1 AS row_number
FROM (
SELECT *
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
WHERE
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY hashtext(pid::text || id::text)
) t1
) t2
) t3
WHERE NEW.id % count = row_number;

IF locker_pid IS NOT NULL THEN
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
-- rather than throw errors when someone enqueues a big job, just
-- broadcast the most pertinent information, and let the locker query for
-- the record after it's taken the lock. The worker will have to hit the
-- DB in order to make sure the job is still visible anyway.
SELECT row_to_json(t)
INTO sort_key
FROM (
SELECT
'job_available' AS message_type,
NEW.queue AS queue,
NEW.priority AS priority,
NEW.id AS id,
-- Make sure we output timestamps as UTC ISO 8601
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
) t;

PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
END IF;

RETURN null;
END
$$
LANGUAGE plpgsql;
SQL
end
end

def down
Que.transaction do
Que.execute <<~SQL
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
DECLARE
locker_pid integer;
sort_key json;
BEGIN
-- Don't do anything if the job is scheduled for a future time.
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
RETURN null;
END IF;

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the md5-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
FROM (
SELECT *, last_value(row_number) OVER () + 1 AS count
FROM (
SELECT *, row_number() OVER () - 1 AS row_number
FROM (
SELECT *
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
WHERE
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY md5(pid::text || id::text)
) t1
) t2
) t3
WHERE NEW.id % count = row_number;

IF locker_pid IS NOT NULL THEN
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
-- rather than throw errors when someone enqueues a big job, just
-- broadcast the most pertinent information, and let the locker query for
-- the record after it's taken the lock. The worker will have to hit the
-- DB in order to make sure the job is still visible anyway.
SELECT row_to_json(t)
INTO sort_key
FROM (
SELECT
'job_available' AS message_type,
NEW.queue AS queue,
NEW.priority AS priority,
NEW.id AS id,
-- Make sure we output timestamps as UTC ISO 8601
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
) t;

PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
END IF;

RETURN null;
END
$$
LANGUAGE plpgsql;
SQL
end
end
end
7 changes: 4 additions & 3 deletions db/structure-10.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the md5-ordering, but still touch each one equally,
-- invocation, hence the hashtext-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
Expand All @@ -127,7 +127,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY md5(pid::text || id::text)
ORDER BY hashtext(pid::text || id::text)
) t1
) t2
) t3
Expand Down Expand Up @@ -1527,6 +1527,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20230629131935'),
('20230703133544'),
('20230703134109'),
('20230704131552');
('20230704131552'),
('20251008141931');


7 changes: 4 additions & 3 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger

-- Pick a locker to notify of the job's insertion, weighted by their number
-- of workers. Should bounce pseudorandomly between lockers on each
-- invocation, hence the md5-ordering, but still touch each one equally,
-- invocation, hence the hashtext-ordering, but still touch each one equally,
-- hence the modulo using the job_id.
SELECT pid
INTO locker_pid
Expand All @@ -129,7 +129,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
listening AND
queues @> ARRAY[NEW.queue] AND
ql.job_schema_version = NEW.job_schema_version
ORDER BY md5(pid::text || id::text)
ORDER BY hashtext(pid::text || id::text)
) t1
) t2
) t3
Expand Down Expand Up @@ -1529,6 +1529,7 @@ INSERT INTO "schema_migrations" (version) VALUES
('20230629131935'),
('20230703133544'),
('20230703134109'),
('20230704131552');
('20230704131552'),
('20251008141931');