OkHttp 3 WebSocket接続を実現

38611 ワード

プロジェクトにはIMモジュールがあり、WebSocketを使用していますので、ここに記録しておきます.
WebSocketのフレームワークが多く、OkHttp 3もWebSocketをサポートしていることを知り、Okhttpを採用して実現しました.
一つはWebSocketをもう一つ導入する必要のないサードパーティライブラリで、一つはOkhttp 3の口コミと安定性が非常によく、しかも更新されています.
依存の追加
implementation 'com.squareup.okhttp3:okhttp:3.8.1'

実装手順
  • OkHttpClient構成初期化パラメータを構築します.
  • WebSocketのUrlアドレスを使用して接続します.
  • WebSocketの接続ステータスコールバックとメッセージコールバックを設定します.
  • メッセージjson処理業務等を解析する.
  • 接続が成功すると、WebSocketを使用してメッセージ
  • が送信される.
  • OkHttpClient
  • の構成
    OkHttpClient mClient = new OkHttpClient.Builder()
            .readTimeout(3, TimeUnit.SECONDS)//        
            .writeTimeout(3, TimeUnit.SECONDS)//        
            .connectTimeout(3, TimeUnit.SECONDS)//        
            .build();
    
  • Urlを使用してWebSocket要求(一般的にはバックエンドインタフェースが接続されたUrlアドレスを返す)
  • を構築する.
    //    
    String url = "ws://xxxxx"
    //          
    Request request = new Request.Builder().get().url(url).build();
    
  • 接続を開始し、コールバックを構成します.
  • onOpen()、接続成功
  • onMessage(String text)は、文字列タイプのメッセージを受信し、一般的にこの
  • を使用します.
  • onMessage(ByteString bytes)は、バイト配列タイプメッセージを受信し、ここでは
  • を使用しませんでした.
  • onClosed()接続が
  • を閉じる
  • onFailure()は、接続に失敗し、通常はここで再接続操作
  • を開始する.
    //    
    WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            super.onOpen(webSocket, response);
            //    ...
        }
    
        @Override
        public void onMessage(WebSocket webSocket, String text) {
            super.onMessage(webSocket, text);
            //    ...(       json)
        }
    
        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            super.onMessage(webSocket, bytes);
            //    ...(        )
        }
    
        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            super.onClosed(webSocket, code, reason);
            //    ...
        }
    
        @Override
        public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
            super.onFailure(webSocket, throwable, response);
            //    ...
        }
    });
    
  • WebSocketオブジェクトを使用してメッセージを送信し、msgはメッセージ内容(一般的にjsonですが、もちろんxmlなど他のものも使用できます)であり、sendメソッドはすぐに送信結果を返します.
  • //    
    boolean isSendSuccess = webSocket.send(msg);
    

    RxJavaパッケージに合わせる
    RxJavaを構成すると、WebSocketのデータ変換、スレッド切り替え、再接続処理などの機能を強化できます.
    実装手順
  • Api呼び出しインタフェースを定義し、外部はApiに接触するだけで内部実装ロジックに関心を持つ必要はありません.
  • public interface WebSocketWorker {
        /**
         *     ,       
         */
        Observable get(String url);
    
        /**
         *         ,              ,     
         *
         * @param timeout      
         * @param timeUnit       
         */
        Observable get(String url, long timeout, TimeUnit timeUnit);
    
        /**
         *   ,url WebSocket         ,       
         *
         * @param msg   ,           ,   json
         */
        Observable send(String url, String msg);
    
        /**
         *   ,  
         *
         * @param byteString      ByteString
         */
        Observable send(String url, ByteString byteString);
    
        /**
         *    WebSocket    ,    
         */
        Observable asyncSend(String url, String msg);
    
        /**
         *   ,       ByteString
         */
        Observable asyncSend(String url, ByteString byteString);
    
        /**
         *     Url   
         */
        Observable close(String url);
    
        /**
         *       Url   
         */
        boolean closeNow(String url);
    
        /**
         *         
         */
        Observable> closeAll();
    
        /**
         *         
         */
        void closeAllNow();
    }
    
  • ビルダーモードでは、多数の構成パラメータを使用して、まずBuilderクラスを使用して保存し、build()メソッドを使用してRxWebSocketオブジェクトを生成します.
  • public class RxWebSocketBuilder {
        Context mContext;
        /**
         *     Log
         */
        boolean mIsPrintLog;
        /**
         * Log    
         */
        Logger.LogDelegate mLogDelegate;
        /**
         *       OkHttpClient
         */
        OkHttpClient mClient;
        /**
         *   SSL
         */
        SSLSocketFactory mSslSocketFactory;
        X509TrustManager mTrustManager;
        /**
         *       
         */
        long mReconnectInterval;
        /**
         *          
         */
        TimeUnit mReconnectIntervalTimeUnit;
    
        public RxWebSocketBuilder(Context context) {
            this.mContext = context.getApplicationContext();
        }
    
        public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
            this.mIsPrintLog = isPrintLog;
            return this;
        }
    
        public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
            Logger.setDelegate(logDelegate);
            return this;
        }
    
        public RxWebSocketBuilder client(OkHttpClient client) {
            this.mClient = client;
            return this;
        }
    
        public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
            this.mSslSocketFactory = sslSocketFactory;
            this.mTrustManager = trustManager;
            return this;
        }
    
        public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
            this.mReconnectInterval = reconnectInterval;
            this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
            return this;
        }
    
        public RxWebSocket build() {
            return new RxWebSocket(this);
        }
    }
    
  • Api実装クラス、ここではエージェントモードを使用してエージェントオブジェクトを定義し、内部実装を隔離し、一時的に単純な転送実装にすぎず、後続に制限ロジックを追加する必要がある場合、ここで追加)、装飾者モード強化ロジックの使用も便利である.
  • public class RxWebSocket implements WebSocketWorker {
        private Context mContext;
        /**
         *     Log
         */
        private boolean mIsPrintLog;
        /**
         * Log    
         */
        private Logger.LogDelegate mLogDelegate;
        /**
         *       OkHttpClient
         */
        private OkHttpClient mClient;
        /**
         *   SSL
         */
        private SSLSocketFactory mSslSocketFactory;
        private X509TrustManager mTrustManager;
        /**
         *       
         */
        private long mReconnectInterval;
        /**
         *          
         */
        private TimeUnit mReconnectIntervalTimeUnit;
        /**
         *         
         */
        private WebSocketWorker mWorkerImpl;
    
        private RxWebSocket() {
        }
    
        RxWebSocket(RxWebSocketBuilder builder) {
            this.mContext = builder.mContext;
            this.mIsPrintLog = builder.mIsPrintLog;
            this.mLogDelegate = builder.mLogDelegate;
            this.mClient = builder.mClient == null ? new OkHttpClient() : builder.mClient;
            this.mSslSocketFactory = builder.mSslSocketFactory;
            this.mTrustManager = builder.mTrustManager;
            this.mReconnectInterval = builder.mReconnectInterval == 0 ? 1 : builder.mReconnectInterval;
            this.mReconnectIntervalTimeUnit = builder.mReconnectIntervalTimeUnit == null ? TimeUnit.SECONDS : builder.mReconnectIntervalTimeUnit;
            setup();
        }
    
        /**
         *     
         */
        private void setup() {
            this.mWorkerImpl = new WebSocketWorkerImpl(
                    this.mContext,
                    this.mIsPrintLog,
                    this.mLogDelegate,
                    this.mClient,
                    this.mSslSocketFactory,
                    this.mTrustManager,
                    this.mReconnectInterval,
                    this.mReconnectIntervalTimeUnit);
        }
    
        //...Api    mWorkerImpl,mWorkerImpl       
    }
    
  • WebSocketInfoメッセージエンティティ、RxJava送信メッセージ通知サブスクライバは、送信されたメッセージ文字列、再接続されたかどうか、接続に成功したかどうかなど、送信されたメッセージを保存するエンティティModelクラスが必要であることをサブスクライバに通知します.
  • キャッシュが必要なモデル実装が必要なインタフェース
  • public interface ICacheTarget {
        /**
         *     
         *
         * @return       
         */
        T reset();
    }
    
  • キャッシュプールインタフェース
  • public interface ICachePool> {
        /**
         *        
         */
        T onCreateCache();
    
        /**
         *            
         */
        int onSetupMaxCacheCount();
    
        /**
         *         
         *
         * @param cacheKey   Key,                 
         */
        T obtain(String cacheKey);
    
        /**
         *           ,                
         *
         * @param cacheTarget     
         */
        T onObtainCacheAfter(ICacheTarget cacheTarget);
    }
    
  • 統合キャッシュモデルラップクラス
  • public class CacheItem implements Serializable {
        private static final long serialVersionUID = -401778630524300400L;
    
        /**
         *      
         */
        private T cacheTarget;
        /**
         *       
         */
        private long recentlyUsedTime;
    
        public CacheItem(T cacheTarget, long recentlyUsedTime) {
            this.cacheTarget = cacheTarget;
            this.recentlyUsedTime = recentlyUsedTime;
        }
    
        public T getCacheTarget() {
            return cacheTarget;
        }
    
        public void setCacheTarget(T cacheTarget) {
            this.cacheTarget = cacheTarget;
        }
    
        public long getRecentlyUsedTime() {
            return recentlyUsedTime;
        }
    
        public void setRecentlyUsedTime(long recentlyUsedTime) {
            this.recentlyUsedTime = recentlyUsedTime;
        }
    }
    
  • ベースキャッシュプール
  • public abstract class BaseCachePool> implements ICachePool, Comparator> {
        /**
         *    
         */
        private ConcurrentHashMap>> mPool;
    
        public BaseCachePool() {
            mPool = new ConcurrentHashMap<>(8);
        }
    
        @Override
        public T obtain(String cacheKey) {
            //   
            LinkedList> cacheChain;
            //     ,    
            if (!mPool.containsKey(cacheKey)) {
                cacheChain = new LinkedList<>();
            } else {
                cacheChain = mPool.get(cacheKey);
            }
            if (cacheChain == null) {
                throw new NullPointerException("cacheChain        ");
            }
            //        ,      
            if (cacheChain.size() < onSetupMaxCacheCount()) {
                T cache = onCreateCache();
                CacheItem cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
                cacheChain.add(cacheItem);
                mPool.put(cacheKey, cacheChain);
                return cache;
            }
            //        。          ,        ,        (       )
            Collections.sort(cacheChain, this);
            CacheItem cacheItem = cacheChain.getFirst();
            cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
            //      
            T cacheTarget = cacheItem.getCacheTarget();
            cacheTarget = onObtainCacheAfter(cacheTarget);
            return cacheTarget;
        }
        
        @Override
        public T onObtainCacheAfter(ICacheTarget cacheTarget) {
            //    reset      ,       ,       
            return cacheTarget.reset();
        }
    
        @Override
        public int compare(CacheItem o1, CacheItem o2) {
            return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
        }
    }
    
  • キャッシュモデル
  • を実装
    public class WebSocketInfo implements Serializable, ICacheTarget {
        private static final long serialVersionUID = -880481254453932113L;
    
        private WebSocket mWebSocket;
        private String mStringMsg;
        private ByteString mByteStringMsg;
        /**
         *     
         */
        private boolean isConnect;
        /**
         *     
         */
        private boolean isReconnect;
        /**
         *     
         */
        private boolean isPrepareReconnect;
    
        /**
         *   
         */
        @Override
        public WebSocketInfo reset() {
            this.mWebSocket = null;
            this.mStringMsg = null;
            this.mByteStringMsg = null;
            this.isConnect = false;
            this.isReconnect = false;
            this.isPrepareReconnect = false;
            return this;
        }
    
         //  get、set  
    }
    
  • が具体的に実装される.
  • は、接続WebSocketをObservableの購読コールバックにカプセル化する.
  • は、同じ接続オブジェクトを共有する複数のUrlとデータソースをMapでキャッシュします.
  • shareオペレータを使用して、複数のオブザーバに1つのデータソースを同時に購読させます.すべての購読者が購読をキャンセルした場合、接続を解除します.
  • public class WebSocketWorkerImpl implements WebSocketWorker {
        private static final String TAG = WebSocketWorkerImpl.class.getName();
    
        /**
         *    
         */
        private Context mContext;
        /**
         *       OkHttpClient
         */
        private OkHttpClient mClient;
        /**
         *       
         */
        private long mReconnectInterval;
        /**
         *          
         */
        private TimeUnit mReconnectIntervalTimeUnit;
    
        /**
         *        ,Url    Observable
         */
        private Map> mObservableCacheMap;
        /**
         *   Url    WebSocket  ,   Url    WebSocket  
         */
        private Map mWebSocketPool;
        /**
         * WebSocketInfo   
         */
        private final WebSocketInfoPool mWebSocketInfoPool;
    
        public WebSocketWorkerImpl(
                Context context,
                boolean isPrintLog,
                Logger.LogDelegate logDelegate,
                OkHttpClient client,
                SSLSocketFactory sslSocketFactory,
                X509TrustManager trustManager,
                long reconnectInterval,
                TimeUnit reconnectIntervalTimeUnit) {
            this.mContext = context;
            //  Logger
            Logger.setDelegate(logDelegate);
            Logger.setLogPrintEnable(isPrintLog);
            this.mClient = client;
            //      
            this.mReconnectInterval = reconnectInterval;
            this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
            //  SSL
            if (sslSocketFactory != null && trustManager != null) {
                mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
            }
            this.mObservableCacheMap = new HashMap<>(16);
            this.mWebSocketPool = new HashMap<>(16);
            mWebSocketInfoPool = new WebSocketInfoPool();
        }
    
        @Override
        public Observable get(String url) {
            return getWebSocketInfo(url);
        }
    
        @Override
        public Observable get(String url, long timeout, TimeUnit timeUnit) {
            return getWebSocketInfo(url, timeout, timeUnit);
        }
    
        @Override
        public Observable send(String url, String msg) {
            return Observable.create(new ObservableOnSubscribe() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    WebSocket webSocket = mWebSocketPool.get(url);
                    if (webSocket == null) {
                        emitter.onError(new IllegalStateException("The WebSocket not open"));
                    } else {
                        emitter.onNext(webSocket.send(msg));
                    }
                }
            });
        }
    
        @Override
        public Observable send(String url, ByteString byteString) {
            return Observable.create(new ObservableOnSubscribe() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    WebSocket webSocket = mWebSocketPool.get(url);
                    if (webSocket == null) {
                        emitter.onError(new IllegalStateException("The WebSocket not open"));
                    } else {
                        emitter.onNext(webSocket.send(byteString));
                    }
                }
            });
        }
    
        @Override
        public Observable asyncSend(String url, String msg) {
            return getWebSocket(url)
                    .take(1)
                    .map(new Function() {
                        @Override
                        public Boolean apply(WebSocket webSocket) throws Exception {
                            return webSocket.send(msg);
                        }
                    });
        }
    
        @Override
        public Observable asyncSend(String url, ByteString byteString) {
            return getWebSocket(url)
                    .take(1)
                    .map(new Function() {
                        @Override
                        public Boolean apply(WebSocket webSocket) throws Exception {
                            return webSocket.send(byteString);
                        }
                    });
        }
    
        @Override
        public Observable close(String url) {
            return Observable.create(new ObservableOnSubscribe() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    WebSocket webSocket = mWebSocketPool.get(url);
                    if (webSocket == null) {
                        emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
                    } else {
                        emitter.onNext(webSocket);
                    }
                }
            }).map(new Function() {
                @Override
                public Boolean apply(WebSocket webSocket) throws Exception {
                    return closeWebSocket(webSocket);
                }
            });
        }
    
        @Override
        public boolean closeNow(String url) {
            return closeWebSocket(mWebSocketPool.get(url));
        }
    
        @Override
        public Observable> closeAll() {
            return Observable
                    .just(mWebSocketPool)
                    .map(new Function, Collection>() {
                        @Override
                        public Collection apply(Map webSocketMap) throws Exception {
                            return webSocketMap.values();
                        }
                    })
                    .concatMap(new Function, ObservableSource>() {
                        @Override
                        public ObservableSource apply(Collection webSockets) throws Exception {
                            return Observable.fromIterable(webSockets);
                        }
                    }).map(new Function() {
                        @Override
                        public Boolean apply(WebSocket webSocket) throws Exception {
                            return closeWebSocket(webSocket);
                        }
                    }).collect(new Callable>() {
                        @Override
                        public List call() throws Exception {
                            return new ArrayList<>();
                        }
                    }, new BiConsumer, Boolean>() {
                        @Override
                        public void accept(List list, Boolean isCloseSuccess) throws Exception {
                            list.add(isCloseSuccess);
                        }
                    }).toObservable();
        }
    
        @Override
        public void closeAllNow() {
            for (Map.Entry entry : mWebSocketPool.entrySet()) {
                closeWebSocket(entry.getValue());
            }
        }
    
        /**
         *      
         */
        private boolean hasWebSocketConnection(String url) {
            return mWebSocketPool.get(url) != null;
        }
    
        /**
         *   WebSocket  
         */
        private boolean closeWebSocket(WebSocket webSocket) {
            if (webSocket == null) {
                return false;
            }
            WebSocketCloseEnum normalCloseEnum = WebSocketCloseEnum.USER_EXIT;
            boolean result = webSocket.close(normalCloseEnum.getCode(), normalCloseEnum.getReason());
            if (result) {
                removeUrlWebSocketMapping(webSocket);
            }
            return result;
        }
    
        /**
         *   Url WebSocket   
         */
        private void removeUrlWebSocketMapping(WebSocket webSocket) {
            for (Map.Entry entry : mWebSocketPool.entrySet()) {
                if (entry.getValue() == webSocket) {
                    String url = entry.getKey();
                    mObservableCacheMap.remove(url);
                    mWebSocketPool.remove(url);
                }
            }
        }
    
        private void removeWebSocketCache(WebSocket webSocket) {
            for (Map.Entry entry : mWebSocketPool.entrySet()) {
                if (entry.getValue() == webSocket) {
                    String url = entry.getKey();
                    mWebSocketPool.remove(url);
                }
            }
        }
    
        public Observable getWebSocket(String url) {
            return getWebSocketInfo(url)
                    .filter(new Predicate() {
                        @Override
                        public boolean test(WebSocketInfo webSocketInfo) throws Exception {
                            return webSocketInfo.getWebSocket() != null;
                        }
                    })
                    .map(new Function() {
                        @Override
                        public WebSocket apply(WebSocketInfo webSocketInfo) throws Exception {
                            return webSocketInfo.getWebSocket();
                        }
                    });
        }
    
        public Observable getWebSocketInfo(String url) {
            return getWebSocketInfo(url, 5, TimeUnit.SECONDS);
        }
    
        public synchronized Observable getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
            //      
            Observable observable = mObservableCacheMap.get(url);
            if (observable == null) {
                //     ,  
                observable = Observable
                        .create(new WebSocketOnSubscribe(url))
                        .retry()
                        //   share   ,             ,      
                        .doOnDispose(new Action() {
                            @Override
                            public void run() throws Exception {
                                //       ,    
                                closeNow(url);
                                Logger.d(TAG, "          ,    ...");
                            }
                        })
                        //Share   ,              
                        .share()
                        //            ,         ,         UI  
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
                //      
                mObservableCacheMap.put(url, observable);
            } else {
                //    ,       
                WebSocket webSocket = mWebSocketPool.get(url);
                if (webSocket != null) {
                    observable = observable.startWith(createConnect(url, webSocket));
                }
            }
            return observable;
        }
    
        /**
         *      
         */
        private final class WebSocketOnSubscribe implements ObservableOnSubscribe {
            private String mWebSocketUrl;
            private WebSocket mWebSocket;
            private boolean isReconnecting = false;
    
            public WebSocketOnSubscribe(String webSocketUrl) {
                this.mWebSocketUrl = webSocketUrl;
            }
    
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                //  retry        ,        ,      
                if (mWebSocket == null && isReconnecting) {
                    if (Thread.currentThread() != Looper.getMainLooper().getThread()) {
                        long millis = mReconnectIntervalTimeUnit.toMillis(mReconnectInterval);
                        if (millis == 0) {
                            millis = 1000;
                        }
                        SystemClock.sleep(millis);
                    }
                }
                initWebSocket(emitter);
            }
    
            private Request createRequest(String url) {
                return new Request.Builder().get().url(url).build();
            }
    
            /**
             *    WebSocket
             */
            private synchronized void initWebSocket(ObservableEmitter emitter) {
                if (mWebSocket == null) {
                    mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
                        @Override
                        public void onOpen(WebSocket webSocket, Response response) {
                            super.onOpen(webSocket, response);
                            //    
                            if (!emitter.isDisposed()) {
                                mWebSocketPool.put(mWebSocketUrl, mWebSocket);
                                //    
                                if (isReconnecting) {
                                    emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
                                } else {
                                    emitter.onNext(createConnect(mWebSocketUrl, webSocket));
                                }
                            }
                            isReconnecting = false;
                        }
    
                        @Override
                        public void onMessage(WebSocket webSocket, String text) {
                            super.onMessage(webSocket, text);
                            //    
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
                            }
                        }
    
                        @Override
                        public void onMessage(WebSocket webSocket, ByteString bytes) {
                            super.onMessage(webSocket, bytes);
                            //    
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
                            }
                        }
    
                        @Override
                        public void onClosed(WebSocket webSocket, int code, String reason) {
                            super.onClosed(webSocket, code, reason);
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createClose(mWebSocketUrl));
                            }
                        }
    
                        @Override
                        public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
                            super.onFailure(webSocket, throwable, response);
                            isReconnecting = true;
                            mWebSocket = null;
                            //  WebSocket  ,retry      
                            removeWebSocketCache(webSocket);
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createPrepareReconnect(mWebSocketUrl));
                                //    onError, retry     
                                emitter.onError(new ImproperCloseException());
                            }
                        }
                    });
                }
            }
        }
    
    
        private WebSocketInfo createConnect(String url, WebSocket webSocket) {
            return mWebSocketInfoPool.obtain(url)
                    .setWebSocket(webSocket)
                    .setConnect(true);
        }
    
        private WebSocketInfo createReconnect(String url, WebSocket webSocket) {
            return mWebSocketInfoPool.obtain(url)
                    .setWebSocket(webSocket)
                    .setReconnect(true);
        }
    
        private WebSocketInfo createPrepareReconnect(String url) {
            return mWebSocketInfoPool.obtain(url)
                    .setPrepareReconnect(true);
        }
    
        private WebSocketInfo createReceiveStringMsg(String url, WebSocket webSocket, String stringMsg) {
            return mWebSocketInfoPool.obtain(url)
                    .setConnect(true)
                    .setWebSocket(webSocket)
                    .setStringMsg(stringMsg);
        }
    
        private WebSocketInfo createReceiveByteStringMsg(String url, WebSocket webSocket, ByteString byteMsg) {
            return mWebSocketInfoPool.obtain(url)
                    .setConnect(true)
                    .setWebSocket(webSocket)
                    .setByteStringMsg(byteMsg);
        }
    
        private WebSocketInfo createClose(String url) {
            return mWebSocketInfoPool.obtain(url);
        }
    }
    

    タイミング送信ハートビート維持接続
    WebSocketが断線した後、バックエンドはすぐに接続が切断されたことを知ることができないため、ハートビートメッセージが双方の通信を維持する必要があります.
    心拍数を実現するには、本質的にはタイミングメッセージであり、RxJavaのintervalオペレータを使用してタスクを実行します.ここで私のメッセージにはタイムスタンプを追加する必要があるので、timestampオペレータを追加して、実行結果ごとにタイムスタンプを追加します.
  • ハートビート情報jsonの生成は、GsonがJsonにシーケンス化されたり、FastJson処理されたり、他の汎用パラメータが追加されたりするなど、外部での生成を期待するため、WebSocketベースライブラリに書くべきではないので、HeartBeatGenerateCallbackコールバックを提供してJsonを生成します.
  • ここでは、ネットワークが開いていない場合、心拍メッセージを送信しない最適化を行いました.
  • public class NetworkUtil {
        private NetworkUtil() {
        }
    
        /**
         *          
         *
         * @param context     
         * @param needWifi     wifi  
         */
        public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
            NetworkInfo info = getActiveNetwork(context);
            if (info == null) {
                return false;
            }
            if (!needWifi) {
                return info.isAvailable();
            } else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
                return info.isAvailable();
            }
            return false;
        }
    
        /**
         *           
         *
         * @param context    
         * @return NetworkInfo
         */
        public static NetworkInfo getActiveNetwork(Context context) {
            ConnectivityManager mConnMgr = (ConnectivityManager) context
                    .getSystemService(Context.CONNECTIVITY_SERVICE);
            if (mConnMgr == null) {
                return null;
            }
            //           
            return mConnMgr.getActiveNetworkInfo();
        }
    }
    
  • ハートビートコールバックインタフェースは、外部にハートビートjson
  • を生成させる.
    public interface HeartBeatGenerateCallback {
        /**
         *             
         *
         * @param timestamp      
         * @return         
         */
        String onGenerateHeartBeatMsg(long timestamp);
    }
    
  • ハートビートメッセージを送信するには、Urlアドレス、間隔時間、間隔時間単位、ハートビートメッセージ生成コールバックが必要である.
  • @Override
    public Observable heartBeat(String url, int period, TimeUnit unit,
                                         HeartBeatGenerateCallback heartBeatGenerateCallback) {
    if (heartBeatGenerateCallback == null) {
        return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
    }
    return Observable
            .interval(period, unit)
            //timestamp   ,           
            .timestamp()
            .retry()
            .flatMap(new Function, ObservableSource>() {
                @Override
                public ObservableSource apply(Timed timed) throws Exception {
                    long timestamp = timed.time();
                    //    ,        ,            
                    if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
                        String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
                        Logger.d(TAG, "      : " + heartBeatMsg);
                        if (hasWebSocketConnection(url)) {
                            return send(url, heartBeatMsg);
                        } else {
                            //         ,      ,   ,   WebSocket    ,       
                            //   WebSocket     ,            ,   
                            return asyncSend(url, heartBeatMsg);
                        }
                    } else {
                        Logger.d(TAG, "     ,     ,       ,      ");
                        return Observable.create(new ObservableOnSubscribe() {
                            @Override
                            public void subscribe(ObservableEmitter emitter) throws Exception {
                                emitter.onNext(false);
                            }
                        });
                    }
                }
            });
    }
    

    再接続の実現
    RxJavaの再接続構成には、RxJavaがRetryオペレータを提供し、再試行をサポートするという天然の利点があります.onFailure()接続失敗コールバックでonError()を手動で発行し、データソースにretryオペレータを追加して再試行すると、データソースのサブスクリプションコールバックに再接続されます.
    private final class WebSocketOnSubscribe implements ObservableOnSubscribe {
            private String mWebSocketUrl;
            private WebSocket mWebSocket;
            private boolean isReconnecting = false;
    
            public WebSocketOnSubscribe(String webSocketUrl) {
                this.mWebSocketUrl = webSocketUrl;
            }
    
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                //...
            }
    
            private Request createRequest(String url) {
                return new Request.Builder().get().url(url).build();
            }
    
            /**
             *    WebSocket
             */
            private synchronized void initWebSocket(ObservableEmitter emitter) {
                if (mWebSocket == null) {
                    mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
                        @Override
                        public void onOpen(WebSocket webSocket, Response response) {
                            super.onOpen(webSocket, response);
                            //    
                            if (!emitter.isDisposed()) {
                                mWebSocketPool.put(mWebSocketUrl, mWebSocket);
                                //    
                                if (isReconnecting) {
                                    emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
                                } else {
                                    emitter.onNext(createConnect(mWebSocketUrl, webSocket));
                                }
                            }
                            isReconnecting = false;
                        }
    
                        @Override
                        public void onMessage(WebSocket webSocket, String text) {
                            super.onMessage(webSocket, text);
                            //    
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
                            }
                        }
    
                        @Override
                        public void onMessage(WebSocket webSocket, ByteString bytes) {
                            super.onMessage(webSocket, bytes);
                            //    
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
                            }
                        }
    
                        @Override
                        public void onClosed(WebSocket webSocket, int code, String reason) {
                            super.onClosed(webSocket, code, reason);
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createClose(mWebSocketUrl));
                            }
                        }
    
                        @Override
                        public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
                            super.onFailure(webSocket, throwable, response);
                            isReconnecting = true;
                            mWebSocket = null;
                            //  WebSocket  ,retry      
                            removeWebSocketCache(webSocket);
                            if (!emitter.isDisposed()) {
                                emitter.onNext(createPrepareReconnect(mWebSocketUrl));
                                //    onError, retry     
                                emitter.onError(new ImproperCloseException());
                            }
                        }
                    });
                }
            }
        }
    

    使用
  • RxWebSocketBuilderを使用してRxWebSocket
  • を構築
    //   OkHttpClient
    OkHttpClient mClient = new OkHttpClient.Builder()
            .readTimeout(3, TimeUnit.SECONDS)//        
            .writeTimeout(3, TimeUnit.SECONDS)//        
            .connectTimeout(3, TimeUnit.SECONDS)//        
            .build();
    
    //RxWebSocketBuilder  RxWebSocket
    RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
                    //    Log
                    .isPrintLog(true)
                    //5       
                    .reconnectInterval(5, TimeUnit.SECONDS)
                    .client(mClient)
                    .build();
    
  • Urlアドレス
  • に接続する.
    String url = "ws://xxxxxxxxx"
    //    
    rxWebSocket.get(url)
        //         
        .compose(RxSchedulerUtil.ioToMain())
        //      
        .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
        .subscribe(new Consumer() {
            @Override
            public void accept(WebSocketInfo webSocketInfo) throws Exception {
                String json = webSocketInfo.getStringMsg();
                //    json  
                ...
            }
        });
    
  • 同期送信メッセージ(接続が正常でなければ送信に失敗しない)
  • .
    rxWebSocket.send(url, "    ")
        .compose(RxSchedulerUtil.ioToMain())
        .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
        .subscribe(new Consumer() {
                @Override
                public void accept(Boolean isSuccess) throws Exception {
                    if(isSuccess) {
                          //    
                    } ele {
                          //    
                    }
                }
        });
    
  • 非同期送信メッセージ(接続が正常であることを確認する必要はなく、接続が成功すると自動的に送信)
  • .
    rxWebSocket.asyncSend(url, "    ")
        .compose(RxSchedulerUtil.ioToMain())
        .as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
        .subscribe(new Consumer() {
                @Override
                public void accept(Boolean isSuccess) throws Exception {
                    if(isSuccess) {
                          //    
                    } ele {
                          //    
                    }
                }
        });
    
  • ハートビートパケット
  • を送信
    rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
        @Override
        public String onGenerateHeartBeatMsg(long timestamp) {
            //    Json,      ,        ,    1000    。
            //              
            return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
                    String.valueOf(timestamp / 1000)));
        }
    });
    

    まとめ
    OkhttpのWebSocketの使用は比較的簡単で、基本的にはリクエストの開始とコールバックの構成の2つのステップで、send()メソッドを使用してメッセージを送信します.
    しかし、実際に使用するには、RxJavaに合わせて非同期コールバックをObservableにカプセル化して購読者に通知し、データ変換、スレッド切り替え、接続再試行、心拍数など、RxJavaのさまざまなオペレータを使用する必要があります.