Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 56 additions & 71 deletions src/crawlee/storage_clients/_file_system/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,28 +312,43 @@ async def add_batch_of_requests(
unprocessed_requests = list[UnprocessedRequest]()
state = self._state.current_value

# Prepare a dictionary to track existing requests by their unique keys.
existing_unique_keys: dict[str, Path] = {}
existing_request_files = await self._get_request_files(self.path_to_rq)
all_requests = state.forefront_requests | state.regular_requests

for request_file in existing_request_files:
existing_request = await self._parse_request_file(request_file)
if existing_request is not None:
existing_unique_keys[existing_request.unique_key] = request_file
requests_to_enqueue = {}

# Process each request in the batch.
# Determine which requests can be added or are modified.
for request in requests:
existing_request_file = existing_unique_keys.get(request.unique_key)
existing_request = None

# Only load the full request from disk if we found a duplicate
if existing_request_file is not None:
existing_request = await self._parse_request_file(existing_request_file)
# Check if the request has already been handled.
if request.unique_key in state.handled_requests:
processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=True,
)
)
# Check if the request is already in progress.
# Or if the request is already in the queue and the `forefront` flag is not used, we do not change the
# position of the request.
elif (request.unique_key in state.in_progress_requests) or (
request.unique_key in all_requests and not forefront
):
processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=False,
)
)
# These requests must either be added or update their position.
else:
requests_to_enqueue[request.unique_key] = request

# If there is no existing request with the same unique key, add the new request.
if existing_request is None:
# Process each request in the batch.
for request in requests_to_enqueue.values():
# If the request is not already in the RQ, this is a new request.
if request.unique_key not in all_requests:
request_path = self._get_request_path(request.unique_key)

# Add sequence number to ensure FIFO ordering using state.
if forefront:
sequence_number = state.forefront_sequence_counter
Expand All @@ -352,9 +367,6 @@ async def add_batch_of_requests(
new_total_request_count += 1
new_pending_request_count += 1

# Add to our index for subsequent requests in this batch
existing_unique_keys[request.unique_key] = self._get_request_path(request.unique_key)

processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
Expand All @@ -363,57 +375,33 @@ async def add_batch_of_requests(
)
)

# If the request already exists in the RQ, just update it if needed.
else:
# Set the processed request flags.
was_already_present = existing_request is not None
was_already_handled = existing_request.unique_key in state.handled_requests

# If the request is already in the RQ and handled, just continue with the next one.
if was_already_present and was_already_handled:
processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=True,
)
)
# If the request already exists in the RQ and use the forefront flag to update its position
elif forefront:
# If the request is among `regular`, remove it from its current position.
if request.unique_key in state.regular_requests:
state.regular_requests.pop(request.unique_key)

# If the request is already in the RQ but not handled yet, update it.
elif was_already_present and not was_already_handled:
# Update request type (forefront vs regular) in state
if forefront:
# Move from regular to forefront if needed
if existing_request.unique_key in state.regular_requests:
state.regular_requests.pop(existing_request.unique_key)
if existing_request.unique_key not in state.forefront_requests:
state.forefront_requests[existing_request.unique_key] = state.forefront_sequence_counter
state.forefront_sequence_counter += 1
elif (
existing_request.unique_key not in state.forefront_requests
and existing_request.unique_key not in state.regular_requests
):
# Keep as regular if not already forefront
state.regular_requests[existing_request.unique_key] = state.sequence_counter
state.sequence_counter += 1

processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=False,
)
# If the request is already in `forefront`, we just need to update its position.
state.forefront_requests[request.unique_key] = state.forefront_sequence_counter
state.forefront_sequence_counter += 1

processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=False,
)
)

else:
logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.')
unprocessed_requests.append(
UnprocessedRequest(
unique_key=request.unique_key,
url=request.url,
method=request.method,
)
else:
logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.')
unprocessed_requests.append(
UnprocessedRequest(
unique_key=request.unique_key,
url=request.url,
method=request.method,
)
)

await self._update_metadata(
update_modified_at=True,
Expand Down Expand Up @@ -752,10 +740,7 @@ async def _get_request_files(cls, path_to_rq: Path) -> list[Path]:
files = await asyncio.to_thread(list, path_to_rq.glob('*.json'))

# Filter out metadata file and non-file entries.
filtered = filter(
lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME,
files,
)
filtered = filter(lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME, files)

return list(filtered)

Expand Down
27 changes: 17 additions & 10 deletions src/crawlee/storage_clients/_memory/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,27 @@ async def add_batch_of_requests(

# If the request is already in the queue but not handled, update it.
if was_already_present and existing_request:
# Update the existing request with any new data and
# remove old request from pending queue if it's there.
with suppress(ValueError):
self._pending_requests.remove(existing_request)

# Update indexes.
self._requests_by_unique_key[request.unique_key] = request

# Add updated request back to queue.
# We only update `forefront` by updating its position by shifting it to the left.
if forefront:
# Update the existing request with any new data and
# remove old request from pending queue if it's there.
with suppress(ValueError):
self._pending_requests.remove(existing_request)

# Add updated request back to queue.
self._pending_requests.appendleft(request)
else:
self._pending_requests.append(request)

processed_requests.append(
ProcessedRequest(
unique_key=request.unique_key,
was_already_present=True,
was_already_handled=False,
)
)

# Add the new request to the queue.
else:
if forefront:
Expand Down Expand Up @@ -217,8 +225,7 @@ async def fetch_next_request(self) -> Request | None:

# Skip if already in progress (shouldn't happen, but safety check).
if request.unique_key in self._in_progress_requests:
self._pending_requests.appendleft(request)
break
continue

# Mark as in progress.
self._in_progress_requests[request.unique_key] = request
Expand Down
Loading