WCF拡張実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel(2016-03-15 12:35)
13561 ワード
これはこのシリーズの第3編で、その他の文章は以下のディレクトリをクリックしてください
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
RequestChannelに比べて、ReplyChannelは複雑です.
1 zmqのrepノードを起動する
まずOnOpenメソッドを再ロードし、zmqのrepノードを起動し、主にcreateSocketメソッドとバインドアドレスを呼び出す必要があります.
2 ReceiveReqeustを実現し、contextを返す
RequestChannelと同様に、同期バージョンのReceiveRequestを実現します.この方法はIreplyChannelインタフェースの方法です.なぜインタフェースのメソッドはmessageではなくrequestContextを返すのでしょうか.なぜなら、WCFはrequestContextを受信すると、RequestContextに従って返信メッセージを送信できるからである.
3同期バージョンのReceiveRequest実装後、非同期バージョンの
WCFはReplyChannelを使用してメッセージを受信し、デフォルトではBeginReceiveRequestが呼び出されます.
非同期バージョンの実装も使用する.NetのAMP非同期モード.呼び出しの手順は、下図のように上下に実行されます.リモートリクエストは、TryReceiveRequestAsyncResultインスタンスを返すReplyChannelのBeginTryReceiveRequestメソッドに最初にアクセスします.次に、BaseChannelで次のように実行する.SocketReceiveAsyncResultでZMQのsocketを実行する.Receive()メソッド.
ステップは多く、ステップごとに意味があり、BeginTryReceiveRequestはまずリクエストメッセージを受け取り、タイムアウトを処理して異常を投げ出さないようにします.BeginReceiveRequestに転送し、ReceiveRequestAsyncResultオブジェクトを作成し、その構築でベースクラスのBeginReceiveMessageを呼び出します.ベースクラスのBeginReceiveMessageは純粋にコードの多重化のためである.BeginReadDataに転送し、SocketReceiveAsyncResultオブジェクトを作成します.Receive()は,非同期委任方式を用いて非同期を実現している.
4 zmqの同期制限を解決する
必要なインタフェースが実装されると、zmqクライアントの要求を受信するためにwcfサービスを開始することができる.メッセージを受信すると、socket.に再び実行されます.Receive()は、メッセージを再受信しようとしたときに異常が発生しました.例外は、「現在の状態ではノードがこの操作を実行できません」と表示されます.Zmqは初めてなのでZMQのメカニズムはよくわかりません.初めてsocketを実行します.Receive()はメッセージを正常に受信し、2回目にsocketを実行する.Receive()でエラーが発生します.私はまたzmqのdemoを見て、2番目のsocketにも実行しました.Receive()は、このような問題はありません.同じ時間のsocket状態を比較すると、次のことがわかります.
zmqのREP socketはリクエストを受信しなければならず,再送後にリクエストを再受信することができない.私のプログラムではwcfのサービスが呼び出され、ずっとデバッグ状態だったので、タイムリーに戻らず、SendStatusがnoneになったので、再送信できませんでした.
この問題を解決するのも簡単で、ManualResetEventを使いました.メッセージを受信した後、ManualResetEventをreset状態にし、receive()の前にManualResetEventのWaitOne()を呼び出し、送信の返信を待つ.replyChannel送信が戻ってくると、すぐにManualResetEventをset状態にしてreceive()に実行します.
受信時
送信後、すぐにManualResetEventをsetにしてwaitoneを放行させます.
5 zmqキューサポートの追加
これにより、zmqBindingは、zmqクライアントの要求を受信し、正しく返すことができる.しかし、一度に1つのリクエストしか受信できないようで、返事を待ってから、次のリクエストを受信することができます.wcfの処理はすべて非同期であるが,zmqのrepノードがサービス側の処理能力を制限しているのに,どうして複数のリクエストを受信できるのか.zmqは「mq」と呼ぶ以上、キューの機能があります.zmqのマニュアルから,router-dealerはキューの機能を実現できることが分かった.私が使っているzmqバージョンはclrzmqですが、ネット上にはclrzmqがrouter-dealderを実現する例コードはありません.clrzmqのソースコードのテストコードでは,clrzmqのQueueDeviceクラスがzmqのrouter-dealerモードを実現していることを発見した.しかも使いやすいです.完全なzmq例は私の他の文章を参照してください.
ここで注意したいのは、QueueDeviceが最初に起動してからREPノードを起動することです.したがって、ManualResetEventオブジェクトも使用されます.QueueDecviceは新しいスレッドで起動し、起動後、REPノードに起動を通知します.コードは次のとおりです.
これでZMQBindingのTransport部分が完成します.次の記事ではprotocolBufferメッセージの符号化とwcfでの符号化と復号化について説明します.
WCF拡張の実現ZeroMQバインドとprotocolBufferメッセージ符号化(一)概要設計
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(二)IRequestChannelの実装
WCF拡張の実装ZeroMQバインドとprotocolBufferメッセージ符号化(3)実装ReplyChannel
RequestChannelに比べて、ReplyChannelは複雑です.
1 zmqのrepノードを起動する
まずOnOpenメソッドを再ロードし、zmqのrepノードを起動し、主にcreateSocketメソッドとバインドアドレスを呼び出す必要があります.
protected override void OnOpen(TimeSpan timeout)
{
if (this.socket == null)
{
this.socket = this.zmqContext.CreateSocket(SocketType.REP);
int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
string zmqServerAddress = "tcp://*" + trimIP;
socket.Bind(zmqServerAddress);
}
}
2 ReceiveReqeustを実現し、contextを返す
RequestChannelと同様に、同期バージョンのReceiveRequestを実現します.この方法はIreplyChannelインタフェースの方法です.なぜインタフェースのメソッドはmessageではなくrequestContextを返すのでしょうか.なぜなら、WCFはrequestContextを受信すると、RequestContextに従って返信メッセージを送信できるからである.
public RequestContext ReceiveRequest(TimeSpan timeout)
{
ThrowIfDisposedOrNotOpen();
Message request = this.ReceiveMessage(timeout);
return new ZMQRequestContext(this, request, timeout);
}
ReceiveMessage , ZMQChannelBase
public Message ReceiveMessage(TimeSpan timeout)
{
base.ThrowIfDisposedOrNotOpen();
byte[] replyData = bufferManager.TakeBuffer(1000);
int replaySize = socket.Receive(replyData);
Message response = encoder.ReadMessage(
new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
bufferManager.ReturnBuffer(replyData);
return response;
}
3同期バージョンのReceiveRequest実装後、非同期バージョンの
WCFはReplyChannelを使用してメッセージを受信し、デフォルトではBeginReceiveRequestが呼び出されます.
非同期バージョンの実装も使用する.NetのAMP非同期モード.呼び出しの手順は、下図のように上下に実行されます.リモートリクエストは、TryReceiveRequestAsyncResultインスタンスを返すReplyChannelのBeginTryReceiveRequestメソッドに最初にアクセスします.次に、BaseChannelで次のように実行する.SocketReceiveAsyncResultでZMQのsocketを実行する.Receive()メソッド.
|
ReplyChannel BeginTryReceiveRequest
|
TryReceiveRequestAsyncResult new
|
BaseChannel BeginReceiveRequest
|
ReceiveRequestAsyncResult new
|
BaseChannel BeginReceiveMessage
|
BaseChannel BeginReadData
|
BaseChannel.SocketReceiveAsyncResult new
|
socket.Receive
|
BaseChannel EndReadData
|
BaseChannel EndReceiveMessage
ステップは多く、ステップごとに意味があり、BeginTryReceiveRequestはまずリクエストメッセージを受け取り、タイムアウトを処理して異常を投げ出さないようにします.BeginReceiveRequestに転送し、ReceiveRequestAsyncResultオブジェクトを作成し、その構築でベースクラスのBeginReceiveMessageを呼び出します.ベースクラスのBeginReceiveMessageは純粋にコードの多重化のためである.BeginReadDataに転送し、SocketReceiveAsyncResultオブジェクトを作成します.Receive()は,非同期委任方式を用いて非同期を実現している.
4 zmqの同期制限を解決する
必要なインタフェースが実装されると、zmqクライアントの要求を受信するためにwcfサービスを開始することができる.メッセージを受信すると、socket.に再び実行されます.Receive()は、メッセージを再受信しようとしたときに異常が発生しました.例外は、「現在の状態ではノードがこの操作を実行できません」と表示されます.Zmqは初めてなのでZMQのメカニズムはよくわかりません.初めてsocketを実行します.Receive()はメッセージを正常に受信し、2回目にsocketを実行する.Receive()でエラーが発生します.私はまたzmqのdemoを見て、2番目のsocketにも実行しました.Receive()は、このような問題はありません.同じ時間のsocket状態を比較すると、次のことがわかります.
socket ,
ReceiveStatus: Received
, :
SendStatus: None
zmq demo
socket ,
ReceiveStatus: Received
SendStatus: Sent
zmqのREP socketはリクエストを受信しなければならず,再送後にリクエストを再受信することができない.私のプログラムではwcfのサービスが呼び出され、ずっとデバッグ状態だったので、タイムリーに戻らず、SendStatusがnoneになったので、再送信できませんでした.
この問題を解決するのも簡単で、ManualResetEventを使いました.メッセージを受信した後、ManualResetEventをreset状態にし、receive()の前にManualResetEventのWaitOne()を呼び出し、送信の返信を待つ.replyChannel送信が戻ってくると、すぐにManualResetEventをset状態にしてreceive()に実行します.
受信時
serviceHanledDone.WaitOne();
int receiveLength = socket.Receive(data1);
serviceHanledDone.Reset();
return receiveLength;
送信後、すぐにManualResetEventをsetにしてwaitoneを放行させます.
socket.Send(data);
serviceHanledDone.Set();
5 zmqキューサポートの追加
これにより、zmqBindingは、zmqクライアントの要求を受信し、正しく返すことができる.しかし、一度に1つのリクエストしか受信できないようで、返事を待ってから、次のリクエストを受信することができます.wcfの処理はすべて非同期であるが,zmqのrepノードがサービス側の処理能力を制限しているのに,どうして複数のリクエストを受信できるのか.zmqは「mq」と呼ぶ以上、キューの機能があります.zmqのマニュアルから,router-dealerはキューの機能を実現できることが分かった.私が使っているzmqバージョンはclrzmqですが、ネット上にはclrzmqがrouter-dealderを実現する例コードはありません.clrzmqのソースコードのテストコードでは,clrzmqのQueueDeviceクラスがzmqのrouter-dealerモードを実現していることを発見した.しかも使いやすいです.完全なzmq例は私の他の文章を参照してください.
ここで注意したいのは、QueueDeviceが最初に起動してからREPノードを起動することです.したがって、ManualResetEventオブジェクトも使用されます.QueueDecviceは新しいスレッドで起動し、起動後、REPノードに起動を通知します.コードは次のとおりです.
protected override void OnOpen(TimeSpan timeout)
{
if (this.socket == null)
{
startRouterDealer(this.zmqContext);
_deviceReady.WaitOne();
this.socket = this.zmqContext.CreateSocket(SocketType.REP);
int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
string zmqServerAddress = "tcp://*" + trimIP;
//socket.Bind(zmqServerAddress);
socket.Connect("inproc://backend");
}
}
protected override void OnClosing()
{
base.OnClosing();
}
private static void startRouterDealer(ZmqContext context)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), context);
}
private static void startQueueDeviceThread(object state)
{
ZmqContext context = state as ZmqContext;
//Thread.Sleep(2000);
using (QueueDevice queue = new QueueDevice(context,
"tcp://*:5555",
"inproc://backend",
DeviceMode.Threaded))
{
queue.Initialize();
_deviceReady.Set();
queue.Start();
while (true)
{
Thread.Sleep(1000);
}
}
}
これでZMQBindingのTransport部分が完成します.次の記事ではprotocolBufferメッセージの符号化とwcfでの符号化と復号化について説明します.