手書き分布爬虫類
15970 ワード
手書き分布爬虫類
分散プロセスとは,Processプロセスを複数のマシンに分散し,複数のマシンの性能を十分に利用して複雑なタスクを遂行することである.Pythonのmultiprocessingライブラリで完了し、このモジュールはマルチプロセスだけでなく、マルチプロセスを複数のマシンに分散することもサポートします.分散プロセスとは,Queueをネットワークに暴露して他のマシンプロセスがアクセスできるようにするプロセスをカプセル化したものであり,このプロセスはローカルキューのネットワーク化とも呼ばれる.
分散型爬虫サービスは主に6つのステップです.
1. プロセス間の通信を行うためのキューQueueを確立する.
2. 最初のステップで確立したキューをネットワークに登録し、他のホストのプロセスに暴露し、登録後にネットワークキューを取得し、ローカルキューのイメージに相当する.
3. オブジェクトインスタンスmanagerを確立し、ポートをバインドし、パスワードを検証します.
4. 第3ステップで確立された例は、managerを管理し、情報チャネルを監督管理することである.
5. インスタンスを管理する方法によって、ネットワークを通じてQueueオブジェクトにアクセスし、ネットワークキューを使用可能なキューに実体化する.
6. タスクを「ローカル」キューに作成し、自動的にネットワークキューにアップロードし、タスクプロセスに割り当てて処理する.
linuxバージョン:
Windowsバージョン:
タスクプロセス(taskWorker.py)は4つのステップに分けられます.
1、QueueManagerを使用してQueueを取得するためのメソッド名を登録し、タスクプロセスは名前でしかネットワーク上でQueueを取得できない.
2、接続サーバー、ポートと認証パスワードはサービスプロセスと完全に一致するように注意する.
3、ネットワークからQueueを取得し、ローカライズを行う.
4、taskキューからタスクを取得し、結果をresultキューに書き込む.
主従モードの分散爬虫類:
主従モードを採用し、1台のホストを制御ノードとして爬虫類を実行するすべてのホストを管理し、爬虫類は制御ノードからタスクを受信し、新しく生成したタスクを制御ノードに提出するだけでよい.欠点は,分布型爬虫類システム全体の性能低下を招きやすいことである.
制御ノードは、URLマネージャ、データメモリ、および制御スケジューラに分けられます.
爬虫ノードにはHTMLダウンロード、HTML解析、爬虫スケジューラがあります.
従来のベース爬虫類フレームワークと同様に、分散型の制御管理を実現するために制御スケジューラを追加したにすぎない.
ControlNodeコントロールノード:
NodeManager.py:
UrlManager.py:
DataOutput.py:
SpiderNode爬虫ノード:
SpiderWorker.py:
HtmlDownloader.py:
HtmlParser.py:
転載先:https://www.cnblogs.com/konghui/p/11193862.html
分散プロセスとは,Processプロセスを複数のマシンに分散し,複数のマシンの性能を十分に利用して複雑なタスクを遂行することである.Pythonのmultiprocessingライブラリで完了し、このモジュールはマルチプロセスだけでなく、マルチプロセスを複数のマシンに分散することもサポートします.分散プロセスとは,Queueをネットワークに暴露して他のマシンプロセスがアクセスできるようにするプロセスをカプセル化したものであり,このプロセスはローカルキューのネットワーク化とも呼ばれる.
分散型爬虫サービスは主に6つのステップです.
1. プロセス間の通信を行うためのキューQueueを確立する.
2. 最初のステップで確立したキューをネットワークに登録し、他のホストのプロセスに暴露し、登録後にネットワークキューを取得し、ローカルキューのイメージに相当する.
3. オブジェクトインスタンスmanagerを確立し、ポートをバインドし、パスワードを検証します.
4. 第3ステップで確立された例は、managerを管理し、情報チャネルを監督管理することである.
5. インスタンスを管理する方法によって、ネットワークを通じてQueueオブジェクトにアクセスし、ネットワークキューを使用可能なキューに実体化する.
6. タスクを「ローカル」キューに作成し、自動的にネットワークキューにアップロードし、タスクプロセスに割り当てて処理する.
linuxバージョン:
#coding=utf-8
import Queue
from multiprocessing.managers import BaseManager
# , task_queue result_queue
task_queue = Queue.Queue()
result_queue = Queue.Queue()
# , , register ,callable Queue , Queue
BaseManager.register('get_task_queue', callable=lambda:task_queue)
BaseManager.register('get_result_queue', callable=lambda:result_queue)
# , 8001, ,
manager = BaseManager(address=('', 8001), authkey='ski12')
# , ,
manager.start()
# , Queue
task = manager.get_task_queue()
result = manager.get_result_queue()
# ,
for url in ["ImageUrl_" + str(i) for i in range(10)]:
print "put task %s ..." % url
task.put(url)
#
print "try get result..."
for i in range(10):
print "result is %s" % result.get(timeout=10)
#
manager.shutdown()
Windowsバージョン:
#coding=utf-8
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
#
task_number = 10
# , task_queue result_queue
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)
def get_task():
return task_queue
def get_result():
return result_queue
def win_run():
# , , register ,callable Queue , Queue
# Windows lambda,
BaseManager.register('get_task_queue', callable=get_task)
BaseManager.register('get_result_queue', callable=get_result)
# , 8001, , ,Windows IP
manager = BaseManager(address=('127.0.0.1', 8001), authkey='ski12')
# , ,
manager.start()
try:
# , Queue
task = manager.get_task_queue()
result = manager.get_result_queue()
# ,
for url in ["ImageUrl_" + str(i) for i in range(10)]:
print "put task %s ..." % url
task.put(url)
#
print "try get result..."
for i in range(10):
print "result is %s" % result.get(timeout=10)
except:
print "Manager error"
finally:
# ,
manager.shutdown()
if __name__ == '__main__':
# Windows ,
freeze_support()
win_run()
タスクプロセス(taskWorker.py)は4つのステップに分けられます.
1、QueueManagerを使用してQueueを取得するためのメソッド名を登録し、タスクプロセスは名前でしかネットワーク上でQueueを取得できない.
2、接続サーバー、ポートと認証パスワードはサービスプロセスと完全に一致するように注意する.
3、ネットワークからQueueを取得し、ローカライズを行う.
4、taskキューからタスクを取得し、結果をresultキューに書き込む.
#coding=utf-8
import time
from multiprocessing.managers import BaseManager
# , QueueManager Queue
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# ,
server_addr = '127.0.0.1'
print "Connect to server %s..." % server_addr
#
m = BaseManager(address=(server_addr, 8001), authkey='ski12')
#
m.connect()
# , Queue
task = m.get_task_queue()
result = m.get_result_queue()
# , task , result
while(not task.empty()):
image_url = task.get(True, timeout=5)
print "run task download %s..." % image_url
time.sleep(1)
result.put("%s--->success" % image_url)
#
print "worker exit."
主従モードの分散爬虫類:
主従モードを採用し、1台のホストを制御ノードとして爬虫類を実行するすべてのホストを管理し、爬虫類は制御ノードからタスクを受信し、新しく生成したタスクを制御ノードに提出するだけでよい.欠点は,分布型爬虫類システム全体の性能低下を招きやすいことである.
制御ノードは、URLマネージャ、データメモリ、および制御スケジューラに分けられます.
爬虫ノードにはHTMLダウンロード、HTML解析、爬虫スケジューラがあります.
従来のベース爬虫類フレームワークと同様に、分散型の制御管理を実現するために制御スケジューラを追加したにすぎない.
ControlNodeコントロールノード:
NodeManager.py:
#coding=utf-8
import time
import sys
from multiprocessing import Queue, Process
from multiprocessing.managers import BaseManager
from DataOutput import DataOutput
from URLManager import UrlManager
class NodeManager(object):
#
def start_Manager(self, url_q, result_q):
# , register ,callable Queue , Queue
BaseManager.register('get_task_queue', callable=lambda:url_q)
BaseManager.register('get_result_queue', callable=lambda:result_q)
# 8001, ,
manager = BaseManager(address=('', 8001), authkey='ski12')
# manager
return manager
def url_manager_proc(self, url_q, conn_q, root_url):
url_manager = UrlManager()
url_manager.add_new_url(root_url)
while True:
while(url_manager.has_new_url()):
# URL url
new_url = url_manager.get_new_url()
# URL
url_q.put(new_url)
# print "[*]The number of crawled url is: ", url_manager.old_url_size()
#
percentage = u"[*] URL :%s" % url_manager.old_url_size()
sys.stdout.write('\r' + percentage)
# , 2000 ,
if(url_manager.old_url_size()>20):
# , end
url_q.put('end')
print u"
[*] ..."
# , set
url_manager.save_progress('new_urls.txt', url_manager.new_urls)
url_manager.save_progress('old_urls.txt', url_manager.old_urls)
return
# result_solve_proc urls URL
try:
if not conn_q.empty():
urls = conn_q.get()
url_manager.add_new_urls(urls)
except BaseException, e:
#
time.sleep(0.1)
def result_solve_proc(self, result_q, conn_q, store_q):
while True:
try:
if not result_q.empty():
content = result_q.get(True)
if content['new_urls'] == 'end':
#
print u"[*] "
store_q.put('end')
return
# url set
conn_q.put(content['new_urls'])
# dict
store_q.put(content['data'])
else:
#
time.sleep(0.1)
except BaseException, e:
#
time.sleep(0.1)
def store_proc(self, store_q):
output = DataOutput()
while True:
if not store_q.empty():
data = store_q.get()
if data == 'end':
print u"[*] "
output.output_end(output.filepath)
return
output.store_data(data)
else:
time.sleep(0.1)
if __name__ == '__main__':
if len(sys.argv) == 2:
url = 'https://baike.baidu.com/item/' + sys.argv[1]
# 4
url_q = Queue()
result_q = Queue()
conn_q = Queue()
store_q = Queue()
#
node = NodeManager()
manager = node.start_Manager(url_q, result_q)
# URL 、
url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, url))
result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q))
store_proc = Process(target=node.store_proc, args=(store_q, ))
# 3
url_manager_proc.start()
result_solve_proc.start()
store_proc.start()
manager.get_server().serve_forever()
else:
print "[*]Usage: python NodeManager.py [Crawl Keyword]"
UrlManager.py:
#coding=utf-8
import cPickle
import hashlib
class UrlManager(object):
def __init__(self):
# URL
self.new_urls = self.load_progress('new_urls.txt')
# URL
self.old_urls = self.load_progress('old_urls.txt')
def has_new_url(self):
return self.new_url_size() != 0
def get_new_url(self):
new_url = self.new_urls.pop()
# URL MD5 , 128 set(),
m = hashlib.md5()
m.update(new_url)
self.old_urls.add(m.hexdigest()[8:-8])
return new_url
def add_new_url(self, url):
if url is None:
return
m = hashlib.md5()
m.update(url)
url_md5 = m.hexdigest()[8:-8]
if url not in self.new_urls and url_md5 not in self.old_urls:
self.new_urls.add(url)
def add_new_urls(self, urls):
if urls is None or len(urls) == 0:
return
for url in urls:
self.add_new_url(url)
def new_url_size(self):
return len(self.new_urls)
def old_url_size(self):
return len(self.old_urls)
# URL URL , ,
def save_progress(self, path, data):
with open(path, 'wb') as f:
cPickle.dump(data, f)
#
def load_progress(self, path):
print "[+] :%s" % path
try:
with open(path, 'rb') as f:
tmp = cPickle.load(f)
return tmp
except:
print "[!] , :%s" % path
return set()
DataOutput.py:
#coding=utf-8
import codecs
import time
class DataOutput(object):
def __init__(self):
#
self.filepath = 'baike_%s.html' % (time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
self.output_head(self.filepath)
self.datas = []
def store_data(self, data):
if data is None:
return
self.datas.append(data)
# , 10
if len(self.datas) > 10:
self.output_html(self.filepath)
def output_head(self, path):
fout = codecs.open(path, 'w', encoding='utf-8')
fout.write("")
fout.write("")
fout.write("")
fout.close()
def output_html(self, path):
fout = codecs.open(path, 'a', encoding='utf-8')
for data in self.datas:
fout.write("")
fout.write("" % data['url'])
fout.write("" % data['title'])
fout.write("" % data['summary'])
fout.write("")
self.datas.remove(data)
fout.close()
def output_end(self, path):
fout = codecs.open(path, 'a', encoding='utf-8')
fout.write("%s %s %s
")
fout.write("")
fout.write("")
fout.close()
SpiderNode爬虫ノード:
SpiderWorker.py:
#coding=utf-8
from multiprocessing.managers import BaseManager
from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser
class SpiderWorker(object):
def __init__(self):
#
# : BaseManager Queue
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
# : :
server_addr = '127.0.0.1'
print "[*]Connect to server %s..." % server_addr
# :
self.m = BaseManager(address=(server_addr, 8001), authkey='ski12')
# :
self.m.connect()
# : Queue
self.task = self.m.get_task_queue()
self.result = self.m.get_result_queue()
#
self.downloader = HtmlDownloader()
self.parser = HtmlParser()
print "[*]Init finished."
def crawl(self):
while True:
try:
if not self.task.empty():
url = self.task.get()
if url == 'end':
print "[*]Control Node informs all the Spider Nodes stop working."
#
self.result.put({'new_urls':'end', 'data':'end'})
return
print "[*]The Spider Node is parsing: %s" % url.encode('utf-8')
content = self.downloader.download(url)
new_urls, data = self.parser.parser(url, content)
self.result.put({'new_urls':new_urls, 'data':data})
except EOFError, e:
print "[-]Fail to connect to the Worker Node."
return
except Exception, e:
print e
print "[-]Crawl failed."
if __name__ == '__main__':
spider = SpiderWorker()
spider.crawl()
HtmlDownloader.py:
#coding=utf-8
import requests
class HtmlDownloader(object):
def download(self, url):
if url is None:
return None
user_agent = "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"
headers = {'User-Agent':user_agent}
r = requests.get(url, headers=headers)
if r.status_code == 200:
r.encoding = 'utf-8'
return r.text
return None
HtmlParser.py:
#coding=utf-8
import re
import urlparse
from bs4 import BeautifulSoup as BS
class HtmlParser(object):
def parser(self, page_url, html_cont):
if page_url is None or html_cont is None:
return
soup = BS(html_cont, 'html.parser', from_encoding='utf-8')
new_urls = self._get_new_urls(page_url, soup)
new_data = self._get_new_data(page_url, soup)
return new_urls, new_data
def _get_new_urls(self, page_url, soup):
new_urls = set()
# a
links = soup.find_all('a', href=re.compile(r'/item/.*'))
for link in links:
# href
new_url = link['href']
#
new_full_url = urlparse.urljoin(page_url, new_url)
new_urls.add(new_full_url)
return new_urls
def _get_new_data(self, page_url, soup):
data = {}
data['url'] = page_url
title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
data['title'] = title.get_text()
summary = soup.find('div', class_='lemma-summary')
# tag tag , Unicode
data['summary'] = summary.get_text()
return data
転載先:https://www.cnblogs.com/konghui/p/11193862.html