SpringBoot 2.x統合MQTTメッセージ購読(ソースコード付)
MQTTプロトコルは、遅延が低く、効率が高いため、工業用ユビキタスネットワーク分野で使用される頻度が特に高い.前述したように、コードでMQTTメッセージを送信する方法について説明したが、本稿では、前述したように、MQTTメッセージのサブスクリプション受信を実現する.操作手順:関連依存 を導入アプリケーション.ymlでMQTTサーバ情報 を構成する.構成MQTTメッセージプッシュ構成 はサービスを開始し、前のブログのメッセージインタフェースを使用してメッセージを送信します. リアルタイムコンソールは、出力メッセージ を印刷する.
テーマ:virus、メッセージ受信データ:武漢肺炎に勝つ
org.springframework.boot
spring-boot-starter-integration
org.springframework.integration
spring-integration-mqtt
org.projectlombok
lombok
true
server:
port: 8090
mqtt:
host: tcp://127.0.0.1:1883
clientinid: mqttinId
clientoutid: mqttoutid
topic: virus
qoslevel: 1
#MQTT
username: xxx
password: xxx
timeout: 10000
#20s
keepalive: 20
package com.favccxx.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@Slf4j
@Configuration
@IntegrationComponentScan
public class MQTTReceiveConfig {
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.host}")
private String hostUrl;
@Value("${mqtt.clientinid}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@Value("${mqtt.timeout}")
private int completionTimeout ; //
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getReceiverMqttConnectOptions());
return factory;
}
//
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// client, topic
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
defaultTopic);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
log.info(" :{}, :{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
}
};
}
}
テーマ:virus、メッセージ受信データ:武漢肺炎に勝つ