マルチタスク爬虫類


  • スレッドベースの生産者と消費者モデル爬虫類ケース1.1プロセスとスレッドプロセス:プロセスはリソース(cpu、メモリ)割り当ての基本単位であり、プログラム実行時のインスタンスである.プログラム実行時システムはプロセスを作成し、リソースを割り当て、プロセス準備キューに入れます.プロセススケジューラが選択するとCPU時間が割り当てられ、プログラムが本格的に実行されます.スレッド:スレッドは実行パスであり、プログラム実行時の最小単位であり、プロセスの実行フローであり、CPUスケジューリングと割り当ての基本単位であり、1つのプロセスは多くのスレッドから構成され、スレッド間でプロセスのすべてのリソースを共有することができ、各スレッドには独自のスタックとローカル変数がある.スレッドはCPUが独立してスケジューリングして実行し、マルチCPU環境では複数のスレッドが同時に実行できるようにする.同様にマルチスレッドは、要求ごとに1つのスレッドを割り当てて処理する同時操作を実現することもできる.1つの実行中のアプリケーション(例えば迅雷)は1つのプロセスであり、1つのプロセスは同時に複数のタスクを実行することができ(迅雷ソフトウェアは同時に複数のファイルをダウンロードすることができ、各ダウンロードタスクは1つのスレッドである)、簡単にプロセスがスレッドの集合であると考えることができる.コラボレーション:ユーザー状態の軽量スレッドであり、コラボレーションのスケジューリングは完全にユーザーによって制御されます.協程は独自のレジスタコンテキストとスタックを持っている.コンシステントスケジューリング切替では、レジスタコンテキストとスタックを他の場所に保存し、切り出したときに以前に保存したレジスタコンテキストとスタックを復元し、直接操作スタックではカーネル切替のオーバーヘッドがほとんどなく、ロックをかけずにグローバル変数にアクセスできるため、コンテキストの切替が非常に速い.1.2マルチスレッドプラス対生産者と消費者モードにおける爬虫類の応用例(王者栄光壁紙爬取)
  • from urllib import parse
    from urllib import request
    import requests
    import os
    import threading
    import queue
    import time
    
    class Producer(threading.Thread):
    	"""   """
    	headers = {
         
    		'User-Agent': '',
    		'referer': 'https://pvp.qq.com/web201605/wallpaper.shtml'
    	}
    	def __init__(self,page_q,img_q,*args,**kwargs):
    		super(Producer,self).__init__(*args,**kwargs)
    		#        
    		self.page_q = page_q
    		self.img_q = img_q
    
    	@staticmethod
    	def get_url(data):
    		"""    url"""
    		urls=[]
    		for x in range(1,9):
    			#          
    			img_no=f'sProdImgNo_{x}'
    			#  data          url      ,   200   0
    			# "sProdImgNo_1":"http%3A%2F%2Fshp%2Eqpic%2Ecn%2
    			# Fishow%2F2735032619%2F1585220459%5F84828260%5F9035%5FsProdImgNo%5F1%2Ejpg%2F200"
    			url=parse.unquote(data.get(img_no)).replace('200','0')
    			urls.append(url)
    		return urls
    
    	def run(self):
    		"""       url    url            """
    		#         
    		while not self.page_q.empty():
    			#      url
    			url=self.page_q.get()
    			#          
    			text = requests.get(url, headers=self.headers).json()
    			#   key List     
    			datas = text["List"]
    			for data in datas:
    				#       
    				urls =Producer.get_url(data)
    				#      "sProdName":"%E9%98%BF%E8%BD%B2%2D%E8%BF%B7%E8%B8%AA%E4%B8%BD%E5%BD%B1%20"
    				prod_name = parse.unquote(data['sProdName']).strip()
    				#       
    				img_path = os.path.join('images', prod_name)
    				if not os.path.exists(img_path):
    					os.mkdir(img_path)
    				#      url                     
    				for index, img_url in enumerate(urls):
    					self.img_q.put({
         'img_url':img_url,'prod_name':prod_name,'index':index})
    		#          
    		print('%s      ' %threading.current_thread().name)
    
    class Consumer(threading.Thread):
    	def __init__(self,page_q,img_q,lock,*args,**kwargs):
    		super(Consumer,self).__init__(*args,**kwargs)
    		self.page_q=page_q
    		self.img_q=img_q
    		self.lock=threading.Lock()
    
    	def run(self) -> None:
    		"""               """
    		while 1:
    			try:
    				#        
    				img_obj=self.img_q.get(timeout=60)
    				img_url=img_obj.get('img_url')
    				name=img_obj.get('prod_name')
    				index=img_obj.get('index')
    				#       
    				img_path = os.path.join('images',name)
    				try:
    					request.urlretrieve(img_url, os.path.join(img_path, f"{index+1}.jpg"))
    					print((img_url,os.path.join(img_path,f"{index+1}.jpg"))+'    ')
    				except Exception as e:
    					print(e)
    			#            
    			except  queue.Empty as e:
    				print(e)
    				time.sleep(0.1)
    				continue
    
    def main():
    	#     url   
    	page_q=queue.Queue(20)
    	#        
    	img_q=queue.Queue(1000)
    	#         
    	lock=threading.Lock()
    	for x in range(18):
    		#        
    		url='https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD' \
    	        '&sDataType=JSON&iListNum=4&totalpage=0&page={page}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true' \
    	        '&iTypeId=1&iFlowId=267733&iActId=2735&iModuleId=2735&_=1587033640385'.format(page=x)
    		page_q.put(url)
    
    	for x in range(10):
    		#      ,    
    		t1=Producer(page_q,img_q,name=f"     {x}")
    		t1.start()
    	for x in range(5):
    		#      ,    
    		t2=Consumer(page_q,img_q,lock,name=f"     {x}")
    		t2.start()
    
    if __name__ == '__main__':
    	main()
    
    
    
  • 非同期協程に基づくマルチタスク爬虫例2.1 asyncioモジュール特殊関数:関数がasyncioモジュール中のasyncキーワードで修飾された後の内部操作はすぐに実行されず、呼び出し後に協程オブジェクトに戻り、特殊関数内部に非同期をサポートしないモジュールの代コードが存在しない.
  • #     
    async def test():
    	print("Hello World")
    

    タスクオブジェクト:コパスオブジェクトをさらにカプセル化し、コールバック関数をバインドし、タスク終了後に実行し、コールバック関数はパラメータを呼び出し者(タスクオブジェクト)、タスクオブジェクトはresult()を呼び出して特殊関数のreturn結果を返す.
    #     
    c=test()
    #     
    task=asyncio.ensure_future(c)
    #       
    task.add_done_callback(fun_callback)
    

    ≪ループ・イベント・オブジェクト|Loop Event Objects|emdw≫:複数のタスク・オブジェクトをイベント・オブジェクトに登録した後、イベントを開始して複数のタスクを非同期で実行できます.
    #         
    loop=asyncio.get_event_loop()
    #        
    loop.run_until_complete(asyncio.wait(tasks))
    

    waitメソッド:タスクオブジェクトを一時停止する権限を付与し、cpuを解放するawaitキーワードの使用:特殊な関数でのブロック操作このキーワード修飾
    2.2 aiohttp非同期要求モジュール
    import aiohttp
    async def get_request(url):
    	#          
    	async with aiohttp.ClientSession() as sess:
    		#     ,      
    	 	# get/post(url,headers,params/data,proxy="http://ip:port")
    		async with await sess.get(url,headers=headers) as response:
    			# text()            
    			# read()  byte       
    			res_text=await response.text()
    			return res_text
    

    非同期マルチタスク網易雲歌曲ダウンロード例:
    import requests
    from fake_useragent import UserAgent
    from lxml import etree
    import asyncio
    import aiohttp
    from urllib import request
    
    ua=UserAgent().random
    headers={
         'user-agent':ua}
    def get_urls():
    	url=" https://music.163.com/discover/playlist"
    	text=requests.get(url,headers=headers).text
    	html=etree.HTML(text)
    	url_list=html.xpath("//ul[@id='m-pl-container']/li/div/a/@href")
    	# print(url_list)
    	return url_list
    # print(url_list)
    
    async def get_request(url):
    	#          
    	async with aiohttp.ClientSession() as sess:
    		#     ,      
    		async with await sess.get(url,headers=headers) as response:
    			#         
    			res_text=await response.text()
    			# print(res_text)
    			return res_text
    
    def parse(t):
    	"""
    	    
    	:param t: t parse    (    )
    	:return:
    	"""
    	#           
    	text=t.result()
    	html=etree.HTML(text)
    	#             ,  response   
    	names=html.xpath("//ul[@class='f-hide']/li//text()")
    	song_urls=html.xpath("//ul[@class='f-hide']/li/a/@href")
    	# print(names,song_urls)
    	for name,song_url in zip(names,song_urls):
    		#     
    		url=f"http://music.163.com/song/media/outer/url?id={song_url.strip('/song?id=')}"+'.mp3'
    		request.urlretrieve(url, f'./musics/{name}.mp3')
    		print(f"{name}    ")
    
    
    if __name__=='__main__':
    	tasks=[]
    	for herf in get_urls():
    		url="https://music.163.com/"+herf
    		# print(url)
    		c=get_request(url)
    		#         
    		task=asyncio.ensure_future(c)
    		#            
    		task.add_done_callback(parse)
    		tasks.append(task)
    	#           
    	loop=asyncio.get_event_loop()
    	#          
    	loop.run_until_complete(asyncio.wait(tasks))