スレッドプールpython
39158 ワード
オリジナルブログ、転載は出典を明記してください
今日pythonプロセスとスレッドを学習するとき、公式ドキュメントを参照して、スレッドプールthreadpoolモジュールを何気なく発見しました.
モジュールの使用は非常に簡単で、スレッドプールの動作原理を熟知する必要があることを前提としています.
システムがタスクを処理する場合、各リクエストに対してオブジェクトを作成および破棄する必要があることを知っています.大量の同時タスクを処理する必要がある場合、従来のマルチスレッドを再使用すると、大量のリソース作成破棄が発生し、サーバの効率が低下します.このとき、スレッドプールが役に立ちます.スレッドプールテクノロジーは、スレッドの作成、破棄にかかるオーバーヘッドの問題とシステムリソースの不足の問題に良い解決策を提供します.
メリット:
(1)生成スレッドの数を制御できる.あらかじめ一定数のワークスレッドを作成し、その数を制限することで、スレッドオブジェクトのメモリ消費を制御します.(2)システムオーバーヘッドとリソース消費を低減する.複数のリクエストに対してスレッドを再利用することで、スレッドの作成、破棄のオーバーヘッドが複数のリクエストに割り当てられます.また,スレッド数を制限することで,仮想マシンのゴミ回収におけるオーバーヘッドを低減する.(3)システム応答速度の向上.スレッドは事前に作成されており、リクエストが到着したときに直接処理することができ、スレッドの作成による遅延を解消し、他の複数のスレッドを同時処理することができる.
スレッドプールの基本的な実装方法:(1)スレッドプールマネージャ.スレッドプールを作成して維持し、必要に応じてプールのサイズを調整し、スレッド漏れ現象を監視します.(2)作業スレッド.タスクをループして実行できるスレッドで、タスクがない場合はWait状態で、新しいタスクが到着したときに起動できます.(3)タスクキュー.同時スレッドのmonitorオブジェクトとして同時に処理対象のタスクを一時的に格納するバッファメカニズムを提供します.(4)タスクインタフェース.これは、各タスクが実装しなければならないインタフェースであり、ワークスレッドは、タスクの実行をスケジューリングします.スレッドプールマネージャを構築する場合は、まずタスクキュー(Queue)を初期化し、実行時にタスクの追加方法を呼び出してタスクキューにタスクを追加します.その後、一定数の作業スレッドを作成して起動し、これらのスレッドをオンラインキューに保存します.スレッドプールマネージャは、実行時に必要に応じて作業スレッドの数を増やしたり減らしたりすることができます.ワークスレッドが実行されると、まずタスクキューをロックして、キュー内で処理されるタスクのようなマルチスレッドのタスクキューへの正しい同時アクセスを保証します.ワークスレッドはタスクを取り出し、タスクキューへのロックを解放して、他のスレッドがタスクキューへのアクセスと処理を実現します.タスクを取得した後、ワークスレッドはタスクインタフェースを呼び出してタスクの処理を完了します.タスクキューが空の場合、作業スレッドはタスクキューの待機スレッドリストに追加され、作業スレッドはWait状態であり、CPUリソースをほとんど占めない.新しいタスクが到着すると、タスクリストオブジェクトのnotifyメソッドを呼び出すことで、タスクを処理するために待機スレッドリストからワークスレッドを呼び出します.このコラボレーションモードにより、スレッドの作成、破棄のオーバーヘッドを節約し、タスクの同時処理を保証し、システムの応答速度を向上させることができます.
簡単に言えば、同時実行されたタスクをスレッドプールに渡し、同時実行されたタスクごとに新しいスレッドを起動する代わりにします.プールに空きスレッドがある限り、タスクはスレッド実行に割り当てられます.
最初の行はpoolsize個の数のスレッドを格納できるスレッドプールを作成することを意味します.
2行目はmakeRequests作成リクエストを呼び出すことを意味します.some_callableはマルチスレッド処理を開く必要がある関数、list_of_argsは関数パラメータ、callbackはオプションパラメータコールバック、デフォルトはなしです.
3行目は、マルチスレッドを実行する関数をスレッドプールに入れることを意味します.
最後の行は、すべてのスレッドが完了するのを待って終了することを意味します.
ソースコードを分析することで、実は中身が簡単であることがわかりました.
3つのクラスがあります
まず、スレッドプールスケジューリングThreadPoolインスタンス(パラメータに基づいて複数のスレッドworksを生成)を作成し、makeRequestsで複数の異なるパラメータを持つタスクリクエストworkRequestを作成し、タスクリクエストをputRequestでスレッドプールのタスクキューに配置する必要があります.このとき、スレッドworkThreadはタスクcallableを取得します.そして処理を行って結果を得,結果キューに格納する.callbackが存在する場合、結果に対して関数が呼び出されます.
注意:結果キューの要素はメタグループ(request,result)であり、これで一つ一つ対応します.
私の次の文章では、爬虫類について、スレッドプールを使用して爬虫類の爬虫類の効率を強化することを試みます.
今日pythonプロセスとスレッドを学習するとき、公式ドキュメントを参照して、スレッドプールthreadpoolモジュールを何気なく発見しました.
モジュールの使用は非常に簡単で、スレッドプールの動作原理を熟知する必要があることを前提としています.
システムがタスクを処理する場合、各リクエストに対してオブジェクトを作成および破棄する必要があることを知っています.大量の同時タスクを処理する必要がある場合、従来のマルチスレッドを再使用すると、大量のリソース作成破棄が発生し、サーバの効率が低下します.このとき、スレッドプールが役に立ちます.スレッドプールテクノロジーは、スレッドの作成、破棄にかかるオーバーヘッドの問題とシステムリソースの不足の問題に良い解決策を提供します.
メリット:
(1)生成スレッドの数を制御できる.あらかじめ一定数のワークスレッドを作成し、その数を制限することで、スレッドオブジェクトのメモリ消費を制御します.(2)システムオーバーヘッドとリソース消費を低減する.複数のリクエストに対してスレッドを再利用することで、スレッドの作成、破棄のオーバーヘッドが複数のリクエストに割り当てられます.また,スレッド数を制限することで,仮想マシンのゴミ回収におけるオーバーヘッドを低減する.(3)システム応答速度の向上.スレッドは事前に作成されており、リクエストが到着したときに直接処理することができ、スレッドの作成による遅延を解消し、他の複数のスレッドを同時処理することができる.
スレッドプールの基本的な実装方法:(1)スレッドプールマネージャ.スレッドプールを作成して維持し、必要に応じてプールのサイズを調整し、スレッド漏れ現象を監視します.(2)作業スレッド.タスクをループして実行できるスレッドで、タスクがない場合はWait状態で、新しいタスクが到着したときに起動できます.(3)タスクキュー.同時スレッドのmonitorオブジェクトとして同時に処理対象のタスクを一時的に格納するバッファメカニズムを提供します.(4)タスクインタフェース.これは、各タスクが実装しなければならないインタフェースであり、ワークスレッドは、タスクの実行をスケジューリングします.スレッドプールマネージャを構築する場合は、まずタスクキュー(Queue)を初期化し、実行時にタスクの追加方法を呼び出してタスクキューにタスクを追加します.その後、一定数の作業スレッドを作成して起動し、これらのスレッドをオンラインキューに保存します.スレッドプールマネージャは、実行時に必要に応じて作業スレッドの数を増やしたり減らしたりすることができます.ワークスレッドが実行されると、まずタスクキューをロックして、キュー内で処理されるタスクのようなマルチスレッドのタスクキューへの正しい同時アクセスを保証します.ワークスレッドはタスクを取り出し、タスクキューへのロックを解放して、他のスレッドがタスクキューへのアクセスと処理を実現します.タスクを取得した後、ワークスレッドはタスクインタフェースを呼び出してタスクの処理を完了します.タスクキューが空の場合、作業スレッドはタスクキューの待機スレッドリストに追加され、作業スレッドはWait状態であり、CPUリソースをほとんど占めない.新しいタスクが到着すると、タスクリストオブジェクトのnotifyメソッドを呼び出すことで、タスクを処理するために待機スレッドリストからワークスレッドを呼び出します.このコラボレーションモードにより、スレッドの作成、破棄のオーバーヘッドを節約し、タスクの同時処理を保証し、システムの応答速度を向上させることができます.
簡単に言えば、同時実行されたタスクをスレッドプールに渡し、同時実行されたタスクごとに新しいスレッドを起動する代わりにします.プールに空きスレッドがある限り、タスクはスレッド実行に割り当てられます.
1 pool = ThreadPool(poolsize)
2 requests = makeRequests(some_callable,list_of_args,callback)
3 [pool.putRequest(req) for req in requests]
4 pool.wait()
最初の行はpoolsize個の数のスレッドを格納できるスレッドプールを作成することを意味します.
2行目はmakeRequests作成リクエストを呼び出すことを意味します.some_callableはマルチスレッド処理を開く必要がある関数、list_of_argsは関数パラメータ、callbackはオプションパラメータコールバック、デフォルトはなしです.
3行目は、マルチスレッドを実行する関数をスレッドプールに入れることを意味します.
最後の行は、すべてのスレッドが完了するのを待って終了することを意味します.
ソースコードを分析することで、実は中身が簡単であることがわかりました.
1 import sys
2 import threading
3 import Queue
4 import traceback
5
6
7 # exceptions
8 class NoResultsPending(Exception):
9 """All work requests have been processed."""
10 pass
11
12 class NoWorkersAvailable(Exception):
13 """No worker threads available to process remaining requests."""
14 pass
15
16
17 # internal module helper functions
18 def _handle_thread_exception(request, exc_info):
19 """Default exception handler callback function.
20
21 This just prints the exception info via ``traceback.print_exception``.
22
23 """
24 traceback.print_exception(*exc_info)
25
26
27 # utility functions
28 def makeRequests(callable_, args_list, callback=None, # callback ,exc_callback
29 exc_callback=_handle_thread_exception):
30 """Create several work requests for same callable with different arguments.
31
32 Convenience function for creating several work requests for the same
33 callable where each invocation of the callable receives different values
34 for its arguments.
35
36 ``args_list`` contains the parameters for each invocation of callable.
37 Each item in ``args_list`` should be either a 2-item tuple of the list of
38 positional arguments and a dictionary of keyword arguments or a single,
39 non-tuple argument.
40
41 See docstring for ``WorkRequest`` for info on ``callback`` and
42 ``exc_callback``.
43
44 """
45 requests = []
46 for item in args_list:
47 if isinstance(item, tuple):
48 requests.append(
49 WorkRequest(callable_, item[0], item[1], callback=callback,
50 exc_callback=exc_callback)
51 )
52 else:
53 requests.append(
54 WorkRequest(callable_, [item], None, callback=callback,
55 exc_callback=exc_callback)
56 )
57 return requests
58
59
60 # classes
61 class WorkerThread(threading.Thread): #
62 """Background thread connected to the requests/results queues.
63
64 A worker thread sits in the background and picks up work requests from
65 one queue and puts the results in another until it is dismissed.
66
67 """
68
69 def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
70 """Set up thread in daemonic mode and start it immediatedly.
71
72 ``requests_queue`` and ``results_queue`` are instances of
73 ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
74 worker thread.
75
76 """
77 threading.Thread.__init__(self, **kwds)
78 self.setDaemon(1)
79 self._requests_queue = requests_queue #
80 self._results_queue = results_queue #
81 self._poll_timeout = poll_timeout
82 self._dismissed = threading.Event()
83 self.start()
84
85 def run(self):
86 """Repeatedly process the job queue until told to exit."""
87 while True:
88 if self._dismissed.isSet(): # True,
89 # we are dismissed, break out of loop
90 break
91 # get next work request. If we don't get a new request from the
92 # queue after self._poll_timout seconds, we jump to the start of
93 # the while loop again, to give the thread a chance to exit.
94 try:
95 request = self._requests_queue.get(True, self._poll_timeout)# ,block True, ,
96 except Queue.Empty:
97 continue
98 else:
99 if self._dismissed.isSet(): , ,
100 # we are dismissed, put back request in queue and exit loop
101 self._requests_queue.put(request) #
102 break
103 try:
104 result = request.callable(*request.args, **request.kwds)
105 self._results_queue.put((request, result))
106 except:
107 request.exception = True
108 self._results_queue.put((request, sys.exc_info()))
109
110 def dismiss(self):
111 """Sets a flag to tell the thread to exit when done with current job."""
112 self._dismissed.set()
113
114
115 class WorkRequest: #
116 """A request to execute a callable for putting in the request queue later.
117
118 See the module function ``makeRequests`` for the common case
119 where you want to build several ``WorkRequest`` objects for the same
120 callable but with different arguments for each call.
121
122 """
123
124 def __init__(self, callable_, args=None, kwds=None, requestID=None,
125 callback=None, exc_callback=_handle_thread_exception):
126 """Create a work request for a callable and attach callbacks.
127
128 A work request consists of the a callable to be executed by a
129 worker thread, a list of positional arguments, a dictionary
130 of keyword arguments.
131
132 A ``callback`` function can be specified, that is called when the
133 results of the request are picked up from the result queue. It must
134 accept two anonymous arguments, the ``WorkRequest`` object and the
135 results of the callable, in that order. If you want to pass additional
136 information to the callback, just stick it on the request object.
137
138 You can also give custom callback for when an exception occurs with
139 the ``exc_callback`` keyword parameter. It should also accept two
140 anonymous arguments, the ``WorkRequest`` and a tuple with the exception
141 details as returned by ``sys.exc_info()``. The default implementation
142 of this callback just prints the exception info via
143 ``traceback.print_exception``. If you want no exception handler
144 callback, just pass in ``None``.
145
146 ``requestID``, if given, must be hashable since it is used by
147 ``ThreadPool`` object to store the results of that work request in a
148 dictionary. It defaults to the return value of ``id(self)``.
149
150 """
151 if requestID is None:
152 self.requestID = id(self) #id
153 else:
154 try:
155 self.requestID = hash(requestID) #
156 except TypeError:
157 raise TypeError("requestID must be hashable.")
158 self.exception = False
159 self.callback = callback
160 self.exc_callback = exc_callback
161 self.callable = callable_
162 self.args = args or []
163 self.kwds = kwds or {}
164
165 def __str__(self):
166 return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
167 (self.requestID, self.args, self.kwds, self.exception)
168
169 class ThreadPool: #
170 """A thread pool, distributing work requests and collecting results.
171
172 See the module docstring for more information.
173
174 """
175
176 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
177 """Set up the thread pool and start num_workers worker threads.
178
179 ``num_workers`` is the number of worker threads to start initially.
180
181 If ``q_size > 0`` the size of the work *request queue* is limited and
182 the thread pool blocks when the queue is full and it tries to put
183 more work requests in it (see ``putRequest`` method), unless you also
184 use a positive ``timeout`` value for ``putRequest``.
185
186 If ``resq_size > 0`` the size of the *results queue* is limited and the
187 worker threads will block when the queue is full and they try to put
188 new results in it.
189
190 .. warning:
191 If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
192 the possibilty of a deadlock, when the results queue is not pulled
193 regularly and too many jobs are put in the work requests queue.
194 To prevent this, always set ``timeout > 0`` when calling
195 ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
196
197 """
198 self._requests_queue = Queue.Queue(q_size) #
199 self._results_queue = Queue.Queue(resq_size) #
200 self.workers = [] #
201 self.dismissedWorkers = [] #
202 self.workRequests = {} # id request
203 self.createWorkers(num_workers, poll_timeout)
204
205 def createWorkers(self, num_workers, poll_timeout=5):
206 """Add num_workers worker threads to the pool.
207
208 ``poll_timout`` sets the interval in seconds (int or float) for how
209 ofte threads should check whether they are dismissed, while waiting for
210 requests.
211
212 """
213 for i in range(num_workers):
214 self.workers.append(WorkerThread(self._requests_queue,
215 self._results_queue, poll_timeout=poll_timeout))
216
217 def dismissWorkers(self, num_workers, do_join=False):
218 """Tell num_workers worker threads to quit after their current task."""
219 dismiss_list = []
220 for i in range(min(num_workers, len(self.workers))):
221 worker = self.workers.pop()
222 worker.dismiss()
223 dismiss_list.append(worker)
224
225 if do_join:
226 for worker in dismiss_list:
227 worker.join()
228 else:
229 self.dismissedWorkers.extend(dismiss_list)
230
231 def joinAllDismissedWorkers(self):
232 """Perform Thread.join() on all worker threads that have been dismissed.
233 """
234 for worker in self.dismissedWorkers:
235 worker.join()
236 self.dismissedWorkers = []
237
238 def putRequest(self, request, block=True, timeout=None):
239 """Put work request into work queue and save its id for later."""
240 assert isinstance(request, WorkRequest)
241 # don't reuse old work requests
242 assert not getattr(request, 'exception', None)
243 self._requests_queue.put(request, block, timeout)
244 self.workRequests[request.requestID] = request # id request
245
246 def poll(self, block=False):# ,
247 """Process any new results in the queue."""
248 while True:
249 # still results pending?
250 if not self.workRequests: #
251 raise NoResultsPending
252 # are there still workers to process remaining requests?
253 elif block and not self.workers:#
254 raise NoWorkersAvailable
255 try:
256 # get back next results
257 request, result = self._results_queue.get(block=block)
258 # has an exception occured?
259 if request.exception and request.exc_callback:
260 request.exc_callback(request, result)
261 # hand results to callback, if any
262 if request.callback and not \
263 (request.exception and request.exc_callback):
264 request.callback(request, result)
265 del self.workRequests[request.requestID]
266 except Queue.Empty:
267 break
268
269 def wait(self):
270 """Wait for results, blocking until all have arrived."""
271 while 1:
272 try:
273 self.poll(True)
274 except NoResultsPending:
275 break
3つのクラスがあります
まず、スレッドプールスケジューリングThreadPoolインスタンス(パラメータに基づいて複数のスレッドworksを生成)を作成し、makeRequestsで複数の異なるパラメータを持つタスクリクエストworkRequestを作成し、タスクリクエストをputRequestでスレッドプールのタスクキューに配置する必要があります.このとき、スレッドworkThreadはタスクcallableを取得します.そして処理を行って結果を得,結果キューに格納する.callbackが存在する場合、結果に対して関数が呼び出されます.
注意:結果キューの要素はメタグループ(request,result)であり、これで一つ一つ対応します.
私の次の文章では、爬虫類について、スレッドプールを使用して爬虫類の爬虫類の効率を強化することを試みます.