Skip to content

Commit 75a9567

Browse files
committed
Added Cassandra queue and tests for it
1 parent aced1ed commit 75a9567

File tree

5 files changed

+72
-95
lines changed

5 files changed

+72
-95
lines changed

frontera/contrib/backends/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ def finished(self):
9393
class CommonStorageBackend(CommonBackend):
9494

9595
def _create_queue(self, settings):
96-
if not isinstance(self.queue_component, BaseQueue):
96+
if not issubclass(self.queue_component, BaseQueue):
9797
raise TypeError('expected queue_component to '
9898
'belong to class: %s, got %s instead' % (type(BaseQueue).__name__,
9999
type(self.queue_component).__name__))
100-
return self.queue_component(self.session_cls,
100+
return self.queue_component(self.session,
101101
self.models['QueueModel'],
102102
settings.get('SPIDER_FEED_PARTITIONS'))
103103

frontera/contrib/backends/cassandra/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(self, manager):
4949
self._states = States(self.session,
5050
self.models['StateModel'],
5151
settings.get('STATE_CACHE_SIZE_LIMIT'))
52-
# self._queue = self._create_queue(settings)
52+
self._queue = self._create_queue(settings)
5353

5454
def frontier_stop(self):
5555
self.states.flush()
@@ -93,8 +93,7 @@ def strategy_worker(cls, manager):
9393

9494
sync_table(model)
9595

96-
b._states = States(b.session, model,
97-
settings.get('STATE_CACHE_SIZE_LIMIT'))
96+
b._states = States(b.session, model, settings.get('STATE_CACHE_SIZE_LIMIT'))
9897
return b
9998

10099
@classmethod

frontera/contrib/backends/cassandra/components.py

Lines changed: 43 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
from cachetools import LRUCache
1010
from cassandra import (OperationTimedOut, ReadFailure, ReadTimeout,
1111
WriteFailure, WriteTimeout)
12-
from cassandra.concurrent import execute_concurrent_with_args
1312
from cassandra.cqlengine.query import BatchQuery
14-
from w3lib.util import to_bytes, to_native_str
1513

1614
from frontera.contrib.backends import CreateOrModifyPageMixin
1715
from frontera.contrib.backends.memory import MemoryStates
@@ -22,6 +20,8 @@
2220
from frontera.utils.misc import chunks, get_crc32
2321
from frontera.utils.url import parse_domain_from_url_fast
2422

23+
from w3lib.util import to_bytes, to_native_str
24+
2525

2626
def _retry(func):
2727
def func_wrapper(self, *args, **kwargs):
@@ -122,21 +122,26 @@ def flush(self, force_clear=False):
122122

123123

124124
class Queue(BaseQueue):
125-
def __init__(self, session, queue_cls, partitions, crawl_id, generate_stats, ordering='default'):
125+
126+
def __init__(self, session, queue_cls, partitions, ordering='default'):
126127
self.session = session
127128
self.queue_model = queue_cls
128129
self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue")
129130
self.partitions = [i for i in range(0, partitions)]
130131
self.partitioner = Crc32NamePartitioner(self.partitions)
131132
self.ordering = ordering
133+
self.batch = BatchQuery()
132134

133135
def frontier_stop(self):
134136
pass
135137

136-
def _order_by(self):
138+
def _order_by(self, query):
137139
if self.ordering == 'created':
138-
return "created_at"
139-
return "created_at"
140+
return query.order_by('created_at')
141+
if self.ordering == 'created_desc':
142+
return query.order_by('-created_at')
143+
return query.order_by('score', 'created_at') # TODO: remove second parameter,
144+
# it's not necessary for proper crawling, but needed for tests
140145

141146
def get_next_requests(self, max_n_requests, partition_id, **kwargs):
142147
"""
@@ -148,53 +153,19 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
148153
"""
149154
results = []
150155
try:
151-
dequeued_urls = 0
152-
cql_ditems = []
153-
d_query = self.session.prepare("DELETE FROM queue WHERE crawl = ? AND fingerprint = ? AND partition_id = ? "
154-
"AND score = ? AND created_at = ?")
155-
for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\
156-
order_by("partition_id", "score", self._order_by()).limit(max_n_requests):
157-
method = 'GET' if not item.method else item.method
158-
159-
meta_dict2 = dict((name, getattr(item.meta, name)) for name in dir(item.meta)
160-
if not name.startswith('__'))
161-
# TODO: How the result can be an dict not an object -> Objects get error while encodeing for Message Bus
162-
# If I take meta_dict2 direct to Request i get the same error message
163-
164-
meta_dict = dict()
165-
meta_dict["fingerprint"] = meta_dict2["fingerprint"]
166-
meta_dict["domain"] = meta_dict2["domain"]
167-
meta_dict["origin_is_frontier"] = meta_dict2["origin_is_frontier"]
168-
meta_dict["scrapy_callback"] = meta_dict2["scrapy_callback"]
169-
meta_dict["scrapy_errback"] = meta_dict2["scrapy_errback"]
170-
meta_dict["scrapy_meta"] = meta_dict2["scrapy_meta"]
171-
meta_dict["score"] = meta_dict2["score"]
172-
meta_dict["jid"] = meta_dict2["jid"]
173-
174-
r = Request(item.url, method=method, meta=meta_dict, headers=item.headers, cookies=item.cookies)
175-
r.meta['fingerprint'] = item.fingerprint
176-
r.meta['score'] = item.score
156+
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).limit(max_n_requests):
157+
method = item.method or b'GET'
158+
r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)
159+
r.meta[b'fingerprint'] = to_bytes(item.fingerprint)
160+
r.meta[b'score'] = item.score
177161
results.append(r)
178-
179-
cql_d = (item.crawl, item.fingerprint, item.partition_id, item.score, item.created_at)
180-
cql_ditems.append(cql_d)
181-
dequeued_urls += 1
182-
183-
if dequeued_urls > 0:
184-
execute_concurrent_with_args(self.session, d_query, cql_ditems, concurrency=200)
185-
186-
self.counter_cls.cass_count({"dequeued_urls": dequeued_urls})
187-
162+
item.batch(self.batch).delete()
163+
self.batch.execute()
188164
except Exception as exc:
189165
self.logger.exception(exc)
190-
191166
return results
192167

193168
def schedule(self, batch):
194-
query = self.session.prepare("INSERT INTO queue (id, fingerprint, score, partition_id, host_crc32, url, "
195-
"created_at, meta, depth, headers, method, cookies) "
196-
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
197-
cql_items = []
198169
for fprint, score, request, schedule in batch:
199170
if schedule:
200171
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
@@ -205,35 +176,23 @@ def schedule(self, batch):
205176
else:
206177
partition_id = self.partitioner.partition(hostname, self.partitions)
207178
host_crc32 = get_crc32(hostname)
208-
created_at = time()*1E+6
209-
210-
if "domain" not in request.meta:
211-
request.meta["domain"] = {}
212-
if "origin_is_frontier" not in request.meta:
213-
request.meta["origin_is_frontier"] = ''
214-
if "scrapy_callback" not in request.meta:
215-
request.meta["scrapy_callback"] = None
216-
if "scrapy_errback" not in request.meta:
217-
request.meta["scrapy_errback"] = None
218-
if "scrapy_meta" not in request.meta:
219-
request.meta["scrapy_meta"] = {}
220-
if "score" not in request.meta:
221-
request.meta["score"] = 0
222-
if "jid" not in request.meta:
223-
request.meta["jid"] = 0
224-
225-
cql_i = (uuid.uuid4(), fprint, score, partition_id, host_crc32, request.url, created_at,
226-
request.meta, 0, request.headers, request.method, request.cookies)
227-
cql_items.append(cql_i)
228-
229-
request.meta['state'] = States.QUEUED
230-
231-
execute_concurrent_with_args(self.session, query, cql_items, concurrency=400)
232-
self.counter_cls.cass_count({"queued_urls": len(cql_items)})
179+
q = self.queue_model(id=uuid.uuid4(),
180+
fingerprint=to_native_str(fprint),
181+
score=score,
182+
url=request.url,
183+
meta=request.meta,
184+
headers=request.headers,
185+
cookies=request.cookies,
186+
method=to_native_str(request.method),
187+
partition_id=partition_id,
188+
host_crc32=host_crc32,
189+
created_at=time() * 1E+6)
190+
q.batch(self.batch).save()
191+
request.meta[b'state'] = States.QUEUED
192+
self.batch.execute()
233193

234194
def count(self):
235-
count = self.queue_model.objects.filter().count()
236-
return count
195+
return self.queue_model.all().count()
237196

238197

239198
class BroadCrawlingQueue(Queue):
@@ -265,12 +224,11 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
265224
while tries < self.GET_RETRIES:
266225
tries += 1
267226
limit *= 5.5 if tries > 1 else 1.0
268-
self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d" %
269-
(tries, limit, count, len(queue.keys())))
227+
self.logger.debug("Try %d, limit %d, last attempt: requests %d, hosts %d",
228+
tries, limit, count, len(queue.keys()))
270229
queue.clear()
271230
count = 0
272-
for item in self.queue_model.objects.filter(crawl=self.crawl_id, partition_id=partition_id).\
273-
order_by("crawl", "score", self._order_by()).limit(limit):
231+
for item in self._order_by(self.queue_model.filter(partition_id=partition_id)).limit(max_n_requests):
274232
if item.host_crc32 not in queue:
275233
queue[item.host_crc32] = []
276234
if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host:
@@ -284,13 +242,14 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
284242
if min_requests is not None and count < min_requests:
285243
continue
286244
break
287-
self.logger.debug("Finished: tries %d, hosts %d, requests %d" % (tries, len(queue.keys()), count))
245+
self.logger.debug("Finished: tries %d, hosts %d, requests %d", tries, len(queue.keys()), count)
288246

289247
results = []
290-
for items in queue.itervalues():
248+
for items in six.itervalues(queue):
291249
for item in items:
292-
method = 'GET' if not item.method else item.method
293-
results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers,
294-
cookies=item.cookies))
295-
item.delete()
250+
method = item.method or b'GET'
251+
results.append(Request(item.url, method=method,
252+
meta=item.meta, headers=item.headers, cookies=item.cookies))
253+
item.batch(self.batch).delete()
254+
self.batch.execute()
296255
return results

frontera/contrib/backends/cassandra/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@ def __repr__(self):
7373
class QueueModel(Model):
7474
__table_name__ = 'queue'
7575

76-
id = UUID(primary_key=True)
7776
partition_id = Integer(primary_key=True)
78-
score = Float(required=True)
77+
score = Float(primary_key=True)
78+
created_at = BigInt(primary_key=True)
79+
id = UUID(primary_key=True)
7980
url = Text(required=True)
8081
fingerprint = Text(required=True)
8182
host_crc32 = Integer(required=True)
8283
meta = PickleDict()
8384
headers = PickleDict()
8485
cookies = PickleDict()
8586
method = Text()
86-
created_at = BigInt(required=True)
8787
depth = SmallInt()
8888

8989
def __repr__(self):

tests/contrib/backends/cassandra/test_backend_cassandra.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ def setUp(self):
3333
self.manager = type('manager', (object,), {})
3434
self.manager.settings = settings
3535
self.keyspace = settings.CASSANDRABACKEND_KEYSPACE
36+
timeout = settings.CASSANDRABACKEND_REQUEST_TIMEOUT
3637
cluster = Cluster(hosts, port)
3738
self.session = cluster.connect()
3839
self.session.execute("CREATE KEYSPACE IF NOT EXISTS %s WITH "
39-
"replication = {'class':'SimpleStrategy', 'replication_factor' : 1}" % self.keyspace)
40+
"replication = {'class':'SimpleStrategy', 'replication_factor' : 1}" % self.keyspace,
41+
timeout=timeout)
4042
self.session.set_keyspace(self.keyspace)
41-
timeout = settings.CASSANDRABACKEND_REQUEST_TIMEOUT
4243
connection.setup(hosts, self.keyspace, port=port)
4344
self.session.default_timeout = connection.session.default_timeout = timeout
4445

@@ -110,7 +111,7 @@ def assert_db_values(self, model, _filter, fields):
110111
sync_table(model)
111112
m = model(**fields)
112113
m.save()
113-
stored_obj = m.get(**_filter)
114+
stored_obj = m.objects.allow_filtering().get(**_filter)
114115
for field, original_value in six.iteritems(fields):
115116
stored_value = getattr(stored_obj, field)
116117
if isinstance(original_value, dict):
@@ -198,3 +199,21 @@ def test_state(self):
198199
self.assertEqual(r4.meta[b'state'], States.CRAWLED)
199200
state.flush(True)
200201
self.assertEqual(state._cache, {})
202+
203+
def test_queue(self):
204+
self.manager.settings.SPIDER_FEED_PARTITIONS = 2
205+
b = CassandraBackend(self.manager)
206+
queue = b.queue
207+
batch = [('10', 0.5, r1, True), ('11', 0.6, r2, True),
208+
('12', 0.7, r3, True)]
209+
queue.schedule(batch)
210+
self.assertEqual(set([r.url for r in queue.get_next_requests(10, 0,
211+
min_requests=3,
212+
min_hosts=1,
213+
max_requests_per_host=10)]),
214+
set([r3.url]))
215+
self.assertEqual(set([r.url for r in queue.get_next_requests(10, 1,
216+
min_requests=3,
217+
min_hosts=1,
218+
max_requests_per_host=10)]),
219+
set([r1.url, r2.url]))

0 commit comments

Comments
 (0)