SpringBoot 2.x統合MQTTメッセージ購読(ソースコード付)


MQTTプロトコルは、遅延が低く、効率が高いため、工業用ユビキタスネットワーク分野で使用される頻度が特に高い.前述したように、コードでMQTTメッセージを送信する方法について説明したが、本稿では、前述したように、MQTTメッセージのサブスクリプション受信を実現する.操作手順:
  • 関連依存
  • を導入
    
        org.springframework.boot
        spring-boot-starter-integration
    
    
        org.springframework.integration
        spring-integration-mqtt
    
    
    
        org.projectlombok
        lombok
        true
    
  • アプリケーション.ymlでMQTTサーバ情報
  • を構成する.
    server:
      port: 8090
    
    mqtt:
      host: tcp://127.0.0.1:1883
      clientinid: mqttinId
      clientoutid: mqttoutid
      topic: virus
      qoslevel: 1
      #MQTT   
      username: xxx
      password: xxx
      timeout: 10000
      #20s
      keepalive: 20
  • 構成MQTTメッセージプッシュ構成
  • package com.favccxx.mqtt.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    
    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MQTTReceiveConfig {
    
        @Value("${mqtt.username}")
        private String username;
    
        @Value("${mqtt.password}")
        private String password;
    
        @Value("${mqtt.host}")
        private String hostUrl;
    
        @Value("${mqtt.clientinid}")
        private String clientId;
    
        @Value("${mqtt.topic}")
        private String defaultTopic;
    
        @Value("${mqtt.timeout}")
        private int completionTimeout ;   //    
    
        @Bean
        public MqttConnectOptions getReceiverMqttConnectOptions(){
            MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(10);
            mqttConnectOptions.setKeepAliveInterval(90);
            mqttConnectOptions.setAutomaticReconnect(true);
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            mqttConnectOptions.setKeepAliveInterval(2);
            return mqttConnectOptions;
        }
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getReceiverMqttConnectOptions());
            return factory;
        }
    
        //    
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        //  client,   topic
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                            defaultTopic);
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        //        
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message> message) throws MessagingException {
                    log.info("  :{},        :{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
                }
            };
        }
    
    }
    
  • はサービスを開始し、前のブログのメッセージインタフェースを使用してメッセージを送信します.
  • リアルタイムコンソールは、出力メッセージ
  • を印刷する.
    テーマ:virus、メッセージ受信データ:武漢肺炎に勝つ