Kafka in Depth, Part 5: The Kafka Consumer Low-Level API (SimpleConsumer)


1. Kafka provides two Consumer APIs

  • The high-level Consumer API
  • The SimpleConsumer API

  The first, highly abstracted Consumer API is simple and convenient, but for some special needs you may want the second, lower-level API. So let us first look at what the second API can do for us (a short sketch of the high-level API follows this list, for contrast):

  • Read a message multiple times
  • Consume only a subset of the messages in a Partition within a single process
  • Use a transaction management mechanism to guarantee that a message is processed once and only once
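
  For contrast, here is a minimal sketch of the high-level API, using the kafka.consumer / kafka.javaapi.consumer packages of the same 0.8.x client as the full example in section 4. The ZooKeeper address, group id, and topic name are assumed placeholder values:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelExample {
    	public static void main(String[] args) throws Exception {
    		Properties props = new Properties();
    		props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
    		props.put("group.id", "mygroup"); // assumed consumer group id
    		props.put("auto.offset.reset", "smallest"); // start from the earliest available offset

    		ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    		// One stream for the topic; the connector hides partitions, offsets and leader changes
    		Map<String, Integer> topicCount = Collections.singletonMap("mytopic", 1);
    		Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCount);
    		ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
    		while (it.hasNext()) {
    			System.out.println(new String(it.next().message(), "UTF-8"));
    		}
    	}
    }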

2. What are the drawbacks of using SimpleConsumer?

  • You must track the offset in your own program (see the sketch after this list)
  • You must find the lead broker of the target Topic Partition
  • You must handle broker changes yourself
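
  For the first point, a minimal file-based checkpoint sketch; the OffsetStore class and its one-file-per-partition layout are illustrative assumptions, not part of Kafka, and any durable store would do:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class OffsetStore {
    	private final Path file;

    	public OffsetStore(String topic, int partition) {
    		// One checkpoint file per topic-partition, e.g. "mytopic-0.offset"
    		this.file = Paths.get(topic + "-" + partition + ".offset");
    	}

    	// Last saved offset, or -1 if there is no checkpoint yet (the caller can
    	// then fall back to getLastOffset() as the full example below does).
    	public long load() throws IOException {
    		if (!Files.exists(file)) {
    			return -1L;
    		}
    		return Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
    	}

    	public void save(long offset) throws IOException {
    		Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    	}
    }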

3. Steps for using SimpleConsumer

  • From the list of active brokers, find the leader broker of the target Topic Partition
  • Find the replica brokers of the target Topic Partition
  • Build the request
  • Send the request and fetch the data
  • Handle changes of the leader broker

The full example in section 4 walks through exactly these steps.

4. Code example

    package bonree.consumer;
    
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;
    
    public class SimpleExample {
    	private List<String> m_replicaBrokers = new ArrayList<String>();

    	public SimpleExample() {
    		m_replicaBrokers = new ArrayList<String>();
    	}
    
    	public static void main(String args[]) {
    		SimpleExample example = new SimpleExample();
    		// Maximum number of messages to read (so the example terminates)
    		long maxReads = Long.parseLong("3");
    		// Topic to read from
    		String topic = "mytopic";
    		// Partition to read from
    		int partition = Integer.parseInt("0");
    		// Seed broker IPs used to discover the partition leader
    		List<String> seeds = new ArrayList<String>();
    		seeds.add("192.168.4.30");
    		seeds.add("192.168.4.31");
    		seeds.add("192.168.4.32");
    		// Broker port
    		int port = Integer.parseInt("9092");
    		try {
    			example.run(maxReads, topic, partition, seeds, port);
    		} catch (Exception e) {
    			System.out.println("Oops:" + e);
    			e.printStackTrace();
    		}
    	}
    
    	public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    		// Find the metadata for the given topic and partition
    		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    		if (metadata == null) {
    			System.out.println("Can't find metadata for Topic and Partition. Exiting");
    			return;
    		}
    		if (metadata.leader() == null) {
    			System.out.println("Can't find Leader for Topic and Partition. Exiting");
    			return;
    		}
    		String leadBroker = metadata.leader().host();
    		String clientName = "Client_" + a_topic + "_" + a_partition;
    
    		// SimpleConsumer(leader host, port, socket timeout in ms, receive buffer in bytes, client id)
    		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    		long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    		int numErrors = 0;
    		while (a_maxReads > 0) {
    			if (consumer == null) {
    				consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    			}
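    			// addFetch(topic, partition, offset, fetchSize): the fetchSize of
    			// 100000 bytes caps how much data this request can return, so a
    			// single message larger than that cannot be fetched at this size.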
    			FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
    			FetchResponse fetchResponse = consumer.fetch(req);
    
    			if (fetchResponse.hasError()) {
    				numErrors++;
    				// Something went wrong!
    				short code = fetchResponse.errorCode(a_topic, a_partition);
    				System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    				if (numErrors > 5)
    					break;
    				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
    					// We asked for an invalid offset. For simple case ask for
    					// the last element to reset
    					readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    					continue;
    				}
    				consumer.close();
    				consumer = null;
    				leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    				continue;
    			}
    			numErrors = 0;
    
    			long numRead = 0;
    			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    				long currentOffset = messageAndOffset.offset();
    				if (currentOffset < readOffset) {
    					System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
    					continue;
    				}
    
    				readOffset = messageAndOffset.nextOffset();
    				ByteBuffer payload = messageAndOffset.message().payload();
    
    				byte[] bytes = new byte[payload.limit()];
    				payload.get(bytes);
    				System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    				numRead++;
    				a_maxReads--;
    			}
    
    			if (numRead == 0) {
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException ie) {
    				}
    			}
    		}
    		if (consumer != null)
    			consumer.close();
    	}
    
    	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
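    		// whichTime selects the offset: kafka.api.OffsetRequest.EarliestTime()
    		// asks for the oldest offset still held by the broker, LatestTime()
    		// for the next offset that a newly produced message would get.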
    		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    		OffsetResponse response = consumer.getOffsetsBefore(request);
    
    		if (response.hasError()) {
    			System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
    			return 0;
    		}
    		long[] offsets = response.offsets(topic, partition);
    		return offsets[0];
    	}
    
    	/**
    	 * Find the new leader broker for the partition after the old leader fails.
    	 *
    	 * @param a_oldLeader host of the previous leader broker
    	 * @param a_topic topic name
    	 * @param a_partition partition id
    	 * @param a_port broker port
    	 * @return host of the new leader broker
    	 * @throws Exception if no new leader can be found
    	 */
    	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    		for (int i = 0; i < 3; i++) {
    			boolean goToSleep = false;
    			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
    			if (metadata == null) {
    				goToSleep = true;
    			} else if (metadata.leader() == null) {
    				goToSleep = true;
    			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
    				// first time through if the leader hasn't changed give
    				// ZooKeeper a second to recover
    				// second time, assume the broker did recover before failover,
    				// or it was a non-Broker issue
    				//
    				goToSleep = true;
    			} else {
    				return metadata.leader().host();
    			}
    			if (goToSleep) {
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException ie) {
    				}
    			}
    		}
    		System.out.println("Unable to find new leader after Broker failure. Exiting");
    		throw new Exception("Unable to find new leader after Broker failure. Exiting");
    	}
    
    	private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
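    		// Ask each seed broker in turn for the topic's metadata and stop at
    		// the first response that describes the requested partition.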
    		PartitionMetadata returnMetaData = null;
    		loop: for (String seed : a_seedBrokers) {
    			SimpleConsumer consumer = null;
    			try {
    				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
    				List<String> topics = Collections.singletonList(a_topic);
    				TopicMetadataRequest req = new TopicMetadataRequest(topics);
    				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    
    				List<TopicMetadata> metaData = resp.topicsMetadata();
    				for (TopicMetadata item : metaData) {
    					for (PartitionMetadata part : item.partitionsMetadata()) {
    						if (part.partitionId() == a_partition) {
    							returnMetaData = part;
    							break loop;
    						}
    					}
    				}
    			} catch (Exception e) {
    				System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
    			} finally {
    				if (consumer != null)
    					consumer.close();
    			}
    		}
    		if (returnMetaData != null) {
    			m_replicaBrokers.clear();
    			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
    				m_replicaBrokers.add(replica.host());
    			}
    		}
    		return returnMetaData;
    	}
    }
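
  Run against a reachable 0.8.x cluster at the broker addresses above, the example prints at most three messages as "offset: payload" lines and then exits. Note that it always starts from the earliest offset and persists nothing between runs; that is exactly the offset bookkeeping that section 2 warned a real SimpleConsumer application must add.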