手書き分布爬虫類

15970 ワード

手書き分布爬虫類
分散プロセスとは,Processプロセスを複数のマシンに分散し,複数のマシンの性能を十分に利用して複雑なタスクを遂行することである.Pythonのmultiprocessingライブラリで完了し、このモジュールはマルチプロセスだけでなく、マルチプロセスを複数のマシンに分散することもサポートします.分散プロセスとは,Queueをネットワークに暴露して他のマシンプロセスがアクセスできるようにするプロセスをカプセル化したものであり,このプロセスはローカルキューのネットワーク化とも呼ばれる.
分散型爬虫サービスは主に6つのステップです.
  
  1. プロセス間の通信を行うためのキューQueueを確立する.
  2. 最初のステップで確立したキューをネットワークに登録し、他のホストのプロセスに暴露し、登録後にネットワークキューを取得し、ローカルキューのイメージに相当する.
  3. オブジェクトインスタンスmanagerを確立し、ポートをバインドし、パスワードを検証します.
  4. 第3ステップで確立された例は、managerを管理し、情報チャネルを監督管理することである.
  5. インスタンスを管理する方法によって、ネットワークを通じてQueueオブジェクトにアクセスし、ネットワークキューを使用可能なキューに実体化する.
  6. タスクを「ローカル」キューに作成し、自動的にネットワークキューにアップロードし、タスクプロセスに割り当てて処理する.
 
linuxバージョン:
#coding=utf-8
import Queue
from multiprocessing.managers import BaseManager
 
#    ,  task_queue result_queue        
task_queue = Queue.Queue()
result_queue = Queue.Queue()
 
#    ,              ,  register  ,callable     Queue  , Queue        
BaseManager.register('get_task_queue', callable=lambda:task_queue)
BaseManager.register('get_result_queue', callable=lambda:result_queue)
 
#    ,    8001,      ,        
manager = BaseManager(address=('', 8001), authkey='ski12')
 
#    ,    ,      
manager.start()
 
#    ,                  Queue  
task = manager.get_task_queue()
result = manager.get_result_queue()
 
#    ,    
for url in ["ImageUrl_" + str(i) for i in range(10)]:
	print "put task %s ..." % url
	task.put(url)
 
#       
print "try get result..."
for i in range(10):
	print "result is %s" % result.get(timeout=10)
 
#     
manager.shutdown()

  
 
Windowsバージョン:
#coding=utf-8
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
 
#     
task_number = 10
 
#    ,  task_queue result_queue        
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)
 
def get_task():
	return task_queue
 
def get_result():
	return result_queue
 
def win_run():
	#    ,              ,  register  ,callable     Queue  , Queue        
	# Windows           lambda,            
	BaseManager.register('get_task_queue', callable=get_task)
	BaseManager.register('get_result_queue', callable=get_result)
 
	#    ,    8001,      ,        ,Windows     IP  
	manager = BaseManager(address=('127.0.0.1', 8001), authkey='ski12')
 
	#    ,    ,      
	manager.start()
 
	try:
		#    ,                  Queue  
		task = manager.get_task_queue()
		result = manager.get_result_queue()
 
		#    ,    
		for url in ["ImageUrl_" + str(i) for i in range(10)]:
			print "put task %s ..." % url
			task.put(url)
 
		#       
		print "try get result..."
		for i in range(10):
			print "result is %s" % result.get(timeout=10)
	except:
		print "Manager error"
	finally:
		#        ,           
		manager.shutdown()
 
if __name__ == '__main__':
	# Windows          ,        
	freeze_support()
	win_run()

  
タスクプロセス(taskWorker.py)は4つのステップに分けられます.
1、QueueManagerを使用してQueueを取得するためのメソッド名を登録し、タスクプロセスは名前でしかネットワーク上でQueueを取得できない.
2、接続サーバー、ポートと認証パスワードはサービスプロセスと完全に一致するように注意する.
3、ネットワークからQueueを取得し、ローカライズを行う.
4、taskキューからタスクを取得し、結果をresultキューに書き込む.
#coding=utf-8
import time
from multiprocessing.managers import BaseManager
 
#    ,  QueueManager      Queue     
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
 
#    ,      
server_addr = '127.0.0.1'
print "Connect to server %s..." % server_addr
 
#                     
m = BaseManager(address=(server_addr, 8001), authkey='ski12')
 
#      
m.connect()
 
#    ,  Queue  
task = m.get_task_queue()
result = m.get_result_queue()
 
#    , task      ,      result  
while(not task.empty()):
	image_url = task.get(True, timeout=5)
	print "run task download %s..." % image_url
	time.sleep(1)
	result.put("%s--->success" % image_url)
 
#     
print "worker exit."

  
主従モードの分散爬虫類:
主従モードを採用し、1台のホストを制御ノードとして爬虫類を実行するすべてのホストを管理し、爬虫類は制御ノードからタスクを受信し、新しく生成したタスクを制御ノードに提出するだけでよい.欠点は,分布型爬虫類システム全体の性能低下を招きやすいことである.
制御ノードは、URLマネージャ、データメモリ、および制御スケジューラに分けられます.
爬虫ノードにはHTMLダウンロード、HTML解析、爬虫スケジューラがあります.
従来のベース爬虫類フレームワークと同様に、分散型の制御管理を実現するために制御スケジューラを追加したにすぎない.
 
ControlNodeコントロールノード:
NodeManager.py:
#coding=utf-8
import time
import sys
from multiprocessing import Queue, Process
from multiprocessing.managers import BaseManager
 
from DataOutput import DataOutput
from URLManager import UrlManager
 
class NodeManager(object):
    #           
    def start_Manager(self, url_q, result_q):
        #               ,  register  ,callable     Queue  , Queue        
        BaseManager.register('get_task_queue', callable=lambda:url_q)
        BaseManager.register('get_result_queue', callable=lambda:result_q)
        #     8001,      ,        
        manager = BaseManager(address=('', 8001), authkey='ski12')
        #   manager  
        return manager
 
    def url_manager_proc(self, url_q, conn_q, root_url):
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while(url_manager.has_new_url()):
                #  URL       url
                new_url = url_manager.get_new_url()
                #    URL      
                url_q.put(new_url)
                # print "[*]The number of crawled url is: ", url_manager.old_url_size()
                #      
                percentage = u"[*]    URL   :%s" % url_manager.old_url_size()
                sys.stdout.write('\r' + percentage)
                #        ,   2000       ,     
                if(url_manager.old_url_size()>20):
                    #           ,     end
                    url_q.put('end')
                    print u"
[*] ..." # , set url_manager.save_progress('new_urls.txt', url_manager.new_urls) url_manager.save_progress('old_urls.txt', url_manager.old_urls) return # result_solve_proc urls URL try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException, e: # time.sleep(0.1) def result_solve_proc(self, result_q, conn_q, store_q): while True: try: if not result_q.empty(): content = result_q.get(True) if content['new_urls'] == 'end': # print u"[*] " store_q.put('end') return # url set conn_q.put(content['new_urls']) # dict store_q.put(content['data']) else: # time.sleep(0.1) except BaseException, e: # time.sleep(0.1) def store_proc(self, store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data == 'end': print u"[*] " output.output_end(output.filepath) return output.store_data(data) else: time.sleep(0.1) if __name__ == '__main__': if len(sys.argv) == 2: url = 'https://baike.baidu.com/item/' + sys.argv[1] # 4 url_q = Queue() result_q = Queue() conn_q = Queue() store_q = Queue() # node = NodeManager() manager = node.start_Manager(url_q, result_q) # URL 、 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, url)) result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q)) store_proc = Process(target=node.store_proc, args=(store_q, )) # 3 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever() else: print "[*]Usage: python NodeManager.py [Crawl Keyword]"

  
UrlManager.py:
#coding=utf-8
import cPickle
import hashlib
 
class UrlManager(object):
    def __init__(self):
        #     URL  
        self.new_urls = self.load_progress('new_urls.txt')
        #     URL  
        self.old_urls = self.load_progress('old_urls.txt')
 
    def has_new_url(self):
        return self.new_url_size() != 0
 
    def get_new_url(self):
        new_url = self.new_urls.pop()
        #      URL  MD5  ,            128    set(),       
        m = hashlib.md5()
        m.update(new_url)
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url
 
    def add_new_url(self, url):
        if url is None:
            return
        m = hashlib.md5()
        m.update(url)
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
 
    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)
 
    def new_url_size(self):
        return len(self.new_urls)
 
    def old_url_size(self):
        return len(self.old_urls)
 
    #      URL       URL        ,      ,        
    def save_progress(self, path, data):
        with open(path, 'wb') as f:
            cPickle.dump(data, f)
 
    #     
    def load_progress(self, path):
        print "[+]       :%s" % path
        try:
            with open(path, 'rb') as f:
                tmp = cPickle.load(f)
                return tmp
        except:
            print "[!]     ,  :%s" % path
        return set()

  
DataOutput.py:
#coding=utf-8
import codecs
import time
 
class DataOutput(object):
    def __init__(self):
        #                    
        self.filepath = 'baike_%s.html' % (time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
        self.output_head(self.filepath)
        self.datas = []
 
    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
        #          ,   10       
        if len(self.datas) > 10:
            self.output_html(self.filepath)
 
    def output_head(self, path):
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write("")
        fout.write("")
        fout.write("")
        fout.close()
 
    def output_html(self, path):
        fout = codecs.open(path, 'a', encoding='utf-8')
        for data in self.datas:
            fout.write("")
            fout.write("" % data['url'])
            fout.write("" % data['title'])
            fout.write("" % data['summary'])
            fout.write("")
            self.datas.remove(data)
        fout.close()
 
    def output_end(self, path):
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write("
%s%s%s
") fout.write("") fout.write("") fout.close()

  
SpiderNode爬虫ノード:
SpiderWorker.py:
#coding=utf-8
from multiprocessing.managers import BaseManager
 
from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser
 
class SpiderWorker(object):
    def __init__(self):
        #                    
        #      :  BaseManager    Queue     
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        #      :      :
        server_addr = '127.0.0.1'
        print "[*]Connect to server %s..." % server_addr
        #                        :
        self.m = BaseManager(address=(server_addr, 8001), authkey='ski12')
        #      :
        self.m.connect()
        #      :  Queue   
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        #             
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print "[*]Init finished."
 
    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
 
                    if url == 'end':
                        print "[*]Control Node informs all the Spider Nodes stop working."
                        #             
                        self.result.put({'new_urls':'end', 'data':'end'})
                        return
                    print "[*]The Spider Node is parsing: %s" % url.encode('utf-8')
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    self.result.put({'new_urls':new_urls, 'data':data})
            except EOFError, e:
                print "[-]Fail to connect to the Worker Node."
                return
            except Exception, e:
                print e
                print "[-]Crawl failed."
 
if __name__ == '__main__':
    spider = SpiderWorker()
    spider.crawl()

  
 
HtmlDownloader.py:
#coding=utf-8
import requests
 
class HtmlDownloader(object):
    def download(self, url):
        if url is None:
            return None
        user_agent = "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"
        headers = {'User-Agent':user_agent}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            r.encoding = 'utf-8'
            return r.text
        return None

  
HtmlParser.py:
#coding=utf-8
import re
import urlparse
from bs4 import BeautifulSoup as BS
 
class HtmlParser(object):
    def parser(self, page_url, html_cont):
        if page_url is None or html_cont is None:
            return
        soup = BS(html_cont, 'html.parser', from_encoding='utf-8')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)
        return new_urls, new_data
 
    def _get_new_urls(self, page_url, soup):
        new_urls = set()
        #        a  
        links = soup.find_all('a', href=re.compile(r'/item/.*'))
        for link in links:
            #   href  
            new_url = link['href']
            #        
            new_full_url = urlparse.urljoin(page_url, new_url)
            new_urls.add(new_full_url)
        return new_urls
 
    def _get_new_data(self, page_url, soup):
        data = {}
        data['url'] = page_url
        title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title'] = title.get_text()
        summary = soup.find('div', class_='lemma-summary')
        #    tag              tag    ,      Unicode     
        data['summary'] = summary.get_text()
        return data

  
 
転載先:https://www.cnblogs.com/konghui/p/11193862.html