Fetching Movie Data with a Timer

Analyzing the approach for fetching movie resources

Crawler technology choices

  • A scheduled-task framework that embeds easily into Flask
  • Requests: a Python HTTP client
  • BeautifulSoup: an HTML parsing library

APScheduler and Flask-APScheduler

  • APScheduler: a Python task scheduling module
  • Flask-APScheduler: the scheduling extension for the Flask framework

Install the scheduler packages from the Douban PyPI mirror:

pip install -i https://pypi.doubanio.com/simple/ Flask-APScheduler
pip install -i https://pypi.doubanio.com/simple/ apscheduler

# aps_test.py
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def aps_test():
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Run this function at every multiple of 5 seconds
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, trigger="cron", second="*/5")
scheduler.start()
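
Running python aps_test.py prints the current timestamp at every multiple of five seconds. To drive jobs from inside a Flask project instead, Flask-APScheduler is initialized against the application object: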
#application.py

from flask_apscheduler import APScheduler

scheduler = APScheduler()
scheduler.init_app( app )
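
The snippet above leaves out how app itself is created. For context, a minimal application.py could look like the following sketch; the Config class and the SCHEDULER_API_ENABLED flag are illustrative assumptions, not part of the original project:

# application.py: a minimal sketch, not the project's actual file
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_apscheduler import APScheduler

class Config:
    # Optional: exposes Flask-APScheduler's built-in management API
    SCHEDULER_API_ENABLED = True

app = Flask(__name__)
app.config.from_object(Config)

# db is what the crawler later imports via "from application import app, db"
db = SQLAlchemy(app)

scheduler = APScheduler()
scheduler.init_app(app)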

# manager.py
from application import app, scheduler
from common.libs.aps_test import aps_test

def main():
    app.apscheduler.add_job(func=aps_test, trigger="cron", second="*/5", id="aps_test")
    scheduler.start()

Implementing an extensible job framework

  • Easy to test
  • Easy to manage
  • Easy to deploy

For example:

# movie.py
from flask_script import Command

class MovieJob(Command):
    def run(self):
        print('MovieJob')

# manager.py
from jobs.movie import MovieJob
# Pass in the command class
manager.add_command("runjob", MovieJob)

Run this in a command-line window:

python manager.py runjob

This will print MovieJob.

Designing a job framework that fits the project: launcher.py

# launcher.py
from flask_script import Command
import sys, argparse, traceback, importlib

'''
Unified entry point for all jobs
python manager.py runjob -m Test ( jobs/tasks/Test.py )
python manager.py runjob -m test/index ( jobs/tasks/test/index.py )
'''

class runJob(Command):

    capture_all_args = True

    def run(self, *args, **kwargs):
        # For "python manager.py runjob -m Test" this yields: ['-m', 'Test']
        args = sys.argv[2:]

        '''
        Passing -h or --help on the command line prints the help text;
        argparse does this automatically when add_help is True (the default).
        '''
        parser = argparse.ArgumentParser(add_help=True)
        parser.add_argument("-m", "--name", dest="name", metavar="name", help="job name", required=True)
        parser.add_argument("-a", "--act", dest="act", metavar="act", help="job action", required=False)
        parser.add_argument("-p", "--param", dest="param", nargs="*", metavar="param", help="business parameters", required=False)
        params = parser.parse_args(args)
        # For "python manager.py runjob -m Test" this yields: {'name': 'Test', 'act': None, 'param': None}
        params_dict = params.__dict__
        if "name" not in params_dict or not params_dict['name']:
            return self.tips()

        try:
            '''
            Equivalent to: from jobs.tasks.test import JobTask
            '''
            # e.g. "test/index" -> "test.index"
            module_name = params_dict['name'].replace("/", ".")
            # e.g. jobs.tasks.test
            import_string = "jobs.tasks.%s" % (module_name)
            target = importlib.import_module(import_string)
            # Call the JobTask class's run() method, passing the parsed params,
            # e.g. {'name': 'test', 'act': None, 'param': None}
            exit(target.JobTask().run(params_dict))
        except Exception as e:
            traceback.print_exc()
            return

    def tips(self):
        tip_msg = '''
        Please invoke the job correctly:
        python manager.py runjob -m Test ( jobs/tasks/Test.py )
        python manager.py runjob -m test/index ( jobs/tasks/test/index.py )
        '''
        print(tip_msg)
        return
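
A minimal job for verifying the launcher, placed at jobs/tasks/test.py: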
# jobs/tasks/test.py
class JobTask():
    def __init__(self):
        pass

    def run(self, params):
        print("Job test output")
        print(params)

# manager.py
from jobs.launcher import runJob
# Pass in the command class
manager.add_command("runjob", runJob)
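
The manager.py excerpts above show only the registration line. The surrounding flask_script scaffolding is assumed here (it is not part of the excerpt), but it would look roughly like this:

# manager.py: assumed scaffolding around the registration line
from flask_script import Manager
from application import app
from jobs.launcher import runJob

manager = Manager(app)
# The unified job entry point, invoked as: python manager.py runjob -m <name>
manager.add_command("runjob", runJob)

if __name__ == "__main__":
    manager.run()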

Fetching the movie data

Movie table design

CREATE TABLE `movie` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(200) NOT NULL DEFAULT '' COMMENT 'movie title',
  `classify` varchar(100) NOT NULL DEFAULT '' COMMENT 'category',
  `actor` varchar(500) NOT NULL DEFAULT '' COMMENT 'lead actors',
  `cover_pic` varchar(300) NOT NULL DEFAULT '' COMMENT 'cover image',
  `pics` varchar(1000) NOT NULL DEFAULT '' COMMENT 'image URLs as JSON',
  `url` varchar(300) NOT NULL DEFAULT '' COMMENT 'detail page URL',
  `desc` text NOT NULL COMMENT 'movie description',
  `magnet_url` varchar(5000) NOT NULL DEFAULT '' COMMENT 'magnet download link',
  `hash` varchar(32) NOT NULL DEFAULT '' COMMENT 'unique hash',
  `pub_date` datetime NOT NULL COMMENT 'publish date on the source site',
  `source` varchar(20) NOT NULL DEFAULT '' COMMENT 'source',
  `view_counter` int(11) NOT NULL DEFAULT '0' COMMENT 'view count',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last updated time',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'insert time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_hash` (`hash`),
  KEY `idx_pub_date` (`pub_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='movie data table';
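
The hash column holds an MD5 of each movie's detail-page URL (computed in parseList further below), so the unique index idx_hash, together with the existence check in the crawler, keeps the same movie from being inserted twice.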

The model can be auto-generated with the following command:


flask-sqlacodegen "mysql://root:123456@127.0.0.1/movie_cat" --tables movie --outfile "C:/Users/yjw55/PycharmProjects/new_flask/9.1/common/models/movie.py" --flask
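
flask-sqlacodegen is a separate code-generation tool; if it is not installed yet, it can be added with pip (for example, pip install flask-sqlacodegen) before running the command above.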

The generated models/movie.py looks like this:

# coding: utf-8
from application import db

class Movie(db.Model):
    __tablename__ = 'movie'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False, server_default=db.FetchedValue())
    classify = db.Column(db.String(100), nullable=False, server_default=db.FetchedValue())
    actor = db.Column(db.String(500), nullable=False, server_default=db.FetchedValue())
    cover_pic = db.Column(db.String(300), nullable=False, server_default=db.FetchedValue())
    pics = db.Column(db.String(1000), nullable=False, server_default=db.FetchedValue())
    url = db.Column(db.String(300), nullable=False, server_default=db.FetchedValue())
    desc = db.Column(db.Text, nullable=False)
    magnet_url = db.Column(db.String(5000), nullable=False, server_default=db.FetchedValue())
    hash = db.Column(db.String(32), nullable=False, unique=True, server_default=db.FetchedValue())
    pub_date = db.Column(db.DateTime, nullable=False, index=True)
    source = db.Column(db.String(20), nullable=False, server_default=db.FetchedValue())
    view_counter = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue())
    updated_time = db.Column(db.DateTime, nullable=False, server_default=db.FetchedValue())
    created_time = db.Column(db.DateTime, nullable=False, server_default=db.FetchedValue())

    def __init__(self, **items):
        for key in items:
            if hasattr(self, key):
                setattr(self, key, items[key])
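
The __init__ method here (presumably added on top of the generated code) lets the crawler build a row straight from a dict with Movie(**tmp_data), ignoring any keys that do not correspond to model attributes.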

Run python manager.py runjob -m movie -a list to start the crawl; the full crawler code is as follows:


# -*- coding: utf-8 -*-
from application import app, db
import requests, os, time, hashlib, json, re
from bs4 import BeautifulSoup
from common.libs.DataHelper import getCurrentTime
from urllib.parse import urlparse
from common.models.movie import Movie

'''
python manager.py runjob -m movie -a list | parse
'''
class JobTask():
    def __init__(self):
        self.source = "btbtdy"
        self.url = {
            "num": 3,
            "url": "http://btbtdy1.com/btfl/dy1-#d#.html",
            "path": "/tmp/%s/" % (self.source)
        }

    '''
    Step 1: fetch the list-page HTML, parse it for each item's detail URL, then fetch the detail HTML.
    Step 2: parse the detail HTML.
    '''
    def run(self, params):
        act = params['act']
        self.date = getCurrentTime(frm="%Y%m%d")
        if act == "list":
            self.getList()
            self.parseInfo()
        elif act == "parse":
            self.parseInfo()

    '''
    Fetch the list pages
    '''
    def getList(self):
        config = self.url
        # /tmp/btbtdy/20200615
        path_root = config['path'] + self.date
        path_list = path_root + "/list"
        path_info = path_root + "/info"
        path_json = path_root + "/json"
        path_vid = path_root + "/vid"

        self.makeSuredirs(path_root)
        self.makeSuredirs(path_list)
        self.makeSuredirs(path_info)
        self.makeSuredirs(path_json)
        self.makeSuredirs(path_vid)

        pages = range(1, config['num'] + 1)
        # 1, 2, 3
        for idx in pages:
            # /tmp/btbtdy/20200615/list/1
            tmp_path = path_list + "/" + str(idx)
            '''
            http://btbtdy1.com/btfl/dy1-1.html
            http://btbtdy1.com/btfl/dy1-2.html
            http://btbtdy1.com/btfl/dy1-3.html
            '''
            tmp_url = config['url'].replace("#d#", str(idx))
            if os.path.exists(tmp_path):
                continue

            tmp_content = self.getHttpContent(tmp_url)
            self.saveContent(tmp_path, tmp_content)
            time.sleep(0.3)

        for idx in os.listdir(path_list):
            tmp_content = self.getContent(path_list + "/" + str(idx))
            items_data = self.parseList(tmp_content)
            if not items_data:
                continue

            '''
            Each item looks like: {'name': '爆发!暴走族', 'url': 'http://btbtdy1.com/btdy/dy26767.html',
            'vid_url': 'http://btbtdy1.com/vidlist/26767.html',
            'hash': '51a7b571e30434e43911ee09b4e36753'}
            '''
            for item in items_data:
                # /tmp/btbtdy/20200615/json/669f4f53853f2a3d8a1d2c751c3587be
                tmp_json_path = path_json + "/" + item['hash']
                tmp_info_path = path_info + "/" + item['hash']
                tmp_vid_path = path_vid + "/" + item['hash']
                if not os.path.exists(tmp_json_path):
                    # When json.dumps() gets a dict containing Chinese text, always pass ensure_ascii=False
                    self.saveContent(tmp_json_path, json.dumps(item, ensure_ascii=False))

                if not os.path.exists(tmp_info_path):
                    tmp_content = self.getHttpContent(item['url'])
                    self.saveContent(tmp_info_path, tmp_content)

                if not os.path.exists(tmp_vid_path):
                    tmp_content = self.getHttpContent(item['vid_url'])
                    self.saveContent(tmp_vid_path, tmp_content)

                time.sleep(0.3)


    '''
    Parse the detail pages
    '''
    def parseInfo(self):
        config = self.url
        path_root = config['path'] + self.date
        path_info = path_root + "/info"
        path_json = path_root + "/json"
        path_vid = path_root + "/vid"
        for filename in os.listdir(path_info):
            tmp_json_path = path_json + "/" + filename
            tmp_info_path = path_info + "/" + filename
            tmp_vid_path = path_vid + "/" + filename

            tmp_data = json.loads(self.getContent(tmp_json_path))
            tmp_content = self.getContent(tmp_info_path)
            tmp_soup = BeautifulSoup(tmp_content, "html.parser")
            try:
                tmp_pub_date = tmp_soup.select("div.vod div.vod_intro dl dd")[0].getText()
                tmp_desc = tmp_soup.select("div.vod div.vod_intro div.des")[0].getText()
                tmp_classify = tmp_soup.select("div.vod div.vod_intro dl dd")[2].getText()
                tmp_actor = tmp_soup.select("div.vod div.vod_intro dl dd")[6].getText()
                tmp_pic_list = tmp_soup.select("div.vod div.vod_img img")
                tmp_pics = []
                for tmp_pic in tmp_pic_list:
                    tmp_pics.append(tmp_pic['src'])

                # Extract the magnet download link from the download-list page
                tmp_download_content = self.getContent(tmp_vid_path)
                tmp_vid_soup = BeautifulSoup(tmp_download_content, "html.parser")
                tmp_download_list = tmp_vid_soup.findAll("a", href=re.compile("magnet:?"))
                tmp_magnet_url = ""
                if tmp_download_list:
                    tmp_magnet_url = tmp_download_list[0]['href']

                tmp_data['pub_date'] = tmp_pub_date
                tmp_data['desc'] = tmp_desc
                tmp_data['classify'] = tmp_classify
                tmp_data['actor'] = tmp_actor
                tmp_data['magnet_url'] = tmp_magnet_url
                tmp_data['source'] = self.source
                tmp_data['created_time'] = tmp_data['updated_time'] = getCurrentTime()
                if tmp_pics:
                    tmp_data['cover_pic'] = tmp_pics[0]
                    tmp_data['pics'] = json.dumps(tmp_pics)

                # Skip records that are already in the database
                tmp_movie_info = Movie.query.filter_by(hash=tmp_data['hash']).first()
                if tmp_movie_info:
                    continue
                print("Inserting new movie record")
                tmp_model_movie = Movie(**tmp_data)
                db.session.add(tmp_model_movie)
                db.session.commit()
            except Exception:
                continue
        return True


    def parseList(self, content):
        data = []
        config = self.url
        # ParseResult(scheme='http', netloc='btbtdy1.com', path='/btfl/dy1-', params='', query='', fragment='d#.html')
        # urlparse splits the URL into its components
        url_info = urlparse(config['url'])
        # http://btbtdy1.com
        url_domain = url_info[0] + "://" + url_info[1]

        tmp_soup = BeautifulSoup(str(content), "html.parser")
        tmp_list = tmp_soup.select("div.list_su ul li")
        for tmp_item in tmp_list:
            tmp_target = tmp_item.select("a.pic_link")
            tmp_name = tmp_target[0]['title']
            tmp_href = tmp_target[0]['href']
            if "http:" not in tmp_href:
                tmp_href = url_domain + tmp_href
            tmp_vid_url = tmp_href.replace("btdy/dy", "vidlist/")
            tmp_data = {
                "name": tmp_name,
                "url": tmp_href,
                "vid_url": tmp_vid_url,
                "hash": hashlib.md5(tmp_href.encode("utf-8")).hexdigest()
            }
            data.append(tmp_data)
        return data



    def saveContent(self, path, content):
        if content:
            with open(path, mode="w+", encoding="utf-8") as f:
                if type(content) != str:
                    content = content.decode("utf-8")

                f.write(content)
                f.flush()

    def getHttpContent(self, url):
        try:
            r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36'})
            if r.status_code != 200:
                return None

            return r.content
        except Exception:
            return None

    def makeSuredirs(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    def getContent(self, path):
        if os.path.exists(path):
            with open(path, "r", encoding='utf-8') as f:
                return f.read()

        return ''

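
To bring this back to the timer at the start of the chapter, the crawler can also be scheduled instead of only being run by hand. The sketch below assumes the crawler lives at jobs/tasks/movie.py and that the scheduler from application.py has been started as shown earlier; the crawl_movies wrapper and the six-hour cron interval are illustrative, not part of the original code:

# jobs/schedule_movie.py: hypothetical wrapper for putting the crawler on a timer
from application import app
from jobs.tasks.movie import JobTask  # assumed module path for the crawler above

def crawl_movies():
    # Movie.query / db.session need an application context outside flask_script
    with app.app_context():
        # Same effect as: python manager.py runjob -m movie -a list
        JobTask().run({"name": "movie", "act": "list", "param": None})

# Registered the same way aps_test was earlier, e.g. inside manager.py's main():
# app.apscheduler.add_job(func=crawl_movies, trigger="cron", hour="*/6", id="crawl_movies")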