Programming Ruby(読書ノート)-12章(Fibers,スレッド,プロセス)

9631 ワード

スレッドとプロセスは、プログラムが並列状態で動作することができます.Fiberは実行中の一部を保留し,さらに別の部分を実行する.
12.1 Fibers
Ruby 1.9後に導入した.Fiberは、プログラム間のコラボレーションメカニズム(coroutine mechanism)を理解することができる.マルチスレッドプログラムのようにプログラミングしますが、マルチスレッドの問題を参照しません.次の例では、テキストを読み取り、各単語の出現回数を計算します.
#       
counts = Hash.new(0)
File.foreach("testfile") do |line|
  line.scan(/\w+/) do |word|
    word = word.downcase
    counts[word] += 1
  end
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]}"}

produces:
and:1 is:3 line:3 on:1 one:1 so:1 this:3 three:1 two:1

上のコードは少し混乱していて、スキャン行から単語を取り出す論理と単語の出現回数を計算する論理を混同しています.テキストを読み取り、単語を取得し、yieldの各単語を統計ブロックに渡す方法を書く必要があります.Fiberを使う書き方は以下の通りです.
words = Fiber.new do
  File.foreach("testfile") do |line|
    line.scan(/\w+/) do |word|
      Fiber.yield word.downcase
    end
  end
  nil
end

counts = Hash.new(0)
while word = words.resume
  counts[word] += 1
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]}"}

Fiberの構築方法はコードブロックを受信し,Fiberオブジェクトを返す.コードブロック内のコードはすぐに実行されません.Fiber#resumeを呼び出すと、コードブロック内のコードが実行-ファイルを読み取り、単語を解析し、完了すると、Fiberを呼び出す.yieldの場合、Rubyは現在の実行を保留し、Fiber#resumeを呼び出すコードに制御権を渡し、resumeメソッドはFiber#yieldが入力した値を返します.ループ(foreach)が完了したらnilに戻ります.こちらのwhileも止まります.
Fiber.resumeのメソッドはメソッドを呼び出すのと同じです.nilに戻ると、Fiberを呼び出そうとします.resumeの場合、RubyはFiberErrorを放出します.
 
Fiberはシーケンス番号値の製造によく用いられ、以下の例では2で割り切れるが3で割り切れない数を生成するために用いられる.
twos = Fiber.new do
  num = 2
  loop do
    Fiber.yield(num) unless num %3 ==0
    num += 2
  end
end
10.times { print twos.resume, " " }
produces:
2 4 8 10 14 16 20 22 26 28

Fiberはオブジェクトで、パラメータを伝達し、変数に割り当てることができますが、Fiberは作成したスレッドでのみ実行できます.
 
Fibers,Coroutines,and Continuations
オリジナルのfiberには制限があります.fiberはresumeに呼び出されたコードのみを返すことを許可します.Rubyにはこの挙動を拡張する2つの基本パッケージがある.ロードされると、fiberにはtransferメソッドがあり、transferを他のfiberに制御できます.
------------------
A related but more general mechanism is the continuation. A continuation is a way of
recording the state of your running program (where it is, the current binding, and so on)
and then resuming from that state at some point in the future. You can use continuations to
implementcoroutines(andothernewcontrolstructures).Continuationshavealsobeenused
to store the state of a running web application between requests—a continuation is created
when the application sends a response to the browser; then, when the next request arrives
from that browser, the continuation is invoked, and the application continues from where
it left off. You enable continuations in Ruby by requiring the continuation library, described
in the library section on page 739.
--------------------
12.2マルチスレッド
 Ruby1.8以降、スレッドはオペレーティングシステムレベルではなく、Ruby解釈器(interpreter)だけが複数のスレッド切り替えを行う.1.9から、スレッドはオペレーティングシステムレベルです.多核にとっては利益だ.しかし、古いコードと互換性が必要なため、Rubyは実際にオペレーティングシステム上のスレッドが現在も1つしかなく、本当の意味でのマルチスレッドではありません.
スレッドの作成
require 'net/http'

pages = %w(www.rubycentral.org slashdot.org)

threads = pages.map do |page_to_fetch|
  Thread.new(page_to_fetch) do |url|
    http = Net::HTTP.new(url, 80)
    print "Fetching: #{url}
" resp = http.get('/') print "Got #{url}: #{resp.message}
" end end threads.each {|thr| thr.join} produces: Fetching: www.rubycentral.org Fetching: slashdot.org Got slashdot.org:OK Got www.rubycentral.org:OK #Thread.new(...) , 。 page_to_fetch, , Thread 。 , print puts, puts , , 。 print 。

 
Manipulating Threads
Thread#joinは実行完了を待っており、時間パラメータを受信することができる.タイムアウト後にnilに戻ると
Thread#valueは、スレッドが実行する最後の式の値を返します.
Thread#currentは現在のスレッドを返します
Thread#listは、停止したスレッドを含む現在のすべてのスレッドを返します.
Thread#alive? 生き残るかどうか
Thread#statusステータス
Thread#priority=優先線の設定
 
Thread variablesスレッド変数
スレッドオブジェクトはHashとして扱い,[]=書き込み,[]を用いて取る.
count = 0
threads = 10.times.map |i|
  Thread.new do
    sleep(rand(0.))
    Thread.current[:mycount] = count
    count += 1
  end
end
threads.each {|thr| thr.join; print thr[:mycount], "  "}
puts "count = #{count}"
           (count race condition)

 
Threads and Exceptions
スレッドが未処理の例外を投げ出すと、Rubyはどのように処理し、2つの構成項目に依存します:abort_on_Exception flagとinterpreter’s$DEBUG flag.
abort_on_Exception=false、$DEBUGはnot enabledです.例外が放出されるとスレッドが終了します.デフォルトはこうです.
joinが異常なスレッドを投げ出すと、呼び出し元で再びこの異常がraiseされます.
threads = 4.times.map do |i|
  Thread.new(i) do |i|
    raise "Boom" if i == 1
    print "#{i}
" end end puts "waiting" threads.each do |t| begin t.join rescue RuntimeError => e print "Failed: #{e.message}" end end puts "Done" produces: waiting 0 2 3 Failed: BoomDone

でもabort_on_Exceptionをtrueに設定したり、-dを使用してdebugモードを開設したりすると、スレッドが異常を投げ出すとメインスレッドが終了します.
Thread.abort_on_exception = true
threads = 4.times.map do |number|
......
produces:
0
2
prog.rb:4:in `block (2 levels) in <main>': Boom! (RuntimeError)

 
12.3制御スレッドのスケジュール
正しい設計は、スレッドが自分のことだけをして、他のスレッドとは関係ありません.例えば、スレッド間で互いに待つ必要があるのはよくありません.しかし、Rubyはこのような方法をサポートしています.
Thread#stop
Thread#run
Thread#passはjavaのThreadに似ています.yield
 
12.4 Mutual Exclusion反発
synchronizedと同様にRubyはMutexを提供した.
sum = 0
mutex = Mutex.new
threads = 10.times.map do
  Thread.new do
    100_000.times do
      mutex.lock
      new_value = sum + 1
      print "#{new_value}   " if new_value % 250_000 == 0
      sum = new_value
      mutex_unlock
    end
  end
end
threads.each(&:join)
puts "
sum = #{sum}" produces: 250000 500000 750000 1000000 sum = 1000000

Rubyは、同期実行が必要なブロックコードを受信するMutex#synchronizeメソッドを提供します.
...
mutex = Mutex.new
....
  mutex.synchronize do
  ....
  end
...

 Mutex#try_lockは、ロックを取得しようとしますが、取得できない場合(他のスレッドによって取得され、解放されていない場合はfalseに戻ります).
次の例:Mutex#try_の使用lock
rate_mutex = Mutex.new
exchange_rates = ExchangeRates.new
exchange_rates.update_from_online_feed

Thread.new do
  loop do
    sleep 3600
    rate_mutex.synchronize do
      exchange_rates.update_fro_online_feed
    end
  end
end

loop do
  print "Enter currency code and amount: "
  line = gets
  if rate_mutex.try_lock
    puts(exchange_rates.convert(line)) ensure rate_mutex.unlock
  else
    puts "Sorry, rates being updated. Try again in a minute"
  end
end

ロックの取得中に一時的にロックを解除し、他のスレッドが実行できるようにする場合は、Mutex#sleepを使用します.この後ろには時間があります.
mutex.sleep 3600
...

 
Queues and Condition variables
RubyパケットはQueueとスレッド間のCondition変数を提供する.
 
12.5 Running Multiple Processesマルチプロセスの実行
マルチCPUを利用したい場合や、Rubyで作成されていないプログラムをRubyで起動したい場合は、マルチプロセスメカニズムを使用します.
 
Spawning New Processesが新しいプロセスを生成
system("tar xzf test.tgz")
`date`

簡単な方法:Object#systemを使用するか、逆引用符を使用します.
Object#system:サブプロセスでコマンドを実行します.コマンドが見つかり、正常に実行された場合はtrueを返します.コマンドが見つからない場合はraiseに異常があり、コマンドが見つかったが実行エラーが発生した場合はfalseに戻り、エラーが発生した後、$?をクリックしてexitコードを取得します.
しかしシステム()はプログラムの出力内容を返さない.
`command`はプログラムの出力内容を返すことができますが、String#chompを使用して行末記号を削除する必要があることに注意してください.
 
Rubyプロセスがサブプロセスと通信する必要がある場合、IOを使用することができる.popen.この方法を使用すると、Rubyはこのコマンドをサブプロセスで実行し、メインプロセスとサブプロセス間のパイプを構築します.サブプロセスの標準入力と出力をRubyのIO objectに接続します.
pig = IO.popen("local/util/pig", "w+")
pig.puts "ice cream after they go to bed"
pig.close_write
puts pig.gets
produces:
iceway eamcray afterway eythay ogay otay edbay

 
Independent children
サブプロセスの実行は、メインプロセスをブロックしません.
exec("sort testfile > output.txt") if fork.nil?
...     
Process.wait

Object#forkはプロセスIDを返しますが、サブプロセスではnilを返します.
..................
 
Blocks and Subprocessesブロックとサブプロセス
IO.popenがブロックコードを入力場合、Fileと同様である.Openのロジック.
IO.popen("date") {|f| puts "Date is #{f.gets}" }
produces:
Date is Mon May 27 12:31:17 CDT 2013

IOオブジェクトは、ブロック実行が完了すると自動的に閉じます.
 
 
fork do
  puts "In child, pid = #$$"
  exit 99
end

pid = Process.wait
puts "Child terminated, pid = #{pid}, status = #{$?.exitstatus}"
produces:
In child, pid = 22033
Child terminated, pid = 22033, status = 99
#$$     ID
windows   。