Kafka JAVAクライアントコードの例
紹介する http://kafka.apache.org
kafkaはスループットが高い分散型の購読メッセージシステムです.
kafkaはlinkedinがログ処理に使用する分散型メッセージ・キューであり、linkedinのログ・データ容量は大きいが、信頼性に対する要求は高くない.そのログ・データは主にユーザー行為(ログイン、ブラウズ、クリック、共有、好き)とシステム運営ログ(CPU、メモリ、磁気ディスク、ネットワーク、システムおよびプロセス状態)を含む.
現在多くのメッセージ・キュー・サービスは、信頼できる配信保証を提供し、デフォルトではリアルタイム消費(オフラインには適していません)です.
Likedinに対する高い信頼性の付与ログは必須ではないので、信頼性を低減することによって性能を向上させ、分散型のクラスタを構築することによって、システムにメッセージを蓄積させ、kafkaがオフラインとオンラインログ処理を同時にサポートするようにする.
テスト環境
kafka_2.10-0.8.1.1 3つのノードが行うクラスタ
zookeeper-3.4.5の実例ノード
コードの例
メッセージ生産者コードの例
https://cwiki.apache.org/confluence/display/KAFKA/Index
https://kafka.apache.org/
kafkaはスループットが高い分散型の購読メッセージシステムです.
kafkaはlinkedinがログ処理に使用する分散型メッセージ・キューであり、linkedinのログ・データ容量は大きいが、信頼性に対する要求は高くない.そのログ・データは主にユーザー行為(ログイン、ブラウズ、クリック、共有、好き)とシステム運営ログ(CPU、メモリ、磁気ディスク、ネットワーク、システムおよびプロセス状態)を含む.
現在多くのメッセージ・キュー・サービスは、信頼できる配信保証を提供し、デフォルトではリアルタイム消費(オフラインには適していません)です.
Likedinに対する高い信頼性の付与ログは必須ではないので、信頼性を低減することによって性能を向上させ、分散型のクラスタを構築することによって、システムにメッセージを蓄積させ、kafkaがオフラインとオンラインログ処理を同時にサポートするようにする.
テスト環境
kafka_2.10-0.8.1.1 3つのノードが行うクラスタ
zookeeper-3.4.5の実例ノード
コードの例
メッセージ生産者コードの例
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* :https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
* @author Fung
*
*/
public class ProducerDemo {
public static void main(String[] args) {
Random rnd = new Random();
int events=100;
//
Properties props = new Properties();
props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// key.serializer.class serializer.class
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
// , , partitioner
props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
// acknowledgement , fire and forget,
// 0,1,-1,
// http://kafka.apache.org/08/configuration.html
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
// producer
Producer<String, String> producer = new Producer<String, String>(config);
//
long start=System.currentTimeMillis();
for (long i = 0; i < events; i++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + i;//rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
// topic , , replication-factor 1,partitions 0
KeyedMessage<String, String> data = new KeyedMessage<String, String>(
"page_visits", ip, msg);
producer.send(data);
}
System.out.println(" :" + (System.currentTimeMillis() - start));
// producer
producer.close();
}
}
メッセージ消費者コードの例import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* :https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
*
* @author Fung
*
*/
public class ConsumerDemo {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
}
public void run(int numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] arg) {
String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
demo.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
demo.shutdown();
}
}
メッセージ処理クラスimport kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": "
+ new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
Partioner類の例import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class PartitionerDemo implements Partitioner {
public PartitionerDemo(VerifiableProperties props) {
}
@Override
public int partition(Object obj, int numPartitions) {
int partition = 0;
if (obj instanceof String) {
String key=(String)obj;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
}
}else{
partition = obj.toString().length() % numPartitions;
}
return partition;
}
}
pom.xmlファイル<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xxx</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>mail</artifactId>
<groupId>javax.mail</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
参照https://cwiki.apache.org/confluence/display/KAFKA/Index
https://kafka.apache.org/