scrapy安装和配置

virtualenvwrapper管理虚拟环境

virtualenvwrapper这个软件包可以让我们管理虚拟环境变得更加简单。不用再跑到某个目录下通过virtualenv来创建虚拟环境,并且激活的时候也要跑到具体的目录下去激活。

安装virtualenvwrapper

  • *nix:pip install virtualenvwrapper。
  • windows:pip install virtualenvwrapper-win。

virtualenvwrapper基本使用

  • 创建虚拟环境:mkvirtualenv my_env
    那么会在你当前用户下创建一个Env的文件夹,然后将这个虚拟环境安装到这个目录下。如果你电脑中安装了python2python3,并且两个版本中都安装了virtualenvwrapper,那么将会使用环境变量中第一个出现的Python版本来作为这个虚拟环境的Python解释器。
  • 切换到某个虚拟环境:workon my_env
  • 退出当前虚拟环境:deactivate
  • 删除某个虚拟环境:rmvirtualenv my_env
  • 列出所有虚拟环境:lsvirtualenv
  • 进入到虚拟环境所在的目录:cdvirtualenv

修改mkvirtualenv的默认路径
我的电脑->右键->属性->高级系统设置->环境变量->系统变量中添加一个参数WORKON_HOME,将这个参数的值设置为你需要的路径。

创建虚拟环境的时候指定Python版本
在使用mkvirtualenv的时候,可以指定--python的参数来指定具体的python路径:mkvirtualenv --python==C:\Python36\python.exe hy_env

scrapy的安装

使用豆瓣源安装scrapy:pip install -i https://pypi.douban.com/simple/ scrapy

补充

进入虚拟环境: workon py3scrapy
创建项目: scrapy startproject ArticleSpider
需要进入到项目路径下cd ArticleSpider,才能建立spider: scrapy genspider jobbole news.cnblogs.com

设置项目的虚拟环境:
设置项目的虚拟环境

pycharm中调试scrapy源码

运行爬虫文件

建立一个main.py文件,在ArticleSpider文件目录下

from scrapy.cmdline import execute
import sys,os

os.path.dirname(os.path.abspath(__file__))
execute(['scrapy','crawl','jobbole']) #使用crawl命令执行爬虫程序jobbole
  • os.path.abspath(__file__) #获取main.py目录地址
  • dirname() #获取main.py父目录

调试程序

调试程序
pass位置设置断点,可以调试出爬虫数据。

xpath基础语法

xpath语法

表达式 说明
article 选取所有article元素的所有子节点
/article 选取跟元素article
article/a 选取所有article元素的所有子元素a元素
//div 选取所有div元素(不论出现在文档任何地方)
article//div 选取所有属于article元素的后代的div元素,不管它出现在article之下的任何位置
//@class 选取所有名为class的属性
/article/div[1] 选取属于article子元素的第一个div元素
/article/div[last()] 选取属于article子元素的最后一个div元素
/article/div[last()-1] 选取属于article子元素的倒数第二个div元素
//div[@lang] 选取所有拥有lang属性的div元素
//div[@lang=’eng’] 选取所有拥有lang属性为eng的div元素
/div/* 选取所有属于div元素的所有子节点
//* 选取所有元素

补充内容

xpath写对了,但是获取到的内容与网页呈现出的不一致。F12产生的源码,不同于网页源代码,前者可能是js加载完的源代码。response.xpath()是根据网页源代码来提取信息的。

contains()用法

response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]#
表示在span标签中class属性中含有 bookmark-btn 即为符合

scrapy shell 调试url,如:scrapy shell https://www.cnblogs.com/
scrapy shell 调试url

response.xpath('//title')

xpath提取元素

def parse(self, response):
    # url = response.xpath('//*[@id="entry_661638"]/div[2]/h2/a/@href').extract()[0]#extract()方法返回的是列表
    url = response.xpath('//*[@id="entry_661638"]/div[2]/h2/a/@href').extract_first()#提取第一个,如果没有就返回空值
    pass

css选择器

表达式 说明
* 选择所有节点
#container 选择id为container的节点
.container 选取所有class包含container的节点
li a 选取所有li下的所有a节点
ul + p 选取ul后面的第一个p元素
div #container > ul 选取id为container的div的ul子元素
p~ul 选择前面有

元素的每个

    元素。
a[title] 选取所有有title属性的a元素
a[href = “http://javami.com"] 选取所有href属性为javami.com值的a元素
a[href* = “jobole”] 选取所有href属性包含jobole的a元素
a[href^ = “http”] 选取所有href属性以http开头的a元素
a[href$ = “.jpg”] 选取所有href属性以.jpg结尾的a元素
input[type=radio]:checked 选择选中的radio的元素
div:not[#container] 选取所有id非container的div属性
li:nth-child(3)] 选取第三个li元素
tr:nth-child(2n)] 第偶数个tr
url = response.css('div#news_list h2 a::attr(href)').extract_first()

编写spider完成抓取过程

http带则不加,不带则加上写法

from urllib import parse

url = 'http://www.baidu.com/posts'
post_url = 'n/123'
print(parse.urljoin(url,post_url)) #http://www.baidu.com/n/123
post_url = 'https://www.cnblogs.com/n/1233'
print(parse.urljoin(url,post_url)) #https://www.cnblogs.com/n/1233

获取文章列表页中的文章url并交给scrapy下载后并进行解析

class JobboleSpider(scrapy.Spider):
    name = 'jobbole'
    allowed_domains = ['news.cnblogs.com']
    start_urls = ['http://news.cnblogs.com/']

    def parse(self, response):
        """
        1. 获取文章列表页中的文章url并交给scrapy下载后并进行解析
        2. 获取下一页的url并交给scrapy进行下载, 下载完成后交给parse
        """
        post_nodes = response.css('#news_list .news_block') #返回的是xpath对象集合
        for post_node in post_nodes:
            image_url = post_node.css('.entry_summary a img::attr(src)').extract_first("")
            post_url = post_node.css('h2 a::attr(href)').extract_first("")
            #异步函数,拼接url,生成器yield扔给self.parse_detail,注意,这里不是函数调用,所以不能写成callback=self.parse_detail()
            yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":parse.urljoin(response.url,image_url)},callback=self.parse_detail)


        #提取下一页并交给scrapy进行下载
        next_url = response.xpath("//a[contains(text(),'Next >')]/@href").extract_first("")
        yield Request(url=parse.urljoin(response.url,next_url),callback=self.parse)
        pass
    def parse_detail(self,response):
        pass

scrapy中为什么要使用yield

#生成器
def myGen():
    yield 1
    yield 2
    yield 3
    yield 4


for data in myGen():
    print(data)

#可以停止的函数
print(next(myGen()))
print(next(myGen()))

提取详情页信息

def parse_detail(self,response):
    match_re = re.match(".*?(\d+)",response.url)
    if match_re:
        title = response.css("#news_title a::text").extract_first("")
        create_date = response.css("#news_info .time::text").extract_first("")
        #文章内容标记
        content = response.css("#news_content").extract()[0]
        tag_list = response.css(".news_tags a::text").extract()
        tags = ",".join(tag_list)

        post_id = match_re.group(1)
        yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),callback=self.pares_nums)
    pass

def pares_nums(self,response):
    j_data = json.loads(response.text)#获取到的数据转换成json
    parise_nums = j_data["DiggCount"]
    fav_nums = j_data["TotalView"]
    comment_nums = j_data["CommentCount"]
    pass

items的定义和使用

在items.py模块下创建JobBoleArticleItem类

class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Fieldld()
    create_date = scrapy.Fieldld()
    url = scrapy.Fieldld()
    url_object_id = scrapy.Fieldld()
    front_image_url = scrapy.Fieldld()
    front_image_path = scrapy.Fieldld()
    parise_nums = scrapy.Fieldld()
    comment_nums = scrapy.Fieldld()
    fav_nums = scrapy.Fieldld()
    tags = scrapy.Fieldld()
    content = scrapy.Fieldld()

在jobble.py模块下引入JobBoleArticleItem类

from  ArticleSpider.items import JobBoleArticleItem 
from ArticleSpider.utils import common

    def parse_detail(self,response):
        match_re = re.match(".*?(\d+)",response.url)
        if match_re:
            #获得JobBoleArticleItem()对象
            article_item = JobBoleArticleItem()
            post_id = match_re.group(1)
            title = response.css("#news_title a::text").extract_first("")
            create_data = response.css("#news_info .time::text").extract_first("")
            match_re = re.match(".*?(\d+.*)", create_data)
            create_date = match_re.group(1)
            #文章内容标记
            content = response.css("#news_content").extract()[0]
            tag_list = response.css(".news_tags a::text").extract()
            tags = ",".join(tag_list)

            article_item["title"] = title
            article_item["create_date"] = create_date
            article_item["content"] = content
            article_item["tags"] = tags
            article_item["url"] = response.url
            #获取parse yield的meta={"front_image_url":image_url},没有则为空,把front_image_url转换成列表。对于图片下载字段,必须使用list类型
            article_item["front_image_url"] = [response.meta.get("front_image_url","")]

            yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),meta={"article_item":article_item},callback=self.pares_nums)
        pass

    def pares_nums(self,response):
        j_data = json.loads(response.text)#获取到的数据转换成json
        #前一个引号是自己定义的名称,后一个空着,这样如果就不会抛异常
        article_item = response.meta.get("article_item","")
        parise_nums = j_data["DiggCount"]
        fav_nums = j_data["TotalView"]
        comment_nums = j_data["CommentCount"]


        article_item["parise_nums"] = parise_nums
        article_item["fav_nums"] = fav_nums
        article_item["comment_nums"] = comment_nums
        article_item["url_object_id"] = common.get_md5(article_item["url"])

        yield article_item #会走pipelines模式(数据处理)只能yield这两种模式(Request或者是scrapy.Item的子类)
        pass

在项目文件夹下创建子文件夹utils,写一个MD5生成类(common.py)

import hashlib
#哈希表摘要算法,输出固定长度
def get_md5(url):
    if isinstance(url,str): #判断url是否是str类型(判断是不是Unicode)
        url = url.encode("utf-8")
    m = hashlib.md5()
    m.update(url)

    return m.hexdigest()

if __name__ == '__main__':
    print(get_md5("http//www.javami.com"))

settings.py的设置问题

不遵守robots.txt协议

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

有关于pipelines的设置,需要打开项目管道

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
}

pipelines.py项目管道

class ArticlespiderPipeline:
    def process_item(self, item, spider):
        return item

scrapy配置图片下载

settings.py设置配置文件,就会自动下载图片

ITEM_PIPELINES = {
     'scrapy.pipelines.images.ImagesPipeline': 1,
}

配置图片默认下载路径

import os
project_dir=os.path.dirname(os.path.abspath(__file__))  #相对的路径IMAGES_STORE=os.path.join(project_dir,'images') #放在同级images目录下
IMAGES_URLS_FIELD = "front_image_url" #设置自动下载的字段

安装图片下载处理库pillow:pip install -i https://pypi.douban.com/simple pillow

在pipelines.py模块下重载图片方法

#目的:图片下载过程中的拦截,重载方法,下载过程中获取图片路径保存在item里面
#重点关注front_image_path路径
from scrapy.pipelines.images import ImagesPipeline

class ArticleImagesPipeline(ImagesPipeline):
        def item_completed(self, results, item, info):
            image_file_path = ""
            if "front_image_url" in item:
                for ok,value in results:
                    image_file_path = value["path"]
                item["front_image_url"] = image_file_path
            return item

因为重写了ArticleImagesPipeline,所以需要在设置中更改过来:

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.ArticleImagesPipeline': 1,
}

items数据写入到json文件中

数据保存都会放在piplines.py模块下处理

自定义json文件的导出

import codecs
import json

#自定义json文件的导出
class JsonWithEncodingPipeline(object):
    def __init__(self):
        self.file = codecs.open("article.json","w",encoding="utf-8")

    def process_item(self,item,spider):
        lines = json.dumps(dict(item),ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item

    def spider_closed(self,spider):
        self.file.close()

在settings.py中配置ITEM_PIPELINES

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.JsonWithEncodingPipeline': 2,
}

调用scrapy提供的json export导出json文件

数据保存都会放在piplines.py模块下处理

from scrapy.exporters import JsonItemExporter

class JsonExporterPipleline(object):
    #调用scrapy提供的json export导出json文件
    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

在settings.py中配置ITEM_PIPELINES

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.JsonExporterPipleline': 2,
}

mysql表结构设计

安装mysql的一个驱动(mysqlclient)

pip install -i https://pypi.douban.com/simple mysqlclient

mysql表结构设计,并且设置字符集

mysql表结构设计

pipeline数据库保存

采用同步的机制写入mysql

数据保存都会放在piplines.py模块下处理

import MySQLdb

class MysqlPipeline(object):
    #采用同步的机制写入mysql
    def __init__(self):
        self.conn = MySQLdb.connect('localhost', 'root', 'root', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url,url_object_id,front_image_path,front_image_url,parise_nums,comment_nums,fav_nums,tags,content,create_date)
            VALUES (%s, %s, %s, %s, %s,%s, %s, %s, %s, %s,%s)
        """

        params = list()
        params.append(item.get("title",""))
        params.append(item.get("url", ""))
        params.append(item.get("url_object_id", ""))
        params.append(item.get("front_image_path", ""))
        params.append(item.get("front_image_url", ""))
        params.append(item.get("parise_nums", 0))
        params.append(item.get("comment_nums", 0))
        params.append(item.get("fav_nums", 0))
        params.append(item.get("tags", ""))
        params.append(item.get("content", ""))
        params.append(item.get("create_date", "1970-07-01"))
        self.cursor.execute(insert_sql,tuple(params))
        self.conn.commit()
        return item

在settings.py中配置ITEM_PIPELINES

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MysqlPipeline': 4,
}

异步方式入库mysql

使用twisted将mysql插入变成异步执行

from twisted.enterprise import adbapi

class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        from MySQLdb.cursors import DictCursor
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        #使用twisted将mysql插入变成异步执行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider) #处理异常
        return item

    def handle_error(self, failure, item, spider):
        #处理异步插入的异常
        print (failure)

    def do_insert(self, cursor, item):
        #执行具体的插入
        #根据不同的item 构建不同的sql语句并插入到mysql中
        insert_sql = """
                    insert into jobbole_article(title, url,url_object_id,front_image_path,front_image_url,parise_nums,comment_nums,fav_nums,tags,content,create_date)
                    VALUES (%s, %s, %s, %s, %s,%s, %s, %s, %s, %s,%s)
                """

        params = list()
        params.append(item.get("title", ""))
        params.append(item.get("url", ""))
        params.append(item.get("url_object_id", ""))
        params.append(item.get("front_image_path", ""))
        params.append(item.get("front_image_url", ""))
        params.append(item.get("parise_nums", 0))
        params.append(item.get("comment_nums", 0))
        params.append(item.get("fav_nums", 0))
        params.append(item.get("tags", ""))
        params.append(item.get("content", ""))
        params.append(item.get("create_date", "1970-07-01"))

        cursor.execute(insert_sql, tuple(params))

在settings.py中配置ITEM_PIPELINES和数据库信息

ITEM_PIPELINES = {
    'ArticleSpider.pipelines.MysqlTwistedPipline': 4,
}

MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

数据插入主键冲突的解决方法

ON DUPLICATE KEY UPDATE(当insert已经存在的记录时,执行Update)

def do_insert(self, cursor, item):
    #执行具体的插入
    #根据不同的item 构建不同的sql语句并插入到mysql中
    insert_sql = """
                insert into jobbole_article(title, url,url_object_id,front_image_path,front_image_url,parise_nums,comment_nums,fav_nums,tags,content,create_date)
                VALUES (%s, %s, %s, %s, %s,%s, %s, %s, %s, %s,%s) ON DUPLICATE KEY UPDATE parise_nums = VALUES(parise_nums)
            """

一旦parise_nums值存在,就更新该值的数值。

itemloader提取信息

处理逻辑为item_loader保存网页提取到的css字段或者value字段,通过items.py模块对字段进行定制化筛选

引入ArticleItemLoader类

from  ArticleSpider.items import JobBoleArticleItem
from ArticleSpider.items import ArticleItemLoader

    def parse_detail(self,response):
        match_re = re.match(".*?(\d+)",response.url)
        if match_re:
            # #获得JobBoleArticleItem()对象
            # article_item = JobBoleArticleItem()
            post_id = match_re.group(1)
            # title = response.css("#news_title a::text").extract_first("")
            # create_data = response.css("#news_info .time::text").extract_first("")
            # match_re = re.match(".*?(\d+.*)", create_data)
            # create_date = match_re.group(1)
            # #文章内容标记
            # content = response.css("#news_content").extract()[0]
            # tag_list = response.css(".news_tags a::text").extract()
            # tags = ",".join(tag_list)
            #
            # article_item["title"] = title
            # article_item["create_date"] = create_date
            # article_item["content"] = content
            # article_item["tags"] = tags
            # article_item["url"] = response.url
            #
            # if response.meta.get("front_image_url",""):
            #     # 获取parse yield的meta={"front_image_url":image_url},没有则为空,把front_image_url转换成列表。可能存在多图
            #     article_item["front_image_url"] = [response.meta.get("front_image_url", "")]
            # else:
            #     article_item["front_image_url"] = []
            #ArticleItemLoader类重写ItemLoader
            item_loader = ArticleItemLoader(item = JobBoleArticleItem(),response=response)
            item_loader.add_css("title","#news_title a::text")
            item_loader.add_css("content","#news_content")
            item_loader.add_css("tags", ".news_tags a::text")
            item_loader.add_css("create_date", "#news_info .time::text")
            item_loader.add_value("url",response.url)
            item_loader.add_value("front_image_url",response.meta.get("front_image_url",""))
            # article_item = item_loader.load_item()

            yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),meta={"article_item":item_loader,"url":response.url},callback=self.pares_nums)
        pass

    def pares_nums(self,response):
        j_data = json.loads(response.text)#获取到的数据转换成json
        #前一个引号是自己定义的名称,后一个空着,这样如果就不会抛异常
        item_loader = response.meta.get("article_item","")
        parise_nums = j_data["DiggCount"]
        fav_nums = j_data["TotalView"]
        comment_nums = j_data["CommentCount"]

        item_loader.add_value("parise_nums",j_data["DiggCount"])
        item_loader.add_value("fav_nums", j_data["TotalView"])
        item_loader.add_value("comment_nums", j_data["CommentCount"])
        item_loader.add_value("url_object_id", common.get_md5(response.meta.get("url","")))
        # article_item["parise_nums"] = parise_nums
        # article_item["fav_nums"] = fav_nums
        # article_item["comment_nums"] = comment_nums
        # article_item["url_object_id"] = common.get_md5(article_item["url"])
        yield item_loader #会走pipelines模式(数据处理)只能yield这两种模式(Request或者是scrapy.Item的子类)
        pass

items.py模块下继承ItemLoader的基类item中定义一些规则

Item Loader 为每个 Item Field 单独提供了一个 Input processor和一个 Output processor

Input processor 一旦它通过 add_xpath()add_css()add_value() 方法收到提取到的数据便会执行,执行以后所得到的数据将仍然保存在 ItemLoader 实例中;当数据收集完成以后,ItemLoader 通过 load_item() 方法来进行填充并返回已填充的 Item 实例。

input_processor是在收集数据的过程中所做的处理,output_processor是数据yield之后进行的处理

from scrapy.loader.processors import MapCompose,TakeFirst,Identity,Join
from scrapy.loader import ItemLoader
import re

def date_convert(value):
    match_re = re.match(".*?(\d+.*)", value)
    if match_re:
        return match_re.group(1)
    else:
        return "1970-07-01"

class ArticleItemLoader(ItemLoader):
    default_output_processor = TakeFirst()

class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Field()
    create_date = scrapy.Field(
        #输入处理器input_processor,Mapcompose()函数是引入处理函数的
        input_processor = MapCompose(date_convert)
    )
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    front_image_url = scrapy.Field(
        #成list
        output_processor=Identity()
    )
    front_image_path = scrapy.Field()
    parise_nums = scrapy.Field()
    comment_nums = scrapy.Field()
    fav_nums = scrapy.Field()
    tags = scrapy.Field(
        output_processor=Join(separator=",")
    )
    content = scrapy.Field()

大规模抓取图片下载出错的问题

避免获取到空字符串

if response.meta.get("front_image_url",[]):
    item_loader.add_value("front_image_url", response.meta.get("front_image_url", ""))

避免图片地址不一致

#startsWith()方法用于检测字符串是否以指定的前缀开始
if image_url.startswith("//"):
    image_url = "https:"+image_url

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!

Scrapy爬取知名问答网站 上一篇
MySQL初体验 下一篇