Skip to content

Commit 1455b6b

Browse files
committed
Added LIFO, FIFO, DFS, and BFS backends and corresponding tests for them
1 parent 75a9567 commit 1455b6b

File tree

9 files changed

+145
-101
lines changed

9 files changed

+145
-101
lines changed

frontera/contrib/backends/__init__.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,6 @@ def finished(self):
9292

9393
class CommonStorageBackend(CommonBackend):
9494

95-
def _create_queue(self, settings):
96-
if not issubclass(self.queue_component, BaseQueue):
97-
raise TypeError('expected queue_component to '
98-
'belong to class: %s, got %s instead' % (type(BaseQueue).__name__,
99-
type(self.queue_component).__name__))
100-
return self.queue_component(self.session,
101-
self.models['QueueModel'],
102-
settings.get('SPIDER_FEED_PARTITIONS'))
103-
10495
@property
10596
def queue(self):
10697
return self._queue

frontera/contrib/backends/cassandra/__init__.py

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
class CassandraBackend(CommonStorageBackend):
1616

17-
queue_component = Queue
18-
1917
def __init__(self, manager):
2018
self.manager = manager
2119
settings = manager.settings
@@ -28,35 +26,70 @@ def __init__(self, manager):
2826
self.models = dict([(name, load_object(cls)) for name, cls in six.iteritems(models)])
2927
cluster_kwargs = {
3028
'port': cluster_port,
31-
'compression': True
29+
'compression': True,
3230
}
33-
self.cluster = Cluster(contact_points=cluster_hosts, **cluster_kwargs)
34-
self.session = self.cluster.connect(keyspace)
35-
connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
36-
self.session.default_timeout = connection.session.default_timeout = \
37-
settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
31+
if not connection.cluster:
32+
connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
33+
connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
3834

3935
if drop_all_tables:
4036
for name, table in six.iteritems(self.models):
4137
drop_table(table)
4238

43-
for name, table in six.iteritems(self.models):
44-
sync_table(table)
45-
46-
self._metadata = Metadata(self.session,
47-
self.models['MetadataModel'],
48-
settings.get('CASSANDRABACKEND_CACHE_SIZE'))
49-
self._states = States(self.session,
50-
self.models['StateModel'],
51-
settings.get('STATE_CACHE_SIZE_LIMIT'))
39+
self._metadata = Metadata(self.models['MetadataModel'], settings.get('CASSANDRABACKEND_CACHE_SIZE'))
40+
self._states = States(self.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'))
5241
self._queue = self._create_queue(settings)
5342

5443
def frontier_stop(self):
5544
self.states.flush()
56-
self.session.shutdown()
45+
46+
def _create_queue(self, settings):
47+
return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
48+
49+
50+
class FIFOBackend(CassandraBackend):
51+
component_name = 'Cassandra FIFO Backend'
52+
53+
def _create_queue(self, settings):
54+
return Queue(self.models['FifoOrLIfoQueueModel'],
55+
settings.get('SPIDER_FEED_PARTITIONS'),
56+
ordering='created')
57+
58+
59+
class LIFOBackend(CassandraBackend):
60+
component_name = 'Cassandra LIFO Backend'
61+
62+
def _create_queue(self, settings):
63+
return Queue(self.models['FifoOrLIfoQueueModel'],
64+
settings.get('SPIDER_FEED_PARTITIONS'),
65+
ordering='created_desc')
66+
67+
68+
class DFSBackend(CassandraBackend):
69+
component_name = 'Cassandra DFS Backend'
70+
71+
def _create_queue(self, settings):
72+
return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
73+
74+
def _get_score(self, obj):
75+
return -obj.meta[b'depth']
76+
77+
78+
class BFSBackend(CassandraBackend):
79+
component_name = 'Cassandra BFS Backend'
80+
81+
def _create_queue(self, settings):
82+
return Queue(self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
83+
84+
def _get_score(self, obj):
85+
return obj.meta[b'depth']
5786

5887

5988
BASE = CassandraBackend
89+
LIFO = LIFOBackend
90+
FIFO = FIFOBackend
91+
DFS = DFSBackend
92+
BFS = BFSBackend
6093

6194

6295
class Distributed(CommonDistributedStorageBackend):
@@ -111,6 +144,6 @@ def db_worker(cls, manager):
111144
sync_table(metadata_m)
112145
sync_table(queue_m)
113146

114-
b._metadata = Metadata(b.session, metadata_m)
115-
b._queue = Queue(b.session, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
147+
b._metadata = Metadata(metadata_m)
148+
b._queue = Queue(queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
116149
return b

frontera/contrib/backends/cassandra/components.py

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
import six
4-
import sys
5-
import traceback
63
import uuid
74
from time import time
85

6+
import six
97
from cachetools import LRUCache
10-
from cassandra import (OperationTimedOut, ReadFailure, ReadTimeout,
11-
WriteFailure, WriteTimeout)
8+
from cassandra.cqlengine.management import sync_table
129
from cassandra.cqlengine.query import BatchQuery
10+
from w3lib.util import to_bytes, to_native_str
1311

1412
from frontera.contrib.backends import CreateOrModifyPageMixin
1513
from frontera.contrib.backends.memory import MemoryStates
@@ -20,36 +18,15 @@
2018
from frontera.utils.misc import chunks, get_crc32
2119
from frontera.utils.url import parse_domain_from_url_fast
2220

23-
from w3lib.util import to_bytes, to_native_str
24-
25-
26-
def _retry(func):
27-
def func_wrapper(self, *args, **kwargs):
28-
tries = 5
29-
count = 0
30-
while count < tries:
31-
try:
32-
return func(self, *args, **kwargs)
33-
except (OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure) as exc:
34-
ex_type, ex, tb = sys.exc_info()
35-
tries += 1
36-
self.logger.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
37-
del tb
38-
self.logger.info("Tries left %i" % tries - count)
39-
40-
raise exc
41-
42-
return func_wrapper
43-
4421

4522
class Metadata(BaseMetadata, CreateOrModifyPageMixin):
4623

47-
def __init__(self, session, model_cls, cache_size):
48-
self.session = session
24+
def __init__(self, model_cls, cache_size):
4925
self.model = model_cls
5026
self.cache = LRUCache(cache_size)
5127
self.batch = BatchQuery()
5228
self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Metadata")
29+
sync_table(model_cls)
5330

5431
def frontier_stop(self):
5532
pass
@@ -80,7 +57,9 @@ def links_extracted(self, request, links):
8057
self.batch.execute()
8158

8259
def update_score(self, batch):
83-
for fprint, (score, url, schedule) in six.iteritems(batch):
60+
if isinstance(batch, dict):
61+
batch = [(fprint, score, url, schedule) for fprint, (score, url, schedule) in six.iteritems(batch)]
62+
for fprint, score, url, schedule in batch:
8463
page = self.cache[fprint]
8564
page.fingerprint = to_native_str(fprint)
8665
page.score = score
@@ -93,20 +72,20 @@ def _add_to_batch_and_update_cache(self, page):
9372

9473
class States(MemoryStates):
9574

96-
def __init__(self, session, model_cls, cache_size_limit):
75+
def __init__(self, model_cls, cache_size_limit):
9776
super(States, self).__init__(cache_size_limit)
98-
self.session = session
9977
self.model = model_cls
10078
self.batch = BatchQuery()
10179
self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.States")
80+
sync_table(model_cls)
10281

10382
def frontier_stop(self):
10483
self.flush()
10584

10685
def fetch(self, fingerprints):
10786
to_fetch = [to_native_str(f) for f in fingerprints if f not in self._cache]
10887
self.logger.debug("cache size %s", len(self._cache))
109-
self.logger.debug("to fetch %d from %d", (len(to_fetch), len(fingerprints)))
88+
self.logger.debug("to fetch %d from %d", len(to_fetch), len(fingerprints))
11089

11190
for chunk in chunks(to_fetch, 128):
11291
for state in self.model.objects.filter(fingerprint__in=chunk):
@@ -123,14 +102,14 @@ def flush(self, force_clear=False):
123102

124103
class Queue(BaseQueue):
125104

126-
def __init__(self, session, queue_cls, partitions, ordering='default'):
127-
self.session = session
105+
def __init__(self, queue_cls, partitions, ordering='default'):
128106
self.queue_model = queue_cls
129107
self.logger = logging.getLogger("frontera.contrib.backends.cassandra.components.Queue")
130108
self.partitions = [i for i in range(0, partitions)]
131109
self.partitioner = Crc32NamePartitioner(self.partitions)
132110
self.ordering = ordering
133111
self.batch = BatchQuery()
112+
sync_table(queue_cls)
134113

135114
def frontier_stop(self):
136115
pass
@@ -228,7 +207,7 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
228207
tries, limit, count, len(queue.keys()))
229208
queue.clear()
230209
count = 0
231-
for item in self._order_by(self.queue_model.filter(partition_id=partition_id)).limit(max_n_requests):
210+
for item in self._order_by(self.queue_model.filter(partition_id=partition_id).allow_filtering()).limit(max_n_requests):
232211
if item.host_crc32 not in queue:
233212
queue[item.host_crc32] = []
234213
if max_requests_per_host is not None and len(queue[item.host_crc32]) > max_requests_per_host:

frontera/contrib/backends/cassandra/models.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,27 @@ class QueueModel(Model):
8888

8989
def __repr__(self):
9090
return '<Queue:%s (%s)>' % (self.url, self.id)
91+
92+
93+
class FifoOrLIfoQueueModel(Model):
94+
# A separate model is needed because
95+
# order_by is only supported on the
96+
# clustering columns, in the order in
97+
# which the clustering keys were declared
98+
99+
# Also, inheriting from a model causes runtime issues,
100+
# most likely due to a bug in the driver,
101+
# hence the duplicated code
102+
103+
partition_id = Integer(primary_key=True)
104+
score = Float(required=True)
105+
created_at = BigInt(primary_key=True)
106+
id = UUID(primary_key=True)
107+
url = Text(required=True)
108+
fingerprint = Text(required=True)
109+
host_crc32 = Integer(required=True)
110+
meta = PickleDict()
111+
headers = PickleDict()
112+
cookies = PickleDict()
113+
method = Text()
114+
depth = SmallInt()

frontera/contrib/backends/cassandra/revisiting.py

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
import logging
44
from datetime import datetime, timedelta
5-
from time import sleep, time
5+
from time import time
66

77
from cassandra.cqlengine import columns
88
from cassandra.cqlengine.models import Model
@@ -22,24 +22,6 @@ class RevisitingQueueModel(Model):
2222
crawl_at = columns.DateTime(required=True, default=datetime.now(), index=True)
2323

2424

25-
def retry_and_rollback(func):
26-
def func_wrapper(self, *args, **kwargs):
27-
tries = 5
28-
while True:
29-
try:
30-
return func(self, *args, **kwargs)
31-
except Exception as exc:
32-
self.logger.exception(exc)
33-
sleep(5)
34-
tries -= 1
35-
if tries > 0:
36-
self.logger.info("Tries left %i" % tries)
37-
continue
38-
else:
39-
raise exc
40-
return func_wrapper
41-
42-
4325
class RevisitingQueue(BaseQueue):
4426
def __init__(self, session, queue_cls, partitions):
4527
self.session = session()
@@ -64,7 +46,6 @@ def get_next_requests(self, max_n_requests, partition_id, **kwargs):
6446
self.logger.exception(exc)
6547
return results
6648

67-
@retry_and_rollback
6849
def schedule(self, batch):
6950
for fprint, score, request, schedule_at in batch:
7051
if schedule_at:
@@ -107,7 +88,6 @@ def _create_queue(self, obj, fingerprint, score, partition_id, host_crc32, creat
10788

10889
return db_queue
10990

110-
@retry_and_rollback
11191
def count(self):
11292
return self.session.query(self.queue_model).count()
11393

frontera/contrib/backends/sqlalchemy/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212

1313
class SQLAlchemyBackend(CommonStorageBackend):
1414

15-
queue_component = Queue
16-
1715
def __init__(self, manager):
1816
self.manager = manager
1917
settings = manager.settings
@@ -48,6 +46,9 @@ def frontier_stop(self):
4846
super(SQLAlchemyBackend, self).frontier_stop()
4947
self.engine.dispose()
5048

49+
def _create_queue(self, settings):
50+
return Queue(self.session, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
51+
5152

5253
class FIFOBackend(SQLAlchemyBackend):
5354
component_name = 'SQLAlchemy FIFO Backend'

frontera/settings/default_settings.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
CASSANDRABACKEND_MODELS = {
1515
'MetadataModel': 'frontera.contrib.backends.cassandra.models.MetadataModel',
1616
'StateModel': 'frontera.contrib.backends.cassandra.models.StateModel',
17-
'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel'
17+
'QueueModel': 'frontera.contrib.backends.cassandra.models.QueueModel',
18+
'FifoOrLIfoQueueModel': 'frontera.contrib.backends.cassandra.models.FifoOrLIfoQueueModel',
1819
}
1920
CASSANDRABACKEND_REVISIT_INTERVAL = timedelta(days=1)
2021
CASSANDRABACKEND_CLUSTER_HOSTS = ['127.0.0.1']

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@
7373
'Twisted'
7474
],
7575
'cassandra': [
76-
'cassandra-driver==3.7.0'
76+
'cassandra-driver==3.7.0',
77+
'cachetools'
7778
]
7879
},
7980
tests_require=[

0 commit comments

Comments
 (0)