スレッドプールpython

39158 ワード

オリジナルブログ、転載は出典を明記してください
今日pythonプロセスとスレッドを学習するとき、公式ドキュメントを参照して、スレッドプールthreadpoolモジュールを何気なく発見しました.
モジュールの使用は非常に簡単で、スレッドプールの動作原理を熟知する必要があることを前提としています.
システムがタスクを処理する場合、各リクエストに対してオブジェクトを作成および破棄する必要があることを知っています.大量の同時タスクを処理する必要がある場合、従来のマルチスレッドを再使用すると、大量のリソース作成破棄が発生し、サーバの効率が低下します.このとき、スレッドプールが役に立ちます.スレッドプールテクノロジーは、スレッドの作成、破棄にかかるオーバーヘッドの問題とシステムリソースの不足の問題に良い解決策を提供します.
メリット:
(1)生成スレッドの数を制御できる.あらかじめ一定数のワークスレッドを作成し、その数を制限することで、スレッドオブジェクトのメモリ消費を制御します.(2)システムオーバーヘッドとリソース消費を低減する.複数のリクエストに対してスレッドを再利用することで、スレッドの作成、破棄のオーバーヘッドが複数のリクエストに割り当てられます.また,スレッド数を制限することで,仮想マシンのゴミ回収におけるオーバーヘッドを低減する.(3)システム応答速度の向上.スレッドは事前に作成されており、リクエストが到着したときに直接処理することができ、スレッドの作成による遅延を解消し、他の複数のスレッドを同時処理することができる.
 スレッドプールの基本的な実装方法:(1)スレッドプールマネージャ.スレッドプールを作成して維持し、必要に応じてプールのサイズを調整し、スレッド漏れ現象を監視します.(2)作業スレッド.タスクをループして実行できるスレッドで、タスクがない場合はWait状態で、新しいタスクが到着したときに起動できます.(3)タスクキュー.同時スレッドのmonitorオブジェクトとして同時に処理対象のタスクを一時的に格納するバッファメカニズムを提供します.(4)タスクインタフェース.これは、各タスクが実装しなければならないインタフェースであり、ワークスレッドは、タスクの実行をスケジューリングします.スレッドプールマネージャを構築する場合は、まずタスクキュー(Queue)を初期化し、実行時にタスクの追加方法を呼び出してタスクキューにタスクを追加します.その後、一定数の作業スレッドを作成して起動し、これらのスレッドをオンラインキューに保存します.スレッドプールマネージャは、実行時に必要に応じて作業スレッドの数を増やしたり減らしたりすることができます.ワークスレッドが実行されると、まずタスクキューをロックして、キュー内で処理されるタスクのようなマルチスレッドのタスクキューへの正しい同時アクセスを保証します.ワークスレッドはタスクを取り出し、タスクキューへのロックを解放して、他のスレッドがタスクキューへのアクセスと処理を実現します.タスクを取得した後、ワークスレッドはタスクインタフェースを呼び出してタスクの処理を完了します.タスクキューが空の場合、作業スレッドはタスクキューの待機スレッドリストに追加され、作業スレッドはWait状態であり、CPUリソースをほとんど占めない.新しいタスクが到着すると、タスクリストオブジェクトのnotifyメソッドを呼び出すことで、タスクを処理するために待機スレッドリストからワークスレッドを呼び出します.このコラボレーションモードにより、スレッドの作成、破棄のオーバーヘッドを節約し、タスクの同時処理を保証し、システムの応答速度を向上させることができます.
簡単に言えば、同時実行されたタスクをスレッドプールに渡し、同時実行されたタスクごとに新しいスレッドを起動する代わりにします.プールに空きスレッドがある限り、タスクはスレッド実行に割り当てられます.
1 pool = ThreadPool(poolsize)

2 requests = makeRequests(some_callable,list_of_args,callback)

3 [pool.putRequest(req) for req in requests]

4 pool.wait()

最初の行はpoolsize個の数のスレッドを格納できるスレッドプールを作成することを意味します.
2行目はmakeRequests作成リクエストを呼び出すことを意味します.some_callableはマルチスレッド処理を開く必要がある関数、list_of_argsは関数パラメータ、callbackはオプションパラメータコールバック、デフォルトはなしです.
3行目は、マルチスレッドを実行する関数をスレッドプールに入れることを意味します.
最後の行は、すべてのスレッドが完了するのを待って終了することを意味します.
ソースコードを分析することで、実は中身が簡単であることがわかりました.
  1 import sys

  2 import threading

  3 import Queue

  4 import traceback

  5 

  6 

  7 # exceptions

  8 class NoResultsPending(Exception):

  9     """All work requests have been processed."""

 10     pass

 11 

 12 class NoWorkersAvailable(Exception):

 13     """No worker threads available to process remaining requests."""

 14     pass

 15 

 16 

 17 # internal module helper functions

 18 def _handle_thread_exception(request, exc_info):

 19     """Default exception handler callback function.

 20 

 21     This just prints the exception info via ``traceback.print_exception``.

 22 

 23     """

 24     traceback.print_exception(*exc_info)

 25 

 26 

 27 # utility functions

 28 def makeRequests(callable_, args_list, callback=None,  #           callback         ,exc_callback           

 29         exc_callback=_handle_thread_exception):

 30     """Create several work requests for same callable with different arguments.

 31 

 32     Convenience function for creating several work requests for the same

 33     callable where each invocation of the callable receives different values

 34     for its arguments.

 35 

 36     ``args_list`` contains the parameters for each invocation of callable.

 37     Each item in ``args_list`` should be either a 2-item tuple of the list of

 38     positional arguments and a dictionary of keyword arguments or a single,

 39     non-tuple argument.

 40 

 41     See docstring for ``WorkRequest`` for info on ``callback`` and

 42     ``exc_callback``.

 43 

 44     """

 45     requests = []

 46     for item in args_list:

 47         if isinstance(item, tuple):

 48             requests.append(

 49                 WorkRequest(callable_, item[0], item[1], callback=callback,

 50                     exc_callback=exc_callback)

 51             )

 52         else:

 53             requests.append(

 54                 WorkRequest(callable_, [item], None, callback=callback,

 55                     exc_callback=exc_callback)

 56             )

 57     return requests

 58 

 59 

 60 # classes

 61 class WorkerThread(threading.Thread):     #    

 62     """Background thread connected to the requests/results queues.

 63 

 64     A worker thread sits in the background and picks up work requests from

 65     one queue and puts the results in another until it is dismissed.

 66 

 67     """

 68 

 69     def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):

 70         """Set up thread in daemonic mode and start it immediatedly.

 71 

 72         ``requests_queue`` and ``results_queue`` are instances of

 73         ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new

 74         worker thread.

 75 

 76         """

 77         threading.Thread.__init__(self, **kwds)

 78         self.setDaemon(1)

 79         self._requests_queue = requests_queue     #    

 80         self._results_queue = results_queue       #        

 81         self._poll_timeout = poll_timeout

 82         self._dismissed = threading.Event()       

 83         self.start()

 84 

 85     def run(self):

 86         """Repeatedly process the job queue until told to exit."""

 87         while True:

 88             if self._dismissed.isSet():  #       True,        

 89                 # we are dismissed, break out of loop

 90                 break

 91             # get next work request. If we don't get a new request from the

 92             # queue after self._poll_timout seconds, we jump to the start of

 93             # the while loop again, to give the thread a chance to exit.

 94             try:

 95                 request = self._requests_queue.get(True, self._poll_timeout)#       ,block  True,       ,        

 96             except Queue.Empty:

 97                 continue

 98             else:

 99                 if self._dismissed.isSet():    ,        ,        

100                     # we are dismissed, put back request in queue and exit loop

101                     self._requests_queue.put(request) #         

102                     break

103                 try:

104                     result = request.callable(*request.args, **request.kwds)

105                     self._results_queue.put((request, result))

106                 except:

107                     request.exception = True

108                     self._results_queue.put((request, sys.exc_info()))

109 

110     def dismiss(self):

111         """Sets a flag to tell the thread to exit when done with current job."""

112         self._dismissed.set()

113 

114 

115 class WorkRequest:      #        

116     """A request to execute a callable for putting in the request queue later.

117 

118     See the module function ``makeRequests`` for the common case

119     where you want to build several ``WorkRequest`` objects for the same

120     callable but with different arguments for each call.

121 

122     """

123 

124     def __init__(self, callable_, args=None, kwds=None, requestID=None,

125             callback=None, exc_callback=_handle_thread_exception):

126         """Create a work request for a callable and attach callbacks.

127 

128         A work request consists of the a callable to be executed by a

129         worker thread, a list of positional arguments, a dictionary

130         of keyword arguments.

131 

132         A ``callback`` function can be specified, that is called when the

133         results of the request are picked up from the result queue. It must

134         accept two anonymous arguments, the ``WorkRequest`` object and the

135         results of the callable, in that order. If you want to pass additional

136         information to the callback, just stick it on the request object.

137 

138         You can also give custom callback for when an exception occurs with

139         the ``exc_callback`` keyword parameter. It should also accept two

140         anonymous arguments, the ``WorkRequest`` and a tuple with the exception

141         details as returned by ``sys.exc_info()``. The default implementation

142         of this callback just prints the exception info via

143         ``traceback.print_exception``. If you want no exception handler

144         callback, just pass in ``None``.

145 

146         ``requestID``, if given, must be hashable since it is used by

147         ``ThreadPool`` object to store the results of that work request in a

148         dictionary. It defaults to the return value of ``id(self)``.

149 

150         """

151         if requestID is None:

152             self.requestID = id(self) #id         

153         else:

154             try:

155                 self.requestID = hash(requestID) #     

156             except TypeError:

157                 raise TypeError("requestID must be hashable.")

158         self.exception = False

159         self.callback = callback

160         self.exc_callback = exc_callback

161         self.callable = callable_

162         self.args = args or []

163         self.kwds = kwds or {}

164 

165     def __str__(self):

166         return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \

167             (self.requestID, self.args, self.kwds, self.exception)

168 

169 class ThreadPool:  #      

170     """A thread pool, distributing work requests and collecting results.

171 

172     See the module docstring for more information.

173 

174     """

175 

176     def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):

177         """Set up the thread pool and start num_workers worker threads.

178 

179         ``num_workers`` is the number of worker threads to start initially.

180 

181         If ``q_size > 0`` the size of the work *request queue* is limited and

182         the thread pool blocks when the queue is full and it tries to put

183         more work requests in it (see ``putRequest`` method), unless you also

184         use a positive ``timeout`` value for ``putRequest``.

185 

186         If ``resq_size > 0`` the size of the *results queue* is limited and the

187         worker threads will block when the queue is full and they try to put

188         new results in it.

189 

190         .. warning:

191             If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is

192             the possibilty of a deadlock, when the results queue is not pulled

193             regularly and too many jobs are put in the work requests queue.

194             To prevent this, always set ``timeout > 0`` when calling

195             ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

196 

197         """

198         self._requests_queue = Queue.Queue(q_size)  #    

199         self._results_queue = Queue.Queue(resq_size) #    

200         self.workers = []  #    

201         self.dismissedWorkers = [] #    

202         self.workRequests = {}  #       id   request

203         self.createWorkers(num_workers, poll_timeout)

204 

205     def createWorkers(self, num_workers, poll_timeout=5):

206         """Add num_workers worker threads to the pool.

207 

208         ``poll_timout`` sets the interval in seconds (int or float) for how

209         ofte threads should check whether they are dismissed, while waiting for

210         requests.

211 

212         """

213         for i in range(num_workers):

214             self.workers.append(WorkerThread(self._requests_queue,

215                 self._results_queue, poll_timeout=poll_timeout))

216 

217     def dismissWorkers(self, num_workers, do_join=False):

218         """Tell num_workers worker threads to quit after their current task."""

219         dismiss_list = []

220         for i in range(min(num_workers, len(self.workers))):

221             worker = self.workers.pop()

222             worker.dismiss()

223             dismiss_list.append(worker)

224 

225         if do_join:

226             for worker in dismiss_list:

227                 worker.join()

228         else:

229             self.dismissedWorkers.extend(dismiss_list)

230 

231     def joinAllDismissedWorkers(self):

232         """Perform Thread.join() on all worker threads that have been dismissed.

233         """

234         for worker in self.dismissedWorkers:

235             worker.join()

236         self.dismissedWorkers = []

237 

238     def putRequest(self, request, block=True, timeout=None):

239         """Put work request into work queue and save its id for later."""

240         assert isinstance(request, WorkRequest)

241         # don't reuse old work requests

242         assert not getattr(request, 'exception', None)

243         self._requests_queue.put(request, block, timeout)

244         self.workRequests[request.requestID] = request  #            id    request 

245 

246     def poll(self, block=False):#    ,

247         """Process any new results in the queue."""

248         while True:

249             # still results pending?

250             if not self.workRequests: #    

251                 raise NoResultsPending

252             # are there still workers to process remaining requests?

253             elif block and not self.workers:#     

254                 raise NoWorkersAvailable

255             try:

256                 # get back next results

257                 request, result = self._results_queue.get(block=block)

258                 # has an exception occured?

259                 if request.exception and request.exc_callback:

260                     request.exc_callback(request, result)

261                 # hand results to callback, if any

262                 if request.callback and not \

263                        (request.exception and request.exc_callback):

264                     request.callback(request, result)

265                 del self.workRequests[request.requestID]

266             except Queue.Empty:

267                 break

268 

269     def wait(self):

270         """Wait for results, blocking until all have arrived."""

271         while 1:

272             try:

273                 self.poll(True)

274             except NoResultsPending:

275                 break

3つのクラスがあります
まず、スレッドプールスケジューリングThreadPoolインスタンス(パラメータに基づいて複数のスレッドworksを生成)を作成し、makeRequestsで複数の異なるパラメータを持つタスクリクエストworkRequestを作成し、タスクリクエストをputRequestでスレッドプールのタスクキューに配置する必要があります.このとき、スレッドworkThreadはタスクcallableを取得します.そして処理を行って結果を得,結果キューに格納する.callbackが存在する場合、結果に対して関数が呼び出されます.
注意:結果キューの要素はメタグループ(request,result)であり、これで一つ一つ対応します. 
私の次の文章では、爬虫類について、スレッドプールを使用して爬虫類の爬虫類の効率を強化することを試みます.