OkHttp 3 WebSocket接続を実現
38611 ワード
プロジェクトにはIMモジュールがあり、WebSocketを使用していますので、ここに記録しておきます.
WebSocketのフレームワークが多く、OkHttp 3もWebSocketをサポートしていることを知り、Okhttpを採用して実現しました.
一つはWebSocketをもう一つ導入する必要のないサードパーティライブラリで、一つはOkhttp 3の口コミと安定性が非常によく、しかも更新されています.
依存の追加
実装手順 OkHttpClient構成初期化パラメータを構築します. WebSocketのUrlアドレスを使用して接続します. WebSocketの接続ステータスコールバックとメッセージコールバックを設定します. メッセージjson処理業務等を解析する. 接続が成功すると、WebSocketを使用してメッセージ が送信される. OkHttpClient の構成 Urlを使用してWebSocket要求(一般的にはバックエンドインタフェースが接続されたUrlアドレスを返す) を構築する.接続を開始し、コールバックを構成します. onOpen()、接続成功 onMessage(String text)は、文字列タイプのメッセージを受信し、一般的にこの を使用します. onMessage(ByteString bytes)は、バイト配列タイプメッセージを受信し、ここでは を使用しませんでした. onClosed()接続が を閉じる onFailure()は、接続に失敗し、通常はここで再接続操作 を開始する.
WebSocketオブジェクトを使用してメッセージを送信し、msgはメッセージ内容(一般的にjsonですが、もちろんxmlなど他のものも使用できます)であり、sendメソッドはすぐに送信結果を返します.
RxJavaパッケージに合わせる
RxJavaを構成すると、WebSocketのデータ変換、スレッド切り替え、再接続処理などの機能を強化できます.
実装手順 Api呼び出しインタフェースを定義し、外部はApiに接触するだけで内部実装ロジックに関心を持つ必要はありません. ビルダーモードでは、多数の構成パラメータを使用して、まずBuilderクラスを使用して保存し、build()メソッドを使用してRxWebSocketオブジェクトを生成します. Api実装クラス、ここではエージェントモードを使用してエージェントオブジェクトを定義し、内部実装を隔離し、一時的に単純な転送実装にすぎず、後続に制限ロジックを追加する必要がある場合、ここで追加)、装飾者モード強化ロジックの使用も便利である. WebSocketInfoメッセージエンティティ、RxJava送信メッセージ通知サブスクライバは、送信されたメッセージ文字列、再接続されたかどうか、接続に成功したかどうかなど、送信されたメッセージを保存するエンティティModelクラスが必要であることをサブスクライバに通知します. キャッシュが必要なモデル実装が必要なインタフェース キャッシュプールインタフェース 統合キャッシュモデルラップクラス ベースキャッシュプール キャッシュモデル を実装が具体的に実装される. は、接続WebSocketをObservableの購読コールバックにカプセル化する. は、同じ接続オブジェクトを共有する複数のUrlとデータソースをMapでキャッシュします. shareオペレータを使用して、複数のオブザーバに1つのデータソースを同時に購読させます.すべての購読者が購読をキャンセルした場合、接続を解除します.
タイミング送信ハートビート維持接続
WebSocketが断線した後、バックエンドはすぐに接続が切断されたことを知ることができないため、ハートビートメッセージが双方の通信を維持する必要があります.
心拍数を実現するには、本質的にはタイミングメッセージであり、RxJavaのintervalオペレータを使用してタスクを実行します.ここで私のメッセージにはタイムスタンプを追加する必要があるので、timestampオペレータを追加して、実行結果ごとにタイムスタンプを追加します.ハートビート情報jsonの生成は、GsonがJsonにシーケンス化されたり、FastJson処理されたり、他の汎用パラメータが追加されたりするなど、外部での生成を期待するため、WebSocketベースライブラリに書くべきではないので、HeartBeatGenerateCallbackコールバックを提供してJsonを生成します. ここでは、ネットワークが開いていない場合、心拍メッセージを送信しない最適化を行いました. ハートビートコールバックインタフェースは、外部にハートビートjson を生成させる.ハートビートメッセージを送信するには、Urlアドレス、間隔時間、間隔時間単位、ハートビートメッセージ生成コールバックが必要である.
再接続の実現
RxJavaの再接続構成には、RxJavaがRetryオペレータを提供し、再試行をサポートするという天然の利点があります.onFailure()接続失敗コールバックでonError()を手動で発行し、データソースにretryオペレータを追加して再試行すると、データソースのサブスクリプションコールバックに再接続されます.
使用 RxWebSocketBuilderを使用してRxWebSocket を構築 Urlアドレス に接続する.同期送信メッセージ(接続が正常でなければ送信に失敗しない) .非同期送信メッセージ(接続が正常であることを確認する必要はなく、接続が成功すると自動的に送信) .ハートビートパケット を送信
まとめ
OkhttpのWebSocketの使用は比較的簡単で、基本的にはリクエストの開始とコールバックの構成の2つのステップで、send()メソッドを使用してメッセージを送信します.
しかし、実際に使用するには、RxJavaに合わせて非同期コールバックをObservableにカプセル化して購読者に通知し、データ変換、スレッド切り替え、接続再試行、心拍数など、RxJavaのさまざまなオペレータを使用する必要があります.
WebSocketのフレームワークが多く、OkHttp 3もWebSocketをサポートしていることを知り、Okhttpを採用して実現しました.
一つはWebSocketをもう一つ導入する必要のないサードパーティライブラリで、一つはOkhttp 3の口コミと安定性が非常によく、しかも更新されています.
依存の追加
implementation 'com.squareup.okhttp3:okhttp:3.8.1'
実装手順
OkHttpClient mClient = new OkHttpClient.Builder()
.readTimeout(3, TimeUnit.SECONDS)//
.writeTimeout(3, TimeUnit.SECONDS)//
.connectTimeout(3, TimeUnit.SECONDS)//
.build();
//
String url = "ws://xxxxx"
//
Request request = new Request.Builder().get().url(url).build();
//
WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
// ...
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
// ...( json)
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
// ...( )
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
// ...
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
// ...
}
});
//
boolean isSendSuccess = webSocket.send(msg);
RxJavaパッケージに合わせる
RxJavaを構成すると、WebSocketのデータ変換、スレッド切り替え、再接続処理などの機能を強化できます.
実装手順
public interface WebSocketWorker {
/**
* ,
*/
Observable get(String url);
/**
* , ,
*
* @param timeout
* @param timeUnit
*/
Observable get(String url, long timeout, TimeUnit timeUnit);
/**
* ,url WebSocket ,
*
* @param msg , , json
*/
Observable send(String url, String msg);
/**
* ,
*
* @param byteString ByteString
*/
Observable send(String url, ByteString byteString);
/**
* WebSocket ,
*/
Observable asyncSend(String url, String msg);
/**
* , ByteString
*/
Observable asyncSend(String url, ByteString byteString);
/**
* Url
*/
Observable close(String url);
/**
* Url
*/
boolean closeNow(String url);
/**
*
*/
Observable> closeAll();
/**
*
*/
void closeAllNow();
}
public class RxWebSocketBuilder {
Context mContext;
/**
* Log
*/
boolean mIsPrintLog;
/**
* Log
*/
Logger.LogDelegate mLogDelegate;
/**
* OkHttpClient
*/
OkHttpClient mClient;
/**
* SSL
*/
SSLSocketFactory mSslSocketFactory;
X509TrustManager mTrustManager;
/**
*
*/
long mReconnectInterval;
/**
*
*/
TimeUnit mReconnectIntervalTimeUnit;
public RxWebSocketBuilder(Context context) {
this.mContext = context.getApplicationContext();
}
public RxWebSocketBuilder isPrintLog(boolean isPrintLog) {
this.mIsPrintLog = isPrintLog;
return this;
}
public RxWebSocketBuilder logger(Logger.LogDelegate logDelegate) {
Logger.setDelegate(logDelegate);
return this;
}
public RxWebSocketBuilder client(OkHttpClient client) {
this.mClient = client;
return this;
}
public RxWebSocketBuilder sslSocketFactory(SSLSocketFactory sslSocketFactory, X509TrustManager trustManager) {
this.mSslSocketFactory = sslSocketFactory;
this.mTrustManager = trustManager;
return this;
}
public RxWebSocketBuilder reconnectInterval(long reconnectInterval, TimeUnit reconnectIntervalTimeUnit) {
this.mReconnectInterval = reconnectInterval;
this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
return this;
}
public RxWebSocket build() {
return new RxWebSocket(this);
}
}
public class RxWebSocket implements WebSocketWorker {
private Context mContext;
/**
* Log
*/
private boolean mIsPrintLog;
/**
* Log
*/
private Logger.LogDelegate mLogDelegate;
/**
* OkHttpClient
*/
private OkHttpClient mClient;
/**
* SSL
*/
private SSLSocketFactory mSslSocketFactory;
private X509TrustManager mTrustManager;
/**
*
*/
private long mReconnectInterval;
/**
*
*/
private TimeUnit mReconnectIntervalTimeUnit;
/**
*
*/
private WebSocketWorker mWorkerImpl;
private RxWebSocket() {
}
RxWebSocket(RxWebSocketBuilder builder) {
this.mContext = builder.mContext;
this.mIsPrintLog = builder.mIsPrintLog;
this.mLogDelegate = builder.mLogDelegate;
this.mClient = builder.mClient == null ? new OkHttpClient() : builder.mClient;
this.mSslSocketFactory = builder.mSslSocketFactory;
this.mTrustManager = builder.mTrustManager;
this.mReconnectInterval = builder.mReconnectInterval == 0 ? 1 : builder.mReconnectInterval;
this.mReconnectIntervalTimeUnit = builder.mReconnectIntervalTimeUnit == null ? TimeUnit.SECONDS : builder.mReconnectIntervalTimeUnit;
setup();
}
/**
*
*/
private void setup() {
this.mWorkerImpl = new WebSocketWorkerImpl(
this.mContext,
this.mIsPrintLog,
this.mLogDelegate,
this.mClient,
this.mSslSocketFactory,
this.mTrustManager,
this.mReconnectInterval,
this.mReconnectIntervalTimeUnit);
}
//...Api mWorkerImpl,mWorkerImpl
}
public interface ICacheTarget {
/**
*
*
* @return
*/
T reset();
}
public interface ICachePool> {
/**
*
*/
T onCreateCache();
/**
*
*/
int onSetupMaxCacheCount();
/**
*
*
* @param cacheKey Key,
*/
T obtain(String cacheKey);
/**
* ,
*
* @param cacheTarget
*/
T onObtainCacheAfter(ICacheTarget cacheTarget);
}
public class CacheItem implements Serializable {
private static final long serialVersionUID = -401778630524300400L;
/**
*
*/
private T cacheTarget;
/**
*
*/
private long recentlyUsedTime;
public CacheItem(T cacheTarget, long recentlyUsedTime) {
this.cacheTarget = cacheTarget;
this.recentlyUsedTime = recentlyUsedTime;
}
public T getCacheTarget() {
return cacheTarget;
}
public void setCacheTarget(T cacheTarget) {
this.cacheTarget = cacheTarget;
}
public long getRecentlyUsedTime() {
return recentlyUsedTime;
}
public void setRecentlyUsedTime(long recentlyUsedTime) {
this.recentlyUsedTime = recentlyUsedTime;
}
}
public abstract class BaseCachePool> implements ICachePool, Comparator> {
/**
*
*/
private ConcurrentHashMap>> mPool;
public BaseCachePool() {
mPool = new ConcurrentHashMap<>(8);
}
@Override
public T obtain(String cacheKey) {
//
LinkedList> cacheChain;
// ,
if (!mPool.containsKey(cacheKey)) {
cacheChain = new LinkedList<>();
} else {
cacheChain = mPool.get(cacheKey);
}
if (cacheChain == null) {
throw new NullPointerException("cacheChain ");
}
// ,
if (cacheChain.size() < onSetupMaxCacheCount()) {
T cache = onCreateCache();
CacheItem cacheItem = new CacheItem<>(cache, System.currentTimeMillis());
cacheChain.add(cacheItem);
mPool.put(cacheKey, cacheChain);
return cache;
}
// 。 , , ( )
Collections.sort(cacheChain, this);
CacheItem cacheItem = cacheChain.getFirst();
cacheItem.setRecentlyUsedTime(System.currentTimeMillis());
//
T cacheTarget = cacheItem.getCacheTarget();
cacheTarget = onObtainCacheAfter(cacheTarget);
return cacheTarget;
}
@Override
public T onObtainCacheAfter(ICacheTarget cacheTarget) {
// reset , ,
return cacheTarget.reset();
}
@Override
public int compare(CacheItem o1, CacheItem o2) {
return Long.compare(o1.getRecentlyUsedTime(), o2.getRecentlyUsedTime());
}
}
public class WebSocketInfo implements Serializable, ICacheTarget {
private static final long serialVersionUID = -880481254453932113L;
private WebSocket mWebSocket;
private String mStringMsg;
private ByteString mByteStringMsg;
/**
*
*/
private boolean isConnect;
/**
*
*/
private boolean isReconnect;
/**
*
*/
private boolean isPrepareReconnect;
/**
*
*/
@Override
public WebSocketInfo reset() {
this.mWebSocket = null;
this.mStringMsg = null;
this.mByteStringMsg = null;
this.isConnect = false;
this.isReconnect = false;
this.isPrepareReconnect = false;
return this;
}
// get、set
}
public class WebSocketWorkerImpl implements WebSocketWorker {
private static final String TAG = WebSocketWorkerImpl.class.getName();
/**
*
*/
private Context mContext;
/**
* OkHttpClient
*/
private OkHttpClient mClient;
/**
*
*/
private long mReconnectInterval;
/**
*
*/
private TimeUnit mReconnectIntervalTimeUnit;
/**
* ,Url Observable
*/
private Map> mObservableCacheMap;
/**
* Url WebSocket , Url WebSocket
*/
private Map mWebSocketPool;
/**
* WebSocketInfo
*/
private final WebSocketInfoPool mWebSocketInfoPool;
public WebSocketWorkerImpl(
Context context,
boolean isPrintLog,
Logger.LogDelegate logDelegate,
OkHttpClient client,
SSLSocketFactory sslSocketFactory,
X509TrustManager trustManager,
long reconnectInterval,
TimeUnit reconnectIntervalTimeUnit) {
this.mContext = context;
// Logger
Logger.setDelegate(logDelegate);
Logger.setLogPrintEnable(isPrintLog);
this.mClient = client;
//
this.mReconnectInterval = reconnectInterval;
this.mReconnectIntervalTimeUnit = reconnectIntervalTimeUnit;
// SSL
if (sslSocketFactory != null && trustManager != null) {
mClient = mClient.newBuilder().sslSocketFactory(sslSocketFactory, trustManager).build();
}
this.mObservableCacheMap = new HashMap<>(16);
this.mWebSocketPool = new HashMap<>(16);
mWebSocketInfoPool = new WebSocketInfoPool();
}
@Override
public Observable get(String url) {
return getWebSocketInfo(url);
}
@Override
public Observable get(String url, long timeout, TimeUnit timeUnit) {
return getWebSocketInfo(url, timeout, timeUnit);
}
@Override
public Observable send(String url, String msg) {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new IllegalStateException("The WebSocket not open"));
} else {
emitter.onNext(webSocket.send(msg));
}
}
});
}
@Override
public Observable send(String url, ByteString byteString) {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new IllegalStateException("The WebSocket not open"));
} else {
emitter.onNext(webSocket.send(byteString));
}
}
});
}
@Override
public Observable asyncSend(String url, String msg) {
return getWebSocket(url)
.take(1)
.map(new Function() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return webSocket.send(msg);
}
});
}
@Override
public Observable asyncSend(String url, ByteString byteString) {
return getWebSocket(url)
.take(1)
.map(new Function() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return webSocket.send(byteString);
}
});
}
@Override
public Observable close(String url) {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
WebSocket webSocket = mWebSocketPool.get(url);
if (webSocket == null) {
emitter.onError(new NullPointerException("url:" + url + " WebSocket must be not null"));
} else {
emitter.onNext(webSocket);
}
}
}).map(new Function() {
@Override
public Boolean apply(WebSocket webSocket) throws Exception {
return closeWebSocket(webSocket);
}
});
}
@Override
public boolean closeNow(String url) {
return closeWebSocket(mWebSocketPool.get(url));
}
@Override
public Observable> closeAll() {
return Observable
.just(mWebSocketPool)
.map(new Function
タイミング送信ハートビート維持接続
WebSocketが断線した後、バックエンドはすぐに接続が切断されたことを知ることができないため、ハートビートメッセージが双方の通信を維持する必要があります.
心拍数を実現するには、本質的にはタイミングメッセージであり、RxJavaのintervalオペレータを使用してタスクを実行します.ここで私のメッセージにはタイムスタンプを追加する必要があるので、timestampオペレータを追加して、実行結果ごとにタイムスタンプを追加します.
public class NetworkUtil {
private NetworkUtil() {
}
/**
*
*
* @param context
* @param needWifi wifi
*/
public static boolean hasNetWorkStatus(Context context, boolean needWifi) {
NetworkInfo info = getActiveNetwork(context);
if (info == null) {
return false;
}
if (!needWifi) {
return info.isAvailable();
} else if (info.getType() == ConnectivityManager.TYPE_WIFI) {
return info.isAvailable();
}
return false;
}
/**
*
*
* @param context
* @return NetworkInfo
*/
public static NetworkInfo getActiveNetwork(Context context) {
ConnectivityManager mConnMgr = (ConnectivityManager) context
.getSystemService(Context.CONNECTIVITY_SERVICE);
if (mConnMgr == null) {
return null;
}
//
return mConnMgr.getActiveNetworkInfo();
}
}
public interface HeartBeatGenerateCallback {
/**
*
*
* @param timestamp
* @return
*/
String onGenerateHeartBeatMsg(long timestamp);
}
@Override
public Observable heartBeat(String url, int period, TimeUnit unit,
HeartBeatGenerateCallback heartBeatGenerateCallback) {
if (heartBeatGenerateCallback == null) {
return Observable.error(new NullPointerException("heartBeatGenerateCallback == null"));
}
return Observable
.interval(period, unit)
//timestamp ,
.timestamp()
.retry()
.flatMap(new Function, ObservableSource>() {
@Override
public ObservableSource apply(Timed timed) throws Exception {
long timestamp = timed.time();
// , ,
if (mContext != null && NetworkUtil.hasNetWorkStatus(mContext, false)) {
String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
Logger.d(TAG, " : " + heartBeatMsg);
if (hasWebSocketConnection(url)) {
return send(url, heartBeatMsg);
} else {
// , , , WebSocket ,
// WebSocket , ,
return asyncSend(url, heartBeatMsg);
}
} else {
Logger.d(TAG, " , , , ");
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(false);
}
});
}
}
});
}
再接続の実現
RxJavaの再接続構成には、RxJavaがRetryオペレータを提供し、再試行をサポートするという天然の利点があります.onFailure()接続失敗コールバックでonError()を手動で発行し、データソースにretryオペレータを追加して再試行すると、データソースのサブスクリプションコールバックに再接続されます.
private final class WebSocketOnSubscribe implements ObservableOnSubscribe {
private String mWebSocketUrl;
private WebSocket mWebSocket;
private boolean isReconnecting = false;
public WebSocketOnSubscribe(String webSocketUrl) {
this.mWebSocketUrl = webSocketUrl;
}
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
//...
}
private Request createRequest(String url) {
return new Request.Builder().get().url(url).build();
}
/**
* WebSocket
*/
private synchronized void initWebSocket(ObservableEmitter emitter) {
if (mWebSocket == null) {
mWebSocket = mClient.newWebSocket(createRequest(mWebSocketUrl), new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//
if (!emitter.isDisposed()) {
mWebSocketPool.put(mWebSocketUrl, mWebSocket);
//
if (isReconnecting) {
emitter.onNext(createReconnect(mWebSocketUrl, webSocket));
} else {
emitter.onNext(createConnect(mWebSocketUrl, webSocket));
}
}
isReconnecting = false;
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
//
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveStringMsg(mWebSocketUrl, webSocket, text));
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
//
if (!emitter.isDisposed()) {
emitter.onNext(createReceiveByteStringMsg(mWebSocketUrl, webSocket, bytes));
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
if (!emitter.isDisposed()) {
emitter.onNext(createClose(mWebSocketUrl));
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
isReconnecting = true;
mWebSocket = null;
// WebSocket ,retry
removeWebSocketCache(webSocket);
if (!emitter.isDisposed()) {
emitter.onNext(createPrepareReconnect(mWebSocketUrl));
// onError, retry
emitter.onError(new ImproperCloseException());
}
}
});
}
}
}
使用
// OkHttpClient
OkHttpClient mClient = new OkHttpClient.Builder()
.readTimeout(3, TimeUnit.SECONDS)//
.writeTimeout(3, TimeUnit.SECONDS)//
.connectTimeout(3, TimeUnit.SECONDS)//
.build();
//RxWebSocketBuilder RxWebSocket
RxWebSocket rxWebSocket = new RxWebSocketBuilder(context)
// Log
.isPrintLog(true)
//5
.reconnectInterval(5, TimeUnit.SECONDS)
.client(mClient)
.build();
String url = "ws://xxxxxxxxx"
//
rxWebSocket.get(url)
//
.compose(RxSchedulerUtil.ioToMain())
//
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer() {
@Override
public void accept(WebSocketInfo webSocketInfo) throws Exception {
String json = webSocketInfo.getStringMsg();
// json
...
}
});
rxWebSocket.send(url, " ")
.compose(RxSchedulerUtil.ioToMain())
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer() {
@Override
public void accept(Boolean isSuccess) throws Exception {
if(isSuccess) {
//
} ele {
//
}
}
});
rxWebSocket.asyncSend(url, " ")
.compose(RxSchedulerUtil.ioToMain())
.as(RxLifecycleUtil.bindLifecycle(mLifecycleOwner))
.subscribe(new Consumer() {
@Override
public void accept(Boolean isSuccess) throws Exception {
if(isSuccess) {
//
} ele {
//
}
}
});
rxWebSocket.heartBeat(url, 6 ,TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
@Override
public String onGenerateHeartBeatMsg(long timestamp) {
// Json, , , 1000 。
//
return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
String.valueOf(timestamp / 1000)));
}
});
まとめ
OkhttpのWebSocketの使用は比較的簡単で、基本的にはリクエストの開始とコールバックの構成の2つのステップで、send()メソッドを使用してメッセージを送信します.
しかし、実際に使用するには、RxJavaに合わせて非同期コールバックをObservableにカプセル化して購読者に通知し、データ変換、スレッド切り替え、接続再試行、心拍数など、RxJavaのさまざまなオペレータを使用する必要があります.