[ZeroMQ]libzmqソース読解のReactorメカニズム(mailbox,event)
[ZeroMQ]libzmqソース読解のReactorメカニズム(mailbox,event)ZeroMQ libzmqソース読解のReactorメカニズムmailbox event 信号員signaler プロセス間通信 signaler実現 マルチプレクサpoller傍受ソケット mailbox IO Thread
まとめ zmqは作成時に2種類のスレッドを起動します.1つはアプリケーションスレッド(Application thread)、2つはI/Oスレッドです.では、これらのスレッド間でどのように協力し、どのように通信しているのでしょうか.
シグナルマンsignaler
signalerはプロセス間で信号を伝達する責任を負う.MacはBSDシステムに属するため、プロセス間通信は
プロセス間通信
UNIXでは,socketは最初はネットワーク通信に用いられていたが,後にプロセス間通信にも適用された.ローカルループバックであるため,IPアドレスをバインドする必要がなく,傍受接続を必要としない.ソケットペアを直接設定します.
これは全二重のパイプと考えられる.ポート
Signaler実装
マルチプレクサpollerリスニングsocket
zmqでは,ポートリスニングは実際には**I/Oマルチプレクサ**pollerによって実現される.
kqueueを例にとると、loopで傍受する:
mailbox
mailbox_tには主に2つのメンバーがあり、1つはパイプであり、もう1つは信号員である.パイプは本当にデータの送受信に使われていますが、信号員はメールボックスからの手紙があるかどうかを監視するだけで、送信するときも信号を送信します.
I/O Thread
気づいた
ここはとても微妙だと思いますが
コンストラクション関数は次のとおりです.
まとめ 上記 メールボックスにコマンドが入ると メールボックスにコマンドが入っているのを監視している
シグナルマンsignaler
signalerはプロセス間で信号を伝達する責任を負う.MacはBSDシステムに属するため、プロセス間通信は
socketpair
で実現される.プロセス間通信
UNIXでは,socketは最初はネットワーク通信に用いられていたが,後にプロセス間通信にも適用された.ローカルループバックであるため,IPアドレスをバインドする必要がなく,傍受接続を必要としない.ソケットペアを直接設定します.
#include
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
これは全二重のパイプと考えられる.ポート
sv[0]
で送信し、sv[1]
で受信します.逆に、sv[1]
発、sv[0]
収.Signaler実装
signaler_t
には2人のfdメンバーw
とr
があり、彼らは2つのsocketを代表している.実際にはフルデュプレクスではなく、1つの読み、1つの書きです. class signaler_t
{
public:
...
fd_t get_fd () const;
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();
private:
// Creates a pair of file descriptors that will be used
// to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
fd_t w;
fd_t r;
...
};
// r, 。
zmq::fd_t zmq::signaler_t::get_fd () const
{
return r;
}
void zmq::signaler_t::send() {
...
// w
::send(w, ...);
...
}
void zmq::signaler_t::recv() {
...
// r
::recv(r, ...);
...
}
int zmq::signaler_t::make_fdpair(fd_t *r, fd_t *w){
...
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
...
*w_ = sv [0];
*r_ = sv [1];
...
}
マルチプレクサpollerリスニングsocket
zmqでは,ポートリスニングは実際には**I/Oマルチプレクサ**pollerによって実現される.
poller_t
I/O多重化の抽象クラスであり、具体的にはzmqはシステムプラットフォームに応じてpoll,select,epoll,kqueueなどを選択する.ここでは,このI/O多重化が何であるかはともかくとする.簡単に言えば、pollerにsocket fdを登録すれば、pollerはそのsocketにデータが入っているかどうかを傍受したり、発行したりすることができる.kqueueを例にとると、loopで傍受する:
void zmq::kqueue_t::loop ()
{
while (!stopping) {
// Wait for events.
struct kevent ev_buf [max_io_events];
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, ...);
...
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
...
if (ev_buf [i].filter == EVFILT_READ)
pe->reactor->in_event (); //
}
...
}
}
mailbox
mailbox_tには主に2つのメンバーがあり、1つはパイプであり、もう1つは信号員である.パイプは本当にデータの送受信に使われていますが、信号員はメールボックスからの手紙があるかどうかを監視するだけで、送信するときも信号を送信します.
class mailbox_t : public i_mailbox
{
public:
...
// , poller , 。
fd_t get_fd () const;
// cmd
void send (const command_t &cmd_);
// cmd
int recv (command_t *cmd_, int timeout_);
...
private:
...
// , cmd
cpipe_t cpipe;
// Signaler to pass signals from writer thread to reader thread.
signaler_t signaler;
...
};
I/O Thread
io_thread_t
メールボックスmailboxとイベント傍受pollerがある. class io_thread_t : public object_t, public i_poll_events
{
public:
...
// Launch the physical thread.
void start ();
// Ask underlying thread to stop.
void stop ();
// Returns mailbox associated with this I/O thread.
mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
void out_event ();
// Used by io_objects to retrieve the associated poller object.
poller_t *get_poller ();
...
private:
// I/O thread accesses incoming commands via this mailbox.
mailbox_t mailbox;
// Handle associated with mailbox' file descriptor.
poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
...
};
io_thread
作成時には、イベントを傍受するためのpoller
を作成する必要がある.気づいた
io_thread_t
引き継いだi_poll_events
だから彼自身がReactorとして使える.構築時には、poller
自分のmailboxと自分自身(this):poller->add_fd (mailbox.get_fd (), this);
を登録する必要があります.ここはとても微妙だと思いますが
poller->loop()
でメールボックスmailboxからメッセージが入ってきたかを傍受し、データを透過することでトリガーin_event()
.上記add_fd
に登録されているthis
ポインタだからこそ、自分のメンバー関数in_event()
を簡単にコールバックできます.コンストラクション関数は次のとおりです.
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}
まとめ
io_thread
作成時poller
自分のmailbox fdを登録し、this
イベントコールバック用ポインタ.ctx_t
で、slots [i] = io_thread->get_mailbox ();
I/Oスレッドのメールボックスを登録します.そしてslots [tid_]->send (command_);
を通じてこのメールボックスにコマンドを送ることができます.send()
mailboxを呼び出しているsend()
配管cpipe
データを書き込み、信号を送るsignaler->send()
.signaler_t
送信信号は、実は自分のソケットw
で送信される.w
とr
はペアなのでr
データを受信します.poller_t
傍受します.poller
傍受しているのはmailbox->get_fd()
、このfdはまさに上記r
です.mailbox->get_fd()
戻ったのはsignaler->get_fd()
で、signaler->get_fd()
戻ったのはr
.reactor->in_event()
イベントをトリガーする.io_thread
作成時にpoller
自分のthis
ポインタが登録されているためreactor
まさにこれthis
ポインタであり、イベントトリガーを実現する.io_thread
のin_event()
で、呼び出しmailbox->recv()
.そこでmailboxはパイプcpipeに命令を読み出す.これでio_threadはコマンドを正常に受信しました!