分析获取影视资源的方案

爬虫技术选型

  • 方便嵌入Flask的定时任务框架
  • Requests:Python HTTP客户端
  • BeautifulSoup:HTML解析标签库

APScheduler和Flask-APScheduler

  • APScheduler:Python任务调度模块
  • Flask-APScheduler:Flask框架调度模块

从豆瓣源安装定时器:

pip install -i https://pypi.doubanio.com/simple/ Flask-APScheduler
pip install -i https://pypi.doubanio.com/simple/ apscheduler
#aps_test.py
from apscheduler.schedulers.blocking import  BlockingScheduler
import datetime

def aps_test():
    """Print the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    now = datetime.datetime.now()
    print(now.strftime("%Y-%m-%d %H:%M:%S"))

# Fire aps_test on every second that is a multiple of 5 (cron-style trigger).
scheduler = BlockingScheduler()
scheduler.add_job( func= aps_test,trigger="cron",second="*/5")
scheduler.start()
#application.py

from flask_apscheduler import APScheduler

# Bind the Flask-APScheduler extension to the Flask app instance.
scheduler = APScheduler()
scheduler.init_app( app )
#manager.py
from application import scheduler
from common.libs.aps_test import  aps_test

def main():
    """Register the aps_test cron job (every 5 seconds) and start the scheduler."""
    # `id` must be unique per registered job within the scheduler.
    app.apscheduler.add_job(func=aps_test, trigger="cron", second="*/5", id="aps_test")
    scheduler.start()

实现可扩展的定时器框架

  • 便于测试
  • 便于管理
  • 便于部署

举个例子:

#movie.py
from flask_script import Command

class MovieJob(Command):
    """Demo flask_script command; invoked by `python manager.py runjob`."""
    def run(self):
        print('MovieJob')
#manager.py
from jobs.movie import MovieJob
# Pass the command CLASS (not an instance); flask_script instantiates it.
manager.add_command("runjob",MovieJob)

命令行窗口执行:

python manager.py runjob

会输出MovieJob

设计一个适合自己的Job框架:launcher.py

# launcher.py
from flask_script import Command
import sys, argparse, traceback, importlib

'''
Job统一入口文件
python manager.py runjob -m Test ( jobs/tasks/Test.py )
python manager.py runjob -m test/index ( jobs/tasks/test/index.py )
'''

class runJob(Command):
    """Unified job launcher for flask_script.

    python manager.py runjob -m Test       ( jobs/tasks/Test.py )
    python manager.py runjob -m test/index ( jobs/tasks/test/index.py )
    """

    # Let flask_script hand the raw command line through instead of
    # parsing options itself.
    capture_all_args = True

    def run(self,*args,**kwargs):
        # "python manager.py runjob -m Test" yields: ['-m', 'Test']
        args = sys.argv[2:]

        # add_help=True (the default) gives us automatic -h/--help output.
        # FIX: -m must NOT be declared required. With required=True argparse
        # calls sys.exit(2) on a missing value, so the tips() fallback below
        # was unreachable dead code.
        parser = argparse.ArgumentParser(add_help=True)
        parser.add_argument("-m", "--name", dest="name", metavar="name", help="指定job名", required=False)
        parser.add_argument("-a", "--act", dest="act", metavar="act", help="Job动作", required=False)
        parser.add_argument("-p", "--param", dest="param", nargs="*", metavar="param", help="业务参数", required=False)
        params = parser.parse_args(args)
        # e.g. {'name': 'Test', 'act': None, 'param': None}
        params_dict = params.__dict__
        if "name" not in params_dict or not params_dict['name']:
            # No job name supplied -- show usage instead of crashing.
            return self.tips()

        try:
            # "test/index" -> "test.index" -> import jobs.tasks.test.index
            module_name = params_dict['name'].replace("/",".")
            import_string = "jobs.tasks.%s" % (module_name)
            target = importlib.import_module(import_string)
            # Run the module's JobTask.run() with the parsed params and
            # propagate its return value as the process exit code.
            exit( target.JobTask().run( params_dict ) )
        except Exception:
            # Surface the failure but do not crash the manager shell.
            traceback.print_exc()
        return

    def tips(self):
        """Print usage instructions for the runjob command."""
        tip_msg = '''
        请正确的调度Job
        python manager.py runjob -m Test ( jobs/tasks/Test.py )
        python manager.py runjob -m test/index ( jobs/tasks/test/index.py )
        '''
        print(tip_msg)
        return
#jobs/tasks/test.py
class JobTask():
    """Minimal sample task; launcher.py invokes JobTask().run(params)."""

    def __init__(self):
        pass

    def run(self,params):
        """Print a marker line followed by the received params dict."""
        for out in ("Job测试打印", params):
            print(out)
# manager.py
from jobs.launcher import runJob
# Pass the command class; flask_script instantiates it per invocation.
manager.add_command( "runjob", runJob)

获取影视数据

影视表设计

-- Movie data table: one row per scraped film. `hash` (md5 of the detail
-- page url) is the dedup key; `pub_date` is indexed for date-ordered listing.
CREATE TABLE `movie` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(200) NOT NULL DEFAULT '' COMMENT '电影名称',
  `classify` varchar(100) NOT NULL DEFAULT '' COMMENT '类别',
  `actor` varchar(500) NOT NULL DEFAULT '' COMMENT '主演',
  `cover_pic` varchar(300) NOT NULL DEFAULT '' COMMENT '封面图',
  `pics` varchar(1000) NOT NULL DEFAULT '' COMMENT '图片地址json',
  `url` varchar(300) NOT NULL DEFAULT '' COMMENT '电影详情地址',
  `desc` text NOT NULL COMMENT '电影描述',
  `magnet_url` varchar(5000) NOT NULL DEFAULT '' COMMENT '磁力下载地址',
  `hash` varchar(32) NOT NULL DEFAULT '' COMMENT '唯一值',
  `pub_date` datetime NOT NULL COMMENT '来源网址发布日期',
  `source` varchar(20) NOT NULL DEFAULT '' COMMENT '来源',
  `view_counter` int(11) NOT NULL DEFAULT '0' COMMENT '阅读数',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '插入时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_hash` (`hash`),
  KEY `idx_pu_date` (`pub_date`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COMMENT='影视数据表';

自动生成model的语法如下:


flask-sqlacodegen "mysql://root:123456@127.0.0.1/movie_cat" --tables movie --outfile "C:/Users/yjw55/PycharmProjects/new_flask/9.1/common/models/movie.py"  --flask

生成的models/movie.py如下:

# coding: utf-8
from application import db

class Movie(db.Model):
    """ORM model for the `movie` table (generated by flask-sqlacodegen)."""
    __tablename__ = 'movie'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False, server_default=db.FetchedValue())
    classify = db.Column(db.String(100), nullable=False, server_default=db.FetchedValue())
    actor = db.Column(db.String(500), nullable=False, server_default=db.FetchedValue())
    cover_pic = db.Column(db.String(300), nullable=False, server_default=db.FetchedValue())
    pics = db.Column(db.String(1000), nullable=False, server_default=db.FetchedValue())
    url = db.Column(db.String(300), nullable=False, server_default=db.FetchedValue())
    desc = db.Column(db.Text, nullable=False)
    magnet_url = db.Column(db.String(5000), nullable=False, server_default=db.FetchedValue())
    hash = db.Column(db.String(32), nullable=False, unique=True, server_default=db.FetchedValue())
    pub_date = db.Column(db.DateTime, nullable=False, index=True)
    source = db.Column(db.String(20), nullable=False, server_default=db.FetchedValue())
    view_counter = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
    updated_time = db.Column(db.DateTime, nullable=False, server_default=db.FetchedValue())
    created_time = db.Column(db.DateTime, nullable=False, server_default=db.FetchedValue())

    def __init__(self,**items):
        """Accept keyword args matching declared columns; silently skip others."""
        for key, value in items.items():
            if hasattr(self, key):
                setattr(self, key, value)

执行输入:python manager.py runjob -m movie -a list,详细爬取代码如下:


# -*- coding: utf-8 -*-
from application import app,db
import requests,os,time,hashlib,json,re
from bs4 import BeautifulSoup
from common.libs.DataHelper import getCurrentTime
from urllib.parse import urlparse
from common.models.movie import Movie
'''
python manager.py runjob -m movie -a list | parse
'''
class JobTask():
    """Crawler job for the btbtdy movie site.

    python manager.py runjob -m movie -a list | parse

    Step 1 ("list"): fetch listing pages, extract each item's detail and
    download-page URLs, then fetch and cache those pages to disk.
    Step 2 ("parse"): parse the cached detail pages and insert new rows
    into the `movie` table, deduplicated by `hash`.
    """

    def __init__(self):
        # Source tag stored with every record; also used in cache paths.
        self.source = "btbtdy"
        self.url = {
            "num": 3,  # number of listing pages to crawl
            "url": "http://btbtdy1.com/btfl/dy1-#d#.html",  # '#d#' = page number
            "path": "/tmp/%s/" % (self.source)  # on-disk cache root
        }

    def run(self,params):
        """Job entry point; params['act'] selects the action ('list'/'parse')."""
        act = params['act']
        # Daily cache bucket, e.g. "20200615".
        self.date = getCurrentTime( frm = "%Y%m%d")
        if act == "list":
            self.getList()
            self.parseInfo()
        elif act == "parse":
            self.parseInfo()

    def getList(self):
        """Fetch listing pages and cache each item's json/detail/download pages."""
        config = self.url
        # e.g. /tmp/btbtdy/20200615
        path_root = config['path'] + self.date
        path_list = path_root + "/list"
        path_info = path_root + "/info"
        path_json = path_root + "/json"
        path_vid = path_root + "/vid"

        self.makeSuredirs(path_root)
        self.makeSuredirs(path_list)
        self.makeSuredirs(path_info)
        self.makeSuredirs(path_json)
        self.makeSuredirs(path_vid)

        # Pages 1..num, e.g. http://btbtdy1.com/btfl/dy1-1.html ... dy1-3.html
        for idx in range(1, config['num'] + 1):
            # e.g. /tmp/btbtdy/20200615/list/1
            tmp_path = path_list + "/" + str(idx)
            tmp_url = config['url'].replace("#d#", str(idx))
            if os.path.exists(tmp_path):
                # Already fetched today -- skip.
                continue

            tmp_content = self.getHttpContent(tmp_url)
            self.saveContent(tmp_path, tmp_content)
            time.sleep(0.3)  # throttle requests to the source site

        for idx in os.listdir(path_list):
            tmp_content = self.getContent(path_list + "/" + str(idx))
            items_data = self.parseList(tmp_content)
            if not items_data:
                continue

            # item example:
            # {'name': '...', 'url': 'http://.../btdy/dy26767.html',
            #  'vid_url': 'http://.../vidlist/26767.html', 'hash': '51a7...'}
            for item in items_data:
                # e.g. /tmp/btbtdy/20200615/json/669f4f53853f2a3d8a1d2c751c3587be
                tmp_json_path = path_json + "/" + item['hash']
                tmp_info_path = path_info + "/" + item['hash']
                tmp_vid_path = path_vid + "/" + item['hash']
                if not os.path.exists(tmp_json_path):
                    # ensure_ascii=False keeps Chinese titles readable on disk.
                    self.saveContent(tmp_json_path, json.dumps(item, ensure_ascii=False))

                if not os.path.exists(tmp_info_path):
                    tmp_content = self.getHttpContent(item['url'])
                    self.saveContent(tmp_info_path, tmp_content)

                if not os.path.exists(tmp_vid_path):
                    tmp_content = self.getHttpContent(item['vid_url'])
                    self.saveContent(tmp_vid_path, tmp_content)

                time.sleep(0.3)

    def parseInfo(self):
        """Parse cached detail pages and insert new Movie rows (dedup by hash)."""
        config = self.url
        path_root = config['path'] + self.date
        path_info = path_root + "/info"
        path_json = path_root + "/json"
        path_vid = path_root + "/vid"
        for filename in os.listdir(path_info):
            tmp_json_path = path_json + "/" + filename
            tmp_info_path = path_info + "/" + filename
            tmp_vid_path = path_vid + "/" + filename

            # FIX: json.loads() lost its `encoding` kwarg in Python 3.9; the
            # cached file is already read as str, so the kwarg was never needed.
            tmp_data = json.loads(self.getContent(tmp_json_path))
            tmp_content = self.getContent(tmp_info_path)
            tmp_soup = BeautifulSoup(tmp_content, "html.parser")
            try:
                # Positional selectors: the dd list order is fixed on the site
                # (0 = publish date, 2 = classify, 6 = actors) -- confirm if the
                # site layout changes.
                tmp_pub_date = tmp_soup.select("div.vod div.vod_intro dl dd")[0].getText()
                tmp_desc = tmp_soup.select("div.vod div.vod_intro div.des")[0].getText()
                tmp_classify = tmp_soup.select("div.vod div.vod_intro dl dd")[2].getText()
                tmp_actor = tmp_soup.select("div.vod div.vod_intro dl dd")[6].getText()
                tmp_pics = [pic['src'] for pic in tmp_soup.select("div.vod div.vod_img img")]

                # Magnet link from the cached download page.
                # FIX: the old pattern "magnet:?" made the colon optional and was
                # unanchored, so it could match unrelated hrefs; anchor the scheme.
                tmp_vid_soup = BeautifulSoup(self.getContent(tmp_vid_path), "html.parser")
                tmp_download_list = tmp_vid_soup.findAll("a", href=re.compile(r"^magnet:"))
                tmp_magnet_url = tmp_download_list[0]['href'] if tmp_download_list else ""

                tmp_data['pub_date'] = tmp_pub_date
                tmp_data['desc'] = tmp_desc
                tmp_data['classify'] = tmp_classify
                tmp_data['actor'] = tmp_actor
                tmp_data['magnet_url'] = tmp_magnet_url
                tmp_data['source'] = self.source
                tmp_data['created_time'] = tmp_data['updated_time'] = getCurrentTime()
                if tmp_pics:
                    tmp_data['cover_pic'] = tmp_pics[0]
                    tmp_data['pics'] = json.dumps(tmp_pics)

                # Skip items already stored (unique `hash` column).
                tmp_movie_info = Movie.query.filter_by( hash  = tmp_data['hash']).first()
                if tmp_movie_info:
                    continue
                print('执行~~~~')
                tmp_model_movie = Movie(**tmp_data)
                db.session.add(tmp_model_movie)
                db.session.commit()
            except Exception:
                # Best-effort: one malformed page must not abort the batch.
                continue
        return True

    def parseList(self,content):
        """Parse one listing page into [{'name','url','vid_url','hash'}, ...]."""
        data = []
        config = self.url
        # urlparse splits the configured url so the scheme://host prefix can
        # be rebuilt for relative links, e.g. http://btbtdy1.com
        url_info = urlparse(config['url'])
        url_domain = url_info[0] + "://" + url_info[1]

        tmp_soup = BeautifulSoup(str(content), "html.parser")
        for tmp_item in tmp_soup.select("div.list_su ul li"):
            tmp_target = tmp_item.select("a.pic_link")
            tmp_name = tmp_target[0]['title']
            tmp_href = tmp_target[0]['href']
            # FIX: the old substring test ('"http:" not in href') also matched
            # nothing in "https://..." and wrongly prepended the domain to
            # absolute https links; use an explicit scheme prefix check.
            if not tmp_href.startswith(("http:", "https:")):
                tmp_href = url_domain + tmp_href
            tmp_vid_url = tmp_href.replace("btdy/dy", "vidlist/")
            data.append({
                "name": tmp_name,
                "url": tmp_href,
                "vid_url": tmp_vid_url,
                # md5 of the detail url doubles as the dedup key.
                "hash": hashlib.md5(tmp_href.encode("utf-8")).hexdigest()
            })
        return data

    def saveContent(self,path,content):
        """Write text (or utf-8 bytes) to `path`; no-op when content is empty.

        FIX: the original class defined this method twice; the second
        definition silently shadowed the first. Only one is kept.
        """
        if content:
            if not isinstance(content, str):
                content = content.decode("utf-8")
            # `with` closes the file; explicit flush()/close() were redundant.
            with open( path,mode="w+",encoding="utf-8" ) as f:
                f.write(content)

    def getHttpContent(self, url):
        """GET `url` with a browser UA; return raw bytes, or None on any failure."""
        try:
            r = requests.get(url,headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36'})
            if r.status_code != 200:
                return None
            return r.content
        except Exception:
            # Network/transport errors are treated as "no content".
            return None

    def makeSuredirs(self,path):
        """mkdir -p equivalent: create `path` (and parents) if missing."""
        if not os.path.exists( path ):
            os.makedirs( path )

    def getContent(self,path):
        """Read a cached utf-8 text file; return '' when it does not exist."""
        if os.path.exists( path ):
            with open( path ,"r",encoding='utf-8' ) as  f:
                return f.read()
        return ''