Skip to content

Commit a8f4572

Browse files
committed
added tests for cassandra distributed backend
1 parent 7ead222 commit a8f4572

File tree

4 files changed

+71
-25
lines changed

4 files changed

+71
-25
lines changed

frontera/contrib/backends/cassandra/__init__.py

+13-8
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,6 @@ def __init__(self, manager):
9999
settings = manager.settings
100100
cluster_hosts = settings.get('CASSANDRABACKEND_CLUSTER_HOSTS')
101101
cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
102-
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
103102
models = settings.get('CASSANDRABACKEND_MODELS')
104103
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
105104

@@ -112,10 +111,6 @@ def __init__(self, manager):
112111
connection.setup(cluster_hosts, keyspace, **cluster_kwargs)
113112
connection.session.default_timeout = settings.get('CASSANDRABACKEND_REQUEST_TIMEOUT')
114113

115-
if drop_all_tables:
116-
for name, table in six.iteritems(self.models):
117-
drop_table(table)
118-
119114
self._metadata = None
120115
self._queue = None
121116
self._states = None
@@ -124,15 +119,25 @@ def __init__(self, manager):
124119
def strategy_worker(cls, manager):
125120
b = cls(manager)
126121
settings = manager.settings
127-
b._states = States(b.models['StateModel'], settings.get('STATE_CACHE_SIZE_LIMIT'))
122+
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
123+
state_model = b.models['StateModel']
124+
if drop_all_tables:
125+
drop_table(state_model)
126+
b._states = States(state_model, settings.get('STATE_CACHE_SIZE_LIMIT'))
128127
return b
129128

130129
@classmethod
131130
def db_worker(cls, manager):
132131
b = cls(manager)
133132
settings = manager.settings
134-
b._metadata = Metadata(b.models['MetadataModel'], settings.get('CASSANDRABACKEND_CACHE_SIZE'))
135-
b._queue = BroadCrawlingQueue(b.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
133+
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
134+
metadata_model = b.models['MetadataModel']
135+
queue_model = b.models['QueueModel']
136+
if drop_all_tables:
137+
drop_table(metadata_model)
138+
drop_table(queue_model)
139+
b._metadata = Metadata(metadata_model, settings.get('CASSANDRABACKEND_CACHE_SIZE'))
140+
b._queue = BroadCrawlingQueue(queue_model, settings.get('SPIDER_FEED_PARTITIONS'))
136141
return b
137142

138143
def frontier_stop(self):

frontera/contrib/backends/cassandra/components.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,7 @@ def schedule(self, batch):
172172
self.batch.execute()
173173

174174
def count(self):
175-
return self.queue_model.all().count()
175+
return self.queue_model.objects.count()
176176

177177

178178
class BroadCrawlingQueue(Queue):

frontera/contrib/backends/cassandra/revisiting.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ def _create_queue_obj(self, fprint, score, request, partition_id, host_crc32, sc
8989
return q
9090

9191
def count(self):
92-
return self.queue_model.all().count()
92+
return self.queue_model.objects.count()
9393

9494

9595
class Backend(CommonRevisitingStorageBackendMixin, CassandraBackend):

tests/contrib/backends/cassandra/test_backend_cassandra.py

+56-15
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99
drop_keyspace, drop_table,
1010
sync_table)
1111

12-
from frontera.contrib.backends.cassandra import CassandraBackend
12+
from frontera.contrib.backends.cassandra import CassandraBackend, Distributed
1313
from frontera.contrib.backends.cassandra.models import (FifoOrLIfoQueueModel,
1414
MetadataModel,
1515
QueueModel, StateModel)
@@ -29,7 +29,7 @@
2929
r4 = r3.copy()
3030

3131

32-
class BaseCassandraTest(object):
32+
class CassandraConfig(object):
3333

3434
def setUp(self):
3535
settings = Settings()
@@ -53,7 +53,7 @@ def _set_global_connection(self, hosts, port, timeout):
5353
connection.session.default_timeout = timeout
5454

5555

56-
class TestCassandraBackendModels(BaseCassandraTest, unittest.TestCase):
56+
class TestCassandraBackendModels(CassandraConfig, unittest.TestCase):
5757

5858
def test_pickled_fields(self):
5959
sync_table(MetadataModel)
@@ -131,7 +131,25 @@ def assert_db_values(self, model, _filter, fields):
131131
self.assertEqual(stored_value, original_value)
132132

133133

134-
class TestCassandraBackend(BaseCassandraTest, unittest.TestCase):
134+
class TestCassandraBackend(CassandraConfig, unittest.TestCase):
135+
136+
def init_backend(self):
137+
self.backend = CassandraBackend(self.manager)
138+
139+
@property
140+
def metadata(self):
141+
self.init_backend()
142+
return self.backend.metadata
143+
144+
@property
145+
def states(self):
146+
self.init_backend()
147+
return self.backend.states
148+
149+
@property
150+
def queue(self):
151+
self.init_backend()
152+
return self.backend.queue
135153

136154
def _get_tables(self):
137155
query = 'SELECT table_name FROM system_schema.tables WHERE keyspace_name = \'{}\''.format(self.keyspace)
@@ -141,7 +159,7 @@ def _get_tables(self):
141159
def test_tables_created(self):
142160
tables_before = self._get_tables()
143161
self.assertEqual(tables_before, [])
144-
CassandraBackend(self.manager)
162+
self.init_backend()
145163
tables_after = self._get_tables()
146164
self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue']))
147165

@@ -158,14 +176,14 @@ def _get_state_data():
158176
rows_before = _get_state_data()
159177
self.assertEqual(rows_before.count(), 1)
160178
self.manager.settings.CASSANDRABACKEND_DROP_ALL_TABLES = True
161-
CassandraBackend(self.manager)
162-
self.assertEqual(set(tables_before), set(['metadata', 'states', 'queue']))
179+
self.init_backend()
180+
tables_after = self._get_tables()
181+
self.assertEqual(set(tables_after), set(['metadata', 'states', 'queue']))
163182
rows_after = _get_state_data()
164183
self.assertEqual(rows_after.count(), 0)
165184

166185
def test_metadata(self):
167-
b = CassandraBackend(self.manager)
168-
metadata = b.metadata
186+
metadata = self.metadata
169187
metadata.add_seeds([r1, r2, r3])
170188
meta_qs = MetadataModel.objects.all()
171189
self.assertEqual(set([r1.url, r2.url, r3.url]), set([m.url for m in meta_qs]))
@@ -183,10 +201,9 @@ def test_metadata(self):
183201
self.assertEqual(meta_qs.count(), 3)
184202

185203
def test_state(self):
186-
b = CassandraBackend(self.manager)
187-
state = b.states
204+
state = self.states
188205
state.set_states([r1, r2, r3])
189-
self.assertEqual([r.meta[b'state'] for r in [r1, r2, r3]], [States.NOT_CRAWLED]*3)
206+
self.assertEqual([r.meta[b'state'] for r in [r1, r2, r3]], [States.NOT_CRAWLED] * 3)
190207
state.update_cache([r1, r2, r3])
191208
self.assertDictEqual(state._cache, {b'10': States.NOT_CRAWLED,
192209
b'11': States.NOT_CRAWLED,
@@ -209,11 +226,11 @@ def test_state(self):
209226

210227
def test_queue(self):
211228
self.manager.settings.SPIDER_FEED_PARTITIONS = 2
212-
b = CassandraBackend(self.manager)
213-
queue = b.queue
229+
queue = self.queue
214230
batch = [('10', 0.5, r1, True), ('11', 0.6, r2, True),
215231
('12', 0.7, r3, True)]
216232
queue.schedule(batch)
233+
self.assertEqual(queue.count(), 3)
217234
self.assertEqual(set([r.url for r in queue.get_next_requests(10, 0,
218235
min_requests=3,
219236
min_hosts=1,
@@ -224,10 +241,34 @@ def test_queue(self):
224241
min_hosts=1,
225242
max_requests_per_host=10)]),
226243
set([r1.url, r2.url]))
244+
self.assertEqual(queue.count(), 0)
245+
246+
247+
class TestCassandraDistributedBackend(TestCassandraBackend):
248+
249+
def init_backend(self):
250+
self.backend = Distributed(self.manager)
251+
self.strategy_worker = self.backend.strategy_worker(self.manager)
252+
self.db_worker = self.backend.db_worker(self.manager)
253+
254+
@property
255+
def metadata(self):
256+
self.init_backend()
257+
return self.db_worker.metadata
258+
259+
@property
260+
def states(self):
261+
self.init_backend()
262+
return self.strategy_worker.states
263+
264+
@property
265+
def queue(self):
266+
self.init_backend()
267+
return self.db_worker.queue
227268

228269

229270
class BaseCassandraIntegrationTests(object):
230-
obj = BaseCassandraTest()
271+
obj = CassandraConfig()
231272

232273
def setup_backend(self, method):
233274
self.obj.setUp()

0 commit comments

Comments
 (0)