Qtはlinuxの下でkafkaクライアントの開発を実現する(三)
一.消費者の作成:
参照ドキュメント:https://blog.csdn.net/sinat_25929227/article/details/73614367
// ConsumerKafka.h
#ifndef CONSUMERKAFKA_H
#define CONSUMERKAFKA_H
#include
#include "lib/rdkafkacpp.h"
using std::string;
//class THTFKafkaCpp : public
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
static bool run = true;
static bool exit_eof = true;
// kafka msg_data
struct protodata
{
uint64_t uuid;
uint64_t position;
uint64_t next_position;
string gtid;
};
static vector fulltopic;
class MyEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
run = false;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s
",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
void msg_consume(RdKafka::Message* message, void* opaque) ;
class MyConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb (RdKafka::Message &msg, void *opaque) {
msg_consume(&msg, opaque);
}
};
static void sigterm (int sig) {
run = false;
}
class ConsumerKafka
{
public:
ConsumerKafka();
~ConsumerKafka(){}
bool LoadCppLiberary(const QString& dllName);
int InitKafka(int _partition, string broker, string _topic);
int init_kafka(int partition, string brokers, string topic);
int pull_data_from_kafka();
void pull_data_stop();
void destroy();
private:
RdKafka::Conf * global_conf;
RdKafka::Conf * topic_conf;
string brokers;
string errstr;
RdKafka::Consumer *consumer;
string topic_name ;
RdKafka::Topic *topic;
int32_t partition;
int64_t start_offset;
RdKafka::Message *msg;
bool m_bRun = false;
};
#endif // CONSUMERKAFKA_H
// ConsumerKafka.cpp
#include "ConsumerKafka.h"
#include
#include
#include
#include
bool g_bLoadedCppLib = false;
//typedef Conf *(*rd_conf_create_decl) (Conf::ConfType type);
ConsumerKafka::ConsumerKafka()
{
}
bool ConsumerKafka::LoadCppLiberary(const QString& dllName)
{
QLibrary lib(dllName);
if( !lib.load() )
{
//qDebug() << " dll " + dllName;
qDebug() << QString(" dll ") + dllName + "
error:" + lib.errorString();
return false;
}
else
{
qDebug() << QString(" dll !") + dllName;
g_bLoadedCppLib = true;
//rd_conf_create_decl thtf_conf_create = (rd_conf_create_decl)lib.resolve("Conf::create");
//global_conf = thtf_conf_create(RdKafka::Conf::CONF_GLOBAL);
//global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
}
return true;
}
int ConsumerKafka::InitKafka(int _partition, string broker, string _topic)
{
if( !g_bLoadedCppLib )
return -1;
//global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
//topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
return 0;
}
int ConsumerKafka::init_kafka(int _partition, string broker, string _topic)
{
global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
brokers = broker;
partition = _partition;
topic_name = _topic;
start_offset = RdKafka::Topic::OFFSET_BEGINNING;
global_conf->set("metadata.broker.list", brokers, errstr);
MyEventCb ex_event_cb;
global_conf->set("event_cb", &ex_event_cb, errstr);
/*
* Create consumer using accumulated global configuration.
*/
consumer = RdKafka::Consumer::create(global_conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
/* Create topic */
topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
void ConsumerKafka::destroy()
{
consumer->stop(topic, partition);
consumer->poll(1000);
delete topic;
delete consumer;
}
int ConsumerKafka::pull_data_from_kafka()
{
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
/*
* Consume messages
*/
MyConsumeCb ex_consume_cb;
int use_ccb = 0;
m_bRun = true;
QElapsedTimer t;
t.start();
//qDebug() << t.elapsed() << endl;
while(m_bRun)
{
QCoreApplication::processEvents();
if (use_ccb) {
// consumer->consume_callback(topic, partition, 1000,
// &ex_consume_cb, &use_ccb);
} else {
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
}
void ConsumerKafka::pull_data_stop()
{
m_bRun = false;
}
void msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
cout << static_cast(message->payload()) << endl;
break;
case RdKafka::ERR__PARTITION_EOF:
cout << "reach last message" << endl;
/* Last message */
if (exit_eof) {
run = false;
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
}
}
//
int main(int argc, char **argv) {
// Process kill signal, quit from the loop
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
ConsummerKafka test;
test.init_kafka(0, "localhost", "Hello-Kafka");
test.pull_data_from_kafka();
}
参照ドキュメント:https://blog.csdn.net/sinat_25929227/article/details/73614367