(二)Spring BootにMQTTを統合してメッセージプッシュを実現する
11270 ワード
本記事は学習ノートであり、主に筆者自身の学習過程を記録するためのものです。内容は転載です。
①pom依存
②ymlでmqttを設定する(カスタム構成)
③mqttメッセージ属性構成クラスの作成
④mqttメッセージプッシュエンティティの作成
⑤mqttメッセージプッシュまたはサブスクリプションクライアントの作成
⑥構成取得クラスの作成
⑦mqttプッシュコールバッククラス
本文の一部は次の記事から転載しています: https://blog.csdn.net/zhangxing52077/article/details/80568244
①pom依存
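<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>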
②ymlでmqttを設定する(カスタム構成)
# MQTT settings; the keys must be nested under com -> mqtt so that they
# resolve as com.mqtt.host, com.mqtt.clientid, and so on.
com:
  mqtt:
    host: tcp://ip:1883
    clientid: mqttjs_e8022a4d0b
    topic: good,test,yes
    username: zhangxing
    password: zxp52077
    timeout: 10
    keepalive: 20
③mqttメッセージ属性構成クラスの作成
/**
 * Holder for the MQTT settings bound from configuration keys that start with
 * {@code com.mqtt}. Lombok generates the getter and setter for every field.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "com.mqtt")
@Component
public class MqttConfiguration {

    // --- broker endpoint and client identity ---

    /** Broker address. */
    private String host;

    /** Client identifier presented to the broker. */
    private String clientid;

    /** Topic setting, kept as the raw configured string. */
    private String topic;

    // --- credentials ---

    /** User name for the broker login. */
    private String username;

    /** Password for the broker login. */
    private String password;

    // --- timing ---

    /** Connection timeout (unit not shown in this class; presumably seconds, confirm). */
    private int timeout;

    /** Keep-alive interval (unit not shown in this class; presumably seconds, confirm). */
    private int keepalive;
}
④mqttメッセージプッシュエンティティの作成
/**
 * Payload of a push message.
 *
 * <p>Instances are created through the all-arguments constructor or through the nested
 * {@link Builder}. Lombok generates a getter and a setter for every field, and
 * {@link #toString()} renders the payload as JSON.
 */
@Slf4j
@Setter
@Getter
public class PushPayload {
    /** Message type. */
    private String type;
    /** Mobile number associated with the message (presumably the recipient; confirm with callers). */
    private String mobile;
    /** Message title. */
    private String title;
    /** Message content. */
    private String content;
    /** Badge value; defaults to 1. */
    private Integer badge = 1;
    /** Sound name; defaults to "default". */
    private String sound = "default";

    /**
     * Creates a payload with every field given explicitly.
     *
     * @param type    message type
     * @param mobile  mobile number associated with the message
     * @param title   message title
     * @param content message content
     * @param badge   badge value
     * @param sound   sound name
     */
    public PushPayload(String type, String mobile, String title, String content, Integer badge, String sound) {
        this.type = type;
        this.mobile = mobile;
        this.title = title;
        this.content = content;
        this.badge = badge;
        this.sound = sound;
    }

    /**
     * Fluent builder for {@link PushPayload}. Fields that are never set keep their defaults
     * ({@code badge = 1}, {@code sound = "default"}, everything else {@code null}).
     */
    public static class Builder {
        private String type;
        private String mobile;
        private String title;
        private String content;
        private Integer badge = 1;
        private String sound = "default";

        public Builder setType(String type) {
            this.type = type;
            return this;
        }

        public Builder setMobile(String mobile) {
            this.mobile = mobile;
            return this;
        }

        public Builder setTitle(String title) {
            this.title = title;
            return this;
        }

        public Builder setContent(String content) {
            this.content = content;
            return this;
        }

        public Builder setBadge(Integer badge) {
            this.badge = badge;
            return this;
        }

        public Builder setSound(String sound) {
            this.sound = sound;
            return this;
        }

        /**
         * Creates the payload from the values collected so far.
         *
         * @return a new {@link PushPayload}
         */
        public PushPayload build() {
            return new PushPayload(type, mobile, title, content, badge, sound);
        }

        /**
         * Misspelled alias of {@link #build()}, kept so existing callers keep compiling.
         *
         * @return a new {@link PushPayload}
         * @deprecated use {@link #build()}
         */
        @Deprecated
        public PushPayload bulid() {
            return build();
        }
    }

    /**
     * Creates an empty builder.
     *
     * @return a new {@link Builder}
     */
    public static Builder getPushPayloadBuilder() {
        return new Builder();
    }

    /**
     * Misspelled alias of {@link #getPushPayloadBuilder()}, kept so existing callers keep compiling.
     *
     * @return a new {@link Builder}
     * @deprecated use {@link #getPushPayloadBuilder()}
     */
    @Deprecated
    public static Builder getPushPayloadBuider() {
        return getPushPayloadBuilder();
    }

    /**
     * Renders this payload through {@code JSON.toJSONString} with
     * {@code SerializerFeature.DisableCircularReferenceDetect}.
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect);
    }
}
⑤mqttメッセージプッシュまたはサブスクリプションクライアントの作成
@Slf4j
public class MqttPushClient {
private MqttClient client;
private static volatile MqttPushClient mqttPushClient = null;
public static MqttPushClient getInstance(){
if(null == mqttPushClient){
synchronized (MqttPushClient.class){
if(null == mqttPushClient){
mqttPushClient = new MqttPushClient();
}
}
}
return mqttPushClient;
}
private MqttPushClient() {
connect();
}
private void connect(){
try {
client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(PropertiesUtil.MQTT_USER_NAME);
options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
try {
client.setCallback(new PushCallback());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* , qos 0,
* @param topic
* @param pushMessage
*/
public void publish(String topic,PushPayload pushMessage){
publish(0, false, topic, pushMessage);
}
/**
*
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.toString().getBytes());
MqttTopic mTopic = client.getTopic(topic);
if(null == mTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* ,qos 0
* @param topic
*/
public void subscribe(String topic){
subscribe(topic,0);
}
/**
*
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
String kdTopic = "good";
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326")
.setContent("designModel")
.bulid();
MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);
}
}
⑥構成取得クラスの作成
/**
 * Static holder for MQTT and Elasticsearch settings. All values are read from classpath
 * resources once, when this class is initialized; a missing resource or a missing/invalid
 * integer value makes class initialization fail with a descriptive exception.
 */
public class PropertiesUtil {
    public static String MQTT_HOST;
    public static String MQTT_CLIENTID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static int MQTT_TIMEOUT;
    public static int MQTT_KEEP_ALIVE;
    public static final String ELASTIC_SEARCH_HOST;
    public static final int ELASTIC_SEARCH_PORT;
    public static final String ELASTIC_SEARCH_CLUSTER_NAME;

    static {
        // Read the resource once instead of re-opening it for every key.
        Properties mqtt = loadMqttProperties();
        MQTT_HOST = mqtt.getProperty("MQTT_HOST");
        MQTT_CLIENTID = mqtt.getProperty("MQTT_CLIENTID");
        MQTT_USER_NAME = mqtt.getProperty("MQTT_USER_NAME");
        MQTT_PASSWORD = mqtt.getProperty("MQTT_PASSWORD");
        MQTT_TIMEOUT = intProperty(mqtt, "MQTT_TIMEOUT");
        MQTT_KEEP_ALIVE = intProperty(mqtt, "MQTT_KEEP_ALIVE");
    }

    static {
        Properties es = loadEsProperties();
        ELASTIC_SEARCH_HOST = es.getProperty("ES_HOST");
        ELASTIC_SEARCH_PORT = intProperty(es, "ES_PORT");
        ELASTIC_SEARCH_CLUSTER_NAME = es.getProperty("ES_CLUSTER_NAME");
    }

    /**
     * Loads the MQTT settings.
     *
     * <p>NOTE(review): the resource is named {@code .yml} but is parsed with
     * {@link Properties#load(InputStream)}, so it has to contain flat {@code KEY: value} or
     * {@code KEY=value} lines; nested YAML keys will not be found under the names read above.
     */
    private static Properties loadMqttProperties() {
        return loadProperties("/mqtt.yml");
    }

    /** Loads the Elasticsearch settings. */
    private static Properties loadEsProperties() {
        return loadProperties("/elasticsearch.properties");
    }

    /**
     * Reads a classpath resource in {@link Properties} format.
     *
     * @param resource absolute classpath name of the resource
     * @return the parsed properties
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws RuntimeException      if reading the resource fails
     */
    private static Properties loadProperties(String resource) {
        try (InputStream in = PropertiesUtil.class.getResourceAsStream(resource)) {
            if (in == null) {
                // Without this check Properties.load fails with a bare NullPointerException.
                throw new IllegalStateException("Classpath resource not found: " + resource);
            }
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read classpath resource: " + resource, e);
        }
    }

    /**
     * Reads a required integer property.
     *
     * @param properties source of the value
     * @param key        property name
     * @return the parsed value
     * @throws IllegalStateException if the key is absent or its value is not an integer
     */
    private static int intProperty(Properties properties, String key) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Property " + key + " is not an integer: " + raw, e);
        }
    }
}
⑦mqttプッシュコールバッククラス
/**
* @auther zx
* @date 2018/5/28 9:20
*/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// ,
System.out.println(" , ");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe
System.out.println(" : " + topic);
System.out.println(" Qos : " + message.getQos());
System.out.println(" : " + new String(message.getPayload()));
}
}
/**
 * Builds a sample payload (content "test", mobile "119", type "2018") and pushes it to the
 * "yes" topic through {@code mqttClientComponent}.
 * NOTE(review): {@code mqttClientComponent} is declared outside the visible snippet.
 */
@Test
public void test() {
    PushPayload.Builder sampleBuilder = PushPayload.getPushPayloadBuider();
    sampleBuilder.setContent("test");
    sampleBuilder.setMobile("119");
    sampleBuilder.setType("2018");
    PushPayload samplePayload = sampleBuilder.bulid();
    mqttClientComponent.push("yes", samplePayload);
}
本文の一部は次の記事から転載しています: https://blog.csdn.net/zhangxing52077/article/details/80568244