Pythonはスレッドの代わりにジェネレータを使う方法です。


問題
システムスレッドの代わりにジェネレータ(協働)を使って合併を実現したいです。これはユーザ級スレッドや緑色スレッドと呼ばれることもある。
ソリューション
ジェネレータを使って自分の合併を実現するには、まずジェネレータ関数とyield文について深く理解してください。yield文は一つのジェネレータに実行を保留させます。このようにスケジューラを作成して、ジェネレータをある種の「タスク」として認識し、タスク連携スイッチを使って実行を交替します。このような考えを示すには、次の2つの簡単なyield文を使用するジェネレータ関数を考えます。

# Two simple generator functions
def countdown(n):
  while n > 0:
    print('T-minus', n)
    yield
    n -= 1
  print('Blastoff!')

def countup(n):
  x = 0
  while x < n:
    print('Counting up', x)
    yield
    x += 1
これらの関数は内部でyield文を使います。以下は簡単なタスクスケジューラを実現するコードです。

from collections import deque

class TaskScheduler:
  def __init__(self):
    self._task_queue = deque()

  def new_task(self, task):
    '''
    Admit a newly started task to the scheduler
    '''
    self._task_queue.append(task)

  def run(self):
    '''
    Run until there are no more tasks
    '''
    while self._task_queue:
      task = self._task_queue.popleft()
      try:
        # Run until the next yield statement
        next(task)
        self._task_queue.append(task)
      except StopIteration:
        # Generator is no longer executing
        pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()
Task Scheduler類は、あるサイクルでジェネレータセットを実行します。それぞれがyield文に触れるまで動作します。この例を実行します。出力は以下の通りです。
T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2

これまで、私たちは実際に「オペレーティングシステム」の最小核心部分を実現しました。ジェネレータ関数はタスクであり、yield文はタスクが保留されている信号です。スケジューラは、タスクが実行されるまでループチェックします。
実際には、生成器を使って簡単な合併を実現したいかもしれません。actorやネットワークサーバを実現する際には、スレッドの代わりにジェネレータを使ってもいいです。
次のコードは、生成器を用いてスレッドに依存しないactorを実現することを実証する。

from collections import deque

class ActorScheduler:
  def __init__(self):
    self._actors = {}     # Mapping of names to actors
    self._msg_queue = deque()  # Message queue

  def new_actor(self, name, actor):
    '''
    Admit a newly started actor to the scheduler and give it a name
    '''
    self._msg_queue.append((actor,None))
    self._actors[name] = actor

  def send(self, name, msg):
    '''
    Send a message to a named actor
    '''
    actor = self._actors.get(name)
    if actor:
      self._msg_queue.append((actor,msg))

  def run(self):
    '''
    Run as long as there are pending messages.
    '''
    while self._msg_queue:
      actor, msg = self._msg_queue.popleft()
      try:
         actor.send(msg)
      except StopIteration:
         pass

# Example use
if __name__ == '__main__':
  def printer():
    while True:
      msg = yield
      print('Got:', msg)

  def counter(sched):
    while True:
      # Receive the current count
      n = yield
      if n == 0:
        break
      # Send to the printer task
      sched.send('printer', n)
      # Send the next count to the counter task (recursive)
      sched.send('counter', n-1)

  sched = ActorScheduler()
  # Create the initial actors
  sched.new_actor('printer', printer())
  sched.new_actor('counter', counter(sched))

  # Send an initial message to the counter to initiate
  sched.send('counter', 10000)
  sched.run()
このコードを完全に理解するにはもっと深く勉強する必要がありますが、肝心な点は情報収集の行列です。本質的には、スケジューラは、送信すべきメッセージがあるときには常に実行しています。カウント生成器は、自分にメッセージを送り、再帰的ループで終了する。
以下は、より高度な例であり、生成器を使用した同時ネットワークアプリケーションの実現を実証する。

from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
  def handle_yield(self, sched, task):
    pass

  def handle_resume(self, sched, task):
    pass

# Task Scheduler
class Scheduler:
  def __init__(self):
    self._numtasks = 0    # Total num of tasks
    self._ready = deque()  # Tasks ready to run
    self._read_waiting = {} # Tasks waiting to read
    self._write_waiting = {} # Tasks waiting to write

  # Poll for I/O events and restart waiting tasks
  def _iopoll(self):
    rset,wset,eset = select(self._read_waiting,
                self._write_waiting,[])
    for r in rset:
      evt, task = self._read_waiting.pop(r)
      evt.handle_resume(self, task)
    for w in wset:
      evt, task = self._write_waiting.pop(w)
      evt.handle_resume(self, task)

  def new(self,task):
    '''
    Add a newly started task to the scheduler
    '''
    self._ready.append((task, None))
    self._numtasks += 1

  def add_ready(self, task, msg=None):
    '''
    Append an already started task to the ready queue.
    msg is what to send into the task when it resumes.
    '''
    self._ready.append((task, msg))

  # Add a task to the reading set
  def _read_wait(self, fileno, evt, task):
    self._read_waiting[fileno] = (evt, task)

  # Add a task to the write set
  def _write_wait(self, fileno, evt, task):
    self._write_waiting[fileno] = (evt, task)

  def run(self):
    '''
    Run the task scheduler until there are no tasks
    '''
    while self._numtasks:
       if not self._ready:
         self._iopoll()
       task, msg = self._ready.popleft()
       try:
         # Run the coroutine to the next yield
         r = task.send(msg)
         if isinstance(r, YieldEvent):
           r.handle_yield(self, task)
         else:
           raise RuntimeError('unrecognized yield event')
       except StopIteration:
         self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
  def __init__(self, sock, nbytes):
    self.sock = sock
    self.nbytes = nbytes
  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)
  def handle_resume(self, sched, task):
    data = self.sock.recv(self.nbytes)
    sched.add_ready(task, data)

class WriteSocket(YieldEvent):
  def __init__(self, sock, data):
    self.sock = sock
    self.data = data

  def handle_yield(self, sched, task):
    sched._write_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    nsent = self.sock.send(self.data)
    sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
  def __init__(self, sock):
    self.sock = sock

  def handle_yield(self, sched, task):
    sched._read_wait(self.sock.fileno(), self, task)

  def handle_resume(self, sched, task):
    r = self.sock.accept()
    sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
  def __init__(self, sock):
    self._sock = sock

  def recv(self, maxbytes):
    return ReadSocket(self._sock, maxbytes)

  def send(self, data):
    return WriteSocket(self._sock, data)

  def accept(self):
    return AcceptSocket(self._sock)

  def __getattr__(self, name):
    return getattr(self._sock, name)

if __name__ == '__main__':
  from socket import socket, AF_INET, SOCK_STREAM
  import time

  # Example of a function involving generators. This should
  # be called using line = yield from readline(sock)
  def readline(sock):
    chars = []
    while True:
      c = yield sock.recv(1)
      if not c:
        break
      chars.append(c)
      if c == b'
': break return b''.join(chars) # Echo server using generators class EchoServer: def __init__(self,addr,sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self,addr): s = Socket(socket(AF_INET,SOCK_STREAM)) s.bind(addr) s.listen(5) while True: c,a = yield s.accept() print('Got connection from ', a) self.sched.new(self.client_handler(Socket(c))) def client_handler(self,client): while True: line = yield from readline(client) if not line: break line = b'GOT:' + line while line: nsent = yield client.send(line) line = line[nsent:] client.close() print('Client closed') sched = Scheduler() EchoServer(('',16000),sched) sched.run()
このコードはちょっと複雑です。しかし、小型のオペレーティングシステムを実現しました。準備されたタスクのキューがあります。また、I/Oの休止によるタスク待ちエリアがあります。まだたくさんのスケジューラが待機列とI/O待ちエリアの間の移動を担当しています。
討論する
生成器に基づく連結フレームを構築する際には、より一般的なyield形式が使用されることが多い。

def some_generator():
  ...
  result = yield data
  ...
このような形式のyield文を使用する関数は、一般に「協働」と呼ばれています。スケジューラによって、yield文は一つのサイクルで処理されます。次のようにします。

f = some_generator()

# Initial result. Is None to start since nothing has been computed
result = None
while True:
  try:
    data = f.send(result)
    result = ... do some calculation ...
  except StopIteration:
    break
ここのロジックはちょっと複雑です。ただし、send() に伝えられた値は、yield文の目覚め時の戻り値を定義している。したがって、前のyieldデータに対する応答の中で結果を返したい場合、次のsend() 操作で戻ります。ジェネレータ関数が動作を開始した直後に、None値を送信すると、最初のyield文の前に並べられます。
送信値に加えて、1つの生成器の上でclose() 方法を実行してもよい。これはyield文を実行する時にGeneratorExit を投げ出して異常をもたらすので、実行を終了します。さらに設計すれば、1つのジェネレータはこの異常を捕捉してクリーンアップ動作を実行することができる。また、生成器のthrow() 方法を使用して、yield文実行時に任意の実行命令を生成することもできる。一つのタスクスケジューラは、動作するジェネレータでエラーを処理するために利用できます。
最後の例で使用される yield from 文は協働を実現するために使用され、他のジェネレータによってサブルーチンまたはプロセスとして呼び出されてもよい。本質的には制御権の透明性を新しい関数に伝達することである。普通のジェネレータではなく、yield from を使用して呼び出された関数はyield from 文の結果としての値を返します。yield fromに関するもっと多い情報はPEP 380で見つけることができます。
最後に、ジェネレータを使ってプログラムを作成すると、注意すべき点はやはり多くの欠点があります。特に、スレッドを得ることができないというメリットがあります。例えば、CPU依存またはI/Oブロッキングプログラムを実行すると、動作が完了するまでタスク全体が保留されます。この問題を解決するためには、別の実行可能なスレッドまたはプロセスに操作を任せるしかありません。もう一つの限界は、ほとんどのPythonライブラリが生成器ベーススレッドとうまく互換できないことである。この案を選ぶと、多くの標準ライブラリ関数を自分で書き換える必要があることが分かります。本節で言及した協働および関連技術の基礎的背景としてPEP 342協働と併発の面白い授業を見ることができます。
PEP 3156は、協働プロセスを使用する非同期I/Oモデルも同様である。特に、下のスケジューラを自分で実現することはできません。しかし、协程についての考えは多くの流行库の基础であり、gevent、greenlet、Stocless Python及びその他の类似工事を含みます。
以上はPythonがスレッドの代わりにジェネレータを使う方法の詳細です。Pythonジェネレータがスレッドの代わりになる資料については他の関連記事に注目してください。