Springboot 2(30)統合kafka--詳細説明@KafkaListener
66990 ワード
ソースアドレス
Springboot 2チュートリアルシリーズ
書き込み性能が非常に高いため、Kafkaメッセージキューが混雑している場合にテストされることがよくあります.このtopicがパーティションが1つしかない場合、実際に新しい消費者を再起動しても効果はありません.
C o n c u r r e n t KafkaListenerContainerFactoryとfactoryが設定.setConcurrency(4); (私のtopicには4つのパーティションがあり、消費を加速させるために同時設定を4、つまり4つのKafkaMessageListenerContainerがあります)
AdminClientの一般的な方法は作成Topic:createTopics(Collection newTopics) 削除Topic:deleteTopics(Collection topics) すべてのTopic:listTopics() を羅列パーティションの追加:createPartitions(MapnewPartitions) クエリーTopic:describeTopics(Collection topicName) クエリークラスタ情報:describeCluster() クエリーACL情報:describeAcls(AclBindingFilter) ACL情報の作成:createAcls(Collection acls) ACL情報の削除:deleteAcls(Collection filters) クエリー構成情報:describeConfigs(Collection resources) 構成情報の変更:alterConfigs(Mapconfigs) レプリカの変更ログディレクトリ:alterReplicaLogDirs(MapreplicaAssignment) クエリーノードのログディレクトリ情報:describeLogDirs(Collection brokers) クエリー・レプリカのログ・ディレクトリ情報:describeReplicaLogDirs(Collection replicas)
KafkaTemplate送信メッセージは非同期で送信されます
Spring-Kafkaにおけるメッセージ傍受は大きく2つのタイプに分けられ、1つは単一のデータ消費であり、1つは大量消費である.
他のMessageListener,B a t c h A k o w l e d g g i n C o n s u m e r A wareMessageListenerはGenericMessageListenerの実装クラスである
application.yml
@KafkaListenerプロパティ id:消費者のidであり、GroupIdが構成されていない場合、デフォルトidはGroupId である. containerFactory:上記@KafkaListenerは単一データかマルチデータ消費かを区別する注釈のcontainerFactory属性を構成するだけでよい.この中にはリスニングコンテナ工場、つまりC o n c u r r e n t KafkaListenerContainerFactoryが配置され、BeanName が配置されている. topics:リスニングが必要なTopic、複数の をリスニング可能 topicPartitions:より詳細なリスニング情報を構成できます.Topicの指定されたパーティションをリスニングするか、offsetが200のオフセットから をリスニングする必要があります. errorHandler:例外プロセッサの傍受、BeanName の構成 groupId:消費グループID idIsGroup:idがGroupIdであるかどうか clientIdPrefix:消費者Idプレフィックス beanRef:コンテナのBeanNameを実際にリスニングするには、BeanNameの前に「」を付ける必要があります.
テスト方法:http://127.0.0.1:8080/send?topic=first_top3&msg=message
RabbitMQの消費は使い捨てといえます.つまり、消費を確認するとすぐにハードディスクやメモリから削除されます.また、RabbitMQの粗さは順番消費で、列に並ぶように順番消費され、確認されていないメッセージは再び列に戻り、リスナーが再び消費されるのを待っています.
しかし、Kafkaとは異なり、Kafkaは最新のオフセット量を保存することによってメッセージ消費を行い、消費を確認するメッセージはすぐに削除されないので、削除されていないデータを繰り返し消費することができ、第1のメッセージが確認されていない場合、第2のメッセージが確認された場合、Kafkaは第2のメッセージのオフセット量、すなわち第1のメッセージがリスナーによって取得されない場合、最初のメッセージのオフセット量に基づいて手動で取得しない限り
アプリケーションをymlの
起動クラス
メッセージフィルタは、メッセージがリスニングコンテナに到達する前にブロックされ、フィルタは、システムトラフィックロジックに従って必要なデータをフィルタし、KafkaListenerによって処理される.
生産者住所:http://127.0.0.1:8080/swagger-ui.html
Springboot 2チュートリアルシリーズ
書き込み性能が非常に高いため、Kafkaメッセージキューが混雑している場合にテストされることがよくあります.このtopicがパーティションが1つしかない場合、実際に新しい消費者を再起動しても効果はありません.
C o n c u r r e n t KafkaListenerContainerFactoryとfactoryが設定.setConcurrency(4); (私のtopicには4つのパーティションがあり、消費を加速させるために同時設定を4、つまり4つのKafkaMessageListenerContainerがあります)
操作Topic
コンフィギュレーション
@Component
public class PrividerKafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServer;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
// Kafka
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
KafkaAdmin admin = new KafkaAdmin(props);
return admin;
}
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig());
}
}
Controllerレイヤ
@RestController
@Slf4j
public class TopicController {
@Autowired
private AdminClient adminClient;
@ApiOperation(value = " topic")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query"),
@ApiImplicitParam(name = "partitions", value = " ", defaultValue = "4",
required = true, dataType = "int", paramType = "query"),
@ApiImplicitParam(name = "replicationFactor", value = " ", defaultValue = "1",
required = true, dataType = "int", paramType = "query")
})
@GetMapping("/createTopic")
public String createTopic(String topicName,int partitions,int replicationFactor){
adminClient.createTopics(Arrays.asList(new NewTopic(topicName,partitions,(short)replicationFactor)));
return "create success";
}
@ApiOperation(value = " topic")
@GetMapping("/findAllTopic")
public String findAllTopic() throws ExecutionException, InterruptedException {
ListTopicsResult result = adminClient.listTopics();
Collection<TopicListing> list = result.listings().get();
List<String> resultList = new ArrayList<>();
for(TopicListing topicListing : list){
resultList.add(topicListing.name());
}
return JSON.toJSONString(resultList);
}
@ApiOperation(value = " topic ")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query")
})
@GetMapping("/info")
public String topicInfo(String topicName) throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
Map<String,String> resultMap = new HashMap<>();
result.all().get().forEach((k,v)->{
log.info("k: "+k+" ,v: "+v.toString());
resultMap.put(k,v.toString());
});
return JSON.toJSONString(resultMap);
}
@ApiOperation(value = " topic")
@ApiImplicitParams({
@ApiImplicitParam(name = "topicName", value = "topic ",defaultValue = "first_top",
required = true, dataType = "string", paramType = "query")
})
@GetMapping("/delete")
public String deleteTopic(String topicName){
DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList(topicName));
return JSON.toJSONString(result.values());
}
}
AdminClientの一般的な方法は
メッセージの送信
KafkaTemplate送信メッセージは非同期で送信されます
メッセージを送信する3つの方法
//
template.send(topic, 0, System.currentTimeMillis(), "0", msg);
// ProducerRecord
ProducerRecord record = new ProducerRecord(topic, msg);
template.send(record);
// Message
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, topic);
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, "0");
GenericMessage message = new GenericMessage(msg,new MessageHeaders(map));
template.send(message);
メッセージ結果コールバック
@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
}
同期メッセージの送信
@GetMapping("/syncMsg")
public String syncMsg(@RequestParam String topic, @RequestParam String msg){
try {
template.send(topic, msg).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}
消費情報
Spring-Kafkaにおけるメッセージ傍受は大きく2つのタイプに分けられ、1つは単一のデータ消費であり、1つは大量消費である.
GenericMessageListener
@Bean
public KafkaMessageListenerContainer demoListenerContainer(ConsumerFactory consumerFactory) {
ContainerProperties properties = new ContainerProperties("topic3");
properties.setGroupId("group1");
/*
//
properties.setMessageListener(new MessageListener() {
@Override
public void onMessage(ConsumerRecord record) {
log.info("topic3: " + record.toString());
}
});*/
//
properties.setMessageListener(
new BatchAcknowledgingConsumerAwareMessageListener<String,String>(){
@Override
public void onMessage(List<ConsumerRecord<String, String>> list,
Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
log.info("size:{}",list.size());
}
});
return new KafkaMessageListenerContainer(consumerFactory, properties);
}
他のMessageListener,B a t c h A k o w l e d g g i n C o n s u m e r A wareMessageListenerはGenericMessageListenerの実装クラスである
@KafkaListener
@Component
@Slf4j
public class KafkaConsumer {
//
@KafkaListener(topics = {"first_top2"})
public void consumer(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record =" + record);
log.info(" message =" + message);
}
}
//
@KafkaListener(topics = {"first_top"},containerFactory="batchFactory")
public void consumerBatch(List<ConsumerRecord<?, ?>> record){
log.info(" :{}",record.size());
}
}
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1500);
factory.setBatchListener(true);// , Kafka
return factory;
}
application.yml
messages:
basename: i18n/Messages,i18n/Pages
kafka:
#bootstrap-servers: 10.10.2.138:9092,10.10.2.138:9093,10.10.2.138:9094
bootstrap-servers: 47.106.106.53:9092
template:
default-topic: self-topic0
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: myGroup998
# offset
auto-offset-reset: earliest
#
max-poll-records: 1000
#
enable-auto-commit: true
consumer-extra:
#
batch-listener: true
@KafkaListenerプロパティ
Topicで指定されたパーティションのリスニング
@KafkaListener(id = "id0", containerFactory="batchFactory",
topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
log.info("Id0 records size " + records.size());
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
log.info("p0 Received message={}", message);
}
}
}
注記方式メッセージヘッダおよびメッセージボディの取得
@KafkaListener(id = "group3", topics = "first_top3")
public void annoListener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) String ts) {
log.info(" receive :
"+
"data : "+data+"
"+
"key : "+key+"
"+
"partitionId : "+partition+"
"+
"topic : "+topic+"
"+
"timestamp : "+ts+"
"
);
}
テスト方法:http://127.0.0.1:8080/send?topic=first_top3&msg=message
Ackメカニズムを用いて消費を確認する
RabbitMQの消費は使い捨てといえます.つまり、消費を確認するとすぐにハードディスクやメモリから削除されます.また、RabbitMQの粗さは順番消費で、列に並ぶように順番消費され、確認されていないメッセージは再び列に戻り、リスナーが再び消費されるのを待っています.
しかし、Kafkaとは異なり、Kafkaは最新のオフセット量を保存することによってメッセージ消費を行い、消費を確認するメッセージはすぐに削除されないので、削除されていないデータを繰り返し消費することができ、第1のメッセージが確認されていない場合、第2のメッセージが確認された場合、Kafkaは第2のメッセージのオフセット量、すなわち第1のメッセージがリスナーによって取得されない場合、最初のメッセージのオフセット量に基づいて手動で取得しない限り
アプリケーションをymlの
enable-auto-commit
はfalse
に設定され、自動的にコミットされないように設定されています.@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory);
return factory;
}
@KafkaListener(id = "ack", topics = "ack",containerFactory = "ackContainerFactory")
public void ackListener(ConsumerRecord record, Acknowledgment ack) {
log.info("receive : " + record.value());
//
// ack.acknowledge();
}
メッセージ転送の実装
@KafkaListener(id = "forward", topics = "first_top4")
@SendTo("first_top2")
public String forward(String data) {
log.info(" :{}",data);
return "send msg : " + data;
}
リスニングの停止を開始
@RestController
public class ConsumerContoller {
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
@GetMapping("/stop")
public String stop(){
registry.getListenerContainer("forward").pause();
return "success";
}
@GetMapping("/start")
public String start(){
// ,
if (!registry.getListenerContainer("forward").isRunning()) {
registry.getListenerContainer("forward").start();
}
registry.getListenerContainer("forward").resume();
return "success";
}
}
起動クラス
@EnableKafka
を追加するにはメッセージフィルタの設定
メッセージフィルタは、メッセージがリスニングコンテナに到達する前にブロックされ、フィルタは、システムトラフィックロジックに従って必要なデータをフィルタし、KafkaListenerによって処理される.
/**
*
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// RecordFilterStrategy ,
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy() {
@Override
public boolean filter(ConsumerRecord consumerRecord) {
String msg = (String) consumerRecord.value();
if(msg.contains("abc")){
return false;
}
log.info("filterContainerFactory filter : "+msg);
// true
return true;
}
});
return factory;
}
public class FilterListener {
@KafkaListener(topics = {"filter_topic"},containerFactory="filterContainerFactory")
public void consumerBatch(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("record =" + record);
log.info(" :{}",message);
}
}
}
テスト
生産者住所:http://127.0.0.1:8080/swagger-ui.html