Android MQTTの使用
13071 ワード
MQTTを利用して簡単なチャットシステムを作ることができます.
まず依存関係を追加する
次にMQTT管理クラスです。TimerTaskを使って30秒ごとに接続状態を確認し、切断されていれば再接続します。注意点として、各端末が接続に使うclientIdは変更してはいけません。
ネットワーク状態の変化を監視するクラス:
最後に使い方です。ホーム画面からの呼び出し:
まず依存関係を追加します。
//
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
次にMQTT管理クラスです。TimerTaskを使って30秒ごとに接続状態を確認し、切断されていれば再接続します。注意点として、各端末が接続に使うclientIdは変更してはいけません。
public class MQTTManager {
private static final String TAG = "MQTTManager";
private String clientId = "android_client";
private static MQTTManager mqttManager = null;
private MqttClient client;
private MqttConnectOptions options;
//private Context mContext;
private MessageHandlerCallBack callBack;
private MsgPublishListener publishListener;
private MQConnectListener connectListener;
private TimerTask task;
private Timer timer;
/**
 * Private constructor; instances are created only by {@link #getInstance()}.
 *
 * <p>The body is intentionally empty. The commented-out lines sketch a disabled
 * alternative that would take the client id from UserInfoManager, or derive one
 * with MqttClient.generateClientId() when none is stored.
 */
private MQTTManager() {
//mContext = context;
// Disabled clientId initialisation:
//if (MagicUtil.isEmpty(UserInfoManager.getInstance().getMQId())) {
// clientId = clientId + MqttClient.generateClientId();
//} else {
// clientId = UserInfoManager.getInstance().getMQId();
//}
}
/**
 * Returns the shared {@code MQTTManager}, creating it on first use.
 *
 * <p>Both the null check and the lazy creation run while holding the class
 * lock. The backing field is not declared {@code volatile}, so an unlocked
 * fast-path read of it would be a data race under the Java memory model.
 *
 * @return the shared MQTTManager instance, never {@code null}
 */
public static MQTTManager getInstance() {
    synchronized (MQTTManager.class) {
        if (mqttManager == null) {
            mqttManager = new MQTTManager();
        }
        return mqttManager;
    }
}
private NetChangeReceiver mNetChangeReceiver;
private Context unregisterContext;
/**
 * Registers a {@code NetChangeReceiver} for
 * {@link ConnectivityManager#CONNECTIVITY_ACTION} broadcasts on the given context.
 *
 * <p>The context is remembered so that {@link #unregisterNet()} can unregister
 * the receiver from the same context later. If a receiver from an earlier call
 * is still registered it is unregistered first; otherwise it would stay
 * registered with no reference left to remove it.
 *
 * @param context context used to register (and later unregister) the receiver
 */
public void setNetReceiver(Context context) {
    if (mNetChangeReceiver != null && unregisterContext != null) {
        unregisterContext.unregisterReceiver(mNetChangeReceiver);
    }

    IntentFilter filter = new IntentFilter();
    filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);

    NetChangeReceiver receiver = new NetChangeReceiver();
    context.registerReceiver(receiver, filter);

    mNetChangeReceiver = receiver;
    unregisterContext = context;
}
//
/**
 * Unregisters the connectivity receiver installed by {@code setNetReceiver}
 * and clears the stored receiver and context. Does nothing when either of
 * them is missing.
 */
public void unregisterNet() {
    if (mNetChangeReceiver == null || unregisterContext == null) {
        return;
    }
    unregisterContext.unregisterReceiver(mNetChangeReceiver);
    mNetChangeReceiver = null;
    unregisterContext = null;
}
/**
 * Connects to the MQTT broker.
 *
 * <p>The work is handed to {@code ThreadManage} (presumably a background
 * executor; confirm against its implementation). If the client is already
 * connected the connect listener, when set, is notified and nothing else
 * happens. Otherwise a new {@code MqttClient} is created with in-memory
 * persistence, configured, and connected. After a successful connect the
 * periodic connection check is started and the connect listener, when set,
 * is notified. A {@code MqttException} is logged and not rethrown.
 */
public void connect() {
    ThreadManage.getInstance().execute(new Runnable() {
        @Override
        public void run() {
            Log.d(TAG, " MQtt");
            try {
                if (isConnected()) {
                    Log.d(TAG, " ");
                    if (connectListener != null) {
                        connectListener.success(true);
                    }
                    return;
                }

                // Paho expects the server URI as scheme://host:port; the "//" is required.
                client = new MqttClient("tcp://192.168.1.20:1883", clientId, new MemoryPersistence());

                options = new MqttConnectOptions();
                // true: the broker does not keep session state between connections.
                options.setCleanSession(true);
                options.setUserName("admin");
                options.setPassword("public".toCharArray());
                // Both values are in seconds.
                options.setConnectionTimeout(30);
                options.setKeepAliveInterval(30);
                // TLS is not enabled here. To use it, build a socket factory from
                // sslContextFromStream(...) and pass it to options.setSocketFactory(...).

                client.setCallback(new PushCallback());
                client.connect(options);

                if (client.isConnected()) {
                    // Start the reconnect watchdog whether or not a listener is set;
                    // it must not depend on the caller having registered one.
                    startCheck();
                    if (connectListener != null) {
                        connectListener.success(true);
                    }
                }
                Log.d(TAG, "ClientId=" + client.getClientId());
            } catch (MqttException e) {
                Log.e(TAG, "connect: " + e, e);
            }
        }
    });
}
/**
 * Starts the periodic connection check if it is not already scheduled.
 *
 * <p>The task first runs after 50 ms and then every 30 seconds. On each run
 * it calls {@code connect()} when {@code isConnected()} reports false.
 */
private void startCheck() {
    if (timer == null) {
        timer = new Timer();
    }
    if (task != null) {
        // A check task already exists; do not schedule a second one.
        return;
    }

    task = new TimerTask() {
        @Override
        public void run() {
            MagicLog.d(" MQ=========");
            if (!isConnected()) {
                MagicLog.d(" =========");
                connect();
                return;
            }
            MagicLog.d(" =========");
        }
    };

    final long initialDelayMs = 50;
    final long periodMs = 30 * 1000;
    timer.schedule(task, initialDelayMs, periodMs);
    MagicLog.d("======================== MQTT=======================");
}
/**
 * Subscribes the current client to a single topic.
 *
 * <p>Does nothing when no client has been created yet. A
 * {@code MqttException} from the subscribe call is printed and not rethrown.
 *
 * @param topic topic filter to subscribe to
 * @param qos   quality-of-service level requested for the subscription
 */
public void subscribeMsg(String topic, int qos) {
    if (client == null) {
        return;
    }
    try {
        client.subscribe(new String[] {topic}, new int[] {qos});
        MagicLog.d(" topic: " + topic);
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
/**
 * Publishes a text message to a topic.
 *
 * <p>Does nothing when no client has been created yet. If the publish call
 * throws a {@code MqttException}, the exception is printed and the publish
 * listener, when one is set, is told the publish failed.
 *
 * @param topic      topic to publish to
 * @param msg        message text; sent as the bytes of {@code msg.getBytes()}
 * @param isRetained whether the message is flagged as retained
 * @param qos        quality-of-service level for the message
 */
public void publish(String topic, String msg, boolean isRetained, int qos) {
    if (client == null) {
        return;
    }
    try {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(isRetained);
        message.setPayload(msg.getBytes());
        client.publish(topic, message);
        MagicLog.d(" :" + msg);
    } catch (MqttException e) {
        e.printStackTrace();
        // Guard the listener that is actually invoked (not the message callback).
        if (publishListener != null) {
            publishListener.messagePublishOk(false);
        }
    }
}
int count = 0;
/**
 * MQTT callback that forwards client events to the listeners registered on
 * the enclosing manager.
 */
public class PushCallback implements MqttCallback {

    /**
     * Logs the loss of the connection. No reconnect is attempted from here.
     */
    @Override
    public void connectionLost(Throwable cause) {
        MagicLog.e("------connectionLost: " + cause);
    }

    /**
     * Reports the completion state of a delivery token to the publish
     * listener, when one is set.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if (publishListener == null) {
            return;
        }
        publishListener.messagePublishOk(token.isComplete());
    }

    /**
     * Logs an incoming message and hands its topic and payload text to the
     * message callback, when one is set.
     */
    @Override
    public void messageArrived(final String topicName, final MqttMessage message) throws Exception {
        MagicLog.d(" :" + new String(message.getPayload()));
        if (callBack == null) {
            return;
        }
        final String content = new String(message.getPayload());
        callBack.messageArrived(topicName, content);
    }
}
/**
 * Sets the callback that receives incoming messages (topic and payload text).
 *
 * @param callBack callback to store; replaces any previously set callback
 */
public void setMessageHandlerCallBack(MessageHandlerCallBack callBack) {
this.callBack = callBack;
}
/**
 * Sets the listener that connect() notifies when the client is connected.
 *
 * @param connectListener listener to store; replaces any previously set listener
 */
public void setConnectListener(MQConnectListener connectListener) {
this.connectListener = connectListener;
}
/**
 * Sets the listener that is told whether a publish completed.
 *
 * @param publishListener listener to store; replaces any previously set listener
 */
public void setPublishListener(MsgPublishListener publishListener) {
this.publishListener = publishListener;
}
/**
 * Stops the periodic connection check and disconnects from the broker.
 *
 * <p>The check timer is always cancelled, even when the client is not
 * currently connected. Otherwise a call made while the link is down would
 * leave the timer running and it would reconnect later. When the client is
 * connected it is disconnected, and after a successful disconnect the shared
 * instance reference is cleared. A {@code MqttException} is printed and not
 * rethrown.
 */
public void disconnect() {
    // Cancel the watchdog first so it cannot trigger a reconnect during shutdown.
    release();

    if (client == null || !client.isConnected()) {
        return;
    }
    try {
        client.disconnect();
        mqttManager = null;
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
/**
 * Cancels the connection-check task and its timer, and clears both fields so
 * that the check can be scheduled again later.
 */
private void release() {
    final TimerTask pendingTask = task;
    task = null;
    if (pendingTask != null) {
        pendingTask.cancel();
    }

    final Timer activeTimer = timer;
    timer = null;
    if (activeTimer != null) {
        activeTimer.purge();
        activeTimer.cancel();
    }
}
/**
 * Reports whether a client exists and is currently connected.
 *
 * @return {@code true} when the client has been created and reports itself
 *         connected; {@code false} otherwise
 */
public boolean isConnected() {
    return client != null && client.isConnected();
}
/**
 * Builds a TLS {@code SSLContext} that trusts the single X.509 certificate
 * read from the given stream.
 *
 * <p>The certificate is stored under the alias {@code "ca"} in a fresh, empty
 * key store of the default type, and the context is initialised with the
 * trust managers derived from that store. No key managers are configured.
 * The stream is not closed by this method.
 *
 * @param inputStream stream containing the certificate
 * @return an initialised TLS context trusting that certificate
 * @throws Exception if the certificate cannot be read or the context cannot
 *                   be created
 */
public SSLContext sslContextFromStream(InputStream inputStream) throws Exception {
    CertificateFactory x509Factory = CertificateFactory.getInstance("X.509");
    Certificate caCertificate = x509Factory.generateCertificate(inputStream);

    // Start from an empty key store holding only the supplied certificate.
    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
    trustStore.load(null, null);
    trustStore.setCertificateEntry("ca", caCertificate);

    String algorithm = TrustManagerFactory.getDefaultAlgorithm();
    TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(algorithm);
    trustFactory.init(trustStore);

    SSLContext tlsContext = SSLContext.getInstance("TLS");
    tlsContext.init(null, trustFactory.getTrustManagers(), null);
    return tlsContext;
}
/** Receives messages that arrive on subscribed topics. */
public interface MessageHandlerCallBack {
/**
 * Called for each arrived message.
 *
 * @param topicName topic the message arrived on
 * @param content   message payload converted to a String
 */
void messageArrived(String topicName, String content);
}
/** Receives the outcome of publish attempts. */
public interface MsgPublishListener{
/**
 * Called with the completion state of a delivery, or with {@code false}
 * when publishing failed.
 *
 * @param isOk whether the publish completed
 */
void messagePublishOk(boolean isOk);
}
ネットワーク状態の変化を監視するクラス:
/**
 * Broadcast receiver that asks {@code MQTTManager} to reconnect when a
 * connectivity-change broadcast reports a connected network.
 */
public class NetChangeReceiver extends BroadcastReceiver {
    /** Number of connectivity broadcasts this receiver has handled. */
    private int count = 0;

    /**
     * Handles {@link ConnectivityManager#CONNECTIVITY_ACTION}; any other
     * action is ignored.
     *
     * <p>The first handled broadcast never triggers a reconnect.
     * NOTE(review): presumably this skips the broadcast delivered right after
     * registration, when the caller is already connecting; confirm.
     */
    @Override
    public void onReceive(Context context, Intent intent) {
        if (!ConnectivityManager.CONNECTIVITY_ACTION.equals(intent.getAction())) {
            return;
        }

        ConnectivityManager connectivityManager =
                (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
        // getSystemService may return null; treat that the same as "no network".
        NetworkInfo networkInfo =
                connectivityManager == null ? null : connectivityManager.getActiveNetworkInfo();

        // isConnected() is the check required before attempting network traffic;
        // isAvailable() only says that connecting is possible.
        if (networkInfo != null && networkInfo.isConnected()) {
            MagicLog.d("=================== ");
            if (count != 0) {
                MagicLog.d("=================== , MQTTManager==================");
                MQTTManager.getInstance().connect();
            }
        } else {
            MagicLog.d("=================== ==================");
        }
        count++;
    }
}
最後に使い方です。ホーム画面からの呼び出し:
private MQTTManager mMqttManager;
/**
 * Obtains the shared MQTT manager, wires up its listeners and starts the
 * connection.
 *
 * <p>The connect listener and the network receiver are installed before
 * {@code connect()} is called. The connection is established asynchronously,
 * so a listener set afterwards could miss the success notification and
 * {@code subscribeMessage()} would never run.
 */
private void initMQ() {
    mMqttManager = MQTTManager.getInstance();
    mMqttManager.setConnectListener(connect -> subscribeMessage());
    mMqttManager.setNetReceiver(this);
    mMqttManager.connect();
}
//
/**
 * Installs the incoming-message handler and subscribes to the server topic
 * at QoS 2.
 *
 * <p>The handler is set before subscribing: the manager ignores messages
 * that arrive while no handler is set, so subscribing first could drop
 * messages delivered immediately after the subscription.
 */
private void subscribeMessage() {
    mMqttManager.setMessageHandlerCallBack((topicName, content) -> {
        try {
            // Parse and handle the incoming message content here.
        } catch (JSONException e) {
            e.printStackTrace();
        }
    });
    mMqttManager.subscribeMsg(UserInfoManager.getInstance().getTopicServer(), 2);
}
/**
 * Unregisters the network receiver and disconnects the MQTT manager when
 * this component is destroyed, if a manager was obtained.
 */
@Override
protected void onDestroy() {
    super.onDestroy();
    if (mMqttManager == null) {
        return;
    }
    mMqttManager.unregisterNet();
    mMqttManager.disconnect();
}