diff --git a/crawlfrontier/contrib/backends/memory/__init__.py b/crawlfrontier/contrib/backends/memory/__init__.py
index 17539262f..a5ae8d241 100644
--- a/crawlfrontier/contrib/backends/memory/__init__.py
+++ b/crawlfrontier/contrib/backends/memory/__init__.py
@@ -48,9 +48,7 @@ def request_error(self, request, error):
     def _get_or_create_request(self, request):
         fingerprint = request.meta['fingerprint']
         if fingerprint not in self.requests:
-            new_request = request.copy()
-            new_request.meta['created_at'] = datetime.datetime.utcnow()
-            new_request.meta['depth'] = 0
+            new_request = self._create_request(request)
             self.requests[fingerprint] = new_request
             self.manager.logger.backend.debug('Creating request %s' % new_request)
             return new_request, True
@@ -59,6 +57,12 @@ def _get_or_create_request(self, request):
         self.manager.logger.backend.debug('Request exists %s' % request)
         return page, False
 
+    def _create_request(self, request):
+        new_request = request.copy()
+        new_request.meta['created_at'] = datetime.datetime.utcnow()
+        new_request.meta['depth'] = 0
+        return new_request
+
     def _compare_pages(self, first, second):
         raise NotImplementedError
 
diff --git a/crawlfrontier/contrib/backends/mongodb.py b/crawlfrontier/contrib/backends/mongodb.py
new file mode 100644
index 000000000..0cef7b2da
--- /dev/null
+++ b/crawlfrontier/contrib/backends/mongodb.py
@@ -0,0 +1,184 @@
+from datetime import datetime
+from pymongo import MongoClient, DESCENDING
+
+from crawlfrontier import Backend, Request, Response
+from crawlfrontier.exceptions import NotConfigured
+
+
+class MongodbBackend(Backend):
+    name = 'Mongodb Backend'
+
+    class State:
+        NOT_CRAWLED = 'NOT CRAWLED'
+        QUEUED = 'QUEUED'
+        CRAWLED = 'CRAWLED'
+        ERROR = 'ERROR'
+
+    def __init__(self, manager):
+        settings = manager.settings
+        mongo_hostname = settings.get('BACKEND_MONGO_HOSTNAME')
+        mongo_port = settings.get('BACKEND_MONGO_PORT')
+        mongo_db = settings.get('BACKEND_MONGO_DB_NAME')
+        mongo_collection = settings.get('BACKEND_MONGO_COLLECTION_NAME')
+        if mongo_hostname is None or mongo_port is None or mongo_db is None or mongo_collection is None:
+            raise NotConfigured
+
+        self.client = MongoClient(mongo_hostname, mongo_port)
+        self.db = self.client[mongo_db]
+        self.collection = self.db[mongo_collection]
+        self.collection.ensure_index("meta.fingerprint", unique=True, drop_dups=True)
+        self.collection.ensure_index("score")
+        self.collection.ensure_index("meta.created_at")
+        self.collection.ensure_index("meta.depth")
+        self.manager = manager
+
+    @classmethod
+    def from_manager(cls, manager):
+        return cls(manager)
+
+    def add_seeds(self, seeds):
+        # Log
+        self.manager.logger.backend.debug('ADD_SEEDS n_links=%s' % len(seeds))
+
+        for seed in seeds:
+            # Get or create page from link
+            request, _ = self._get_or_create_request(seed)
+
+    def page_crawled(self, response, links):
+        # Log
+        self.manager.logger.backend.debug('PAGE_CRAWLED page=%s status=%s links=%s' %
+                                          (response, response.status_code, len(links)))
+
+        # process page crawled
+        backend_page = self._page_crawled(response)
+
+        # Update crawled fields
+        backend_page.state = self.State.CRAWLED
+        self.collection.update(self._get_mongo_spec(backend_page), {
+            "$set": self._to_mongo_dict(backend_page)}, upsert=False)
+
+        # Create links
+        for link in links:
+            self.manager.logger.backend.debug('ADD_LINK link=%s' % link)
+            link_page, link_created = self._get_or_create_request(link)
+            if link_created:
+                link_page._meta['depth'] = response.meta['depth'] + 1
+                self.collection.update(self._get_mongo_spec(link_page), {
+                    "$set": self._to_mongo_dict(link_page)}, upsert=False)
+
+    def _page_crawled(self, response):
+        # Get timestamp
+        now = datetime.utcnow()
+
+        # Get or create page from incoming page
+        backend_page, created = self._get_or_create_request(response)
+
+        # Update creation fields
+        if created:
+            backend_page.created_at = now
+
+        # Update fields
+        backend_page.last_update = now
+        backend_page.status = response.status_code
+        return backend_page
+
+    def request_error(self, request, error):
+        self.manager.logger.backend.debug('PAGE_CRAWLED_ERROR page=%s error=%s' % (request, error))
+        now = datetime.utcnow()
+
+        backend_page, created = self._get_or_create_request(request)
+
+        if created:
+            backend_page.created_at = now
+        backend_page.last_update = now
+
+        backend_page.state = self.State.ERROR
+        self.collection.update(self._get_mongo_spec(backend_page),
+                               {"$set": self._to_mongo_dict(backend_page)}, upsert=False)
+        return backend_page
+
+    def get_next_requests(self, max_next_pages, downloader_info=None):
+        # Log
+        self.manager.logger.backend.debug('GET_NEXT_PAGES max_next_pages=%s' % max_next_pages)
+        now = datetime.utcnow()
+        mongo_pages = self._get_sorted_pages(max_next_pages)
+        requests = []
+        for p in mongo_pages:
+            req = self._request_from_mongo_dict(p)
+            requests.append(req)
+
+        if max_next_pages:
+            requests = requests[0:max_next_pages]
+        for req in requests:
+            req.state = self.State.QUEUED
+            req.last_update = now
+            self.collection.update(self._get_mongo_spec(req), {
+                "$set": self._to_mongo_dict(req)}, upsert=False)
+        return requests
+
+    def _get_mongo_spec(self, obj):
+        return {'meta.fingerprint': obj.meta['fingerprint']}
+
+    def _request_from_mongo_dict(self, o):
+        request = Request(o['url'], o['method'], o['headers'], o['cookies'], o['meta'])
+        request.state = o['state']
+        return request
+
+    def _to_mongo_dict(self, obj):
+        def _request_to_dict(req):
+            return {
+                'url': req.url,
+                'method': req.method,
+                'headers': req.headers,
+                'cookies': req.cookies,
+                'meta': req.meta,
+                'state': req.state
+            }
+
+        if isinstance(obj, Request):
+            return _request_to_dict(obj)
+
+        if isinstance(obj, Response):
+            return {
+                'url': obj.url,
+                'status_code': obj.status_code,
+                'headers': obj.headers,
+                'body': obj.body,
+                'meta': obj.request.meta,
+                'method': obj.request.method,
+                'cookies': obj.request.cookies,
+                'state': obj.state
+            }
+
+        raise TypeError("Type of object %s isn't known." % obj)
+
+    def _get_or_create_request(self, obj):
+        existing_request = self.collection.find_one(self._get_mongo_spec(obj))
+        if existing_request is None:
+            new_request = obj.copy()
+            new_request.meta['created_at'] = datetime.utcnow()
+            new_request.meta['depth'] = 0
+            new_request.state = self.State.NOT_CRAWLED
+            self.collection.insert(self._to_mongo_dict(new_request))
+            self.manager.logger.backend.debug('Creating request %s' % new_request)
+            return new_request, True
+        else:
+            obj = self._request_from_mongo_dict(existing_request)
+            self.manager.logger.backend.debug('Request exists %s' % obj)
+            return obj, False
+
+    def _get_sorted_pages(self, max_pages):
+        raise NotImplementedError
+
+    def frontier_start(self):
+        pass
+
+    def frontier_stop(self):
+        self.client.close()
+
+
+class MongodbScoreBackend(MongodbBackend):
+    name = 'Score Mongodb Backend'
+
+    def _get_sorted_pages(self, max_pages):
+        return self.collection.find({'state': self.State.NOT_CRAWLED}).sort('meta.score', DESCENDING).limit(max_pages)
\ No newline at end of file
diff --git a/crawlfrontier/contrib/backends/sqlalchemy/__init__.py b/crawlfrontier/contrib/backends/sqlalchemy/__init__.py
index f7ef9713f..410de64e2 100644
--- a/crawlfrontier/contrib/backends/sqlalchemy/__init__.py
+++ b/crawlfrontier/contrib/backends/sqlalchemy/__init__.py
@@ -122,7 +122,8 @@ def frontier_stop(self):
 
     def add_seeds(self, seeds):
         for seed in seeds:
-            db_page, _ = self._get_or_create_db_page(url=seed.url, fingerprint=seed.meta['fingerprint'])
+            db_page, _ = self._get_or_create_db_page(url=seed.url, fingerprint=seed.meta['fingerprint'],
+                                                     request_or_response=seed)
         self.session.commit()
 
     def get_next_requests(self, max_next_requests):
@@ -134,35 +135,34 @@ def get_next_requests(self, max_next_requests):
         next_pages = []
         for db_page in query:
             db_page.state = Page.State.QUEUED
-            request = self.manager.request_model(url=db_page.url)
+            request = self._create_request(db_page)  # FIXME: we loose all the Request metadata here: methods, meta...
             next_pages.append(request)
         self.session.commit()
         return next_pages
 
     def page_crawled(self, response, links):
-        db_page, _ = self._get_or_create_db_page(url=response.url, fingerprint=response.meta['fingerprint'])
+        db_page, _ = self._get_or_create_db_page(url=response.url, fingerprint=response.meta['fingerprint'],
+                                                 request_or_response=response)
         db_page.state = Page.State.CRAWLED
         db_page.status_code = response.status_code
+        # TODO: a performance bottle-neck on big volumes, operations should be batched here
         for link in links:
-            db_page_from_link, created = self._get_or_create_db_page(url=link.url, fingerprint=link.meta['fingerprint'])
+            db_page_from_link, created = self._get_or_create_db_page(url=link.url, fingerprint=link.meta['fingerprint'],
+                                                                     request_or_response=link)
             if created:
                 db_page_from_link.depth = db_page.depth+1
         self.session.commit()
 
     def request_error(self, request, error):
-        db_page, _ = self._get_or_create_db_page(url=request.url, fingerprint=request.meta['fingerprint'])
+        db_page, _ = self._get_or_create_db_page(url=request.url, fingerprint=request.meta['fingerprint'],
+                                                 request_or_response=request)
         db_page.state = Page.State.ERROR
         db_page.error = error
         self.session.commit()
 
-    def _get_or_create_db_page(self, url, fingerprint):
+    def _get_or_create_db_page(self, url, fingerprint, request_or_response):
         if not self._request_exists(fingerprint):
-            db_request = self.page_model()
-            db_request.fingerprint = fingerprint
-            db_request.state = Page.State.NOT_CRAWLED
-            db_request.url = url
-            db_request.depth = 0
-            db_request.created_at = datetime.datetime.utcnow()
+            db_request = self._create_page(url, fingerprint, request_or_response)
             self.session.add(db_request)
             self.manager.logger.backend.debug('Creating request %s' % db_request)
             return db_request, True
@@ -171,6 +171,18 @@ def _get_or_create_db_page(self, url, fingerprint):
         self.manager.logger.backend.debug('Request exists %s' % db_request)
         return db_request, False
 
+    def _create_page(self, url, fingerprint, request_or_response):
+        page = self.page_model()
+        page.fingerprint = fingerprint
+        page.state = Page.State.NOT_CRAWLED
+        page.url = url
+        page.depth = 0
+        page.created_at = datetime.datetime.utcnow()
+        return page
+
+    def _create_request(self, db_page):
+        return self.manager.request_model(url=db_page.url)
+
     def _request_exists(self, fingerprint):
         q = self.page_model.query(self.session).filter_by(fingerprint=fingerprint)
         return self.session.query(q.exists()).scalar()