RocketMQ開発指導の3——RocketMQプログラミング例
9421 ワード
ここでは、RocketMQを使用するC++クライアント(rocketmq-client-cpp)について説明し、最も簡単なサンプルコードでRocketMQプログラミングについて説明します.
1概要
GitHubの定義は次のとおりです.
RocketMQ-Client-CPP is the C/C++ client of Apache RocketMQ, a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
現在、RocketMQ-Client-CPPでは、次の機能がサポートされています. produce messages, including normal and delayed messages, synchronously or asynchronously consume messages, in cluster or broadcast model, concurrently or orderly c and c++ style API cross-platform, all features are supported on Windows, Linux and Mac OS automatically rebalanced, both in producing and consuming process reliability, any downtime broker or name server has no impact on the client
2インストール
2.1準備環境
rocketmq-client-cppのインストールはスクリプトbuildによって行われます.shは実装されていますが、インストールする前に、開発環境に次の表に必要なコンパイルソフトウェアとライブラリがインストールされていることを確認する必要があります.
ソフトウェア/ライブラリ名
バージョン番号
オペレーティングシステム
CentOS Linux release 7.5.1804
gcc-c++(c++ compiler while need support C++11)
4.8.5
cmake(build jsoncpp require it)
2.8.12.2
automake(build libevent require it)
1.13.4
autoconf(build libevent require it)
2.69
libtool(build libevent require it)
2.4.2
bzip2-devel(boost depend it)
1.0.6
zlib-devel(boost depend it)
1.2.7
2.2インストール
次の操作を行い、rocketmq-client-cppのインストールを完了します.
1.GitHubからrocketmq-client-cppソースパッケージをダウンロードして解凍し、https://github.com/apache/rocketmq-client-cpp/tree/1.2.1
2.解凍後のディレクトリに入り、buildを実行する.shは以下のようにインストール操作を行う.
説明: build.shスクリプトはrocketmq-client-cppのインストールに必要な依存ライブラリを自動的にダウンロードして構築します(libevent、json、boostを含む)これらのライブラリはrocketmq-client-cppフォルダの下に保存されます.その後、スクリプトを変更するとrocketmq-clientの静的ライブラリと動的ライブラリが構築されます.build.shスクリプトによる依存ライブラリの構築に失敗した場合は、libevent、json、boostのソースパッケージによってこれらの依存ライブラリを手動で構築する必要があります. サーバがネットワークに接続されていない場合、libevent-release-2.0を手動でダウンロードすることができる.22-stable.zip、jsoncpp-0.10.6.zipおよびboost_1_58_0.tar.gzの3つのソフトウェアのソースパッケージをrocketmq-client-cpp解凍後のルートディレクトリにアップロードし、buildを実行します.shでいいです.
3.前のステップのbuild.sh実行が完了すると、rocketmq-client-cppの静的ライブラリと動的ライブラリはrocketmq-client-cpp/binディレクトリに保存されます.また、これらのライブラリを使用してプログラムまたはライブラリを構築する場合は、他のいくつかのライブラリ(-lpthread-lz-ldl-lrt)をリンクする必要があります.例のコンパイルコマンドは次のとおりです.
3サンプルコード
ここでは、2つの簡単な生産者と消費者のサンプルコードを示します.
説明:生産環境コードで使用するrocketmqのヘッダファイルおよびライブラリファイルの場所は、実際の状況に応じて保存する必要があります.本稿ではrocketmq-client-cppソースパッケージのヘッダファイルおよびライブラリファイルを直接使用します.
3.1生産者サンプルコード
生産者サンプルコード(SimpleProducer.cpp)は以下の通りである.
上記のコードをコンパイルして実行した結果は、次のとおりです.
以上の結果から,生産者プログラムは10個のメッセージを生産(メッセージキューに送信)することに成功した.
3.2消費者サンプルコード
Pushモードの消費者サンプルコード(SimplePushConsumer.cpp)は以下の通りである.
上記のコードをコンパイルし、以下のように実行可能なプログラムを得る.
上記生成された消費者プログラムを実行し、消費者がメッセージリスニング状態に入ると、前にコンパイルされた生産者プログラム(メッセージを生成する)を実行し、結果は以下の通りである.
以上の結果から,消費者が生産者の生産に成功したというニュースが分かる.
1概要
GitHubの定義は次のとおりです.
RocketMQ-Client-CPP is the C/C++ client of Apache RocketMQ, a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
現在、RocketMQ-Client-CPPでは、次の機能がサポートされています.
2インストール
2.1準備環境
rocketmq-client-cppのインストールはスクリプトbuildによって行われます.shは実装されていますが、インストールする前に、開発環境に次の表に必要なコンパイルソフトウェアとライブラリがインストールされていることを確認する必要があります.
ソフトウェア/ライブラリ名
バージョン番号
オペレーティングシステム
CentOS Linux release 7.5.1804
gcc-c++(c++ compiler while need support C++11)
4.8.5
cmake(build jsoncpp require it)
2.8.12.2
automake(build libevent require it)
1.13.4
autoconf(build libevent require it)
2.69
libtool(build libevent require it)
2.4.2
bzip2-devel(boost depend it)
1.0.6
zlib-devel(boost depend it)
1.2.7
2.2インストール
次の操作を行い、rocketmq-client-cppのインストールを完了します.
1.GitHubからrocketmq-client-cppソースパッケージをダウンロードして解凍し、https://github.com/apache/rocketmq-client-cpp/tree/1.2.1
2.解凍後のディレクトリに入り、buildを実行する.shは以下のようにインストール操作を行う.
[root@node3 /opt/rocketmq-client-cpp-1.2.1]# sh build.sh
説明:
3.前のステップのbuild.sh実行が完了すると、rocketmq-client-cppの静的ライブラリと動的ライブラリはrocketmq-client-cpp/binディレクトリに保存されます.また、これらのライブラリを使用してプログラムまたはライブラリを構築する場合は、他のいくつかのライブラリ(-lpthread-lz-ldl-lrt)をリンクする必要があります.例のコンパイルコマンドは次のとおりです.
g++ -o consumer_example consumer_example.cpp -lrocketmq -lpthread -lz -ldl -lrt
3サンプルコード
ここでは、2つの簡単な生産者と消費者のサンプルコードを示します.
説明:生産環境コードで使用するrocketmqのヘッダファイルおよびライブラリファイルの場所は、実際の状況に応じて保存する必要があります.本稿ではrocketmq-client-cppソースパッケージのヘッダファイルおよびライブラリファイルを直接使用します.
3.1生産者サンプルコード
生産者サンプルコード(SimpleProducer.cpp)は以下の通りである.
/*
* Description: Simple Producer demo
*/
#include
#include
#include
#include
#include "CProducer.h"
#include "CMessage.h"
#include "CSendResult.h"
using namespace std;
// send message
void StartSendMessage(CProducer *producer)
{
CSendResult result;
// create message and set some values for it
CMessage *msg = CreateMessage("Test_Topic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
for (int i = 0; i < 10; i++)
{
// construct different body
string strMessageBody = "this is body number-" + to_string(i);
SetMessageBody(msg, strMessageBody.c_str());
// send message
SendMessageSync(producer, msg, &result);
cout << "send message[" << i << "], result status:" << (int)result.sendStatus << ", msgBody:" << strMessageBody << endl;
usleep(1000000);
}
// destroy message
DestroyMessage(msg);
}
int main(int argc, char *argv[])
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "192.168.213.128:9876;192.168.213.129:9876");
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
return 0;
}
上記のコードをコンパイルして実行した結果は、次のとおりです.
[root@node3 /opt/liitdar/rocketmq]# g++ -o SimpleProducer SimpleProducer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt -std=c++11
[root@node3 /opt/liitdar/rocketmq]# ./SimpleProducer
Producer Initializing.....
Producer start.....
send message[0], result status:0, msgBody:this is body number-0
send message[1], result status:0, msgBody:this is body number-1
send message[2], result status:0, msgBody:this is body number-2
send message[3], result status:0, msgBody:this is body number-3
send message[4], result status:0, msgBody:this is body number-4
send message[5], result status:0, msgBody:this is body number-5
send message[6], result status:0, msgBody:this is body number-6
send message[7], result status:0, msgBody:this is body number-7
send message[8], result status:0, msgBody:this is body number-8
send message[9], result status:0, msgBody:this is body number-9
Producer Shutdown!
[root@node3 /opt/liitdar/rocketmq]#
以上の結果から,生産者プログラムは10個のメッセージを生産(メッセージキューに送信)することに成功した.
3.2消費者サンプルコード
Pushモードの消費者サンプルコード(SimplePushConsumer.cpp)は以下の通りである.
/*
* Description: Simple push consumer demo
*/
#include
#include
#include
#include
#include "CPushConsumer.h"
#include "CMessageExt.h"
using namespace std;
// consume message
int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
{
cout << "[Consume Message] " << "MsgTopic:" << GetMessageTopic(msgExt) << ", MsgTags:" << GetMessageTags(msgExt)
<< ", MsgKeys:" << GetMessageKeys(msgExt) << ", MsgBody:" << GetMessageBody(msgExt) << endl;
return E_CONSUME_SUCCESS;
}
int main(int argc, char *argv[])
{
cout << "Push consumer Initializing...." << endl;
// create push consumer and set some values for it
CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
SetPushConsumerNameServerAddress(consumer, "192.168.213.128:9876;192.168.213.129:9876");
Subscribe(consumer, "Test_Topic", "*");
// register message callback
RegisterMessageCallback(consumer, doConsumeMessage);
// start push consumer
StartPushConsumer(consumer);
cout << "Push consumer start, and listening message within 1min..." << endl;
for (int i = 0; i < 6; i++)
{
cout << "Already Running: " << (i * 10) << "S" << endl;
usleep(10000000);
}
// shutdown push consumer
ShutdownPushConsumer(consumer);
// destroy push consumer
DestroyPushConsumer(consumer);
cout << "PushConsumer Shutdown!" << endl;
return 0;
}
上記のコードをコンパイルし、以下のように実行可能なプログラムを得る.
[root@node3 /opt/liitdar/rocketmq]# g++ -o SimplePushConsumer SimplePushConsumer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt
上記生成された消費者プログラムを実行し、消費者がメッセージリスニング状態に入ると、前にコンパイルされた生産者プログラム(メッセージを生成する)を実行し、結果は以下の通りである.
[root@node3 /opt/liitdar/rocketmq]# ./SimplePushConsumer
Push consumer Initializing....
Push consumer start, and listening message within 1min...
Already Running: 0S
Already Running: 10S
Already Running: 20S
Already Running: 30S
Already Running: 40S
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-0
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-1
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-2
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-3
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-4
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-5
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-6
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-7
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-8
Already Running: 50S
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-9
PushConsumer Shutdown!
[root@node3 /opt/liitdar/rocketmq]#
以上の結果から,消費者が生産者の生産に成功したというニュースが分かる.