Skip to content

Commit b583535

Browse files
committed
sort args before hashing, support partial arg extraction
1 parent 31a2cd1 commit b583535

File tree

3 files changed

+165
-9
lines changed

3 files changed

+165
-9
lines changed

src/riverqueue/client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
List,
99
runtime_checkable,
1010
)
11+
import json
1112

1213
from riverqueue.insert_opts import InsertOpts, UniqueOpts
1314

@@ -559,7 +560,20 @@ def _build_unique_key_and_bitmask(
559560

560561
if unique_opts.by_args:
561562
any_unique_opts = True
562-
unique_key += f"&args={insert_params.args}"
563+
564+
# Re-parse the args JSON for sorting and potentially filtering:
565+
args_dict = json.loads(insert_params.args)
566+
567+
args_to_include = args_dict
568+
if unique_opts.by_args is not True:
569+
# Filter to include only the specified keys:
570+
args_to_include = {
571+
key: args_dict[key] for key in unique_opts.by_args if key in args_dict
572+
}
573+
574+
# Serialize with sorted keys and append to unique key:
575+
sorted_args = json.dumps(args_to_include, sort_keys=True)
576+
unique_key += f"&args={sorted_args}"
563577

564578
if unique_opts.by_period:
565579
lower_period_bound = _truncate_time(

tests/client_test.py

Lines changed: 146 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from dataclasses import dataclass
22
from datetime import datetime, timezone
33
from unittest.mock import MagicMock, patch
4+
import json
45

56
import pytest
67

@@ -225,14 +226,14 @@ def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args):
225226
insert_opts = InsertOpts(
226227
unique_opts=UniqueOpts(
227228
by_state=[
228-
"available",
229-
"cancelled",
230-
"completed",
231-
"discarded",
232-
"pending",
233-
"retryable",
234-
"running",
235-
"scheduled",
229+
JobState.AVAILABLE,
230+
JobState.CANCELLED,
231+
JobState.COMPLETED,
232+
JobState.DISCARDED,
233+
JobState.PENDING,
234+
JobState.RETRYABLE,
235+
JobState.RUNNING,
236+
JobState.SCHEDULED,
236237
]
237238
)
238239
)
@@ -251,6 +252,143 @@ def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args):
251252
assert insert_params.unique_states == bytes([0b11111111])
252253

253254

255+
def test_insert_with_unique_opts_by_args_true(client, mock_exec, simple_args):
256+
"""Test that by_args=True uses full args with sorted keys"""
257+
mock_exec.job_insert_many.return_value = [("job_row", False)]
258+
259+
# Call with by_args=True
260+
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True))
261+
262+
insert_res = client.insert(simple_args, insert_opts=insert_opts)
263+
264+
mock_exec.job_insert_many.assert_called_once()
265+
assert insert_res.job == "job_row"
266+
267+
# Verify that by_args=True was handled properly
268+
call_args = mock_exec.job_insert_many.call_args[0][0]
269+
assert len(call_args) == 1
270+
insert_params = call_args[0]
271+
assert insert_params.unique_key is not None
272+
273+
274+
def test_insert_with_unique_opts_by_args_sorting(
275+
client: Client, mock_exec: MagicMock
276+
) -> None:
277+
"""Test that different key order in args produces the same unique key"""
278+
mock_exec.job_insert_many.side_effect = [
279+
[("job_row1", False)],
280+
[("job_row2", False)],
281+
]
282+
283+
@dataclass
284+
class JsonArgs:
285+
kind: str = "ordered"
286+
json_str: str = ""
287+
288+
def to_json(self) -> str:
289+
return self.json_str
290+
291+
# Insert with different key orders
292+
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True))
293+
294+
# Same data with different key orders
295+
ordered_json = '{"a": 1, "b": 2, "c": 3}'
296+
reverse_ordered_json = '{"c": 3, "b": 2, "a": 1}'
297+
298+
insert_res1 = client.insert(
299+
JsonArgs(json_str=ordered_json), insert_opts=insert_opts
300+
)
301+
insert_res2 = client.insert(
302+
JsonArgs(json_str=reverse_ordered_json), insert_opts=insert_opts
303+
)
304+
305+
# Get the unique keys that were generated
306+
call_args1 = mock_exec.job_insert_many.call_args_list[0][0][0] # type: ignore[index]
307+
call_args2 = mock_exec.job_insert_many.call_args_list[1][0][0] # type: ignore[index]
308+
309+
# The unique keys should be identical despite different order in original JSON
310+
assert call_args1[0].unique_key == call_args2[0].unique_key
311+
312+
313+
def test_insert_with_unique_opts_by_args_partial_keys(
314+
client: Client, mock_exec: MagicMock
315+
) -> None:
316+
"""Test that by_args with a list of keys uses only those top-level keys, including ones whose values are nested objects"""
317+
mock_exec.job_insert_many.return_value = [("job_row", False)]
318+
319+
@dataclass
320+
class JsonArgs:
321+
kind: str = "partial"
322+
json_str: str = ""
323+
324+
def to_json(self) -> str:
325+
return self.json_str
326+
327+
args1 = json.dumps(
328+
{
329+
"a": "value",
330+
"b": "foo",
331+
"c": {
332+
"d": "bar",
333+
},
334+
"e": "ignore_this",
335+
}
336+
)
337+
338+
# Same data as args1 except for omitted `e`, and reordered keys. It's a duplicate:
339+
args2 = json.dumps(
340+
{
341+
"c": {
342+
"d": "bar",
343+
},
344+
"b": "foo",
345+
"a": "value",
346+
}
347+
)
348+
349+
# Missing `c`, so it's not a duplicate:
350+
args3 = json.dumps(
351+
{
352+
"a": "value",
353+
"b": "foo",
354+
"d": "something else", # Not listed in by_args, so omitted from the unique key
355+
}
356+
)
357+
358+
args4 = json.dumps(
359+
{
360+
"b": "foo",
361+
"a": "value",
362+
"e": "bar", # Not listed in by_args, so omitted from the unique key
363+
}
364+
)
365+
366+
# Filter by a, b, and c:
367+
insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=["a", "b", "c"]))
368+
369+
client.insert(JsonArgs(json_str=args1), insert_opts=insert_opts)
370+
client.insert(JsonArgs(json_str=args2), insert_opts=insert_opts)
371+
client.insert(JsonArgs(json_str=args3), insert_opts=insert_opts)
372+
client.insert(JsonArgs(json_str=args4), insert_opts=insert_opts)
373+
374+
# Parse args to verify filtering
375+
call_args_1 = mock_exec.job_insert_many.call_args_list[0][0][0] # type: ignore[index]
376+
insert_params_1 = call_args_1[0]
377+
call_args_2 = mock_exec.job_insert_many.call_args_list[1][0][0] # type: ignore[index]
378+
insert_params_2 = call_args_2[0]
379+
call_args_3 = mock_exec.job_insert_many.call_args_list[2][0][0] # type: ignore[index]
380+
insert_params_3 = call_args_3[0]
381+
call_args_4 = mock_exec.job_insert_many.call_args_list[3][0][0] # type: ignore[index]
382+
insert_params_4 = call_args_4[0]
383+
384+
# Check that the keys were filtered correctly
385+
assert insert_params_1.unique_key == insert_params_2.unique_key
386+
# args3 is missing `c`, so it's not a duplicate:
387+
assert insert_params_1.unique_key != insert_params_3.unique_key
388+
# args3 and args4 are both the same when only looking at the filtered keys:
389+
assert insert_params_3.unique_key == insert_params_4.unique_key
390+
391+
254392
def test_insert_kind_error(client):
255393
@dataclass
256394
class MyArgs:

tests/driver/riversqlalchemy/sqlalchemy_driver_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ async def test_insert_with_opts(self, client, simple_args):
157157
insert_opts = InsertOpts(queue="high_priority", unique_opts=None)
158158
insert_res = await client.insert(simple_args, insert_opts=insert_opts)
159159
assert insert_res.job
160+
assert insert_res.job.unique_key is None
161+
assert insert_res.job.unique_states is None
160162

161163
@pytest.mark.asyncio
162164
async def test_insert_with_unique_opts_by_args(self, client, simple_args):
@@ -352,6 +354,8 @@ def test_insert_with_opts(self, client, simple_args):
352354
insert_opts = InsertOpts(queue="high_priority", unique_opts=None)
353355
insert_res = client.insert(simple_args, insert_opts=insert_opts)
354356
assert insert_res.job
357+
assert insert_res.job.unique_key is None
358+
assert insert_res.job.unique_states is None
355359

356360
def test_insert_with_unique_opts_by_args(self, client, simple_args):
357361
print("self", self)

0 commit comments

Comments
 (0)