Seata解析-TMとRMサービス側登録プロセスの詳細


本稿はseata 1.3に基づく.バージョン0
「Seata解析-seataコアクラスNettyRemotingServerの詳細」では、TMとRMの登録を処理するためのRegTmProcessorとRegRmProcessorについて説明しています.この2つのプロセッサは、サービス側がTMとRMの登録方法について詳しく説明します.
文書ディレクトリ
  • 一、TM登録
  • 二、RM登録
  • 三、総括
  • 一、TM登録
    まずTMの登録プロセスを紹介します.サービス側は、TMの登録要求を受信すると、要求をオブジェクトRegisterTMRequestに変換し、RegTmProcessorのonRegTmMessageメソッド処理にオブジェクトを転送する.次はonRegTmMessageメソッドです.コードは削除され、コアロジックのみが表示されます.
    private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
         
            //  TM      
            RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody();
            ...
            try {
         
                if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
         
                    //TM         
                    ChannelManager.registerTMChannel(message, ctx.channel());
                    ...
                }
            } catch (Exception exx) {
         
                ...
            }
            //        
            RegisterTMResponse response = new RegisterTMResponse(isSuccess);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
        }
    

    onRegTmMessageメソッドは最後にTMの登録要求をChannelManagerに転送する.registerTMChannel処理.registerTMChannelの処理フローも非常に簡単で、以下の4つのステップに分けられます.
  • クライアントバージョン番号を検証します.
  • RpcContextオブジェクトを構築する.
  • クライアントリンクチャネルとRpcContextオブジェクトとの対応関係を1つのグローバルMap属性に保存する.
  • クライアントのアプリケーション名、IP、ポートとRpcContextオブジェクトの対応関係も1つのグローバルMapオブジェクトに保存する.

  • 次にregisterTMChannelメソッドの具体的な実装を見てみましょう.
    	public static void registerTMChannel(RegisterTMRequest request, Channel channel) throws IncompatibleVersionException {
         
            //          ,        0.7.1
            Version.checkVersion(request.getVersion());
            //  RpcContext  
            RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
                request.getApplicationId(),
                request.getTransactionServiceGroup(),
                null, channel);
            //       channel rpcContext          IDENTIFIED_CHANNELS 
            //    rpcContext     IDENTIFIED_CHANNELS
            //IDENTIFIED_CHANNELS        channel RpcCcontext     ,    RpcContext                 RpcContext
            rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
            //clientIdentified=      +   IP
            String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
                + ChannelUtil.getClientIpFromChannel(channel);
            //TM_CHANNELS     TM    ,   ConcurrentMap>
            //TM_CHANNELS key=      +   IP
         	//TM_CHANNELS value  key               ,value  value    RpcContext  
            TM_CHANNELS.putIfAbsent(clientIdentified, new ConcurrentHashMap<Integer, RpcContext>());
            //clientIdentifiedMap key            ,value    RpcContext  
            ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
            rpcContext.holdInClientChannels(clientIdentifiedMap);
        }
    

    registerTMChannelメソッドは、RpcContextのholdInIdentifiiedChannelsメソッドとholdInClientChannelsメソッドを呼び出し、各メソッドの具体的な実装を以下に示す.まず、RpcContextオブジェクトを作成する方法buildChannelHolderを見てみましょう.
    private static RpcContext buildChannelHolder(NettyPoolKey.TransactionRole clientRole, String version, String applicationId,
                                                     String txServiceGroup, String dbkeys, Channel channel) {
         
            RpcContext holder = new RpcContext();
            //      ,   TM  RM、SERVER,   TransactionRole  
            holder.setClientRole(clientRole);
            //      
            holder.setVersion(version);
            //clientId=       +IP+  
            holder.setClientId(buildClientId(applicationId, channel));
            //applicationId    ,  spring.application.name  
            holder.setApplicationId(applicationId);
            //    ,    seata.tx-service-group  ,
            //        TM  ,            
            holder.setTransactionServiceGroup(txServiceGroup);
            //resources               ,            URL
            holder.addResources(dbKeytoSet(dbkeys));
            //     
            holder.setChannel(channel);
            return holder;
        }
    

    registerTMChannel buildChannelHolderを呼び出してRpcContextオブジェクトを作成した後、holdInIdentifiedChannelsとholdInClientChannelsを呼び出し続け、TMの登録を完了します.
    	public void holdInIdentifiedChannels(ConcurrentMap<Channel, RpcContext> clientIDHolderMap) {
         
            if (this.clientIDHolderMap != null) {
         
                throw new IllegalStateException();
            }
            this.clientIDHolderMap = clientIDHolderMap;
            this.clientIDHolderMap.put(channel, this);
        }
        public void holdInClientChannels(ConcurrentMap<Integer, RpcContext> clientTMHolderMap) {
         
            if (this.clientTMHolderMap != null) {
         
                throw new IllegalStateException();
            }
            this.clientTMHolderMap = clientTMHolderMap;
            //        
            Integer clientPort = ChannelUtil.getClientPortFromChannel(channel);
            this.clientTMHolderMap.put(clientPort, this);
        }
    

    どちらの方法も簡単で、関連情報をグローバルなMapオブジェクトに直接保存します.TM情報登録に成功した後、構築RegisterTMResponseオブジェクトはクライアントに返される.このTMの登録プロセスはすべて終了します.上記の手順から分かるように、TM登録は、TMのアプリケーション情報と接続チャネルをグローバルMapオブジェクトに保存し、対応するリンクのライフサイクル全体を貫くRpcContextコンテキストオブジェクトを作成することである.
    二、RM登録
    RM登録はTM登録と非常に類似しており、サービス側は要求を受信した後、要求オブジェクトをRegRmProcessorのonRegRmMessageメソッドに転送する.このメソッドはonRegTmMessageと類似しており、ここでは説明しない.onRegTmMessageはリクエストオブジェクトをChannelManagerに転送する.registermChannel処理、次の方法を見てみましょう.
    	public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
            throws IncompatibleVersionException {
         
            //      
            Version.checkVersion(resourceManagerRequest.getVersion());
            //  RM     ,      TM      ,        URL
            Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
            RpcContext rpcContext;
            if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
         
                //  channel     ,    rpcContext  
                rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                    resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                    resourceManagerRequest.getResourceIds(), channel);
                rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
            } else {
         
                //       ,       
                rpcContext = IDENTIFIED_CHANNELS.get(channel);
                rpcContext.addResources(dbkeySet);
            }
            if (dbkeySet == null || dbkeySet.isEmpty()) {
          return; }
            //         
            for (String resourceId : dbkeySet) {
         
                String clientIp;
                ConcurrentMap<Integer, RpcContext> portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -> new ConcurrentHashMap<>())
                        .computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -> new ConcurrentHashMap<>())
                        .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), clientIpKey -> new ConcurrentHashMap<>());
                //              portMap 
                rpcContext.holdInResourceManagerChannels(resourceId, portMap);
                updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
            }
    
        }
        //      ,     RM     
        private static void updateChannelsResource(String resourceId, String clientIp, String applicationId) {
         
            ConcurrentMap<Integer, RpcContext> sourcePortMap = RM_CHANNELS.get(resourceId).get(applicationId).get(clientIp);
            for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
                RpcContext>>>> rmChannelEntry : RM_CHANNELS.entrySet()) {
         
                //         ,   
                if (rmChannelEntry.getKey().equals(resourceId)) {
         
                    continue;
                }
                ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = rmChannelEntry.getValue();
                //      ,   
                if (!applicationIdMap.containsKey(applicationId)) {
         
                    continue;
                }
                ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientIpMap = applicationIdMap.get(applicationId);
                //  IP  ,   
                if (!clientIpMap.containsKey(clientIp)) {
         
                    continue;
                }
                ConcurrentMap<Integer, RpcContext> portMap = clientIpMap.get(clientIp);
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
         
                    Integer port = portMapEntry.getKey();
                    if (!sourcePortMap.containsKey(port)) {
         
                        RpcContext rpcContext = portMapEntry.getValue();
                        //                  ,
                        //          RpcContext   sourcePortMap ,           
                        sourcePortMap.put(port, rpcContext);
                        //            URL       RpcContext 
                        //  :  ID->RM    ->RpcContext  Map   
                        rpcContext.holdInResourceManagerChannels(resourceId, port);
                    }
                }
            }
        }
    

    updateChannelsResourceはRMのリソース情報を更新するために使用されます.このロジックは煩雑で、総じて言えば、新しく登録されたデータベースリソースが登録された古いリソースと同じアプリケーションに属し、IPが同じであるが、ポートが異なる場合、現在登録されているリソースが古いリソースが属するアプリケーションと同じであり、1台のマシンに配備されていることを示しています.ポートが異なるだけで、seataは古いリソースのポートとRpcContext対応関係を新しいリソースに登録し、古いリソースのRpcContextにも新しいリソースの情報を追加します.2つの問題があります
  • なぜTM登録の場合、接続が登録されているかどうかを判断せず、RMが必要ですか?
  • なぜRMが最後にリソース情報を更新する必要があるのか、すなわちupdateChannelsResourceメソッドを呼び出す役割は何ですか.

  • 最初の問題については,TMとRMを解析するまで待機し,原因を説明する.2番目の問題は、リソースを更新する理由は、RMとの接続が切断された場合、RMがブランチトランザクションに関連しているため、RMがブランチトランザクションのロールバックを通知し、RMとの接続が切断された場合、seataは同じIP上で同じアプリケーションの異なるポートの接続を選択して通知し、トランザクションの一貫性を保証するためである.
    三、まとめ
    本論文では,サービス側におけるTMとRMの登録プロセスを解析し,総じて両者の登録プロセスは非常に類似しており,まずRpcContextオブジェクトを構築し,その後,そのオブジェクトをアプリケーション情報とともにメモリのMapオブジェクトに格納する.RpcContextオブジェクトは、接続のライフサイクル全体を貫通します.