python爬虫プロセスandスレッド

43406 ワード

マルチタスク
定義:オペレーティングシステムは同時に複数のタスクを実行することができる真の並列実行マルチタスクはマルチコアCPUでしか実現できないが、タスク数はCPUコア数よりはるかに多いため、オペレーティングシステムは自動的に多くのタスクを各コアに順番にスケジューリングして実行する.
同時:タスク数の余分なcpuコア数を指し、オペレーティングシステムの各種タスクスケジューリングアルゴリズムを通じて、タスクで「一緒に」実行を実現する(実際には、タスクの切り替え速度がかなり速く、一緒に実行されているように見えるため、いくつかのタスクが実行されていないことが多い)並列:タスク数がcpuコア数以下であること、すなわちタスクが本当に一緒に実行されていることを指す.
スレッドとプロセスの関係:プロセス:プログラムが実行されると、コード+で使用されるリソースをプロセスと呼びます.これはオペレーティングシステムがリソースを割り当てる基本ユニットスレッドです.プログラム実行フローの最小ユニットです.スレッドはプロセスのエンティティであり、システムによって独立して調整され、割り当てられた基本単位です.スレッドはプロセスに依存して存在します.
スレッド(Thread)
pythonのthreadモジュールは比較的下位のモジュールであり、pythonのthreadingモジュールはthreadでいくつか包装されており、より便利に使用することができる.
  • Threadのパラメータ:
  • target:スレッド実行関数
  • name:スレッドの名前
  • args:スレッドが実行する関数のパラメータ(メタグループタイプ)
  • daemon:Falseメインスレッドの終了時にサブスレッドが終了したかどうかを検出し、サブスレッドがまだ実行されている場合は、メインスレッドが完了してから終了するのを待つTrueの場合は、メインスレッドがjoin()メソッドを追加しない限り、サブスレッドが待機しないことを検出しません.
  • import threading
    import time
    
    def work1(parameter):
        for i in range(30):
            print('ssss',str(i),threading.currentThread().name,parameter)
            # threading.currentThread().name              
    def work2():
        for i in range(30):
            print('xxxxxxx',i,threading.currentThread().name)
    
    def main():
        t1 = threading.Thread(target=work1,name='   ',args=('    ',))
        t2 = threading.Thread(target=work2,name='    ')
    
        t1.start()
        #          start()     
        t2.start()
        # join()           
        t2.join()
    if __name__ == '__main__':
        main()
        print('     ,    ',threading.currentThread().name)
    

    スレッド-グローバル変数の共有とロック
    スレッドのグローバル変数は共有されています.これは、スレッドがグローバル変数を勝手に変更することで、マルチスレッド間のグローバル変数の混乱(すなわち、スレッドが安全ではない)を引き起こす可能性があることを意味します.そのため、栗をロックする必要があります.
    import threading
    
    lock = threading.Lock()
    lockq = threading.Lock()
    #   
    # lock.acquire()
    #
    # #   
    # lock.release()
    
    a = 0
    
    def aaaaaa():
        for i in range(1000000):
            global a
            lock.acquire()
            a+=1
            lockq.release()
    
    def bbbbb():
        for i in range(1000000):
            global a
            lockq.acquire()
            a+=1
            lock.release()
    
    
    def main():
        qq = threading.Thread(target=aaaaaa)
        ww = threading.Thread(target=bbbbb)
    
        qq.start()
        ww.start()
        qq.join()
        ww.join()
        print(a)
    if __name__ == '__main__':
        main()
    

    注意:ロックをかけるときは慎重にしなければなりません.そうしないと、デッドロックの問題が発生します.デッドロック:オンラインスレッド間で複数のリソースを共有する場合、2つのスレッドがそれぞれ一部のリソースを占有し、相手のリソースを待っている場合、デッドロックが発生します.
    スレッドキューQueue
    キューはスレッド間で最もよく使われる交換データの形式Queueであり、スレッドが安全であるため、使用条件を満たす場合はキューの使用を推奨する
  • Queueのパラメータ
  • myqueue.put(10)ネットワークキュー列にパラメータ
  • を追加
  • myqueue.get()キューからパラメータ
  • を取り出す
  • Queue.qsize()は、キューのサイズ
  • を返します.
  • Queue.Empty()キューが空の場合はTrueを返し、逆にFalse
  • を返します.
  • Queue.フル()キューがいっぱいになったらTrueに戻り、逆にFalse
  • Queue.fullはmaxsizeサイズに対応する
  • Queue.get([block[,timeout]])取得キュー、timeout待ち時間
  • import Queue
    
    
    def work1(queue):
        while not queue.empty():
            url = queue.get()
            request.get(url)
    
    def main():
        myqueue = Queue.Queue(maxsize = 10)
        for i in range(4):
            url = 'https://xxx.com/page/'+str(i)
            myqueue.put(url)
            work1(myqueue)
    

    スレッドプール
    干物:栗
    # ------------   ---------
    
    import requests
    from lxml import etree
    from concurrent.futures import ThreadPoolExecutor
    
    def download_article_list(req_url):
        headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }
        response= requests.get(url=req_url,headers=headers)
        if response.status_code == 200:
            html = response.text
            return html
    def parse_data_by_callback(future):
        html = future.result()
        html = etree.HTML(html)
        data_list = html.xpath('//div[@class="post floated-thumb"]')
        for data in data_list[:1]:
            title = '|'.join(data.xpath('.//div[@class="post-meta"]/p[1]//text()')).replace(' ', '').replace('\r
    '
    , '') print(title) def main(): pool = ThreadPoolExecutor(10) # for i in range(1, 11): ful_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(str(i)) pl = pool.submit(download_article_list,ful_url) pl.add_done_callback(parse_data_by_callback) if __name__ == '__main__': main()

    プロセス
    プロセス:プログラムが実行されると、コード+で使用されるリソースをプロセスと呼びます.オペレーティングシステムがリソースを割り当てる基本ユニットですが、スレッドと似ています.
  • Processパラメータ:
  • target:関数の参照が伝達されると、このサブプロセスをタスクして、ここのコード
  • を実行することができる.
  • args:targetに指定する関数に渡すパラメータで、
  • をメタグループで渡す.
  • kwargs:targetに指定された関数に名前付きパラメータ
  • を渡す
  • name:プロセスに名前を設定し、
  • を設定しないことができます.
  • Processで作成されたインスタンスオブジェクトの一般的な方法
  • start():サブプロセスインスタンスの開始(サブプロセスの作成)
  • is_alive():プロセスサブプロセスがまだ生きているかどうかを判断する
  • join([timeout]):サブプロセスの実行が終了するのを待つか、または何秒待つか
  • terminate():タスクが完了するかどうかにかかわらず、サブプロセス
  • を直ちに終了します.

    栗:
    from multiprocessing import Process
    
    
    def work1(a):
        print(a)
    
    
    def work2():
        print('aaaaa')
    
    
    def main():
        print(11111111111111111111111111111)
        pr1 = Process(target=work1, args=('  ',))
        pr2 = Process(target=work2)
        pr1.start()
        pr2.start()
    
        pr1.join()
        pr2.join()
        print(22222222222222222222222222222)
    
    
    if __name__ == '__main__':
        main()
    
    

    プロセス通信-Queue
    プロセス間ではグローバル変数を共有しないプロセス間で通信が必要になる場合があり、オペレーティングシステムはプロセス間の通信を実現するための多くのメカニズムを提供しています.このQueueは以前のスレッドのQueueとは異なる注意で導入されたパケットですが、このQueueとスレッドのQueueの使い方はあまり違いません
    #          
        #   multiprocessing    queue          
        from multiprocessing import Queue, Process
        import os
    
    
        def write(dataqueue):
            for i in range(10):
                dataqueue.put(i)
            print(os.getpid(), '    ')
    
    
        def read(dataqueue):
            while not dataqueue.empty():
                print(dataqueue.get())
    
    
        def main():
            data_queue = Queue()
            process = Process(target=write, args=(data_queue,))
            process.start()
            process.join()
            p2 = Process(target=read, args=(data_queue,))
            p2.start()
            p2.join()
    
    
        if __name__ == '__main__':
            main()
    
    

    プロセスプール
  • 方式一:使用するmultiprocessingのプロセスプール
    from multiprocessing import Pool
    import os,time
    def runtest(num):
        print('    '+str(os.getpid()))
        time.sleep(2)
        # print(num)
        print('    '+str(os.getpid()))
        return num,num
    
    def done(future):
        print(future)
    
    #       
    p = Pool(4)
    for i in range(0,50):
        #func:    (  )   ,args:  (  )      tuple(  ),
        #callback    (     ,   )
        p.apply_async(func=runtest,args=(i,),callback=done)
    
    #close()       ,          
    p.close()
    p.join()
    
    プロセスプール間の通信:Mangerの下のQueue()を使用する他の方法は
        from multiprocessing import Manager, Pool
        import time
    
        def write(queue, num):
            for i in range(num):
                queue.put(i)
            print('      ')
    
    
        def read(queue):
            print(queue.get())
        def main():
            #   Manage
            q = Manager().Queue()
    
            pool = Pool()
            print(111)
            pool.apply_async(func=write,args=(q,2,))
            print(2222)
            time.sleep(3)
            pool.apply_async(func=read,args=(q,))
            pool.close()
            pool.join()
        if __name__ == '__main__':
            main()
    
  • 未満である.
  • 方式2:concurrentを用いる.futuresのプロセスプール
    from concurrent.futures import ProcessPoolExecutor
    import time,os
    #       
    def runtest(num):
        print('    '+str(os.getpid()))
        time.sleep(2)
        # print(num)
        print('    '+str(os.getpid()))
        return num
    
    def done(future):
        print(future.result())
    
    pool = ProcessPoolExecutor(4)
    for i in range(0,20):
        handler =  pool.submit(runtest,(i,))
        handler.add_done_callback(done)
    
    pool.shutdown(wait=True)
    
  • きょうてい
    一般的な理解:1つのスレッドの関数では、現在の関数の一時変数などの情報をどこでも保存し、別の関数に切り替えて実行することができます.関数を呼び出す方法ではなく、切り替えの回数やいつ元の関数に切り替えるかは、開発者が自分で決定します.geventという第3ライブラリを使用すると、待つ間に自動的に切り替えることができます.
    インストール:pip 3 install gevent
    from gevent import monkey,pool
    import gevent,requests
    import lxml.etree as etree
    
    #         
    monkey.patch_all()  #               ,  gevent        
    
    
    def download(url):
        print(url+'    1')
        header = {'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        response = requests.get(url,headers=header)
        print(len(response.text),url+'   1')
    
    def download2(url):
        print(url+'    2')
        header = {'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0'}
        response = requests.get(url,headers=header)
        print(len(response.text),url+'   2')
    
    pool = pool.Pool(2)
    
    gevent.joinall(
        [
            pool.spawn(download,'https://www.yahoo.com/'),
            pool.spawn(download,'https://www.taobao.com/'),
            pool.spawn(download,'https://github.com/'), 
            pool.spawn(download2,'https://www.yahoo.com/'),
            pool.spawn(download2,'https://www.taobao.com/'),
            pool.spawn(download2,'https://github.com/'), 
        ]
    )
    

    まとめ
    プロセスは、リソース割り当ての単位スレッドです.オペレーティングシステムのスケジューリングの単位です.
    プロセス切替に必要なリソースは最大であり、効率が低いスレッド切替に必要なリソースは一般的であり、効率は一般的である(もちろんGILを考慮しない場合は、コヒーレント切替タスクリソースは小さく、効率が高い
    マルチプロセス、マルチスレッドは、cpuのコア数によっては並列である可能性があるが、コヒーレンスは1つのスレッドにあるので並列である.