Kafka: Java producer fails to send data to a Kafka topic

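A Kafka producer written in Java fails when it sends data to a Kafka topic: every retry fails, and the send finally aborts with "Failed to send messages after 3 tries".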
2016-09-08 10:32:49,695- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 0 for 1 topic(s) Set(test09026)
2016-09-08 10:32:49,936- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:49,962- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,056- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:51,057- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:51,060- [WARN] Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:51,084- [INFO] Back off for 100 ms before retrying send. Remaining retries = 3
2016-09-08 10:32:51,186- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 3 for 1 topic(s) Set(test09026)
2016-09-08 10:32:51,188- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:51,194- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:51,195- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:52,207- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:52,207- [WARN] Failed to send producer request with correlation id 5 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:52,210- [INFO] Back off for 100 ms before retrying send. Remaining retries = 2
2016-09-08 10:32:52,311- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 6 for 1 topic(s) Set(test09026)
2016-09-08 10:32:52,313- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:52,321- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:52,339- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,346- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:53,346- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:53,347- [WARN] Failed to send producer request with correlation id 8 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:53,348- [INFO] Back off for 100 ms before retrying send. Remaining retries = 1
2016-09-08 10:32:53,450- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 9 for 1 topic(s) Set(test09026)
2016-09-08 10:32:53,453- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:53,456- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:53,457- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,462- [INFO] Connected to localhost:9092 for producing
2016-09-08 10:32:54,462- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,466- [WARN] Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test09026,0]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:110)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:109)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:108)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:275)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:113)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:105)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:105)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)
2016-09-08 10:32:54,468- [INFO] Back off for 100 ms before retrying send. Remaining retries = 0
2016-09-08 10:32:54,582- [INFO] Fetching metadata from broker BrokerEndPoint(0,172.17.17.98,9092) with correlation id 12 for 1 topic(s) Set(test09026)
2016-09-08 10:32:54,585- [INFO] Connected to 172.17.17.98:9092 for producing
2016-09-08 10:32:54,588- [INFO] Disconnecting from 172.17.17.98:9092
2016-09-08 10:32:54,588- [INFO] Disconnecting from localhost:9092
2016-09-08 10:32:54,591- [ERROR] Failed to send requests for topics test09026 with correlation ids in [0,12]
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:96)
    at kafka.producer.Producer.send(Producer.scala:78)
    at kafka.javaapi.producer.Producer.send(Producer.scala:35)
    at ProducerDemo.run(ProducerDemo.java:32)

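The data was being sent from a Windows server to a Linux server. In the log above, the producer fetches metadata from 172.17.17.98:9092 but then connects to localhost:9092 to produce, which suggests the broker is advertising an address that does not lead back to the broker when resolved on the Windows client.
Solution:
1. Add a mapping of the Linux server's IP address and hostname to the hosts file on the Windows machine.
2. In server.properties on the broker, change the following setting (use the broker's own address, one that the client can reach):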
#advertised.listeners=PLAINTEXT://your.host.name:9092

Change it to:
advertised.listeners=PLAINTEXT://172.16.128.208:9092
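To double-check a value before restarting the broker, a small helper along the following lines could be run on the client machine. This is only a sketch under stated assumptions: the class name AdvertisedListenerCheck and its methods are hypothetical and not part of Kafka, and it only checks that the host in the listener string resolves on this machine to something other than a loopback or wildcard address. It does not prove that the broker is actually reachable on that port.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical helper (not part of Kafka): sanity-checks an
// advertised.listeners value from the point of view of a remote client.
public class AdvertisedListenerCheck {

    // Returns the host part of a listener such as PLAINTEXT://host:9092,
    // or null if java.net.URI cannot extract one.
    public static String hostOf(String listener) {
        return URI.create(listener.trim()).getHost();
    }

    // Returns the port part of the listener, or -1 if none is given.
    public static int portOf(String listener) {
        return URI.create(listener.trim()).getPort();
    }

    // True if the host resolves on this machine and is neither a loopback
    // address (127.0.0.1, localhost) nor the wildcard address (0.0.0.0).
    public static boolean usableFromRemoteClient(String listener) {
        String host = hostOf(listener);
        if (host == null) {
            return false;
        }
        try {
            InetAddress address = InetAddress.getByName(host);
            return !address.isLoopbackAddress() && !address.isAnyLocalAddress();
        } catch (UnknownHostException e) {
            // The client cannot resolve the name, e.g. a missing hosts entry.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the value used in this article; pass another one as an argument.
        String listener = args.length > 0 ? args[0] : "PLAINTEXT://172.16.128.208:9092";
        System.out.println("host = " + hostOf(listener));
        System.out.println("port = " + portOf(listener));
        System.out.println("usable from a remote client = " + usableFromRemoteClient(listener));
    }
}
```

If the broker advertises a hostname instead of an IP address, running the same check on the Windows client also indicates whether the hosts-file entry from step 1 is in place, because an unresolvable name is reported as not usable.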