HDFS-ソース分析(2)-クライアントとサービス側
12259 ワード
RPC通信の両端はClient-クライアントであり、一端はServer-サーバであり、Client/Serverの区分は機械の物理的位置ではなく、通信における論理的地位にある.通信を開始したのはClientで、情報を受け取るのはServerです.
HDFSには、1,クライアント-nameNodeが存在する.2,クライアント-DataNode;3,DataNode-NameNode;4,DataNode-DataNodeの4種類のC/S構造の通信で、ここのクライアントはHDFSクラスタを使用するそのクライアントアプリケーションを指します.
PS:NameNode-DataNodeの通信は存在しないことがわかります.実際には、DataNodeはNameNodeに心拍数とノード情報を送信します.NameNodeは受動的に知られているだけで、自発的に要求することはありません.お客様の読み書きデータはまずNameNodeと通信して読み書きのDataNode情報を得て、クライアント-DataNode通信を開始します.NameNodeの中継は存在しません.
Client
org.apache.hadoop.ipc.ClientはClientのベースクラスで、クラス定義は以下の通りです.
公式定義は、「クライアントはIPCサービスに使用され、IPC呼び出しはWritableオブジェクトを受信パラメータとし、Writableオブジェクトを返します.」
Clientクラスには、次のようなものが含まれています.
重要な説明を選択します.
Call
CallはRPC呼び出しの抽象である.以下はCallのドメインとコンストラクタです.RPC呼び出しのいくつかのメカニズムがわかります.
RPCCallには必ず再試行メカニズム(retry)があり、idがcallを識別するために使用され、Writableタイプのrequestとresponseが呼び出しの入力と出力として使用され、IOExceptionがあり、まだ知らないが明らかに重要なrpcKindがあることがわかります.これは後で学ぶ必要があります.
Connection
Connectionクラス定義
接続はスレッドを起動し、接続送信要求を確立し、フィードバック結果が得られるまでsocketを読み取り続けます.
calls
以上のようにConnectionにはハッシュテーブルがあり、Connection上のすべてのcallが維持されているため、複数のCallはConnectionを多重化することができ、各call間にidが区別され、多重callはtcp/ip接続の確立と切断の費用を減らすことができることがわかる.
addCall
RPC呼び出しはaddCallを介してConnectionに追加されます.
ConnectionId
Clientクラスには、ConnectionIdをキーとし、Connectionを値とするConcurrentHashMapオブジェクトconnectionsがあります.
このClientが検索したRPC接続情報が含まれています.
接続にはソケット接続と接続IDオブジェクトがあります.
ConnectionIdにはInetSocketAddressと「ユーザーパス」が含まれています.このような翻訳が正しいかどうか分かりません.調べてみると、ticketはUserGroup Informationクラスのインスタンスで、その名の通り、ユーザー、ユーザーグループなどの情報が含まれており、権限が確定しているので、理解でき、通行証に翻訳しても当然です.
Server
Clientと言ってサーバーを話します.
Server.call()
Serverのコアメソッドはcallです
入力パラメータは、RPC.RpcKind(以下、RPC呼び出しの種類を設定する列挙タイプ、Writableまたはprotobuf)、protocol、writableオブジェクト、および呼び出し受信時間.
サーバは抽象クラスであり、callも抽象メソッドであり、具体的な実装は具体的なサーバ実装によって決定される.
Server.Call
ClientにはサブクラスCallがあり、Serverも似ています.
サーバはNIOを使用するため、接続ごとにスレッドを作成する必要がなく、県城1つでsocketのデータの読み取りを要求するデータを受信することができます.これは、Clientがサーバのフィードバックを待つ必要があり、サーバが要求処理トランザクションを受信するのに待つ必要がなく、ブロックの問題もありません.この単一スレッドは、サーバに内部クラスListenerとしてパッケージされています.
Listenerは要求を処理キューCallに入れ続け、Handler処理に渡すが、処理はマルチスレッドであり、runメソッドは1つのCallを循環的に取り出し、サーバを呼び出す.callメソッドを使用して、Responder結果キューに結果を入れます.
ソースコードの量が多いので、仕方なく完全に見通す必要はありません.エラーがあったり、バージョンの問題で違いがあったりしたら、許してください.
HDFSには、1,クライアント-nameNodeが存在する.2,クライアント-DataNode;3,DataNode-NameNode;4,DataNode-DataNodeの4種類のC/S構造の通信で、ここのクライアントはHDFSクラスタを使用するそのクライアントアプリケーションを指します.
PS:NameNode-DataNodeの通信は存在しないことがわかります.実際には、DataNodeはNameNodeに心拍数とノード情報を送信します.NameNodeは受動的に知られているだけで、自発的に要求することはありません.お客様の読み書きデータはまずNameNodeと通信して読み書きのDataNode情報を得て、クライアント-DataNode通信を開始します.NameNodeの中継は存在しません.
Client
org.apache.hadoop.ipc.ClientはClientのベースクラスで、クラス定義は以下の通りです.
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
* @see Server
*/
@Public
@InterfaceStability.Evolving
public class Client implements AutoCloseable {
公式定義は、「クライアントはIPCサービスに使用され、IPC呼び出しはWritableオブジェクトを受信パラメータとし、Writableオブジェクトを返します.」
Clientクラスには、次のようなものが含まれています.
ClientExecutorServiceFactory
Call
Connection
ConnectionId
IpcStreams
重要な説明を選択します.
Call
CallはRPC呼び出しの抽象である.以下はCallのドメインとコンストラクタです.RPC呼び出しのいくつかのメカニズムがわかります.
/**
* Class that represents an RPC call
*/
static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done
private final Object externalHandler;
private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
this.rpcRequest = param;
final Integer id = callId.get();
if (id == null) {
this.id = nextCallId();
} else {
callId.set(null);
this.id = id;
}
final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
this.externalHandler = EXTERNAL_CALL_HANDLER.get();
}
RPCCallには必ず再試行メカニズム(retry)があり、idがcallを識別するために使用され、Writableタイプのrequestとresponseが呼び出しの入力と出力として使用され、IOExceptionがあり、まだ知らないが明らかに重要なrpcKindがあることがわかります.これは後で学ぶ必要があります.
Connection
Connectionクラス定義
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
接続はスレッドを起動し、接続送信要求を確立し、フィードバック結果が得られるまでsocketを読み取り続けます.
calls
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
以上のようにConnectionにはハッシュテーブルがあり、Connection上のすべてのcallが維持されているため、複数のCallはConnectionを多重化することができ、各call間にidが区別され、多重callはtcp/ip接続の確立と切断の費用を減らすことができることがわかる.
addCall
/**
* Add a call to this connection's call queue and notify
* a listener; synchronized.
* Returns false if called during shutdown.
* @param call to add
* @return true if the call was added.
*/
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
return false;
calls.put(call.id, call);
notify();
return true;
}
RPC呼び出しはaddCallを介してConnectionに追加されます.
ConnectionId
Clientクラスには、ConnectionIdをキーとし、Connectionを値とするConcurrentHashMapオブジェクトconnectionsがあります.
private ConcurrentMap connections =
new ConcurrentHashMap<>();
このClientが検索したRPC接続情報が含まれています.
接続にはソケット接続と接続IDオブジェクトがあります.
/**
* This class holds the address and the user ticket. The client connections
* to servers are uniquely identified by
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public static class ConnectionId {
ConnectionIdにはInetSocketAddressと「ユーザーパス」が含まれています.このような翻訳が正しいかどうか分かりません.調べてみると、ticketはUserGroup Informationクラスのインスタンスで、その名の通り、ユーザー、ユーザーグループなどの情報が含まれており、権限が確定しているので、理解でき、通行証に翻訳しても当然です.
Server
Clientと言ってサーバーを話します.
Server.call()
Serverのコアメソッドはcallです
/** Called for each call. */
public abstract Writable call(RPC.RpcKind rpcKind, String protocol,
Writable param, long receiveTime) throws Exception;
入力パラメータは、RPC.RpcKind(以下、RPC呼び出しの種類を設定する列挙タイプ、Writableまたはprotobuf)、protocol、writableオブジェクト、および呼び出し受信時間.
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private final short value;
RpcKind(short val) {
this.value = val;
}
}
サーバは抽象クラスであり、callも抽象メソッドであり、具体的な実装は具体的なサーバ実装によって決定される.
Server.Call
ClientにはサブクラスCallがあり、Serverも似ています.
/** A generic call queued for handling. */
public static class Call implements Schedulable,
PrivilegedExceptionAction<Void> {
final int callId; // the client's call id
final int retryCount; // the retry count of the call
long timestamp; // time received when response is null
// time served when response is not null
private AtomicInteger responseWaitCount = new AtomicInteger(1);
final RPC.RpcKind rpcKind;
final byte[] clientId;
private final TraceScope traceScope; // the HTrace scope on the server side
private final CallerContext callerContext; // the call context
private boolean deferredResponse = false;
private int priorityLevel;
// the priority level assigned by scheduler, 0 by default
サーバはNIOを使用するため、接続ごとにスレッドを作成する必要がなく、県城1つでsocketのデータの読み取りを要求するデータを受信することができます.これは、Clientがサーバのフィードバックを待つ必要があり、サーバが要求処理トランザクションを受信するのに待つ必要がなく、ブロックの問題もありません.この単一スレッドは、サーバに内部クラスListenerとしてパッケージされています.
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
Listenerは要求を処理キューCallに入れ続け、Handler処理に渡すが、処理はマルチスレッドであり、runメソッドは1つのCallを循環的に取り出し、サーバを呼び出す.callメソッドを使用して、Responder結果キューに結果を入れます.
ソースコードの量が多いので、仕方なく完全に見通す必要はありません.エラーがあったり、バージョンの問題で違いがあったりしたら、許してください.