Pythonスレッドプールの実装


転載元:http://blog.csdn.net/sding/article/details/5538089

  
  
  
  
  1. import urllib2   
  2. import time   
  3. import socket   
  4. from datetime import datetime   
  5. from thread_pool import *   
  6.     
  def main():
      """Fetch a fixed set of web pages concurrently through the thread pool.

      One ``do_get_con`` job is queued per (name, URL) pair on a pool of 50
      workers; the function returns after ``wait_for_complete`` has joined
      the workers.
      """
      url_list = {
          "sina": "http://www.sina.com.cn",
          "sohu": "http://www.sohu.com",
          "yahoo": "http://www.yahoo.com",
          "xiaonei": "http://www.xiaonei.com",
          "qihoo": "http://www.qihoo.com",
          "laohan": "http://www.laohan.org",
          "eyou": "http://www.eyou.com",
          "chinaren": "http://www.chinaren.com",
          "douban": "http://www.douban.com",
          "163": "http://www.163.com",
          "daqi": "http://www.daqi.com",
          "qq": "http://www.qq.com",
          "baidu_1": "http://www.baidu.com/s?wd=asdfasdf",
          "baidu_2": "http://www.baidu.com/s?wd=dddddddf",
          # NOTE(review): the google_* entries point at baidu.com -- confirm
          # whether that is intentional.
          "google_1": "http://www.baidu.com/s?wd=sadfas",
          "google_2": "http://www.baidu.com/s?wd=sadflasd",
          "hainei": "http://www.hainei.com",
          "microsoft": "http://www.microsoft.com",
          "wlzuojia": "http://www.wlzuojia.com",
      }

      # Give every new socket a 10-second default timeout so a stalled host
      # cannot block a worker indefinitely.
      socket.setdefaulttimeout(10)
      # Single-argument print(...) behaves the same on Python 2 and 3.
      print('start testing')
      wm = WorkerManager(50)
      for url_name, url_link in url_list.items():
          wm.add_job(do_get_con, url_name, url_link)
      wm.wait_for_complete()
      print('end testing')
  36.     
  def do_get_con(url_name, url_link, out_dir="/tmp/ttt"):
      """Download ``url_link`` and save the raw response body to a file.

      url_name -- file name to write inside ``out_dir``
      url_link -- URL to fetch with ``urllib2.urlopen``
      out_dir  -- target directory (created if missing); defaults to the
                  previously hard-coded ``/tmp/ttt``

      The download is best-effort: any failure is reported on stdout and the
      function returns ``None`` without raising, so one bad URL cannot take
      down the calling worker.
      """
      import os  # local import: the surrounding script does not import os

      try:
          response = urllib2.urlopen(url_link)
          try:
              data = response.read()
          finally:
              # Release the connection even if reading fails.
              response.close()

          if not os.path.isdir(out_dir):
              try:
                  os.makedirs(out_dir)
              except OSError:
                  # Another worker may have created the directory first.
                  if not os.path.isdir(out_dir):
                      raise

          # Binary mode: the body is written exactly as received.
          with open("%s/%s" % (out_dir, url_name), "wb") as out_file:
              out_file.write(data)
      except Exception as exc:
          print("do_get_con: %s (%s) failed: %s" % (url_name, url_link, exc))
  46.     
  # Run the demo only when this file is executed as a script, not on import.
  if __name__ == "__main__":
      main()
  49. thread_pool のコード(転載元:http://blog.daviesliu.net/2006/10/09/234822/)
  50.    
  51. import Queue, threading, sys   
  52. from threading import Thread   
  53. import time   
  54. import urllib   
  55.     
  56. # working thread   
  class Worker(Thread):
      """Daemon thread that pulls jobs off a shared queue and runs them.

      A job is a ``(func, args, kwds)`` tuple.  Each job's return value is
      printed and put on ``resultQueue``.  The worker starts itself on
      construction and exits once ``workQueue`` has stayed empty for
      ``Worker.timeout`` seconds.
      """

      worker_count = 0  # workers created so far; doubles as the next worker id
      timeout = 1       # seconds to wait for a job before the thread exits

      def __init__(self, workQueue, resultQueue, **kwds):
          """Create the worker and start it immediately.

          workQueue   -- queue of (func, args, kwds) job tuples to consume
          resultQueue -- queue that receives each job's return value
          kwds        -- extra keyword arguments passed to ``Thread.__init__``
          """
          Thread.__init__(self, **kwds)
          self.id = Worker.worker_count
          Worker.worker_count += 1
          # Daemon threads do not keep the interpreter alive at exit
          # (attribute form of setDaemon, available since Python 2.6).
          self.daemon = True
          self.workQueue = workQueue
          self.resultQueue = resultQueue
          self.start()

      def run(self):
          """The get-some-work, do-some-work main loop of worker threads."""
          while True:
              # Only the queue read is guarded by Queue.Empty, so a job that
              # raises Queue.Empty itself is not mistaken for "no more work".
              try:
                  func, args, kwds = self.workQueue.get(timeout=Worker.timeout)
              except Queue.Empty:
                  break

              try:
                  res = func(*args, **kwds)
              except Exception:
                  # Report the failure and keep serving the queue; letting
                  # the exception escape would end this thread and shrink
                  # the pool for the remaining jobs.
                  print('worker[%2d] %s' % (self.id, sys.exc_info()[:2]))
                  continue

              print("worker[%2d]: %s" % (self.id, str(res)))
              self.resultQueue.put(res)
  83.                   
  class WorkerManager:
      """Owns a job queue, a result queue and a pool of ``Worker`` threads."""

      def __init__(self, num_of_workers=10, timeout=2):
          """Create both queues and start the worker threads.

          num_of_workers -- number of Worker threads to start right away
          timeout        -- stored as ``self.timeout``; the manager itself
                            does not read it
          """
          self.workQueue = Queue.Queue()
          self.resultQueue = Queue.Queue()
          self.workers = []
          self.timeout = timeout
          # Remembered so wait_for_complete can start replacement workers.
          self.num_of_workers = num_of_workers
          self._recruitThreads(num_of_workers)

      def _recruitThreads(self, num_of_workers):
          """Start ``num_of_workers`` workers bound to this manager's queues."""
          for _ in range(num_of_workers):
              worker = Worker(self.workQueue, self.resultQueue)
              self.workers.append(worker)

      def wait_for_complete(self):
          """Block until every worker thread has terminated.

          If all workers have exited while jobs are still queued (for
          example, jobs added after the workers had already stopped), a
          fresh batch of workers is started so those jobs are not silently
          left behind.
          """
          while self.workers:
              worker = self.workers.pop()
              # join() without a timeout returns only once the thread is done.
              worker.join()
              if not self.workers and not self.workQueue.empty():
                  self._recruitThreads(self.num_of_workers)
          print("All jobs are are completed.")

      def add_job(self, callable, *args, **kwds):
          """Queue ``callable(*args, **kwds)`` for execution by a worker."""
          # The parameter name shadows the builtin but is part of the public
          # signature, so it is kept as-is.
          self.workQueue.put((callable, args, kwds))

      def get_result(self, *args, **kwds):
          """Return the next job result; arguments go to ``Queue.get``."""
          return self.resultQueue.get(*args, **kwds)