Scrapy-redis分布式爬虫

分布式爬虫要点

分布式爬虫

分布式的优点

  • 充分利用多机器的宽带加速爬取;
  • 充分利用多机的IP加速爬取速度。

分布式需要解决的问题

  • requests队列集中管理;
  • 去重集中管理。

redis基础知识

linux:sudo apt-get install redis-server

Windows安装

下载地址:https://github.com/tporadowski/redis/releases

Redis 支持 32 位和 64 位。这个需要根据你系统平台的实际情况选择,这里我们下载 Redis-x64-xxx.zip压缩包到 C 盘,解压后,将文件夹重新命名为 redis。
redis下载

打开文件夹,内容如下:
打开文件夹

打开一个 cmd 窗口 使用 cd 命令切换目录到 C:\redis 运行:

1
redis-server.exe redis.windows.conf

如果想方便的话,可以把 redis 的路径加到系统的环境变量里,这样就省得再输路径了,后面的那个 redis.windows.conf 可以省略,如果省略,会启用默认的。输入之后,会显示如下界面:
ccmd

这时候另启一个 cmd 窗口,原来的不要关闭,不然就无法访问服务端了。
切换到 redis 目录下运行:

1
redis-cli.exe -h 127.0.0.1 -p 6379

设置键值对:

1
set myKey abc

取出键值对:

1
get myKey

取值

Redis的数据类型

  • 字符串
  • 散列/哈希
  • 列表
  • 集合
  • 可排序集合

字符串命令

1
2
3
4
5
6
7
8
9
10
11
set mykey ''cnblogs''   创建变量

get mykey 查看变量

getrange mykey start end 获取字符串,如:getrange mykey 1 2 #mykey 1~2的字符串

strlen mykey 获取长度

incr/decr mykey 加一减一,类型是int

append mykey ''com'' 添加字符串,添加到末尾

哈希命令

1
2
3
4
5
6
7
8
9
10
11
12
13
hset myhash name "cnblogs"   创建变量,myhash类似于变量名,name类似于key,"cnblogs"类似于values

hgetall myhash 得到key和values两者

hget myhash name 得到values

hexists myhash name 检查是否存在这个key

hdel myhash name 删除这个key

hkeys myhash 查看key

hvals muhash 查看values

列表命令

1
2
3
4
5
6
7
8
9
10
11
lpush/rpush mylist "cnblogs"  左添加/右添加值

lrange mylist 0 10 查看列表0~10的值

blpop/brpop key1[key2] timeout 左删除/右删除一个,timeout是如果没有key,等待设置的时间后结束。

lpop/rpop key 左删除/右删除,没有等待时间。

llen key 获得长度

lindex key index 取第index元素,index是从0开始的

集合命令(不重复)

1
2
3
4
5
6
7
8
9
10
11
12
13
sadd myset "cnblogs"   添加内容,返回1表示不存在,0表示存在

scard key 查看set中的值

sdiff key1 [key2] 2个set做减法,其实就是减去了交际部分

sinter key1 [key2] 2个set做加法,其实就是留下了两者的交集

spop key 随机删除值

srandmember key member 随机获取member个值

smember key 获取全部的元素

可排序集合命令

1
2
3
4
5
zadd myset 0 ‘project1’ [1 ‘project2’]   添加集合元素;中括号是没有的,在这里是便于理解  

zrangebyscore myset 0 100 选取分数在0~100的元素

zcount key min max 选取分数在min~max的元素的个数

scrapy-redis编写分布式爬虫代码

scrapy-redis简介

scrapy-redisscrapy框架基于redis数据库的组件,用于scrapy项目的分布式开发和部署。

有如下特征:

  • 分布式爬取
    您可以启动多个spider工程,相互之间共享单个redisrequests队列。最适合广泛的多个域名网站的内容爬取。

  • 分布式数据处理
    爬取到的scrapyitem数据可以推入到redis队列中,这意味着你可以根据需求启动尽可能多的处理程序来共享item的队列,进行item数据持久化处理

  • Scrapy即插即用组件
    Scheduler调度器 + Duplication复制过滤器,Item Pipeline,基本spider

scrapy-redis架构

scrapy-redis架构

  • 首先Slaver端从Master端拿任务(Request、url)进行数据抓取,Slaver抓取数据的同时,产生新任务的Request便提交给 Master 处理;

  • Master端只有一个Redis数据库,负责将未处理的Request去重和任务分配,将处理后的Request加入待爬队列,并且存储爬取的数据。

Scrapy-Redis默认使用的就是这种策略,我们实现起来很简单,因为任务调度等工作Scrapy-Redis都已经帮我们做好了,我们只需要继承RedisSpider、指定redis_key就行了。

缺点是,Scrapy-Redis调度的任务是Request对象,里面信息量比较大(不仅包含url,还有callback函数、headers等信息),

可能导致的结果就是会降低爬虫速度、而且会占用Redis大量的存储空间,所以如果要保证效率,那么就需要一定硬件水平。

scrapy-redis安装

通过pip安装即可:pip install scrapy-redis

一般需要python、redis、scrapy这三个安装包

官方文档:https://scrapy-redis.readthedocs.io/en/stable/

源码位置:https://github.com/rmax/scrapy-redis

scrapy-redis常用配置

一般在配置文件中添加如下几个常用配置选项:

1.(必须). 使用了scrapy_redis的去重组件,在redis数据库里做去重

1
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

2(必须). 使用了scrapy_redis的调度器,在redis里分配请求

1
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

3(可选). 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues

1
SCHEDULER_PERSIST = True

4(必须). 通过配置RedisPipelineitem写入keyspider.name : itemsredislist中,供后面的分布式处理item 这个已经由 scrapy-redis 实现,不需要我们写代码,直接使用即可

1
2
3
ITEM_PIPELINES = {
   'scrapy_redis.pipelines.RedisPipeline': 100 ,
}

5(必须). 指定redis数据库的连接参数

1
2
REDIS_HOST = '127.0.0.1' 
REDIS_PORT = 6379

scrapy-redis键名介绍

scrapy-redis中都是用key-value形式存储数据,其中有几个常见的key-value形式:

  • “项目名:items”–>list 类型,保存爬虫获取到的数据item内容json字符串
  • “项目名:dupefilter”–>set类型,用于爬虫访问的URL去重 内容是40个字符的 urlhash字符串
  • “项目名: start_urls”–>List类型,用于获取spider启动时爬取的第一个url
  • “项目名:requests”–>zset类型,用于scheduler调度处理 requests 内容是 request 对象的序列化 字符串

scrapy-redis简单实例

在原来非分布式爬虫的基础上,使用scrapy-redis简单搭建一个分布式爬虫,过程只需要修改一下spider的继承类和配置文件即可,很简单。

启动新项目scrapy startproject ScrayRedisTest

首先修改配置文件,在settings.py文件中添加代码:

1
2
3
4
5
6
7
8
9
10
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True

ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 100 ,
}

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379

然后需要修改的文件,是spider文件,原文件代码为:

修改为:

只修改了两个地方,一个是继承类:由scrapy.Spider修改为RedisSpider然后start_url已经不需要了,修改为:redis_key = "xxxxx",其中,这个键的值暂时是自己取的名字,一般用项目名:start_urls来代替初始爬取的url。由于分布式scrapy-redis中每个请求都是从redis中取出来的,因此,在redis数据库中,设置一个redis_key的值,作为初始的urlscrapy就会自动在redis中取出redis_key的值,作为初始url,实现自动爬取。因此:来到redis中,添加代码:

1
lpush tencent:start_urls https://hr.tencent.com/position.php

即:在redis中设置一个键值对,键为tencent:start_urls , 值为:初始化url。即可将传入的url作为初始爬取的url

如此一来,分布式已经搭建完毕。

scrapy-redis源码解析

Scrapy-redis的源码大致分为以下py文件:connection.py + spider.py
spider.py文件是分布式爬虫的入口代码:

  • 通过connection接口,spider初始化时,通过setup_redis()函数初始化好和redis的连接。
  • 通过next_requests函数从redis中取出strat url,spider使用少量的start url + LinkExtractor,可以发展出很多新的url,这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候,会触发spider_idle信号,从而触发spider的next_requests函数。
  • 再次从redisstart url池中读取一些url。

connection.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

import six
from scrapy.utils.misc import load_object
from . import defaults

# 快速映射settings配置文件中redis的基础配置字典
SETTINGS_PARAMS_MAP = {
'REDIS_URL': 'url',
'REDIS_HOST': 'host',
'REDIS_PORT': 'port',
'REDIS_ENCODING': 'encoding',
}

# 根据scrapy中settings配置文件信息返回一个redis客户端实例对象
def get_redis_from_settings(settings):
params = defaults.REDIS_PARAMS.copy()
params.update(settings.getdict('REDIS_PARAMS'))
for source, dest in SETTINGS_PARAMS_MAP.items():
val = settings.get(source)
if val:
params[dest] = val

if isinstance(params.get('redis_cls'), six.string_types):
params['redis_cls'] = load_object(params['redis_cls'])

return get_redis(**params)

# 返回一个redis的Strictredis实例对象
def get_redis(**kwargs):
redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
url = kwargs.pop('url', None)
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)

spider.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection, defaults
from .utils import bytes_to_str


# 实现从redis的队列中读取url
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None

# Redis client placeholder.
server = None

def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
# 链接redis
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal."""
pass

self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)

self.server = connection.from_settings(crawler.settings)
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

# 这个方法 的作用就是从redis中获取start_url
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)

if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis."""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)

def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)

def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle"""

@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
"""Spider that reads urls from redis queue when idle."""

@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
obj.setup_redis(crawler)
return obj

scheduler.py

这个文件重写了scheduler类,用来代替scrapy.core.scheduler的原有调度器。实现原理是使用指定的一个redis内存作为数据存储的媒介,以达到各个爬虫之间的统一调度。

  • scheduler负责调度各个spiderrequest请求,scheduler初始化时,通过settings文件读取queuedupefilters(url去重)的类型,配置queuedupefilters使用的key(一般就是spider name加上queue或者dupefilters,这样对于同一种spider的不同实例,就会使用相同的数据块了)。
  • 每当一个request要被调度时,enqueue_request被调用,scheduler使用dupefilters来判断这个url是否重复,如果不重复,就添加到queue的容器中(三种队列方式:先进先出,先进后出和优先级都可以,可以在settings中配置)。
  • 当调度完成时,next_request被调用,scheduler就通过queue容器的接口,取出一个request,把他发送给相应的spider,让spider进行爬取工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults

class Scheduler(object):
def __init__(self, server,
pass

@classmethoddef from_settings(cls, settings):
kwargs = {
'persist': settings.getbool('SCHEDULER_PERSIST'),
'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
}

optional = {
pass
}
for name, setting_name in optional.items():
val = settings.get(setting_name)
if val:
kwargs[name] = val

if isinstance(kwargs.get('serializer'), six.string_types):
kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

server = connection.from_settings(settings)
server.ping()

return cls(server=server, **kwargs)

@classmethoddef from_crawler(cls, crawler):
instance = cls.from_settings(crawler.settings)
instance.stats = crawler.stats
return instance

def open(self, spider):
self.spider = spider
pass

def close(self, reason):
if not self.persist:
self.flush()

def flush(self):
self.df.clear()
self.queue.clear()

def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request

def has_pending_requests(self):
return len(self) > 0

dupefilter.py

分布式爬虫url去重原理:
通过分析可以知道self.serverredis实例,使用一个key来向redis的一个set中插入fingerprint(这个key对于同一个spider是相同的,redis是一个key-value的数据库,如果key是相同的,访问到的值就是相同的,默认使用spider名字 + fingerpointkey就是为了区分在不同主机上的不同spider实例,只要数据是同一个spider,就会访问到redis中的同一个spider-set而这个set就是url的判重池)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings
logger = logging.getLogger(__name__)

# 对请求做去重处理,可以被分布式下不同的schedule调用class RFPDupeFilter(BaseDupeFilter):
logger = logger

def __init__(self, server, key, debug=False):
self.server = server
self.key = key
self.debug = debug
self.logdupes = True


# 通过settings配置文件信息返回一个redis示例对象@classmethoddef from_settings(cls, settings):
server = get_redis_from_settings(settings)
key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)

@classmethoddef from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)

def request_seen(self, request):
fp = self.request_fingerprint(request)
added = self.server.sadd(self.key, fp)
return added == 0

# 这个方法是用来调用request_fingerprint接口的,这个接口通过sha1算法来判断两个url请#求地址是否相同(注意,这里面不完全是我们之前理解的hash了,如果两个url的地址相同,请求方式和参数都相同,#但是请求参数的前后顺序不同的话也别判定为同一个url地址,)从而达到url的去重功能。def request_fingerprint(self, request):
return request_fingerprint(request)

# Scrapy's scheduler调用,删除数据,关闭连接def close(self, reason=''):
self.clear()

# 清空操作记录数据def clear(self):
"""Clears fingerprints data."""
self.server.delete(self.key)

# 请求日志信息def log(self, request, spider):
pass

request.py

request_fingerprint接口:
通过request_fingerprint接口,通过sha1算法来判断两个url请求地址是否相同(注意,这里面不完全是我们之前理解的hash了,如果两个url的地址相同,请求方式和参数都相同,但是请求参数的前后顺序不同的话也别判定为同一个url地址)
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111)从而达到url的去重功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

"""This module provides some useful functions for working with scrapy.http.Request objects"""from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached

_fingerprint_cache = weakref.WeakKeyDictionary()def request_fingerprint(request, include_headers=None):
"""Return the request fingerprint"""
if include_headers:
include_headers = tuple(to_bytes(h.lower())
for h in sorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
if include_headers not in cache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.body or b'')
if include_headers:
for hdr in include_headers:
if hdr in request.headers:
fp.update(hdr)
for v in request.headers.getlist(hdr):
fp.update(v)
cache[include_headers] = fp.hexdigest()
return cache[include_headers]

queue.py

这是个队列类,它会作为scheduler调度request的容器来维护一个秩序:

  • scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler`的多个实例存在于不同的主机上。
  • 因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名 + queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66

    from scrapy.utils.reqser import request_to_dict, request_from_dict
    from . import picklecompat

    # 队列基类class Base(object):
    def __init__(self, server, spider, key, serializer=None):
    pass
    self.server = server
    self.spider = spider
    self.key = key % {'spider': spider.name}
    self.serializer = serializer

    def _encode_request(self, request):
    """Encode a request object"""
    obj = request_to_dict(request, self.spider)
    return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
    """Decode an request previously encoded"""
    obj = self.serializer.loads(encoded_request)
    return request_from_dict(obj, self.spider)

    def push(self, request):
    """Push a request"""
    raise NotImplementedError

    def pop(self, timeout=0):
    """Pop a request"""
    raise NotImplementedError

    def clear(self):
    """Clear queue/stack"""
    self.server.delete(self.key)

    #队列----先进先出class FifoQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
    """Return the length of the queue"""
    return self.server.llen(self.key)

    def push(self, request):
    # request 进栈,进栈前对request做处理,request请求先被 scrapy的接口request_to_dict
    #变成了一个dict对象(因为request对象实在#是比较复杂,有方法有属性不好串行化),
    #之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中
    #(该key在同一种spider中是相同的)
    self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
    # request出栈,其实就是从redis用那个特定的key去读其值(一个list),
    #从list中读取最早进去的那个,于是就先进先出了.
    if timeout > 0:
    data = self.server.brpop(self.key, timeout)
    if isinstance(data, tuple):
    data = data[1]
    else:
    data = self.server.rpop(self.key)
    if data:
    return self._decode_request(data)

    # 优先级队列class PriorityQueue(Base):pass# 栈----后进先出class LifoQueue(Base):
    pass

    SpiderQueue = FifoQueue
    SpiderStack = LifoQueue
    SpiderPriorityQueue = PriorityQueue

    picklecompat.py

    这里实现了loadsdumps两个函数,其实就是实现了一个serializer
    因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hashkey部分只能是字符串),所以我们存啥都要先串行化成文本才行。这里使用的就是pythonpickle模块,一个兼容py2py3的串行化工具。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    """A pickle wrapper module with protocol=-1 by default."""try:
    import cPickle as pickle # PY2except ImportError:
    import pickle

    def loads(s):
    return pickle.loads(s)

    def dumps(obj):
    return pickle.dumps(obj, protocol=-1)

pipelines.py

pipelines.py中类的作用:
pipeline.py文件用来实现数据分布式处理。它通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key,把item串行化之后存入redis数据库对应的value中(这个value可以看出是个list,我们的每个item是这个list中的一个结点),这个pipeline把提取出的item存起来,主要是为了方便我们延后处理数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):

def __init__(self, server,key=defaults.PIPELINE_KEY,serialize_func=default_serialize):
self.server = server
self.key = key
self.serialize = serialize_func

@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)

@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)

def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)

def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item

def item_key(self, item, spider):
return self.key % {'spider': spider.name}

这个项目通过重写schedulerspider类,实现了scheduler调度、spider启动和固定redis的交互。实现新的dupefilterqueue类,达到了去重和调度容器和redis的交互,因为每个主机上的爬虫进程都访问同一个redis数据库,所以调度和去重都统一进行统一管理,达到了分布式爬虫的目的。

spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。每当一个spider产出一个request的时候,scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度,scheduler对象通过访问redisrequest进行判重,如果不重复就把他添加进redis中的调度池。当调度条件满足时,scheduler对象就从redis的调度池中取出一个request发送给spider,让他爬取。当spider爬取的所有暂时可用url之后,scheduler发现这个spider对应的redis的调度池空了,于是触发信号spider_idlespider收到这个信号之后,直接连接redis读取strart url池,拿去新的一批url入口,然后再次重复上边的工作。


本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!