WCF拡張実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)実装IRequestChannel(2016-03-15 12:35)
8768 ワード
これはこのシリーズの第2編で、その他の文章は以下のディレクトリをクリックしてください
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
今日から、カスタムZeroMQバインドとprotocolBufferメッセージ符号化をどのように実現したかを一歩一歩紹介します.
このシリーズのアイデアは主に蒋金楠Artechのシリーズブログ-WCFの後続の旅と、WCF開発チームのメンバーcarlosfigueiraのWCF Extensionシリーズのブログに由来しています.
まず,通信と符号化の実現が分離されていることを明らかにする.WCFもこのように設計されています.カスタムBindingには、複数のElementBindingを追加できるBindingElementCollectionコレクションが含まれます.しかし、2つのElementBindingは欠かせません.TransportBindingElement、MessageEncodingBindingElementです.ZeroMQは通信技術を提供しているだけで、具体的な符号化には関与せず、その伝送データは長さを指定する文字列であり、文字列がどのように符号化されるかについては、ZeroMQは知らない.従ってZeroMQはWCFのカスタムBindingにおけるTransportBindingElementとして実現可能となる.私たちはまず符号化にかかわらず、伝送を考慮します.
WCFのチャネルスタックは上記の2つのシリーズのブログで詳しく紹介されていますが、私は繰り返したくありません.WCFとZeroMQをどのように接続したのかを直接話します.
IRequestChannelの実装
新しいタイプのZMQRequestChannelは、WCFが提供するチャネルベースクラスであるChannelBaseに継承されます.
まずZeroMQのsocketを作成します.タイプはREQです.
IRequestChannelを実装する方法
Requestは、サーバからの返信が受信されるまで、要求を送信し、現在のスレッドをブロックします.リクエスト送信時にMessageをbyte[]に変換しZeroMQ送信に渡す
変換はencoderオブジェクトによって行われ、encoderはM e s s s a g e EncodingBindingElementによって作成されます.MessageEncodingBindingElementはWCFチャネルスタックの一部ですが、Messageは各チャネルに沿って通過するわけではありません.MessageEncodingBindingElementは、TransportBindingElementによってのみ使用され、TransportChannelが符号化および復号化を必要とする場合、TransportBindingElementのencoderが立ち上がって自分の役割を果たします.
サービス側からの返信を受信すると、次のようになります.
ここではBufferManagerを使用して受信キャッシュを提供し、キャッシュのサイズを一時的に1000に設定します.キャッシュを使用する目的は、割り当て時の効率が向上することです.受信が完了したら、回収してください.
IRequestChannelはまた非同期の送信方法を実現する必要がある
先ほどの同期方法を参考にして、送信は符号化、送信に分けます.ここで、符号化はローカルで完了し、タイムアウトはなく、送信はネットワーク環境とサーバ状態に依存するため、この部分を非同期プロセスに移動する必要があります.
送信非同期メソッドはReplyChannelでも使用されるため、私はZMQChannelBaseベースクラスを作成し、ChannelBaseに派生し、ZMQRequestChannelは新しいZMQChannelBaseベースクラスに継承します.非同期送信方法をZMQChannelBaseに入れる.
これでRequestChannelは完了し、ReplyChannelよりもRequestChannelの方が簡単です.次はReplyChannelをご紹介します.
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
今日から、カスタムZeroMQバインドとprotocolBufferメッセージ符号化をどのように実現したかを一歩一歩紹介します.
このシリーズのアイデアは主に蒋金楠Artechのシリーズブログ-WCFの後続の旅と、WCF開発チームのメンバーcarlosfigueiraのWCF Extensionシリーズのブログに由来しています.
まず,通信と符号化の実現が分離されていることを明らかにする.WCFもこのように設計されています.カスタムBindingには、複数のElementBindingを追加できるBindingElementCollectionコレクションが含まれます.しかし、2つのElementBindingは欠かせません.TransportBindingElement、MessageEncodingBindingElementです.ZeroMQは通信技術を提供しているだけで、具体的な符号化には関与せず、その伝送データは長さを指定する文字列であり、文字列がどのように符号化されるかについては、ZeroMQは知らない.従ってZeroMQはWCFのカスタムBindingにおけるTransportBindingElementとして実現可能となる.私たちはまず符号化にかかわらず、伝送を考慮します.
WCFのチャネルスタックは上記の2つのシリーズのブログで詳しく紹介されていますが、私は繰り返したくありません.WCFとZeroMQをどのように接続したのかを直接話します.
IRequestChannelの実装
新しいタイプのZMQRequestChannelは、WCFが提供するチャネルベースクラスであるChannelBaseに継承されます.
まずZeroMQのsocketを作成します.タイプはREQです.
socket = this.zmqContext.CreateSocket(SocketType.REQ);
socket.Connect(zmqAddress);
IRequestChannelを実装する方法
public Message Request(Message message, TimeSpan timeout)
Requestは、サーバからの返信が受信されるまで、要求を送信し、現在のスレッドをブロックします.リクエスト送信時にMessageをbyte[]に変換しZeroMQ送信に渡す
ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);
socket.Send(requestData.Array);
変換はencoderオブジェクトによって行われ、encoderはM e s s s a g e EncodingBindingElementによって作成されます.MessageEncodingBindingElementはWCFチャネルスタックの一部ですが、Messageは各チャネルに沿って通過するわけではありません.MessageEncodingBindingElementは、TransportBindingElementによってのみ使用され、TransportChannelが符号化および復号化を必要とする場合、TransportBindingElementのencoderが立ち上がって自分の役割を果たします.
サービス側からの返信を受信すると、次のようになります.
byte[] replyData = bufferManager.TakeBuffer(1000);
int replaySize = socket.Receive(replyData);
Message response = encoder.ReadMessage(
new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
bufferManager.ReturnBuffer(replyData);
ここではBufferManagerを使用して受信キャッシュを提供し、キャッシュのサイズを一時的に1000に設定します.キャッシュを使用する目的は、割り当て時の効率が向上することです.受信が完了したら、回収してください.
IRequestChannelはまた非同期の送信方法を実現する必要がある
IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state);
IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state);
Message EndRequest(IAsyncResult result);
先ほどの同期方法を参考にして、送信は符号化、送信に分けます.ここで、符号化はローカルで完了し、タイムアウトはなく、送信はネットワーク環境とサーバ状態に依存するため、この部分を非同期プロセスに移動する必要があります.
送信非同期メソッドはReplyChannelでも使用されるため、私はZMQChannelBaseベースクラスを作成し、ChannelBaseに派生し、ZMQRequestChannelは新しいZMQChannelBaseベースクラスに継承します.非同期送信方法をZMQChannelBaseに入れる.
public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
{
return base.BeginSendMessage(message, this.DefaultSendTimeout, callback, state);
}
public IAsyncResult BeginSendMessage(Message message, TimeSpan timeout, AsyncCallback callback, object state)
{
base.ThrowIfDisposedOrNotOpen();
ArraySegment<byte> requestData = encoder.WriteMessage(message, 1000, bufferManager);
return this.BeginWriteData(requestData, timeout, callback, state);
}
BeginWriteData AsyncResult , IAsyncResult
class SocketSendAsyncResult : AsyncResult
{
ZMQChannelBase channel;
ArraySegment<byte> buffer;
public SocketSendAsyncResult(ArraySegment<byte> buffer, ZMQChannelBase channel, AsyncCallback callback, object state)
: base(callback, state)
{
this.channel = channel;
this.buffer = buffer;
this.StartSending();
}
void StartSending()
{
SocketSendHandle handler = new SocketSendHandle(delegate(ZmqSocket socket, byte[] data)
{
socket.Send(data);
serviceHanledDone.Set();
});
IAsyncResult sendResult = handler.BeginInvoke(channel.socket, buffer.Array, OnSend, null);
}
static void OnSend(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
SocketSendAsyncResult thisPtr = (SocketSendAsyncResult)result.AsyncState;
Exception completionException = null;
bool shouldComplete = false;
if (shouldComplete)
{
thisPtr.Complete(false, completionException);
}
}
public static void End(IAsyncResult result)
{
AsyncResult.End<SocketSendAsyncResult>(result);
}
}
これでRequestChannelは完了し、ReplyChannelよりもRequestChannelの方が簡単です.次はReplyChannelをご紹介します.