Skip to content

Commit 80c935d

Browse files
committed
avoid MD5 for FIPS support
This is a temporary patch until upstream fix is merged, see que-rb/que#437
1 parent 0f1cd2b commit 80c935d

File tree

2 files changed

+138
-3
lines changed

2 files changed

+138
-3
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
class ReplaceMd5WithHashtextInQueJobNotify < ActiveRecord::Migration[7.0]
2+
# This fixes https://github.com/que-rb/que/pull/437
3+
# Be careful on Que upgrade in case the final fix differed.
4+
5+
def up
6+
Que.transaction do
7+
Que.execute <<~SQL
8+
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
9+
DECLARE
10+
locker_pid integer;
11+
sort_key json;
12+
BEGIN
13+
-- Don't do anything if the job is scheduled for a future time.
14+
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
15+
RETURN null;
16+
END IF;
17+
18+
-- Pick a locker to notify of the job's insertion, weighted by their number
19+
-- of workers. Should bounce pseudorandomly between lockers on each
20+
-- invocation, hence the hashtext-ordering, but still touch each one equally,
21+
-- hence the modulo using the job_id.
22+
SELECT pid
23+
INTO locker_pid
24+
FROM (
25+
SELECT *, last_value(row_number) OVER () + 1 AS count
26+
FROM (
27+
SELECT *, row_number() OVER () - 1 AS row_number
28+
FROM (
29+
SELECT *
30+
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
31+
WHERE
32+
listening AND
33+
queues @> ARRAY[NEW.queue] AND
34+
ql.job_schema_version = NEW.job_schema_version
35+
ORDER BY hashtext(pid::text || id::text)
36+
) t1
37+
) t2
38+
) t3
39+
WHERE NEW.id % count = row_number;
40+
41+
IF locker_pid IS NOT NULL THEN
42+
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
43+
-- rather than throw errors when someone enqueues a big job, just
44+
-- broadcast the most pertinent information, and let the locker query for
45+
-- the record after it's taken the lock. The worker will have to hit the
46+
-- DB in order to make sure the job is still visible anyway.
47+
SELECT row_to_json(t)
48+
INTO sort_key
49+
FROM (
50+
SELECT
51+
'job_available' AS message_type,
52+
NEW.queue AS queue,
53+
NEW.priority AS priority,
54+
NEW.id AS id,
55+
-- Make sure we output timestamps as UTC ISO 8601
56+
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
57+
) t;
58+
59+
PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
60+
END IF;
61+
62+
RETURN null;
63+
END
64+
$$
65+
LANGUAGE plpgsql;
66+
SQL
67+
end
68+
end
69+
70+
def down
71+
Que.transaction do
72+
Que.execute <<~SQL
73+
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
74+
DECLARE
75+
locker_pid integer;
76+
sort_key json;
77+
BEGIN
78+
-- Don't do anything if the job is scheduled for a future time.
79+
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
80+
RETURN null;
81+
END IF;
82+
83+
-- Pick a locker to notify of the job's insertion, weighted by their number
84+
-- of workers. Should bounce pseudorandomly between lockers on each
85+
-- invocation, hence the md5-ordering, but still touch each one equally,
86+
-- hence the modulo using the job_id.
87+
SELECT pid
88+
INTO locker_pid
89+
FROM (
90+
SELECT *, last_value(row_number) OVER () + 1 AS count
91+
FROM (
92+
SELECT *, row_number() OVER () - 1 AS row_number
93+
FROM (
94+
SELECT *
95+
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
96+
WHERE
97+
listening AND
98+
queues @> ARRAY[NEW.queue] AND
99+
ql.job_schema_version = NEW.job_schema_version
100+
ORDER BY md5(pid::text || id::text)
101+
) t1
102+
) t2
103+
) t3
104+
WHERE NEW.id % count = row_number;
105+
106+
IF locker_pid IS NOT NULL THEN
107+
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
108+
-- rather than throw errors when someone enqueues a big job, just
109+
-- broadcast the most pertinent information, and let the locker query for
110+
-- the record after it's taken the lock. The worker will have to hit the
111+
-- DB in order to make sure the job is still visible anyway.
112+
SELECT row_to_json(t)
113+
INTO sort_key
114+
FROM (
115+
SELECT
116+
'job_available' AS message_type,
117+
NEW.queue AS queue,
118+
NEW.priority AS priority,
119+
NEW.id AS id,
120+
-- Make sure we output timestamps as UTC ISO 8601
121+
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
122+
) t;
123+
124+
PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
125+
END IF;
126+
127+
RETURN null;
128+
END
129+
$$
130+
LANGUAGE plpgsql;
131+
SQL
132+
end
133+
end
134+
end

db/structure.sql

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
114114

115115
-- Pick a locker to notify of the job's insertion, weighted by their number
116116
-- of workers. Should bounce pseudorandomly between lockers on each
117-
-- invocation, hence the md5-ordering, but still touch each one equally,
117+
-- invocation, hence the hashtext-ordering, but still touch each one equally,
118118
-- hence the modulo using the job_id.
119119
SELECT pid
120120
INTO locker_pid
@@ -129,7 +129,7 @@ CREATE FUNCTION public.que_job_notify() RETURNS trigger
129129
listening AND
130130
queues @> ARRAY[NEW.queue] AND
131131
ql.job_schema_version = NEW.job_schema_version
132-
ORDER BY md5(pid::text || id::text)
132+
ORDER BY hashtext(pid::text || id::text)
133133
) t1
134134
) t2
135135
) t3
@@ -1529,6 +1529,7 @@ INSERT INTO "schema_migrations" (version) VALUES
15291529
('20230629131935'),
15301530
('20230703133544'),
15311531
('20230703134109'),
1532-
('20230704131552');
1532+
('20230704131552'),
1533+
('20251008141931');
15331534

15341535

0 commit comments

Comments
 (0)