PythonのTwistedフレームワークを使って、渋滞していないプログラムを作成するコードの例
まずコードを見てください。
前に述べた処理の操作は他のどこかにあります。ここで説明しますと、演算はCPU内で行われていますが、現在のCPUの処理速度はディスクやネットワークよりも速いです。CPUにデータを提供したり、CPUからメモリや他のCPUにデータを送信するのに時間がかかります。非ブロッキング動作を使用して、この態様の時間を節約しました。例えば、task.deferLaterは、データの転送が完了したときにアクティブになります。
もう一つの重要な点は出力中のGoodmoning from Twisted developerとBye from Twisted developerです。メッセージ。コードの実行開始時にすでにこの2つの情報が印刷されました。コードがこんなに早くこの場所に実行されたら、私たちのアプリケーションが本当に起動されるのはいつですか?答えは、Twistedアプリケーション(Scarapyを含む)にとってはreactor.runで実行されます。この方法を呼び出す前に、アプリケーションで使用可能な各Deferredチェーンを準備しなければならない。その後、reactor.run()方法は、フィードバック関数を監視してアクティブにする。
注意してください。reactorの主なルールは、十分に速く、ブロックされていない限り、どのような操作も実行できます。
今はいいです。コードの中でマルチスレッドを管理する部分はあまりないですが、これらのコールバック関数はまだ乱雑に見えます。このように修正できます。
今唯一の問題は、15人の消費者だけではなく、10000人の消費者がいる時又はどうですか?このコードは同時に10000個の同時実行のシーケンス(例えばHTTP要求、データベースの書き込み操作など)を開始します。このようにすれば大丈夫かもしれませんが、さまざまな失敗が生じるかもしれません。大きな同時要求があるアプリケーションでは、例えばSrapyは、合併の数を許容できる程度に制限する必要があります。次の例では、task.Coooperator()を使ってこのような機能を完成します。Scarapyは、そのItem Pipelineにおいても同じ機構を使用して、併発の数(すなわち、CONCURRENT_)を制限している。ITEMS設定):
PS:deferToThreadは同期関数をブロックしないようにします。
wistedのdefer.Deferredはdeferredオブジェクトに戻ります。
注:deferToThreadはスレッドを使って実現されていますので、使いすぎはおすすめできません。
***同期関数を非同期にする(Deferredを返す)***
twistedのdeferToThread(from twisted.internet.threads import deferToThread)もdeferredオブジェクトに戻りますが、コールバック関数は別のスレッドで処理されています。主にデータベース/ファイル読み込み操作に使用されます。
# ~*~ Twisted - A Python tale ~*~
from time import sleep
# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
# Our hosting company Threads Ltd. is bad. I start installation and...
print "Start installation for", customer
# ...then wait till the installation finishes successfully. It is
# boring and I'm spending most of my time waiting while consuming
# resources (memory and some CPU cycles). It's because the process
# is *blocking*.
sleep(3)
print "All done for", customer
# I do this all day long for our customers
def developer_day(customers):
for customer in customers:
install_wordpress(customer)
developer_day(["Bill", "Elon", "Steve", "Mark"])
運転してみたら、次のような結果が出ました。
$ ./deferreds.py 1
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds
これは一定の順序で実行されるコードです。4人の消費者が一人でインストールするには3秒の時間が必要です。4人は12秒です。このような処理はあまり満足できませんので、スレッドを使った二番目の例を見てください。
import threading
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
# But we now have to synchronize... a.k.a. bureaucracy
lock = threading.Lock()
#
def dev_day(id):
print "Goodmorning from developer", id
# Yuck - I hate locks...
lock.acquire()
while customers:
customer = customers.pop(0)
lock.release()
# My Python is less readable
install_wordpress(customer)
lock.acquire()
lock.release()
print "Bye from developer", id
# We go to work in the morning
devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
[dev.start() for dev in devs]
# We leave for the evening
[dev.join() for dev in devs]
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])
実行してください:
$ ./deferreds.py 2
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds
今回は並行して実行するコードで、作業スレッドを5つ使いました。15人の消費者が3 sを費やしているということは、全部で45 sの時間を意味していますが、5つのスレッドを使って並行して実行すると、合計9 sしかかかりません。このコードはちょっと複雑で、大部分のコードは合併を管理するために使われます。アルゴリズムや業務ロジックに集中するのではありません。また、プログラムの出力結果も混雑しているように見えます。可読性も天津市です。簡単なマルチスレッドのコードでもよく書けないので、Twistedを使うようになりました。
# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
# Twisted has a slightly different approach
def schedule_install(customer):
# They are calling us back when a Wordpress installation completes.
# They connected the caller recognition system with our CRM and
# we know exactly what a call is about and what has to be done next.
#
# We now design processes of what has to happen on certain events.
def schedule_install_wordpress():
def on_done():
print "Callback: Finished installation for", customer
print "Scheduling: Installation for", customer
return task.deferLater(reactor, 3, on_done)
#
def all_done(_):
print "All done for", customer
#
# For each customer, we schedule these processes on the CRM
# and that
# is all our chief-Twisted developer has to do
d = schedule_install_wordpress()
d.addCallback(all_done)
#
return d
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
print "Goodmorning from Twisted developer"
#
# Here's what has to be done today
work = [schedule_install(customer) for customer in customers]
# Turn off the lights when done
join = defer.DeferredList(work)
join.addCallback(lambda _: reactor.stop())
#
print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()
実行結果:
------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds
今回は完全な実行コードと読み取り可能性の高い出力結果を得たが,スレッドは使用されなかった。私たちは15の消費者を並行して処理しました。つまり、45 sを必要とする実行時間は3 s以内に完了しました。このコツは、ブロックのすべての対sleep()の呼び出しをTwistedの対等なtask.deferLater()とコールバック関数に変えます。現在処理されている操作は他のところで行われていますので、15人の消費者にサービスを同時にしてもいいです。前に述べた処理の操作は他のどこかにあります。ここで説明しますと、演算はCPU内で行われていますが、現在のCPUの処理速度はディスクやネットワークよりも速いです。CPUにデータを提供したり、CPUからメモリや他のCPUにデータを送信するのに時間がかかります。非ブロッキング動作を使用して、この態様の時間を節約しました。例えば、task.deferLaterは、データの転送が完了したときにアクティブになります。
もう一つの重要な点は出力中のGoodmoning from Twisted developerとBye from Twisted developerです。メッセージ。コードの実行開始時にすでにこの2つの情報が印刷されました。コードがこんなに早くこの場所に実行されたら、私たちのアプリケーションが本当に起動されるのはいつですか?答えは、Twistedアプリケーション(Scarapyを含む)にとってはreactor.runで実行されます。この方法を呼び出す前に、アプリケーションで使用可能な各Deferredチェーンを準備しなければならない。その後、reactor.run()方法は、フィードバック関数を監視してアクティブにする。
注意してください。reactorの主なルールは、十分に速く、ブロックされていない限り、どのような操作も実行できます。
今はいいです。コードの中でマルチスレッドを管理する部分はあまりないですが、これらのコールバック関数はまだ乱雑に見えます。このように修正できます。
# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
print "Scheduling: Installation for", customer
yield task.deferLater(reactor, 3, lambda: None)
print "Callback: Finished installation for", customer
print "All done for", customer
def twisted_developer_day(customers):
... same as previously but using inline_install() instead of schedule_install()
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
運転の結果は前の例と同じです。このコードの役割は前の例と同じですが、もっと簡潔で明瞭に見えます。inline CallbacksジェネレータはいくつかのPythonのメカニズムを使用してinline gauを作ることができます。install()関数が一時停止または再開されます。inlineinstall()関数はDeferredオブジェクトとなり、並行して消費者ごとに動作します。毎回yieldの時、運行は現在のinline_で中止されます。install()は例えば、yieldのDeferredオブジェクトが完了するまで運転を再開します。今唯一の問題は、15人の消費者だけではなく、10000人の消費者がいる時又はどうですか?このコードは同時に10000個の同時実行のシーケンス(例えばHTTP要求、データベースの書き込み操作など)を開始します。このようにすれば大丈夫かもしれませんが、さまざまな失敗が生じるかもしれません。大きな同時要求があるアプリケーションでは、例えばSrapyは、合併の数を許容できる程度に制限する必要があります。次の例では、task.Coooperator()を使ってこのような機能を完成します。Scarapyは、そのItem Pipelineにおいても同じ機構を使用して、併発の数(すなわち、CONCURRENT_)を制限している。ITEMS設定):
@defer.inlineCallbacks
def inline_install(customer):
... same as above
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
print "Goodmorning from Twisted developer"
work = (inline_install(customer) for customer in customers)
#
# We use the Cooperator mechanism to make the secretary not
# service more than 5 customers simultaneously.
coop = task.Cooperator()
join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
#
join.addCallback(lambda _: reactor.stop())
print "Bye from Twisted developer!"
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~
実行結果:
$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds
上の出力から見ると、プログラムが動いている時には5つの処理用の消費者の溝があるようです。一つの溝が空いていない限り、次の消費者の要求を処理し始めることはありません。本例では処理時間は3秒であるため、5ロットずつ処理しているように見える。最後に得られた性能はスレッドを使っているのと同じですが、今回はスレッドが一つしかなく、コードもより簡潔で正確なコードが書きやすいです。PS:deferToThreadは同期関数をブロックしないようにします。
wistedのdefer.Deferredはdeferredオブジェクトに戻ります。
注:deferToThreadはスレッドを使って実現されていますので、使いすぎはおすすめできません。
***同期関数を非同期にする(Deferredを返す)***
twistedのdeferToThread(from twisted.internet.threads import deferToThread)もdeferredオブジェクトに戻りますが、コールバック関数は別のスレッドで処理されています。主にデータベース/ファイル読み込み操作に使用されます。
..
#
def dataReceived(self, data):
now = int(time.time())
for ftype, data in self.fpcodec.feed(data):
if ftype == 'oob':
self.msg('OOB:', repr(data))
elif ftype == 0x81: # ( , gps , )
self.msg('FP.PONG:', repr(data))
else:
self.msg('TODO:', (ftype, data))
d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
d.addCallback(self._doResult, extra)
ここの完全な例を参考にしてください。
# -*- coding: utf-8 -*-
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
import functools
import time
#
def mySleep(timeout):
time.sleep(timeout)
# callback
return 3
def say(result):
print " , ", result
# functools.partial ,
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
print " , "
reactor.run()