From 1da2d7346d00c0cb3f5add83fe0267db7d6d417f Mon Sep 17 00:00:00 2001
From: isra17
Date: Tue, 20 Jun 2017 17:58:14 -0400
Subject: [PATCH] Ensure partitioning is consistent across message bus and
 queue

---
 docs/source/topics/cluster-setup.rst           |  3 +-
 docs/source/topics/frontera-settings.rst       | 22 +++++++++
 examples/cluster/bc/config/common.py           |  4 +-
 frontera/contrib/backends/hbase.py             | 26 +++++-----
 frontera/contrib/backends/memory/__init__.py   | 47 ++++++++-----------
 frontera/contrib/backends/partitioners.py      | 36 +++++++++++---
 .../contrib/backends/sqlalchemy/__init__.py    | 17 ++++---
 .../contrib/backends/sqlalchemy/components.py  | 18 ++++---
 .../contrib/backends/sqlalchemy/revisiting.py  | 19 ++++----
 frontera/contrib/messagebus/kafkabus.py        | 31 +++++++-----
 .../contrib/messagebus/zeromq/__init__.py      | 32 +++++++++----
 frontera/core/components.py                    |  9 ++++
 frontera/settings/default_settings.py          |  5 +-
 frontera/worker/db.py                          | 29 +++---------
 tests/contrib/backends/hbase/test_hbase.py     |  7 +--
 tests/test_message_bus.py                      |  2 +-
 tests/test_partitioners.py                     | 15 +++++-
 17 files changed, 192 insertions(+), 130 deletions(-)

diff --git a/docs/source/topics/cluster-setup.rst b/docs/source/topics/cluster-setup.rst
index 26c51c117..cbf84e235 100644
--- a/docs/source/topics/cluster-setup.rst
+++ b/docs/source/topics/cluster-setup.rst
@@ -58,7 +58,8 @@ a common modules and import settings from it in component's modules.
         'frontera.contrib.middlewares.fingerprint.DomainFingerprintMiddleware'
     ])
 
-    QUEUE_HOSTNAME_PARTITIONING = True
+
+    SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
     KAFKA_LOCATION = 'localhost:9092' # your Kafka broker host:port
     SCORING_TOPIC = 'frontier-scoring'
     URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint'
diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index 805fc52f7..a23122fb9 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -332,6 +332,28 @@
 Default: ``0``
 
 Per-spider setting, pointing spider to it's assigned partition.
 
+.. setting:: SPIDER_LOG_PARTITIONER
+
+SPIDER_LOG_PARTITIONER
+----------------------
+
+Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
+
+Partitioner used to calculate the :term:`spider log` partition id. This affects how extracted links are distributed
+among the workers consuming the spider log. The default value partitions based on the request ``fingerprint``. The
+other available built-in value is ``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, which partitions based on the hostname.
+
+.. setting:: SPIDER_FEED_PARTITIONER
+
+SPIDER_FEED_PARTITIONER
+-----------------------
+
+Default: ``frontera.contrib.backends.partitioners.FingerprintPartitioner``
+
+Partitioner used to calculate the :term:`spider feed` partition id. This affects how requests are distributed to the
+spiders. The default value partitions based on the request ``fingerprint``. The other available built-in value is
+``frontera.contrib.backends.partitioners.Crc32NamePartitioner``, which partitions based on the hostname.
+
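For reference, a settings module combining the two new options might look like this (a minimal
sketch; the partition counts are illustrative, and choosing ``Crc32NamePartitioner`` for the feed
reproduces the behaviour of the deprecated ``QUEUE_HOSTNAME_PARTITIONING = True``)::

    # Spider log: keep the default fingerprint-based partitioning.
    SPIDER_LOG_PARTITIONER = 'frontera.contrib.backends.partitioners.FingerprintPartitioner'
    SPIDER_LOG_PARTITIONS = 2

    # Spider feed: partition outgoing requests by hostname instead.
    SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner'
    SPIDER_FEED_PARTITIONS = 2

..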
setting:: STATE_CACHE_SIZE STATE_CACHE_SIZE diff --git a/examples/cluster/bc/config/common.py b/examples/cluster/bc/config/common.py index b7d90613f..c39263cf8 100644 --- a/examples/cluster/bc/config/common.py +++ b/examples/cluster/bc/config/common.py @@ -14,10 +14,10 @@ #-------------------------------------------------------- # Crawl frontier backend #-------------------------------------------------------- -QUEUE_HOSTNAME_PARTITIONING = True +SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.Crc32NamePartitioner' URL_FINGERPRINT_FUNCTION='frontera.utils.fingerprint.hostname_local_fingerprint' #MESSAGE_BUS='frontera.contrib.messagebus.kafkabus.MessageBus' #KAFKA_LOCATION = 'localhost:9092' #SCORING_GROUP = 'scrapy-scoring' -#SCORING_TOPIC = 'frontier-score' \ No newline at end of file +#SCORING_TOPIC = 'frontier-score' diff --git a/frontera/contrib/backends/hbase.py b/frontera/contrib/backends/hbase.py index 8f60cb6d3..43d1a816b 100644 --- a/frontera/contrib/backends/hbase.py +++ b/frontera/contrib/backends/hbase.py @@ -5,7 +5,7 @@ from frontera.core.components import Metadata, Queue, States from frontera.core.models import Request from frontera.contrib.backends.partitioners import Crc32NamePartitioner -from frontera.utils.misc import chunks, get_crc32 +from frontera.utils.misc import chunks, get_crc32, load_object from frontera.contrib.backends.remote.codecs.msgpack import Decoder, Encoder from happybase import Connection @@ -66,10 +66,9 @@ class HBaseQueue(Queue): GET_RETRIES = 3 - def __init__(self, connection, partitions, table_name, drop=False): + def __init__(self, connection, partitioner, table_name, drop=False): self.connection = connection - self.partitions = [i for i in range(0, partitions)] - self.partitioner = Crc32NamePartitioner(self.partitions) + self.partitioner = partitioner self.logger = logging.getLogger("hbase.queue") self.table_name = to_bytes(table_name) @@ -141,14 +140,9 @@ def get_interval(score, resolution): for request, score in batch: domain = request.meta[b'domain'] fingerprint = request.meta[b'fingerprint'] - if type(domain) == dict: - partition_id = self.partitioner.partition(domain[b'name'], self.partitions) - host_crc32 = get_crc32(domain[b'name']) - elif type(domain) == int: - partition_id = self.partitioner.partition_by_hash(domain, self.partitions) - host_crc32 = domain - else: - raise TypeError("domain of unknown type.") + key = self.partitioner.get_key(request) + partition_id = self.partitioner.partition(key) + host_crc32 = domain if type(domain) == int else get_crc32(key) item = (unhexlify(fingerprint), host_crc32, self.encoder.encode_request(request), score) score = 1 - score # because of lexicographical sort in HBase rk = "%d_%s_%d" % (partition_id, "%0.2f_%0.2f" % get_interval(score, 0.01), random_str) @@ -404,7 +398,9 @@ def __init__(self, manager): self._min_hosts = settings.get('BC_MIN_HOSTS') self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST') - self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS') + partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS'))) + partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER')) + self.partitioner = partitioner_cls(partitions) host = choice(hosts) if type(hosts) in [list, tuple] else hosts kwargs = { 'host': host, @@ -435,7 +431,7 @@ def db_worker(cls, manager): o = cls(manager) settings = manager.settings drop_all_tables = settings.get('HBASE_DROP_ALL_TABLES') - o._queue = HBaseQueue(o.connection, o.queue_partitions, + o._queue = 
HBaseQueue(o.connection, o.partitioner, settings.get('HBASE_QUEUE_TABLE'), drop=drop_all_tables) o._metadata = HBaseMetadata(o.connection, settings.get('HBASE_METADATA_TABLE'), drop_all_tables, settings.get('HBASE_USE_SNAPPY'), settings.get('HBASE_BATCH_SIZE'), @@ -484,7 +480,7 @@ def get_next_requests(self, max_next_requests, **kwargs): next_pages = [] self.logger.debug("Querying queue table.") partitions = set(kwargs.pop('partitions', [])) - for partition_id in range(0, self.queue_partitions): + for partition_id in self.partitioner.partitions: if partition_id not in partitions: continue results = self.queue.get_next_requests(max_next_requests, partition_id, diff --git a/frontera/contrib/backends/memory/__init__.py b/frontera/contrib/backends/memory/__init__.py index e96cd29a8..5f329d5c2 100644 --- a/frontera/contrib/backends/memory/__init__.py +++ b/frontera/contrib/backends/memory/__init__.py @@ -7,8 +7,8 @@ from frontera.core.components import Metadata, Queue, States from frontera.core import OverusedBuffer from frontera.utils.heap import Heap -from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.utils.url import parse_domain_from_url_fast +from frontera.utils.misc import load_object import six from six.moves import map from six.moves import range @@ -52,12 +52,11 @@ def update_score(self, batch): class MemoryQueue(Queue): - def __init__(self, partitions): - self.partitions = [i for i in range(0, partitions)] - self.partitioner = Crc32NamePartitioner(self.partitions) + def __init__(self, partitioner): + self.partitioner = partitioner self.logger = logging.getLogger("memory.queue") self.heap = {} - for partition in self.partitions: + for partition in self.partitioner.partitions: self.heap[partition] = Heap(self._compare_pages) def count(self): @@ -70,12 +69,8 @@ def schedule(self, batch): for fprint, score, request, schedule in batch: if schedule: request.meta[b'_scr'] = score - _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) - if not hostname: - self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint) - partition_id = self.partitions[0] - else: - partition_id = self.partitioner.partition(hostname, self.partitions) + key = self.partitioner.get_key(request) + partition_id = self.partitioner.partition(key) self.heap[partition_id].push(request) def _compare_pages(self, first, second): @@ -83,18 +78,17 @@ def _compare_pages(self, first, second): class MemoryDequeQueue(Queue): - def __init__(self, partitions, is_fifo=True): + def __init__(self, partitioner, is_fifo=True): """ Deque-based queue (see collections module). Efficient queue for LIFO and FIFO strategies. 
- :param partitions: int count of partitions + :param partitioner: Partitioner :param type: bool, True for FIFO, False for LIFO """ - self.partitions = [i for i in range(0, partitions)] - self.partitioner = Crc32NamePartitioner(self.partitions) + self.partitioner = partitioner self.logger = logging.getLogger("memory.dequequeue") self.queues = {} self.is_fifo = is_fifo - for partition in self.partitions: + for partition in self.partitioner.partitions: self.queues[partition] = deque() def count(self): @@ -112,12 +106,8 @@ def schedule(self, batch): for fprint, score, request, schedule in batch: if schedule: request.meta[b'_scr'] = score - _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) - if not hostname: - self.logger.error("Can't get hostname for URL %s, fingerprint %s", request.url, fprint) - partition_id = self.partitions[0] - else: - partition_id = self.partitioner.partition(hostname, self.partitions) + key = self.partitioner.get_key(request) + partition_id = self.partitioner.partition(key) self.queues[partition_id].append(request) @@ -165,6 +155,9 @@ def __init__(self, manager): settings = manager.settings self._metadata = MemoryMetadata() self._states = MemoryStates(settings.get("STATE_CACHE_SIZE")) + partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS'))) + partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER')) + self._partitioner = partitioner_cls(partitions) self._queue = self._create_queue(settings) self._id = 0 @@ -222,27 +215,27 @@ def _compare_pages(self, first, second): class MemoryFIFOBackend(MemoryBaseBackend): def _create_queue(self, settings): - return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS')) + return MemoryDequeQueue(self._partitioner) class MemoryLIFOBackend(MemoryBaseBackend): def _create_queue(self, settings): - return MemoryDequeQueue(settings.get('SPIDER_FEED_PARTITIONS'), is_fifo=False) + return MemoryDequeQueue(self._partitioner, is_fifo=False) class MemoryDFSBackend(MemoryBaseBackend): def _create_queue(self, settings): - return MemoryDFSQueue(settings.get('SPIDER_FEED_PARTITIONS')) + return MemoryDFSQueue(self._partitioner) class MemoryBFSBackend(MemoryBaseBackend): def _create_queue(self, settings): - return MemoryBFSQueue(settings.get('SPIDER_FEED_PARTITIONS')) + return MemoryBFSQueue(self._partitioner) class MemoryRandomBackend(MemoryBaseBackend): def _create_queue(self, settings): - return MemoryRandomQueue(settings.get('SPIDER_FEED_PARTITIONS')) + return MemoryRandomQueue(self._partitioner) class MemoryDFSOverusedBackend(MemoryDFSBackend): diff --git a/frontera/contrib/backends/partitioners.py b/frontera/contrib/backends/partitioners.py index 5b425c20e..fec9d4ed9 100644 --- a/frontera/contrib/backends/partitioners.py +++ b/frontera/contrib/backends/partitioners.py @@ -5,22 +5,43 @@ from frontera.core.components import Partitioner from frontera.utils.misc import get_crc32 +from frontera.utils.url import parse_domain_from_url_fast class Crc32NamePartitioner(Partitioner): def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions if key is None: - return self.partitions[0] - value = get_crc32(key) - return self.partition_by_hash(value, partitions if partitions else self.partitions) + return partitions[0] + elif type(key) == int: + value = key + else: + value = get_crc32(key) + return self.partition_by_hash(value, partitions) def partition_by_hash(self, value, partitions): size = len(partitions) idx = value % size return partitions[idx] - def __call__(self, key, 
all_partitions, available): - return self.partition(key, all_partitions) + @staticmethod + def get_key(request): + domain = request.meta.get(b'domain') + if domain is not None: + if type(domain) == dict: + return domain[b'name'] + elif type(domain) == int: + return domain + else: + raise TypeError("domain of unknown type.") + + try: + _, name, _, _, _, _ = parse_domain_from_url_fast(request.url) + except Exception: + return None + else: + return name.encode('utf-8', 'ignore') class FingerprintPartitioner(Partitioner): @@ -32,5 +53,6 @@ def partition(self, key, partitions=None): idx = value[0] % len(partitions) return partitions[idx] - def __call__(self, key, all_partitions, available): - return self.partition(key, all_partitions) \ No newline at end of file + @staticmethod + def get_key(request): + return request.meta[b'fingerprint'] diff --git a/frontera/contrib/backends/sqlalchemy/__init__.py b/frontera/contrib/backends/sqlalchemy/__init__.py index 810975d67..2e2f147b9 100644 --- a/frontera/contrib/backends/sqlalchemy/__init__.py +++ b/frontera/contrib/backends/sqlalchemy/__init__.py @@ -37,6 +37,11 @@ def __init__(self, manager): session.execute(table.delete()) session.commit() session.close() + + partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS'))) + partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER')) + self.partitioner = partitioner_cls(partitions) + self._metadata = Metadata(self.session_cls, self.models['MetadataModel'], settings.get('SQLALCHEMYBACKEND_CACHE_SIZE')) self._states = States(self.session_cls, self.models['StateModel'], @@ -48,7 +53,7 @@ def frontier_stop(self): self.engine.dispose() def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + return Queue(self.session_cls, self.models['QueueModel'], self.partitioner) @property def queue(self): @@ -67,7 +72,7 @@ class FIFOBackend(SQLAlchemyBackend): component_name = 'SQLAlchemy FIFO Backend' def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + return Queue(self.session_cls, self.models['QueueModel'], self.partitioner, ordering='created') @@ -75,7 +80,7 @@ class LIFOBackend(SQLAlchemyBackend): component_name = 'SQLAlchemy LIFO Backend' def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'), + return Queue(self.session_cls, self.models['QueueModel'], self.partitioner, ordering='created_desc') @@ -83,7 +88,7 @@ class DFSBackend(SQLAlchemyBackend): component_name = 'SQLAlchemy DFS Backend' def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + return Queue(self.session_cls, self.models['QueueModel'], self.partitioner) def _get_score(self, obj): return -obj.meta[b'depth'] @@ -93,7 +98,7 @@ class BFSBackend(SQLAlchemyBackend): component_name = 'SQLAlchemy BFS Backend' def _create_queue(self, settings): - return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS')) + return Queue(self.session_cls, self.models['QueueModel'], self.partitioner) def _get_score(self, obj): return obj.meta[b'depth'] @@ -170,7 +175,7 @@ def db_worker(cls, manager): b._metadata = Metadata(b.session_cls, metadata_m, settings.get('SQLALCHEMYBACKEND_CACHE_SIZE')) - b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS')) + b._queue = 
Queue(b.session_cls, queue_m, b.partitioner) return b @property diff --git a/frontera/contrib/backends/sqlalchemy/components.py b/frontera/contrib/backends/sqlalchemy/components.py index 8661ac576..5f5c9bd03 100644 --- a/frontera/contrib/backends/sqlalchemy/components.py +++ b/frontera/contrib/backends/sqlalchemy/components.py @@ -5,7 +5,6 @@ from time import time, sleep from cachetools import LRUCache -from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.contrib.backends.memory import MemoryStates from frontera.contrib.backends.sqlalchemy.models import DeclarativeBase from frontera.core.components import Metadata as BaseMetadata, Queue as BaseQueue @@ -145,12 +144,11 @@ def flush(self, force_clear=False): class Queue(BaseQueue): - def __init__(self, session_cls, queue_cls, partitions, ordering='default'): + def __init__(self, session_cls, queue_cls, partitioner, ordering='default'): self.session = session_cls() self.queue_model = queue_cls self.logger = logging.getLogger("sqlalchemy.queue") - self.partitions = [i for i in range(0, partitions)] - self.partitioner = Crc32NamePartitioner(self.partitions) + self.partitioner = partitioner self.ordering = ordering def frontier_stop(self): @@ -193,14 +191,14 @@ def schedule(self, batch): to_save = [] for fprint, score, request, schedule in batch: if schedule: - _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) - if not hostname: - self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) - partition_id = self.partitions[0] + key = self.partitioner.get_key(request) + if key is None: + self.logger.error("Can't get partition key for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = self.partitioner.partitions[0] host_crc32 = 0 else: - partition_id = self.partitioner.partition(hostname, self.partitions) - host_crc32 = get_crc32(hostname) + partition_id = self.partitioner.partition(key) + host_crc32 = get_crc32(key) q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta, headers=request.headers, cookies=request.cookies, method=to_native_str(request.method), partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6) diff --git a/frontera/contrib/backends/sqlalchemy/revisiting.py b/frontera/contrib/backends/sqlalchemy/revisiting.py index b2b574715..a46c30b2d 100644 --- a/frontera/contrib/backends/sqlalchemy/revisiting.py +++ b/frontera/contrib/backends/sqlalchemy/revisiting.py @@ -48,12 +48,11 @@ def func_wrapper(self, *args, **kwargs): class RevisitingQueue(BaseQueue): - def __init__(self, session_cls, queue_cls, partitions): + def __init__(self, session_cls, queue_cls, partitioner): self.session = session_cls() self.queue_model = queue_cls self.logger = logging.getLogger("sqlalchemy.revisiting.queue") - self.partitions = [i for i in range(0, partitions)] - self.partitioner = Crc32NamePartitioner(self.partitions) + self.partitioner = partitioner def frontier_stop(self): self.session.close() @@ -80,14 +79,14 @@ def schedule(self, batch): to_save = [] for fprint, score, request, schedule in batch: if schedule: - _, hostname, _, _, _, _ = parse_domain_from_url_fast(request.url) - if not hostname: - self.logger.error("Can't get hostname for URL %s, fingerprint %s" % (request.url, fprint)) - partition_id = self.partitions[0] + key = self.partitioner.get_key(request) + if key is None: + self.logger.error("Can't get partition key for URL %s, fingerprint %s" % (request.url, fprint)) + partition_id = 
self.partitioner.partitions[0] host_crc32 = 0 else: - partition_id = self.partitioner.partition(hostname, self.partitions) - host_crc32 = get_crc32(hostname) + partition_id = self.partitioner.partition(key) + host_crc32 = get_crc32(key) schedule_at = request.meta[b'crawl_at'] if b'crawl_at' in request.meta else utcnow_timestamp() q = self.queue_model(fingerprint=fprint, score=score, url=request.url, meta=request.meta, headers=request.headers, cookies=request.cookies, method=request.method, @@ -109,7 +108,7 @@ def _create_queue(self, settings): self.interval = settings.get("SQLALCHEMYBACKEND_REVISIT_INTERVAL") assert isinstance(self.interval, timedelta) self.interval = self.interval.total_seconds() - return RevisitingQueue(self.session_cls, RevisitingQueueModel, settings.get('SPIDER_FEED_PARTITIONS')) + return RevisitingQueue(self.session_cls, RevisitingQueueModel, self.partitioner) def _schedule(self, requests): batch = [] diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index 490262891..fcb954399 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -11,6 +11,7 @@ from frontera.contrib.messagebus.kafka.async import OffsetsFetcherAsync from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseSpiderFeedStream, \ BaseStreamConsumer, BaseScoringLogStream, BaseStreamProducer +from frontera.utils.misc import load_object from twisted.internet.task import LoopingCall from traceback import format_tb @@ -139,10 +140,10 @@ def __init__(self, messagebus): self._sw_group = messagebus.spiderlog_sw_group self._topic = messagebus.topic_done self._codec = messagebus.codec - self._partitions = messagebus.spider_log_partitions + self._partitioner = messagebus.spider_log_partitioner def producer(self): - return KeyedProducer(self._location, self._topic, FingerprintPartitioner(self._partitions), + return KeyedProducer(self._location, self._topic, self._partitioner, self._codec) def consumer(self, partition_id, type): @@ -154,7 +155,7 @@ def consumer(self, partition_id, type): """ group = self._sw_group if type == b'sw' else self._db_group c = Consumer(self._location, self._topic, group, partition_id) - assert len(c._consumer.partitions_for_topic(self._topic)) == self._partitions + assert len(c._consumer.partitions_for_topic(self._topic)) == len(self._partitioner.partitions) return c @@ -164,15 +165,14 @@ def __init__(self, messagebus): self._general_group = messagebus.spider_feed_group self._topic = messagebus.topic_todo self._max_next_requests = messagebus.max_next_requests - self._hostname_partitioning = messagebus.hostname_partitioning self._offset_fetcher = OffsetsFetcherAsync(bootstrap_servers=self._location, topic=self._topic, group_id=self._general_group) self._codec = messagebus.codec - self._partitions = messagebus.spider_feed_partitions + self._partitioner = messagebus.spider_feed_partitioner def consumer(self, partition_id): c = Consumer(self._location, self._topic, self._general_group, partition_id) - assert len(c._consumer.partitions_for_topic(self._topic)) == self._partitions + assert len(c._consumer.partitions_for_topic(self._topic)) == len(self._partitioner.partitions) return c def available_partitions(self): @@ -184,9 +184,7 @@ def available_partitions(self): return partitions def producer(self): - partitioner = Crc32NamePartitioner(self._partitions) if self._hostname_partitioning \ - else FingerprintPartitioner(self._partitions) - return KeyedProducer(self._location, 
self._topic, partitioner, self._codec) + return KeyedProducer(self._location, self._topic, self._partitioner, self._codec) class ScoringLogStream(BaseScoringLogStream): @@ -215,11 +213,20 @@ def __init__(self, settings): self.spider_feed_group = settings.get('SPIDER_FEED_GROUP') self.spider_partition_id = settings.get('SPIDER_PARTITION_ID') self.max_next_requests = settings.MAX_NEXT_REQUESTS - self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') self.codec = settings.get('KAFKA_CODEC') self.kafka_location = settings.get('KAFKA_LOCATION') - self.spider_log_partitions = settings.get('SPIDER_LOG_PARTITIONS') - self.spider_feed_partitions = settings.get('SPIDER_FEED_PARTITIONS') + + if settings.get('QUEUE_HOSTNAME_PARTITIONING'): + logger.warning('QUEUE_HOSTNAME_PARTITIONING is deprecated, use SPIDER_FEED_PARTITIONER instead.') + settings.set('SPIDER_FEED_PARTITIONER', 'frontera.contrib.backends.partitioners.Crc32NamePartitioner') + + spider_log_partitions = list(range(settings.get('SPIDER_LOG_PARTITIONS'))) + spider_log_partitioner_cls = load_object(settings.get('SPIDER_LOG_PARTITIONER')) + self.spider_log_partitioner = spider_log_partitioner_cls(spider_log_partitions) + + spider_feed_partitions = list(range(settings.get('SPIDER_FEED_PARTITIONS'))) + spider_feed_partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER')) + self.spider_feed_partitioner = spider_feed_partitioner_cls(spider_feed_partitions) def spider_log(self): return SpiderLogStream(self) diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 3f99fc973..61945af9e 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -11,6 +11,7 @@ BaseSpiderFeedStream, BaseScoringLogStream from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig +from frontera.utils.misc import load_object from six.moves import range @@ -102,9 +103,9 @@ def get_offset(self, partition_id): class SpiderLogProducer(Producer): - def __init__(self, context, location, partitions): + def __init__(self, context, location, partitioner): super(SpiderLogProducer, self).__init__(context, location, b'sl') - self.partitioner = FingerprintPartitioner(partitions) + self.partitioner = partitioner class SpiderLogStream(BaseSpiderLogStream): @@ -113,10 +114,10 @@ def __init__(self, messagebus): self.sw_in_location = messagebus.socket_config.sw_in() self.db_in_location = messagebus.socket_config.db_in() self.out_location = messagebus.socket_config.spiders_out() - self.partitions = messagebus.spider_log_partitions + self.partitioner = messagebus.spider_log_partitioner def producer(self): - return SpiderLogProducer(self.context, self.out_location, self.partitions) + return SpiderLogProducer(self.context, self.out_location, self.partitioner) def consumer(self, partition_id, type): location = self.sw_in_location if type == b'sw' else self.db_in_location @@ -159,10 +160,9 @@ def producer(self): class SpiderFeedProducer(Producer): - def __init__(self, context, location, partitions, hwm, hostname_partitioning): + def __init__(self, context, location, partitions, hwm, partitioner): super(SpiderFeedProducer, self).__init__(context, location, b'sf') - self.partitioner = Crc32NamePartitioner(partitions) if hostname_partitioning else \ - FingerprintPartitioner(partitions) + self.partitioner = partitioner 
         self.sender.set(zmq.SNDHWM, hwm)
@@ -172,17 +172,17 @@ def __init__(self, messagebus):
         self.in_location = messagebus.socket_config.db_out()
         self.out_location = messagebus.socket_config.spiders_in()
         self.partitions = messagebus.spider_feed_partitions
+        self.partitioner = messagebus.spider_feed_partitioner
         self.ready_partitions = set(self.partitions)
         self.consumer_hwm = messagebus.spider_feed_rcvhwm
         self.producer_hwm = messagebus.spider_feed_sndhwm
-        self.hostname_partitioning = messagebus.hostname_partitioning
 
     def consumer(self, partition_id):
         return Consumer(self.context, self.out_location, partition_id, b'sf', seq_warnings=True, hwm=self.consumer_hwm)
 
     def producer(self):
         return SpiderFeedProducer(self.context, self.in_location, self.partitions,
-                                  self.producer_hwm, self.hostname_partitioning)
+                                  self.producer_hwm, self.partitioner)
 
     def available_partitions(self):
         return self.ready_partitions
@@ -202,14 +202,25 @@ class Context(object):
 
 class MessageBus(BaseMessageBus):
     def __init__(self, settings):
+        self.logger = getLogger("messagebus.zeromq")
         self.context = Context()
         self.socket_config = SocketConfig(settings.get('ZMQ_ADDRESS'),
                                           settings.get('ZMQ_BASE_PORT'))
+
+        if settings.get('QUEUE_HOSTNAME_PARTITIONING'):
+            self.logger.warning('QUEUE_HOSTNAME_PARTITIONING is deprecated, use SPIDER_FEED_PARTITIONER instead.')
+            settings.set('SPIDER_FEED_PARTITIONER', 'frontera.contrib.backends.partitioners.Crc32NamePartitioner')
+
         self.spider_log_partitions = [i for i in range(settings.get('SPIDER_LOG_PARTITIONS'))]
+        spider_log_partitioner_cls = load_object(settings.get('SPIDER_LOG_PARTITIONER'))
+        self.spider_log_partitioner = spider_log_partitioner_cls(self.spider_log_partitions)
+
         self.spider_feed_partitions = [i for i in range(settings.get('SPIDER_FEED_PARTITIONS'))]
+        spider_feed_partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
+        self.spider_feed_partitioner = spider_feed_partitioner_cls(self.spider_feed_partitions)
+
         self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2)
         self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0)
-        self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
         if self.socket_config.is_ipv6:
             self.context.zeromq.setsockopt(zmq.IPV6, True)
@@ -221,3 +232,4 @@ def scoring_log(self):
 
     def spider_feed(self):
         return SpiderFeedStream(self)
+
diff --git a/frontera/core/components.py b/frontera/core/components.py
index 33529c7bc..bd6e39c9c 100644
--- a/frontera/core/components.py
+++ b/frontera/core/components.py
@@ -282,4 +282,13 @@ def partition(self, key, partitions=None):
         """
         raise NotImplementedError('partition function has to be implemented')
 
+    @staticmethod
+    def get_key(request):
+        """
+        Takes a :class:`Request <frontera.core.models.Request>` and returns an
+        extracted value used by the partitioner.
+        """
+        raise NotImplementedError('get_key function has to be implemented')
+
+    def __call__(self, key, all_partitions, available):
+        return self.partition(key, all_partitions)
diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py
index b049e7bdc..aea707aaa 100644
--- a/frontera/settings/default_settings.py
+++ b/frontera/settings/default_settings.py
@@ -35,10 +35,13 @@
 REQUEST_MODEL = 'frontera.core.models.Request'
 RESPONSE_MODEL = 'frontera.core.models.Response'
+
 SCORING_PARTITION_ID = 0
 SCORING_LOG_CONSUMER_BATCH_SIZE = 512
 SPIDER_LOG_CONSUMER_BATCH_SIZE = 512
+SPIDER_LOG_PARTITIONER = 'frontera.contrib.backends.partitioners.FingerprintPartitioner'
 SPIDER_LOG_PARTITIONS = 1
+SPIDER_FEED_PARTITIONER = 'frontera.contrib.backends.partitioners.FingerprintPartitioner'
 SPIDER_FEED_PARTITIONS = 1
 SPIDER_PARTITION_ID = 0
 SQLALCHEMYBACKEND_CACHE_SIZE = 10000
@@ -77,4 +80,4 @@
 SCORING_LOG_DBW_GROUP = "dbw-scoring-log"
 SPIDER_FEED_GROUP = "fetchers-spider-feed"
 
-KAFKA_CODEC = None
\ No newline at end of file
+KAFKA_CODEC = None
diff --git a/frontera/worker/db.py b/frontera/worker/db.py
index 6f9abad85..8319da6ac 100644
--- a/frontera/worker/db.py
+++ b/frontera/worker/db.py
@@ -11,7 +11,6 @@
 from twisted.internet import reactor, task
 from frontera.core.components import DistributedBackend
 from frontera.core.manager import FrontierManager
-from frontera.utils.url import parse_domain_from_url_fast
 from frontera.logger.handlers import CONSOLE
 from frontera.settings import Settings
@@ -88,7 +87,11 @@ def __init__(self, settings, no_batches, no_incoming, no_scoring):
         self.strategy_disabled = True
         self.spider_log_consumer_batch_size = settings.get('SPIDER_LOG_CONSUMER_BATCH_SIZE')
         self.scoring_log_consumer_batch_size = settings.get('SCORING_LOG_CONSUMER_BATCH_SIZE')
-        self.spider_feed_partitioning = 'fingerprint' if not settings.get('QUEUE_HOSTNAME_PARTITIONING') else 'hostname'
+
+        if settings.get('QUEUE_HOSTNAME_PARTITIONING'):
+            logger.warning('QUEUE_HOSTNAME_PARTITIONING is deprecated, use SPIDER_FEED_PARTITIONER instead.')
+            settings.set('SPIDER_FEED_PARTITIONER', 'frontera.contrib.backends.partitioners.Crc32NamePartitioner')
+        self.partitioner_cls = load_object(settings.get('SPIDER_FEED_PARTITIONER'))
         self.max_next_requests = settings.MAX_NEXT_REQUESTS
         self.slot = Slot(self.new_batch, self.consume_incoming, self.consume_scoring, no_batches,
                          self.strategy_disabled, settings.get('NEW_BATCH_DELAY'), no_incoming)
@@ -230,32 +233,12 @@ def consume_scoring(self, *args, **kwargs):
         self.slot.schedule()
 
     def new_batch(self, *args, **kwargs):
-        def get_hostname(request):
-            try:
-                netloc, name, scheme, sld, tld, subdomain = parse_domain_from_url_fast(request.url)
-            except Exception as e:
-                logger.error("URL parsing error %s, fingerprint %s, url %s" % (e, request.meta[b'fingerprint'],
-                             request.url))
-                return None
-            else:
-                return name.encode('utf-8', 'ignore')
-
-        def get_fingerprint(request):
-            return request.meta[b'fingerprint']
-
         partitions = self.spider_feed.available_partitions()
         logger.info("Getting new batches for partitions %s" % str(",").join(map(str, partitions)))
         if not partitions:
             return 0
         count = 0
-        if self.spider_feed_partitioning == 'hostname':
-            get_key = get_hostname
-        elif self.spider_feed_partitioning == 'fingerprint':
-            get_key = get_fingerprint
-        else:
-            raise Exception("Unexpected value in self.spider_feed_partitioning")
-
         for request in self._backend.get_next_requests(self.max_next_requests, partitions=partitions):
             try:
                 request.meta[b'jid'] = 
self.job_id @@ -267,7 +250,7 @@ def get_fingerprint(request): continue finally: count += 1 - self.spider_feed_producer.send(get_key(request), eo) + self.spider_feed_producer.send(self.partitioner_cls.get_key(request), eo) self.stats['pushed_since_start'] += count self.stats['last_batch_size'] = count diff --git a/tests/contrib/backends/hbase/test_hbase.py b/tests/contrib/backends/hbase/test_hbase.py index e0a039fd1..b76d5f31d 100644 --- a/tests/contrib/backends/hbase/test_hbase.py +++ b/tests/contrib/backends/hbase/test_hbase.py @@ -2,6 +2,7 @@ from happybase import Connection from Hbase_thrift import AlreadyExists # module loaded at runtime in happybase from frontera.contrib.backends.hbase import HBaseState, HBaseMetadata, HBaseQueue +from frontera.contrib.backends.partitioners import Crc32NamePartitioner from frontera.core.models import Request, Response from frontera.core.components import States from binascii import unhexlify @@ -42,7 +43,7 @@ def test_metadata(self): def test_queue(self): connection = Connection(host='hbase-docker', port=9090) - queue = HBaseQueue(connection, 2, b'queue', True) + queue = HBaseQueue(connection, Crc32NamePartitioner([0,1]), b'queue', True) batch = [('10', 0.5, r1, True), ('11', 0.6, r2, True), ('12', 0.7, r3, True)] queue.schedule(batch) @@ -53,7 +54,7 @@ def test_queue(self): def test_queue_with_delay(self): connection = Connection(host='hbase-docker', port=9090) - queue = HBaseQueue(connection, 1, b'queue', True) + queue = HBaseQueue(connection, Crc32NamePartitioner([0]), b'queue', True) r5 = r3.copy() crawl_at = int(time()) + 1000 r5.meta[b'crawl_at'] = crawl_at @@ -103,7 +104,7 @@ def test_drop_all_tables_when_table_name_is_str(self): tables = connection.tables() assert set(tables) == set([b'metadata', b'queue']) # Failure of test itself try: - HBaseQueue(connection=connection, partitions=1, table_name=hbase_queue_table, drop=True) + HBaseQueue(connection=connection, partitioner=Crc32NamePartitioner([0]), table_name=hbase_queue_table, drop=True) HBaseMetadata(connection=connection, table_name=hbase_metadata_table, drop_all_tables=True, use_snappy=False, batch_size=300000, store_content=True) except AlreadyExists: diff --git a/tests/test_message_bus.py b/tests/test_message_bus.py index b283a7405..2795f6e01 100644 --- a/tests/test_message_bus.py +++ b/tests/test_message_bus.py @@ -18,7 +18,7 @@ class MessageBusTester(object): def __init__(self, cls, settings=Settings()): settings.set('SPIDER_FEED_PARTITIONS', 1) settings.set('SPIDER_LOG_PARTITIONS', 1) - settings.set('QUEUE_HOSTNAME_PARTITIONING', True) + settings.set('SPIDER_FEED_PARTITIONER', 'frontera.contrib.backends.partitioners.Crc32NamePartitioner') self.messagebus = cls(settings) spiderlog = self.messagebus.spider_log() diff --git a/tests/test_partitioners.py b/tests/test_partitioners.py index 61f52ada8..dbaa437ed 100644 --- a/tests/test_partitioners.py +++ b/tests/test_partitioners.py @@ -1,13 +1,18 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner +from frontera.core.models import Request from six.moves import range +request = Request('http://www.example.com', meta={b'fingerprint': b'1be68ff556fd0bbe5802d1a100850da29f7f15b1'}) def test_fingerprint_partitioner(): partitions = list(range(0, 5)) fp = FingerprintPartitioner(partitions) - key = '1be68ff556fd0bbe5802d1a100850da29f7f15b1' + + key = b'1be68ff556fd0bbe5802d1a100850da29f7f15b1' + assert fp.get_key(request) == key + partition 
= fp.partition(key, partitions) assert partition == 4 @@ -18,10 +23,16 @@ def test_fingerprint_partitioner(): def test_crc32name_partitioner(): partitions = list(range(0, 5)) cp = Crc32NamePartitioner(partitions) - key = '1be68ff556fd0bbe5802d1a100850da29f7f15b11' + + key = b'www.example.com' + assert cp.get_key(request) == key + partition = cp.partition(key, partitions) assert partition == 3 + partition = cp.partition(42, partitions) + assert partition == 2 + partition = cp.partition(None, partitions) assert partition == 0
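With ``get_key()`` now part of the ``Partitioner`` contract (see ``frontera/core/components.py``
above), the message bus producers and the backend queues share one object that both extracts the
partitioning key from a request and maps it to a partition id, which is what keeps their partition
assignments consistent. A minimal sketch of a custom partitioner built against that contract (the
``PathDepthPartitioner`` name and its path-depth keying scheme are hypothetical, not part of this
patch)::

    from frontera.core.components import Partitioner

    class PathDepthPartitioner(Partitioner):
        """Illustrative only: partition requests by URL path depth."""

        @staticmethod
        def get_key(request):
            # 'http://host/a/b' -> 2 path segments
            return request.url.count('/') - 2

        def partition(self, key, partitions=None):
            partitions = partitions if partitions else self.partitions
            if key is None:
                return partitions[0]
            # Integer keys are mapped directly, mirroring Crc32NamePartitioner.
            return partitions[key % len(partitions)]

Pointing ``SPIDER_FEED_PARTITIONER`` at such a class is enough for the db worker (which calls
``get_key()`` when producing batches) and the queue backends (which call ``partition()`` when
scheduling) to agree on where each request lands.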