php接続kafka


1、まずkafka拡張をインストールする
#  librdkafka:     : https://github.com/edenhill/librdkafka/releases/tag/v0.9.2
$   git clone https://github.com/edenhill/librdkafka.git
$  ./configure
$  make
$  sudo make install

#   rdkafka.so    :https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install

2、生産者コード例
  $rcf = new RdKafka\Conf();    $rcf->set('group.id', 'test'); //topicname    $cf = new RdKafka\TopicConf();    $cf->set('offset.store.method', 'broker');    $cf->set('auto.offset.reset', 'smallest');    $rk = new RdKafka\Producer($rcf);    $rk->setLogLevel(LOG_DEBUG);    $rk->addBrokers("127.0.0.1");//brokeraddr    $topic = $rk->newTopic("test", $cf); //topicname    for($i = 0; $i < 10; $i++) {       $topic->produce(0,0,'test' . $i);}3、消費者コード例
    $rcf = new RdKafka\Conf();    $rcf->set('group.id', 'test');    $rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion    $cf = new RdKafka\TopicConf();    $cf->set('auto.offset.reset', 'smallest');    $cf->set('auto.commit.enable', true);    $rk = new RdKafka\Consumer($rcf);    $rk->setLogLevel(LOG_DEBUG);    $rk->addBrokers("127.0.0.1");//brokeraddr    $topic = $rk->newTopic("test", $cf); //topicname,topicobject    $topic->consumeStart(0,10); //partition,offset    $msg = $topic->consume(0, 1000);  //partition,timeout    var_dump($msg);