[ZeroMQ]libzmqソース読解のReactorメカニズム(mailbox,event)


[ZeroMQ]libzmqソース読解のReactorメカニズム(mailbox,event)
  • ZeroMQ libzmqソース読解のReactorメカニズムmailbox event
  • 信号員signaler
  • プロセス間通信
  • signaler実現
  • マルチプレクサpoller傍受ソケット
  • mailbox
  • IO Thread

  • まとめ
  • zmqは作成時に2種類のスレッドを起動します.1つはアプリケーションスレッド(Application thread)、2つはI/Oスレッドです.では、これらのスレッド間でどのように協力し、どのように通信しているのでしょうか.
    シグナルマンsignaler
    signalerはプロセス間で信号を伝達する責任を負う.MacはBSDシステムに属するため、プロセス間通信はsocketpairで実現される.
    プロセス間通信
    UNIXでは,socketは最初はネットワーク通信に用いられていたが,後にプロセス間通信にも適用された.ローカルループバックであるため,IPアドレスをバインドする必要がなく,傍受接続を必要としない.ソケットペアを直接設定します.
    #include 
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);

    これは全二重のパイプと考えられる.ポートsv[0]で送信し、sv[1]で受信します.逆に、sv[1]発、sv[0]収.
    Signaler実装signaler_tには2人のfdメンバーwrがあり、彼らは2つのsocketを代表している.実際にはフルデュプレクスではなく、1つの読み、1つの書きです.
        class signaler_t
        {
        public:
          ...
            fd_t get_fd () const;
            void send ();
            int wait (int timeout_);
            void recv ();
            int recv_failable ();
    
        private:
            //  Creates a pair of file descriptors that will be used
            //  to pass the signals.
            static int make_fdpair (fd_t *r_, fd_t *w_);
    
            //  Underlying write & read file descriptor
            //  Will be -1 if we exceeded number of available handles
            fd_t w;
            fd_t r;
          ...
        };
    
    //           r,      。
    zmq::fd_t zmq::signaler_t::get_fd () const
    {
        return r;
    }
    
    void zmq::signaler_t::send() {
      ...
        //     w     
        ::send(w, ...);
      ...
    }
    
    void zmq::signaler_t::recv() {
      ...
        //     r     
        ::recv(r, ...);
      ...
    }
    
    
    int zmq::signaler_t::make_fdpair(fd_t *r, fd_t *w){
      ...
      int sv [2];
      int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
      ...
            *w_ = sv [0];
            *r_ = sv [1];
      ...
    }

    マルチプレクサpollerリスニングsocket
    zmqでは,ポートリスニングは実際には**I/Oマルチプレクサ**pollerによって実現される.poller_tI/O多重化の抽象クラスであり、具体的にはzmqはシステムプラットフォームに応じてpoll,select,epoll,kqueueなどを選択する.ここでは,このI/O多重化が何であるかはともかくとする.簡単に言えば、pollerにsocket fdを登録すれば、pollerはそのsocketにデータが入っているかどうかを傍受したり、発行したりすることができる.
    kqueueを例にとると、loopで傍受する:
    void zmq::kqueue_t::loop ()
    {
        while (!stopping) {
            //  Wait for events.
            struct kevent ev_buf [max_io_events];
            int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, ...);
          ...
            for (int i = 0; i < n; i ++) {
                poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
              ...
                if (ev_buf [i].filter == EVFILT_READ)
                    pe->reactor->in_event ();   //     
            }
          ...
        }
    }

    mailbox
    mailbox_tには主に2つのメンバーがあり、1つはパイプであり、もう1つは信号員である.パイプは本当にデータの送受信に使われていますが、信号員はメールボックスからの手紙があるかどうかを監視するだけで、送信するときも信号を送信します.
        class mailbox_t : public i_mailbox
        {
        public:
          ...
          //          ,     poller ,       。
            fd_t get_fd () const;
          //   cmd
            void send (const command_t &cmd_);
          //   cmd
            int recv (command_t *cmd_, int timeout_);
          ...
        private:
          ...
            //   ,    cmd
            cpipe_t cpipe;
            //  Signaler to pass signals from writer thread to reader thread.
            signaler_t signaler;
          ...
        };

    I/O Thread io_thread_tメールボックスmailboxとイベント傍受pollerがある.
        class io_thread_t : public object_t, public i_poll_events
        {
        public:
          ...
            //  Launch the physical thread.
            void start ();
            //  Ask underlying thread to stop.
            void stop ();
    
            //  Returns mailbox associated with this I/O thread.
            mailbox_t *get_mailbox ();
    
            //  i_poll_events implementation.
            void in_event ();
            void out_event ();
    
            //  Used by io_objects to retrieve the associated poller object.
            poller_t *get_poller ();
          ...
    
        private:
            //  I/O thread accesses incoming commands via this mailbox.
            mailbox_t mailbox;
            //  Handle associated with mailbox' file descriptor.
            poller_t::handle_t mailbox_handle;
            //  I/O multiplexing is performed using a poller object.
            poller_t *poller;
          ...
        };
    io_thread作成時には、イベントを傍受するためのpollerを作成する必要がある.
    気づいたio_thread_t引き継いだi_poll_eventsだから彼自身がReactorとして使える.構築時には、poller自分のmailboxと自分自身(this):poller->add_fd (mailbox.get_fd (), this);を登録する必要があります.
    ここはとても微妙だと思いますがpoller->loop()でメールボックスmailboxからメッセージが入ってきたかを傍受し、データを透過することでトリガーin_event().上記add_fdに登録されているthisポインタだからこそ、自分のメンバー関数in_event()を簡単にコールバックできます.
    コンストラクション関数は次のとおりです.
    zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
        object_t (ctx_, tid_)
    {
        poller = new (std::nothrow) poller_t (*ctx_);
        alloc_assert (poller);
    
        mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
        poller->set_pollin (mailbox_handle);
    }

    まとめ
  • io_thread作成時poller自分のmailbox fdを登録し、thisイベントコールバック用ポインタ.
  • ctx_tで、slots [i] = io_thread->get_mailbox ();I/Oスレッドのメールボックスを登録します.そしてslots [tid_]->send (command_);を通じてこのメールボックスにコマンドを送ることができます.
  • 上記send()mailboxを呼び出しているsend()配管cpipeデータを書き込み、信号を送るsignaler->send().
  • signaler_t送信信号は、実は自分のソケットwで送信される.wrはペアなのでrデータを受信します.
  • メールボックスにコマンドが入るとpoller_t傍受します.poller傍受しているのはmailbox->get_fd()、このfdはまさに上記rです.mailbox->get_fd()戻ったのはsignaler->get_fd()で、signaler->get_fd()戻ったのはr.
  • メールボックスにコマンドが入っているのを監視しているreactor->in_event()イベントをトリガーする.io_thread作成時にpoller自分のthisポインタが登録されているためreactorまさにこれthisポインタであり、イベントトリガーを実現する.
  • io_threadin_event()で、呼び出しmailbox->recv().そこでmailboxはパイプcpipeに命令を読み出す.これでio_threadはコマンドを正常に受信しました!