Skip to content

Commit 06be952

Browse files
committed
Added cassandra revisiting backend and tests for it
1 parent 8d49ac0 commit 06be952

File tree

9 files changed

+158
-163
lines changed

9 files changed

+158
-163
lines changed

frontera/contrib/backends/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from frontera import Backend
77
from frontera.core.components import States, Queue as BaseQueue, DistributedBackend
88
from frontera.core.models import Request, Response
9+
from frontera.utils.misc import utcnow_timestamp
910

1011
from w3lib.util import to_native_str
1112

@@ -182,3 +183,26 @@ def _modify_page(self, obj):
182183
db_page.cookies = obj.request.cookies
183184
db_page.status_code = obj.status_code
184185
return db_page
186+
187+
188+
class CommonRevisitingStorageBackendMixin(object):
    """Mixin adding revisit scheduling to a storage backend.

    Expects the host class to provide ``self.queue``, ``self.metadata``,
    ``self.states``, ``self.queue_size``, ``self._get_score`` and
    ``self.interval`` (revisit delay in seconds).
    """

    def _schedule(self, requests):
        """Stamp each request with a ``crawl_at`` time and enqueue it.

        Requests already queued are skipped; the queue counter is advanced
        only by the number of requests actually scheduled.
        """
        batch = []
        for request in requests:
            state = request.meta[b'state']
            # Include None: a request whose state was never set must be
            # treated as not-crawled (the sqlalchemy backend this replaces
            # did so); otherwise it would be silently dropped below.
            if state in (States.NOT_CRAWLED, None):
                request.meta[b'crawl_at'] = utcnow_timestamp()
            elif state in (States.CRAWLED, States.ERROR):
                # Already fetched (or failed): revisit after the configured interval.
                request.meta[b'crawl_at'] = utcnow_timestamp() + self.interval
            else:
                continue  # QUEUED: already scheduled, do not enqueue twice
            batch.append((request.meta[b'fingerprint'], self._get_score(request), request, True))
        self.queue.schedule(batch)
        self.metadata.update_score(batch)
        self.queue_size += len(batch)

    def page_crawled(self, response):
        """Record a crawl result, then immediately re-schedule the request
        so it will be revisited after ``self.interval``."""
        super(CommonRevisitingStorageBackendMixin, self).page_crawled(response)
        self.states.set_states(response.request)
        self._schedule([response.request])
        self.states.update_cache(response.request)

frontera/contrib/backends/cassandra/__init__.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from __future__ import absolute_import
22

33
import six
4-
from cassandra.cluster import Cluster
54
from cassandra.cqlengine import connection
6-
from cassandra.cqlengine.management import drop_table, sync_table
5+
from cassandra.cqlengine.management import drop_table
76

87
from frontera.contrib.backends import (CommonDistributedStorageBackend,
98
CommonStorageBackend)
10-
from frontera.contrib.backends.cassandra.components import (Metadata, Queue,
11-
States)
9+
from frontera.contrib.backends.cassandra.components import (Metadata,
10+
BroadCrawlingQueue,
11+
Queue, States)
1212
from frontera.utils.misc import load_object
1313

1414

@@ -99,17 +99,22 @@ def __init__(self, manager):
9999
settings = manager.settings
100100
cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS')
101101
cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
102-
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
102+
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
103103
models = settings.get('CASSANDRABACKEND_MODELS')
104+
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
105+
106+
self.models = dict([(name, load_object(cls)) for name, cls in six.iteritems(models)])
104107
cluster_kwargs = {
105108
'port': cluster_port,
106-
'compression': True
109+
'compression': True,
107110
}
108-
self.cluster = Cluster(cluster_hosts, **cluster_kwargs)
109-
self.models = dict([(name, load_object(cls)) for name, cls in six.iteritems(models)])
111+
if not connection.cluster:
112+
connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
113+
connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
110114

111-
self.session.set_keyspace(keyspace)
112-
connection.set_session(self.session)
115+
if drop_all_tables:
116+
for name, table in six.iteritems(self.models):
117+
drop_table(table)
113118

114119
self._metadata = None
115120
self._queue = None
@@ -119,32 +124,17 @@ def __init__(self, manager):
119124
@classmethod
def strategy_worker(cls, manager):
    """Build a backend instance configured for a strategy worker.

    Only the states component is attached; metadata and queue remain
    unset, as the strategy worker does not use them.
    """
    backend = cls(manager)
    cfg = manager.settings
    backend._states = States(backend.models['StateModel'],
                             cfg.get('STATE_CACHE_SIZE_LIMIT'))
    return backend
132129

133130
@classmethod
def db_worker(cls, manager):
    """Build a backend instance configured for a DB worker.

    Attaches metadata storage and a broad-crawling queue; the states
    component remains unset, as the DB worker does not use it.
    """
    backend = cls(manager)
    cfg = manager.settings
    backend._metadata = Metadata(backend.models['MetadataModel'],
                                 cfg.get('CASSANDRABACKEND_CACHE_SIZE'))
    backend._queue = BroadCrawlingQueue(backend.models['QueueModel'],
                                        cfg.get('SPIDER_FEED_PARTITIONS'))
    return backend
137+
138+
def frontier_stop(self):
    """Shut down the backend and release the Cassandra connection."""
    # Let the parent backend stop its components first.
    super(Distributed, self).frontier_stop()
    # Tear down the cqlengine 'default' connection registered via
    # connection.setup() in __init__, so sessions are not leaked.
    connection.unregister_connection('default')

frontera/contrib/backends/cassandra/components.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
132132
"""
133133
results = []
134134
try:
135-
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).limit(max_n_requests):
135+
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).
136+
allow_filtering()).limit(max_n_requests):
136137
method = item.method or b'GET'
137138
r = Request(item.url, method=method, meta=item.meta, headers=item.headers, cookies=item.cookies)
138139
r.meta[b'fingerprint'] = to_bytes(item.fingerprint)
@@ -207,7 +208,8 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
207208
tries, limit, count, len(queue.keys()))
208209
queue.clear()
209210
count = 0
210-
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).limit(max_n_requests):
211+
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).
212+
allow_filtering()).limit(max_n_requests):
211213
if item.host_crc32 not in queue:
212214
queue[item.host_crc32] = []
213215
if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host:
@@ -227,8 +229,11 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
227229
for items in six.itervalues(queue):
228230
for item in items:
229231
method = item.method or b'GET'
230-
results.append(Request(item.url, method=method,
231-
meta=item.meta, headers=item.headers, cookies=item.cookies))
232+
results.append(Request(item.url,
233+
method=method,
234+
meta=item.meta,
235+
headers=item.headers,
236+
cookies=item.cookies))
232237
item.batch(self.batch).delete()
233238
self.batch.execute()
234239
return results

frontera/contrib/backends/cassandra/models.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,9 @@ def __repr__(self):
7070
return '<State:%s=%s>' % (self.fingerprint, self.state)
7171

7272

73-
class QueueModel(Model):
74-
__table_name__ = 'queue'
73+
class BaseQueueModel(Model):
74+
__abstract__ = True
7575

76-
partition_id = Integer(primary_key=True)
77-
score = Float(primary_key=True)
78-
created_at = BigInt(primary_key=True)
79-
id = UUID(primary_key=True)
8076
url = Text(required=True)
8177
fingerprint = Text(required=True)
8278
host_crc32 = Integer(required=True)
@@ -90,25 +86,32 @@ def __repr__(self):
9086
return '<Queue:%s (%s)>' % (self.url, self.id)
9187

9288

93-
class FifoOrLIfoQueueModel(Model):
94-
# Separate models are needed as
95-
# order_by is supported on columns
96-
# only in the order, the clustering
97-
# keys were created
89+
class QueueModel(BaseQueueModel):
    """Default crawl-queue table; inherits the common payload columns
    (url, fingerprint, host_crc32, ...) from BaseQueueModel."""
    __abstract__ = False
    __table_name__ = 'queue'

    # Composite primary key: partition_id is the Cassandra partition key;
    # the remaining keys order rows within a partition by score, then
    # insertion time, with a UUID to guarantee uniqueness.
    partition_id = Integer(primary_key=True)
    score = Float(primary_key=True)
    created_at = BigInt(primary_key=True)
    id = UUID(primary_key=True)
97+
98+
99+
class FifoOrLIfoQueueModel(BaseQueueModel):
    """Queue table for FIFO/LIFO ordering: unlike QueueModel, rows are
    clustered by created_at (not score), since cqlengine only supports
    order_by on clustering columns in their declared order."""
    __abstract__ = False
    __table_name__ = 'fifo_lifo_queue'

    partition_id = Integer(primary_key=True)
    # score is stored but is not part of the key here.
    score = Float(required=True)
    created_at = BigInt(primary_key=True)
    id = UUID(primary_key=True)
107-
url = Text(required=True)
108-
fingerprint = Text(required=True)
109-
host_crc32 = Integer(required=True)
110-
meta = PickleDict()
111-
headers = PickleDict()
112-
cookies = PickleDict()
113-
method = Text()
114-
depth = SmallInt()
107+
108+
109+
class RevisitingQueueModel(BaseQueueModel):
    """Queue table for the revisiting backend: rows are clustered by
    crawl_at so due requests (crawl_at <= now) can be range-scanned."""
    __abstract__ = False
    __table_name__ = 'revisiting_queue'

    partition_id = Integer(primary_key=True)
    # Epoch timestamp at which the request becomes eligible for fetching.
    crawl_at = BigInt(primary_key=True)
    id = UUID(primary_key=True)
    # Not part of the key: revisiting ordering is purely time-based.
    score = Float(required=True)
    created_at = BigInt(required=True)
Lines changed: 54 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,53 @@
11
# -*- coding: utf-8 -*-
2-
import json
32
import logging
4-
from datetime import datetime, timedelta
3+
import uuid
4+
from datetime import timedelta
55
from time import time
66

7-
from cassandra.cqlengine import columns
8-
from cassandra.cqlengine.models import Model
7+
from cassandra.cqlengine.management import sync_table
8+
from cassandra.cqlengine.query import BatchQuery
9+
from w3lib.util import to_native_str
910

1011
from frontera import Request
12+
from frontera.contrib.backends import CommonRevisitingStorageBackendMixin
1113
from frontera.contrib.backends.cassandra import CassandraBackend
14+
from frontera.contrib.backends.cassandra.models import RevisitingQueueModel
1215
from frontera.contrib.backends.partitioners import Crc32NamePartitioner
1316
from frontera.core.components import Queue as BaseQueue
1417
from frontera.core.components import States
15-
from frontera.utils.misc import get_crc32
18+
from frontera.utils.misc import get_crc32, utcnow_timestamp
1619
from frontera.utils.url import parse_domain_from_url_fast
1720

1821

19-
class RevisitingQueueModel(Model):
20-
__table_name__ = 'revisiting_queue'
21-
22-
crawl_at = columns.DateTime(required=True, default=datetime.now(), index=True)
23-
24-
2522
class RevisitingQueue(BaseQueue):
26-
def __init__(self, session, queue_cls, partitions):
27-
self.session = session()
23+
def __init__(self, queue_cls, partitions):
    """Set up the revisiting queue over *queue_cls* with *partitions*
    spider-feed partitions, and create the backing table if missing."""
    self.queue_model = queue_cls
    self.logger = logging.getLogger("frontera.contrib.backends.cassandra.revisiting.RevisitingQueue")
    self.partitions = list(range(0, partitions))
    self.partitioner = Crc32NamePartitioner(self.partitions)
    # Shared batch used by schedule() and get_next_requests() for
    # grouped writes/deletes.
    self.batch = BatchQuery()
    sync_table(queue_cls)
3230

3331
def frontier_stop(self):
    """No-op: batches are executed eagerly by the other methods, so
    there is nothing to flush or release on shutdown."""
    pass
3533

3634
def get_next_requests(self, max_n_requests, partition_id, **kwargs):
    """Pop up to *max_n_requests* due requests for *partition_id*.

    A request is due when its crawl_at timestamp is not in the future.
    Returned rows are batch-deleted from the table so they are not
    handed out twice. Errors are logged and an empty/partial list is
    returned rather than raised.
    """
    results = []
    try:
        for item in self.queue_model.objects.filter(partition_id=partition_id,
                                                    crawl_at__lte=utcnow_timestamp()).limit(max_n_requests):
            method = 'GET' if not item.method else item.method
            results.append(Request(item.url, method=method, meta=item.meta, headers=item.headers,
                                   cookies=item.cookies))
            # Queue the delete; all deletes go out in one batch below.
            item.batch(self.batch).delete()
        self.batch.execute()
    except Exception as exc:
        # NOTE(review): broad catch is deliberate best-effort behavior —
        # a failed poll yields no requests instead of crashing the worker.
        self.logger.exception(exc)
    return results
4847

4948
def schedule(self, batch):
50-
for fprint, score, request, schedule_at in batch:
51-
if schedule_at:
49+
for fprint, score, request, schedule in batch:
50+
if schedule:
5251
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
5352
if not hostname:
5453
self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint))
@@ -57,65 +56,46 @@ def schedule(self, batch):
5756
else:
5857
partition_id = self.partitioner.partition(hostname, self.partitions)
5958
host_crc32 = get_crc32(hostname)
60-
created_at = time()*1E+6
61-
q = self._create_queue(request, fprint, score, partition_id, host_crc32, created_at)
62-
63-
q.save()
64-
request.meta['state'] = States.QUEUED
65-
66-
def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, created_at):
67-
db_queue = self.queue_model()
68-
db_queue.fingerprint = fingerprint
69-
db_queue.score = score
70-
db_queue.partition_id = partition_id
71-
db_queue.host_crc32 = host_crc32
72-
db_queue.url = obj.url
73-
db_queue.created_at = created_at
74-
75-
new_dict = {}
76-
for kmeta, vmeta in obj.meta.iteritems():
77-
if type(vmeta) is dict:
78-
new_dict[kmeta] = json.dumps(vmeta)
79-
else:
80-
new_dict[kmeta] = str(vmeta)
81-
82-
db_queue.meta = new_dict
83-
db_queue.depth = 0
84-
85-
db_queue.headers = obj.headers
86-
db_queue.method = obj.method
87-
db_queue.cookies = obj.cookies
88-
89-
return db_queue
59+
schedule_at = request.meta[b'crawl_at'] if b'crawl_at' in request.meta else utcnow_timestamp()
60+
q = self.queue_model(id=uuid.uuid4(),
61+
fingerprint=to_native_str(fprint),
62+
score=score,
63+
url=request.url,
64+
meta=request.meta,
65+
headers=request.headers,
66+
cookies=request.cookies,
67+
method=to_native_str(request.method),
68+
partition_id=partition_id,
69+
host_crc32=host_crc32,
70+
created_at=time() * 1E+6,
71+
crawl_at=schedule_at)
72+
q.batch(self.batch).save()
73+
request.meta[b'state'] = States.QUEUED
74+
self.batch.execute()
75+
76+
def _create_queue_obj(self, fprint, score, request, partition_id, host_crc32, schedule_at):
    """Build (without saving) a revisiting-queue row for *request*.

    *schedule_at* is the epoch timestamp stored in crawl_at; created_at
    is the current time in microseconds.
    """
    fields = {
        'id': uuid.uuid4(),
        'fingerprint': to_native_str(fprint),
        'score': score,
        'url': request.url,
        'meta': request.meta,
        'headers': request.headers,
        'cookies': request.cookies,
        'method': to_native_str(request.method),
        'partition_id': partition_id,
        'host_crc32': host_crc32,
        'created_at': time() * 1E+6,
        'crawl_at': schedule_at,
    }
    return self.queue_model(**fields)
9090

9191
def count(self):
    """Return the total number of rows in the revisiting queue table."""
    queryset = self.queue_model.all()
    return queryset.count()
9393

9494

95-
class Backend(CommonRevisitingStorageBackendMixin, CassandraBackend):
    """Cassandra backend that revisits pages on a fixed interval.

    The mixin supplies ``_schedule``/``page_crawled``; this class wires
    in the revisiting queue and the revisit interval.
    """

    def _create_queue(self, settings):
        """Validate CASSANDRABACKEND_REVISIT_INTERVAL and build the queue.

        The setting must be a datetime.timedelta; it is converted to
        seconds so the mixin can add it to epoch timestamps.
        """
        self.interval = settings.get("CASSANDRABACKEND_REVISIT_INTERVAL")
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which would let a misconfigured interval through.
        if not isinstance(self.interval, timedelta):
            raise TypeError("CASSANDRABACKEND_REVISIT_INTERVAL must be a datetime.timedelta")
        self.interval = self.interval.total_seconds()
        return RevisitingQueue(RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS'))

0 commit comments

Comments
 (0)