spring bootはkafkaの記録を統合します.
pomファイル
一つのパーティションだけのtopicに対しては、パーティションの消費は不要です.意味がないからです.以下の例は2つのパーティションがある場合(私の中には4つのlistenParttionX方法があります.私のtopicには4つのパーティションが設けられています.)
解決:
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka
設定spring:
kafka:
# bootstrap-servers: 192.168.10.45:9092
bootstrap-servers: 192.168.198.128:9092
producer:
# , Integer.MAX_VALUE
retries: 1
# ( 16K)
batch-size: 16384
# (32M)
buffer-memory: 33554432
# key value ( , )
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# ack , 1, leader
acks: 1
# springboot1.5.16 properties , 。2.x
#properties:
#
#{partitioner.class: com.msy.kafka.MyPartition, acks: all}
consumer:
group-id: test
enable-auto-commit: false
# earliest: latest: latest
auto-offset-reset: latest
# key value ( , )
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#
max-poll-records: 100
listener:
# , , 8 Topic。
concurrency: 6
# , ackMode
ack-mode: MANUAL
topic: test5
生産者はデータの生成を呼び出す必要があります.package com.example.sms.middleware.sms_middleware.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* @Author kang
* @Date 2020/6/19$ 10:03$
**/
@Component
public class Producer {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${spring.kafka.topic}")
private String topic;
public void sendMessage(){
kafkaTemplate.send(topic,"message");
}
// @Override
// public void run(ApplicationArguments args) throws Exception {
// System.out.println("11111");
//// new Producer().sendMessage();
// }
}
消費者 package com.example.sms.middleware.sms_middleware.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @Author kang
* @Date 2020/6/19$ 10:04$
**/
@Component
@Slf4j
public class Consumer {
@KafkaListener(topics = "test5") // topic
public void consumerMessage(ConsumerRecord consumerRecord, Acknowledgment ack) {
try {
System.out.println("1212121");
String value = consumerRecord.value();
System.out.println("1212121"+value);
log.info(" :{}", value);
// ......
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
}
直接にappication.propertiesにspring.kafka.listener.co ncurrency=3を追加して、@Kafkalistenerを使って同時に消費することができます.一つのパーティションだけのtopicに対しては、パーティションの消費は不要です.意味がないからです.以下の例は2つのパーティションがある場合(私の中には4つのlistenParttionX方法があります.私のtopicには4つのパーティションが設けられています.)
@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
public void listenPartition0(List> records) {
log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id0 records size " + records.size());
for (ConsumerRecord, ?> record : records) {
String value = record.value();
}
}
}
@KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
public void listenPartition1(List> records) {
log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id1 records size " + records.size());
for (ConsumerRecord, ?> record : records) {
String value = record.value();
}
}
エラーインターフェースは時報にアクセスできません.解決:
@SpringBootApplication Application ,