zmqソース読み取りノートのネットワークメッセージとコマンド
8856 ワード
mailbox mailboxは、スレッド間でコマンドを送受信するためのクラス定義です.
mailboxにsignaler_がありますtタイプsignalerとmutex_tタイプのsync,syncはマルチスレッド同時書き込みの反発に用いられ,signalerは受信法にコマンドが到来したことを通知するために用いられる.mutex_tの実装は、オペレーティングシステムに基づいてスレッドロック、signaler_をカプセル化することである.tの下位実装の本質はsocketpairである.maibox内部にはypipeでコマンドが格納され、各コマンドはcommand_tタイプ定義は、各コマンドが宛先destination、タイプtype、パラメータargsの3つの部分に分かれていることがわかります.
オブジェクトがコマンドを送受信するにはobject_を継承する必要があります.tオブジェクトは、主に以下のように定義される
メンバー変数にctx_が1つあります.tとtidオブジェクト、ctx_tはzmqのコンテキスト環境オブジェクトでありctxで実現する.hpp/cppでは、現在のスレッドのmailboxが格納され、tidで対応するmailboxがslotsの位置を見つけてコマンドを書き込むことができます
2,poller zmqはpoll,epoll,selectなどのよく使われるIO多重化をカプセル化し,pollerポーリング器に統一し,異なるオペレーティングシステムに基づいて対応するIO多重化を選択する.
select.hpp,poll.hpp,epoll.hppはそれぞれ異なるIO多重化をカプセル化し、すべてのIO多重化はPoller_を継承する.ベースクラス
poller_base_tは主にタイマをカプセル化するために用いられ、中にはmultimapですべてのタイマをロードし、遍歴しやすく、実際に使用するとpollerは異なる実装によって異なるIO多重クラスを呼び出す.epollを例にadd_を通過fdはfdと登録イベントを追加し、loopで登録されたfdをループして傍受し、データpoll inとpoll outがあるかどうか、
epoll_tにthread_がありますtタイプのワークスレッドメンバー、thread_tの実現はthread.hppとthread.cppではlinuxの下にpthreadライブラリをカプセル化し,windowsの下にはwindowsの原生スレッドライブラリを用いる.epoll_t中海はpollを定義したentry_t構造体は、fdとeventを格納するために使用され、その後、他の関数の操作はhandle_で行われる.t操作
各ioスレッド(io_thread_t)にはmailboxとpollerが含まれています.
コンストラクション関数でmailbox_をtハンドルはpollerに組み込まれ、pollerに読み取りイベントを傍受させるため、信号が送られてくるとpollerが起動しio_が呼び出されるthread_tのin_event:
class mailbox_t : public i_mailbox
{
public:
mailbox_t ();
~mailbox_t ();
fd_t get_fd () const;
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void forked () { signaler.forked (); }
#endif
private:
// The pipe to store actual commands.
typedef ypipe_t cpipe_t;
cpipe_t cpipe;
// Signaler to pass signals from writer thread to reader thread.
signaler_t signaler;
// There's only one thread receiving from the mailbox, but there
// is arbitrary number of threads sending. Given that ypipe requires
// synchronised access on both of its endpoints, we have to synchronise
// the sending side.
mutex_t sync;
// True if the underlying pipe is active, ie. when we are allowed to
// read commands from it.
bool active;
// Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&);
const mailbox_t &operator = (const mailbox_t&);
};
}
#endif
mailboxにsignaler_がありますtタイプsignalerとmutex_tタイプのsync,syncはマルチスレッド同時書き込みの反発に用いられ,signalerは受信法にコマンドが到来したことを通知するために用いられる.mutex_tの実装は、オペレーティングシステムに基づいてスレッドロック、signaler_をカプセル化することである.tの下位実装の本質はsocketpairである.maibox内部にはypipeでコマンドが格納され、各コマンドはcommand_tタイプ定義は、各コマンドが宛先destination、タイプtype、パラメータargsの3つの部分に分かれていることがわかります.
struct command_t
{
// Object to process the command.
zmq::object_t *destination;
enum type_t
{
...
} type;
union {
...
} args;
};
オブジェクトがコマンドを送受信するにはobject_を継承する必要があります.tオブジェクトは、主に以下のように定義される
class object_t
{
public:
object_t (zmq::ctx_t *ctx_, uint32_t tid_);
object_t (object_t *parent_);
virtual ~object_t ();
uint32_t get_tid ();
void set_tid(uint32_t id);
ctx_t *get_ctx ();
void process_command (zmq::command_t &cmd_);
void send_inproc_connected (zmq::socket_base_t *socket_);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);
// Chooses least loaded I/O thread.
zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
....
virtual void process_seqnum ();
private:
// Context provides access to the global state.
zmq::ctx_t *ctx;
// Thread ID of the thread the object belongs to.
uint32_t tid;
void send_command (command_t &cmd_);
object_t (const object_t&);
const object_t &operator = (const object_t&);
};
}
メンバー変数にctx_が1つあります.tとtidオブジェクト、ctx_tはzmqのコンテキスト環境オブジェクトでありctxで実現する.hpp/cppでは、現在のスレッドのmailboxが格納され、tidで対応するmailboxがslotsの位置を見つけてコマンドを書き込むことができます
ctx.hpp
// I/O threads.
typedef std::vector <:io_thread_t> io_threads_t;
io_threads_t io_threads;
// Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
i_mailbox **slots;
// Mailbox for zmq_ctx_term thread.
mailbox_t term_mailbox;
2,poller zmqはpoll,epoll,selectなどのよく使われるIO多重化をカプセル化し,pollerポーリング器に統一し,異なるオペレーティングシステムに基づいて対応するIO多重化を選択する.
#ifndef __ZMQ_POLLER_HPP_INCLUDED__
#define __ZMQ_POLLER_HPP_INCLUDED__
#if defined ZMQ_USE_KQUEUE + defined ZMQ_USE_EPOLL \
+ defined ZMQ_USE_DEVPOLL + defined ZMQ_USE_POLLSET \
+ defined ZMQ_USE_POLL + defined ZMQ_USE_SELECT > 1
#error More than one of the ZMQ_USE_* macros defined
#endif
#if defined ZMQ_USE_KQUEUE
# include "kqueue.hpp"
#elif defined ZMQ_USE_EPOLL
# include "epoll.hpp"
#elif defined ZMQ_USE_DEVPOLL
# include "devpoll.hpp"
#elif defined ZMQ_USE_POLLSET
# include "pollset.hpp"
#elif defined ZMQ_USE_POLL
# include "poll.hpp"
#elif defined ZMQ_USE_SELECT
# include "select.hpp"
#elif defined ZMQ_HAVE_GNU
# define ZMQ_USE_POLL
# include "poll.hpp"
#else
# error None of the ZMQ_USE_* macros defined
#endif
select.hpp,poll.hpp,epoll.hppはそれぞれ異なるIO多重化をカプセル化し、すべてのIO多重化はPoller_を継承する.ベースクラス
namespace zmq
{
struct i_poll_events;
class poller_base_t
{
public:
poller_base_t ();
virtual ~poller_base_t ();
// Returns load of the poller. Note that this function can be
// invoked from a different thread!
int get_load ();
// Add a timeout to expire in timeout_ milliseconds. After the
// expiration timer_event on sink_ object will be called with
// argument set to id_.
void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_);
// Cancel the timer created by sink_ object with ID equal to id_.
void cancel_timer (zmq::i_poll_events *sink_, int id_);
protected:
// Called by individual poller implementations to manage the load.
void adjust_load (int amount_);
// Executes any timers that are due. Returns number of milliseconds
// to wait to match the next timer or 0 meaning "no timers".
uint64_t execute_timers ();
private:
// Clock instance private to this I/O thread.
clock_t clock;
// List of active timers.
struct timer_info_t
{
zmq::i_poll_events *sink;
int id;
};
typedef std::multimap timers_t;
timers_t timers;
// Load of the poller. Currently the number of file descriptors
// registered.
atomic_counter_t load;
poller_base_t (const poller_base_t&);
const poller_base_t &operator = (const poller_base_t&);
};
}
poller_base_tは主にタイマをカプセル化するために用いられ、中にはmultimapですべてのタイマをロードし、遍歴しやすく、実際に使用するとpollerは異なる実装によって異なるIO多重クラスを呼び出す.epollを例にadd_を通過fdはfdと登録イベントを追加し、loopで登録されたfdをループして傍受し、データpoll inとpoll outがあるかどうか、
// "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
epoll_tにthread_がありますtタイプのワークスレッドメンバー、thread_tの実現はthread.hppとthread.cppではlinuxの下にpthreadライブラリをカプセル化し,windowsの下にはwindowsの原生スレッドライブラリを用いる.epoll_t中海はpollを定義したentry_t構造体は、fdとeventを格納するために使用され、その後、他の関数の操作はhandle_で行われる.t操作
typedef void* handle_t;
struct poll_entry_t
{
fd_t fd;
epoll_event ev;
zmq::i_poll_events *events;
};
// List of retired event sources.
typedef std::vector retired_t;
retired_t retired;
各ioスレッド(io_thread_t)にはmailboxとpollerが含まれています.
class io_thread_t : public object_t, public i_poll_events
{
public:
io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
// Clean-up. If the thread was started, it's necessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
~io_thread_t ();
// Launch the physical thread.
void start ();
// Ask underlying thread to stop.
void stop ();
// Returns mailbox associated with this I/O thread.
mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event (int id_);
// Used by io_objects to retrieve the associated poller object.
poller_t *get_poller ();
// Command handlers.
void process_stop ();
// Returns load experienced by the I/O thread.
int get_load ();
private:
// I/O thread accesses incoming commands via this mailbox.
mailbox_t mailbox;
// Handle associated with mailbox' file descriptor.
poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
io_thread_t (const io_thread_t&);
const io_thread_t &operator = (const io_thread_t&);
};
コンストラクション関数でmailbox_をtハンドルはpollerに組み込まれ、pollerに読み取りイベントを傍受させるため、信号が送られてくるとpollerが起動しio_が呼び出されるthread_tのin_event: