Multi-task crawler
from urllib import parse
from urllib import request
import requests
import os
import threading
import queue
import time
class Producer(threading.Thread):
    """Producer thread: fetch wallpaper list pages and queue the image URLs for the consumers."""
    headers = {
        'User-Agent': '',  # fill in a browser User-Agent string here
        'referer': 'https://pvp.qq.com/web201605/wallpaper.shtml'
    }

    def __init__(self, page_q, img_q, *args, **kwargs):
        super(Producer, self).__init__(*args, **kwargs)
        # queue of list-page URLs and queue of image download jobs
        self.page_q = page_q
        self.img_q = img_q

    @staticmethod
    def get_url(data):
        """Extract the eight image URLs of one wallpaper record."""
        urls = []
        for x in range(1, 9):
            # each record holds eight URL-encoded links: sProdImgNo_1 ... sProdImgNo_8
            img_no = f'sProdImgNo_{x}'
            # decode the URL; the trailing "200" is the thumbnail size, "0" gives the full-size image, e.g.
            # "sProdImgNo_1":"http%3A%2F%2Fshp%2Eqpic%2Ecn%2
            # Fishow%2F2735032619%2F1585220459%5F84828260%5F9035%5FsProdImgNo%5F1%2Ejpg%2F200"
            url = parse.unquote(data.get(img_no)).replace('200', '0')
            urls.append(url)
        return urls
    def run(self):
        """Pull list-page URLs, parse out the image URLs and push them onto the image queue."""
        # keep working until every list page has been taken off the queue
        while not self.page_q.empty():
            # take one list-page URL
            url = self.page_q.get()
            # the interface answers with JSON
            text = requests.get(url, headers=self.headers).json()
            # the wallpaper records sit under the "List" key
            datas = text["List"]
            for data in datas:
                # the eight image URLs of this wallpaper
                urls = Producer.get_url(data)
                # the wallpaper name is URL-encoded, e.g. "sProdName":"%E9%98%BF%E8%BD%B2%2D%E8%BF%B7%E8%B8%AA%E4%B8%BD%E5%BD%B1%20"
                prod_name = parse.unquote(data['sProdName']).strip()
                # one sub-directory per wallpaper
                img_path = os.path.join('images', prod_name)
                if not os.path.exists(img_path):
                    os.makedirs(img_path)
                # queue every image URL as a download job for the consumers
                for index, img_url in enumerate(urls):
                    self.img_q.put({
                        'img_url': img_url, 'prod_name': prod_name, 'index': index})
        # the page queue is empty, this producer is done
        print('%s finished' % threading.current_thread().name)
class Consumer(threading.Thread):
    """Consumer thread: take download jobs off the image queue and save the images to disk."""

    def __init__(self, page_q, img_q, lock, *args, **kwargs):
        super(Consumer, self).__init__(*args, **kwargs)
        self.page_q = page_q
        self.img_q = img_q
        self.lock = lock

    def run(self) -> None:
        """Download images until no job arrives for 60 seconds."""
        while 1:
            try:
                # wait up to 60 seconds for a download job
                img_obj = self.img_q.get(timeout=60)
                img_url = img_obj.get('img_url')
                name = img_obj.get('prod_name')
                index = img_obj.get('index')
                # directory created by the producer
                img_path = os.path.join('images', name)
                save_path = os.path.join(img_path, f"{index+1}.jpg")
                try:
                    request.urlretrieve(img_url, save_path)
                    print(f"{img_url} saved to {save_path}")
                except Exception as e:
                    print(e)
            # nothing arrived within the timeout; retry after a short pause
            except queue.Empty as e:
                print(e)
                time.sleep(0.1)
                continue
def main():
    # queue of list-page URLs
    page_q = queue.Queue(20)
    # queue of image download jobs
    img_q = queue.Queue(1000)
    # lock shared by all consumer threads
    lock = threading.Lock()
    for x in range(18):
        # 18 pages of the wallpaper list interface
        url = 'https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD' \
              '&sDataType=JSON&iListNum=4&totalpage=0&page={page}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true' \
              '&iTypeId=1&iFlowId=267733&iActId=2735&iModuleId=2735&_=1587033640385'.format(page=x)
        page_q.put(url)
    for x in range(10):
        # 10 producer threads parse the list pages
        t1 = Producer(page_q, img_q, name=f"producer {x}")
        t1.start()
    for x in range(5):
        # 5 consumer threads download the images
        t2 = Consumer(page_q, img_q, lock, name=f"consumer {x}")
        t2.start()


if __name__ == '__main__':
    main()
Coroutine object: calling a function defined with async def (a "special" function) does not run its body; it returns a coroutine object.
import asyncio

async def test():
    print("Hello World")
Task object: a further wrapper around the coroutine object. A callback function can be bound to the task; it runs after the task finishes, receives the task object itself as its argument, and can call result() on it to get the return value of the special function.
# calling the special function only builds a coroutine object; nothing runs yet
c = test()
# wrap the coroutine object in a task object
task = asyncio.ensure_future(c)
# bind a callback that fires once the task is done
task.add_done_callback(fun_callback)
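The snippet binds a fun_callback that the text never defines; a minimal version consistent with the description above (the callback receives the finished task object and reads its result) could be the following, with the print being purely illustrative:

def fun_callback(task):
    # the finished task object is passed in; result() returns what the special function returned
    print(task.result())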
Event loop object: after registering multiple task objects with the event loop object, starting the loop executes the tasks asynchronously.
# create the event loop
loop = asyncio.get_event_loop()
# register the task objects with the loop and run it until they all finish
loop.run_until_complete(asyncio.wait(tasks))
The wait method grants the task objects permission to be suspended so they can release the CPU. The await keyword: every blocking operation inside a special function must be marked with this keyword.
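Putting the fragments above together, a minimal runnable sketch could look like this; the one-second asyncio.sleep standing in for a blocking operation and the three-task loop are assumptions made only for illustration, and the code follows the same pre-3.10 asyncio style (get_event_loop / ensure_future) as the fragments:

import asyncio

async def test():
    # the blocking step inside the special function is marked with await,
    # so the task suspends here and hands the CPU to the other tasks
    await asyncio.sleep(1)
    return "Hello World"

def fun_callback(task):
    print(task.result())

if __name__ == '__main__':
    # the tasks list that run_until_complete(asyncio.wait(tasks)) expects
    tasks = []
    for _ in range(3):
        task = asyncio.ensure_future(test())
        task.add_done_callback(fun_callback)
        tasks.append(task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

All three tasks run concurrently, so "Hello World" is printed three times after roughly one second instead of three.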
2.2 The aiohttp asynchronous request module
import aiohttp

async def get_request(url):
    # create a session, the asynchronous counterpart of requests.Session
    async with aiohttp.ClientSession() as sess:
        # the request itself must be awaited;
        # get/post accept (url, headers=..., params=/data=..., proxy="http://ip:port")
        async with await sess.get(url, headers=headers) as response:
            # text() returns the body as a string,
            # read() returns the body as bytes
            res_text = await response.text()
            return res_text
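A minimal way to drive get_request with the task and event-loop machinery shown earlier might look like this; the placeholder URL and User-Agent are assumptions made only for this sketch:

import asyncio
import aiohttp

headers = {'user-agent': 'Mozilla/5.0'}  # placeholder, substitute a real User-Agent

async def get_request(url):
    async with aiohttp.ClientSession() as sess:
        async with await sess.get(url, headers=headers) as response:
            return await response.text()

if __name__ == '__main__':
    urls = ['https://example.com/'] * 3  # placeholder URLs
    tasks = [asyncio.ensure_future(get_request(u)) for u in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(len(task.result()))  # length of each downloaded page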
Example: asynchronous multi-task download of NetEase Cloud Music songs:
import os
import requests
from fake_useragent import UserAgent
from lxml import etree
import asyncio
import aiohttp
from urllib import request

ua = UserAgent().random
headers = {
    'user-agent': ua
}
def get_urls():
    """Fetch the playlist overview page and return the relative URL of every playlist."""
    url = "https://music.163.com/discover/playlist"
    text = requests.get(url, headers=headers).text
    html = etree.HTML(text)
    # the playlist links sit inside the ul#m-pl-container element
    url_list = html.xpath("//ul[@id='m-pl-container']/li/div/a/@href")
    # print(url_list)
    return url_list
async def get_request(url):
    # asynchronous replacement for requests.get
    async with aiohttp.ClientSession() as sess:
        # the request must be awaited
        async with await sess.get(url, headers=headers) as response:
            # the playlist page comes back as HTML text
            res_text = await response.text()
            # print(res_text)
            return res_text
def parse(t):
    """
    :param t: the finished task object; parse is bound to it with add_done_callback
    :return: None, the songs are written straight to ./musics/
    """
    # result() returns the HTML that get_request fetched
    text = t.result()
    html = etree.HTML(text)
    # song names and /song?id=... links from the (hidden) track list of the playlist
    names = html.xpath("//ul[@class='f-hide']/li//text()")
    song_urls = html.xpath("//ul[@class='f-hide']/li/a/@href")
    # print(names, song_urls)
    for name, song_url in zip(names, song_urls):
        # build the outer media URL from the numeric song id
        url = f"http://music.163.com/song/media/outer/url?id={song_url.split('id=')[-1]}" + '.mp3'
        request.urlretrieve(url, f'./musics/{name}.mp3')
        print(f"{name} downloaded")
if __name__ == '__main__':
    # target directory for the downloaded songs
    os.makedirs('./musics', exist_ok=True)
    tasks = []
    for href in get_urls():
        url = "https://music.163.com/" + href
        # print(url)
        c = get_request(url)
        # wrap every coroutine object in a task object
        task = asyncio.ensure_future(c)
        # parse runs as soon as the corresponding request has finished
        task.add_done_callback(parse)
        tasks.append(task)
    # create the event loop
    loop = asyncio.get_event_loop()
    # register the tasks and run until every playlist has been processed
    loop.run_until_complete(asyncio.wait(tasks))