hadoopソース解析のhdfs読み出しデータ全フロー分析
詳細概要 DataXceiverServerの説明 DataXceiverServer について初期化動作 動作原理 DataXceiverの説明 Opクラス紹介 処理ロジック BlockSenderデータ読み出し 従来方式によるデータ伝送 を実現する.ゼロコピーによるデータ転送 原理 具体的な操作
クライアントデータ読み込みプロセス分析 java api読み出しデータ 構造DFSInputStream ファイルのブロック情報 を取得する. DFSInputStream readデータ Sender送信データ まとめ
http://zhangjun5965.iteye.com/blog/2375278
概要
hdfsのファイルはブロックの形式で格納され、各ブロックにはデフォルトで3つのコピーがあり、これらのコピーはまた異なるdatandoeに格納され、ファイルを読み出す過程は、まずこれらのブロックのアドレスを取得し、その後、各速いデータを順次読み出すことである.
HDfs読み書きデータはDataXceiverServerを通じてサービスを提供し、javaのsocketサービスを確立し、クライアントからの各種要求を受け入れ、各要求には異なる操作コードがあり、サービス側はこの操作コードを通じてどの要求かを判断する.リクエストが来るたびに、ロジックを具体的に処理するためにスレッドを新規作成し、具体的な実装について簡単な分析を行います.
DataXceiverServerの説明
DataXceiverServerについて
DataXceiverServerクラスはorgにある.apache.hadoop.hdfs.server.Datanodeパッケージの下で、
注釈から、DataXceiverServerは、データ・ブロックの受信および送信、クライアントおよび他のdatanodeの要求の傍受に使用されることがわかります.
初期化作業
DataXceiverServerがデータを受信送信ためのものである以上、datanodeの動作の一部であるべきであり、DataXceiverServerの初期化コードからDataXceiverServerの初期化コードを見つけた.
DatanodeのstartDataNodeメソッドでは、initDataXceiver(conf);DataXceiverServerを初期化し、initDataXceiverメソッドに入ります.
コードを通じて、まずnewのPeerServerのサブクラスTcpPeerServerを見ました.そして、このpeerServerと対応するconfをパラメータnewとしてDataXceiverServerオブジェクトを1つ追加し、スレッドグループに追加するデーモンスレッドに設定する.
ここでは、スレッドグループとデーモンスレッドの2つの重要な概念について述べる.Javaではスレッドグループ内のスレッドを操作することができ、例えばinterrupt操作は1つのスレッドグループ内のすべてのスレッドを遮断する、デーモンスレッドに設定することができ、これによりメインスレッドが終了した場合にすべてのデーモンスレッドが自動的に終了することができる.
さぎょうげんり
DataXceiverServerのrunメソッドを見てみましょう
コードを通じて、私たちは1つの要求が来るたびに、DataXceiverServerは1つのデーモンプロセスDataXceiverを作成して要求を処理して、各datanodeの上でどれだけのDataXceiverを作成することができて、DataXceiverServerの中の変数maxXceiverCountが制御するのです.この変数はプロファイルで構成することができ、変数名はdfsである.datanode.max.transfer.threads、デフォルトの数字は4096で、これはdatanodeの運行状況と性能に基づいて構成することができ、hdfs最適化の重要なパラメータでもある.
DataXceiverの紹介
Opクラスの紹介
データの送受信を行うサービスDataXceiverが作成すると、Opクラスの各オペランドコードによって各種の操作が識別され、Opクラスの具体的な経路はorgである.apache.hadoop.hdfs.protocol.datatransfer.Opは、ここでは、読み取り、書き込み、copyなど、異なる操作を区別するためのいくつかの操作コードを定義する.
プロセスロジック
DataXceiverがスレッドである以上、彼の処理ロジックはrunメソッドにあるはずです.runメソッドを見てみましょう.
op=readOp();具体的にどのような操作なのか、読み取り、書き込み、copyなどを取得し、processOp(op);方法は具体的なロジックを処理します
メソッドでは,switchにより具体的に配布し,異なるメソッドに異なる論理を実行させる.
トレースコード、最後にやはりDataXceiverクラスの中のreadBlock方法を呼び出して具体的なデータを読み取る操作をします
主にBlockSenderオブジェクトを構築し、sendBlockメソッドによってクライアントにデータを送信します.
BlockSenderデータの読み込み
従来の方法でデータ転送を実現
従来の方法では、まずカーネルが全体のデータを読み出し、カーネルユーザーを越えてアプリケーションにデータをプッシュし、次にアプリケーションがカーネルユーザーを越えてデータをプッシュし、ソケットに書きます.アプリケーションは実際には、ディスクファイルのデータをソケットに転送するあまり効率的ではない仲介役を担当しています.
ゼロコピーによるデータ転送
げんり
Javaクラスライブラリはjavaを通過する.nio.channels.FileChannelのtransferTo()メソッドは、LinuxおよびUNIXシステム上でゼロコピーをサポートします.transferTo()メソッドを使用して、アプリケーションを流れる必要がなく、呼び出されたチャネルから別の書き込み可能なチャネルにバイトを直接転送できます.
具体的な操作
BlockSenderのdoSendBlockメソッドでは,以下の操作によりtransferTo操作が可能か否かを判断する.
一連の検査を経た後にsendPacket方法で具体的な操作を行う
その中でsockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);具体的なjavaの最下位レベルの操作をカプセル化
クライアントリードデータフロー分析
前のコードでdatanodeが起動時にjavaのsocketを起動してリクエストを傍受していることを知っていますが、クライアントのリクエストはどのように送信されていますか?これがこれから私たちが研究する問題です.
JAva apiデータ読み出し
まず簡単なjava apiがhdfsデータを読み出すコードについてお話しします
まずFileSystem fs=FileSystem.get(conf);FileSystemのサブクラス、すなわち分散ファイルシステムDistributedFileSystemをインスタンス化します.(具体的には,hdfs://,DistributedFileSystemのような伝達されたconf内のパスの接頭辞の構成によってどのシステムをインスタンス化するかを決定するが,具体的にはここでは説明しない)
そしてfsを通過する.open(p); を選択します.
DFSInputStreamの構築
トレースコード、DistributedFileSystemのopenメソッドを開きます
DFSInputStreamは、hdfsのファイルを読み込むための分散ファイルの入力ストリームであり、DFSclientのopenメソッドによってDFSInputStreamを開きます.
ファイルのブロック情報の取得
DFSclientのopenメソッドでは,newはファイルのブロック情報をnamnodeから取得する主なメソッドがopenInfoであるDFSinputStreamクラスを記述し,解析した.
内部のfetchLocatedBlocks AndGetLastBlockLengthメソッドでは、名前からすべてのブロック情報と最後の速い長さを取得し、最後のブロックの長さを取得するのは主に同時読み書きの場合であり、データを読むときに他のスレッドが追加操作を行っている場合、最後のブロックのサイズが変化することがわかります.
トレースコード、最後にorgを呼び出す.apache.hadoop.hdfs.DFSClient.CallGetBlockLocations(ClientProtocol,String,long,long)メソッドは、namenodeのエージェントを介してリクエストを送信します.具体的には、ClientProtocolインタフェースの実装クラスClientNamenodeProtocolTranslatorPBのgetBlockLocationsメソッドにリクエストのオブジェクトGetBlockLocationsRequestProtoがカプセル化され、hadoop rpcを介してnamenodeに送信されてデータを取得します.
具体的な実装コードはnamenodeのサービスエージェントorgである.apache.hadoop.hdfs.server.namenode.NameNodeRpcServerのgetBlockLocationsメソッドが実装されます.
主にnamesystemでgetBlockLocationsは、ネーミングスペースから対応する情報を取得します.主に、ファイル対応のINodeFileを取得してから、ファイル対応のブロック情報を取得します.返されるブロックの情報は、datanodeのクライアントからの距離に基づいて簡単にソートされます.具体的には、コードをトレースすることができます.
DFSInputStream readデータ
DFSInputStreamのreadメソッドはreadWithStrategyメソッドに入り、blockSeekToメソッドに入ります.
次にorg.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(long)メソッドは、データを読み出すためのBlockReaderオブジェクトを構築します.
buildメソッドを追跡すると、RemoteBlockReader 2が呼び出された.新BlockReaderは、BlockReaderオブジェクトを取得します.
Sender送信データ
org.apache.hadoop.hdfs.RemoteBlockReader2.NewBlockReaderメソッドでは,SenderオブジェクトのreadBlockによりデータを読み出す.
readBlock方式では、値81の状態コードorgを送信.apache.hadoop.hdfs.protocol.datatransfer.Op.READ_BLOCKからDataXceiverへのpeerサービス.
ここでは,各パラメータをOpReadBlockProtoオブジェクトにセットし,初期化されたDataXceiverServerサービスに送信する.
このとき、サービス側がブロックしていたsocketスレッドは、オペレーティングコード81の読み取り要求を受信し、後続の処理を行う.
copyBlock、writeBlockなどの他のデータに対する操作はSenderで完了しています.
まとめ
これにより、hdfsがデータを読み出すすべてのプロセスを分析し、サービス側がどのように初期化するか、各要求に対してデータを読み取るためのスレッドを確立するか、ゼロコピー技術を利用してオーバーヘッドを削減する場合、クライアントがデータを読み取る要求をどのように送信するかを含む.
私はまだ勉强の段阶にあるので、间违いや漏れは避けられません.もし问题があれば、どうぞよろしくお愿いします.
http://zhangjun5965.iteye.com/blog/2375278
概要
hdfsのファイルはブロックの形式で格納され、各ブロックにはデフォルトで3つのコピーがあり、これらのコピーはまた異なるdatandoeに格納され、ファイルを読み出す過程は、まずこれらのブロックのアドレスを取得し、その後、各速いデータを順次読み出すことである.
HDfs読み書きデータはDataXceiverServerを通じてサービスを提供し、javaのsocketサービスを確立し、クライアントからの各種要求を受け入れ、各要求には異なる操作コードがあり、サービス側はこの操作コードを通じてどの要求かを判断する.リクエストが来るたびに、ロジックを具体的に処理するためにスレッドを新規作成し、具体的な実装について簡単な分析を行います.
DataXceiverServerの説明
DataXceiverServerについて
DataXceiverServerクラスはorgにある.apache.hadoop.hdfs.server.Datanodeパッケージの下で、
/**
* Server used for receiving/sending a block of data.
* This is created to listen for requests from clients or
* other DataNodes. This small server does not use the
* Hadoop IPC mechanism.
*/
class DataXceiverServer implements Runnable {
/*
*PeerServer , datanode * TcpPeerServer,TcpPeerServer Java *ServerSocket . java socket , ,
*
*/
private final PeerServer peerServer;
// datanode
private final DataNode datanode;
private final HashMap peers = new HashMap();
private final HashMap peersXceiver = new HashMap();
private boolean closed = false;
/**
* , Xceiver
* Maximal number of concurrent xceivers per node.
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
*/
int maxXceiverCount =
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
}
注釈から、DataXceiverServerは、データ・ブロックの受信および送信、クライアントおよび他のdatanodeの要求の傍受に使用されることがわかります.
初期化作業
DataXceiverServerがデータを受信送信ためのものである以上、datanodeの動作の一部であるべきであり、DataXceiverServerの初期化コードからDataXceiverServerの初期化コードを見つけた.
DatanodeのstartDataNodeメソッドでは、initDataXceiver(conf);DataXceiverServerを初期化し、initDataXceiverメソッドに入ります.
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
} else {
int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf), backlogLength);
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
コードを通じて、まずnewのPeerServerのサブクラスTcpPeerServerを見ました.そして、このpeerServerと対応するconfをパラメータnewとしてDataXceiverServerオブジェクトを1つ追加し、スレッドグループに追加するデーモンスレッドに設定する.
ここでは、スレッドグループとデーモンスレッドの2つの重要な概念について述べる.Javaではスレッドグループ内のスレッドを操作することができ、例えばinterrupt操作は1つのスレッドグループ内のすべてのスレッドを遮断する、デーモンスレッドに設定することができ、これによりメインスレッドが終了した場合にすべてのデーモンスレッドが自動的に終了することができる.
さぎょうげんり
DataXceiverServerのrunメソッドを見てみましょう
@Override
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
// accept , , , java serverSocket accept .
peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
// , DataXceiver.create , .
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
...................
}
//
...................
}
コードを通じて、私たちは1つの要求が来るたびに、DataXceiverServerは1つのデーモンプロセスDataXceiverを作成して要求を処理して、各datanodeの上でどれだけのDataXceiverを作成することができて、DataXceiverServerの中の変数maxXceiverCountが制御するのです.この変数はプロファイルで構成することができ、変数名はdfsである.datanode.max.transfer.threads、デフォルトの数字は4096で、これはdatanodeの運行状況と性能に基づいて構成することができ、hdfs最適化の重要なパラメータでもある.
DataXceiverの紹介
Opクラスの紹介
データの送受信を行うサービスDataXceiverが作成すると、Opクラスの各オペランドコードによって各種の操作が識別され、Opクラスの具体的な経路はorgである.apache.hadoop.hdfs.protocol.datatransfer.Opは、ここでは、読み取り、書き込み、copyなど、異なる操作を区別するためのいくつかの操作コードを定義する.
public enum Op {
WRITE_BLOCK((byte)80),
READ_BLOCK((byte)81),
READ_METADATA((byte)82),
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89);
.........................
}
プロセスロジック
DataXceiverがスレッドである以上、彼の処理ロジックはrunメソッドにあるはずです.runメソッドを見てみましょう.
/**
* Read/write data from/to the DataXceiverServer.
*/
@Override
public void run() {
int opsProcessed = 0;
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
HdfsConstants.SMALL_BUFFER_SIZE);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"is running an older version of Hadoop which does not support " +
"encryption");
} else {
LOG.info("Failed to read expected SASL data transfer protection " +
"handshake from client at " + peer.getRemoteAddressString() +
". Perhaps the client is running an older version of Hadoop " +
"which does not support SASL data transfer protection");
}
return;
}
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
peer.setReadTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
// Since we optimistically expect the next op, it's quite normal to get EOF here.
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
incrDatanodeNetworkErrors();
throw err;
}
break;
}
// restore normal timeout
if (opsProcessed != 0) {
peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = monotonicNow();
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
........................
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
if (peer != null) {
dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
}
}
}
op=readOp();具体的にどのような操作なのか、読み取り、書き込み、copyなどを取得し、processOp(op);方法は具体的なロジックを処理します
メソッドでは,switchにより具体的に配布し,異なるメソッドに異なる論理を実行させる.
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
トレースコード、最後にやはりDataXceiverクラスの中のreadBlock方法を呼び出して具体的なデータを読み取る操作をします
@Override
public void readBlock(final ExtendedBlock block,
final Token blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
long read = 0;
updateCurrentThreadName("Sending block " + block);
OutputStream baseStream = getOutputStream();
DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
..............................
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, sendChecksum, datanode, clientTraceFmt,
cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(ERROR, msg);
throw e;
}
// send op status
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long beginRead = Time.monotonicNow();
read = blockSender.sendBlock(out, baseStream, null); // send data
.................................
} catch ( SocketException ignored ) {
.................................
} finally {
IOUtils.closeStream(blockSender);
}
}
主にBlockSenderオブジェクトを構築し、sendBlockメソッドによってクライアントにデータを送信します.
BlockSenderデータの読み込み
従来の方法でデータ転送を実現
従来の方法では、まずカーネルが全体のデータを読み出し、カーネルユーザーを越えてアプリケーションにデータをプッシュし、次にアプリケーションがカーネルユーザーを越えてデータをプッシュし、ソケットに書きます.アプリケーションは実際には、ディスクファイルのデータをソケットに転送するあまり効率的ではない仲介役を担当しています.
ゼロコピーによるデータ転送
げんり
Javaクラスライブラリはjavaを通過する.nio.channels.FileChannelのtransferTo()メソッドは、LinuxおよびUNIXシステム上でゼロコピーをサポートします.transferTo()メソッドを使用して、アプリケーションを流れる必要がなく、呼び出されたチャネルから別の書き込み可能なチャネルにバイトを直接転送できます.
具体的な操作
BlockSenderのdoSendBlockメソッドでは,以下の操作によりtransferTo操作が可能か否かを判断する.
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
一連の検査を経た後にsendPacket方法で具体的な操作を行う
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
その中でsockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);具体的なjavaの最下位レベルの操作をカプセル化
クライアントリードデータフロー分析
前のコードでdatanodeが起動時にjavaのsocketを起動してリクエストを傍受していることを知っていますが、クライアントのリクエストはどのように送信されていますか?これがこれから私たちが研究する問題です.
JAva apiデータ読み出し
まず簡単なjava apiがhdfsデータを読み出すコードについてお話しします
@Test
public void testRead() {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path p = new Path("hdfs://localhost:9000/a.txt");
FSDataInputStream in = fs.open(p);
BufferedReader buff = new BufferedReader(new InputStreamReader(in));
String str = null;
while ((str = buff.readLine()) != null) {
System.out.println(str);
}
buff.close();
in.close();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
まずFileSystem fs=FileSystem.get(conf);FileSystemのサブクラス、すなわち分散ファイルシステムDistributedFileSystemをインスタンス化します.(具体的には,hdfs://,DistributedFileSystemのような伝達されたconf内のパスの接頭辞の構成によってどのシステムをインスタンス化するかを決定するが,具体的にはここでは説明しない)
そしてfsを通過する.open(p); を選択します.
DFSInputStreamの構築
トレースコード、DistributedFileSystemのopenメソッドを開きます
@Override
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
statistics.incrementReadOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver() {
@Override
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
DFSInputStreamは、hdfsのファイルを読み込むための分散ファイルの入力ストリームであり、DFSclientのopenメソッドによってDFSInputStreamを開きます.
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
ファイルのブロック情報の取得
DFSclientのopenメソッドでは,newはファイルのブロック情報をnamnodeから取得する主なメソッドがopenInfoであるDFSinputStreamクラスを記述し,解析した.
内部のfetchLocatedBlocks AndGetLastBlockLengthメソッドでは、名前からすべてのブロック情報と最後の速い長さを取得し、最後のブロックの長さを取得するのは主に同時読み書きの場合であり、データを読むときに他のスレッドが追加操作を行っている場合、最後のブロックのサイズが変化することがわかります.
トレースコード、最後にorgを呼び出す.apache.hadoop.hdfs.DFSClient.CallGetBlockLocations(ClientProtocol,String,long,long)メソッドは、namenodeのエージェントを介してリクエストを送信します.具体的には、ClientProtocolインタフェースの実装クラスClientNamenodeProtocolTranslatorPBのgetBlockLocationsメソッドにリクエストのオブジェクトGetBlockLocationsRequestProtoがカプセル化され、hadoop rpcを介してnamenodeに送信されてデータを取得します.
@Override
public LocatedBlocks getBlockLocations(String src, long offset, long length)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
.newBuilder()
.setSrc(src)
.setOffset(offset)
.setLength(length)
.build();
try {
GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
req);
return resp.hasLocations() ?
PBHelper.convert(resp.getLocations()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
具体的な実装コードはnamenodeのサービスエージェントorgである.apache.hadoop.hdfs.server.namenode.NameNodeRpcServerのgetBlockLocationsメソッドが実装されます.
@Override // ClientProtocol
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
主にnamesystemでgetBlockLocationsは、ネーミングスペースから対応する情報を取得します.主に、ファイル対応のINodeFileを取得してから、ファイル対応のブロック情報を取得します.返されるブロックの情報は、datanodeのクライアントからの距離に基づいて簡単にソートされます.具体的には、コードをトレースすることができます.
DFSInputStream readデータ
DFSInputStreamのreadメソッドはreadWithStrategyメソッドに入り、blockSeekToメソッドに入ります.
次にorg.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(long)メソッドは、データを読み出すためのBlockReaderオブジェクトを構築します.
/**
*
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
..................................
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
........................
}
buildメソッドを追跡すると、RemoteBlockReader 2が呼び出された.新BlockReaderは、BlockReaderオブジェクトを取得します.
/**
*
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
..................................
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
........................
}
Sender送信データ
org.apache.hadoop.hdfs.RemoteBlockReader2.NewBlockReaderメソッドでは,SenderオブジェクトのreadBlockによりデータを読み出す.
public static BlockReader newBlockReader(String file,
ExtendedBlock block,
Token blockToken,
long startOffset, long len,
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum, cachingStrategy);
................................
}
readBlock方式では、値81の状態コードorgを送信.apache.hadoop.hdfs.protocol.datatransfer.Op.READ_BLOCKからDataXceiverへのpeerサービス.
@Override
public void readBlock(final ExtendedBlock blk,
final Token blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
}
ここでは,各パラメータをOpReadBlockProtoオブジェクトにセットし,初期化されたDataXceiverServerサービスに送信する.
このとき、サービス側がブロックしていたsocketスレッドは、オペレーティングコード81の読み取り要求を受信し、後続の処理を行う.
copyBlock、writeBlockなどの他のデータに対する操作はSenderで完了しています.
まとめ
これにより、hdfsがデータを読み出すすべてのプロセスを分析し、サービス側がどのように初期化するか、各要求に対してデータを読み取るためのスレッドを確立するか、ゼロコピー技術を利用してオーバーヘッドを削減する場合、クライアントがデータを読み取る要求をどのように送信するかを含む.
私はまだ勉强の段阶にあるので、间违いや漏れは避けられません.もし问题があれば、どうぞよろしくお愿いします.