Scrapy-redis分布式爬虫

分布式爬虫要点

分布式爬虫

分布式的优点

  • 充分利用多机器的宽带加速爬取;
  • 充分利用多机的IP加速爬取速度。

分布式需要解决的问题

  • requests队列集中管理;
  • 去重集中管理。

redis基础知识

linux:sudo apt-get install redis-server

Windows安装

下载地址:https://github.com/tporadowski/redis/releases

Redis 支持 32 位和 64 位。这个需要根据你系统平台的实际情况选择,这里我们下载 Redis-x64-xxx.zip压缩包到 C 盘,解压后,将文件夹重新命名为 redis。
redis下载

打开文件夹,内容如下:
打开文件夹

打开一个 cmd 窗口 使用 cd 命令切换目录到 C:\redis 运行:

redis-server.exe redis.windows.conf

如果想方便的话,可以把 redis 的路径加到系统的环境变量里,这样就省得再输路径了,后面的那个 redis.windows.conf 可以省略,如果省略,会启用默认的。输入之后,会显示如下界面:
ccmd

这时候另启一个 cmd 窗口,原来的不要关闭,不然就无法访问服务端了。
切换到 redis 目录下运行:

redis-cli.exe -h 127.0.0.1 -p 6379

设置键值对:

set myKey abc

取出键值对:

get myKey

取值

Redis的数据类型

  • 字符串
  • 散列/哈希
  • 列表
  • 集合
  • 可排序集合

字符串命令

set mykey ''cnblogs''   创建变量

get mykey   查看变量

getrange mykey start end   获取字符串,如:getrange mykey  1 2 #mykey 1~2的字符串

strlen mykey   获取长度

incr/decr mykey    加一减一,类型是int

append mykey ''com''   添加字符串,添加到末尾

哈希命令

hset myhash name "cnblogs"   创建变量,myhash类似于变量名,name类似于key,"cnblogs"类似于values

hgetall myhash   得到key和values两者

hget myhash  name  得到values

hexists myhash name  检查是否存在这个key

hdel myhash name   删除这个key

hkeys myhash   查看key

hvals muhash   查看values

列表命令

lpush/rpush mylist "cnblogs"  左添加/右添加值

lrange mylist 0 10   查看列表0~10的值

blpop/brpop key1[key2] timeout   左删除/右删除一个,timeout是如果没有key,等待设置的时间后结束。

lpop/rpop key   左删除/右删除,没有等待时间。

llen key  获得长度

lindex key index    取第index元素,index是从0开始的

集合命令(不重复)

sadd myset "cnblogs"   添加内容,返回1表示不存在,0表示存在

scard key  查看set中的值

sdiff key1 [key2]   2个set做减法,其实就是减去了交际部分

sinter key1 [key2]    2个set做加法,其实就是留下了两者的交集

spop key   随机删除值

srandmember key member  随机获取member个值

smember key   获取全部的元素

可排序集合命令

zadd myset 0 ‘project1’ [1 ‘project2’]   添加集合元素;中括号是没有的,在这里是便于理解  

zrangebyscore myset 0 100   选取分数在0~100的元素

zcount key min max   选取分数在min~max的元素的个数

scrapy-redis编写分布式爬虫代码

scrapy-redis简介

scrapy-redisscrapy框架基于redis数据库的组件,用于scrapy项目的分布式开发和部署。

有如下特征:

  • 分布式爬取
    您可以启动多个spider工程,相互之间共享单个redisrequests队列。最适合广泛的多个域名网站的内容爬取。

  • 分布式数据处理
    爬取到的scrapyitem数据可以推入到redis队列中,这意味着你可以根据需求启动尽可能多的处理程序来共享item的队列,进行item数据持久化处理

  • Scrapy即插即用组件
    Scheduler调度器 + Duplication复制过滤器,Item Pipeline,基本spider

scrapy-redis架构

scrapy-redis架构

  • 首先Slaver端从Master端拿任务(Request、url)进行数据抓取,Slaver抓取数据的同时,产生新任务的Request便提交给 Master 处理;

  • Master端只有一个Redis数据库,负责将未处理的Request去重和任务分配,将处理后的Request加入待爬队列,并且存储爬取的数据。

Scrapy-Redis默认使用的就是这种策略,我们实现起来很简单,因为任务调度等工作Scrapy-Redis都已经帮我们做好了,我们只需要继承RedisSpider、指定redis_key就行了。

缺点是,Scrapy-Redis调度的任务是Request对象,里面信息量比较大(不仅包含url,还有callback函数、headers等信息),

可能导致的结果就是会降低爬虫速度、而且会占用Redis大量的存储空间,所以如果要保证效率,那么就需要一定硬件水平。

scrapy-redis安装

通过pip安装即可:pip install scrapy-redis

一般需要python、redis、scrapy这三个安装包

官方文档:https://scrapy-redis.readthedocs.io/en/stable/

源码位置:https://github.com/rmax/scrapy-redis

scrapy-redis常用配置

一般在配置文件中添加如下几个常用配置选项:

1.(必须). 使用了scrapy_redis的去重组件,在redis数据库里做去重

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

2(必须). 使用了scrapy_redis的调度器,在redis里分配请求

SCHEDULER = "scrapy_redis.scheduler.Scheduler"

3(可选). 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues

SCHEDULER_PERSIST = True

4(必须). 通过配置RedisPipelineitem写入keyspider.name : itemsredislist中,供后面的分布式处理item 这个已经由 scrapy-redis 实现,不需要我们写代码,直接使用即可

ITEM_PIPELINES = {
   'scrapy_redis.pipelines.RedisPipeline': 100 ,
}

5(必须). 指定redis数据库的连接参数

REDIS_HOST = '127.0.0.1' 
REDIS_PORT = 6379

scrapy-redis键名介绍

scrapy-redis中都是用key-value形式存储数据,其中有几个常见的key-value形式:

  • “项目名:items”–>list 类型,保存爬虫获取到的数据item内容json字符串
  • “项目名:dupefilter”–>set类型,用于爬虫访问的URL去重 内容是40个字符的 urlhash字符串
  • “项目名: start_urls”–>List类型,用于获取spider启动时爬取的第一个url
  • “项目名:requests”–>zset类型,用于scheduler调度处理 requests 内容是 request 对象的序列化 字符串

scrapy-redis简单实例

在原来非分布式爬虫的基础上,使用scrapy-redis简单搭建一个分布式爬虫,过程只需要修改一下spider的继承类和配置文件即可,很简单。

启动新项目scrapy startproject ScrayRedisTest

首先修改配置文件,在settings.py文件中添加代码:

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True

ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline': 100 ,
}

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379

然后需要修改的文件,是spider文件,原文件代码为:

修改为:

只修改了两个地方,一个是继承类:由scrapy.Spider修改为RedisSpider然后start_url已经不需要了,修改为:redis_key = "xxxxx",其中,这个键的值暂时是自己取的名字,一般用项目名:start_urls来代替初始爬取的url。由于分布式scrapy-redis中每个请求都是从redis中取出来的,因此,在redis数据库中,设置一个redis_key的值,作为初始的urlscrapy就会自动在redis中取出redis_key的值,作为初始url,实现自动爬取。因此:来到redis中,添加代码:

lpush tencent:start_urls https://hr.tencent.com/position.php

即:在redis中设置一个键值对,键为tencent:start_urls , 值为:初始化url。即可将传入的url作为初始爬取的url

如此一来,分布式已经搭建完毕。

scrapy-redis源码解析

Scrapy-redis的源码大致分为以下py文件:connection.py + spider.py
spider.py文件是分布式爬虫的入口代码:

  • 通过connection接口,spider初始化时,通过setup_redis()函数初始化好和redis的连接。
  • 通过next_requests函数从redis中取出strat url,spider使用少量的start url + LinkExtractor,可以发展出很多新的url,这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候,会触发spider_idle信号,从而触发spider的next_requests函数。
  • 再次从redisstart url池中读取一些url。

connection.py


import six
from scrapy.utils.misc import load_object
from . import defaults

# 快速映射settings配置文件中redis的基础配置字典
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
    'REDIS_ENCODING': 'encoding',
}

# 根据scrapy中settings配置文件信息返回一个redis客户端实例对象
def get_redis_from_settings(settings):
    params = defaults.REDIS_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)

# 返回一个redis的Strictredis实例对象
def get_redis(**kwargs):
    redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)

spider.py


from scrapy import signals
from scrapy.exceptions import DontCloseSpider
from scrapy.spiders import Spider, CrawlSpider
from . import connection, defaults
from .utils import bytes_to_str


# 实现从redis的队列中读取url
class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
    """Returns a batch of start requests from redis."""
        return self.next_requests()
    # 链接redis
    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal."""
        pass

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                   "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                   self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    # 这个方法 的作用就是从redis中获取start_url
    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
  fetch_one = self.server.spop if use_set else self.server.lpop
      # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis."""
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider


class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle"""

    @classmethod
   def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj


class RedisCrawlSpider(RedisMixin, CrawlSpider):
    """Spider that reads urls from redis queue when idle."""

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj

scheduler.py

这个文件重写了scheduler类,用来代替scrapy.core.scheduler的原有调度器。实现原理是使用指定的一个redis内存作为数据存储的媒介,以达到各个爬虫之间的统一调度。

  • scheduler负责调度各个spiderrequest请求,scheduler初始化时,通过settings文件读取queuedupefilters(url去重)的类型,配置queuedupefilters使用的key(一般就是spider name加上queue或者dupefilters,这样对于同一种spider的不同实例,就会使用相同的数据块了)。
  • 每当一个request要被调度时,enqueue_request被调用,scheduler使用dupefilters来判断这个url是否重复,如果不重复,就添加到queue的容器中(三种队列方式:先进先出,先进后出和优先级都可以,可以在settings中配置)。
  • 当调度完成时,next_request被调用,scheduler就通过queue容器的接口,取出一个request,把他发送给相应的spider,让spider进行爬取工作。

import importlib
import six
from scrapy.utils.misc import load_object
from . import connection, defaults

class Scheduler(object):
      def __init__(self, server,
           pass

@classmethoddef from_settings(cls, settings):
    kwargs = {
        'persist': settings.getbool('SCHEDULER_PERSIST'),
        'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
        'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
    }

    optional = {
        pass
    }
    for name, setting_name in optional.items():
        val = settings.get(setting_name)
        if val:
            kwargs[name] = val

    if isinstance(kwargs.get('serializer'), six.string_types):
        kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

    server = connection.from_settings(settings)
    server.ping()

    return cls(server=server, **kwargs)

@classmethoddef from_crawler(cls, crawler):
    instance = cls.from_settings(crawler.settings)
    instance.stats = crawler.stats
    return instance

def open(self, spider):
    self.spider = spider
    pass

def close(self, reason):
    if not self.persist:
        self.flush()

def flush(self):
    self.df.clear()
    self.queue.clear()

def next_request(self):
    block_pop_timeout = self.idle_before_close
    request = self.queue.pop(block_pop_timeout)
    if request and self.stats:
        self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
    return request

def has_pending_requests(self):
    return len(self) > 0

dupefilter.py

分布式爬虫url去重原理:
通过分析可以知道self.serverredis实例,使用一个key来向redis的一个set中插入fingerprint(这个key对于同一个spider是相同的,redis是一个key-value的数据库,如果key是相同的,访问到的值就是相同的,默认使用spider名字 + fingerpointkey就是为了区分在不同主机上的不同spider实例,只要数据是同一个spider,就会访问到redis中的同一个spider-set而这个set就是url的判重池)。


import logging
import time
from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint
from . import defaults
from .connection import get_redis_from_settings
logger = logging.getLogger(__name__)

# 对请求做去重处理,可以被分布式下不同的schedule调用class RFPDupeFilter(BaseDupeFilter):
    logger = logger

    def __init__(self, server, key, debug=False):
    self.server = server
    self.key = key
    self.debug = debug
    self.logdupes = True


# 通过settings配置文件信息返回一个redis示例对象@classmethoddef from_settings(cls, settings):
    server = get_redis_from_settings(settings)
    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
    debug = settings.getbool('DUPEFILTER_DEBUG')
    return cls(server, key=key, debug=debug)

@classmethoddef from_crawler(cls, crawler):
    return cls.from_settings(crawler.settings)

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    added = self.server.sadd(self.key, fp)
    return added == 0

# 这个方法是用来调用request_fingerprint接口的,这个接口通过sha1算法来判断两个url请#求地址是否相同(注意,这里面不完全是我们之前理解的hash了,如果两个url的地址相同,请求方式和参数都相同,#但是请求参数的前后顺序不同的话也别判定为同一个url地址,)从而达到url的去重功能。def request_fingerprint(self, request):
    return request_fingerprint(request)

# Scrapy's scheduler调用,删除数据,关闭连接def close(self, reason=''):
    self.clear()

# 清空操作记录数据def clear(self):
    """Clears fingerprints data."""
    self.server.delete(self.key)

# 请求日志信息def log(self, request, spider):
    pass

request.py

request_fingerprint接口:
通过request_fingerprint接口,通过sha1算法来判断两个url请求地址是否相同(注意,这里面不完全是我们之前理解的hash了,如果两个url的地址相同,请求方式和参数都相同,但是请求参数的前后顺序不同的话也别判定为同一个url地址)
http://www.example.com/query?id=111&cat=222
http://www.example.com/query?cat=222&id=111)从而达到url的去重功能。


"""This module provides some useful functions for working with scrapy.http.Request objects"""from __future__ import print_function
import hashlib
import weakref
from six.moves.urllib.parse import urlunparse
from w3lib.http import basic_auth_header
from scrapy.utils.python import to_bytes, to_native_str
from w3lib.url import canonicalize_url
from scrapy.utils.httpobj import urlparse_cached

_fingerprint_cache = weakref.WeakKeyDictionary()def request_fingerprint(request, include_headers=None):
    """Return the request fingerprint"""
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                       fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]

queue.py

这是个队列类,它会作为scheduler调度request的容器来维护一个秩序:

  • scheduler在每个主机上都会实例化一个,并且和spider一一对应,所以分布式运行时会有一个spider的多个实例和一个scheduler`的多个实例存在于不同的主机上。
  • 因为scheduler都是用相同的容器,而这些容器都连接同一个redis服务器,又都使用spider名 + queue来作为key读写数据,所以不同主机上的不同爬虫实例公用一个request调度池,实现了分布式爬虫之间的统一调度。
    
    from scrapy.utils.reqser import request_to_dict, request_from_dict
    from . import picklecompat
    
    # 队列基类class Base(object):
        def __init__(self, server, spider, key, serializer=None):
            pass
            self.server = server
            self.spider = spider
            self.key = key % {'spider': spider.name}
            self.serializer = serializer
    
        def _encode_request(self, request):
            """Encode a request object"""
            obj = request_to_dict(request, self.spider)
            return self.serializer.dumps(obj)
    
        def _decode_request(self, encoded_request):
            """Decode an request previously encoded"""
           obj = self.serializer.loads(encoded_request)
            return request_from_dict(obj, self.spider)
    
        def push(self, request):
            """Push a request"""
            raise NotImplementedError
    
        def pop(self, timeout=0):
           """Pop a request"""
            raise NotImplementedError
    
        def clear(self):
            """Clear queue/stack"""
            self.server.delete(self.key)
    
    #队列----先进先出class FifoQueue(Base):
        """Per-spider FIFO queue"""
    
        def __len__(self):
            """Return the length of the queue"""
           return self.server.llen(self.key)
    
       def push(self, request):
            # request 进栈,进栈前对request做处理,request请求先被 scrapy的接口request_to_dict
        #变成了一个dict对象(因为request对象实在#是比较复杂,有方法有属性不好串行化),
            #之后使用picklecompat中的serializer串行化为字符串,然后使用一个特定的key存入redis中
            #(该key在同一种spider中是相同的)
            self.server.lpush(self.key, self._encode_request(request))
    
        def pop(self, timeout=0):
            # request出栈,其实就是从redis用那个特定的key去读其值(一个list),
            #从list中读取最早进去的那个,于是就先进先出了.
            if timeout > 0:
                data = self.server.brpop(self.key, timeout)
                if isinstance(data, tuple):
                    data = data[1]
           else:
                data = self.server.rpop(self.key)
                if data:
                    return self._decode_request(data)
    
    # 优先级队列class PriorityQueue(Base):pass# 栈----后进先出class LifoQueue(Base):
        pass
    
    SpiderQueue = FifoQueue
    SpiderStack = LifoQueue
    SpiderPriorityQueue = PriorityQueue

    picklecompat.py

    这里实现了loadsdumps两个函数,其实就是实现了一个serializer
    因为redis数据库不能存储复杂对象(value部分只能是字符串,字符串列表,字符串集合和hashkey部分只能是字符串),所以我们存啥都要先串行化成文本才行。这里使用的就是pythonpickle模块,一个兼容py2py3的串行化工具。
    
    """A pickle wrapper module with protocol=-1 by default."""try:
        import cPickle as pickle  # PY2except ImportError:
        import pickle
    
    def loads(s):
        return pickle.loads(s)
    
    def dumps(obj):
        return pickle.dumps(obj, protocol=-1)

pipelines.py

pipelines.py中类的作用:
pipeline.py文件用来实现数据分布式处理。它通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key,把item串行化之后存入redis数据库对应的value中(这个value可以看出是个list,我们的每个item是这个list中的一个结点),这个pipeline把提取出的item存起来,主要是为了方便我们延后处理数据。


from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread
from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode
class RedisPipeline(object):

    def __init__(self, server,key=defaults.PIPELINE_KEY,serialize_func=default_serialize):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
            settings['REDIS_ITEMS_SERIALIZER']
        )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        return self.key % {'spider': spider.name}

这个项目通过重写schedulerspider类,实现了scheduler调度、spider启动和固定redis的交互。实现新的dupefilterqueue类,达到了去重和调度容器和redis的交互,因为每个主机上的爬虫进程都访问同一个redis数据库,所以调度和去重都统一进行统一管理,达到了分布式爬虫的目的。

spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。每当一个spider产出一个request的时候,scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度,scheduler对象通过访问redisrequest进行判重,如果不重复就把他添加进redis中的调度池。当调度条件满足时,scheduler对象就从redis的调度池中取出一个request发送给spider,让他爬取。当spider爬取的所有暂时可用url之后,scheduler发现这个spider对应的redis的调度池空了,于是触发信号spider_idlespider收到这个信号之后,直接连接redis读取strart url池,拿去新的一批url入口,然后再次重复上边的工作。


本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!