Spring-boot統合mqtt
4738 ワード
サービス側とクライアントとの非同期通信のために、mqttを採用し、サービス側とクライアントは同じtopicを購読して通信する.
mqttのusernameはカスタマイズでき、passwordはmd 5(username)です.
1、導入依存
2、構成クラスの追加
3、ツール類の作成
4、使用可能http://tools.emqx.io/connections/25bd3ff0-c1c1-11ea-91e9-199b8be7f6f6オンラインテスト.
mqttのusernameはカスタマイズでき、passwordはmd 5(username)です.
1、導入依存
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.0
org.fusesource.mqtt-client
mqtt-client
1.14
2、構成クラスの追加
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
/**
* mqtt
* @Author zyh
* @Date 2020/7/9 15:59
*/
@Configuration
@RefreshScope
@Data
public class MqttConfig {
/**
*
*/
@Value("${mqtt.topic.prefix}")
private String topicPrefix;
/**
* level 0:
* level 1: ,( )
* level 2:
*/
@Value("${mqtt.qos}")
private int qos= 2;
/**
* , , , :tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
*/
@Value("${mqtt.broker}")
private String broker;
/**
* clientId
*/
@Value("${mqtt.clientId.prefix}")
private String clientIdPrefix;
3、ツール類の作成
import lombok.extern.slf4j.Slf4j;
import net.go2global.common.core.util.UUIDUtils;
import net.go2global.service.translate.config.MqttConfig;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author zyh
* @Date 2020/7/9 16:13
*/
@Slf4j
@Component
public class MqttClientUtils {
@Autowired
private MqttConfig mqttConfig;
private MqttClient mqttClient=null;
private MemoryPersistence persistence = new MemoryPersistence();
private MqttConnectOptions connOpts = new MqttConnectOptions();
/**
*
* @param clientId
* @param content
* @throws MqttException
*/
public void sendMsgToUser(String clientId,String content) throws MqttException {
String topic=mqttConfig.getTopicPrefix()+clientId;
MqttClient mqttClient = getMqttClient();
log.info("sending msg to : "+clientId+",topic:"+topic+",content:"+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(mqttConfig.getQos());
mqttClient.publish(topic, message);
log.info("send success msg to : "+clientId+",topic:"+topic+",content:"+content);
//disConnected();
}
/**
* client
* @return
* @throws MqttException
*/
public synchronized MqttClient getMqttClient() throws MqttException {
if(mqttClient==null){
String clientId= mqttConfig.getClientIdPrefix()+UUIDUtils.getUUID();
String username=UUIDUtils.getUUID();
String md5Password= DigestUtils.md5Hex(username);
char[] password = md5Password.toCharArray();
connOpts.setUserName(username);
connOpts.setPassword(password);
connOpts.setCleanSession(true);
mqttClient=new MqttClient(mqttConfig.getBroker(), clientId, persistence);
log.info("Connecting to broker: "+mqttConfig.getBroker());
mqttClient.connect(connOpts);
log.info("Connected to broker: "+mqttConfig.getBroker());
}
if(!mqttClient.isConnected()){
log.info("Connecting to broker: "+mqttConfig.getBroker());
mqttClient.connect(connOpts);
log.info("Connected to broker: "+mqttConfig.getBroker());
}
return mqttClient;
}
/**
*
* @throws MqttException
*/
public synchronized void disConnected() throws MqttException {
if(!mqttClient.isConnected()){
log.info("disconnecting to broker: "+mqttConfig.getBroker());
mqttClient.disconnect();
log.info("disconnected to broker: "+mqttConfig.getBroker());
}
}
}
4、使用可能http://tools.emqx.io/connections/25bd3ff0-c1c1-11ea-91e9-199b8be7f6f6オンラインテスト.