Android MQTTの使用

13071 ワード

MQTTを利用して簡単なチャットシステムを作ることができます.
まず依存関係に加わる
//    
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

直接上クラスMQ管理クラスは、Timetask,30秒で一度接続状態を検出する.切断すると再接続する注意しなければならないのは、各携帯電話に接続されているclientIdは変えられないはずだ.
public class MQTTManager {

    private static final String TAG = "MQTTManager";
    private String clientId = "android_client";
    private static MQTTManager mqttManager = null;
    private MqttClient client;
    private MqttConnectOptions options;
    //private Context mContext;

    private MessageHandlerCallBack callBack;
    private MsgPublishListener publishListener;

    private MQConnectListener connectListener;

    private TimerTask task;
    private Timer timer;

    private MQTTManager() {
        //mContext = context;
        //             clientId ,    
        //if (MagicUtil.isEmpty(UserInfoManager.getInstance().getMQId())) {
        //    clientId = clientId + MqttClient.generateClientId();
        //} else {
        //    clientId = UserInfoManager.getInstance().getMQId();
        //}
    }

    /**
     *     MQTTManager  
     *
     * @return     MQTTManager     
     */
    public static MQTTManager getInstance() {
        if (mqttManager == null) {
            synchronized (MQTTManager.class) {
                if (mqttManager == null) {
                    mqttManager = new MQTTManager();
                }
            }
        }
        return mqttManager;
    }

    private NetChangeReceiver mNetChangeReceiver;
    private Context unregisterContext;

    public void setNetReceiver(Context context) {
        IntentFilter filter = new IntentFilter();
        //filter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION);
        //filter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION);
        filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        mNetChangeReceiver = new NetChangeReceiver();
        context.registerReceiver(mNetChangeReceiver, filter);
        unregisterContext = context;
    }

    //        
    public void unregisterNet() {
        if (mNetChangeReceiver != null && unregisterContext != null) {
            unregisterContext.unregisterReceiver(mNetChangeReceiver);
            mNetChangeReceiver = null;
            unregisterContext = null;
        }
    }

    /**
     *      
     */
    public void connect() {
        ThreadManage.getInstance().execute(new Runnable() {
            @Override
            public void run() {
                Log.d(TAG, "    MQtt");
                try {
                    if (isConnected()) {
                        Log.d(TAG, "     ");
                        if (connectListener != null) {
                            connectListener.success(true);
                        }
                        return;
                    }
                    // host    ,clientId   MQTT    ID,          ,MemoryPersistence  clientid     ,        
                    client = new MqttClient("tcp:192.168.1.20:1883", clientId, new MemoryPersistence());
                    // MQTT     
                    options = new MqttConnectOptions();
                    //       session,       false                ,     true                  
                    options.setCleanSession(true);
                    //         
                    options.setUserName("admin");
                    //        
                    options.setPassword("public".toCharArray());
                    //            
                    options.setConnectionTimeout(30);
                    //                     1.5*20                      ,             
                    options.setKeepAliveInterval(30);
                    //     
                    // MqttTopic topic = client.getTopic(TOPIC);
                    //setWill  ,                       。           
                    //options.setWill(topic, "close".getBytes(), 2, true);
                    //SSLSocketFactory sslSocketFactory = null;
                   /* try {
                        sslSocketFactory = sslContextFromStream(mContext.getAssets().open("server.pem")).getSocketFactory();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    options.setSocketFactory(sslSocketFactory);*/
                    client.setCallback(new PushCallback());
                    client.connect(options);
                    if (client.isConnected() && connectListener != null) {
                        connectListener.success(true);
                        startCheck();
                        //UserInfoManager.getInstance().putMQId(clientId);
                    }
                    Log.d(TAG, "ClientId=" + client.getClientId());
                } catch (MqttException e) {
                    e.printStackTrace();
                    Log.e(TAG, "connect: " + e);
                }
            }
        });
    }

    private void startCheck() {
        if (timer == null) {
            timer = new Timer();
        }
        if (task == null) {
            task = new TimerTask() {
                @Override
                public void run() {
                    MagicLog.d("    MQ=========");
                    if (isConnected()) {
                        MagicLog.d("    =========");
                    } else {
                        MagicLog.d("    =========");
                        connect();
                    }
                }
            };
            timer.schedule(task, 50, 30*1000); //30         ,     
            MagicLog.d("========================    MQTT=======================");
        }
    }

    /**
     *     
     *
     * @param topic        
     */
    public void subscribeMsg(String topic, int qos) {
        if (client != null) {
            int[] Qos = {qos};
            String[] topic1 = {topic};
            try {
                client.subscribe(topic1, Qos);
                MagicLog.d("    topic: " + topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     *     
     *
     * @param topic            
     * @param msg           
     * @param isRetained        
     */
    public void publish(String topic, String msg, boolean isRetained, int qos) {
        try {
            if (client != null) {
                MqttMessage message = new MqttMessage();
                message.setQos(qos);
                message.setRetained(isRetained);
                message.setPayload(msg.getBytes());
                client.publish(topic, message);
                MagicLog.d("    :" + msg);
            }
        } catch (MqttException e) {
            e.printStackTrace();
            if (callBack != null) {
                publishListener.messagePublishOk(false);
            }
        }
    }

    int count = 0;

    /**
     *           
     */
    public class PushCallback implements MqttCallback {

        public void connectionLost(Throwable cause) {
            MagicLog.e("------connectionLost: " + cause);
            /*if (count < 5) {
                count++;//5   
                MagicLog.d("    ,    " + count + " " + cause);
                try {
                    client.close();
                    connect();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }*/
        }

        /**
         *        
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            //publish       
            //MagicLog.d("         " + token.isComplete());
            if (publishListener != null) {
                publishListener.messagePublishOk(token.isComplete());
            }
        }

        /**
         *          
         */
        @Override
        public void messageArrived(final String topicName, final MqttMessage message) throws Exception {
            //subscribe             
            MagicLog.d("     :" + new String(message.getPayload()));
            if (callBack != null) {
                callBack.messageArrived(topicName, new String(message.getPayload()));
            }
        }
    }

    /**
     *            
     *
     * @param callBack
     */
    public void setMessageHandlerCallBack(MessageHandlerCallBack callBack) {
        this.callBack = callBack;
    }

    public void setConnectListener(MQConnectListener connectListener) {
        this.connectListener = connectListener;
    }

    public void setPublishListener(MsgPublishListener publishListener) {
        this.publishListener = publishListener;
    }

    /**
     *     
     */
    public void disconnect() {
        if (client != null && client.isConnected()) {
            try {
                client.disconnect();
                release();
                mqttManager = null;
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     *     
     */
    private void release() {
        if (task != null) {
            task.cancel();
            task = null;
        }
        if (timer != null) {
            timer.purge();
            timer.cancel();
            timer = null;
        }
    }

    /**
     *         
     *
     * @return
     */
    public boolean isConnected() {
        if (client != null) {
            return client.isConnected();
        }
        return false;
    }

    public SSLContext sslContextFromStream(InputStream inputStream) throws Exception {

        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        Certificate certificate = certificateFactory.generateCertificate(inputStream);

        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca", certificate);

        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(keyStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);

        return sslContext;
    }

    public interface MessageHandlerCallBack {
        void messageArrived(String topicName, String content);
    }

    public interface MsgPublishListener{
        void messagePublishOk(boolean isOk);
    }

 
ネットワーク変更クラスのリスニング:
public class NetChangeReceiver extends BroadcastReceiver {
    private int count = 0;

    @Override
    public void onReceive(Context context, Intent intent) {
        if (ConnectivityManager.CONNECTIVITY_ACTION.equals(intent.getAction())) {
            //MagicLog.d("===================    ==================");
            //      
            ConnectivityManager connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo networkInfo = connectivityManager.getActiveNetworkInfo();
            if (networkInfo != null && networkInfo.isAvailable()) {
                MagicLog.d("===================    ");
                if (count != 0) {
                    MagicLog.d("===================    ,    MQTTManager==================");
                    MQTTManager manager = MQTTManager.getInstance();
                    manager.connect();
                }
                //EventBus.getDefault().post(new EventNetChange(true));
            } else {
                MagicLog.d("===================     ==================");
                //EventBus.getDefault().post(new EventNetChange(false));
            }
            //MagicLog.d("===================    count==================:" + 0);
            count++;
        }
    }
    
}

最後に使いました.ホームページへの呼び出し:
private MQTTManager mMqttManager;

private void initMQ() {
        mMqttManager = MQTTManager.getInstance();
        mMqttManager.connect();
        mMqttManager.setNetReceiver(this);
        mMqttManager.setConnectListener(connect ->
                subscribeMessage()
        );
    }

    //    
    private void subscribeMessage() {
        mMqttManager.subscribeMsg(UserInfoManager.getInstance().getTopicServer(), 2);
        mMqttManager.setMessageHandlerCallBack((topicName, content) -> {
            try {
               //     

            } catch (JSONException e) {
                e.printStackTrace();
            }
        });
    }

 @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mMqttManager != null) {
            mMqttManager.unregisterNet();
            mMqttManager.disconnect();
        }
    }