kafka javaアクセス

15735 ワード

kafkaへアクセスした際、プログラムログで得られるinitoffsetが-1となり情報が取得できないことがある。この場合はzookeeper.session.timeout.ms=1000000に調整することで解決できる。
 
使用時に遭遇:Failed to send messages after 3 tries.
server.propertiesのhost.name=localhostを変更します。localhostをあなたのIPに変更してください。
mavenパッケージ


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.0</artifactId>
    <version>0.8.1.1</version>
</dependency>

 
1:生産者コード
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
  
public class ProducerSample {

    /**
     * Sends a single test message to the "gbz" topic using the Kafka 0.8
     * high-level producer API.
     *
     * @param args unused command-line arguments
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Must match zookeeper.connect in the broker's server.properties (chroot path included).
        props.put("zk.connect", "183.57.57.76:2181/YRFS/test-datacenter/test-server");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Bootstrap broker list; the producer discovers partition leaders through it.
        props.put("metadata.broker.list", "183.57.57.76:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            // Topic "gbz", no key, payload "test-message2".
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("gbz", "test-message2");
            producer.send(data);
        } finally {
            // FIX: close in finally so broker connections are released even if
            // send() throws. On success the broker records the write in logs/server.log.
            producer.close();
        }
    }
}

2:消費者コード
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {
	private final ConsumerConnector consumer;
	private final String topic;

	public static void main(String[] args) {
		ConsumerTest consumerThread = new ConsumerTest("gbz");
		consumerThread.start();
	}

	/**
	 * Creates a high-level consumer bound to the given topic.
	 *
	 * @param topic the Kafka topic to consume from
	 */
	public ConsumerTest(String topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
		this.topic = topic;
	}

	/** Builds the ZooKeeper-based consumer configuration. */
	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", "183.57.57.76:2181/YRFS/test-datacenter/test-server");
		props.put("group.id", "0"); // consumer group id; offsets are tracked per group
		props.put("zookeeper.session.timeout.ms", "10000");
		return new ConsumerConfig(props);
	}

	/** Opens a single stream for the topic and prints every message payload. */
	public void run() {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		// FIX: avoid the deprecated new Integer(...) constructor.
		topicCountMap.put(topic, Integer.valueOf(1)); // one stream (thread) for this topic
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		// hasNext() blocks until a message arrives, so this loop runs until shutdown.
		while (it.hasNext()) {
			// NOTE(review): decodes with the platform default charset — confirm
			// producers use the same encoding, or pass an explicit charset.
			System.out.println(new String(it.next().message()));
		}
	}
}

注:consumerMap.get(topic)はtopicのpatitonストリームのセットを得て、それらは自分のoffsetを持っています.ストリームごとに1つのconsumerに対応
3:生産者(複数partition)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 
import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");//     ,          partition
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");//           partition
        props.put("request.required.acks", "1"); //                  
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);//   :topic,key(   ,           partition ),message
               producer.send(data);
        }
        producer.close();
    }
}

カスタム分割クラス
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner<String> {//     ,    

    /**
     * Kafka instantiates partitioners reflectively with a
     * {@link VerifiableProperties} argument; this implementation ignores it.
     */
    public SimplePartitioner (VerifiableProperties props) {
    }

    /**
     * Routes a key of the form "a.b.c.N" to partition N % a_numPartitions.
     * Keys with no dot past the first character go to partition 0.
     */
    public int partition(String key, int a_numPartitions) {
        int lastDot = key.lastIndexOf('.');
        if (lastDot <= 0) {
            return 0;
        }
        return Integer.parseInt(key.substring(lastDot + 1)) % a_numPartitions;
    }

}

4:消費者(複数のパーティションを扱う)
コピー可能な公式サイト:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 
package com.test.groups;
 
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    /**
     * @param a_zookeeper ZooKeeper connect string (host:port[/chroot])
     * @param a_groupId   consumer group id; offsets are tracked per group
     * @param a_topic     topic to consume
     */
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /** Stops consuming and shuts down the worker pool. */
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    /**
     * Opens {@code a_numThreads} streams for the topic and hands each one to
     * its own worker thread. Throughput is best when the thread count equals
     * the topic's partition count.
     */
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // FIX: avoid the deprecated new Integer(...) constructor.
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // One worker thread per stream.
        executor = Executors.newFixedThreadPool(a_numThreads);

        int threadNumber = 0;
        // FIX: iterate with the parameterized type instead of a raw KafkaStream.
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    /** Builds the ZooKeeper-based consumer configuration for the group. */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    /**
     * Usage: zookeeper groupId topic numThreads.
     * Consumes for ten seconds, then shuts everything down.
     */
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // FIX: restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
        example.shutdown();
    }
}

スレッド数がpartitionより多い場合、メッセージが得られないスレッドもあります.スレッドがpartitionより小さい場合、スレッドは複数のpartitionを処理する.最適スレッド数はpartition数と一致する
 
と書く
 
package com.test.groups;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {

    /** Message stream this worker drains; one stream per worker thread. */
    private final KafkaStream<byte[], byte[]> m_stream;

    /** Tag used to identify this worker in console output. */
    private final int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    /** Prints every message on the stream; returns when the stream shuts down. */
    public void run() {
        // FIX: the original used raw types, so message() returned Object and
        // new String(...) would not compile; parameterize with byte[] keys/values
        // as in the official Consumer Group example.
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread "+ m_threadNumber + ": "+ new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: "+ m_threadNumber);
    }
}
  props.put("auto.commit.enable", "false"); 消費者はoffsetを自動的にコミットしません
転載する
PRODUCERのAPI
1.Producerの作成は、ProducerConfigpublic Producer(ProducerConfig config)に依存する.
2.単一または一括のメッセージ送信public void send(KeyedMessagemessage);public void send(List> messages);
3.Producerからすべてのbrokerへの接続public void close()をオフにします.
二CONSUMERのハイレベルAPI
主にConsumerとConsumerConnectorで、ここのConsumerはConsumerConnectorの静的ファクトリクラス: class Consumer { public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(config: ConsumerConfig); }
具体的なメッセージの消費は、Consumer Connectorでメッセージ処理のストリームを作成し、すべてのtopicを含み、指定されたDecoderpublicMap>createMessageStreams(MaptopicCountMap,DecoderkeyDecoder,DecodervalueDecoder)に従う.
すべてのtopicを含むメッセージ処理ストリームを作成し、デフォルトのDecoderpublic Map>createMessageStreams(MaptopicCountMap)を使用します.
指定されたメッセージのtopicを取得し、指定されたDecoderpublicList>createMessageStreamsByFilter(TopicFilter topicFilter,int numStreams,DecoderkeyDecoder,DecodervalueDecoder)に従う.
指定したメッセージのtopicを取得し、デフォルトのDecoderpublic List>createMessageStreamsByFilter(TopicFilter topicFilter);
この消費者接続のtopicpublic void commitOffsets()にオフセットをコミットします.
消費者public void shutdown()を閉じる.
上層部のAPIでよく使われるのはpublic List>createMessageStreamsByFilter(TopicFilter topicFilter);およびpublic void commitOffsets()
三CONSUMERの簡単なAPI–SIMPLECONSUMER
一括取得メッセージpublic FetchResponse fetch(request:kafka.javaapi.FetchResponse fetch);
topicのメタ情報public kafkaを取得する.javaapi.TopicMetadataResponse send(request: kafka.javaapi.TopicMetadataRequest);
現在利用可能なオフセット量public kafkaを取得する.javaapi.OffsetResponse getOffsetsBefore(request: OffsetRequest);
接続public void close()を閉じます.
ほとんどのアプリケーションでは、上位APIは十分に使用することができるが、より一層の制御を行うには、簡単なAPI、例えば消費者が再起動する場合、最新のoffsetを得るにはSimpleConsumerを使用することができる.
四KAFKA HADOOP CONSUMER API
hadoopの使用に関連して、水平に伸縮可能なソリューションが提供されています.
https://github.com/linkedin/camus/tree/camus-kafka-0.8/
五実戦
maven依存:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.0</version>
</dependency>
生産者コード:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
import java.util.Properties;
 
/**
 * <pre>
 * Created by zhaoming on 14-5-4   3:23
 * </pre>
 */
public class KafkaProductor {

    /**
     * Sends a single test message to "test-topic" on a local broker, then
     * waits briefly so the send can flush before closing.
     *
     * @param args unused command-line arguments
     * @throws InterruptedException if the flush sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("zk.connect", "127.0.0.1:2181");
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        try {
            // Send one keyless message to test-topic.
            KeyedMessage<String, String> keyedMessage =
                    new KeyedMessage<String, String>("test-topic", "test-message");
            producer.send(keyedMessage);

            // Give the producer a moment to deliver before tearing down.
            Thread.sleep(1000);
        } finally {
            // FIX: close in finally so broker connections are released even if
            // the send fails.
            producer.close();
        }
    }
}

 
消費者側コード
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
 
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
 
import org.apache.commons.collections.CollectionUtils;
 
/**
 * <pre>
 * Created by zhaoming on 14-5-4   3:32
 * </pre>
 */
// NOTE(review): class name should be UpperCamelCase (KafkaConsumer), but it is
// the public interface of this example and is kept unchanged.
public class kafkaConsumer {

    /**
     * Consumes messages matching the "test-topic" whitelist from a local
     * broker and prints partition, offset, and UTF-8 payload for each one.
     */
    public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {

        Properties config = new Properties();
        config.put("zookeeper.connect", "127.0.0.1:2181");
        config.put("auto.commit.enable", "true");
        config.put("auto.commit.interval.ms", "60000");
        config.put("group.id", "test-group");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(config));

        // Subscribe to every topic whose name matches the whitelist pattern.
        Whitelist whitelist = new Whitelist("test-topic");
        List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreamsByFilter(whitelist);

        if (CollectionUtils.isEmpty(streams)) {
            System.out.println("empty!");
            TimeUnit.SECONDS.sleep(1);
        }

        // Drain each stream in turn; hasNext() blocks until data arrives.
        for (KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> messages = stream.iterator();
            while (messages.hasNext()) {
                MessageAndMetadata<byte[], byte[]> record = messages.next();
                System.out.println("partiton:" + record.partition());
                System.out.println("offset:" + record.offset());
                System.out.println("message:" + new String(record.message(), "utf-8"));
            }
        }

    }
}