PythonのTwistedフレームワークを使って、渋滞していないプログラムを作成するコードの例

11592 ワード

まずコードを見てください。

# ~*~ Twisted - A Python tale ~*~

from time import sleep

# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print "Start installation for", customer
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print "All done for", customer

# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)

developer_day(["Bill", "Elon", "Steve", "Mark"])

運転してみたら、次のような結果が出ました。

$ ./deferreds.py 1

------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds

これは一定の順序で実行されるコードです。4人の消費者が一人でインストールするには3秒の時間が必要です。4人は12秒です。このような処理はあまり満足できませんので、スレッドを使った二番目の例を見てください。

import threading

# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print "Goodmorning from developer", id
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print "Bye from developer", id
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  [dev.start() for dev in devs]
  # We leave for the evening
  [dev.join() for dev in devs]

# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])

実行してください:

$ ./deferreds.py 2

------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds

今回は並行して実行するコードで、作業スレッドを5つ使いました。15人の消費者が3 sを費やしているということは、全部で45 sの時間を意味していますが、5つのスレッドを使って並行して実行すると、合計9 sしかかかりません。このコードはちょっと複雑で、大部分のコードは合併を管理するために使われます。アルゴリズムや業務ロジックに集中するのではありません。また、プログラムの出力結果も混雑しているように見えます。可読性も天津市です。簡単なマルチスレッドのコードでもよく書けないので、Twistedを使うようになりました。

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!

from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task

# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
      def on_done():
        print "Callback: Finished installation for", customer
    print "Scheduling: Installation for", customer
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print "All done for", customer
  #
  # For each customer, we schedule these processes on the CRM
  # and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d

# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])

# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()

実行結果:

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds
今回は完全な実行コードと読み取り可能性の高い出力結果を得たが,スレッドは使用されなかった。私たちは15の消費者を並行して処理しました。つまり、45 sを必要とする実行時間は3 s以内に完了しました。このコツは、ブロックのすべての対sleep()の呼び出しをTwistedの対等なtask.deferLater()とコールバック関数に変えます。現在処理されている操作は他のところで行われていますので、15人の消費者にサービスを同時にしてもいいです。
前に述べた処理の操作は他のどこかにあります。ここで説明しますと、演算はCPU内で行われていますが、現在のCPUの処理速度はディスクやネットワークよりも速いです。CPUにデータを提供したり、CPUからメモリや他のCPUにデータを送信するのに時間がかかります。非ブロッキング動作を使用して、この態様の時間を節約しました。例えば、task.deferLaterは、データの転送が完了したときにアクティブになります。
もう一つの重要な点は出力中のGoodmoning from Twisted developerとBye from Twisted developerです。メッセージ。コードの実行開始時にすでにこの2つの情報が印刷されました。コードがこんなに早くこの場所に実行されたら、私たちのアプリケーションが本当に起動されるのはいつですか?答えは、Twistedアプリケーション(Scarapyを含む)にとってはreactor.runで実行されます。この方法を呼び出す前に、アプリケーションで使用可能な各Deferredチェーンを準備しなければならない。その後、reactor.run()方法は、フィードバック関数を監視してアクティブにする。
注意してください。reactorの主なルールは、十分に速く、ブロックされていない限り、どのような操作も実行できます。
今はいいです。コードの中でマルチスレッドを管理する部分はあまりないですが、これらのコールバック関数はまだ乱雑に見えます。このように修正できます。

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print "Scheduling: Installation for", customer
  yield task.deferLater(reactor, 3, lambda: None)
  print "Callback: Finished installation for", customer
  print "All done for", customer

def twisted_developer_day(customers):
  ... same as previously but using inline_install() instead of schedule_install()

twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()

運転の結果は前の例と同じです。このコードの役割は前の例と同じですが、もっと簡潔で明瞭に見えます。inline CallbacksジェネレータはいくつかのPythonのメカニズムを使用してinline gauを作ることができます。install()関数が一時停止または再開されます。inlineinstall()関数はDeferredオブジェクトとなり、並行して消費者ごとに動作します。毎回yieldの時、運行は現在のinline_で中止されます。install()は例えば、yieldのDeferredオブジェクトが完了するまで運転を再開します。
今唯一の問題は、15人の消費者だけではなく、10000人の消費者がいる時又はどうですか?このコードは同時に10000個の同時実行のシーケンス(例えばHTTP要求、データベースの書き込み操作など)を開始します。このようにすれば大丈夫かもしれませんが、さまざまな失敗が生じるかもしれません。大きな同時要求があるアプリケーションでは、例えばSrapyは、合併の数を許容できる程度に制限する必要があります。次の例では、task.Coooperator()を使ってこのような機能を完成します。Scarapyは、そのItem Pipelineにおいても同じ機構を使用して、併発の数(すなわち、CONCURRENT_)を制限している。ITEMS設定):

@defer.inlineCallbacks
def inline_install(customer):
  ... same as above

# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print "Bye from Twisted developer!"

twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()

# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~

実行結果:

$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds
上の出力から見ると、プログラムが動いている時には5つの処理用の消費者の溝があるようです。一つの溝が空いていない限り、次の消費者の要求を処理し始めることはありません。本例では処理時間は3秒であるため、5ロットずつ処理しているように見える。最後に得られた性能はスレッドを使っているのと同じですが、今回はスレッドが一つしかなく、コードもより簡潔で正確なコードが書きやすいです。
PS:deferToThreadは同期関数をブロックしないようにします。
wistedのdefer.Deferredはdeferredオブジェクトに戻ります。
注:deferToThreadはスレッドを使って実現されていますので、使いすぎはおすすめできません。
***同期関数を非同期にする(Deferredを返す)***
twistedのdeferToThread(from twisted.internet.threads import deferToThread)もdeferredオブジェクトに戻りますが、コールバック関数は別のスレッドで処理されています。主にデータベース/ファイル読み込み操作に使用されます。

..

#     

  def dataReceived(self, data):
    now = int(time.time())

    for ftype, data in self.fpcodec.feed(data):
      if ftype == 'oob':
        self.msg('OOB:', repr(data))
      elif ftype == 0x81: #            (            ,  gps    ,           )
        self.msg('FP.PONG:', repr(data))
      else:
        self.msg('TODO:', (ftype, data))
      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
      d.addCallback(self._doResult, extra)

ここの完全な例を参考にしてください。

# -*- coding: utf-8 -*-

from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread

import functools
import time

#                
def mySleep(timeout):
  time.sleep(timeout)

  #          callback 
  return 3 

def say(result):
  print "       ,            ", result

#  functools.partial    ,       
cb = functools.partial(mySleep, 3)
d = deferToThread(cb) 
d.addCallback(say)

print "           ,   "

reactor.run()