HDFS 2.Xソース分析の:NameNodeファイルを読む原理

6878 ワード

原文は雲台ブログより:http://yuntai.1kapp.com/?p=952
HDFSは1回書き,複数回読むアプリケーションシーンとして設計されており,これはそのMapReduceメカニズムと密接に関連しているはずであり,線上の読み書きスケールモニタリングにより,概ね読み書き比が10:1であり,その設計の目標も検証されている.
3.1リードフロー分析
GFS論文で述べたファイル読み込みの簡単な流れ:
HDFSでは、具体的な流れは次の図のようになります.
上の図から、ファイルの読み取りには次のようなプロセスが必要であることがわかります.
HDFSが提供するクライアント開発ライブラリClientを使用して、リモートのNameNodeにRPCリクエストを開始します.
Namenodeは状況に応じてファイルの一部またはすべてのblockリストを返し、各blockに対してNamenodeはそのblockコピーがあるDataNodeアドレスを返します.
クライアント開発ライブラリClientは、クライアントに最も近いDataNodeを選択してblockを読み込む.クライアント自体がDataNodeである場合、ローカルから直接データが取得される.現在のblockのデータを読み出した後、現在のDataNodeとの接続を閉じ、次のblockを読み込むために最適なDataNodeを探します.
リストのblockを読み終わった後、ファイルの読み取りがまだ終わっていない場合、クライアント開発ライブラリはNamenodeに次のblockリストを取得し続けます.
1つのblockを読み込むとchecksum検証が行われ、datanodeを読み込むときにエラーが発生した場合、クライアントはNamenodeに通知し、次のblockコピーを持つdatanodeから読み続けます.
ここで、NameNodeがクライアントから受け取ったRPC要求についてのみファイルを取得し、ファイルの対応するブロック、ブロックの位置を分析し、その呼び出しコードの流れは、NameNodeRpcServer.getBlockLocations()->FSNamesystem.getBlockLocations()->FSNamesystem.getBlockLocations()->DatanodeManager.sortLocatedBlocks()である.具体的なフローチャートは以下の通りです.
上の図から分かるように、Clientを受信するRPC要求方法getBlockLocationsは、パスに読み取り権限があるかどうかを確認し、ブロック位置を取得しようとする最初のループに入り、最初であれば、まずNameNodeノードの状態が読み取り操作を許可しているかどうかを確認する.次に、NameSpaceからファイルのinodeオブジェクトを取得します.アクセス時間がサポートされ、ループが1回目の場合、contineは次のループに入り、2回目の場合、ファイルのアクセス時間を変更し、BlockManagerクラスの呼び出し方法createLocatedBlocksを開始します.
for (int attempt = 0; attempt < 2; attempt++) {
      if (attempt == 0) { // first attempt is with readlock
        readLock();//     ,    
      }  else { // second attempt is with  write lock
//       ,          
        writeLock(); // writelock is needed to set accesstime
      }
      try {
        checkOperation(OperationCategory.READ);
        // if the namenode is in safemode, then do not update access time
        if (isInSafeMode()) {
          doAccessTime = false;
        }
        long now = now();
// namespace      inode  
        INodeFile inode = dir.getFileINode(src);
        if (inode == null) {
          throw new FileNotFoundException("File does not exist: " + src);
        }
        assert !inode.isLink();
        if (doAccessTime && isAccessTimeSupported()) {
          if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
            // if we have to set access time but we only have the readlock, then
            // restart this entire operation with the writeLock.
            if (attempt == 0) {
              continue;
            }
          }
          dir.setTimes(src, inode, -1, now, false);
        }
//       
        return blockManager.createLocatedBlocks(inode.getBlocks(),
            inode.computeFileSize(false), inode.isUnderConstruction(),
            offset, length, needBlockToken);
      } finally {
        if (attempt == 0) {
//       ,    
          readUnlock();
        } else {
//       ,    ,               ,               
          writeUnlock();
        }
      }
    }

ファイルにブロックがある場合は、ファイルブロック情報とその位置情報を取得する必要があります.
assert namesystem.hasReadOrWriteLock();
    if (blocks == null) {
      return null;
} else if (blocks.length == 0) {
//      
      return new LocatedBlocks(0, isFileUnderConstruction,
          Collections.emptyList(), null, false);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
      }
      final AccessMode mode = needBlockToken? AccessMode.READ: null;
//       ,      ,       DN    
      final List locatedblocks = createLocatedBlockList(
          blocks, offset, length, Integer.MAX_VALUE, mode);
      final BlockInfo last = blocks[blocks.length - 1];
      //          complete  ,    =    -       
      //           
      final long lastPos = last.isComplete()?
          fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
          : fileSizeExcludeBlocksUnderConstruction;
//               
      final LocatedBlock lastlb = createLocatedBlock(last, lastPos, mode);
      return new LocatedBlocks(
          fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
          locatedblocks, lastlb, last.isComplete());
メソッドcreateLocatedBlockList
,名前とそのパラメータから分かるように,この方法により,ファイルのすべてのブロック,ブロック情報(ファイル内のブロックの開始位置を含む),ブロックが存在するDNマシンを取得することができる.メソッドcreateLocatedBlockは各ブロックを処理するために使用され、メソッドのみを変更し、3つのコピーのうちいくつかのコピーが破損している場合は、破損していないブロックのみがmachinesセットに追加され、3つのコピーが破損している場合は、3つのコピーがあるDNがmachinesセットに追加されますか?これはなぜですか.クライアントでcorrupを表示してブロックが破損しているかどうかを判断しますか?再確認が必要です
。。。。。。。。。。。。。。。。。。。。。。。。 
//       DN    
    final int numNodes = blocksMap.numNodes(blk);
    //         :            
    final boolean isCorrupt = numCorruptNodes == numNodes;
    //            ,                 
    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
    final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
    int j = 0;
    if (numMachines > 0) {
      for(Iterator it = blocksMap.nodeIterator(blk);
          it.hasNext();) {
        final DatanodeDescriptor d = it.next();
        final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
        //               ,           machines  
        //          ,         DN   machines  ??      ??
        //          corrupt         
        if (isCorrupt || (!isCorrupt && !replicaCorrupt))
          machines[j++] = d;
      }
    }
    assert j == machines.length :
      "isCorrupt: " + isCorrupt +
      " numMachines: " + numMachines +
      " numNodes: " + numNodes +
      " numCorrupt: " + numCorruptNodes +
      " numCorruptRepls: " + numCorruptReplicas;
    final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
    //eb:  poolid         
    //machines:       
    return new LocatedBlock(eb, machines, pos, isCorrupt);
ファイルのすべてのLocatedBlockオブジェクトを取得した後、各ブロックに対応するDNノードをclientからの距離でソートする必要があり、クライアントに近いほど配列の前に並び、そのDNノードが退役した場合、ランキングの数組の一番後ろにソートルールを導入し、ファイルのダウンロード速度を向上させることができる.具体的な実装はDatanodeManager.
sortLocatedBlocks()
 /** Sort the located blocks by the distance to the target host.
   *   DN client   ,       DN       ,           */
  public void sortLocatedBlocks(final String targethost,
      final List locatedblocks) {
    //sort the blocks
    final DatanodeDescriptor client = getDatanodeByHost(targethost);
    for (LocatedBlock b : locatedblocks) {
    //          ,  DN             
      networktopology.pseudoSortByDistance(client, b.getLocations());
      // Move decommissioned datanodes to the bottom
      //    DN     , DN     ,   DN    DN    
      Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
    }   
  }