PHPからKafkaに接続する
1671 ワード
1、まずkafka拡張をインストールする
2、生産者コード例
// Producer example: sends ten messages ("test0" .. "test9") to partition 0
// of the topic "test" on the broker at 127.0.0.1.
$rcf = new RdKafka\Conf();
// NOTE(review): group.id and the offset.* topic settings below look like
// consumer-side properties; presumably a producer does not need them —
// confirm before removing.
$rcf->set('group.id', 'test');

$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');

$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); // broker address

$topic = $rk->newTopic("test", $cf); // topic name

for ($i = 0; $i < 10; $i++) {
    // Arguments: partition, message flags, payload.
    $topic->produce(0, 0, 'test' . $i);
}

// produce() only enqueues messages; keep polling until the outgoing queue is
// empty so nothing is dropped when the script exits.
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
3、消費者コード例
// Consumer example: reads a single message from partition 0 of the topic
// "test", starting at offset 10, and dumps it.
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
// Protocol version to fall back to; presumably needed for old (0.8.x)
// brokers — confirm against the broker version actually in use.
$rcf->set('broker.version.fallback', '0.8.2');

$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
// Configuration values are strings, so pass 'true' rather than a boolean.
$cf->set('auto.commit.enable', 'true');

$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); // broker address

$topic = $rk->newTopic("test", $cf); // topic name -> topic object

$topic->consumeStart(0, 10); // partition, start offset
$msg = $topic->consume(0, 1000); // partition, timeout
var_dump($msg);

// Stop consuming from the partition once we are done with it.
$topic->consumeStop(0);
# librdkafka: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2
$ git clone https://github.com/edenhill/librdkafka.git
# Enter the cloned source tree first; configure and make must run from inside it.
$ cd librdkafka
$ ./configure
$ make
$ sudo make install
# rdkafka.so (php-rdkafka extension): https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
# Prepare the extension build with phpize, then configure, compile and install it.
$ phpize
$ ./configure
# Build the "all" target with up to five parallel jobs.
$ make all -j 5
$ sudo make install
# NOTE(review): the extension presumably still has to be enabled
# (e.g. extension=rdkafka.so in php.ini) before the PHP examples run — confirm.
2、生産者コード例
// Producer example: sends ten messages ("test0" .. "test9") to partition 0
// of the topic "test" on the broker at 127.0.0.1.
$rcf = new RdKafka\Conf();
// NOTE(review): group.id and the offset.* topic settings below look like
// consumer-side properties; presumably a producer does not need them —
// confirm before removing.
$rcf->set('group.id', 'test');

$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');

$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); // broker address

$topic = $rk->newTopic("test", $cf); // topic name

for ($i = 0; $i < 10; $i++) {
    // Arguments: partition, message flags, payload.
    $topic->produce(0, 0, 'test' . $i);
}

// produce() only enqueues messages; keep polling until the outgoing queue is
// empty so nothing is dropped when the script exits.
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
3、消費者コード例
// Consumer example: reads a single message from partition 0 of the topic
// "test", starting at offset 10, and dumps it.
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
// Protocol version to fall back to; presumably needed for old (0.8.x)
// brokers — confirm against the broker version actually in use.
$rcf->set('broker.version.fallback', '0.8.2');

$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
// Configuration values are strings, so pass 'true' rather than a boolean.
$cf->set('auto.commit.enable', 'true');

$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); // broker address

$topic = $rk->newTopic("test", $cf); // topic name -> topic object

$topic->consumeStart(0, 10); // partition, start offset
$msg = $topic->consume(0, 1000); // partition, timeout
var_dump($msg);

// Stop consuming from the partition once we are done with it.
$topic->consumeStop(0);