Spring BootにMQTTを統合する

4738 ワード

サービス側とクライアントの間で非同期通信を行うためにMQTTを採用し、サービス側とクライアントは同じtopicを購読して通信します。
MQTTのusernameは任意に設定でき、passwordはmd5(username)です。
1、依存関係の導入
        
         
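        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.14</version>
        </dependency>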
        

2、設定クラスの追加
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;

/**
 * mqtt  
 * @Author zyh
 * @Date 2020/7/9 15:59
 */
@Configuration
@RefreshScope
@Data
public class MqttConfig {

    /**
     *     
     */
    @Value("${mqtt.topic.prefix}")
    private String topicPrefix;

    /**
     * level 0:       
     * level 1:       ,(  )
     * level 2:        
     */
    @Value("${mqtt.qos}")
    private int qos= 2;

    /**
     *        ,     ,     , :tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
     */
    @Value("${mqtt.broker}")
    private String broker;

    /**
     *     clientId
     */
    @Value("${mqtt.clientId.prefix}")
    private String clientIdPrefix;

3、ユーティリティクラスの作成
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import net.go2global.common.core.util.UUIDUtils;
import net.go2global.service.translate.config.MqttConfig;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes MQTT messages to per-client topics through one lazily created,
 * reused Paho {@link MqttClient}.
 *
 * <p>The client is created and connected on first use and reconnected whenever
 * it is found disconnected. Creation, reconnection and disconnection are
 * serialized by {@code synchronized} methods.
 *
 * @Author zyh
 * @Date 2020/7/9 16:13
 */
@Slf4j
@Component
public class MqttClientUtils {

    @Autowired
    private MqttConfig mqttConfig;

    /** Shared client; stays {@code null} until the first {@link #getMqttClient()} call. */
    private MqttClient mqttClient = null;

    private final MemoryPersistence persistence = new MemoryPersistence();

    private final MqttConnectOptions connOpts = new MqttConnectOptions();

    /**
     * Publishes {@code content} to the topic of the given client. The topic is the
     * configured topic prefix followed by {@code clientId}; the message uses the
     * configured QoS level.
     *
     * @param clientId id of the receiving client, appended to the topic prefix
     * @param content  message text; sent as UTF-8 bytes
     * @throws MqttException if connecting to the broker or publishing fails
     */
    public void sendMsgToUser(String clientId, String content) throws MqttException {
        String topic = mqttConfig.getTopicPrefix() + clientId;
        MqttClient client = getMqttClient();

        log.info("sending msg to : {},topic:{},content:{}", clientId, topic, content);

        // Encode explicitly as UTF-8 so the payload does not depend on the JVM's default charset.
        MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
        message.setQos(mqttConfig.getQos());
        client.publish(topic, message);

        log.info("send success msg to : {},topic:{},content:{}", clientId, topic, content);
    }

    /**
     * Returns the shared client, creating it on first use and (re)connecting it
     * when it is not connected.
     *
     * <p>On creation the client id is the configured prefix plus a generated UUID,
     * the username is another generated UUID and the password is the MD5 hex digest
     * of that username. A clean session is requested.
     *
     * @return the connected shared client
     * @throws MqttException if the client cannot be created or connected
     */
    public synchronized MqttClient getMqttClient() throws MqttException {
        String broker = mqttConfig.getBroker();

        if (mqttClient == null) {
            String clientId = mqttConfig.getClientIdPrefix() + UUIDUtils.getUUID();
            String username = UUIDUtils.getUUID();
            String md5Password = DigestUtils.md5Hex(username);

            connOpts.setUserName(username);
            connOpts.setPassword(md5Password.toCharArray());
            connOpts.setCleanSession(true);

            mqttClient = new MqttClient(broker, clientId, persistence);
        }

        // A freshly created client is not connected yet, so this branch covers
        // both the initial connect and later reconnects.
        if (!mqttClient.isConnected()) {
            log.info("Connecting to broker: {}", broker);
            mqttClient.connect(connOpts);
            log.info("Connected to broker: {}", broker);
        }

        return mqttClient;
    }

    /**
     * Disconnects the shared client if one exists and is currently connected;
     * otherwise does nothing.
     *
     * @throws MqttException if the disconnect fails
     */
    public synchronized void disConnected() throws MqttException {
        // The original condition was inverted (it disconnected only when NOT connected)
        // and dereferenced a null client when nothing had been sent yet.
        if (mqttClient != null && mqttClient.isConnected()) {
            log.info("disconnecting to broker: {}", mqttConfig.getBroker());
            mqttClient.disconnect();
            log.info("disconnected to broker: {}", mqttConfig.getBroker());
        }
    }

}

4、http://tools.emqx.io/connections/25bd3ff0-c1c1-11ea-91e9-199b8be7f6f6 を使ってオンラインでテストできます。