Ensure partitioning is consistent across message bus and queue #284

Open · wants to merge 1 commit into master
3 changes: 2 additions & 1 deletion docs/source/topics/cluster-setup.rst
@@ -58,7 +58,8 @@ a common module and import settings from it in component's modules.
'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware'
])

QUEUE_HOSTNAME_PARTITIONING = True

SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port
SCORING_TOPIC = 'frontier-scoring'
URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
22 changes: 22 additions & 0 deletions docs/source/topics/frontera-settings.rst
@@ -332,6 +332,28 @@ Default: ``0``

Per-spider setting, pointing the spider to its assigned partition.

.. setting:: SPIDER_LOG_PARTITIONER

SPIDER_LOG_PARTITIONER
----------------------

Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``

Partitioner used to calculate a :term:`spider log` partition id. This affects the distribution of extracted links
among the strategy workers. The default partitions based on the request ``fingerprint``. The other available built-in
value is ``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, which partitions based on the hostname.

.. setting:: SPIDER_FEED_PARTITIONER

SPIDER_FEED_PARTITIONER
-----------------------

Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``

Partitioner used to calculate a :term:`spider feed` partition id. This affects the distribution of requests to the
spiders. The default partitions based on the request ``fingerprint``. The other available built-in value is
``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, which partitions based on the hostname.
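
As an example, a deployment that wants the old hostname-based behaviour of ``QUEUE_HOSTNAME_PARTITIONING`` would now set (a sketch; the partition count of 2 is an arbitrary example value):

SPIDER_FEED_PARTITIONS = 2
SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'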

.. setting:: STATE_CACHE_SIZE

STATE_CACHE_SIZE
4 changes: 2 additions & 2 deletions examples/cluster/bc/config/common.py
@@ -14,10 +14,10 @@
#--------------------------------------------------------
# Crawl frontier backend
#--------------------------------------------------------
QUEUE_HOSTNAME_PARTITIONING = True
SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'

#MESSAGE_BUS='frontera.contrib.messagebus.kafkabus.MessageBus'
#KAFKA_LOCATION = 'localhost:9092'
#SCORING_GROUP = 'scrapy-scoring'
#SCORING_TOPIC = 'frontier-score'
#SCORING_TOPIC = 'frontier-score'
26 changes: 11 additions & 15 deletions frontera/contrib/backends/hbase.py
@@ -5,7 +5,7 @@
from frontera.core.components import Metadata, Queue, States
from frontera.core.models import Request
from frontera.contrib.backends.partitioners import Crc32NamePartitioner
from frontera.utils.misc import chunks, get_crc32
from frontera.utils.misc import chunks, get_crc32, load_object
from frontera.contrib.backends.remote.codecs.msgpack import Decoder, Encoder

from happybase import Connection
@@ -66,10 +66,9 @@ class HBaseQueue(Queue):

GET_RETRIES = 3

def __init__(self, connection, partitions, table_name, drop=False):
def __init__(self, connection, partitioner, table_name, drop=False):
self.connection = connection
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
self.partitioner = partitioner
self.logger = logging.getLogger("hbase.queue")
self.table_name = to_bytes(table_name)

@@ -141,14 +140,9 @@ def get_interval(score, resolution):
for request, score in batch:
domain = request.meta[b'domain']
fingerprint = request.meta[b'fingerprint']
if type(domain) == dict:
partition_id = self.partitioner.partition(domain[b'name'], self.partitions)
host_crc32 = get_crc32(domain[b'name'])
elif type(domain) == int:
partition_id = self.partitioner.partition_by_hash(domain, self.partitions)
host_crc32 = domain
else:
raise TypeError("domain of unknown type.")
key = self.partitioner.get_key(request)
partition_id = self.partitioner.partition(key)
host_crc32 = domain if type(domain) == int else get_crc32(key)
item = (unhexlify(fingerprint), host_crc32, self.encoder.encode_request(request), score)
score = 1 - score # because of lexicographical sort in HBase
rk = "%d_%s_%d" % (partition_id, "%0.2f_%0.2f" % get_interval(score, 0.01), random_str)
@@ -404,7 +398,9 @@ def __init__(self, manager):
self._min_hosts = settings.get('BC_MIN_HOSTS')
self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')

self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
self.partitioner = partitioner_cls(partitions)
host = choice(hosts) if type(hosts) in [list, tuple] else hosts
kwargs = {
'host': host,
@@ -435,7 +431,7 @@ def db_worker(cls, manager):
o = cls(manager)
settings = manager.settings
drop_all_tables = settings.get('HBASE_DROP_ALL_TABLES')
o._queue = HBaseQueue(o.connection, o.queue_partitions,
o._queue = HBaseQueue(o.connection, o.partitioner,
settings.get('HBASE_QUEUE_TABLE'), drop=drop_all_tables)
o._metadata = HBaseMetadata(o.connection, settings.get('HBASE_METADATA_TABLE'), drop_all_tables,
settings.get('HBASE_USE_SNAPPY'), settings.get('HBASE_BATCH_SIZE'),
@@ -484,7 +480,7 @@ def get_next_requests(self, max_next_requests, **kwargs):
next_pages = []
self.logger.debug("Querying queue table.")
partitions = set(kwargs.pop('partitions', []))
for partition_id in range(0, self.queue_partitions):
for partition_id in self.partitioner.partitions:
if partition_id not in partitions:
continue
results = self.queue.get_next_requests(max_next_requests, partition_id,
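
Taken together, a minimal sketch of how the HBase worker now wires the queue (``settings`` and ``connection`` are assumed to already exist; class and setting names are from this diff):

from frontera.utils.misc import load_object

# Build a single partitioner from settings and hand it to the queue, so the
# queue and the message bus derive partition ids from the same logic.
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
partitioner = partitioner_cls(partitions)
queue = HBaseQueue(connection, partitioner, settings.get('HBASE_QUEUE_TABLE'))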
47 changes: 20 additions & 27 deletions frontera/contrib/backends/memory/__init__.py
@@ -7,8 +7,8 @@
from frontera.core.components import Metadata, Queue, States
from frontera.core import OverusedBuffer
from frontera.utils.heap import Heap
from frontera.contrib.backends.partitioners import Crc32NamePartitioner
from frontera.utils.url import parse_domain_from_url_fast
from frontera.utils.misc import load_object
import six
from six.moves import map
from six.moves import range
@@ -52,12 +52,11 @@ def update_score(self, batch):


class MemoryQueue(Queue):
def __init__(self, partitions):
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
def __init__(self, partitioner):
self.partitioner = partitioner
self.logger = logging.getLogger("memory.queue")
self.heap = {}
for partition in self.partitions:
for partition in self.partitioner.partitions:
self.heap[partition] = Heap(self._compare_pages)

def count(self):
@@ -70,31 +69,26 @@ def schedule(self, batch):
for fprint, score, request, schedule in batch:
if schedule:
request.meta[b'_scr'] = score
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
if not hostname:
self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
partition_id = self.partitions[0]
else:
partition_id = self.partitioner.partition(hostname, self.partitions)
key = self.partitioner.get_key(request)
partition_id = self.partitioner.partition(key)
self.heap[partition_id].push(request)

def _compare_pages(self, first, second):
return cmp(first.meta[b'_scr'], second.meta[b'_scr'])


class MemoryDequeQueue(Queue):
def __init__(self, partitions, is_fifo=True):
def __init__(self, partitioner, is_fifo=True):
"""
Deque-based queue (see collections module). Efficient queue for LIFO and FIFO strategies.
:param partitions: int count of partitions
:param partitioner: Partitioner
:param is_fifo: bool, True for FIFO, False for LIFO
"""
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
self.partitioner = partitioner
self.logger = logging.getLogger("memory.dequequeue")
self.queues = {}
self.is_fifo = is_fifo
for partition in self.partitions:
for partition in self.partitioner.partitions:
self.queues[partition] = deque()

def count(self):
@@ -112,12 +106,8 @@ def schedule(self, batch):
for fprint, score, request, schedule in batch:
if schedule:
request.meta[b'_scr'] = score
_, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url)
if not hostname:
self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint)
partition_id = self.partitions[0]
else:
partition_id = self.partitioner.partition(hostname, self.partitions)
key = self.partitioner.get_key(request)
partition_id = self.partitioner.partition(key)
self.queues[partition_id].append(request)


@@ -165,6 +155,9 @@ def __init__(self, manager):
settings = manager.settings
self._metadata = MemoryMetadata()
self._states = MemoryStates(settings.get("STATE_CACHE_SIZE"))
partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
self._partitioner = partitioner_cls(partitions)
self._queue = self._create_queue(settings)
self._id = 0

@@ -222,27 +215,27 @@ def _compare_pages(self, first, second):

class MemoryFIFOBackend(MemoryBaseBackend):
def _create_queue(self, settings):
return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'))
return MemoryDequeQueue(self._partitioner)


class MemoryLIFOBackend(MemoryBaseBackend):
def _create_queue(self, settings):
return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'), is_fifo=False)
return MemoryDequeQueue(self._partitioner, is_fifo=False)


class MemoryDFSBackend(MemoryBaseBackend):
def _create_queue(self, settings):
return MemoryDFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
return MemoryDFSQueue(self._partitioner)


class MemoryBFSBackend(MemoryBaseBackend):
def _create_queue(self, settings):
return MemoryBFSQueue(settings.get('SPIDER_FEED_PARTITIONS'))
return MemoryBFSQueue(self._partitioner)


class MemoryRandomBackend(MemoryBaseBackend):
def _create_queue(self, settings):
return MemoryRandomQueue(settings.get('SPIDER_FEED_PARTITIONS'))
return MemoryRandomQueue(self._partitioner)


class MemoryDFSOverusedBackend(MemoryDFSBackend):
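
As a usage sketch, the in-memory queues now accept a partitioner instance instead of a partition count (the two-partition setup below is hypothetical):

from frontera.contrib.backends.partitioners import Crc32NamePartitioner

partitioner = Crc32NamePartitioner([0, 1])
fifo_queue = MemoryDequeQueue(partitioner)                  # FIFO by default
lifo_queue = MemoryDequeQueue(partitioner, is_fifo=False)   # LIFO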
36 changes: 29 additions & 7 deletions frontera/contrib/backends/partitioners.py
@@ -5,22 +5,43 @@

from frontera.core.components import Partitioner
from frontera.utils.misc import get_crc32
from frontera.utils.url import parse_domain_from_url_fast


class Crc32NamePartitioner(Partitioner):
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
if key is None:
return self.partitions[0]
value = get_crc32(key)
return self.partition_by_hash(value, partitions if partitions else self.partitions)
return partitions[0]
elif type(key) == int:
value = key
else:
value = get_crc32(key)
return self.partition_by_hash(value, partitions)

def partition_by_hash(self, value, partitions):
size = len(partitions)
idx = value % size
return partitions[idx]

def __call__(self, key, all_partitions, available):
return self.partition(key, all_partitions)
@staticmethod
def get_key(request):
domain = request.meta.get(b'domain')
if domain is not None:
if type(domain) == dict:
return domain[b'name']
elif type(domain) == int:
return domain
else:
raise TypeError("domain of unknown type.")

try:
_, name, _, _, _, _ = parse_domain_from_url_fast(request.url)
except Exception:
return None
else:
return name.encode('utf-8', 'ignore')


class FingerprintPartitioner(Partitioner):
@@ -32,5 +53,6 @@ def partition(self, key, partitions=None):
idx = value[0] % len(partitions)
return partitions[idx]

def __call__(self, key, all_partitions, available):
return self.partition(key, all_partitions)
@staticmethod
def get_key(request):
return request.meta[b'fingerprint']
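
A quick illustration of the new ``get_key``/``partition`` pair (a sketch; the URL and meta values are made up):

from frontera.contrib.backends.partitioners import Crc32NamePartitioner
from frontera.core.models import Request

partitioner = Crc32NamePartitioner([0, 1, 2])
request = Request('http://example.com/page',
                  meta={b'domain': {b'name': b'example.com'}})
key = Crc32NamePartitioner.get_key(request)   # b'example.com'
partition_id = partitioner.partition(key)     # CRC32 of the key, mod 3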
17 changes: 11 additions & 6 deletions frontera/contrib/backends/sqlalchemy/__init__.py
@@ -37,6 +37,11 @@ def __init__(self, manager):
session.execute(table.delete())
session.commit()
session.close()

partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS')))
partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
self.partitioner = partitioner_cls(partitions)

self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
self._states = States(self.session_cls, self.models['StateModel'],
@@ -48,7 +53,7 @@ def frontier_stop(self):
self.engine.dispose()

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

@property
def queue(self):
@@ -67,23 +72,23 @@ class FIFOBackend(SQLAlchemyBackend):
component_name = 'SQLAlchemy FIFO Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
ordering='created')


class LIFOBackend(SQLAlchemyBackend):
component_name = 'SQLAlchemy LIFO Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner,
ordering='created_desc')


class DFSBackend(SQLAlchemyBackend):
component_name = 'SQLAlchemy DFS Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

def _get_score(self, obj):
return -obj.meta[b'depth']
@@ -93,7 +98,7 @@ class BFSBackend(SQLAlchemyBackend):
component_name = 'SQLAlchemy BFS Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'))
return Queue(self.session_cls, self.models['QueueModel'], self.partitioner)

def _get_score(self, obj):
return obj.meta[b'depth']
@@ -170,7 +175,7 @@ def db_worker(cls, manager):

b._metadata = Metadata(b.session_cls, metadata_m,
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
b._queue = Queue(b.session_cls, queue_m, b.partitioner)
return b

@property
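
Because the partitioner class is loaded from the ``SPIDER_FEED_PARTITIONER`` setting via ``load_object``, a custom implementation only has to provide the same small interface. A hypothetical sketch (``PathDepthPartitioner`` and its module path are made up):

from frontera.core.components import Partitioner


class PathDepthPartitioner(Partitioner):
    """Hypothetical example: spread requests across partitions by URL path depth."""

    def partition(self, key, partitions=None):
        partitions = partitions if partitions else self.partitions
        if key is None:
            return partitions[0]
        return partitions[key % len(partitions)]

    @staticmethod
    def get_key(request):
        # Use the number of '/' characters in the URL as an integer key.
        return request.url.count('/')

# Enabled with, e.g.:
# SPIDER_FEED_PARTITIONER = 'myproject.partitioners.PathDepthPartitioner'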