F#における非同期およびパラレルモード(3-下):エージェントのさらなる使用

13798 ワード

このシリーズの第3部では、F#の軽量レベル、インタラクティブなエージェント、および独立した内部状態を含むエージェントに関するいくつかのモードを探索します.(訳注:原文の内容が多いため、訳文は2つのセグメントに分割して行われる.前半ではエージェントの基本的な使用方法について議論し、後半ではエージェントの使用におけるさらなるパターンについて議論する.)
メッセージと結合タイプ
メッセージのタイプとしてユニオンタイプ(Union Type)を使用することが多い.たとえば、シミュレーションエンジンで次のメッセージを使用するエージェントベースのDirectXの例を示します.
type Message =
    | PleaseTakeOneStep
    | PleaseAddOneBall of Ball 

シミュレーションエンジンのエージェント:
let simulationEngine =
    Agent.Start(fun inbox ->
        async { while true do
                    // Wait for a message
                    let! msg = inbox.Receive() 

                    // Process a message
                    match msg with
                    | PleaseTakeOneStep -> state.Transform moveBalls
                    | PleaseAddOneBall ball -> state.AddObject ball  }) 

多くの場合、強いタイプのメッセージを使用するのは良い方法です.ただし、他のメッセージ・メカニズムと連携する必要がある場合は、objやstringなどの汎用化されたメッセージ・タイプを使用する心配はありません.この場合、エージェントは実行時にタイプ判断や変換を行うだけです.
パラメトリックエージェントおよび抽象エージェント
エージェントはF#符号化の1つの設計モードにすぎない.これは、パラメータ化、抽象化、コードクリップの再利用など、F#でよく使われるテクニックをエージェントと一緒に使用できることを意味します.たとえば、以前のserveQuoteStream関数をパラメータ化して、各株式メッセージ転送の間隔を指定できます.
open System.Net.Sockets 

/// serve up a stream of quotes
let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {
    let stream = client.GetStream()
    while true do
        do! stream.AsyncWrite( "AAPL 439.2"B )
        do! Async.Sleep periodMilliseconds
} 

これは、株式サーバ内の異なるリクエストが異なる長さの間隔を持つことを意味します.
これと同様に、関数パラメータを使用して、プロキシクラス全体の機能を抽象化できます.
let iteratingAgent job =
   Agent.Start(fun inbox ->
     async { while true do
               let! msg = inbox.Receive()
               do! job msg }) 

let foldingAgent job initialState =
   Agent.Start(fun inbox ->
     let rec loop state = async {
         let! msg = inbox.Receive()
         let! state = job state msg
         return! loop state
       }
     loop initialState)

最初の関数は次のように使用できます.
let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'"  msg }) 

2つ目:
let agent2 =
    foldingAgent (fun state msg ->
        async { if state % 1000 = 0 then printfn "count = '%d'" msg;
                return state + 1 }) 0 

エージェントから結果を返す
以降の記事では、実行中のエージェントの結果の一部にアクセスするテクニックについて説明します.たとえば、MailboxProcessorエージェントごとにPostAndAsyncReplyメソッドを使用することができます.このようなテクニックは、ネットワーク通信エージェントを作成する際に特に重要です.
しかし、このようなやり方は多くの場合、GUIのような監視環境に結果を報告する必要があるかもしれません.一部の結果を報告する簡単な方法の一つは、前に2番目の文章で議論した設計モデルである.次の例では、1000個のメッセージをサンプリングし、GUIまたは他の管理スレッドに配布するエージェントを作成します(2番目の記事でSynchronizationContextの2つの拡張方法CaptureCurrentとRaiseEventが使用されていることに注意してください).
// Receive messages and raise an event on each 1000th message 
type SamplingAgent() = 
    // The event that is raised 
    // Capture the synchronization context to allow us to raise events 
    // back on the GUI thread 
    let syncContext = SynchronizationContext.CaptureCurrent() 

    // The internal mailbox processor agent 
    let agent = 
        new MailboxProcessor<_>(fun inbox -> 
            async { let count = ref 0 
                    while true do 
                        let! msg = inbox.Receive() 
                        incr count 
                        if !count % 1000 = 0 then 
                            syncContext.RaiseEvent sample msg }) 

    /// Post a message to the agent 
    member x.Post msg = agent.Post msg 

    /// Start the agent 
    member x.Start () = agent.Start() 

    /// Raised every 1000'th message 
    member x.Sample = sample.Publish

エージェントは次のように使用できます.
let agent = SamplingAgent() 

agent.Sample.Add (fun s -> printfn "sample: %s" s) 
agent.Start() 

for i = 0 to 10000 do 
   agent.Post (sprintf "message %d" i) 

予想通り、agentのメッセージサンプリングが報告されます.
sample: message 999 
sample: message 1999 
sample: message 2999 
sample: message 3999 
sample: message 4999 
sample: message 5999 
sample: message 6999 
sample: message 7999 
sample: message 8999 
sample: message 9999

エージェントおよびエラー
私たちは間違いや異常を避けることができません.良好なエラー検出,報告および記録の措置はエージェントプログラミングの基本要素に基づいている.F#のメモリエージェント(MailboxProcessor)でエラーを検出および転送する方法を見てみましょう.
まず、F#非同期エージェントの不思議な点は、異常がasync{...}複数の非同期待機およびI/O操作を越えても、自動的にキャプチャおよび配布されます.async{...}でtry/with,try/finallyおよびuseキーを使用して、例外を取得または解放します.これは、取得されていないエラーをエージェントで処理するだけでよいことを意味します.
MailboxProcessorエージェントに未キャプチャの例外が発生すると、Errorイベントがトリガーされます.一般的なパターンは、次のような監視プロセスにすべてのエラーを転送することです.
type Agent = MailboxProcessor 

let supervisor = 
   Agent.Start(fun inbox -> 
     async { while true do 
               let! err = inbox.Receive() 
               printfn "an error occurred in an agent: %A" err }) 

let agent = 
   new Agent(fun inbox -> 
     async { while true do 
               let! msg = inbox.Receive() 
               if msg % 1000 = 0 then 
                   failwith "I don't like that cookie!" }) 

agent.Error.Add(fun error -> supervisor.Post error) 
agent.Start() 

これらの構成操作を簡単に並列化することもできます.
let agent = 
   new Agent(fun inbox -> 
     async { while true do 
               let! msg = inbox.Receive() 
               if msg % 1000 = 0 then 
                   failwith "I don't like that cookie!" }) 
   |> Agent.reportErrorsTo supervisor 
   |> Agent.start 

または補助モジュールを使用します.
module Agent = 
   let reportErrorsTo (supervisor: Agent) (agent: Agent<_>) = 
       agent.Error.Add(fun error -> supervisor.Post error); agent

   let start (agent: Agent<_>) = agent.Start(); agent 

次の例では、エラーが報告されるエージェントを10000個作成します.
let supervisor = 
   Agent.Start(fun inbox -> 
     async { while true do 
               let! (agentId, err) = inbox.Receive() 
               printfn "an error '%s' occurred in agent %d" err.Message agentId }) 

let agents = 
   [ for agentId in 0 .. 10000 -> 
        let agent = 
            new Agent(fun inbox -> 
               async { while true do 
                         let! msg = inbox.Receive() 
                         if msg.Contains("agent 99") then 
                             failwith "I don't like that cookie!" }) 
        agent.Error.Add(fun error -> supervisor.Post (agentId,error)) 
        agent.Start() 
        (agentId, agent) ]

メッセージを送信します.
for (agentId, agent) in agents do 
   agent.Post (sprintf "message to agent %d" agentId ) 

次のように表示されます.
an error 'I don't like that cookie!' occurred in agent 99 
an error 'I don't like that cookie!' occurred in agent 991 
an error 'I don't like that cookie!' occurred in agent 992 
an error 'I don't like that cookie!' occurred in agent 993 
... 
an error 'I don't like that cookie!' occurred in agent 9999 

このセクションでは、F#メモリのMailboxProcessorエージェントで発生したエラーについて説明します.他のエージェント(例えば、サーバ側の要求を表すエージェント)も、優雅なエラー転送および再試行を行うために設計およびアーキテクチャを行うことができる.
まとめ
分離エージェントは、デバイス駆動プログラミングからユーザインタフェース、分散プログラミング、高度に伸縮した通信サーバなど、さまざまなプログラミング分野で絶えず使用されている一般的なプログラミングモードです.オブジェクト、スレッド、または非同期ワークプログラムを作成するたびに、サウンドカードへのデータの送信、ネットワークからのデータの読み出し、入力に応答するイベントフローなどの長時間の通信を処理するために、エージェントを作成します.ASPを書くたびにNET Webハンドラの場合、実際には、呼び出しのたびにステータスをリセットする形式のエージェントも使用します.様々な場合において、通信に関連する状態を分離することは一般的なニーズである.
分離エージェントは、例えば、伸縮可能な要求サーバおよび分散プログラミングアルゴリズムを含む伸縮可能なプログラミングアルゴリズムを実現する最終的な実装である.他の様々な非同期および同時プログラミングモードと同様に、それらも乱用されてはいけない.しかし、彼らは優雅で強力で効率的な技術であり、非常に広く使用されています.
F#は、Visual Studio 2010とともに登場する管理言語で、軽量級の非同期コンピューティングとメモリ種を完全にサポートするエージェントです.F#では、非同期エージェントは、コールバック関数や制御反転などを用いずに組み合わせて記述できます(個人的には、実際にはF#のこのようなやり方は優雅な制御反転だと思います).ここにはいくつかのバランスのとれたところがあります.例えば、後の文章では、どのように使うかを観察します.NETクラスライブラリの標準的なAPMモードで、エージェントを解放します.しかしながら、制御が容易で伸縮性が強い、必要に応じてCPUとI/Oの並列動作を組織するながら、CPU密集型コードを保持できるという利点も明らかである.NETでの完全なパフォーマンス.
もちろん、他にもあります.NETまたはJVMベースの言語は軽量レベルのインタラクティブエージェントをサポートしている--以前、これはNETはスレッドのコストが非常に高いため、「不可能」なことです.現在、F#は2007年に「async{...}」を導入しており、これは言語設計の突破と見なされています.これにより、プログラマーは業界で広く認められているプログラミングプラットフォーム上で軽量レベル、組み合わせ式の非同期プログラミング、インタラクティブなエージェントを構築することができます.Axum言語のプロトタイプ(F#の影響も受けている)に加えて、F#は非同期言語の特性が完全に実行可能な方法であることを証明し、現在の業界のランタイムシステム設計分野の議論の話題を解放しました.スレッドを軽くするかどうか.
F#非同期プログラミングは「プリエンプト」の実現と考えられ,これまでにも先駆的な様々な投入があった.例えばOCaml delimited continuations,Haskell embedding of monadic concurrencyおよび並列占有の重要性を強調した様々な論文.
はい、どうぞ.NET 2.0、3.5、4.0、Linux/Mono/Mac、Silverlightで非同期エージェントを使用します.また、WebSharperプラットフォームを使用して、F#の非同期プログラミングモデルをJavaScriptに翻訳して実行することもできます.楽しんでね!
译文:Async and Parallel Design Patterns in F#:Agents