Hadoop 2.6.0学習ノート(14)Hadoop Two HA:Active Stand byElector


ルーシュリーの作業ノートは、プログラマーが文芸的ではないと誰が言っていますか?
この文章の翻訳はhttp://johnjianfang.blogspot.com/このサイトは直接アクセスできません.承知してるでしょ!
    Hadoop two HAはApache Zookeeperを通じて状態を調整し共有します.Name node HAとResource Manager HAはいずれもactive/standbyモードを採用しています.任意の時点ではactive leaderは一つしかありません.複数のホストからactive leaderを選出するために、HadoopはActive Stand byElector類を提供しています.この種類はApache Zookeeperと結合してこの目標を実現します.Active Stand byElectorはzookeeper管理者と似ています.zookeeperとコントローラの間で働いています.
    Active Stand byElector類の定義:
package org.apache.hadoop.ha;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
    //       ,     
    public interface ActiveStandbyElectorCallback {
        void becomeActive() throws ServiceFailedException;    
        void becomeStandby();    
        void enterNeutralMode();    
        void notifyFatalError(String errorMessage);    
        void fenceOldActive(byte[] oldActiveData);
    }
    
    @VisibleForTesting
    protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
    @VisibleForTesting
    protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
    
    //      
}
    中心思想は、一時ノード「Active Stand byElectorck」を作成し、一つのマシンが成功的にこのノードを作成したら、leaderになります.このノードは一時的であるため、セッションが失効したり、接続が失敗したりすると、現在のactive leaderがこのノードをloseする可能性があるので、他のノードは同じノードを作成する機会があり、leaderになる.現在のactive leaderは、時間をおいてactive状態に復帰することができます.このような特殊な場合、二つのactiveのleaderがあるかもしれません.このような状況を回避するために、Active Stand byElectorは、長い間(PERSISTENT)ノード「Active BreadCrumb」を作成し、自分のデータを保存するために使用しています.したがって、新しいactive leaderは、active leaderを分離することができます.
/** 
 * ActiveStandbyElector   "ActiveBreadCrumb"    ,
 *       failover      (fenced)
 */
private void writeBreadCrumbNode(Stat oldBreadcrumbStat) throws KeeperException, InterruptedException {
    Preconditions.checkState(appData != null, "no appdata");
    
    LOG.info("Writing znode " + zkBreadCrumbPath + " to indicate that the local node is the most recent active...");
    if (oldBreadcrumbStat == null) {
       // No previous active, just create the node
       createWithRetries(zkBreadCrumbPath, appData, zkAcl, CreateMode.PERSISTENT);
    } else {
       // There was a previous active, update the node
       setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
    }
}
    例えば、DFSZK FailoverControllerは、「Active BreadCrumb」ノードにおいて、ホームname、port、ZKFC port、name service id、and name node idを含むHAServiceTargetオブジェクトを保存している.
package org.apache.hadoop.ha;

@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController {

  static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
  
  public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
  private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
  private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
  private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
  public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
  private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
  public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
  static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
  
  protected static final String USAGE = "Usage: java zkfc [ -formatZK [-force] [-nonInteractive] ]";

  //      
}
package org.apache.hadoop.hdfs.tools;

@InterfaceAudience.Private
public class DFSZKFailoverController extends ZKFailoverController {
  @Override
  protected byte[] targetToData(HAServiceTarget target) {    // HAServiceTarget 
    InetSocketAddress addr = target.getAddress();

    return ActiveNodeInfo.newBuilder()
      .setHostname(addr.getHostName())
      .setPort(addr.getPort())
      .setZkfcPort(target.getZKFCAddress().getPort())
      .setNameserviceId(localNNTarget.getNameServiceId())
      .setNamenodeId(localNNTarget.getNameNodeId())
      .build()
      .toByteArray();
  }
  
  //      
}
package org.apache.hadoop.ha;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HAServiceTarget {
  private static final String HOST_SUBST_KEY = "host";
  private static final String PORT_SUBST_KEY = "port";
  private static final String ADDRESS_SUBST_KEY = "address";
  
  //      
}
package org.apache.hadoop.hdfs.tools;
@InterfaceAudience.Private
public class NNHAServiceTarget extends HAServiceTarget {
  // Keys added to the fencing script environment
  private static final String NAMESERVICE_ID_KEY = "nameserviceid";
  private static final String NAMENODE_ID_KEY = "namenodeid";
  
  //      
}
    Active Stand byElectorの定義から、実際にはそれがzookeeperのコールバック処理プログラムであることが分かります.
package org.apache.hadoop.ha;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
  //      
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
  }

  private void monitorLockNodeAsync() {
    zkClient.exists(zkLockFilePath, watcher, this, zkClient);
  }
}
    明らかに、create()方法がzookeeperで呼び出された時、Active Stand byElectorはStringCallbackとして働いた.existsがznodeを監視するように呼び出された時、Active Stand byElectorはStatitCallbackになりました.
    interface implement ation of Zookeeper calback for create
  @Override
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    if (isStaleClient(ctx)) return;
    LOG.debug("CreateNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState +
        "  for " + this);

    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      if (becomeActive()) {
        monitorActiveStatus();
      } else {
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }

    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        // znode exists and we did not retry the operation. so a different
        // instance has created it. become standby and monitor lock.
        becomeStandby();
      }
      // if we had retried then the znode could have been created by our first
      // attempt to the server (that we lost) and this node exists response is
      // for the second attempt. verify this case via ephemeral node owner. this
      // will happen on the callback for monitoring the lock.
      monitorActiveStatus();
      return;
    }

    String errorMessage = "Received create error from Zookeeper. code:"
        + code.toString() + " for path " + path;
    LOG.debug(errorMessage);

    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
      errorMessage = errorMessage
          + ". Not retrying further znode create connection errors.";
    } else if (isSessionExpired(code)) {
      // This isn't fatal - the client Watcher will re-join the election
      LOG.warn("Lock acquisition failed because session was lost");
      return;
    }

    fatalError(errorMessage);
  }
    interface implement ation of Zookeeper calback for monitor(exists)
@Override
  public synchronized void processResult(int rc, String path, Object ctx,
      Stat stat) {
    if (isStaleClient(ctx)) return;
    
    assert wantToBeInElection :
        "Got a StatNode result after quitting election";
    
    LOG.debug("StatNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState + " for " + this);
        

    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // the following owner check completes verification in case the lock znode
      // creation was retried
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // we own the lock znode. so we are the leader
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // we dont own the lock znode. so we are a standby.
        becomeStandby();
      }
      // the watch set by us will notify about changes
      return;
    }

    if (isNodeDoesNotExist(code)) {
      // the lock znode disappeared before we started monitoring it
      enterNeutralMode();
      joinElectionInternal();
      return;
    }

    String errorMessage = "Received stat error from Zookeeper. code:"
        + code.toString();
    LOG.debug(errorMessage);

    if (shouldRetry(code)) {
      if (statRetryCount < maxRetryNum) {
        ++statRetryCount;
        monitorLockNodeAsync();
        return;
      }
      errorMessage = errorMessage
          + ". Not retrying further znode monitoring connection errors.";
    } else if (isSessionExpired(code)) {
      // This isn't fatal - the client Watcher will re-join the election
      LOG.warn("Lock monitoring failed because session was lost");
      return;
    }

    fatalError(errorMessage);
  }
    Active Station byElectorは、sessionとconnectionに関連するイベントを観察するために、Zookeeperの観察者を登録しています.
    インターフェースimplement ation of Zookeeper watch events(connection and node)
 synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    if (isStaleClient(zk)) return;

    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        // if the listener was asked to move to safe state then it needs to  be undone
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED && wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: " + event.getState());
        break;
      }

      return;
    }

    //     ,  node     ,ActiveStandbyElector         。
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }
   The join action is simply to try to create the lock node.
  private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
  }
    ロックnodeを成功裏に作成した後、ActiveStand byElectorはまず古いactive leaderを隔離して、自分のデータをbredcrumbノードに書き込みます.
  private boolean becomeActive() {
    assert wantToBeInElection;
    if (state == State.ACTIVE) {
      // already active
      return true;
    }
    try {
      Stat oldBreadcrumbStat = fenceOldActive();
      writeBreadCrumbNode(oldBreadcrumbStat);
      
      LOG.debug("Becoming active for " + this);
      appClient.becomeActive();
      state = State.ACTIVE;
      return true;
    } catch (Exception e) {
      LOG.warn("Exception handling the winning of election", e);
      // Caller will handle quitting and rejoining the election.
      return false;
    }
  }
    もしActive Stand byElectorが選挙から退出する必要があるなら、彼自身のbredcrumbノードを削除しようとします.
  public synchronized void quitElection(boolean needFence) {
    LOG.info("Yielding from election");
    if (!needFence && state == State.ACTIVE) {
      // If active is gracefully going back to standby mode, remove
      // our permanent znode so no one fences us.
      tryDeleteOwnBreadCrumbNode();
    }
    reset();
    wantToBeInElection = false;
  }
    Active Stand byElector provides a calback interface for its caler such as ZKFailover Controller for name node HA and EmbodElectore for rervice manager HA.
public interface ActiveStandbyElectorCallback {
    //      
}
    For example,the ElectorCallbacks in ZK Failover Controller is defined as follows.
  /**
   * Callbacks from elector
   */
  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() throws ServiceFailedException {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }

    @Override
    public void enterNeutralMode() {
    }

    @Override
    public void notifyFatalError(String errorMessage) {
      fatalError(errorMessage);
    }

    @Override
    public void fenceOldActive(byte[] data) {
      ZKFailoverController.this.fenceOldActive(data);
    }
    
    @Override
    public String toString() {
      synchronized (ZKFailoverController.this) {
        return "Elector callbacks for " + localTarget;
      }
    }
  }
    Active Station byElectorがZookeeperで選挙に勝った後、ZKFailover Controllerの中のbecomme Active()はname nodeに一回のRPC要求を実行するために使われて、対応するname nodeがactive状態に変化します.
  private synchronized void becomeActive() throws ServiceFailedException {
    LOG.info("Trying to make " + localTarget + " active...");
    try {
      HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
          conf, FailoverController.getRpcTimeoutToNewActive(conf)), createReqInfo());
      String msg = "Successfully transitioned " + localTarget + " to active state";
      LOG.info(msg);
      serviceState = HAServiceState.ACTIVE;
      recordActiveAttempt(new ActiveAttemptRecord(true, msg));
    } catch (Throwable t) {
      //  
    }
  }
    if Active Stand byElector makes a decision to become standby,the Failover Controller asks name node to become standby instead.
  private synchronized void becomeStandby() {
    LOG.info("ZK Election indicated that " + localTarget +
        " should become standby");
    try {
      int timeout = FailoverController.getGracefulFenceTimeout(conf);
      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
      LOG.info("Successfully transitioned " + localTarget +
          " to standby state");
    } catch (Exception e) {
      LOG.error("Couldn't transition " + localTarget + " to standby state",
          e);
      // TODO handle this. It's a likely case since we probably got fenced
      // at the same time.
    }
    serviceState = HAServiceState.STANDBY;
  }
Hadoop 2.6.0 HAクラスタ構築 完成した後、一歩はフォーマットzookeeperと起動HAのactiveとstandbyの切り替えです.
フォーマットzookeeper
bin/hdfs zkfc -formatZK
#
 hdfs  shell   :
COMMAND=$1
#    
elif [ "$COMMAND" = "zkfc" ] ; then
  CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif     #  
DFSZK FailoverControllerのmainメソッドを実行します.
  public static void main(String args[]) throws Exception {
    if (DFSUtil.parseHelpArgument(args,ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser(new HdfsConfiguration(), args);
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(parser.getConfiguration());
    
    System.exit(zkfc.run(parser.getRemainingArgs()));
  }
  # HdfsConfiguration extends Configuration              
DFSZK FailoverControllerのmainメソッドでZK Failover Controllerのrunを呼び出し、runでdoRun(String[]args)を呼び出してformatitZKパラメータを受信します.
private int doRun(String[] args) {
    try {
      initZK();
    } catch (KeeperException ke) {
      // ......
      return ERR_CODE_NO_ZK;
    }
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {        // hdfs zkfc      formatZK
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {       // formatZK -force
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {     // formatZK -nonInteractive
            interactive = false;
          } else {                    //     
            badArg(args[i]);
          }
        }        //        
        return formatZK(force, interactive);
      } else {                                  // hdfs zkfc      formatZK
        badArg(args[0]);
      }
    }

    //    
    if (!elector.parentZNodeExists()) {
      // ......
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      // ......
      return ERR_CODE_NO_FENCER;
    }

    initRPC();
    initHM();
    startRPC();
    try {
      mainLoop();
    } finally {
      rpcServer.stopAndJoin();
      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
}
zkfcを起動します
sbin/hadoop-daemon.sh start zkfc