Skip to content

Commit c36b645

Browse files
committed
Fix sqlalchemy queue component partition_id
1 parent e1a4ca9 commit c36b645

File tree

4 files changed

+21
-5
lines changed

4 files changed

+21
-5
lines changed

docs/source/topics/frontera-settings.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,15 @@ Default: ``5.0``
310310
(in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
311311
overused. This affects only Scrapy scheduler."
312312

313+
.. setting:: QUEUE_HOSTNAME_PARTITIONING
314+
315+
QUEUE_HOSTNAME_PARTITIONING
316+
---------------------------
317+
318+
Default: ``False``
319+
320+
Whether to use the hostname as the queue partitioning key. When disabled (the default), the request fingerprint is used as the partitioning key instead.
321+
313322
.. setting:: REQUEST_MODEL
314323

315324
REQUEST_MODEL

frontera/contrib/backends/sqlalchemy/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def _init_db_worker(self, manager):
5858
self.check_and_create_tables(drop, clear_content, (metadata_m, queue_m))
5959
self._metadata = Metadata(self.session_cls, metadata_m,
6060
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
61-
self._queue = Queue(self.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
61+
self._queue = Queue(self.session_cls, queue_m, settings)
6262

6363
@classmethod
6464
def strategy_worker(cls, manager):

frontera/contrib/backends/sqlalchemy/components.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ def flush(self):
148148

149149

150150
class Queue(BaseQueue):
151-
def __init__(self, session_cls, queue_cls, partitions, ordering='default'):
151+
def __init__(self, session_cls, queue_cls, settings, ordering='default'):
152+
partitions = settings.get('SPIDER_FEED_PARTITIONS')
152153
self.session = session_cls()
153154
self.queue_model = queue_cls
154155
self.logger = logging.getLogger("sqlalchemy.queue")
155156
self.partitions = [i for i in range(0, partitions)]
156157
self.partitioner = Crc32NamePartitioner(self.partitions)
158+
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
157159
self.ordering = ordering
158160

159161
def frontier_stop(self):
@@ -202,7 +204,8 @@ def schedule(self, batch):
202204
partition_id = self.partitions[0]
203205
host_crc32 = 0
204206
else:
205-
partition_id = self.partitioner.partition(hostname, self.partitions)
207+
partition_key = hostname if self.hostname_partitioning else to_native_str(fprint)
208+
partition_id = self.partitioner.partition(partition_key, self.partitions)
206209
host_crc32 = get_crc32(hostname)
207210
q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
208211
headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),

tests/contrib/backends/test_backends.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
from frontera.core.components import States
33
from frontera.core.models import Request
4+
from frontera import Settings
45
from happybase import Connection
56
from frontera.contrib.backends.hbase import HBaseState, HBaseQueue
67
from frontera.contrib.backends.sqlalchemy import States as SQLAlchemyStates, Queue as SQLAlchemyQueue
@@ -88,11 +89,14 @@ def queue(request):
8889
return
8990

9091
if request.param == "sqlalchemy":
92+
settings = Settings()
93+
settings.SPIDER_FEED_PARTITIONS = 2
94+
settings.QUEUE_HOSTNAME_PARTITIONING = True
9195
engine = create_engine('sqlite:///:memory:', echo=False)
9296
session_cls = sessionmaker()
9397
session_cls.configure(bind=engine)
9498
QueueModel.__table__.create(bind=engine)
95-
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, 2)
99+
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, settings)
96100
yield sqla_queue
97101
sqla_queue.frontier_stop()
98102
engine.dispose()
@@ -114,4 +118,4 @@ def test_queue(queue):
114118
assert set([r.url for r in queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,
115119
max_requests_per_host=10)]) == set([r3.url])
116120
assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
117-
max_requests_per_host=10)]) == set([r1.url, r2.url])
121+
max_requests_per_host=10)]) == set([r1.url, r2.url])

0 commit comments

Comments
 (0)