Kafka 0.8 Producer(0.9以前のバージョンで適用)
1640 ワード
Kafkaの旧バージョンproducerはscalaによって作成され、0.9以降は廃止された.
サンプルコードは次のとおりです.
カスタムpartitionのサンプルコードは次のとおりです.
より多くのリアルタイムコンピューティング、Kafkaなどの関連技術の博文、リアルタイムストリームコンピューティングに注目することを歓迎します
サンプルコードは次のとおりです.
import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.requird.acks", "1");
ProducerConfig config = new ProducerConfig(properties);
Producer producer = new Producer(config);
KeyedMessage msg = new KeyedMessage("topic","key","hello");
producer.send(msg);
}
}
カスタムpartitionのサンプルコードは次のとおりです.
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
より多くのリアルタイムコンピューティング、Kafkaなどの関連技術の博文、リアルタイムストリームコンピューティングに注目することを歓迎します