Kafka-PHP: Producing and Consuming Data with Rdkafka

Deploying the Kafka cluster
Installing rdkafka
The rdkafka extension depends on librdkafka.
yum install librdkafka librdkafka-devel
pecl install rdkafka
php --ri rdkafka

See http://pecl.php.net/package/r... for the supported Kafka client versions.
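
The `php --ri rdkafka` check above only covers the CLI. The same check can also be made from inside a script, which helps when the web SAPI reads a different php.ini. A minimal sketch, assuming nothing beyond core PHP functions; `describeRdkafka()` is a hypothetical helper name, not part of php-rdkafka:

```php
<?php
// Report whether the rdkafka extension is available to this PHP runtime.
// describeRdkafka() is a hypothetical helper used only for illustration.
function describeRdkafka(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded';
    }
    // phpversion() returns the extension's version string when it is loaded
    return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
}

echo describeRdkafka() . PHP_EOL;
```

If this reports that the extension is not loaded, the likely cause is a missing `extension=rdkafka.so` line in the php.ini used by that runtime.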
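Producer
Connect to the cluster, create a topic, and produce data.
<?php
$rk = new RdKafka\Producer();
$rk->setLogLevel(LOG_DEBUG);

// Kafka brokers to connect to
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// Topic to produce to
$topic = $rk->newTopic("topic_1");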

while (true) {
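    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo $message . PHP_EOL;

    try {
        // RD_KAFKA_PARTITION_UA lets librdkafka choose the partition
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        // Serve queued delivery callbacks without blocking
        $rk->poll(0);
        sleep(2);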
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}
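
The loop above never exits, so buffered messages are never at risk of being dropped at shutdown. A script that sends a fixed batch and then terminates should flush first, because `produce()` only enqueues the message. The following is a minimal sketch under that assumption; `produceBatch()` is a hypothetical helper, and `flush()` is assumed to be available, which holds for recent php-rdkafka releases:

```php
<?php
// Sketch: produce a fixed number of messages, then flush before returning.
// produceBatch() is a hypothetical helper, not part of php-rdkafka.
function produceBatch(RdKafka\Producer $producer, string $topicName, int $count): void
{
    $topic = $producer->newTopic($topicName);

    for ($i = 0; $i < $count; $i++) {
        // produce() is asynchronous: it only places the message in a local queue
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "hello kafka #$i");
        // Serve delivery reports and other queued callbacks without blocking
        $producer->poll(0);
    }

    // flush() waits up to the given timeout (ms) for outstanding messages;
    // retry a few times before giving up
    for ($attempt = 0; $attempt < 10; $attempt++) {
        if ($producer->flush(10000) === RD_KAFKA_RESP_ERR_NO_ERROR) {
            return;
        }
    }

    throw new \RuntimeException('Unable to flush; messages may be lost');
}
```

With a producer connected to the brokers used in this article, it could be called as `produceBatch($rk, 'topic_1', 10);`. The retry count and timeout are illustrative values, not recommendations.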

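Consumer: High Level
Partitions are assigned automatically, and the consumer group is rebalanced as consumers join or leave.
<?php
$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {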
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            // Convert the numeric error code into a readable message
            throw new \Exception(rd_kafka_err2str($err), $err);
    }
});

// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'group_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'topic_1'
$consumer->subscribe(['topic_1']);

echo "Waiting for partition assignment... (may take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    // Wait up to 3 seconds for a message
    $message = $consumer->consume(3000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // End of partition: pause briefly, then fall through to log it
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}

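Consumer: Low Level
Consume from a specified partition:
php consumer_lowlevel.php [partitionNum]
The low-level consumer has no concept of a consumer group; each consumer can be thought of as belonging to its own independent group.
<?php
// Partition to consume, taken from the first CLI argument (the default of 0 is an assumption)
$partition = isset($argv[1]) ? (int) $argv[1] : 0;
// Topic name, assumed to be the one used in the producer example
$topic = 'topic_1';

$conf = new RdKafka\Conf();
// group.id is required when offsets are stored on the broker
$conf->set('group.id', 'group_2');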

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 2000);

// Set the offset store method to 'file'
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', sys_get_temp_dir());

// Alternatively, set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic($topic, $topicConf);

// Start consuming the chosen partition from its stored offset
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}