Developing a Kafka Client with Qt on Linux (Part 3)


1. Creating the consumer:
// ConsumerKafka.h
#ifndef CONSUMERKAFKA_H
#define CONSUMERKAFKA_H

#include <string>
#include "lib/rdkafkacpp.h"
using std::string;

//class THTFKafkaCpp : public

#include <iostream>
#include <list>
#include <vector>
#include <fstream>
#include <cstdint>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <QString>

using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;

static bool run = true;
static bool exit_eof = true;

// Structure holding fields parsed from a Kafka message (msg_data)


struct protodata
{
    uint64_t uuid;
    uint64_t position;
    uint64_t next_position;
    string gtid;
};

static vector<protodata> fulltopic;



class MyEventCb : public RdKafka::EventCb {
public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
      {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
          " (" << RdKafka::err2str(event.err()) << "): " <<
          event.str() << std::endl;
        break;
      }
  }
};

void msg_consume(RdKafka::Message* message, void* opaque);

class MyConsumeCb : public RdKafka::ConsumeCb {
public:
  void consume_cb (RdKafka::Message &msg, void *opaque) {
    msg_consume(&msg, opaque);
  }
};

static void sigterm (int sig) {
  run = false;
}

class ConsumerKafka
{
public:
    ConsumerKafka();
    ~ConsumerKafka(){}

    bool LoadCppLiberary(const QString& dllName);
    int InitKafka(int _partition, string broker, string _topic);
    int init_kafka(int partition, string brokers, string topic);
    int pull_data_from_kafka();
    void pull_data_stop();
    void destroy();

private:
    RdKafka::Conf *global_conf;
    RdKafka::Conf *topic_conf;
    string brokers;
    string errstr;
    RdKafka::Consumer *consumer;
    string topic_name;
    RdKafka::Topic *topic;
    int32_t partition;
    int64_t start_offset;
    RdKafka::Message *msg;
    bool m_bRun = false;
};

#endif // CONSUMERKAFKA_H

// ConsumerKafka.cpp
#include "ConsumerKafka.h"
#include <QLibrary>
#include <QDebug>
#include <QCoreApplication>
#include <QElapsedTimer>

bool g_bLoadedCppLib = false;

//typedef Conf *(*rd_conf_create_decl) (Conf::ConfType type);

ConsumerKafka::ConsumerKafka()
{
}

bool ConsumerKafka::LoadCppLiberary(const QString& dllName)
{
    QLibrary lib(dllName);
    if( !lib.load() )
    {
        //qDebug() << "Failed to load dll: " + dllName;
        qDebug() << QString("Failed to load dll: ") + dllName + "\nerror: " + lib.errorString();
        return false;
    }
    else
    {
        qDebug() << QString("Loaded dll: ") + dllName;
        g_bLoadedCppLib = true;
        //rd_conf_create_decl thtf_conf_create = (rd_conf_create_decl)lib.resolve("Conf::create");
        //global_conf = thtf_conf_create(RdKafka::Conf::CONF_GLOBAL);
        //global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    }
    return true;
}

int ConsumerKafka::InitKafka(int _partition, string broker, string _topic)
{
    if( !g_bLoadedCppLib )
        return -1;

    //global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    //topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    return 0;
}

int ConsumerKafka::init_kafka(int _partition, string broker, string _topic)
{
    global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    brokers = broker;
    partition = _partition;
    topic_name = _topic;
    start_offset = RdKafka::Topic::OFFSET_BEGINNING;

    global_conf->set("metadata.broker.list", brokers, errstr);

    // The event callback object must outlive the consumer, so keep it static.
    static MyEventCb ex_event_cb;
    global_conf->set("event_cb", &ex_event_cb, errstr);

    /*
     * Create consumer using accumulated global configuration.
     */
    consumer = RdKafka::Consumer::create(global_conf, errstr);
    if (!consumer)
    {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        exit(1);
    }

    /* Create topic handle */
    topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
    if (!topic)
    {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }
    return 0;
}

void ConsumerKafka::destroy()
{
    consumer->stop(topic, partition);
    consumer->poll(1000);
    delete topic;
    delete consumer;
}

int ConsumerKafka::pull_data_from_kafka()
{
    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
    if (resp != RdKafka::ERR_NO_ERROR)
    {
        std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
        exit(1);
    }

    /*
     * Consume messages
     */
    MyConsumeCb ex_consume_cb;
    int use_ccb = 0;
    m_bRun = true;

    QElapsedTimer t;
    t.start();
    //qDebug() << t.elapsed() << endl;

    while(m_bRun)
    {
        QCoreApplication::processEvents();
        if (use_ccb)
        {
            // consumer->consume_callback(topic, partition, 1000,
            //                            &ex_consume_cb, &use_ccb);
        }
        else
        {
            RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
            msg_consume(msg, NULL);
            delete msg;
        }
        consumer->poll(0);
    }
    return 0;
}

void ConsumerKafka::pull_data_stop()
{
    m_bRun = false;
}

void msg_consume(RdKafka::Message* message, void* opaque)
{
    switch (message->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        break;

    case RdKafka::ERR_NO_ERROR:
        /* Real message */
        std::cout << "Read msg at offset " << message->offset() << std::endl;
        if (message->key())
        {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        cout << static_cast<const char *>(message->payload()) << endl;
        break;

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        cout << "reached last message" << endl;
        if (exit_eof)
        {
            run = false;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
        break;

    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
    }
}

// Test entry point
int main(int argc, char **argv)
{
    // Process kill signals so the consume loop can quit
    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    ConsumerKafka test;
    test.init_kafka(0, "localhost", "Hello-Kafka");
    test.pull_data_from_kafka();
}

Reference: https://blog.csdn.net/sinat_25929227/article/details/73614367