zeroMQ初体験-32.パブリッシュ/サブスクリプションモードのステップアップ-クローンモード-上


パブリケーション/サブスクリプションモード、特に現実的なアプリケーションでは、そのような問題によってサブスクライバが必要なデータを失うことが多く、再取得のニーズがあります.通常、これは購読者が完成するが、「千百人のハムレット」は工事の観点から見ると、「多重化」の概念に完全に背いている.すると、「クローンモード」が呼び出されます.パブリケーション側にこれらのメッセージを格納し、キューの積み重ねを避けるためにこのようなカップも、より良い購読体験のためにkev-valueは良い選択のようです.
注意:ここのkev-valueは現在の赤いnosqlではありません(似ていますが)、パブリッシャーのデータウェアハウスとして理解できます(そう理解できるでしょう).
簡単に明らかにするために、ここではメカニズム全体を分解します.
データの保存の更新
モデル図:
サーバ:

//
//  Clone server Model One
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
    //  Prepare our context and publisher socket
    zctx_t *ctx = zctx_new ();
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5556");
    zclock_sleep (200);

    zhash_t *kvmap = zhash_new ();
    int64_t sequence = 0;
    srandom ((unsigned) time (NULL));

    while (!zctx_interrupted) {
        //  Distribute as key-value message
        kvmsg_t *kvmsg = kvmsg_new (++sequence);
        kvmsg_fmt_key  (kvmsg, "%d", randof (10000));
        kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
        kvmsg_send     (kvmsg, publisher);
        kvmsg_store   (&kvmsg, kvmap);
    }
    printf (" Interrupted
%d messages out
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; }

クライアント:

//
//  Clone client Model One
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
    //  Prepare our context and updates socket
    zctx_t *ctx = zctx_new ();
    void *updates = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (updates, "tcp://localhost:5556");

    zhash_t *kvmap = zhash_new ();
    int64_t sequence = 0;

    while (TRUE) {
        kvmsg_t *kvmsg = kvmsg_recv (updates);
        if (!kvmsg)
            break;          //  Interrupted
        kvmsg_store (&kvmsg, kvmap);
        sequence++;
    }
    printf (" Interrupted
%d messages in
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; }

key-valueライブラリ:

/*  =====================================================================
    kvsimple - simple key-value message class for example applications

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "kvsimple.h"
#include "zlist.h"

//  Keys are short strings
#define KVMSG_KEY_MAX   255

//  Message is formatted on wire as 4 frames:
//  frame 0: key (0MQ string)
//  frame 1: sequence (8 bytes, network order)
//  frame 2: body (blob)
#define FRAME_KEY       0
#define FRAME_SEQ       1
#define FRAME_BODY      2
#define KVMSG_FRAMES    3

//  Structure of our class
struct _kvmsg {
    //  Presence indicators for each frame
    int present [KVMSG_FRAMES];
    //  Corresponding 0MQ message frames, if any
    zmq_msg_t frame [KVMSG_FRAMES];
    //  Key, copied into safe C string
    char key [KVMSG_KEY_MAX + 1];
};

//  ---------------------------------------------------------------------
//  Constructor, sets sequence as provided

kvmsg_t *
kvmsg_new (int64_t sequence)
{
    kvmsg_t
        *self;

    self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
    kvmsg_set_sequence (self, sequence);
    return self;
}

//  ---------------------------------------------------------------------
//  Destructor

//  Free shim, compatible with zhash_free_fn
void
kvmsg_free (void *ptr)
{
    if (ptr) {
        kvmsg_t *self = (kvmsg_t *) ptr;
        //  Destroy message frames if any
        int frame_nbr;
        for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
            if (self->present [frame_nbr])
                zmq_msg_close (&self->frame [frame_nbr]);

        //  Free object itself
        free (self);
    }
}

void
kvmsg_destroy (kvmsg_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        kvmsg_free (*self_p);
        *self_p = NULL;
    }
}

//  ---------------------------------------------------------------------
//  Reads key-value message from socket, returns new kvmsg instance.

kvmsg_t *
kvmsg_recv (void *socket)
{
    assert (socket);
    kvmsg_t *self = kvmsg_new (0);

    //  Read all frames off the wire, reject if bogus
    int frame_nbr;
    for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
        if (self->present [frame_nbr])
            zmq_msg_close (&self->frame [frame_nbr]);
        zmq_msg_init (&self->frame [frame_nbr]);
        self->present [frame_nbr] = 1;
        if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
            kvmsg_destroy (&self);
            break;
        }
        //  Verify multipart framing
        int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
        if (zsockopt_rcvmore (socket) != rcvmore) {
            kvmsg_destroy (&self);
            break;
        }
    }
    return self;
}

//  ---------------------------------------------------------------------
//  Send key-value message to socket; any empty frames are sent as such.

void
kvmsg_send (kvmsg_t *self, void *socket)
{
    assert (self);
    assert (socket);

    int frame_nbr;
    for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
        zmq_msg_t copy;
        zmq_msg_init (&copy);
        if (self->present [frame_nbr])
            zmq_msg_copy (&copy, &self->frame [frame_nbr]);
        zmq_sendmsg (socket, &copy,
            (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
        zmq_msg_close (&copy);
    }
}

//  ---------------------------------------------------------------------
//  Return key from last read message, if any, else NULL

char *
kvmsg_key (kvmsg_t *self)
{
    assert (self);
    if (self->present [FRAME_KEY]) {
        if (!*self->key) {
            size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
            if (size > KVMSG_KEY_MAX)
                size = KVMSG_KEY_MAX;
            memcpy (self->key,
                zmq_msg_data (&self->frame [FRAME_KEY]), size);
            self->key [size] = 0;
        }
        return self->key;
    }
    else
        return NULL;
}

//  ---------------------------------------------------------------------
//  Return sequence nbr from last read message, if any

int64_t
kvmsg_sequence (kvmsg_t *self)
{
    assert (self);
    if (self->present [FRAME_SEQ]) {
        assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
        byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
        int64_t sequence = ((int64_t) (source [0]) << 56)
                         + ((int64_t) (source [1]) << 48)
                         + ((int64_t) (source [2]) << 40)
                         + ((int64_t) (source [3]) << 32)
                         + ((int64_t) (source [4]) << 24)
                         + ((int64_t) (source [5]) << 16)
                         + ((int64_t) (source [6]) << 8)
                         +  (int64_t) (source [7]);
        return sequence;
    }
    else
        return 0;
}

//  ---------------------------------------------------------------------
//  Return body from last read message, if any, else NULL

byte *
kvmsg_body (kvmsg_t *self)
{
    assert (self);
    if (self->present [FRAME_BODY])
        return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
    else
        return NULL;
}

//  ---------------------------------------------------------------------
//  Return body size from last read message, if any, else zero

size_t
kvmsg_size (kvmsg_t *self)
{
    assert (self);
    if (self->present [FRAME_BODY])
        return zmq_msg_size (&self->frame [FRAME_BODY]);
    else
        return 0;
}

//  ---------------------------------------------------------------------
//  Set message key as provided

void
kvmsg_set_key (kvmsg_t *self, char *key)
{
    assert (self);
    zmq_msg_t *msg = &self->frame [FRAME_KEY];
    if (self->present [FRAME_KEY])
        zmq_msg_close (msg);
    zmq_msg_init_size (msg, strlen (key));
    memcpy (zmq_msg_data (msg), key, strlen (key));
    self->present [FRAME_KEY] = 1;
}

//  ---------------------------------------------------------------------
//  Set message sequence number

void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
    assert (self);
    zmq_msg_t *msg = &self->frame [FRAME_SEQ];
    if (self->present [FRAME_SEQ])
        zmq_msg_close (msg);
    zmq_msg_init_size (msg, 8);

    byte *source = zmq_msg_data (msg);
    source [0] = (byte) ((sequence >> 56) & 255);
    source [1] = (byte) ((sequence >> 48) & 255);
    source [2] = (byte) ((sequence >> 40) & 255);
    source [3] = (byte) ((sequence >> 32) & 255);
    source [4] = (byte) ((sequence >> 24) & 255);
    source [5] = (byte) ((sequence >> 16) & 255);
    source [6] = (byte) ((sequence >> 8)  & 255);
    source [7] = (byte) ((sequence)       & 255);

    self->present [FRAME_SEQ] = 1;
}

//  ---------------------------------------------------------------------
//  Set message body

void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
    assert (self);
    zmq_msg_t *msg = &self->frame [FRAME_BODY];
    if (self->present [FRAME_BODY])
        zmq_msg_close (msg);
    self->present [FRAME_BODY] = 1;
    zmq_msg_init_size (msg, size);
    memcpy (zmq_msg_data (msg), body, size);
}

//  ---------------------------------------------------------------------
//  Set message key using printf format

void
kvmsg_fmt_key (kvmsg_t *self, char *format, …)
{
    char value [KVMSG_KEY_MAX + 1];
    va_list args;

    assert (self);
    va_start (args, format);
    vsnprintf (value, KVMSG_KEY_MAX, format, args);
    va_end (args);
    kvmsg_set_key (self, value);
}

//  ---------------------------------------------------------------------
//  Set message body using printf format

void
kvmsg_fmt_body (kvmsg_t *self, char *format, …)
{
    char value [255 + 1];
    va_list args;

    assert (self);
    va_start (args, format);
    vsnprintf (value, 255, format, args);
    va_end (args);
    kvmsg_set_body (self, (byte *) value, strlen (value));
}

//  ---------------------------------------------------------------------
//  Store entire kvmsg into hash map, if key/value are set
//  Nullifies kvmsg reference, and destroys automatically when no longer
//  needed.

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
    assert (self_p);
    if (*self_p) {
        kvmsg_t *self = *self_p;
        assert (self);
        if (self->present [FRAME_KEY]
        &&  self->present [FRAME_BODY]) {
            zhash_update (hash, kvmsg_key (self), self);
            zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
        }
        *self_p = NULL;
    }
}

//  ---------------------------------------------------------------------
//  Dump message to stderr, for debugging and tracing

void
kvmsg_dump (kvmsg_t *self)
{
    if (self) {
        if (!self) {
            fprintf (stderr, "NULL");
            return;
        }
        size_t size = kvmsg_size (self);
        byte  *body = kvmsg_body (self);
        fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
        fprintf (stderr, "[key:%s]", kvmsg_key (self));
        fprintf (stderr, "[size:%zd] ", size);
        int char_nbr;
        for (char_nbr = 0; char_nbr < size; char_nbr++)
            fprintf (stderr, "%02X", body [char_nbr]);
        fprintf (stderr, "
");     }     else         fprintf (stderr, "NULL message
"); } //  --------------------------------------------------------------------- //  Runs self test of class int kvmsg_test (int verbose) {     kvmsg_t         *kvmsg;     printf (" * kvmsg: ");     //  Prepare our context and sockets     zctx_t *ctx = zctx_new ();     void *output = zsocket_new (ctx, ZMQ_DEALER);     int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");     assert (rc == 0);     void *input = zsocket_new (ctx, ZMQ_DEALER);     rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");     assert (rc == 0);     zhash_t *kvmap = zhash_new ();     //  Test send and receive of simple message     kvmsg = kvmsg_new (1);     kvmsg_set_key  (kvmsg, "key");     kvmsg_set_body (kvmsg, (byte *) "body", 4);     if (verbose)         kvmsg_dump (kvmsg);     kvmsg_send (kvmsg, output);     kvmsg_store (&kvmsg, kvmap);     kvmsg = kvmsg_recv (input);     if (verbose)         kvmsg_dump (kvmsg);     assert (streq (kvmsg_key (kvmsg), "key"));     kvmsg_store (&kvmsg, kvmap);     //  Shutdown and destroy all objects     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     printf ("OK
");     return 0; }

keyによるデータ取得
実際、サブスクライバがkeyを発行してデータを取得できるようになったとき、それはもう純粋なサブスクライバではなく、クライアントの呼称がもっと適切になるかもしれません.
モデル図:
サーバ:

//
//  Clone server Model Two
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

static int s_send_single (char *key, void *data, void *args);
static void state_manager (void *args, zctx_t *ctx, void *pipe);

int main (void)
{
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5557");

    int64_t sequence = 0;
    srandom ((unsigned) time (NULL));

    //  Start state manager and wait for synchronization signal
    void *updates = zthread_fork (ctx, state_manager, NULL);
    free (zstr_recv (updates));

    while (!zctx_interrupted) {
        //  Distribute as key-value message
        kvmsg_t *kvmsg = kvmsg_new (++sequence);
        kvmsg_fmt_key  (kvmsg, "%d", randof (10000));
        kvmsg_fmt_body (kvmsg, "%d", randof (1000000));
        kvmsg_send     (kvmsg, publisher);
        kvmsg_send     (kvmsg, updates);
        kvmsg_destroy (&kvmsg);
    }
    printf (" Interrupted
%d messages out
", (int) sequence);     zctx_destroy (&ctx);     return 0; } //  Routing information for a key-value snapshot typedef struct {     void *socket;           //  ROUTER socket to send to     zframe_t *identity;     //  Identity of peer who requested state } kvroute_t; //  Send one state snapshot key-value pair to a socket //  Hash item data is our kvmsg object, ready to send static int s_send_single (char *key, void *data, void *args) {     kvroute_t *kvroute = (kvroute_t *) args;     //  Send identity of recipient first     zframe_send (&kvroute->identity,         kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);     kvmsg_t *kvmsg = (kvmsg_t *) data;     kvmsg_send (kvmsg, kvroute->socket);     return 0; } //  This thread maintains the state and handles requests from //  clients for snapshots. // static void state_manager (void *args, zctx_t *ctx, void *pipe) {     zhash_t *kvmap = zhash_new ();     zstr_send (pipe, "READY");     void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);     zsocket_bind (snapshot, "tcp://*:5556");     zmq_pollitem_t items [] = {         { pipe, 0, ZMQ_POLLIN, 0 },         { snapshot, 0, ZMQ_POLLIN, 0 }     };     int64_t sequence = 0;       //  Current snapshot version number     while (!zctx_interrupted) {         int rc = zmq_poll (items, 2, -1);         if (rc == -1 && errno == ETERM)             break;              //  Context has been shut down         //  Apply state update from main thread         if (items [0].revents & ZMQ_POLLIN) {             kvmsg_t *kvmsg = kvmsg_recv (pipe);             if (!kvmsg)                 break;          //  Interrupted             sequence = kvmsg_sequence (kvmsg);             kvmsg_store (&kvmsg, kvmap);         }         //  Execute state snapshot request         if (items [1].revents & ZMQ_POLLIN) {             zframe_t *identity = zframe_recv (snapshot);             if (!identity)                 break;          //  Interrupted             //  Request is in second frame of message             char *request = zstr_recv (snapshot);             if (streq (request, "ICANHAZ?"))                 free (request);             else {                 printf ("E: bad request, aborting
");                 break;             }             //  Send state snapshot to client             kvroute_t routing = { snapshot, identity };             //  For each entry in kvmap, send kvmsg to client             zhash_foreach (kvmap, s_send_single, &routing);             //  Now send END message with sequence number             printf ("Sending state shapshot=%d
", (int) sequence);             zframe_send (&identity, snapshot, ZFRAME_MORE);             kvmsg_t *kvmsg = kvmsg_new (sequence);             kvmsg_set_key  (kvmsg, "KTHXBAI");             kvmsg_set_body (kvmsg, (byte *) "", 0);             kvmsg_send     (kvmsg, snapshot);             kvmsg_destroy (&kvmsg);         }     }     zhash_destroy (&kvmap); }

クライアント:

//
//  Clone client Model Two
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
    //  Prepare our context and subscriber
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (snapshot, "tcp://localhost:5556");
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (subscriber, "tcp://localhost:5557");

    zhash_t *kvmap = zhash_new ();

    //  Get state snapshot
    int64_t sequence = 0;
    zstr_send (snapshot, "ICANHAZ?");
    while (TRUE) {
        kvmsg_t *kvmsg = kvmsg_recv (snapshot);
        if (!kvmsg)
            break;          //  Interrupted
        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
            sequence = kvmsg_sequence (kvmsg);
            printf ("Received snapshot=%d
", (int) sequence);             kvmsg_destroy (&kvmsg);             break;          //  Done         }         kvmsg_store (&kvmsg, kvmap);     }     //  Now apply pending updates, discard out-of-sequence messages     while (!zctx_interrupted) {         kvmsg_t *kvmsg = kvmsg_recv (subscriber);         if (!kvmsg)             break;          //  Interrupted         if (kvmsg_sequence (kvmsg) > sequence) {             sequence = kvmsg_sequence (kvmsg);             kvmsg_store (&kvmsg, kvmap);         }         else             kvmsg_destroy (&kvmsg);     }     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; }

更新の再発行
上のモデルでは、データが一点に集中しており、サーバがクラッシュしてデータが失われる恐れがあるかもしれませんが、クライアントにデータを置くのでしょうか.
モデル図:
サーバ:

//
//  Clone server Model Three
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

static int s_send_single (char *key, void *data, void *args);

//  Routing information for a key-value snapshot
typedef struct {
    void *socket;           //  ROUTER socket to send to
    zframe_t *identity;     //  Identity of peer who requested state
} kvroute_t;

int main (void)
{
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (snapshot, "tcp://*:5556");
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5557");
    void *collector = zsocket_new (ctx, ZMQ_PULL);
    zsocket_bind (collector, "tcp://*:5558");

    int64_t sequence = 0;
    zhash_t *kvmap = zhash_new ();

    zmq_pollitem_t items [] = {
        { collector, 0, ZMQ_POLLIN, 0 },
        { snapshot, 0, ZMQ_POLLIN, 0 }
    };
    while (!zctx_interrupted) {
        int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

        //  Apply state update sent from client
        if (items [0].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (collector);
            if (!kvmsg)
                break;          //  Interrupted
            kvmsg_set_sequence (kvmsg, ++sequence);
            kvmsg_send (kvmsg, publisher);
            kvmsg_store (&kvmsg, kvmap);
            printf ("I: publishing update %5d
", (int) sequence);         }         //  Execute state snapshot request         if (items [1].revents & ZMQ_POLLIN) {             zframe_t *identity = zframe_recv (snapshot);             if (!identity)                 break;          //  Interrupted             //  Request is in second frame of message             char *request = zstr_recv (snapshot);             if (streq (request, "ICANHAZ?"))                 free (request);             else {                 printf ("E: bad request, aborting
");                 break;             }             //  Send state snapshot to client             kvroute_t routing = { snapshot, identity };             //  For each entry in kvmap, send kvmsg to client             zhash_foreach (kvmap, s_send_single, &routing);             //  Now send END message with sequence number             printf ("I: sending shapshot=%d
", (int) sequence);             zframe_send (&identity, snapshot, ZFRAME_MORE);             kvmsg_t *kvmsg = kvmsg_new (sequence);             kvmsg_set_key  (kvmsg, "KTHXBAI");             kvmsg_set_body (kvmsg, (byte *) "", 0);             kvmsg_send     (kvmsg, snapshot);             kvmsg_destroy (&kvmsg);         }     }     printf (" Interrupted
%d messages handled
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; } //  Send one state snapshot key-value pair to a socket //  Hash item data is our kvmsg object, ready to send static int s_send_single (char *key, void *data, void *args) {     kvroute_t *kvroute = (kvroute_t *) args;     //  Send identity of recipient first     zframe_send (&kvroute->identity,         kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);     kvmsg_t *kvmsg = (kvmsg_t *) data;     kvmsg_send (kvmsg, kvroute->socket);     return 0; }

クライアント:

//
//  Clone client Model Three
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

int main (void)
{
    //  Prepare our context and subscriber
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (snapshot, "tcp://localhost:5556");
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (subscriber, "tcp://localhost:5557");
    void *publisher = zsocket_new (ctx, ZMQ_PUSH);
    zsocket_connect (publisher, "tcp://localhost:5558");

    zhash_t *kvmap = zhash_new ();
    srandom ((unsigned) time (NULL));

    //  Get state snapshot
    int64_t sequence = 0;
    zstr_send (snapshot, "ICANHAZ?");
    while (TRUE) {
        kvmsg_t *kvmsg = kvmsg_recv (snapshot);
        if (!kvmsg)
            break;          //  Interrupted
        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
            sequence = kvmsg_sequence (kvmsg);
            printf ("I: received snapshot=%d
", (int) sequence);             kvmsg_destroy (&kvmsg);             break;          //  Done         }         kvmsg_store (&kvmsg, kvmap);     }     int64_t alarm = zclock_time () + 1000;     while (!zctx_interrupted) {         zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };         int tickless = (int) ((alarm - zclock_time ()));         if (tickless < 0)             tickless = 0;         int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);         if (rc == -1)             break;              //  Context has been shut down         if (items [0].revents & ZMQ_POLLIN) {             kvmsg_t *kvmsg = kvmsg_recv (subscriber);             if (!kvmsg)                 break;          //  Interrupted             //  Discard out-of-sequence kvmsgs, incl. heartbeats             if (kvmsg_sequence (kvmsg) > sequence) {                 sequence = kvmsg_sequence (kvmsg);                 kvmsg_store (&kvmsg, kvmap);                 printf ("I: received update=%d
", (int) sequence);             }             else                 kvmsg_destroy (&kvmsg);         }         //  If we timed-out, generate a random kvmsg         if (zclock_time () >= alarm) {             kvmsg_t *kvmsg = kvmsg_new (0);             kvmsg_fmt_key  (kvmsg, "%d", randof (10000));             kvmsg_fmt_body (kvmsg, "%d", randof (1000000));             kvmsg_send     (kvmsg, publisher);             kvmsg_destroy (&kvmsg);             alarm = zclock_time () + 1000;         }     }     printf (" Interrupted
%d messages in
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; }

クローンサブツリー
実際、すべての消費者がパブリッシャーが提供するすべての情報を消費することを望んでいるわけではありません.では、特別なグループに対して、サブセットを1つ提供すればいいだけです.
サーバ:

//
//  Clone server Model Four
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

static int s_send_single (char *key, void *data, void *args);

//  Routing information for a key-value snapshot
typedef struct {
    void *socket;           //  ROUTER socket to send to
    zframe_t *identity;     //  Identity of peer who requested state
    char *subtree;          //  Client subtree specification
} kvroute_t;

int main (void)
{
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (snapshot, "tcp://*:5556");
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5557");
    void *collector = zsocket_new (ctx, ZMQ_PULL);
    zsocket_bind (collector, "tcp://*:5558");

    int64_t sequence = 0;
    zhash_t *kvmap = zhash_new ();

    zmq_pollitem_t items [] = {
        { collector, 0, ZMQ_POLLIN, 0 },
        { snapshot, 0, ZMQ_POLLIN, 0 }
    };
    while (!zctx_interrupted) {
        int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);

        //  Apply state update sent from client
        if (items [0].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (collector);
            if (!kvmsg)
                break;          //  Interrupted
            kvmsg_set_sequence (kvmsg, ++sequence);
            kvmsg_send (kvmsg, publisher);
            kvmsg_store (&kvmsg, kvmap);
            printf ("I: publishing update %5d
", (int) sequence);         }         //  Execute state snapshot request         if (items [1].revents & ZMQ_POLLIN) {             zframe_t *identity = zframe_recv (snapshot);             if (!identity)                 break;          //  Interrupted             //  Request is in second frame of message             char *request = zstr_recv (snapshot);             char *subtree = NULL;             if (streq (request, "ICANHAZ?")) {                 free (request);                 subtree = zstr_recv (snapshot);             }             else {                 printf ("E: bad request, aborting
");                 break;             }             //  Send state snapshot to client             kvroute_t routing = { snapshot, identity, subtree };             //  For each entry in kvmap, send kvmsg to client             zhash_foreach (kvmap, s_send_single, &routing);             //  Now send END message with sequence number             printf ("I: sending shapshot=%d
", (int) sequence);             zframe_send (&identity, snapshot, ZFRAME_MORE);             kvmsg_t *kvmsg = kvmsg_new (sequence);             kvmsg_set_key  (kvmsg, "KTHXBAI");             kvmsg_set_body (kvmsg, (byte *) subtree, 0);             kvmsg_send     (kvmsg, snapshot);             kvmsg_destroy (&kvmsg);             free (subtree);         }     }     printf (" Interrupted
%d messages handled
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; } //  Send one state snapshot key-value pair to a socket //  Hash item data is our kvmsg object, ready to send static int s_send_single (char *key, void *data, void *args) {     kvroute_t *kvroute = (kvroute_t *) args;     kvmsg_t *kvmsg = (kvmsg_t *) data;     if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))     &&  memcmp (kvroute->subtree,                 kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {         //  Send identity of recipient first         zframe_send (&kvroute->identity,             kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);         kvmsg_send (kvmsg, kvroute->socket);     }     return 0; }

クライアント:

//
//  Clone client Model Four
//

//  Lets us build this source without creating a library
#include "kvsimple.c"

#define SUBTREE "/client/"

int main (void)
{
    //  Prepare our context and subscriber
    zctx_t *ctx = zctx_new ();
    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (snapshot, "tcp://localhost:5556");
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (subscriber, "tcp://localhost:5557");
    zsockopt_set_subscribe (subscriber, SUBTREE);
    void *publisher = zsocket_new (ctx, ZMQ_PUSH);
    zsocket_connect (publisher, "tcp://localhost:5558");

    zhash_t *kvmap = zhash_new ();
    srandom ((unsigned) time (NULL));

    //  Get state snapshot
    int64_t sequence = 0;
    zstr_sendm (snapshot, "ICANHAZ?");
    zstr_send  (snapshot, SUBTREE);
    while (TRUE) {
        kvmsg_t *kvmsg = kvmsg_recv (snapshot);
        if (!kvmsg)
            break;          //  Interrupted
        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
            sequence = kvmsg_sequence (kvmsg);
            printf ("I: received snapshot=%d
", (int) sequence);             kvmsg_destroy (&kvmsg);             break;          //  Done         }         kvmsg_store (&kvmsg, kvmap);     }     int64_t alarm = zclock_time () + 1000;     while (!zctx_interrupted) {         zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };         int tickless = (int) ((alarm - zclock_time ()));         if (tickless < 0)             tickless = 0;         int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);         if (rc == -1)             break;              //  Context has been shut down         if (items [0].revents & ZMQ_POLLIN) {             kvmsg_t *kvmsg = kvmsg_recv (subscriber);             if (!kvmsg)                 break;          //  Interrupted             //  Discard out-of-sequence kvmsgs, incl. heartbeats             if (kvmsg_sequence (kvmsg) > sequence) {                 sequence = kvmsg_sequence (kvmsg);                 kvmsg_store (&kvmsg, kvmap);                 printf ("I: received update=%d
", (int) sequence);             }             else                 kvmsg_destroy (&kvmsg);         }         //  If we timed-out, generate a random kvmsg         if (zclock_time () >= alarm) {             kvmsg_t *kvmsg = kvmsg_new (0);             kvmsg_fmt_key  (kvmsg, "%s%d", SUBTREE, randof (10000));             kvmsg_fmt_body (kvmsg, "%d", randof (1000000));             kvmsg_send     (kvmsg, publisher);             kvmsg_destroy (&kvmsg);             alarm = zclock_time () + 1000;         }     }     printf (" Interrupted
%d messages in
", (int) sequence);     zhash_destroy (&kvmap);     zctx_destroy (&ctx);     return 0; }

(未完待機)