hadoopのときめきメモリアル


hadoopのクラスタはmaster/slaveモードに基づいており、namenodeおよびjobtrackerはmasterに属し、datanode/tasktrackerはslavesに属する.マスターは1つしかありませんが、slavesは複数あります.
namenodeとdatanodeの間の通信、jobtrackerとtasktrackerの直接の通信は、すべて“心拍”を通じて完成しました.
以前hadoop心拍原理のソースコードを見たことがありますが、今日はもう一度思い出して、ほほほ、だから「心拍思い出」と言います.
1、心拍数メカニズム
心臓の鼓動のメカニズムは大体こうです.
1)masterが起動すると、ipc serverが開きます.
2)slaveが起動すると、マスターに接続され、3秒おきにマスターに「心拍数」を自発的に送信し、自分の状態情報をマスターに伝え、マスターもこの心拍数の戻り値を通じてslaveノードに指令を伝達する.
2、心拍のコードを見つける
namenodeとdatanodeでは、datanodeのofferServiceメソッドでは、3秒おきにnamenodeに心拍数のコードを送信します.
 /**
   * Main loop for the DataNode.  Runs until shutdown,
   * forever calling remote NameNode functions.
   */
  public void offerService() throws Exception {
     
    ...

    //
    // Now loop for a long time....
    //

    while (shouldRun) {
      try {
        long startTime = now();

        //
        // Every so often, send heartbeat or block-report
        //
        
	//     3  ,  namenode   
        if (startTime - lastHeartbeat > heartBeatInterval) {
          //
          // All heartbeat messages include following info:
          // -- Datanode name
          // -- data transfer port
          // -- Total capacity
          // -- Bytes remaining
          //
          lastHeartbeat = startTime;
          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());

	  //         ,“    ”      namenode     ??

          myMetrics.heartbeats.inc(now() - startTime);
          //LOG.info("Just sent heartbeat, with name " + localName);

	  //          (namenode  datanode   )
          if (!processCommand(cmds))
            continue;
        }

  	//         
	...
    } // while (shouldRun)
  } // offerService

上のコードは、単機のプログラムであれば、不思議なことはありません.しかし、これはhadoopクラスタです!Datanodeとnamenodeは2台の異なるマシン(または2つのJVM)で動作します!Datanodeマシンがnamenodeのメソッドを直接呼び出すなんて!どうやって実現したの?もしかして伝説のRMIかな??
次に,このメソッド呼び出しの詳細を主に解析する.
3、心拍の底の詳細1:datanodeはどのようにnamenodeオブジェクトを獲得しますか?
まず、DataNodeクラスには、namenodeのメンバー変数があります.
public class DataNode extends Configured 
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
  ...
  public DatanodeProtocol namenode = null;
  ... 
}

NameNodeクラスの定義は次のとおりです.
public class NameNode implements ClientProtocol, DatanodeProtocol,
                                 NamenodeProtocol, FSConstants,
                                 RefreshAuthorizationPolicyProtocol {
  ... 
}

注:NameNodeはDatanodeProtocolインタフェースを実装し、DatanodeProtocolインタフェースはnamenodeとdatanode間の通信方法を定義します.
では、DataNodeクラスはどのようにしてNameNodeクラスの参照を取得しますか?
Datanode側でnamenode変数に値を割り当てるコード:
    // connect to name node
    this.namenode = (DatanodeProtocol) 
      RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID,
                       nameNodeAddr, 
                       conf);

RPCクラスの追跡を続行します.
VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(addr, ticket, conf, factory));

今、分かった!
1)namenodeへの付与は,真のnewがDatanodeProtocolインタフェースを実現したオブジェクトではなく,
ダイナミックエージェント !!
2)上記のコードではprotocolのタイプはDatanodeProtocolである.class
3)namenodeに対するすべての呼び出しは,Invokerに委任された.
4、心拍の底の細部2:Invoker類を見る
Invokerクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス:
	 private static class Invoker implements InvocationHandler {
このクラスではinvokeメソッドを参照してください.
  
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			...

		      ObjectWritable value = (ObjectWritable)
		        client.call(new Invocation(method, args), address, 
		                    method.getDeclaringClass(), ticket);
      			...
		      return value.get();
		   }

すべてのメソッド呼び出しはdelegateによってclientのcallメソッドに渡されました!
ClientはInvokerのメンバー変数です.
  
 private Client client;

したがって、DatanodeProtocolの各メソッド呼び出しは、Invocationオブジェクトにパッケージされ、clientによってパッケージされていることがわかります.コール()呼び出し
5、心拍の底の細部3:Invocation類
Invocationクラスはorg.apache.hadoop.ipc.RPCクラスの静的内部クラス
ビジネスロジックの方法はありません.主な役割はVOです.
6、心拍の底の細部4:client類のcall方法
次にclientクラスのcallメソッドに重点を置きます.

  public Writable call(Writable param, InetSocketAddress addr, 
                       Class<?> protocol, UserGroupInformation ticket)  
                       throws InterruptedException, IOException {

    Call call = new Call(param);   
		//  Invocation   Call
    Connection connection = getConnection(addr, protocol, ticket, call);
		//        
    connection.sendParam(call);                 // send the parameter
		//  “   ”  call    
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
		//       
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          throw wrapException(addr, call.error);
        }
      } else {
        return call.value;
		//   
      }
    }
  }

7、今、一目瞭然
datanode namenode  heartbeat      :

	a)  datanode     namenode proxy
	b)  datanode ,  namenode proxy heartbeat  :
		namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());
	c)  datanode  namenode             (   “    ”)  Invocation  ,   client.call  
	d) client call   Invocation   Call  
	e) client  call      namenode   
	f) namenode   ,   namenode  Call, process ,  Responder   !
	g) datanode    ,       DatanodeCommand[]
		

8、ダイナミックエージェントを見る
ダイナミックエージェント:「インタフェースのみで、対応していない実装クラス」を可能にします.具体的な方法の実装は別のクラスに委託できるからです!!
この例ではdatanodeについてDatanodeProtocolインタフェースには実装クラスがありません!
*** THE END ***