feat: Add deduplication to add_batch_of_requests
#534
Conversation
Add deduplication to add_batch_of_requests and test
await rq.add_request(request)
await rq.add_request(request)
Maybe you could make two distinct Request instances with the same uniqueKey here?
Yes, good point. I also added one more test to make it explicit that deduplication works based on unique_key only, and that unless we use the use_extended_unique_key argument, some attributes of the request might be ignored. Another test makes this behavior clearly intentional, to avoid confusion in the future.
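For illustration, a minimal sketch of that behavior follows; it is not the actual test, and the storage and Request API names used here (RequestQueue.open, Request.from_url, was_already_present) are assumptions based on the crawlee interface.

import asyncio

from crawlee import Request
from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open()

    # Two Request instances that differ only in user_data but share the same
    # unique_key: deduplication looks at unique_key only (no extended unique
    # key), so the second add is reported as already present.
    request_a = Request.from_url('http://example.com', unique_key='same-key', user_data={'attempt': 1})
    request_b = Request.from_url('http://example.com', unique_key='same-key', user_data={'attempt': 2})

    result_a = await rq.add_request(request_a)
    result_b = await rq.add_request(request_b)

    assert result_a.was_already_present is False
    assert result_b.was_already_present is True


asyncio.run(main())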
await rq.add_requests(requests)

add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
await asyncio.gather(*add_requests_workers)
I guess you made sure that these do in fact run in parallel? To the naked eye, 100 requests doesn't seem like much; I'd expect that the event loop may run the tasks in sequence.
Maybe you could add the requests in each worker in smaller batches and add some random delays? Or just add a comment saying that you verified parallel execution empirically 😁
I wrote the test for the implementation that did not take parallel execution into account, and it was failing consistently. So from that perspective, I consider the test sufficient.
Anyway, I added some chunking to make the test slightly more challenging. The parallel execution can be verified in the logs, for example below: the add_batch_of_requests call that was started first did not finish first, as it was "taken over" during its await by another worker.
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 10
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 20
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 90
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 80
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 0
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 40
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 50
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 60
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 30
DEBUG Tried to add new requests: 10, succeeded to add new requests: 10, skipped already present requests: 70
INFO {'readCount': 0, 'writeCount': 100, 'deleteCount': 0, 'headItemReadCount': 0, 'storageBytes': 7400}
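For context, here is a simplified sketch of the worker pattern described above; it is not the actual test, and the chunk size, delays, and helper names are assumptions.

import asyncio
import random

from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open()
    requests = [f'http://example.com/{i}' for i in range(100)]

    async def add_requests_worker() -> None:
        # Each worker adds the same 100 requests in chunks of 10, with a small
        # random delay between chunks, so the workers interleave on the event loop.
        for start in range(0, len(requests), 10):
            await asyncio.sleep(random.uniform(0, 0.01))
            await rq.add_requests(requests[start : start + 10])

    # Ten workers racing to add the same requests; deduplication has to hold
    # even when a worker is suspended mid-batch and another takes over.
    add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
    await asyncio.gather(*add_requests_workers)


asyncio.run(main())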
with mock.patch(
    'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
    side_effect=return_unprocessed_requests,
):
    # Simulate failed API call for adding requests. Request was not processed and should not be cached.
    await apify_named_rq.add_requests(['http://example.com/1'])

# This will succeed.
await apify_named_rq.add_requests(['http://example.com/1'])
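The return_unprocessed_requests helper is not part of the excerpt; presumably it is a side effect along these lines (a hypothetical sketch, and the exact response shape used by the test may differ).

# Hypothetical sketch: behave like a failed batch call by reporting every
# request back as unprocessed instead of forwarding it to the API.
async def return_unprocessed_requests(requests: list[dict], *args, **kwargs) -> dict:
    return {
        'processedRequests': [],
        'unprocessedRequests': requests,
    }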
Any chance we could verify that the request was actually not cached between the two add_requests calls?
This is checked implicitly in the last line where it is asserted that there was exactly 1 writeCount difference. The first call is "hardcoded" to fail, even on all retries, so it never even sends the API request and thus has no chance of increasing the writeCount.
The second call can make the write only if it is not cached, as cached requests do not make the call (tested in other tests). So this means the request was not cached in between.
I could assert the state of the cache in between those calls, but since it is kind of an implementation detail, I would prefer not to.
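In other words, the decisive check boils down to something like the sketch below; how the test actually fetches the stats and what the helper is called are assumptions, not the test code.

from apify_client import ApifyClientAsync


async def queue_write_count(client: ApifyClientAsync, queue_id: str) -> int:
    # Hypothetical helper: read writeCount from the queue stats, i.e. the
    # object shown in the INFO log above ({'readCount': ..., 'writeCount': ...}).
    rq_info = await client.request_queue(queue_id).get()
    return rq_info['stats']['writeCount']

# The test then effectively does:
#   before = await queue_write_count(client, queue_id)
#   ... the mocked (failing) add_requests call, then the successful one ...
#   after = await queue_write_count(client, queue_id)
#   assert after - before == 1  # exactly one write => the failed call cached nothing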
Fair enough, can you explain this in a comment then?
Yes, added to the test description.
for request in requests:
    if self._requests_cache.get(request.id):
        # We are no sure if it was already handled at this point, and it is not worth calling API for it.
Did you mean "We are now sure that it was already handled..."? I'm not sure 😁
Yes, that was not very clear. Updated
]

# Send requests to API.
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
It's probably out of the scope of the PR, but it might be worth it to validate the response with a Pydantic model.
That happens in the original code already, a few lines down: api_response = AddRequestsResponse.model_validate(response)
I'm sorry, I meant validating the whole response object with the two lists, so that you wouldn't need to do response['unprocessedRequests']
I see, added.
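For illustration, the change discussed here presumably ends up looking roughly like the sketch below, inside add_batch_of_requests; the AddRequestsResponse model and its field names follow my reading of crawlee, so treat the import path as an assumption.

from crawlee.storage_clients.models import AddRequestsResponse

# Validate the raw API response immediately, then work with typed attributes
# instead of indexing response['unprocessedRequests'].
response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront)
api_response = AddRequestsResponse.model_validate(response)

for processed_request in api_response.processed_requests:
    # Cache only requests that the API actually accepted.
    ...

for unprocessed_request in api_response.unprocessed_requests:
    # Unprocessed requests must not be cached, so they can be retried later.
    ...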
already_present_requests: list[ProcessedRequest] = []

for request in requests:
    if self._requests_cache.get(request.id):
Judging by apify/crawlee#3120, a day may come when we try to limit the size of _requests_cache somehow. Perhaps we should think ahead and come up with a more space-efficient way of tracking already added requests?
EDIT: hollup a minute, do you use the ID here for deduplication instead of unique key?
Since there is this deterministic transformation function unique_key_to_request_id, which respects the Apify platform's way of creating IDs, this seems OK. If someone starts creating Requests with a custom id, then deduplication will most likely stop working.
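A rough sketch of why the ID-keyed cache still deduplicates by unique key; the import path of unique_key_to_request_id is an assumption and may differ between versions.

from crawlee._utils.requests import unique_key_to_request_id

# The request ID is derived deterministically from unique_key, so two requests
# sharing a unique_key map to the same cache entry even though the cache is
# keyed by request.id.
assert unique_key_to_request_id('http://example.com') == unique_key_to_request_id('http://example.com')

# A Request created with a custom, non-derived id would bypass this mapping,
# and the ID-keyed cache could no longer detect the duplicate unique_key.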
There are two issues I created based on the discussion about this:
LGTM
Good job! LGTM (let's wait for Honza's approval as well)
Description
Add deduplication of requests before api_client.batch_add_requests calls to avoid expensive and pointless API calls. Requests that are already known to be present are skipped, and successfully added requests are cached for subsequent calls to batch_add_requests.
Issues
Testing