hadoop 2.7.3ソース解析のdatanode登録と心拍メカニズム


もっと読む
  • datanode登録とときめき
  • datanode登録
  • datanode心拍数
  • namenodeは登録と心拍情報を受信します.
  • DatanodeManager簡単紹介
  • namednoeは登録情報
  • を受信する.
  • .name node受信心拍情報
  •  http://zhangjun5965.iteye.com/admin/blogs/2386384
    datanode登録とときめき
    hadoopが起動する時、正常な流れは先にnamenoeを起動して、datanodeを起動します.なめなのnodeはdatanodeの登録を受け入れますので、datanodeの登録とときめきはその起動時から始まります.入り口の方法はもちろんdatanodeのmain方法です.
    コードを追跡してdatanodeの構造方法の中で発見して、BlockPoolManagerの対象を初期化して、そのblock Pool Manager.refresh Namenodesを通じて(conf);このdatanodeに関するnamenode情報をプロファイルから取得し、登録と心拍情報を生成する.
    具体的にはBlockPoolManagerの中のstartAllの方法で、startAllの方法を通じて、datanodeの上のすべてのBPO-Serviceを起動します.
    synchronized void startAll() throws IOException {
        try {
          UserGroupInformation.getLoginUser().doAs(
              new PrivilegedExceptionAction() {
                @Override
                public Object run() throws Exception {
                  for (BPOfferService bpos : offerServices) {
                    bpos.start();
                  }
                  return null;
                }
              });
        } catch (InterruptedException ex) {
          IOException ioe = new IOException();
          ioe.initCause(ex.getCause());
          throw ioe;
        }
      }
    BPPServiceActorスレッドは、BPPServiceActorが対応するnamenodeに登録と心拍メッセージを送信するために、BPPOfferServiceのstart方法でループして起動される.
    //This must be called only by blockPoolManager
      void start() {
        for (BPServiceActor actor : bpServices) {
          actor.start();
        }
      }
    具体的な実現方法は自然にBPPServiceActorのrun方法にあります.
    /**
       *         ,     offerService  ,  shouldRun   shouldServiceRun  false
       * No matter what kind of exception we get, keep retrying to offerService().
       * That's the loop that connects to the NameNode and provides basic DataNode
       * functionality.
       *
       * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
       * happen either at shutdown or due to refreshNamenodes.
       */
      @Override
      public void run() {
        LOG.info(this + " starting to offer service");
    
        try {
          while (true) {
            // init stuff
            try {
              // setup storage
              //   namenode,  datanode
              connectToNNAndHandshake();
              break;
            } catch (IOException ioe) {
                    .......
            }
          }
    
          runningState = RunningState.RUNNING;
    
          while (shouldRun()) {
            try {
              //    
              offerService();
            } catch (Exception ex) {
               .....
            }
          }
          runningState = RunningState.EXITED;
        } catch (Throwable ex) {
          ...
        } finally {
         ....
        }
      }
    datanode登録
    datanode端の登録は比較的簡単で、connectTonAndHandschake方法を追跡することによって、最後に呼び出されたのはDatanodeProtocol Server SideTranslation PB.register Datanode方法です.
    ここではDatanodeRegistrationオブジェクトをパラメータとして構成しており、namenodeがdatanodeを検証する必要があるいくつかの基本情報が含まれています.
    最後にdatanodeとnamenodeが直接的にインタラクションするプロトコルDatanodeProtocolインターフェースのregister Datanode方法でnamenodeにrpc要求を送信してdatanodeを登録します.
    この方法は最後にdatanodeをnamenodeに登録してから戻ってきた結果を処理してから戻る.
    @Override
      public RegisterDatanodeResponseProto registerDatanode(
          RpcController controller, RegisterDatanodeRequestProto request)
          throws ServiceException {
        DatanodeRegistration registration = PBHelper.convert(request
            .getRegistration());
        DatanodeRegistration registrationResp;
        try {
          registrationResp = impl.registerDatanode(registration);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
        return RegisterDatanodeResponseProto.newBuilder()
            .setRegistration(PBHelper.convert(registrationResp)).build();
      }
    datanodeときめき
    datanodeの心拍操作は主にoffer Service方法の中で行われます.この方法はshuldRunがfalseに戻るまでずっと実行されます.心拍操作は、まずnamenodeに心拍の要求を送り、その結果に基づいていくつかの情報を更新し、namenodeから持ち帰った各種コマンド(DatanodeCommand配列)を処理します.
    心拍を送る方法はsendHeart Beatで、最終的にDatanodeProtocol Server SideTranslatoPB類のsendHeart beat(RpcController、Heart beat Request Protto)メソッドを呼び出して、心拍の要求を送ります.
    @Override
      public HeartbeatResponseProto sendHeartbeat(RpcController controller,
          HeartbeatRequestProto request) throws ServiceException {
        HeartbeatResponse response;
        try {
            ......
          //    
          response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
              report, request.getCacheCapacity(), request.getCacheUsed(),
              request.getXmitsInProgress(),
              request.getXceiverCount(), request.getFailedVolumes(),
              volumeFailureSummary);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
        
        //    
        HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
            .newBuilder();
        DatanodeCommand[] cmds = response.getCommands();
        .............
      }
    具体的に心拍数を送るプロトコルについては、DatanodeProtocol類のsendHeart beat方法を見てみます.
    /**
       *
       * sendHeartbeat    namenode  datanode   ,            . namenode         datanode      ,  DatanodeCommand       。    DatanodeCommand  ,DataNode           、             datanode   。
       *
       * sendHeartbeat() tells the NameNode that the DataNode is still
       * alive and well.  Includes some status info, too. 
       * It also gives the NameNode a chance to return 
       * an array of "DatanodeCommand" objects in HeartbeatResponse.
       * A DatanodeCommand tells the DataNode to invalidate local block(s), 
       * or to copy them to other DataNodes, etc.
       * @param registration datanode registration information datanode     
       * @param reports utilization report per storage datanode           (datanode          ,            ,   、disk、ssd )
       * @param xmitsInProgress number of transfers from this datanode to others
       * @param xceiverCount number of active transceiver threads
       * @param failedVolumes number of failed volumes
       * @param volumeFailureSummary info about volume failures
       * @throws IOException on error
       */
      @Idempotent
      public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
                                           StorageReport[] reports,
                                           long dnCacheCapacity,
                                           long dnCacheUsed,
                                           int xmitsInProgress,
                                           int xceiverCount,
                                           int failedVolumes,
                                           VolumeFailureSummary volumeFailureSummary)
          throws IOException;
    namenodeは登録と心拍情報を受信します.
    DatanodeManager簡単な紹介
    まず、DatanodeManagerの重要な変数をいくつか紹介します.
    /**
       * 
       * 
       * datanodeMap  map      StorageID DatanodeDescriptor     ,     ,datanode namenode           。
       * 
       *  1.        storage id  ,    map 。
       *  2.        ,             
       *  3.       datanode      storage id   
       * 
       * Stores the datanode -> block map.  
       * 

    * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by * storage id. In order to keep the storage map consistent it tracks * all storages ever registered with the namenode. * A descriptor corresponding to a specific storage id can be *

      *
    • added to the map if it is a new storage id;
    • *
    • updated with a new datanode started as a replacement for the old one * with the same storage id; and
    • *
    • removed if and only if an existing datanode is restarted to serve a * different storage id.
    • *

    *

    * Mapping: StorageID -> DatanodeDescriptor */

    private final NavigableMap datanodeMap = new TreeMap(); /** * * Cluster network topology */ private final NetworkTopology networktopology; /** * host DatanodeDescriptor , datanode。 * Host2NodesMap String DatanodeDescriptor[] , * datanode map , * Host2NodesMap * Host names to datanode descriptors mapping. */ private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
    namednoeは登録情報を受信します.
    登録とときめきに関わらず、datanodeはrpcでnamenodeの同名の方法を呼び出しました.具体的な実現はNameNodeRpcServeにあります.
    register Datanode方法ではFSS Namesystemのregister Datanode方法を呼び出し、最終的な処理方法はDatanodeManager.register Datanodeにあります.
    まず下記の2行のコードを通じて登録されたdatanodeのdatanodemanageの中の2つのmapの情報を取得しました.
    //    nodeS   datanodeMap    datanode  
    DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
    // nodeN   host2DatanodeMap     
    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(nodeReg.getIpAddr(), nodeReg.getXferPort());
    次にdatanodemanageのdatanodeMapのコメントの中で言っている3つの状況をそれぞれ処理します.
    //          ,         ID
          if (nodeN != null && nodeN != nodeS) {
            NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
            // nodeN previously served a different data storage, 
            // which is not served by anybody anymore.
            //  
            removeDatanode(nodeN);
            // physically remove node from datanodeMap
            //       ,      datanode       
            wipeDatanode(nodeN);
            nodeN = null;
          }
    
          //       ,        
          if (nodeS != null) {
                ................
              nodeS.updateRegInfo(nodeReg);
    
              nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
              nodeS.setDisallowed(false); // Node is in the include list
    
              //           
              // resolve network location
              if(this.rejectUnresolvedTopologyDN) {
                nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
                nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
              } else {
                nodeS.setNetworkLocation(
                    resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
                nodeS.setDependentHostNames(
                    getNetworkDependenciesWithDefault(nodeS));
              }
              getNetworkTopology().add(nodeS);
                
              // also treat the registration message as a heartbeat
              heartbeatManager.register(nodeS);
              incrementVersionCount(nodeS.getSoftwareVersion());
              startDecommissioningIfExcluded(nodeS);
              success = true;
             ...........................
          }
    
    
    
        //                    
        
        
                // resolve network location
                //      ,            
            if(this.rejectUnresolvedTopologyDN) {
              nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
              nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
            } else {
              nodeDescr.setNetworkLocation(
                  resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
              nodeDescr.setDependentHostNames(
                  getNetworkDependenciesWithDefault(nodeDescr));
            }
            networktopology.add(nodeDescr);
            nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
      
            // register new datanode
            addDatanode(nodeDescr);
            // also treat the registration message as a heartbeat
            // no need to update its timestamp
            // because its is done when the descriptor is created
            heartbeatManager.addDatanode(nodeDescr);
            incrementVersionCount(nodeReg.getSoftwareVersion());
            startDecommissioningIfExcluded(nodeDescr);
            success = true;
    上記では、三つの登録の場合にそれぞれ処理を行い、新しいノードに登録する場合には、最終的にaddDatanode方法を呼び出して登録し、主にその二つのmapに該当するdatanode情報を追加し、datanodeをネットワークトポロジに追加する.
    /** Add a datanode. */
      void addDatanode(final DatanodeDescriptor node) {
        // To keep host2DatanodeMap consistent with datanodeMap,
        // remove  from host2DatanodeMap the datanodeDescriptor removed
        // from datanodeMap before adding node to host2DatanodeMap.
        
        //  datanodeMap  host2DatanodeMap 
        synchronized(datanodeMap) {
          host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
        }
    
        //     
        networktopology.add(node); // may throw InvalidTopologyException
        host2DatanodeMap.add(node);
        checkIfClusterIsNowMultiRack(node);
    
        if (LOG.isDebugEnabled()) {
          LOG.debug(getClass().getSimpleName() + ".addDatanode: "
              + "node " + node + " is added to datanodeMap.");
        }
      }
    name node受信心拍数情報
    namenodeが心拍数を処理するのはdatanodeと同名の方法sendHeart beatの中で、最終の処理方法はDatanodeManager.handleHeart beatの方法です.
    ときめきの具体的な流れは以下の通りです.
    1.datanodeの情報を入手して、接続が許可されているかどうかを判断します.もし許可されていないなら、直接に異常を投げます.2.登録したかどうかを判断し、登録していない場合は、登録コマンド3に直接戻ります.datanodeの情報を更新するのは、主にDatanodeDescriptorの情報を更新することです.例えば、使用空間、空きスペースなど.4.セキュリティモードであるかどうかを確認します.リース状況を確認します.6.コピーコマンドを生成します.7.削除コマンドを生成します.8.キャッシュ関連コマンドを生成します.帯域幅関連コマンドを生成します.10.すべてのコマンドを返します.
    関連コードは以下の通りです.
    /** Handle heartbeat from datanodes. */
      public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] reports, final String blockPoolId,
          long cacheCapacity, long cacheUsed, int xceiverCount, 
          int maxTransfers, int failedVolumes,
          VolumeFailureSummary volumeFailureSummary) throws IOException {
        synchronized (heartbeatManager) {
          synchronized (datanodeMap) {
            DatanodeDescriptor nodeinfo = null;
            try {
              //  datanode   
              nodeinfo = getDatanode(nodeReg);
            } catch(UnregisteredNodeException e) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
            }
            //      
            // Check if this datanode should actually be shutdown instead. 
            if (nodeinfo != null && nodeinfo.isDisallowed()) {
              setDatanodeDead(nodeinfo);
              throw new DisallowedDatanodeException(nodeinfo);
            }
    
            //       
            if (nodeinfo == null || !nodeinfo.isRegistered()) {
              return new DatanodeCommand[]{RegisterCommand.REGISTER};
            }
    
            //  datanode   ,     ,     
            heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                             cacheCapacity, cacheUsed,
                                             xceiverCount, failedVolumes,
                                             volumeFailureSummary);
    
            //        
            // If we are in safemode, do not send back any recovery / replication
            // requests. Don't even drain the existing queue of work.
            if(namesystem.isInSafeMode()) {
              return new DatanodeCommand[0];
            }
    
             //      
            //check lease recovery
            BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
                .getLeaseRecoveryCommand(Integer.MAX_VALUE);
            if (blocks != null) {
              BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
                  blocks.length);
               .................................
              return new DatanodeCommand[] { brCommand };
            }
    
           
            final List cmds = new ArrayList();
            //      
            //check pending replication
            List pendingList = nodeinfo.getReplicationCommand(
                  maxTransfers);
            if (pendingList != null) {
              cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
                  pendingList));
            }
            //        ,      
            //check block invalidation
            Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
            if (blks != null) {
              cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
                  blockPoolId, blks));
            }
            //         
            boolean sendingCachingCommands = false;
            long nowMs = monotonicNow();
            if (shouldSendCachingCommands && 
                ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
                    timeBetweenResendingCachingDirectivesMs)) {
              DatanodeCommand pendingCacheCommand =
                  getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                    DatanodeProtocol.DNA_CACHE, blockPoolId);
              if (pendingCacheCommand != null) {
                cmds.add(pendingCacheCommand);
                sendingCachingCommands = true;
              }
              DatanodeCommand pendingUncacheCommand =
                  getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                    DatanodeProtocol.DNA_UNCACHE, blockPoolId);
              if (pendingUncacheCommand != null) {
                cmds.add(pendingUncacheCommand);
                sendingCachingCommands = true;
              }
              if (sendingCachingCommands) {
                nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
              }
            }
    
            blockManager.addKeyUpdateCommand(cmds, nodeinfo);
             //         
            // check for balancer bandwidth update
            if (nodeinfo.getBalancerBandwidth() > 0) {
              cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
              // set back to 0 to indicate that datanode has been sent the new value
              nodeinfo.setBalancerBandwidth(0);
            }
    
            //       
            if (!cmds.isEmpty()) {
              return cmds.toArray(new DatanodeCommand[cmds.size()]);
            }
          }
        }
    
        return new DatanodeCommand[0];
      }