DataNode起動最適化の改善:ディスク検出の並列化

19558 ワード

前言
この記事では、HDFSでのDataNode起動の問題について、皆さんが普段出会うシーンについてお話ししたいと思います.DataNodeの起動は非常に迅速なことではないでしょうか.この中に大きな問題がありますか?ここを見て、あなたもそう思っているなら、あなたが運営しているクラスタが遭遇する異常なシーンはまだ多くありません.本明細書で説明する問題は、DataNodeが起動できない問題ではなく、起動が遅すぎる場合がある問題です.DataNodeプロセスの開始が遅すぎることによる直接的な影響は、データのサービス遅延です.クラスタ規模は、大規模レベルの場合、大量のDataNodeが遅い起動する現象が発生すると、クラスタ自体の対外サービスに大きな影響を及ぼす可能性があります.本明細書では、現在のDataNodeの起動最適化について説明し、起動時間を短縮します.最適化の主なポイントは、DataNodeの起動時のディスク検出動作です.
既存のDataNode起動時のディスク検出
このセクションの主な内容について説明する前に、DataNodeの起動時の操作について概説します.HDFSでは、1つのデータノードが起動から最終的にデータサービスを提供するまで、多くの操作手順が行われます.ここでは主に以下の点に要約します.
  • 1.解析データディレクトリであるdatadirで構成されているディレクトリを読み込みます.
  • 2.これらのディレクトリに対応するディスクが不良ディスクであるかどうかを確認します(このステップの既存のロジックはシリアル実行です).
  • 3.DataNodeはハートビートメッセージを送信し、NameNodeに登録します.
  • 4.各データ・ディレクトリの下にあるデータ・ブロックをスキャンし、NameNodeに最初に報告します.

  • 主に上記のロジックですが、このセクションで最適化するポイントは、ディスク検出に関する2番目のポイントです.DataNodeの起動中に、なぜディスクの健康診断を行うのですか?ノード自体のデータ可用性を保証する重要な指標であるため、データノードがディスク検出で不良ディスクの個数が許容しきい値(適合可能)を超えていることを発見した場合、データノードの起動に直接失敗し、異常を放出します.このように、HDFSはディスクの可用性を重視しています.通常の場合、この部分の検出操作は非常に順調ですが、場合によっては、次の2つのような検出に時間がかかる場合があります.
    1つ目は、ノード内に構成されているディスクディレクトリが非常に多い場合です.例えば、マシンには10ブロック、20ブロックのディスクがあり、ディスク数に対応するディレクトリを構成しました.現在、ディスク健康診断の論理はシリアル実行であるため、総実行時間は線形に増加します.もちろん、マシンディスク自体が健康であれば、総時間もあまりかかりません.恐ろしいのは2つ目の状況です.2つ目は、個別のDataNodeディスク・データ・ディレクトリの検出が非常に遅いことです.このディレクトリは、ディスク自体のパフォーマンスの問題に対応している可能性があります(DataNodeディスク・ヘルス検出時に、ディレクトリの下にファイル、ディレクトリ・アクションを作成してディスクが使用可能かどうかを判断しようとします).このとき、後で検出されるディレクトリは、現在のディスク検出動作の完了を待たざるを得ず、最終的には総検出時間が長すぎます.
    したがって、第2の点で述べた個別ディスク検出が全体に極めて遅い影響を及ぼす問題を避けるために、改造することができ、改造の核心点は、既存のディスク検出の実行ロジックをシリアル化から並列化に変更することにある.この改善策は現在コミュニティで行われている、JIRA番号HDFS-11086(DataNode disk check improvements).本文の主な思想とコードもこのJIRAの上で参考にします.
    既存のDataNode内部ディスク検出コード
    次に、現在のDataNodeディスク検出コードを見てみましょう.シリアル実行の論理であることが分かっている以上、DataNode内部ではどのように実行されているのでしょうか.
    まず、DataNodeノードを初期化する操作です.コードは次のとおりです.
      public static DataNode instantiateDataNode(String args [], Configuration conf,
          SecureResources resources) throws IOException {
        if (conf == null)
          conf = new HdfsConfiguration();
    
        if (args != null) {
          // parse generic hadoop options
          GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
          args = hParser.getRemainingArgs();
        }
    
        //   DataNode    
        if (!parseArguments(args, conf)) {
          printUsage(System.err);
          return null;
        }
        Collection dataLocations = getStorageLocations(conf);
        UserGroupInformation.setConfiguration(conf);
        SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
            DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
        //   DataNode      
        return makeInstance(dataLocations, conf, resources);
      }

    rそしてmakeInstanceメソッドに入り
      static DataNode makeInstance(Collection dataDirs,
          Configuration conf, SecureResources resources) throws IOException {
        LocalFileSystem localFS = FileSystem.getLocal(conf);
        FsPermission permission = new FsPermission(
            conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                     DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
        //          
        DataNodeDiskChecker dataNodeDiskChecker =
            new DataNodeDiskChecker(permission);
        //               
        List locations =
            checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
        DefaultMetricsSystem.initialize("DataNode");
    
        assert locations.size() > 0 : "number of data directories should be > 0";
        return new DataNode(conf, locations, resources);
      }

    最後に、実際のディスク検出動作に入ります.
      static List checkStorageLocations(
          Collection dataDirs,
          LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
              throws IOException {
        ArrayList locations = new ArrayList();
        StringBuilder invalidDirs = new StringBuilder();
        //       ,           
        for (StorageLocation location : dataDirs) {
          final URI uri = location.getUri();
          try {
            //                  
            dataNodeDiskChecker.checkDir(localFS, new Path(uri));
            //           ,       ,       
            locations.add(location);
          } catch (IOException ioe) {
            //     IO  ,          ,      
            LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
                + location + " : ", ioe);
            invalidDirs.append("\"").append(uri.getPath()).append("\" ");
          }
        }
        //          0,           
        if (locations.size() == 0) {
          throw new IOException("All directories in "
              + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
              + invalidDirs);
        }
        //           
        return locations;
      }

    OK、これによって私たちが改造する場所を明確に知ることができます.
    DataNodeディスク検出並列化改造
    このセクションでは、JIRA HDFS-11086の2つのサブJIRA:HDFS-11119(Support for parallel checking of StorageLocations on DataNode startup)とHDFS-1148(Update DataNode to use StorageLocationChecker at startup)に焦点を当てたコミュニティの現在の改善について説明します.
    HDFS−11086では、著者らは、Future−Getのような非同期実行モードを導入しているが、JDKにおけるオリジナルのFuture−Getではなく、ListenableFutureというクラス(パッケージ名com.google.common.util.concurrent.ListenableFuture)を使用している.このFuture類を使ってみてください.
    また、2つの小さな点は、本人が同様に良い最適化点と考えている点です.1つ目は、最近のディスク検出の結果を追加的に保持し、最小検出に必要な間隔の時間サイズを新たに定義します.この点は非常に有意義であり,短時間での繰返し検出動作を回避することができる.すなわち、あるディスクが最小検出間隔で再び検出されると、前回の検出結果が直接返され、真の検出操作は実行されません.第二に、内部にディスク検出の最大タイムアウト時間が新たに定義され、言い換えれば、あるディスク検出が非常に遅い場合、IO異常を直接投げ出してこの操作を終了し、次のディスク検出結果の戻りを行う.
    ディスク検出改造関連クラス設計
    今回のディスク検出改造に関するクラス設計では、新しいpackage:orgに配置する次のクラスを定義した.apache.hadoop.hdfs.server.datanode.checkerの下(最新のhadoop-trunkのコードを取得すれば見つかります).
  • AsyncChecker:最も基本的なインタフェースクラスで、内部的に非同期検出と停止を開始するための操作方法を定義しています.
  • ThrottledAsyncChecker:非同期検出インタフェースの具体的な実装クラス.
  • StorageLocationChecker:ディスク検出オブジェクトクラスです.このオブジェクトは、各ディスクを並列に検出するために、上の非同期検出ディスククラスを呼び出します.
  • VolumeCheckResult:ディスク検出結果クラス、HEALTHY、DEGRADED、FAILEDの3種類の検出結果が定義されています.

  • ディスク検出の具体的なコード実装
    関連クラスの設計を理解した後、私たちは最後にこの部分のコード実装を本当に勉強します.まず改造すべき点は,前述のmakeInstance法におけるシリアル検出の論理であり,元のコードは以下の通りである.
      static DataNode makeInstance(Collection dataDirs,
          Configuration conf, SecureResources resources) throws IOException {
        LocalFileSystem localFS = FileSystem.getLocal(conf);
        FsPermission permission = new FsPermission(
            conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                     DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
        DataNodeDiskChecker dataNodeDiskChecker =
            new DataNodeDiskChecker(permission);
        //           
        List locations =
            checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
        DefaultMetricsSystem.initialize("DataNode");
    
        assert locations.size() > 0 : "number of data directories should be > 0";
        return new DataNode(conf, locations, resources);
      }

    このコードセクションでは、次のコードに変更されます.
      static DataNode makeInstance(Collection dataDirs,
          Configuration conf, SecureResources resources) throws IOException {
        //       ,  StorageLocationChecker       
        List locations;
        StorageLocationChecker storageLocationChecker =
            new StorageLocationChecker(conf, new Timer());
        try {
          locations = storageLocationChecker.check(conf, dataDirs);
        } catch (InterruptedException ie) {
          throw new IOException("Failed to instantiate DataNode", ie);
        }
        DefaultMetricsSystem.initialize("DataNode");
        assert locations.size() > 0 : "number of data directories should be > 0";
        return new DataNode(conf, locations, resources);
      }

    次にStorageLocationChecker内の検出方法を重点的に見てみましょう.
      public List check(
          final Configuration conf,
          final Collection dataDirs)
          throws InterruptedException, IOException {
    
        //       、       
        final ArrayList goodLocations = new ArrayList<>();
        final Set failedLocations = new HashSet<>();
        //       
        final Map> futures =
            Maps.newHashMap();
        final LocalFileSystem localFS = FileSystem.getLocal(conf);
        final CheckContext context = new CheckContext(localFS, expectedPermission);
    
        //       ,         AsyncChecker ,      
        for (StorageLocation location : dataDirs) {
          futures.put(location,
              delegateChecker.schedule(location, context));
        }
    
        //           
        final long checkStartTimeMs = timer.monotonicNow();
    
        // Retrieve the results of the disk checks.
        for (Map.Entry> entry : futures.entrySet()) {
    
          //              
          final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
          //                                 
          final long timeLeftMs = Math.max(0,
              maxAllowedTimeForCheckMs - waitSoFarMs);
          final StorageLocation location = entry.getKey();
    
          try {
            //          ,              ,            
            final VolumeCheckResult result =
                entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
            //                 
            switch (result) {
            case HEALTHY:
              goodLocations.add(entry.getKey());
              break;
            case DEGRADED:
              LOG.warn("StorageLocation {} appears to be degraded.", location);
              break;
            case FAILED:
              LOG.warn("StorageLocation {} detected as failed.", location);
              failedLocations.add(location);
              break;
            default:
              LOG.error("Unexpected health check result {} for StorageLocation {}",
                  result, location);
              goodLocations.add(entry.getKey());
            }
          } catch (ExecutionException|TimeoutException e) {
            //       ,         
            LOG.warn("Exception checking StorageLocation " + location,
                e.getCause());
            failedLocations.add(location);
          }
        }
        //               ,   IO  
        if (failedLocations.size() > maxVolumeFailuresTolerated) {
          throw new IOException(
              "Too many failed volumes: " + failedLocations.size() +
              ". The configuration allows for a maximum of " +
              maxVolumeFailuresTolerated + " failed volumes.");
        }
        //           ,     
        if (goodLocations.size() == 0) {
          throw new IOException("All directories in "
              + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
              + failedLocations);
        }
        //           
        return goodLocations;
      }

    上で実行した論理は非常に明確で,最後にThrottledAsyncCheckerの非同期検出論理,すなわち入口は上のschedule法である.
      public synchronized ListenableFuture schedule(
          final Checkable target,
          final K context) {
        LOG.debug("Scheduling a check of {}", target);
        //                 ,        
        if (checksInProgress.containsKey(target)) {
          return checksInProgress.get(target);
        }
    
        //                  
        if (completedChecks.containsKey(target)) {
          //           
          final LastCheckResult result = completedChecks.get(target);
          //              
          final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
          //                  ,            
          if (msSinceLastCheck < minMsBetweenChecks) {
            LOG.debug("Skipped checking {}. Time since last check {}ms " +
                "is less than the min gap {}ms.",
                target, msSinceLastCheck, minMsBetweenChecks);
            return result.result != null ?
                Futures.immediateFuture(result.result) :
                Futures.immediateFailedFuture(result.exception);
          }
        }
        //                
        final ListenableFuture lf = executorService.submit(
            new Callable() {
              @Override
              public V call() throws Exception {
                return target.check(context);
              }
            });
        //  Future            
        checksInProgress.put(target, lf);
        addResultCachingCallback(target, lf);
        return lf;
      }

    ここでのターゲットオブジェクトtarget.checkメソッドは、DiskChekerの真のディスク検出メソッド、すなわちStorageLocationのcheckメソッドを呼び出します.関連コードは次のとおりです.
      public VolumeCheckResult check(CheckContext context) throws IOException {
        //          DiskChecker     
        DiskChecker.checkDir(
            context.localFileSystem,
            new Path(baseURI),
            context.expectedPermission);
        return VolumeCheckResult.HEALTHY;
      }

    OK,以上が本稿で述べるDataNode起動最適化改造のディスク検出並列化の内容である.これは小さな改造にすぎないことを軽視してはいけません.極端な場合、後続の多くの問題を避けるのに役立つかもしれません.
    参考資料
    [1].DataNode disk check improvements [2].Support for parallel checking of StorageLocations on DataNode startup [3].Update DataNode to use StorageLocationChecker at startup