Seata解析-TMとRMサービス側登録プロセスの詳細
本稿はseata 1.3に基づく.バージョン0
「Seata解析-seataコアクラスNettyRemotingServerの詳細」では、TMとRMの登録を処理するためのRegTmProcessorとRegRmProcessorについて説明しています.この2つのプロセッサは、サービス側がTMとRMの登録方法について詳しく説明します.
文書ディレクトリ一、TM登録 二、RM登録 三、総括 一、TM登録
まずTMの登録プロセスを紹介します.サービス側は、TMの登録要求を受信すると、要求をオブジェクトRegisterTMRequestに変換し、RegTmProcessorのonRegTmMessageメソッド処理にオブジェクトを転送する.次はonRegTmMessageメソッドです.コードは削除され、コアロジックのみが表示されます.
onRegTmMessageメソッドは最後にTMの登録要求をChannelManagerに転送する.registerTMChannel処理.registerTMChannelの処理フローも非常に簡単で、以下の4つのステップに分けられます.クライアントバージョン番号を検証します. RpcContextオブジェクトを構築する. クライアントリンクチャネルとRpcContextオブジェクトとの対応関係を1つのグローバルMap属性に保存する. クライアントのアプリケーション名、IP、ポートとRpcContextオブジェクトの対応関係も1つのグローバルMapオブジェクトに保存する.
次にregisterTMChannelメソッドの具体的な実装を見てみましょう.
registerTMChannelメソッドは、RpcContextのholdInIdentifiiedChannelsメソッドとholdInClientChannelsメソッドを呼び出し、各メソッドの具体的な実装を以下に示す.まず、RpcContextオブジェクトを作成する方法buildChannelHolderを見てみましょう.
registerTMChannel buildChannelHolderを呼び出してRpcContextオブジェクトを作成した後、holdInIdentifiedChannelsとholdInClientChannelsを呼び出し続け、TMの登録を完了します.
どちらの方法も簡単で、関連情報をグローバルなMapオブジェクトに直接保存します.TM情報登録に成功した後、構築RegisterTMResponseオブジェクトはクライアントに返される.このTMの登録プロセスはすべて終了します.上記の手順から分かるように、TM登録は、TMのアプリケーション情報と接続チャネルをグローバルMapオブジェクトに保存し、対応するリンクのライフサイクル全体を貫くRpcContextコンテキストオブジェクトを作成することである.
二、RM登録
RM登録はTM登録と非常に類似しており、サービス側は要求を受信した後、要求オブジェクトをRegRmProcessorのonRegRmMessageメソッドに転送する.このメソッドはonRegTmMessageと類似しており、ここでは説明しない.onRegTmMessageはリクエストオブジェクトをChannelManagerに転送する.registermChannel処理、次の方法を見てみましょう.
updateChannelsResourceはRMのリソース情報を更新するために使用されます.このロジックは煩雑で、総じて言えば、新しく登録されたデータベースリソースが登録された古いリソースと同じアプリケーションに属し、IPが同じであるが、ポートが異なる場合、現在登録されているリソースが古いリソースが属するアプリケーションと同じであり、1台のマシンに配備されていることを示しています.ポートが異なるだけで、seataは古いリソースのポートとRpcContext対応関係を新しいリソースに登録し、古いリソースのRpcContextにも新しいリソースの情報を追加します.2つの問題がありますなぜTM登録の場合、接続が登録されているかどうかを判断せず、RMが必要ですか? なぜRMが最後にリソース情報を更新する必要があるのか、すなわちupdateChannelsResourceメソッドを呼び出す役割は何ですか.
最初の問題については,TMとRMを解析するまで待機し,原因を説明する.2番目の問題は、リソースを更新する理由は、RMとの接続が切断された場合、RMがブランチトランザクションに関連しているため、RMがブランチトランザクションのロールバックを通知し、RMとの接続が切断された場合、seataは同じIP上で同じアプリケーションの異なるポートの接続を選択して通知し、トランザクションの一貫性を保証するためである.
三、まとめ
本論文では,サービス側におけるTMとRMの登録プロセスを解析し,総じて両者の登録プロセスは非常に類似しており,まずRpcContextオブジェクトを構築し,その後,そのオブジェクトをアプリケーション情報とともにメモリのMapオブジェクトに格納する.RpcContextオブジェクトは、接続のライフサイクル全体を貫通します.
「Seata解析-seataコアクラスNettyRemotingServerの詳細」では、TMとRMの登録を処理するためのRegTmProcessorとRegRmProcessorについて説明しています.この2つのプロセッサは、サービス側がTMとRMの登録方法について詳しく説明します.
文書ディレクトリ
まずTMの登録プロセスを紹介します.サービス側は、TMの登録要求を受信すると、要求をオブジェクトRegisterTMRequestに変換し、RegTmProcessorのonRegTmMessageメソッド処理にオブジェクトを転送する.次はonRegTmMessageメソッドです.コードは削除され、コアロジックのみが表示されます.
private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
// TM
RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
...
try {
if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
//TM
ChannelManager.registerTMChannel(message, ctx.channel());
...
}
} catch (Exception exx) {
...
}
//
RegisterTMResponse response = new RegisterTMResponse(isSuccess);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
}
onRegTmMessageメソッドは最後にTMの登録要求をChannelManagerに転送する.registerTMChannel処理.registerTMChannelの処理フローも非常に簡単で、以下の4つのステップに分けられます.
次にregisterTMChannelメソッドの具体的な実装を見てみましょう.
public static void registerTMChannel(RegisterTMRequest request, Channel channel) throws IncompatibleVersionException {
// , 0.7.1
Version.checkVersion(request.getVersion());
// RpcContext
RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
request.getApplicationId(),
request.getTransactionServiceGroup(),
null, channel);
// channel rpcContext IDENTIFIED_CHANNELS
// rpcContext IDENTIFIED_CHANNELS
//IDENTIFIED_CHANNELS channel RpcCcontext , RpcContext RpcContext
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
//clientIdentified= + IP
String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
+ ChannelUtil.getClientIpFromChannel(channel);
//TM_CHANNELS TM , ConcurrentMap>
//TM_CHANNELS key= + IP
//TM_CHANNELS value key ,value value RpcContext
TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
//clientIdentifiedMap key ,value RpcContext
ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
rpcContext.holdInClientChannels(clientIdentifiedMap);
}
registerTMChannelメソッドは、RpcContextのholdInIdentifiiedChannelsメソッドとholdInClientChannelsメソッドを呼び出し、各メソッドの具体的な実装を以下に示す.まず、RpcContextオブジェクトを作成する方法buildChannelHolderを見てみましょう.
private static RpcContext buildChannelHolder(NettyPoolKey.TransactionRole clientRole, String version, String applicationId,
String txServiceGroup, String dbkeys, Channel channel) {
RpcContext holder = new RpcContext();
// , TM RM、SERVER, TransactionRole
holder.setClientRole(clientRole);
//
holder.setVersion(version);
//clientId= +IP+
holder.setClientId(buildClientId(applicationId, channel));
//applicationId , spring.application.name
holder.setApplicationId(applicationId);
// , seata.tx-service-group ,
// TM ,
holder.setTransactionServiceGroup(txServiceGroup);
//resources , URL
holder.addResources(dbKeytoSet(dbkeys));
//
holder.setChannel(channel);
return holder;
}
registerTMChannel buildChannelHolderを呼び出してRpcContextオブジェクトを作成した後、holdInIdentifiedChannelsとholdInClientChannelsを呼び出し続け、TMの登録を完了します.
public void holdInIdentifiedChannels(ConcurrentMap<Channel, RpcContext> clientIDHolderMap) {
if (this.clientIDHolderMap != null) {
throw new IllegalStateException();
}
this.clientIDHolderMap = clientIDHolderMap;
this.clientIDHolderMap.put(channel, this);
}
public void holdInClientChannels(ConcurrentMap<Integer, RpcContext> clientTMHolderMap) {
if (this.clientTMHolderMap != null) {
throw new IllegalStateException();
}
this.clientTMHolderMap = clientTMHolderMap;
//
Integer clientPort = ChannelUtil.getClientPortFromChannel(channel);
this.clientTMHolderMap.put(clientPort, this);
}
どちらの方法も簡単で、関連情報をグローバルなMapオブジェクトに直接保存します.TM情報登録に成功した後、構築RegisterTMResponseオブジェクトはクライアントに返される.このTMの登録プロセスはすべて終了します.上記の手順から分かるように、TM登録は、TMのアプリケーション情報と接続チャネルをグローバルMapオブジェクトに保存し、対応するリンクのライフサイクル全体を貫くRpcContextコンテキストオブジェクトを作成することである.
二、RM登録
RM登録はTM登録と非常に類似しており、サービス側は要求を受信した後、要求オブジェクトをRegRmProcessorのonRegRmMessageメソッドに転送する.このメソッドはonRegTmMessageと類似しており、ここでは説明しない.onRegTmMessageはリクエストオブジェクトをChannelManagerに転送する.registermChannel処理、次の方法を見てみましょう.
public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
throws IncompatibleVersionException {
//
Version.checkVersion(resourceManagerRequest.getVersion());
// RM , TM , URL
Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
RpcContext rpcContext;
if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
// channel , rpcContext
rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
resourceManagerRequest.getResourceIds(), channel);
rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
} else {
// ,
rpcContext = IDENTIFIED_CHANNELS.get(channel);
rpcContext.addResources(dbkeySet);
}
if (dbkeySet == null || dbkeySet.isEmpty()) {
return; }
//
for (String resourceId : dbkeySet) {
String clientIp;
ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
.computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
.computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
// portMap
rpcContext.holdInResourceManagerChannels(resourceId, portMap);
updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
}
}
// , RM
private static void updateChannelsResource(String resourceId, String clientIp, String applicationId) {
ConcurrentMap<Integer, RpcContext> sourcePortMap = RM_CHANNELS.get(resourceId).get(applicationId).get(clientIp);
for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>> rmChannelEntry : RM_CHANNELS.entrySet()) {
// ,
if (rmChannelEntry.getKey().equals(resourceId)) {
continue;
}
ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = rmChannelEntry.getValue();
// ,
if (!applicationIdMap.containsKey(applicationId)) {
continue;
}
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientIpMap = applicationIdMap.get(applicationId);
// IP ,
if (!clientIpMap.containsKey(clientIp)) {
continue;
}
ConcurrentMap<Integer, RpcContext> portMap = clientIpMap.get(clientIp);
for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
Integer port = portMapEntry.getKey();
if (!sourcePortMap.containsKey(port)) {
RpcContext rpcContext = portMapEntry.getValue();
// ,
// RpcContext sourcePortMap ,
sourcePortMap.put(port, rpcContext);
// URL RpcContext
// : ID->RM ->RpcContext Map
rpcContext.holdInResourceManagerChannels(resourceId, port);
}
}
}
}
updateChannelsResourceはRMのリソース情報を更新するために使用されます.このロジックは煩雑で、総じて言えば、新しく登録されたデータベースリソースが登録された古いリソースと同じアプリケーションに属し、IPが同じであるが、ポートが異なる場合、現在登録されているリソースが古いリソースが属するアプリケーションと同じであり、1台のマシンに配備されていることを示しています.ポートが異なるだけで、seataは古いリソースのポートとRpcContext対応関係を新しいリソースに登録し、古いリソースのRpcContextにも新しいリソースの情報を追加します.2つの問題があります
最初の問題については,TMとRMを解析するまで待機し,原因を説明する.2番目の問題は、リソースを更新する理由は、RMとの接続が切断された場合、RMがブランチトランザクションに関連しているため、RMがブランチトランザクションのロールバックを通知し、RMとの接続が切断された場合、seataは同じIP上で同じアプリケーションの異なるポートの接続を選択して通知し、トランザクションの一貫性を保証するためである.
三、まとめ
本論文では,サービス側におけるTMとRMの登録プロセスを解析し,総じて両者の登録プロセスは非常に類似しており,まずRpcContextオブジェクトを構築し,その後,そのオブジェクトをアプリケーション情報とともにメモリのMapオブジェクトに格納する.RpcContextオブジェクトは、接続のライフサイクル全体を貫通します.