hadoop2.2原理:HDFSのファイル読み書きを分析する
6958 ワード
File Read
プログラムの例: 1 public class FileRead {
2
3 public static void main(Sting[] args) throws Exception {
4 Configuration conf = new Configuration();
5 FileSystem fs = FileSystem.get(conf);
6 InputStream in = new InputStream();
7 in = fs.open(new Path(args[0]));
8 IOUtils.copyBytes(in, System.out, 4096, false);
9 IOUtils.closeStream(in);
10 }
11 }
プロセス分析:
(1)open
クライアントがファイルを読み込むと、FileSystem ojbectでopen()メソッドが呼び出され、FileSystemはHDFSのinstanceである.
上記のプログラムから表示されます.
line 5 FileSystemのinstanceを得る
line 7はFileSystem上のopen()メソッドを呼び出す
(2)get block location
その後、HDFSはRPC(Remote Procedure Call)を介してnamenodeを呼び出し、the locations of the blocks for the first few blocksを取得し、各blockについてnamenodeはこのblockを持つdatanodeのaddressを返し、datanodeはNetwork Topologyに基づいて再ソートされる.
locationsを取得すると、DFSは1つのFSDataInputStreamを返してclientにデータを読み出し、FSDataInputStreamは順次wrap 1つのDFSInputStreamを返してdatanodeとnamenodeのI/Oを管理し、DFSInputStreamにはfirst few datanodeのaddressも格納されます.
(3)read
ClientがFSDataInputStreamにcall read()を接続すると、DFSInputStreamは最初のblockを格納した最近のdatanodeに接続し、その後call read()メソッドはdatanodeからclientにデータを読み出し、blockの最後に達するとDFSinputStreamはこのdatanodeとの接続を閉じ、次のblockを格納したdatanodeを見つけて順次往復する...
(4) DFSInputStream
DFSInputStreamは、各packetの最近のblockを順番に読み込み、1つのblockを読むたびにdatanodeと接続を再確立します.
DFSInputStreamは、namenodeと接続を維持しながら、次のpacketのblocksが存在するdatanodeのlocationsを再取得します.
(5)FSDataInputStream
FSDataInputStreamはclientとdatanode接続の仲介であり、client call read()methodsはFSDataInputStreamを介してDFSInputStreamを呼び出す
(6)許容誤差
データの読み込み中に発生したエラーは主に2種類あります.
1.DFSINputStreamとdatanodeのcommunicationにエラーが発生しました.この場合、DFSINputStreamは、このpacketの次のblockが保存されているdatanodeのうち最も近いものに接続しようとします.また、次のblockが再びこのdatanodeから読み込まれることを防止するために、このdatanodeが記録されます.
2.DFSINputStream checksum data from datanodeで破損したデータブロックが見つかった場合、DFSINputStreamが別のdatanodeからこのpacketの次のblockコピーを読み込もうとする前にnamenodeに報告されます.
File Write
プログラムの例: 1 import org.apache.hadoop.fs.FileSystem;
2 import org.apache.hadoop.fs.Path;
3 import org.apache.hadoop.conf.Configuration;
4
5 public class CreateDir {
6 public static void main(String[] args) throws Exception {
7 Configuration conf = new Configuration();
8 String dst = args[0];
9 FileSystem fs = FileSystem.get(conf);
10 fs.create(new Path(dst));
11 }
12 }
プロセス分析:
(1)create
クライアントがファイルに書き込むと、FileSystem object上でcreate()メソッドが呼び出され、FileSystemはHDFSのinstanceである.
(2) create new file in filesystem's namespace
DFSはRemote Procedure Callを介してcall namenodeを呼び出します.namenodeはfilesystemのnamespaceで新しいファイルを生成します.新しいファイルを作成する前に、nanemodeはclientにcreate fileのpermissionがあるかどうか、createするファイルがすでに存在するかどうか、検査が通過しない場合、IOExceptionを放出します.
create fileの後、DFSはFSDataOutputStreamを返してclientにファイルを書き、FSDataInputStreamはwrap DFSoutputStreamを返してnamenodeとdatanodeと交流します.
(3) client write data
クライアントがファイルへのデータの書き込みを開始すると、DFSoutputStreamは書き込み対象のデータsplit intoの多くのpacketsを書き込み、これらのpacketは内部キューdata queueに書き込まれ、DFSoutputStreamはこのdata queueを維持します.
このdata queueはDataStreamerによって使用され、DataStreamerは主にnamenodeに申請を発行し、新しいpacketのblockコピーに適切なdatanodeを割り当て、namenodeは適切なdatanodesを選択してこれらのdata blocksを格納する.
このpacketを格納するblocksのdatanodeはpipelineを構成し、各packetのblockのreplication levelが3であると仮定すると、このpipelineは3つのdatanodeから構成される.
DataStreamerはこのpacketをpipelineの最初のdatanodeにインポートします.このdatanodeはこのpacketの後にforward it to 2番目のdatanodeを格納します.同様に、2番目のdatanodeはこのpacketを格納し、forward to 3番目のdatanodeを格納します.
(4)許容誤差
(3)DFSoutputStreamがdata queueを維持していることを知っています.また、
DFSoutputStreamはまた、pipeline内のdatanode acknowledgedのすべてのpacketがすでに存在する場合、このpacketはack queueから移動するack queue(acknowledged)を維持する.
データの書き込み中にdatanodeの書き込みに失敗した場合、次のアクションが発生します.
まずpipelineは閉じ、ack queueに書き込まれているすべてのpacketsがdata queueの前に追加され、下流のdatanodeがpacketsを失うことなく、現在datanodeに書き込まれているデータブロックが識別され、悪いdatnodeに書き込まれている一部のデータがこのdatanode recover以降に削除されます.
failedのdatanodeはpipelineから移動し、namenodeはこれらにnoticeされ、新しいpipelineを構成するためにdatanodeが再割り当てされます.次のpacketのblockは影響を受けません.
blockを書き込むときに大量のdatanode failedがdfsを満たす限り.replication.min(default is 1)では、この書き込みは失敗せず、blockはcluster上のdatanodeにコピーされて同期し、dfsに達することを知る.Replicationが設定した数(デフォルトは3)
(5)データ書込の最終段階
クライアントがデータ書き込みを完了すると、クライアントはFSDataOutputStream上のclose()を呼び出します.この動作は、namenodeにfileの書き込みが完了した信号を送信する前にflushsの残りのpacketsをdatanode pipelineにすべて送信し、acknowledgeを待つ.
DataStreamerは以前、すべてのpacketsに対してnamenodeにblock locationsを申請したことがあるため、namenodeはこのファイルがどのblocksから構成されているか知っています.
1 public class FileRead {
2
3 public static void main(Sting[] args) throws Exception {
4 Configuration conf = new Configuration();
5 FileSystem fs = FileSystem.get(conf);
6 InputStream in = new InputStream();
7 in = fs.open(new Path(args[0]));
8 IOUtils.copyBytes(in, System.out, 4096, false);
9 IOUtils.closeStream(in);
10 }
11 }
プログラムの例:
1 import org.apache.hadoop.fs.FileSystem;
2 import org.apache.hadoop.fs.Path;
3 import org.apache.hadoop.conf.Configuration;
4
5 public class CreateDir {
6 public static void main(String[] args) throws Exception {
7 Configuration conf = new Configuration();
8 String dst = args[0];
9 FileSystem fs = FileSystem.get(conf);
10 fs.create(new Path(dst));
11 }
12 }
プロセス分析:
(1)create
クライアントがファイルに書き込むと、FileSystem object上でcreate()メソッドが呼び出され、FileSystemはHDFSのinstanceである.
(2) create new file in filesystem's namespace
DFSはRemote Procedure Callを介してcall namenodeを呼び出します.namenodeはfilesystemのnamespaceで新しいファイルを生成します.新しいファイルを作成する前に、nanemodeはclientにcreate fileのpermissionがあるかどうか、createするファイルがすでに存在するかどうか、検査が通過しない場合、IOExceptionを放出します.
create fileの後、DFSはFSDataOutputStreamを返してclientにファイルを書き、FSDataInputStreamはwrap DFSoutputStreamを返してnamenodeとdatanodeと交流します.
(3) client write data
クライアントがファイルへのデータの書き込みを開始すると、DFSoutputStreamは書き込み対象のデータsplit intoの多くのpacketsを書き込み、これらのpacketは内部キューdata queueに書き込まれ、DFSoutputStreamはこのdata queueを維持します.
このdata queueはDataStreamerによって使用され、DataStreamerは主にnamenodeに申請を発行し、新しいpacketのblockコピーに適切なdatanodeを割り当て、namenodeは適切なdatanodesを選択してこれらのdata blocksを格納する.
このpacketを格納するblocksのdatanodeはpipelineを構成し、各packetのblockのreplication levelが3であると仮定すると、このpipelineは3つのdatanodeから構成される.
DataStreamerはこのpacketをpipelineの最初のdatanodeにインポートします.このdatanodeはこのpacketの後にforward it to 2番目のdatanodeを格納します.同様に、2番目のdatanodeはこのpacketを格納し、forward to 3番目のdatanodeを格納します.
(4)許容誤差
(3)DFSoutputStreamがdata queueを維持していることを知っています.また、
DFSoutputStreamはまた、pipeline内のdatanode acknowledgedのすべてのpacketがすでに存在する場合、このpacketはack queueから移動するack queue(acknowledged)を維持する.
データの書き込み中にdatanodeの書き込みに失敗した場合、次のアクションが発生します.
まずpipelineは閉じ、ack queueに書き込まれているすべてのpacketsがdata queueの前に追加され、下流のdatanodeがpacketsを失うことなく、現在datanodeに書き込まれているデータブロックが識別され、悪いdatnodeに書き込まれている一部のデータがこのdatanode recover以降に削除されます.
failedのdatanodeはpipelineから移動し、namenodeはこれらにnoticeされ、新しいpipelineを構成するためにdatanodeが再割り当てされます.次のpacketのblockは影響を受けません.
blockを書き込むときに大量のdatanode failedがdfsを満たす限り.replication.min(default is 1)では、この書き込みは失敗せず、blockはcluster上のdatanodeにコピーされて同期し、dfsに達することを知る.Replicationが設定した数(デフォルトは3)
(5)データ書込の最終段階
クライアントがデータ書き込みを完了すると、クライアントはFSDataOutputStream上のclose()を呼び出します.この動作は、namenodeにfileの書き込みが完了した信号を送信する前にflushsの残りのpacketsをdatanode pipelineにすべて送信し、acknowledgeを待つ.
DataStreamerは以前、すべてのpacketsに対してnamenodeにblock locationsを申請したことがあるため、namenodeはこのファイルがどのblocksから構成されているか知っています.