WCF拡張実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel(2016-03-15 12:35)

13561 ワード

これはこのシリーズの第3編で、その他の文章は以下のディレクトリをクリックしてください
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
 
RequestChannelに比べて、ReplyChannelは複雑です.
1 zmqのrepノードを起動する
まずOnOpenメソッドを再ロードし、zmqのrepノードを起動し、主にcreateSocketメソッドとバインドアドレスを呼び出す必要があります.
protected override void OnOpen(TimeSpan timeout)
        {
            if (this.socket == null)
            {
                this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
                string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                string zmqServerAddress = "tcp://*" + trimIP;
                socket.Bind(zmqServerAddress);
            }
        }

2 ReceiveReqeustを実現し、contextを返す
RequestChannelと同様に、同期バージョンのReceiveRequestを実現します.この方法はIreplyChannelインタフェースの方法です.なぜインタフェースのメソッドはmessageではなくrequestContextを返すのでしょうか.なぜなら、WCFはrequestContextを受信すると、RequestContextに従って返信メッセージを送信できるからである.
        public RequestContext ReceiveRequest(TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen();

            Message request = this.ReceiveMessage(timeout);
            return new ZMQRequestContext(this, request, timeout);
        }
ReceiveMessage , ZMQChannelBase 
        public Message ReceiveMessage(TimeSpan timeout)
        {
            base.ThrowIfDisposedOrNotOpen();
            byte[] replyData = bufferManager.TakeBuffer(1000);

            int replaySize = socket.Receive(replyData);
                
            Message response = encoder.ReadMessage(
                new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
            bufferManager.ReturnBuffer(replyData);
            return response;
        }

3同期バージョンのReceiveRequest実装後、非同期バージョンの
WCFはReplyChannelを使用してメッセージを受信し、デフォルトではBeginReceiveRequestが呼び出されます.
非同期バージョンの実装も使用する.NetのAMP非同期モード.呼び出しの手順は、下図のように上下に実行されます.リモートリクエストは、TryReceiveRequestAsyncResultインスタンスを返すReplyChannelのBeginTryReceiveRequestメソッドに最初にアクセスします.次に、BaseChannelで次のように実行する.SocketReceiveAsyncResultでZMQのsocketを実行する.Receive()メソッド.
                                                               
                                                                 |
ReplyChannel                                            BeginTryReceiveRequest
                                                                 |
TryReceiveRequestAsyncResult                                  new
                                                                 |
BaseChannel                                                BeginReceiveRequest
                                                                 |
ReceiveRequestAsyncResult                                      new
                                                                 |
BaseChannel                                                BeginReceiveMessage
                                         |
BaseChannel                                                BeginReadData
                                                                 |  
BaseChannel.SocketReceiveAsyncResult                        new                            
                                                                 |
                                                         socket.Receive
                                                                 |
BaseChannel                                                    EndReadData
                                                                 |
BaseChannel                                                EndReceiveMessage             

ステップは多く、ステップごとに意味があり、BeginTryReceiveRequestはまずリクエストメッセージを受け取り、タイムアウトを処理して異常を投げ出さないようにします.BeginReceiveRequestに転送し、ReceiveRequestAsyncResultオブジェクトを作成し、その構築でベースクラスのBeginReceiveMessageを呼び出します.ベースクラスのBeginReceiveMessageは純粋にコードの多重化のためである.BeginReadDataに転送し、SocketReceiveAsyncResultオブジェクトを作成します.Receive()は,非同期委任方式を用いて非同期を実現している.
 
4 zmqの同期制限を解決する
必要なインタフェースが実装されると、zmqクライアントの要求を受信するためにwcfサービスを開始することができる.メッセージを受信すると、socket.に再び実行されます.Receive()は、メッセージを再受信しようとしたときに異常が発生しました.例外は、「現在の状態ではノードがこの操作を実行できません」と表示されます.Zmqは初めてなのでZMQのメカニズムはよくわかりません.初めてsocketを実行します.Receive()はメッセージを正常に受信し、2回目にsocketを実行する.Receive()でエラーが発生します.私はまたzmqのdemoを見て、2番目のsocketにも実行しました.Receive()は、このような問題はありません.同じ時間のsocket状態を比較すると、次のことがわかります.
 
            
socket ,
    ReceiveStatus: Received
 , :
    SendStatus: None

       zmq demo
socket ,
    ReceiveStatus: Received
 
    SendStatus: Sent

zmqのREP socketはリクエストを受信しなければならず,再送後にリクエストを再受信することができない.私のプログラムではwcfのサービスが呼び出され、ずっとデバッグ状態だったので、タイムリーに戻らず、SendStatusがnoneになったので、再送信できませんでした.
この問題を解決するのも簡単で、ManualResetEventを使いました.メッセージを受信した後、ManualResetEventをreset状態にし、receive()の前にManualResetEventのWaitOne()を呼び出し、送信の返信を待つ.replyChannel送信が戻ってくると、すぐにManualResetEventをset状態にしてreceive()に実行します.
受信時
                    serviceHanledDone.WaitOne();
                    int receiveLength = socket.Receive(data1);
                    serviceHanledDone.Reset();
                    return receiveLength;

送信後、すぐにManualResetEventをsetにしてwaitoneを放行させます.
                    socket.Send(data);
                    serviceHanledDone.Set();

5 zmqキューサポートの追加
これにより、zmqBindingは、zmqクライアントの要求を受信し、正しく返すことができる.しかし、一度に1つのリクエストしか受信できないようで、返事を待ってから、次のリクエストを受信することができます.wcfの処理はすべて非同期であるが,zmqのrepノードがサービス側の処理能力を制限しているのに,どうして複数のリクエストを受信できるのか.zmqは「mq」と呼ぶ以上、キューの機能があります.zmqのマニュアルから,router-dealerはキューの機能を実現できることが分かった.私が使っているzmqバージョンはclrzmqですが、ネット上にはclrzmqがrouter-dealderを実現する例コードはありません.clrzmqのソースコードのテストコードでは,clrzmqのQueueDeviceクラスがzmqのrouter-dealerモードを実現していることを発見した.しかも使いやすいです.完全なzmq例は私の他の文章を参照してください.
ここで注意したいのは、QueueDeviceが最初に起動してからREPノードを起動することです.したがって、ManualResetEventオブジェクトも使用されます.QueueDecviceは新しいスレッドで起動し、起動後、REPノードに起動を通知します.コードは次のとおりです.
protected override void OnOpen(TimeSpan timeout)
        {
            if (this.socket == null)
            {
                startRouterDealer(this.zmqContext);


                _deviceReady.WaitOne();

                this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
                string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                string zmqServerAddress = "tcp://*" + trimIP;
                //socket.Bind(zmqServerAddress);
                socket.Connect("inproc://backend");
            }
        }
        protected override void OnClosing()
        {
            base.OnClosing();
        }
        private static void startRouterDealer(ZmqContext context)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), context);
        }
        private static void startQueueDeviceThread(object state)
        {
            ZmqContext context = state as ZmqContext;
            //Thread.Sleep(2000);
            using (QueueDevice queue = new QueueDevice(context,
                "tcp://*:5555",
                "inproc://backend",
                DeviceMode.Threaded))
            {
                queue.Initialize();
                _deviceReady.Set();
                queue.Start();
                while (true)
                {
                    Thread.Sleep(1000);
                }
            }


        }

これでZMQBindingのTransport部分が完成します.次の記事ではprotocolBufferメッセージの符号化とwcfでの符号化と復号化について説明します.