Skip to content

Commit 83817c9

Browse files
zhiltsov-maxflopez7
authored andcommitted
[CVAT] Fix possible no assignments error when there are (#3346)
* Fix possible no assignments error when there are * Update readme
1 parent 2b12961 commit 83817c9

File tree

5 files changed

+132
-136
lines changed

5 files changed

+132
-136
lines changed

packages/examples/cvat/exchange-oracle/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,6 @@ Available at `/docs` route
7373

7474
To run tests
7575
```
76-
docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test
76+
docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test && \
77+
docker compose -f docker-compose.test.yml down
7778
```

packages/examples/cvat/exchange-oracle/src/services/cvat.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,8 @@ def count_jobs_by_escrow_address(
794794

795795
def get_free_job(
796796
session: Session,
797-
cvat_projects: list[int],
797+
escrow_address: str,
798+
chain_id: int,
798799
*,
799800
user_wallet_address: str,
800801
for_update: bool | ForUpdateParams = False,
@@ -805,7 +806,11 @@ def get_free_job(
805806
return (
806807
_maybe_for_update(session.query(Job), enable=for_update)
807808
.where(
808-
Job.cvat_project_id.in_(cvat_projects),
809+
Job.project.has(
810+
(Project.escrow_address == escrow_address)
811+
& (Project.chain_id == chain_id)
812+
& (Project.status == ProjectStatuses.annotation)
813+
),
809814
Job.status == JobStatuses.new,
810815
~Job.assignments.any(
811816
(
@@ -984,22 +989,27 @@ def get_user_assignments_in_cvat_projects(
984989
)
985990

986991

987-
def count_active_user_assignments(
992+
def has_active_user_assignments(
988993
session: Session,
989994
wallet_address: int,
990-
cvat_projects: list[int],
991-
) -> int:
992-
return (
995+
escrow_address: str,
996+
chain_id: int,
997+
) -> bool:
998+
return session.query(
993999
session.query(Assignment)
9941000
.where(
995-
Assignment.job.has(Job.cvat_project_id.in_(cvat_projects)),
1001+
Assignment.job.has(
1002+
Job.project.has(
1003+
(Project.escrow_address == escrow_address) & (Project.chain_id == chain_id)
1004+
)
1005+
),
9961006
Assignment.user_wallet_address == wallet_address,
9971007
Assignment.status == AssignmentStatuses.created.value,
9981008
Assignment.completed_at == None,
9991009
utcnow() < Assignment.expires_at,
10001010
)
1001-
.count()
1002-
)
1011+
.exists()
1012+
).scalar()
10031013

10041014

10051015
# Image

packages/examples/cvat/exchange-oracle/src/services/exchange.py

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,45 @@ def __str__(self) -> str:
1818
)
1919

2020

21-
def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: str) -> str | None: # noqa: ARG001 (don't we want to use chain_id for filter?)
21+
def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: str) -> str | None:
2222
with SessionLocal.begin() as session:
2323
user = get_or_404(
2424
cvat_service.get_user_by_id(session, wallet_address, for_update=True),
2525
wallet_address,
2626
"user",
2727
)
2828

29-
# There can be several projects under one escrow, we need any
30-
project = cvat_service.get_project_by_escrow_address(
29+
if cvat_service.has_active_user_assignments(
3130
session,
32-
escrow_address,
33-
status_in=[
34-
ProjectStatuses.annotation
35-
], # avoid unnecessary locking on completed projects
36-
for_update=True,
37-
)
38-
39-
if not project:
40-
# Retry without a lock to check if the project doesn't exist
41-
get_or_404(
42-
cvat_service.get_project_by_escrow_address(
43-
session, escrow_address, status_in=[ProjectStatuses.annotation]
44-
),
45-
escrow_address,
46-
"job",
47-
)
48-
return None
49-
50-
has_active_assignments = (
51-
cvat_service.count_active_user_assignments(
52-
session, wallet_address=wallet_address, cvat_projects=[project.cvat_id]
53-
)
54-
> 0
55-
)
56-
if has_active_assignments:
31+
wallet_address=wallet_address,
32+
escrow_address=escrow_address,
33+
chain_id=chain_id.value,
34+
):
5735
raise UserHasUnfinishedAssignmentError(
5836
"The user already has an unfinished assignment in this project"
5937
)
6038

39+
# TODO: Try to put into 1 request. SQLAlchemy generates 2 queries with simple
40+
# .options(selectinload(Job.project))
41+
project = get_or_404(
42+
cvat_service.get_project_by_escrow_address(
43+
session, escrow_address, status_in=[ProjectStatuses.annotation]
44+
),
45+
escrow_address,
46+
"job",
47+
)
48+
6149
unassigned_job = cvat_service.get_free_job(
6250
session,
63-
cvat_projects=[project.cvat_id],
51+
escrow_address=escrow_address,
52+
chain_id=chain_id.value,
6453
user_wallet_address=wallet_address,
6554
for_update=True,
55+
# lock the job to be able to make a rollback if CVAT requests fail
56+
# can potentially be optimized to make less DB requests
57+
# and rely only on assignment expiration
6658
)
59+
6760
if not unassigned_job:
6861
return None
6962

@@ -72,7 +65,12 @@ def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: s
7265
wallet_address=user.wallet_address,
7366
cvat_job_id=unassigned_job.cvat_id,
7467
expires_at=utcnow()
75-
+ timedelta(seconds=get_default_assignment_timeout(TaskTypes(project.job_type))),
68+
+ timedelta(
69+
seconds=get_default_assignment_timeout(
70+
TaskTypes(project.job_type)
71+
# TODO: need to update this if we have multiple job types per escrow
72+
)
73+
),
7674
)
7775

7876
cvat_service.touch(session, Job, [unassigned_job.id])

0 commit comments

Comments
 (0)