HDFS 2.Xソース分析の:NameNodeファイルを書く原理

8413 ワード

原文は雲台ブログより:http://yuntai.1kapp.com/?p=950
      HDFSは1回書き,複数回読むアプリケーションシーンとして設計されており,これはそのMapReduceメカニズムと密接に関連しているはずであり,線上の読み書きスケールモニタリングにより,概ね読み書き比が10:1であり,その設計の目標も検証されている. 
GFS論文で述べた書き込みファイルの簡単な流れ:
HDFS詳細フロー:
ファイルへの書き込みは、読み込みよりも複雑です.
1、HDFSが提供するクライアント開発ライブラリClientを使用して、リモートのNamenodeにRPC要求を開始する;
2、Namenodeは作成するファイルがすでに存在するかどうかを検査し、作成者は操作する権限があるかどうかを検査し、成功すればファイルに記録を作成し、そうしないとクライアントに異常を投げ出す.
3、クライアントがファイルの書き込みを開始すると、ファイルは複数のpacketsに分割され、内部でデータキュー「data queue」の形でこれらのpacketsを管理し、Namenodeに新しいblocksを申請し、replicasを格納するための適切なdatanodesリストを取得し、リストのサイズはNamenodeでreplicationの設定によって決まる.
4、pipeline(パイプ)の形でpacketをすべてのreplicasに書き込むことを開始します.開発ライブラリはpacketを最初のdatanodeにストリームで書き込み、このdatanodeはpacketを格納した後、このpipelineの次のdatanodeに渡し、最後のdatanodeまでデータを書く方法は流水線の形式を呈している.
5、最後のdatanodeが正常に保存された後、ack packetが返され、pipelineでクライアントに渡され、クライアントの開発ライブラリ内部で「ack queue」が維持され、datanodeから返されたack packetが正常に受信された後、「ack queue」から対応するpacketが除去される.
6、転送中にdatanodeに障害が発生した場合、現在のpipelineは閉じられ、障害が発生したdatanodeは現在のpipelineから削除され、残りのblockは残りのdatanodeの中でpipelineの形で転送され続け、Namenodeは新しいdatanodeを割り当て、replicas設定の数を維持する.
3.2.1.メタデータの作成
次の図は、NameNodeがファイルの書き込み中にファイルメタデータを作成するフローチャートを示しています.主に次のステップに分けられます.
                                                                                      メタデータフローチャートの作成
1、RPCから伝達されたパラメータを取得し、関連する合法性検査を行う.
2、親ディレクトリが存在するかどうかを検証し、存在しない場合は作成する(クライアントが親ディレクトリが存在しない場合は作成するように要求した場合、いいえ親ディレクトリが存在しないと報告する).
3、この操作が以前の古いファイルを上書きするか、追加するか、ファイルを再作成するかを判断する.以前の古いファイルを上書きし、古いファイルのメタデータとDNなどのマッピング関係を削除し、追加で書くと以前のinodeタイプがUC型inodeであることを変更し、ファイルを再作成するにはinodeオブジェクトを再作成する必要がある場合、もちろんそのオブジェクトはtree構造に掛けられます.同時に賃貸契約に加入することも忘れないでください.
4、操作をログファイルに書き込む;
コード呼び出しプロセスは、NameNodeRpcServer.Create()->FSNamesystem.startFile()->FSNamesystem.startFileInt()->FSNamesystem.checkOperation()->FSNamesystem.startFileInternal()です.startFileInternalは、startFileInternalで具体的には次のようになります.
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
try {
        // tree   inode    
      INodeFile myFile = dir.getFileINode(src);
      try {
        blockManager.verifyReplication(src, replication, clientMachine);
      } catch(IOException e) {
        throw new IOException("failed to create "+e.getMessage());
      }
      boolean create = flag.contains(CreateFlag.CREATE);
      if (myFile == null) {
        if (!create) {
          throw new FileNotFoundException("failed to overwrite or append to non-existent file "
            + src + " on client " + clientMachine);
        }
      } else {
        // File exists - must be one of append or overwrite
          //      ,         ,        
        if (overwrite) {
          delete(src, true);
        } else {
           //          ,      lease
          // Opening an existing file for write - may need to recover lease.
          recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
          if (!append) {
            throw new FileAlreadyExistsException("failed to create file " + src
                + " on client " + clientMachine
                + " because the file exists");
          }
        }
      }
      final DatanodeDescriptor clientNode =
          blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
      if (append && myFile != null) {
        return prepareFileForWrite(
            src, myFile, holder, clientMachine, clientNode, true);
      } else {
       // Now we can add the name to the filesystem. This file has no
       // blocks associated with it.
       //
       checkFsObjectLimit();
        // increment global generation stamp
        long genstamp = nextGenerationStamp();
        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
            replication, blockSize, holder, clientMachine, clientNode, genstamp);
        if (newNode == null) {
          throw new IOException("DIR* NameSystem.startFile: " +
                                "Unable to add file to namespace.");
        }
        leaseManager.addLease(newNode.getClientName(), src);
        // record file record in log, record new generation stamp
        getEditLog().logOpenFile(src, newNode);
        if (NameNode.stateChangeLog.isDebugEnabled()) {
          NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                     +"add "+src+" to namespace for "+holder);
        }
3.2.2.データブロックの割当て
ClientはNameNodeの作成に成功したメタデータの結果を受信すると、NameNodeにブロックの割り当てを要求します.ファイルの各ブロックについて、ClientはRPCでNameNode側のaddbrick操作を呼び出し、addbrick内で新しいブロックにブロックIDとタイムスタンプを生成します.また、ブロックのいくつかのコピー(デフォルト3)がどのDataNodeを格納すべきかを格納します.NameNodeが新しく割り当てたブロックについては、次のような状態が発生します.
ちなみに、1つのブロックのコピーについては、次のような状態があります.
Clientは割当てブロックを受信し、応答ノードにデータブロックをアップロードする過程で、失敗した場合、NameNode割当てブロックを再要求します.もちろん、次回割当てたブロックは前回のブロックで割当てられたDataNodeノードには割り当てられません.前回のUploadデータブロックが失敗したため、このDataNodeノードとclientの間のネットワークに異常が発生する可能性があります.
                                                                        割付データブロックフローチャート
コード呼び出しプロセス:
NameNodeRpcServer.addBlock()->FSNamesystem.getAdditionalBlock()は、プロセス全体のコアインプリメンテーションもメソッドgetAdditionalBlockの内部にあり、詳細は次のコードを参照してください.ここでは、割り当てブロックが比較的重要で、理解しにくい点を主に説明します.ブロックを割り当てる前に、ファイルの最後のブロック情報が前回clientにアップロードされたブロックと一致しているかどうかを比較します.一致すれば、次の2つの状況があります.
1、今回のブロックの割り当てを再要求する前に、NameNodeはファイルにブロックを割り当てたばかりなので、今回の比較は等しいはずです.
2、ファイルが新しいファイルであれば、最後のブロックと前回Clientにアップロードされたブロックはすべて空であるため、等しいので、最初のブロックを割り当てることに相当する.
ただし、前回クライアントにアップロードされたブロックが、取得したファイルの最後のブロック情報と異なる場合もあります.
1、今回の操作はファイルappendを開いたばかりの最初の操作なので、lastBlockInFileは空ではありませんが、previousBlockは空です.
2、クライアントが今回の要求を行う前に、クライアントtimeoutまたはHAfailoverは、割り当てデータブロック情報の成功応答を受信していないため、クライアントが再び割り当てブロックを要求する可能性があるが、これではファイルに対してブロックが重複する.
                                         リクエストtimeoutフローチャート
                                    Name Node Failover状況フローチャート
現在のHadoop 2.0のアプローチは、ファイルの最後のブロックサイズが0の場合、その割り当てられたデータブロックを破棄して再割り当てすることであるが、これは実装上特に良くない可能性があり、まずファイルの最後のブロックを破棄し、最新のデータブロック情報をログに更新し、その後、最新のブロック情報をログに更新し、2回繰り返し更新する疑いがある.コードクリップ:
//           ,     0,        ,        ??                ,
          //                     ,                      
          // The retry case ("b" above) -- abandon the old block.
          NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
              "caught retry for allocation of a new block in " +
              src + ". Abandoning old block " + lastBlockInFile);
          //           ,         
          dir.removeBlock(src, pendingFile, lastBlockInFile);
        //    path        ,     ,                ,                  
          dir.persistBlocks(src, pendingFile);
3、1,2の2つを除き、現在は放出異常として処理されている.
次に、ラックを介してDataNodeノードを選択し、ファイルと新しいブロックのマッピングを確立し、更新されたブロック情報をログに書き込むコードクリップを貼り付けます.
// choose targets for the new block to be allocated.
    final DatanodeDescriptor targets[] = blockManager.chooseTarget(
        src, replication, clientNode, excludedNodes, blockSize);
 
    // Allocate a new block and record it in the INode.
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      if (isInSafeMode()) {
        throw new SafeModeException("Cannot add block to " + src, safeMode);
      }
      INode[] pathINodes = dir.getExistingPathINodes(src);
      int inodesLen = pathINodes.length;
      checkLease(src, clientName, pathINodes[inodesLen-1]);
      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction)
                                                pathINodes[inodesLen - 1];
                                                          
      if (!checkFileProgress(pendingFile, false)) {
        throw new NotReplicatedYetException("Not replicated yet:" + src);
      }
 
      // allocate new block record block locations in INode.
      //          
      newBlock = allocateBlock(src, pathINodes, targets);
     
      for (DatanodeDescriptor dn : targets) {
        dn.incBlocksScheduled();
      }
      //   ,     
      dir.persistBlocks(src, pendingFile);
    } finally {
      writeUnlock();
    }
    //            ?                    ,     persistBlocks true,
    //         ,            ,      
    if (persistBlocks) {
      getEditLog().logSync();
    }

これにより、データブロックが正常に割り当てられ、応答情報がクライアントに返される.