zmqソース読み取りノートのネットワークメッセージとコマンド


mailbox mailboxは、スレッド間でコマンドを送受信するためのクラス定義です.
 class mailbox_t : public i_mailbox
    {
    public:
        mailbox_t ();
        ~mailbox_t ();
        fd_t get_fd () const;
        void send (const command_t &cmd_);
        int recv (command_t *cmd_, int timeout_);
#ifdef HAVE_FORK
        // close the file descriptors in the signaller. This is used in a forked
        // child process to close the file descriptors so that they do not interfere
        // with the context in the parent process.
        void forked () { signaler.forked (); }
#endif
    private:
        //  The pipe to store actual commands.
        typedef ypipe_t  cpipe_t;
        cpipe_t cpipe;
        //  Signaler to pass signals from writer thread to reader thread.
        signaler_t signaler;
        //  There's only one thread receiving from the mailbox, but there
        //  is arbitrary number of threads sending. Given that ypipe requires
        //  synchronised access on both of its endpoints, we have to synchronise
        //  the sending side.
        mutex_t sync;
        //  True if the underlying pipe is active, ie. when we are allowed to
        //  read commands from it.
        bool active;
        //  Disable copying of mailbox_t object.
        mailbox_t (const mailbox_t&);
        const mailbox_t &operator = (const mailbox_t&);
    };
}
#endif

mailboxにsignaler_がありますtタイプsignalerとmutex_tタイプのsync,syncはマルチスレッド同時書き込みの反発に用いられ,signalerは受信法にコマンドが到来したことを通知するために用いられる.mutex_tの実装は、オペレーティングシステムに基づいてスレッドロック、signaler_をカプセル化することである.tの下位実装の本質はsocketpairである.maibox内部にはypipeでコマンドが格納され、各コマンドはcommand_tタイプ定義は、各コマンドが宛先destination、タイプtype、パラメータargsの3つの部分に分かれていることがわかります.
struct command_t
    {
        //  Object to process the command.
        zmq::object_t *destination;

        enum type_t
        {
           ...
        } type;

        union {
           ...
        } args;
    };

オブジェクトがコマンドを送受信するにはobject_を継承する必要があります.tオブジェクトは、主に以下のように定義される

    class object_t
    {
    public:

        object_t (zmq::ctx_t *ctx_, uint32_t tid_);
        object_t (object_t *parent_);
        virtual ~object_t ();

        uint32_t get_tid ();
        void set_tid(uint32_t id);
        ctx_t *get_ctx ();
        void process_command (zmq::command_t &cmd_);
        void send_inproc_connected (zmq::socket_base_t *socket_);
        void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);
        //  Chooses least loaded I/O thread.
        zmq::io_thread_t *choose_io_thread (uint64_t affinity_);

....
 
        virtual void process_seqnum ();

    private:

        //  Context provides access to the global state.
        zmq::ctx_t *ctx;

        //  Thread ID of the thread the object belongs to.
        uint32_t tid;

        void send_command (command_t &cmd_);

        object_t (const object_t&);
        const object_t &operator = (const object_t&);
    };

}

メンバー変数にctx_が1つあります.tとtidオブジェクト、ctx_tはzmqのコンテキスト環境オブジェクトでありctxで実現する.hpp/cppでは、現在のスレッドのmailboxが格納され、tidで対応するmailboxがslotsの位置を見つけてコマンドを書き込むことができます
ctx.hpp      
  //  I/O threads.
        typedef std::vector <:io_thread_t> io_threads_t;
        io_threads_t io_threads;

        //  Array of pointers to mailboxes for both application and I/O threads.
        uint32_t slot_count;
        i_mailbox **slots;

        //  Mailbox for zmq_ctx_term thread.
        mailbox_t term_mailbox;

 
2,poller zmqはpoll,epoll,selectなどのよく使われるIO多重化をカプセル化し,pollerポーリング器に統一し,異なるオペレーティングシステムに基づいて対応するIO多重化を選択する.
#ifndef __ZMQ_POLLER_HPP_INCLUDED__
#define __ZMQ_POLLER_HPP_INCLUDED__

#if   defined ZMQ_USE_KQUEUE  + defined ZMQ_USE_EPOLL   \
    + defined ZMQ_USE_DEVPOLL + defined ZMQ_USE_POLLSET \
    + defined ZMQ_USE_POLL    + defined ZMQ_USE_SELECT > 1
#error More than one of the ZMQ_USE_* macros defined
#endif

#if defined ZMQ_USE_KQUEUE
#   include "kqueue.hpp"
#elif defined ZMQ_USE_EPOLL
#   include "epoll.hpp"
#elif defined ZMQ_USE_DEVPOLL
#   include "devpoll.hpp"
#elif defined ZMQ_USE_POLLSET
#   include "pollset.hpp"
#elif defined ZMQ_USE_POLL
#   include "poll.hpp"
#elif defined ZMQ_USE_SELECT
#   include "select.hpp"
#elif defined ZMQ_HAVE_GNU
#   define ZMQ_USE_POLL
#   include "poll.hpp"
#else
#   error None of the ZMQ_USE_* macros defined
#endif

  select.hpp,poll.hpp,epoll.hppはそれぞれ異なるIO多重化をカプセル化し、すべてのIO多重化はPoller_を継承する.ベースクラス
namespace zmq
{

    struct i_poll_events;

    class poller_base_t
    {
    public:

        poller_base_t ();
        virtual ~poller_base_t ();

        //  Returns load of the poller. Note that this function can be
        //  invoked from a different thread!
        int get_load ();

        //  Add a timeout to expire in timeout_ milliseconds. After the
        //  expiration timer_event on sink_ object will be called with
        //  argument set to id_.
        void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_);

        //  Cancel the timer created by sink_ object with ID equal to id_.
        void cancel_timer (zmq::i_poll_events *sink_, int id_);

    protected:

        //  Called by individual poller implementations to manage the load.
        void adjust_load (int amount_);

        //  Executes any timers that are due. Returns number of milliseconds
        //  to wait to match the next timer or 0 meaning "no timers".
        uint64_t execute_timers ();

    private:

        //  Clock instance private to this I/O thread.
        clock_t clock;

        //  List of active timers.
        struct timer_info_t
        {
            zmq::i_poll_events *sink;
            int id;
        };
        typedef std::multimap  timers_t;
        timers_t timers;

        //  Load of the poller. Currently the number of file descriptors
        //  registered.
        atomic_counter_t load;

        poller_base_t (const poller_base_t&);
        const poller_base_t &operator = (const poller_base_t&);
    };

}

 poller_base_tは主にタイマをカプセル化するために用いられ、中にはmultimapですべてのタイマをロードし、遍歴しやすく、実際に使用するとpollerは異なる実装によって異なるIO多重クラスを呼び出す.epollを例にadd_を通過fdはfdと登録イベントを追加し、loopで登録されたfdをループして傍受し、データpoll inとpoll outがあるかどうか、
 // "poller" concept.
 handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
 void rm_fd (handle_t handle_);
 void set_pollin (handle_t handle_);
 void reset_pollin (handle_t handle_);
 void set_pollout (handle_t handle_);
 void reset_pollout (handle_t handle_);
 void start ();
 void stop ();

epoll_tにthread_がありますtタイプのワークスレッドメンバー、thread_tの実現はthread.hppとthread.cppではlinuxの下にpthreadライブラリをカプセル化し,windowsの下にはwindowsの原生スレッドライブラリを用いる.epoll_t中海はpollを定義したentry_t構造体は、fdとeventを格納するために使用され、その後、他の関数の操作はhandle_で行われる.t操作
 typedef void* handle_t;
 struct poll_entry_t
        {
            fd_t fd;
            epoll_event ev;
            zmq::i_poll_events *events;
        };

        //  List of retired event sources.
        typedef std::vector  retired_t;
        retired_t retired;

各ioスレッド(io_thread_t)にはmailboxとpollerが含まれています.
class io_thread_t : public object_t, public i_poll_events
    {
    public:
        io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
        //  Clean-up. If the thread was started, it's necessary to call 'stop'
        //  before invoking destructor. Otherwise the destructor would hang up.
        ~io_thread_t ();
        //  Launch the physical thread.
        void start ();
        //  Ask underlying thread to stop.
        void stop ();
        //  Returns mailbox associated with this I/O thread.
        mailbox_t *get_mailbox ();

        //  i_poll_events implementation.
        void in_event ();
        void out_event ();
        void timer_event (int id_);
        //  Used by io_objects to retrieve the associated poller object.
        poller_t *get_poller ();
        //  Command handlers.
        void process_stop ();
        //  Returns load experienced by the I/O thread.
        int get_load ();
    private:
        //  I/O thread accesses incoming commands via this mailbox.
        mailbox_t mailbox;
        //  Handle associated with mailbox' file descriptor.
        poller_t::handle_t mailbox_handle;
        //  I/O multiplexing is performed using a poller object.
        poller_t *poller;
        io_thread_t (const io_thread_t&);
        const io_thread_t &operator = (const io_thread_t&);
    };

コンストラクション関数でmailbox_をtハンドルはpollerに組み込まれ、pollerに読み取りイベントを傍受させるため、信号が送られてくるとpollerが起動しio_が呼び出されるthread_tのin_event: