Hadoop本第3版 3章 Hadoop分散ファイルシステムについて


*こちらはOpt社内で実施しているオライリーのHadoop本第3版の輪読会用資料になります


自己紹介

株式会社オプト シニアエンジニア @sisisin

  • GitHub
  • Twitter
  • フロントエンドマン(Angular,AngularJS中心)
  • スクラムマスター
  • 最近インフラ周りも始めました
  • Scalaも書いてます

Hadoop本第3版第3章Hadoop分散ファイルシステムについてかいつまんで説明していこうと思います
内容的にはHadoop本の内容をピックアップしているだけなので、真面目にやるなら実際のHadoop本を読んだほうが良いです


事前準備
(必須ではないが、動かしてみながらやるなら。)

  • この記事を参考に、ローカルのHDFS環境を設定する
  • リポジトリをクローンし、 3eブランチをチェックアウトしておく

はじめに

Hadoopは独自の分散ファイルシステムであるHDFS(Hadoop Distributed Filesystem)と呼ばれる仕組みを持っている
この章では、HDFSがどのような設計で、どんなAPIを用いて扱えるのかといったことを説明する


HDFSの設計

HDFSの設計は「HDFSは、ストリーミング型のデータアクセスパターンによって、非常に大きなファイルを保存するために設計されたファイルシステムで、コモディティハードウェアによって構成されるクラスタで動作します」と表現されている

  • 非常に大きなファイル
    • 数ペタバイトに及ぶ
  • ストリーミング型のデータアクセス
    • 「書き込みは一度、読み出しは何度も行う」のが最も効率的なデータ処理パターンだという発想に基づいている
  • コモディティハードウェア
    • 市場で普通に調達可能なハードウェアでクラスタを構成しても問題なく動くよう設計されている

HDFSの設計

HDFSの不向きな領域

  • 低レイテンシのデータアクセス
    • HDFSは高スループットを出すためにレイテンシを犠牲にしてしまうことがあり得る設計になっている
  • 大量の小さなファイル
    • ネームノード(後述。クラスタ上のどこに実際のデータが有るかを管理するデーモン)はメモリ上にファイルシステムのメタデータを持つので、ファイルシステム上に持てるファイル数がネームノードのメモリ量によって制限されてしまう。そのためファイル数が多くなる状況には向かない
    • ファイル・ディレクトリなどは1つに付き150バイト必要なので、例えばクラスタ内に100万個のファイルがあると最低でも300MBのメモリが必要といった具合
  • 複数のライターからの書き込みや任意のファイルの修正
    • HDFSは1つのライターのみから、ファイルの末尾に対してのみ書き込みが出来るという制約がある
    • 「書き込みは一度」の思想に反するのでそりゃそう、という感想

HDFSに関する概念

ブロック

  • ファイルシステムのブロックと同様の概念
    • ただし一般のファイルシステムよりは遥かに大きい(デフォルトで64MB)
    • HDFS上のファイルはブロックサイズのchunkに分割され、それぞれのchunkは独立した単位として保存される

ブロックを抽象化していることによる恩恵

  1. ディスクのサイズよりも大きいファイルが作れる
  2. ストレージのサブシステムが単純化出来る
    • ブロックが固定長になる
    • メタデータ(例えばパーミッション情報など)を持たないですむ
  3. レプリケーションと相性が良い
    • ブロックだけを物理的に別のマシンに複製しておける

ブロックが大きい理由

ファイルシステムのブロックに比べてHDFSのブロックが大きくなっているのは、ディスクのシークのコストを抑え、大容量ファイルの転送にかかる時間のボトルネックをシークコストではなくディスクの転送レートにすることが出来るため。

例えば、シークタイムを10ms、転送レートを100MB/sと仮定したときに、シークに使われる時間を1%に抑えようとするとブロックサイズを100MBにしなければいけないということがわかる。
なお、 ブロック数 < クラスタ内のノード数になると、MapReduceするときにノードが余ることになるので行き過ぎは良くない。


ネームノードとデータノード

HDFSクラスタには、マスター/ワーカーパターンで動作する2種類のノード群がある
- ネームノード(マスター)
- データノード(ワーカー)


ネームノード

ファイルシステムの名前空間を管理するノード。

  • ファイルシステムのツリーと、ツリー内の全ファイル及びディレクトリのメタデータを管理する。
  • この情報はローカルのディスク上に名前空間のイメージと編集ログという2つのファイルとして保存される。
  • 全てのファイルの全てのブロックがどのデータノードにあるかも保持しているが、この情報は永続化されない。
    • システム起動時に毎回データノード群から再構築される作りになっているため

データノード

ブロックを管理するノード。

  • クライアントあるいはネームノードからの要求に応じてブロックの読み書きを行う
  • 定期的にネームノードに自分が保管しているブロックのリストを報告する

ここまでの説明でわかるとおり、ネームノードがないとHDFSは使えないので、単一障害点となる。
Hadoopはネームノードの耐障害性を高めるために以下の2つの仕組みを持っている

  • メタデータ情報構成ファイルのバックアップ
  • セカンダリネームノード
    • セカンダリといいつつ、ネームノードとして働かないので注意
    • セカンダリの活用方法は10章で解説するらしい

HDFSフェデレーション

大規模なHDFSクラスタではネームノードのメモリがスケーリングする上でボトルネックになりうる。
HDFSフェデレーションを用いることで、ネームノードがファイルシステムの名前空間の一部を管理出来るようになり、この問題を解決する。

例えば、 ネームノード1には /user以下を、ネームノード2には /share以下を管理させる、というように出来る
このとき、ネームノード1が障害を起こしていても、ネームノード2が管理する /share以下は問題なく扱える。


HDFSの高可用性

Hadoop2系からはHDFSのHA対応がリリースされており、これによってネームノードの単一障害点問題を緩和している。
アクティブ・スタンバイ構成でネームノードのペアを起き、アクティブなネームノードに障害が起きた場合はスタンバイしていたネームノードが処理を引き継ぐように出来ている


フェイルオーバーとフェンシング

フェイルオーバーについて

  • アクティブネームノードからスタンバイネームノードへの移行はHadoopシステム内のフェイルオーバーコントローラによって管理されている
  • 各ネームノードはフェイルオーバーコントローラのプロセスを実行する
  • プロセスは、自ネームノードの障害モニタリングとフェイルオーバーのトリガー実行を行う
  • 手動でフェイルオーバーすることも可能(その場合はグレースフルにフェイルオーバーされる)

フェンシングについて

  • ネットワーク分断などで、アクティブなネームノードが動いてるにもかかわらずフェイルオーバーが実施された際にシステムに影響を出さないようにする処理をフェンシングという
  • ネームノードのプロセスをkillしたり、ネームノードのプロセスから共有ストレージへのアクセス権を剥奪するなどを行う

コマンドラインインターフェース

*実際に動かしてみる場合は、ローカルでHadoopを擬似分散モードで立ち上げること

Hadoopでは、HDFSのファイルシステムを利用するコマンドラインインターフェースが用意されている。
コマンドの例を紹介する


$ hadoop fs -help
$ hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt # ローカルの `quangle.txt`をhdfs上へコピー
$ hadoop fs -copyToLocal /user/tom/quangle.txt ./quangle.txt # hdfs上からローカルファイルシステムへコピー
$ hadoop fs -mkdir books # `books`ディレクトリを作成
$ hadoop fs -ls # リスト

hadoop fs -lsでは、通常の ls コマンド異なり、2つ目の列にファイルの複製数が表示される


Hadoopのファイルシステム群

  • Hadoopは抽象化されたファイルシステムの概念を持っており、HDFSはその実装の1つでしかない
  • Javaの抽象クラスである org.apache.hadoop.fs.FileSystemはHadoopのファイルシステムを表現するもので、実装は複数ある(例えばローカルディスクのファイルシステムを利用したものやAmazon S3を利用したものなどがある)
  • これらのファイルシステムへの選択にはURIスキーマを用いる
    • 例: hadoop fs -ls file:///
  • どのファイルシステムへもMapReduceを実行することは出来るが、大規模データを処理するなら分散ファイルシステムを用いるべきである

インターフェース

HDFSにHTTP経由でアクセスができる

  • ノードへ直接アクセス
    • WebHDFSを利用することで読み書き込みが出来るようになる
    • 有効化するには dfs.webhdfs.enabledtrueにする
  • HDFSプロキシへアクセス( DistributedFileSystemAPI を使う)

HTTP以外にもC言語向けのライブラリや、Unixファイルシステムと統合するFUSE(Filesystem in Userspace)というものもある


Javaインターフェース

HadoopのファイルシステムとやりとりするためのAPIとして、FileSystemクラスが提供されている
HDFS向けのDistributedFileSystemやその他のクラスは全てこのFileSystemクラスの子クラスとして実装されている。


この章ではJava上でどんな事ができるかをサンプルコードを記載しながら解説しているが、特に解説することもないので本の内容と以下のコードを参照してもらえればと。
https://github.com/tomwhite/hadoop-book/tree/3e/ch03/src


なお、以下のような項目について解説されている

  • HadoopURLからのデータ読み出し
  • FileSystem APIを使ったデータの読み出し
  • FSDataInputStream
  • データ書き込み
  • FSDataOutputStream
  • ディレクトリ
  • ファイルシステムへの問い合わせ
    • ファイルのメタデータ:FileStatus
    • ファイルのリスト
    • ファイルパターン
    • パスフィルタ
  • データの削除

データフロー

ファイルの読み・書きをするときのデータフローを解説する

ファイル読み込み

  1. DistributedFileSystem#openを呼び、ファイルをオープンする
  2. ネームノードからファイルのメタデータを取得し、ネームノード/データノードからデータを取得できる FSDataInputStreamを返す
  3. クライアントは渡されたStreamに対して FSDataInputStream#readを呼び、データを読む
  4. DFSInputStreamは自身の保持するファイルの1つ目(=最も近い)のブロックに接続し、クライアントにデータを渡す
  5. ブロックの終端にたどり着いたら、データノードへの接続をクローズし、次のブロックを探して接続、引き続きクライアントにデータを渡す(この処理はクライアントに対して透過的に行われるのでクライアントからは連続したStreamを受け取り続けているようにしか見えない)
  6. 該当のファイルの最終ブロックまで読み終えたらクライアントは FSDataInputStream#closeを呼び、ファイルをクローズする

読み込み中にデータノードとの通信時にエラーが起こった場合
- そのブロックを保持している最も近いデータノードを試す
- 障害のあったデータノードを保持しておく(その後の処理で不要なリトライが行われないようにするため)
- データノードから転送されたデータのチェックサムを確認し、壊れたブロックがあったらネームノードに報告の上、そのブロックの複製を読みに行く

以上のような設計になっているおかげで、データへのアクセスをクラスタ内の全てのデータノードに分散できるようになっている


ファイル書き込み

ファイルの新規作成とデータの書き込みというケースを考える

  1. DistributedFileSystem#createを呼び、ファイルを作成する
  2. ネームノードの持つメタデータを元にファイル作成が可能化を検証し、問題がなければブロック未割り当てのファイルが作成されて、 FSDataOutputStreamを返す
  3. クライアントが書き込み要求を Streamに対して行い、FSDataOutputStreamは書き込みの準備を行う(ネームノードへの書き込むべきデータノードのリスト取得要求など)
  4. 書き込むパケットを FSDataOutputStreamがデータノードに対して転送する。複製数が2以上の場合は、データノードは複製対象の別のデータノードへデータを転送する
  5. 全てのデータノードは、自身が書き込み要求を終えたことを FSDataOutputStreamへ伝える。これは ackQueueと呼ばれるもので管理されており、要求が完了したらキューから取り除かれる
  6. 全てのパケットについて書き込みが完了したら FSDataOutputStream#closeを呼び、Streamを閉じる。ここでネームノードは1つのファイル書き込みが完了した通知を得る

補足

dataQueue, ackQueue: https://github.com/apache/hadoop/blob/f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java#L514


一貫性モデル

ファイル読み書きに際してのデータの見え方を示す。
HDFSではPOSIXの要求のいくつかを性能のためのトレードオフで犠牲にしており、通常期待されるものと違う結果になる操作がある。

具体的には、ファイルに書き込まれた内容は、ストリームがフラッシュされたあとでも見えるとは限らないというものがある

サンプルコード


Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L)); // 直前で `flush()`しているにもかかわらずファイルの内容は0byteに見える


書き込み中のデータは、1ブロック分書き終わった時点で初めて読み込み側から参照できるようになる。つまり基本的にはブロック単位で書き終わったもののみが参照可能である扱い。
ただし、 FSDataOutoputStream#syncを実行すると強制的に参照可能にすることが出来る
なお、HDFSでファイルをクローズした時(OutputStream#closeが呼ばれた時)はこの syncが暗黙的に呼ばれている

この一貫性モデルは、書き込み中になんらかの障害が発生した場合に最大で1ブロック分のデータが失われうることを示している。
そのため、データをロストしないように適宜 syncが呼ばれているべきである。
だが、 sync メソッドにはオーバーヘッドもあるので、データの頑健性とスループットのトレードオフを考慮して適切なタイミングを見つける必要がある。


データ取り込み(Flume,Sqoop)

  • ファイルシステムからHDFSへデータを取り込む際はFlumeやSqoopといったツールがあるので使おう
  • Flumeは大量のストリーミングデータを取り込むことができる
  • Sqoopは構造化データストアからバルクインポートを行える

distcpによる並列コピー

ファイル集合への操作を並列実行するコマンド distcpが提供されている。
本来の用途としてはHDFSクラスタ間のデータ転送だが、オプションによって変更のあったファイルのみ上書きなどが出来る
このコマンドはMapReduceのジョブとして実装されており、コピーの処理はmapとして実行される(reducerはない)

$ hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar

namenode1とnamenode2が同一のバージョンのHadoopで動作している場合は以上の例のようにhdfsスキーマを利用する。
異なるバージョンだとRPCシステムに互換がないため動かせないためである。
この問題を回避するためには、以下の方法がある

  • 転送元をHTTPベースのHFTPファイルシステムを利用する
  • 転送元と転送先にHDFS HTTPプロキシを利用する

データをHDFSにコピーする場合はクラスタのバランスを考慮することが重要。
HDFSはブロックがクラスタ間で平等に分配されているときに最も上手く動くので、 distcpで偏りが出る状況は避けたい
distcpする際にクラスタ内のノード数より多くのmapを使えばこの問題は回避できるので、1ノード辺り20のmapをデフォルトとして実行し始めるのが良いとされている


Hadoopアーカイブ

Hadoopアーカイブ(HARファイル)は複数のファイルをHDFSブロックに効率よくまとめ、ネームノードのメモリ使用量を削減しながら、ファイルへのアクセスは透過的に行えるようにしてくれるファイルアーカイブ機能である。
これを活用することでHDFSは小さいファイル群を効率よく保存できないという問題を緩和出来る

が、現実ではあんまり使われていないらしい


おわり