RocketMQ開発指導の3——RocketMQプログラミング例


ここでは、RocketMQを使用するC++クライアント(rocketmq-client-cpp)について説明し、最も簡単なサンプルコードでRocketMQプログラミングについて説明します.
1概要
GitHubの定義は次のとおりです.
RocketMQ-Client-CPP is the C/C++ client of Apache RocketMQ, a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
現在、RocketMQ-Client-CPPでは、次の機能がサポートされています.
  • produce messages, including normal and delayed messages, synchronously or asynchronously
  • consume messages, in cluster or broadcast model, concurrently or orderly
  • c and c++ style API
  • cross-platform, all features are supported on Windows, Linux and Mac OS
  • automatically rebalanced, both in producing and consuming process
  • reliability, any downtime broker or name server has no impact on the client

  • 2インストール
    2.1準備環境
    rocketmq-client-cppのインストールはスクリプトbuildによって行われます.shは実装されていますが、インストールする前に、開発環境に次の表に必要なコンパイルソフトウェアとライブラリがインストールされていることを確認する必要があります.
    ソフトウェア/ライブラリ名
    バージョン番号
    オペレーティングシステム
    CentOS Linux release 7.5.1804
    gcc-c++(c++ compiler while need support C++11)
    4.8.5
    cmake(build jsoncpp require it)
    2.8.12.2
    automake(build libevent require it)
    1.13.4
    autoconf(build libevent require it)
    2.69
    libtool(build libevent require it)
    2.4.2
    bzip2-devel(boost depend it)
    1.0.6
    zlib-devel(boost depend it)
    1.2.7
    2.2インストール
    次の操作を行い、rocketmq-client-cppのインストールを完了します.
    1.GitHubからrocketmq-client-cppソースパッケージをダウンロードして解凍し、https://github.com/apache/rocketmq-client-cpp/tree/1.2.1
    2.解凍後のディレクトリに入り、buildを実行する.shは以下のようにインストール操作を行う.
    [root@node3 /opt/rocketmq-client-cpp-1.2.1]# sh build.sh

    説明:
  • build.shスクリプトはrocketmq-client-cppのインストールに必要な依存ライブラリを自動的にダウンロードして構築します(libevent、json、boostを含む)これらのライブラリはrocketmq-client-cppフォルダの下に保存されます.その後、スクリプトを変更するとrocketmq-clientの静的ライブラリと動的ライブラリが構築されます.build.shスクリプトによる依存ライブラリの構築に失敗した場合は、libevent、json、boostのソースパッケージによってこれらの依存ライブラリを手動で構築する必要があります.
  • サーバがネットワークに接続されていない場合、libevent-release-2.0を手動でダウンロードすることができる.22-stable.zip、jsoncpp-0.10.6.zipおよびboost_1_58_0.tar.gzの3つのソフトウェアのソースパッケージをrocketmq-client-cpp解凍後のルートディレクトリにアップロードし、buildを実行します.shでいいです.

  • 3.前のステップのbuild.sh実行が完了すると、rocketmq-client-cppの静的ライブラリと動的ライブラリはrocketmq-client-cpp/binディレクトリに保存されます.また、これらのライブラリを使用してプログラムまたはライブラリを構築する場合は、他のいくつかのライブラリ(-lpthread-lz-ldl-lrt)をリンクする必要があります.例のコンパイルコマンドは次のとおりです.
    g++ -o consumer_example consumer_example.cpp -lrocketmq -lpthread -lz -ldl -lrt

    3サンプルコード
    ここでは、2つの簡単な生産者と消費者のサンプルコードを示します.
    説明:生産環境コードで使用するrocketmqのヘッダファイルおよびライブラリファイルの場所は、実際の状況に応じて保存する必要があります.本稿ではrocketmq-client-cppソースパッケージのヘッダファイルおよびライブラリファイルを直接使用します.
    3.1生産者サンプルコード
    生産者サンプルコード(SimpleProducer.cpp)は以下の通りである.
    /*
    * Description: Simple Producer demo
    */
    
    #include 
    #include 
    #include 
    #include 
    #include "CProducer.h"
    #include "CMessage.h"
    #include "CSendResult.h"
    
    using namespace std;
    
    // send message
    void StartSendMessage(CProducer *producer)
    {
        CSendResult result;
    
        // create message and set some values for it
        CMessage *msg = CreateMessage("Test_Topic");
        SetMessageTags(msg, "Test_Tag");
        SetMessageKeys(msg, "Test_Keys");
        
        for (int i = 0; i < 10; i++)
        {
            // construct different body
            string strMessageBody = "this is body number-" + to_string(i);
            
            SetMessageBody(msg, strMessageBody.c_str());
            // send message
            SendMessageSync(producer, msg, &result);
    
            cout << "send message[" << i << "], result status:" << (int)result.sendStatus << ", msgBody:" << strMessageBody << endl;
            usleep(1000000);
        }
    
        // destroy message
        DestroyMessage(msg);
    }
    
    int main(int argc, char *argv[])
    {
        cout << "Producer Initializing....." << endl;
    
        // create producer and set some values for it
        CProducer *producer = CreateProducer("Group_producer");
        SetProducerNameServerAddress(producer, "192.168.213.128:9876;192.168.213.129:9876");
        // start producer
        StartProducer(producer);
        cout << "Producer start....." << endl;
        // send message
        StartSendMessage(producer);
        // shutdown producer
        ShutdownProducer(producer);
        // destroy producer
        DestroyProducer(producer);
        cout << "Producer Shutdown!" << endl;
        
        return 0;
    }
    
    

    上記のコードをコンパイルして実行した結果は、次のとおりです.
    [root@node3 /opt/liitdar/rocketmq]# g++ -o SimpleProducer SimpleProducer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt -std=c++11
    [root@node3 /opt/liitdar/rocketmq]# ./SimpleProducer 
    Producer Initializing.....
    Producer start.....
    send message[0], result status:0, msgBody:this is body number-0
    send message[1], result status:0, msgBody:this is body number-1
    send message[2], result status:0, msgBody:this is body number-2
    send message[3], result status:0, msgBody:this is body number-3
    send message[4], result status:0, msgBody:this is body number-4
    send message[5], result status:0, msgBody:this is body number-5
    send message[6], result status:0, msgBody:this is body number-6
    send message[7], result status:0, msgBody:this is body number-7
    send message[8], result status:0, msgBody:this is body number-8
    send message[9], result status:0, msgBody:this is body number-9
    Producer Shutdown!
    [root@node3 /opt/liitdar/rocketmq]# 
    

    以上の結果から,生産者プログラムは10個のメッセージを生産(メッセージキューに送信)することに成功した.
    3.2消費者サンプルコード
    Pushモードの消費者サンプルコード(SimplePushConsumer.cpp)は以下の通りである.
    /*
    * Description: Simple push consumer demo
    */
    
    #include 
    #include 
    #include 
    #include 
    #include "CPushConsumer.h"
    #include "CMessageExt.h"
    
    using namespace std;
    
    // consume message
    int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
    {
        cout << "[Consume Message] " << "MsgTopic:" << GetMessageTopic(msgExt) << ", MsgTags:" << GetMessageTags(msgExt)
            << ", MsgKeys:" << GetMessageKeys(msgExt) << ", MsgBody:" << GetMessageBody(msgExt) << endl;
    
        return E_CONSUME_SUCCESS;
    }
    
    int main(int argc, char *argv[])
    {
        cout << "Push consumer Initializing...." << endl;
        // create push consumer and set some values for it
        CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
        SetPushConsumerNameServerAddress(consumer, "192.168.213.128:9876;192.168.213.129:9876");
        Subscribe(consumer, "Test_Topic", "*");
        // register message callback
        RegisterMessageCallback(consumer, doConsumeMessage);
        // start push consumer
        StartPushConsumer(consumer);
        cout << "Push consumer start, and listening message within 1min..." << endl;
        for (int i = 0; i < 6; i++)
        {
            cout << "Already Running: " << (i * 10) << "S" << endl;
            usleep(10000000);
        }
        // shutdown push consumer
        ShutdownPushConsumer(consumer);
        // destroy push consumer
        DestroyPushConsumer(consumer);
        cout << "PushConsumer Shutdown!" << endl;
        
        return 0;
    }
    
    

    上記のコードをコンパイルし、以下のように実行可能なプログラムを得る.
    [root@node3 /opt/liitdar/rocketmq]# g++ -o SimplePushConsumer SimplePushConsumer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt

    上記生成された消費者プログラムを実行し、消費者がメッセージリスニング状態に入ると、前にコンパイルされた生産者プログラム(メッセージを生成する)を実行し、結果は以下の通りである.
    [root@node3 /opt/liitdar/rocketmq]# ./SimplePushConsumer 
    Push consumer Initializing....
    Push consumer start, and listening message within 1min...
    Already Running: 0S
    Already Running: 10S
    Already Running: 20S
    Already Running: 30S
    Already Running: 40S
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-0
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-1
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-2
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-3
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-4
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-5
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-6
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-7
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-8
    Already Running: 50S
    [Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-9
    PushConsumer Shutdown!
    [root@node3 /opt/liitdar/rocketmq]# 
    

    以上の結果から,消費者が生産者の生産に成功したというニュースが分かる.