kafka javaアクセス
15735 ワード
kafkaが時間的にアクセスする場合のプログラムログで得られるinitoffsetは-1であり、情報が得られないのでkafkaを調整することができる.zookeeper.session.timeout.ms=1000000で解決
使用時に遭遇:Failed to send messages after 3 tries.
serverを変更します....name=localhost localhostをあなたのipに変更
mavenパッケージ
org.apache.kafka
kafka_2.8.0
0.8.1.1
1:生産者コード
2:消費者コード
注:consumerMap.get(topic)はtopicのpatitonストリームのセットを得て、それらは自分のoffsetを持っています.ストリームごとに1つのconsumerに対応
3:生産者(複数partition)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
カスタム分割クラス
4:消費者(複数のパーティションを扱う)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
スレッド数がpartitionより多い場合、メッセージが得られないスレッドもあります.スレッドがpartitionより小さい場合、スレッドは複数のpartitionを処理する.最適スレッド数はpartition数と一致する
と書く
package com.test.groups;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread "+ m_threadNumber + ": "+ new String(it.next().message()));
System.out.println("Shutting down Thread: "+ m_threadNumber);
}
}
props.put("auto.commit.enable", "false"); 消費者はoffsetを自動的にコミットしません
転載する
PRODUCERのAPI
1.Producerの作成は、ProducerConfigpublic Producer(ProducerConfig config)に依存する.
2.単一または一括のメッセージ送信public void send(KeyedMessagemessage);public void send(List> messages);
3.Producerからすべてのbrokerへの接続public void close()をオフにします.
二CONSUMERのハイレベルAPI
主にConsumerとConsumer Connectorで、ここのConsumerはConsumer Connectorの静的ファクトリクラスclass Consumer{public static kafka.javaapi.consumer.C onsumerConnector createJavaConsumer Connector(config:Consumer Config);}
具体的なメッセージの消費は、Consumer Connectorでメッセージ処理のストリームを作成し、すべてのtopicを含み、指定されたDecoderpublicMap>createMessageStreams(MaptopicCountMap,DecoderkeyDecoder,DecodervalueDecoder)に従う.
すべてのtopicを含むメッセージ処理ストリームを作成し、デフォルトのDecoderpublic Map>createMessageStreams(MaptopicCountMap)を使用します.
指定されたメッセージのtopicを取得し、指定されたDecoderpublicList>createMessageStreamsByFilter(TopicFilter topicFilter,int numStreams,DecoderkeyDecoder,DecodervalueDecoder)に従う.
指定したメッセージのtopicを取得し、デフォルトのDecoderpublic List>createMessageStreamsByFilter(TopicFilter topicFilter);
この消費者接続のtopicpublic void commitOffsets()にオフセットをコミットします.
消費者public void shutdown()を閉じる.
上層部のAPIでよく使われるのはpublic List>createMessageStreamsByFilter(TopicFilter topicFilter);およびpublic void commitOffsets()
三CONSUMERの簡単なAPI–SIMPLECONSUMER
一括取得メッセージpublic FetchResponse fetch(request:kafka.javaapi.FetchResponse fetch);
topicのメタ情報public kafkaを取得する.javaapi.TopicMetadataResponse send(request: kafka.javaapi.TopicMetadataRequest);
現在利用可能なオフセット量public kafkaを取得する.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
接続public void close()を閉じます.
ほとんどのアプリケーションでは、上位APIは十分に使用することができるが、より一層の制御を行うには、簡単なAPI、例えば消費者が再起動する場合、最新のoffsetを得るにはSimpleConsumerを使用することができる.
四KAFKA HADOOP CONSUMER API
hadoopの使用に関連して、水平に伸縮可能なソリューションが提供されています.
https://github.com/linkedin/camus/tree/camus-kafka-0.8/
五実戦
maven依存:
org.apache.kafka kafka_2.10 0.8.0
生産者コード:
消費者側コード
使用時に遭遇:Failed to send messages after 3 tries.
serverを変更します....name=localhost localhostをあなたのipに変更
mavenパッケージ
1:生産者コード
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerSample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "183.57.57.76:2181/YRFS/test-datacenter/test-server"); // kafkaserver.properties zookeeper.connect
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "183.57.57.76:9092");// kafka
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
KeyedMessage<String, String> data = new KeyedMessage<String, String>("gbz", "test-message2"); //topic
producer.send(data);
producer.close(); // , kafka , logs/server.log
}
}
2:消費者コード
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;
private final String topic;
public static void main(String[] args) {
ConsumerTest consumerThread = new ConsumerTest("gbz");
consumerThread.start();
}
public ConsumerTest(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "183.57.57.76:2181/YRFS/test-datacenter/test-server");
props.put("group.id", "0"); //
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println(new String(it.next().message()));
}
}
注:consumerMap.get(topic)はtopicのpatitonストリームのセットを得て、それらは自分のoffsetを持っています.ストリームごとに1つのconsumerに対応
3:生産者(複数partition)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");// , partition
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");// partition
props.put("request.required.acks", "1"); //
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);// :topic,key( , partition ),message
producer.send(data);
}
producer.close();
}
}
カスタム分割クラス
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {// ,
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
4:消費者(複数のパーティションを扱う)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
package com.test.groups;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
スレッド数がpartitionより多い場合、メッセージが得られないスレッドもあります.スレッドがpartitionより小さい場合、スレッドは複数のpartitionを処理する.最適スレッド数はpartition数と一致する
と書く
package com.test.groups;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator
while (it.hasNext())
System.out.println("Thread "+ m_threadNumber + ": "+ new String(it.next().message()));
System.out.println("Shutting down Thread: "+ m_threadNumber);
}
}
props.put("auto.commit.enable", "false"); 消費者はoffsetを自動的にコミットしません
転載する
PRODUCERのAPI
1.Producerの作成は、ProducerConfigpublic Producer(ProducerConfig config)に依存する.
2.単一または一括のメッセージ送信public void send(KeyedMessage
3.Producerからすべてのbrokerへの接続public void close()をオフにします.
二CONSUMERのハイレベルAPI
主にConsumerとConsumer Connectorで、ここのConsumerはConsumer Connectorの静的ファクトリクラスclass Consumer{public static kafka.javaapi.consumer.C onsumerConnector createJavaConsumer Connector(config:Consumer Config);}
具体的なメッセージの消費は、Consumer Connectorでメッセージ処理のストリームを作成し、すべてのtopicを含み、指定されたDecoderpublic
すべてのtopicを含むメッセージ処理ストリームを作成し、デフォルトのDecoderpublic Map
指定されたメッセージのtopicを取得し、指定されたDecoderpublic
指定したメッセージのtopicを取得し、デフォルトのDecoderpublic List
この消費者接続のtopicpublic void commitOffsets()にオフセットをコミットします.
消費者public void shutdown()を閉じる.
上層部のAPIでよく使われるのはpublic List
三CONSUMERの簡単なAPI–SIMPLECONSUMER
一括取得メッセージpublic FetchResponse fetch(request:kafka.javaapi.FetchResponse fetch);
topicのメタ情報public kafkaを取得する.javaapi.TopicMetadataResponse send(request: kafka.javaapi.TopicMetadataRequest);
現在利用可能なオフセット量public kafkaを取得する.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
接続public void close()を閉じます.
ほとんどのアプリケーションでは、上位APIは十分に使用することができるが、より一層の制御を行うには、簡単なAPI、例えば消費者が再起動する場合、最新のoffsetを得るにはSimpleConsumerを使用することができる.
四KAFKA HADOOP CONSUMER API
hadoopの使用に関連して、水平に伸縮可能なソリューションが提供されています.
https://github.com/linkedin/camus/tree/camus-kafka-0.8/
五実戦
maven依存:
生産者コード:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
* <pre>
* Created by zhaoming on 14-5-4 3:23
* </pre>
*/
public class KafkaProductor {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.put("zk.connect", "127.0.0.1:2181");
properties.put("metadata.broker.list", "localhost:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
//
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>("test-topic", "test-message");
producer.send(keyedMessage);
Thread.sleep(1000);
producer.close();
}
}
消費者側コード
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.commons.collections.CollectionUtils;
/**
* <pre>
* Created by zhaoming on 14-5-4 3:32
* </pre>
*/
public class kafkaConsumer {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
Properties properties = new Properties();
properties.put("zookeeper.connect", "127.0.0.1:2181");
properties.put("auto.commit.enable", "true");
properties.put("auto.commit.interval.ms", "60000");
properties.put("group.id", "test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
//topic
Whitelist whitelist = new Whitelist("test-topic");
List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
if (CollectionUtils.isEmpty(partitions)) {
System.out.println("empty!");
TimeUnit.SECONDS.sleep(1);
}
//
for (KafkaStream<byte[], byte[]> partition : partitions) {
ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
while (iterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> next = iterator.next();
System.out.println("partiton:" + next.partition());
System.out.println("offset:" + next.offset());
System.out.println("message:" + new String(next.message(), "utf-8"));
}
}
}
}