WCF拡張実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)実装IRequestChannel(2016-03-15 12:35)

8768 ワード

これはこのシリーズの第2編で、その他の文章は以下のディレクトリをクリックしてください
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
 
今日から、カスタムZeroMQバインドとprotocolBufferメッセージ符号化をどのように実現したかを一歩一歩紹介します.
このシリーズのアイデアは主に蒋金楠Artechのシリーズブログ-WCFの後続の旅と、WCF開発チームのメンバーcarlosfigueiraのWCF Extensionシリーズのブログに由来しています.
まず,通信と符号化の実現が分離されていることを明らかにする.WCFもこのように設計されています.カスタムBindingには、複数のElementBindingを追加できるBindingElementCollectionコレクションが含まれます.しかし、2つのElementBindingは欠かせません.TransportBindingElement、MessageEncodingBindingElementです.ZeroMQは通信技術を提供しているだけで、具体的な符号化には関与せず、その伝送データは長さを指定する文字列であり、文字列がどのように符号化されるかについては、ZeroMQは知らない.従ってZeroMQはWCFのカスタムBindingにおけるTransportBindingElementとして実現可能となる.私たちはまず符号化にかかわらず、伝送を考慮します.
WCFのチャネルスタックは上記の2つのシリーズのブログで詳しく紹介されていますが、私は繰り返したくありません.WCFとZeroMQをどのように接続したのかを直接話します.
IRequestChannelの実装
新しいタイプのZMQRequestChannelは、WCFが提供するチャネルベースクラスであるChannelBaseに継承されます.
まずZeroMQのsocketを作成します.タイプはREQです.
 socket = this.zmqContext.CreateSocket(SocketType.REQ);
 socket.Connect(zmqAddress);

IRequestChannelを実装する方法
public Message Request(Message message, TimeSpan timeout)

Requestは、サーバからの返信が受信されるまで、要求を送信し、現在のスレッドをブロックします.リクエスト送信時にMessageをbyte[]に変換しZeroMQ送信に渡す
            ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);
                
            socket.Send(requestData.Array);

変換はencoderオブジェクトによって行われ、encoderはM e s s s a g e EncodingBindingElementによって作成されます.MessageEncodingBindingElementはWCFチャネルスタックの一部ですが、Messageは各チャネルに沿って通過するわけではありません.MessageEncodingBindingElementは、TransportBindingElementによってのみ使用され、TransportChannelが符号化および復号化を必要とする場合、TransportBindingElementのencoderが立ち上がって自分の役割を果たします.
サービス側からの返信を受信すると、次のようになります.
            byte[] replyData = bufferManager.TakeBuffer(1000);

            int replaySize = socket.Receive(replyData);
                
            Message response = encoder.ReadMessage(
                new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
            bufferManager.ReturnBuffer(replyData);

ここではBufferManagerを使用して受信キャッシュを提供し、キャッシュのサイズを一時的に1000に設定します.キャッシュを使用する目的は、割り当て時の効率が向上することです.受信が完了したら、回収してください.
IRequestChannelはまた非同期の送信方法を実現する必要がある
        IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state);
        IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state);  
        Message EndRequest(IAsyncResult result);

先ほどの同期方法を参考にして、送信は符号化、送信に分けます.ここで、符号化はローカルで完了し、タイムアウトはなく、送信はネットワーク環境とサーバ状態に依存するため、この部分を非同期プロセスに移動する必要があります.
送信非同期メソッドはReplyChannelでも使用されるため、私はZMQChannelBaseベースクラスを作成し、ChannelBaseに派生し、ZMQRequestChannelは新しいZMQChannelBaseベースクラスに継承します.非同期送信方法をZMQChannelBaseに入れる.
        public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
        {
            return base.BeginSendMessage(message, this.DefaultSendTimeout, callback, state);
        }
        public IAsyncResult BeginSendMessage(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            base.ThrowIfDisposedOrNotOpen();
            ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);

            return this.BeginWriteData(requestData, timeout, callback, state);
        }
BeginWriteData AsyncResult , IAsyncResult
        class SocketSendAsyncResult : AsyncResult
        {
            ZMQChannelBase channel;
            ArraySegment<byte> buffer;

            public SocketSendAsyncResult(ArraySegment<byte> buffer, ZMQChannelBase channel, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.channel = channel;
                this.buffer = buffer;

                this.StartSending();
            }

            void StartSending()
            {
                SocketSendHandle handler = new SocketSendHandle(delegate(ZmqSocket socket, byte[] data)
                {
                    socket.Send(data);
                    serviceHanledDone.Set();
                });

                IAsyncResult sendResult = handler.BeginInvoke(channel.socket, buffer.Array, OnSend, null);
               
            }


            static void OnSend(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                SocketSendAsyncResult thisPtr = (SocketSendAsyncResult)result.AsyncState;
                Exception completionException = null;
                bool shouldComplete = false;


                if (shouldComplete)
                {
                    thisPtr.Complete(false, completionException);
                }
            }


            public static void End(IAsyncResult result)
            {
                AsyncResult.End<SocketSendAsyncResult>(result);
            }
        }

 
これでRequestChannelは完了し、ReplyChannelよりもRequestChannelの方が簡単です.次はReplyChannelをご紹介します.