hadoopのときめきメモリアル
hadoopのクラスタはmaster/slaveモードに基づいており、namenodeおよびjobtrackerはmasterに属し、datanode/tasktrackerはslavesに属する.マスターは1つしかありませんが、slavesは複数あります.
namenodeとdatanodeの間の通信、jobtrackerとtasktrackerの直接の通信は、すべて“心拍”を通じて完成しました.
以前hadoop心拍原理のソースコードを見たことがありますが、今日はもう一度思い出して、ほほほ、だから「心拍思い出」と言います.
1、心拍数メカニズム
心臓の鼓動のメカニズムは大体こうです.
1)masterが起動すると、ipc serverが開きます.
2)slaveが起動すると、マスターに接続され、3秒おきにマスターに「心拍数」を自発的に送信し、自分の状態情報をマスターに伝え、マスターもこの心拍数の戻り値を通じてslaveノードに指令を伝達する.
2、心拍のコードを見つける
namenodeとdatanodeでは、datanodeのofferServiceメソッドでは、3秒おきにnamenodeに心拍数のコードを送信します.
上のコードは、単機のプログラムであれば、不思議なことはありません.しかし、これはhadoopクラスタです!Datanodeとnamenodeは2台の異なるマシン(または2つのJVM)で動作します!Datanodeマシンがnamenodeのメソッドを直接呼び出すなんて!どうやって実現したの?もしかして伝説のRMIかな??
次に,このメソッド呼び出しの詳細を主に解析する.
3、心拍の底の詳細1:datanodeはどのようにnamenodeオブジェクトを獲得しますか?
まず、DataNodeクラスには、namenodeのメンバー変数があります.
NameNodeクラスの定義は次のとおりです.
注:NameNodeはDatanodeProtocolインタフェースを実装し、DatanodeProtocolインタフェースはnamenodeとdatanode間の通信方法を定義します.
では、DataNodeクラスはどのようにしてNameNodeクラスの参照を取得しますか?
Datanode側でnamenode変数に値を割り当てるコード:
RPCクラスの追跡を続行します.
今、分かった!
1)namenodeへの付与は,真のnewがDatanodeProtocolインタフェースを実現したオブジェクトではなく,
ダイナミックエージェント !!
2)上記のコードではprotocolのタイプはDatanodeProtocolである.class
3)namenodeに対するすべての呼び出しは,Invokerに委任された.
4、心拍の底の細部2:Invoker類を見る
Invokerクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス:
すべてのメソッド呼び出しはdelegateによってclientのcallメソッドに渡されました!
ClientはInvokerのメンバー変数です.
したがって、DatanodeProtocolの各メソッド呼び出しは、Invocationオブジェクトにパッケージされ、clientによってパッケージされていることがわかります.コール()呼び出し
5、心拍の底の細部3:Invocation類
Invocationクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス
ビジネスロジックの方法はありません.主な役割はVOです.
6、心拍の底の細部4:client類のcall方法
次にclientクラスのcallメソッドに重点を置きます.
7、今、一目瞭然
8、ダイナミックエージェントを見る
ダイナミックエージェント:「インタフェースのみで、対応していない実装クラス」を可能にします.具体的な方法の実装は別のクラスに委託できるからです!!
この例ではdatanodeについてDatanodeProtocolインタフェースには実装クラスがありません!
*** THE END ***
namenodeとdatanodeの間の通信、jobtrackerとtasktrackerの直接の通信は、すべて“心拍”を通じて完成しました.
以前hadoop心拍原理のソースコードを見たことがありますが、今日はもう一度思い出して、ほほほ、だから「心拍思い出」と言います.
1、心拍数メカニズム
心臓の鼓動のメカニズムは大体こうです.
1)masterが起動すると、ipc serverが開きます.
2)slaveが起動すると、マスターに接続され、3秒おきにマスターに「心拍数」を自発的に送信し、自分の状態情報をマスターに伝え、マスターもこの心拍数の戻り値を通じてslaveノードに指令を伝達する.
2、心拍のコードを見つける
namenodeとdatanodeでは、datanodeのofferServiceメソッドでは、3秒おきにnamenodeに心拍数のコードを送信します.
/**
* Main loop for the DataNode. Runs until shutdown,
* forever calling remote NameNode functions.
*/
public void offerService() throws Exception {
...
//
// Now loop for a long time....
//
while (shouldRun) {
try {
long startTime = now();
//
// Every so often, send heartbeat or block-report
//
// 3 , namenode
if (startTime - lastHeartbeat > heartBeatInterval) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
lastHeartbeat = startTime;
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
// ,“ ” namenode ??
myMetrics.heartbeats.inc(now() - startTime);
//LOG.info("Just sent heartbeat, with name " + localName);
// (namenode datanode )
if (!processCommand(cmds))
continue;
}
//
...
} // while (shouldRun)
} // offerService
上のコードは、単機のプログラムであれば、不思議なことはありません.しかし、これはhadoopクラスタです!Datanodeとnamenodeは2台の異なるマシン(または2つのJVM)で動作します!Datanodeマシンがnamenodeのメソッドを直接呼び出すなんて!どうやって実現したの?もしかして伝説のRMIかな??
次に,このメソッド呼び出しの詳細を主に解析する.
3、心拍の底の詳細1:datanodeはどのようにnamenodeオブジェクトを獲得しますか?
まず、DataNodeクラスには、namenodeのメンバー変数があります.
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
...
public DatanodeProtocol namenode = null;
...
}
NameNodeクラスの定義は次のとおりです.
public class NameNode implements ClientProtocol, DatanodeProtocol,
NamenodeProtocol, FSConstants,
RefreshAuthorizationPolicyProtocol {
...
}
注:NameNodeはDatanodeProtocolインタフェースを実装し、DatanodeProtocolインタフェースはnamenodeとdatanode間の通信方法を定義します.
では、DataNodeクラスはどのようにしてNameNodeクラスの参照を取得しますか?
Datanode側でnamenode変数に値を割り当てるコード:
// connect to name node
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
RPCクラスの追跡を続行します.
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
今、分かった!
1)namenodeへの付与は,真のnewがDatanodeProtocolインタフェースを実現したオブジェクトではなく,
ダイナミックエージェント !!
2)上記のコードではprotocolのタイプはDatanodeProtocolである.class
3)namenodeに対するすべての呼び出しは,Invokerに委任された.
4、心拍の底の細部2:Invoker類を見る
Invokerクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス:
private static class Invoker implements InvocationHandler {
このクラスではinvokeメソッドを参照してください.public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), address,
method.getDeclaringClass(), ticket);
...
return value.get();
}
すべてのメソッド呼び出しはdelegateによってclientのcallメソッドに渡されました!
ClientはInvokerのメンバー変数です.
private Client client;
したがって、DatanodeProtocolの各メソッド呼び出しは、Invocationオブジェクトにパッケージされ、clientによってパッケージされていることがわかります.コール()呼び出し
5、心拍の底の細部3:Invocation類
Invocationクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス
ビジネスロジックの方法はありません.主な役割はVOです.
6、心拍の底の細部4:client類のcall方法
次にclientクラスのcallメソッドに重点を置きます.
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket)
throws InterruptedException, IOException {
Call call = new Call(param);
// Invocation Call
Connection connection = getConnection(addr, protocol, ticket, call);
//
connection.sendParam(call); // send the parameter
// “ ” call
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
//
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
throw wrapException(addr, call.error);
}
} else {
return call.value;
//
}
}
}
7、今、一目瞭然
datanode namenode heartbeat :
a) datanode namenode proxy
b) datanode , namenode proxy heartbeat :
namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getDfsUsed(),
data.getRemaining(),
xmitsInProgress.get(),
getXceiverCount());
c) datanode namenode ( “ ”) Invocation , client.call
d) client call Invocation Call
e) client call namenode
f) namenode , namenode Call, process , Responder !
g) datanode , DatanodeCommand[]
8、ダイナミックエージェントを見る
ダイナミックエージェント:「インタフェースのみで、対応していない実装クラス」を可能にします.具体的な方法の実装は別のクラスに委託できるからです!!
この例ではdatanodeについてDatanodeProtocolインタフェースには実装クラスがありません!
*** THE END ***