Skip to content

use new unique jobs implementation #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$TEST_DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$TEST_DATABASE_URL"

- name: Test
run: rye test
Expand Down Expand Up @@ -109,7 +109,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$DATABASE_URL"

- name: Run examples
run: rye run python3 -m examples.all
Expand Down
24 changes: 23 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Breaking

- **Breaking change:** The return type of `Client#insert_many` and `Client#insert_many_tx` has been changed. Rather than returning just the number of rows inserted, it returns an array of all the `InsertResult` values for each inserted row. Unique conflicts which are skipped as duplicates are indicated in the same fashion as single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row will be returned instead. [PR #38](https://github.com/riverqueue/riverqueue-python/pull/38).
- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list.

### Added

- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types.
- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the other fields:

```python
UniqueOpts(by_args=["customer_id"])
```

Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently.

### Changed

- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`.

This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list.

## [0.7.0] - 2024-07-30

### Changed
Expand Down Expand Up @@ -79,4 +101,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
18 changes: 5 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class JobArgs(Protocol):
pass
```

* `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.
- `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
- `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

They may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

Expand Down Expand Up @@ -95,22 +95,14 @@ insert_res.job
insert_res.unique_skipped_as_duplicated
```

### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```python
client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
```

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
Unique jobs can also be inserted in bulk.

## Inserting jobs in bulk

Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:

```python
num_inserted = client.insert_many([
results = client.insert_many([
SimpleArgs(job_num=1),
SimpleArgs(job_num=2)
])
Expand All @@ -119,7 +111,7 @@ num_inserted = client.insert_many([
Or with `InsertManyParams`, which may include insertion options:

```python
num_inserted = client.insert_many([
results = client.insert_many([
InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)),
InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority"))
])
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
asyncpg==0.29.0
Expand Down
1 change: 1 addition & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
sqlalchemy==2.0.30
Expand Down
Loading
Loading