scrapy分布式Spiderソースコード分析と実現過程
22272 ワード
分散フレームワークscrapy_redisは完全なコンポーネントを実現し、その中でもspiderを実現した.RedisSpiderは元のscrapyのSpiderを継承した上で少し変更され、初期URLはstart_urlsリストから読み出すのではなく、redis開始キューから読み出す.
scrapy_redisソースはscrapyにあります.redis.spiderではRedisSpider(分布爬虫類)だけでなくRedisCrawlSpider(分布深さ爬虫類)の論理も実現しているが,両者の多くの方法は一致している.
ソースコードは次のとおりです.
スタートスタートスタート_requestsメソッドは、Redisからすべての接続を取得し、URLのデリバリーリストと比較してデリバリーし、デリバリーされないmake_を介してrequests_from_url(url)はエンジンに渡され、スケジューラに渡されます.ここでは一般的な爬虫類の考え方と一致しています.
エンジンがタスクキューが空のメッセージを受信するとSpider_が呼び出されます.idleメソッド、schedule_を呼び出すnext_requestsは、取得した開始URLを反復してエンジンに渡し、タスクが完了するまでサブコール開始キュー内のURLを開始します.
RedisSpiderとRedisCrawlSpiderは簡単で、SpiderとRedisMiXineを継承し、setup_を実現しました異なるcrawlerに基づいてsetting、redis_を初期化するredisメソッドkey、およびconnectインタフェースを介してspiderにspiderをバインドidle信号バインド.
まとめ:RedisSpider初期Redisキューからstart_を取得urlはRequestを生成し、scrapyエンジンに渡して対応するRedis schedulerスケジューリングに渡し、デマンドとエンキューを完了します.スケジューリング条件がキュー内のRequestがエンジンからダウンロードに渡されることを満たすと、ResponseオブジェクトがSpider解析に渡され、新しいRequestが生成され、引き続きキャプチャされる.新しいRequestが生成されないまで、Spiderは開始キューからURLを取得し、前回のループを継続し、もちろんキューにないと爬虫類は終了します.
scrapy_redisソースはscrapyにあります.redis.spiderではRedisSpider(分布爬虫類)だけでなくRedisCrawlSpider(分布深さ爬虫類)の論理も実現しているが,両者の多くの方法は一致している.
ソースコードは次のとおりです.
from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection
# Default batch size matches default concurrent requests setting.
DEFAULT_START_URLS_BATCH_SIZE = 16
DEFAULT_START_URLS_KEY = '%(name)s:start_urls'
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
# Per spider redis key, default to DEFAULT_START_URLS_KEY.
redis_key = None
# Fetch this amount of start urls when idle. Default to DEFAULT_START_URLS_BATCH_SIZE.
redis_batch_size = None
# Redis client instance.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
if self.redis_batch_size is None:
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s)", self.__dict__)
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET')
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
# By default, data is an URL.
if '://' in data:
return self.make_requests_from_url(data)
else:
self.logger.error("Unexpected URL from '%s': %r", self.redis_key, data)
def schedule_next_requests(self):
"""Schedules a request if available"""
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle."""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle."""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
スタートスタートスタート_requestsメソッドは、Redisからすべての接続を取得し、URLのデリバリーリストと比較してデリバリーし、デリバリーされないmake_を介してrequests_from_url(url)はエンジンに渡され、スケジューラに渡されます.ここでは一般的な爬虫類の考え方と一致しています.
エンジンがタスクキューが空のメッセージを受信するとSpider_が呼び出されます.idleメソッド、schedule_を呼び出すnext_requestsは、取得した開始URLを反復してエンジンに渡し、タスクが完了するまでサブコール開始キュー内のURLを開始します.
def schedule_next_requests(self):
"""Schedules a request if available"""
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
RedisSpiderとRedisCrawlSpiderは簡単で、SpiderとRedisMiXineを継承し、setup_を実現しました異なるcrawlerに基づいてsetting、redis_を初期化するredisメソッドkey、およびconnectインタフェースを介してspiderにspiderをバインドidle信号バインド.
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle."""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle."""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', DEFAULT_START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
if self.redis_batch_size is None:
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE', DEFAULT_START_URLS_BATCH_SIZE,
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s)", self.__dict__)
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
まとめ:RedisSpider初期Redisキューからstart_を取得urlはRequestを生成し、scrapyエンジンに渡して対応するRedis schedulerスケジューリングに渡し、デマンドとエンキューを完了します.スケジューリング条件がキュー内のRequestがエンジンからダウンロードに渡されることを満たすと、ResponseオブジェクトがSpider解析に渡され、新しいRequestが生成され、引き続きキャプチャされる.新しいRequestが生成されないまで、Spiderは開始キューからURLを取得し、前回のループを継続し、もちろんキューにないと爬虫類は終了します.