Hadoop 2.6.0 Study Notes (14): Hadoop 2 HA: ActiveStandbyElector
Work notes of ルーシュリー: who says programmers can't be literary?
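This article is a translation of a post from http://johnjianfang.blogspot.com/ (a site that cannot be accessed directly, as you probably know!).
Hadoop 2 HA uses Apache ZooKeeper to coordinate and share state. Both NameNode HA and ResourceManager HA use an active/standby model: at any given moment there is exactly one active leader. To elect the active leader among multiple hosts, Hadoop provides the ActiveStandbyElector class, which works together with Apache ZooKeeper to achieve this. ActiveStandbyElector acts somewhat like a ZooKeeper manager: it sits between ZooKeeper and the controller.
Definition of the ActiveStandbyElector class: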
package org.apache.hadoop.ha;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
// Callback interface implemented by the elector's client (for example, ZKFC)
public interface ActiveStandbyElectorCallback {
void becomeActive() throws ServiceFailedException;
void becomeStandby();
void enterNeutralMode();
void notifyFatalError(String errorMessage);
void fenceOldActive(byte[] oldActiveData);
}
@VisibleForTesting
protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
@VisibleForTesting
protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
//
}
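The core idea is to create an ephemeral znode, "ActiveStandbyElectorLock". Whichever machine succeeds in creating it becomes the leader. Because the node is ephemeral, the current active leader can lose it when its session expires or its connection fails, which gives other nodes the chance to create the same znode and become leader. The former active leader, however, may come back to the active state some time later, so in this corner case there could be two active leaders. To prevent this, ActiveStandbyElector also creates a persistent (PERSISTENT) znode, "ActiveBreadCrumb", in which it stores its own data. A new active leader can therefore find out who the previous active leader was and fence it.
/**
* ActiveStandbyElector writes its own data to "ActiveBreadCrumb" so that,
* after a failover, the previous active can be identified and fenced.
*/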
private void writeBreadCrumbNode(Stat oldBreadcrumbStat) throws KeeperException, InterruptedException {
Preconditions.checkState(appData != null, "no appdata");
LOG.info("Writing znode " + zkBreadCrumbPath + " to indicate that the local node is the most recent active...");
if (oldBreadcrumbStat == null) {
// No previous active, just create the node
createWithRetries(zkBreadCrumbPath, appData, zkAcl, CreateMode.PERSISTENT);
} else {
// There was a previous active, update the node
setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
}
}
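The lock-node idea above can be modeled without a ZooKeeper cluster. The sketch below is a toy, in-memory model under stated assumptions: ToyZk, ToyElector and EphemeralLockDemo are hypothetical classes (not Hadoop or ZooKeeper APIs), and the nameservice name "ns1" in the lock path is made up. It shows the first creator of an ephemeral znode winning, the lock vanishing when that session expires, and the resulting window in which the old leader still believes it is active.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory stand-in for ZooKeeper (hypothetical, not the real client API).
// It models only ephemeral znodes: each one is owned by a session and
// disappears when that session expires.
class ToyZk {
    private final Map<String, Long> ephemeralOwners = new HashMap<>();

    // Returns true if the znode was created, false if it already existed (NODEEXISTS).
    synchronized boolean createEphemeral(String path, long sessionId) {
        return ephemeralOwners.putIfAbsent(path, sessionId) == null;
    }

    // Session expiry removes every ephemeral znode owned by that session.
    synchronized void expireSession(long sessionId) {
        ephemeralOwners.values().removeIf(owner -> owner == sessionId);
    }
}

// Hypothetical elector: whoever creates the lock znode becomes ACTIVE.
class ToyElector {
    // Assumed layout /hadoop-ha/<nameservice>/ActiveStandbyElectorLock; "ns1" is made up.
    static final String LOCK_PATH = "/hadoop-ha/ns1/ActiveStandbyElectorLock";

    final long sessionId;
    String state = "INIT";

    ToyElector(long sessionId) {
        this.sessionId = sessionId;
    }

    void joinElection(ToyZk zk) {
        state = zk.createEphemeral(LOCK_PATH, sessionId) ? "ACTIVE" : "STANDBY";
    }
}

class EphemeralLockDemo {
    static String run() {
        ToyZk zk = new ToyZk();
        ToyElector nn1 = new ToyElector(1L);
        ToyElector nn2 = new ToyElector(2L);
        nn1.joinElection(zk);
        nn2.joinElection(zk);
        String before = nn1.state + "/" + nn2.state;
        // nn1's session expires, so its lock znode vanishes;
        // nn2, which is watching the lock, tries again and wins.
        zk.expireSession(1L);
        nn2.joinElection(zk);
        // nn1 has not noticed yet and still believes it is ACTIVE.
        return before + " -> " + nn1.state + "/" + nn2.state;
    }

    public static void main(String[] args) {
        System.out.println(run()); // ACTIVE/STANDBY -> ACTIVE/ACTIVE
    }
}
```

The final "ACTIVE/ACTIVE" is exactly the split-brain case that the persistent breadcrumb and fencing are meant to resolve.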
For example, DFSZKFailoverController stores the information from its HAServiceTarget in the "ActiveBreadCrumb" node: the hostname, port, ZKFC port, nameservice ID, and NameNode ID.
package org.apache.hadoop.ha;
@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController {
static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
protected static final String USAGE = "Usage: java zkfc [ -formatZK [-force] [-nonInteractive] ]";
//
}
package org.apache.hadoop.hdfs.tools;
@InterfaceAudience.Private
public class DFSZKFailoverController extends ZKFailoverController {
@Override
protected byte[] targetToData(HAServiceTarget target) { // serialize the target as an ActiveNodeInfo protobuf message
InetSocketAddress addr = target.getAddress();
return ActiveNodeInfo.newBuilder()
.setHostname(addr.getHostName())
.setPort(addr.getPort())
.setZkfcPort(target.getZKFCAddress().getPort())
.setNameserviceId(localNNTarget.getNameServiceId())
.setNamenodeId(localNNTarget.getNameNodeId())
.build()
.toByteArray();
}
//
}
package org.apache.hadoop.ha;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HAServiceTarget {
private static final String HOST_SUBST_KEY = "host";
private static final String PORT_SUBST_KEY = "port";
private static final String ADDRESS_SUBST_KEY = "address";
//
}
package org.apache.hadoop.hdfs.tools;
@InterfaceAudience.Private
public class NNHAServiceTarget extends HAServiceTarget {
// Keys added to the fencing script environment
private static final String NAMESERVICE_ID_KEY = "nameserviceid";
private static final String NAMENODE_ID_KEY = "namenodeid";
//
}
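To make the breadcrumb contents concrete, here is a minimal round-trip sketch. BreadcrumbInfo is a hypothetical stand-in for the ActiveNodeInfo protobuf message built in targetToData(). It carries the same five fields but swaps protobuf for plain DataOutputStream encoding so that it runs with the JDK alone. The host name, ports and IDs in main are example values only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for ActiveNodeInfo: same fields, DataOutputStream instead of protobuf.
class BreadcrumbInfo {
    final String hostname;
    final int port;
    final int zkfcPort;
    final String nameserviceId;
    final String namenodeId;

    BreadcrumbInfo(String hostname, int port, int zkfcPort,
                   String nameserviceId, String namenodeId) {
        this.hostname = hostname;
        this.port = port;
        this.zkfcPort = zkfcPort;
        this.nameserviceId = nameserviceId;
        this.namenodeId = namenodeId;
    }

    // Plays the role of targetToData(): the bytes written to the breadcrumb znode.
    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(hostname);
            out.writeInt(port);
            out.writeInt(zkfcPort);
            out.writeUTF(nameserviceId);
            out.writeUTF(namenodeId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // The reverse step: how a new active learns which node it has to fence.
    static BreadcrumbInfo fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Java evaluates arguments left to right, so fields are read in write order.
            return new BreadcrumbInfo(in.readUTF(), in.readInt(), in.readInt(),
                                      in.readUTF(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public String toString() {
        return hostname + ":" + port + " zkfc=" + zkfcPort
                + " " + nameserviceId + "/" + namenodeId;
    }

    public static void main(String[] args) {
        BreadcrumbInfo nn1 = new BreadcrumbInfo("nn1.example.com", 8020, 8019, "ns1", "nn1");
        System.out.println(BreadcrumbInfo.fromBytes(nn1.toBytes()));
        // nn1.example.com:8020 zkfc=8019 ns1/nn1
    }
}
```

Protobuf, as used by the real code, additionally gives versioned, self-describing fields. The fixed field order here is only a simplification.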
From the definition of ActiveStandbyElector, we can see that it is essentially a ZooKeeper callback handler.
package org.apache.hadoop.ha;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
//
private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
}
private void monitorLockNodeAsync() {
zkClient.exists(zkLockFilePath, watcher, this, zkClient);
}
}
When create() is called on ZooKeeper, ActiveStandbyElector acts as a StringCallback. When exists() is called to monitor the znode, ActiveStandbyElector acts as a StatCallback.
Interface implementation of the ZooKeeper callback for create
@Override
public synchronized void processResult(int rc, String path, Object ctx,
String name) {
if (isStaleClient(ctx)) return;
LOG.debug("CreateNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState +
" for " + this);
Code code = Code.get(rc);
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
if (becomeActive()) {
monitorActiveStatus();
} else {
reJoinElectionAfterFailureToBecomeActive();
}
return;
}
if (isNodeExists(code)) {
if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock.
becomeStandby();
}
// if we had retried then the znode could have been created by our first
// attempt to the server (that we lost) and this node exists response is
// for the second attempt. verify this case via ephemeral node owner. this
// will happen on the callback for monitoring the lock.
monitorActiveStatus();
return;
}
String errorMessage = "Received create error from Zookeeper. code:"
+ code.toString() + " for path " + path;
LOG.debug(errorMessage);
if (shouldRetry(code)) {
if (createRetryCount < maxRetryNum) {
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
++createRetryCount;
createLockNodeAsync();
return;
}
errorMessage = errorMessage
+ ". Not retrying further znode create connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock acquisition failed because session was lost");
return;
}
fatalError(errorMessage);
}
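The create callback above boils down to a small decision table. The following sketch is a simplified, hypothetical restatement with its own Rc and Action enums in place of ZooKeeper's KeeperException.Code and the real method calls. It assumes that CONNECTIONLOSS and OPERATIONTIMEOUT are the codes shouldRetry() accepts, and it leaves out the path where becomeActive() itself fails.

```java
// Simplified decision table for the create() callback (hypothetical names).
class CreateCallbackSketch {
    // Subset of ZooKeeper result codes; the real code uses KeeperException.Code.
    enum Rc { OK, NODEEXISTS, CONNECTIONLOSS, OPERATIONTIMEOUT, SESSIONEXPIRED, NOAUTH }

    enum Action { BECOME_ACTIVE, BECOME_STANDBY_AND_MONITOR, MONITOR_ONLY,
                  RETRY_CREATE, WAIT_FOR_WATCHER, FATAL }

    static Action decide(Rc rc, int createRetryCount, int maxRetryNum) {
        switch (rc) {
            case OK:
                return Action.BECOME_ACTIVE;              // we created the lock: leader
            case NODEEXISTS:
                // Without a retry someone else holds the lock; after a retry the node
                // may be ours, so only the ownership check in the stat callback decides.
                return createRetryCount == 0 ? Action.BECOME_STANDBY_AND_MONITOR
                                             : Action.MONITOR_ONLY;
            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:                        // assumed retryable codes
                return createRetryCount < maxRetryNum ? Action.RETRY_CREATE : Action.FATAL;
            case SESSIONEXPIRED:
                return Action.WAIT_FOR_WATCHER;           // the Expired watch event rejoins
            default:
                return Action.FATAL;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Rc.NODEEXISTS, 0, 3));     // BECOME_STANDBY_AND_MONITOR
        System.out.println(decide(Rc.CONNECTIONLOSS, 3, 3)); // FATAL
    }
}
```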
Interface implementation of the ZooKeeper callback for monitoring (exists)
@Override
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
if (isStaleClient(ctx)) return;
assert wantToBeInElection :
"Got a StatNode result after quitting election";
LOG.debug("StatNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState + " for " + this);
Code code = Code.get(rc);
if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// we own the lock znode. so we are the leader
if (!becomeActive()) {
reJoinElectionAfterFailureToBecomeActive();
}
} else {
// we dont own the lock znode. so we are a standby.
becomeStandby();
}
// the watch set by us will notify about changes
return;
}
if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it
enterNeutralMode();
joinElectionInternal();
return;
}
String errorMessage = "Received stat error from Zookeeper. code:"
+ code.toString();
LOG.debug(errorMessage);
if (shouldRetry(code)) {
if (statRetryCount < maxRetryNum) {
++statRetryCount;
monitorLockNodeAsync();
return;
}
errorMessage = errorMessage
+ ". Not retrying further znode monitoring connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock monitoring failed because session was lost");
return;
}
fatalError(errorMessage);
}
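The getEphemeralOwner() check in the stat callback matters most when the first create() reached the server but its reply was lost. In that case the retry reports NODEEXISTS even though the node is ours. The sketch below models that case with a plain map standing in for the ZooKeeper server. OwnerCheckSketch and its methods are hypothetical illustrations, not Hadoop code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "lost reply" case. serverOwners stands in for the
// ZooKeeper server: lock path -> ephemeral owner session id.
class OwnerCheckSketch {
    private final Map<String, Long> serverOwners;
    private final long mySessionId;
    private int createRetryCount = 0;
    String state = "NEUTRAL";

    OwnerCheckSketch(Map<String, Long> serverOwners, long mySessionId) {
        this.serverOwners = serverOwners;
        this.mySessionId = mySessionId;
    }

    // Server side of create(): false means NODEEXISTS.
    private boolean serverCreate(String path) {
        return serverOwners.putIfAbsent(path, mySessionId) == null;
    }

    // Mirrors the StatCallback: compare the ephemeral owner with our session id.
    private void onStat(String path) {
        Long owner = serverOwners.get(path);
        if (owner == null) {
            state = "NEUTRAL";                 // NONODE: the real code rejoins here
        } else {
            state = (owner == mySessionId) ? "ACTIVE" : "STANDBY";
        }
    }

    // Normal path: a single create attempt.
    String createOnce(String path) {
        createRetryCount = 0;
        if (serverCreate(path)) {
            state = "ACTIVE";
        } else {
            state = "STANDBY";                 // no retry, so another node owns the lock
            onStat(path);
        }
        return state;
    }

    // The first create is applied by the server but its reply is lost, so the
    // client retries and the second attempt reports NODEEXISTS.
    String createWithLostReply(String path) {
        serverCreate(path);                    // succeeds on the server...
        createRetryCount++;                    // ...but the client only sees a retryable error
        if (!serverCreate(path) && createRetryCount == 0) {
            state = "STANDBY";                 // skipped: NODEEXISTS after a retry is ambiguous
        }
        onStat(path);                          // the ownership check resolves it
        return state;
    }

    public static void main(String[] args) {
        Map<String, Long> server = new HashMap<>();
        System.out.println(new OwnerCheckSketch(server, 7L).createWithLostReply("/lock")); // ACTIVE
        System.out.println(new OwnerCheckSketch(server, 8L).createOnce("/lock"));          // STANDBY
    }
}
```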
ActiveStandbyElector registers a ZooKeeper watcher to observe session- and connection-related events.
Interface implementation of ZooKeeper watch events (connection and node)
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
if (isStaleClient(zk)) return;
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
if (prevConnectionState == ConnectionState.DISCONNECTED && wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: " + event.getState());
break;
}
return;
}
// Otherwise this is a znode event: the lock node changed, so ActiveStandbyElector re-evaluates its state.
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
monitorActiveStatus();
break;
default:
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
monitorActiveStatus();
}
return;
}
// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}
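processWatchEvent() is essentially a state machine over connection and znode events. The sketch below reduces it to those transitions and records each reaction in a list instead of calling ZooKeeper. WatchEventSketch is hypothetical. One behavior is an assumption not shown in the excerpt above: enterNeutralMode() notifies the application only when the elector is not already neutral.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of processWatchEvent() to its state transitions.
class WatchEventSketch {
    enum Conn { CONNECTED, DISCONNECTED }
    enum Role { ACTIVE, STANDBY, NEUTRAL }

    Conn conn = Conn.CONNECTED;
    Role role;
    boolean wantToBeInElection = true;
    final List<String> actions = new ArrayList<>();

    WatchEventSketch(Role initialRole) {
        this.role = initialRole;
    }

    // Connection-level events (EventType.None).
    void onSyncConnected() {
        Conn prev = conn;
        conn = Conn.CONNECTED;
        if (prev == Conn.DISCONNECTED && wantToBeInElection) {
            actions.add("monitorActiveStatus");  // re-check who owns the lock
        }
    }

    void onDisconnected() {
        conn = Conn.DISCONNECTED;
        enterNeutralMode();                      // we no longer know our real state
    }

    void onExpired() {
        enterNeutralMode();
        actions.add("reJoinElection");           // our ephemeral lock, if any, is gone
    }

    // Node-level events on the lock znode.
    void onNodeDeleted() {
        if (role == Role.ACTIVE) {
            enterNeutralMode();
        }
        actions.add("joinElection");
    }

    void onNodeDataChanged() {
        actions.add("monitorActiveStatus");
    }

    private void enterNeutralMode() {
        if (role != Role.NEUTRAL) {              // assumed: notify the application only once
            role = Role.NEUTRAL;
            actions.add("enterNeutralMode");
        }
    }

    public static void main(String[] args) {
        WatchEventSketch e = new WatchEventSketch(Role.ACTIVE);
        e.onDisconnected();
        e.onSyncConnected();
        e.onNodeDeleted();
        System.out.println(e.actions);
        // [enterNeutralMode, monitorActiveStatus, joinElection]
    }
}
```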
The join action simply tries to create the lock node.
private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}
createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync();
}
After successfully creating the lock node, ActiveStandbyElector first fences the old active leader and then writes its own data to the breadcrumb node.
private boolean becomeActive() {
assert wantToBeInElection;
if (state == State.ACTIVE) {
// already active
return true;
}
try {
Stat oldBreadcrumbStat = fenceOldActive();
writeBreadCrumbNode(oldBreadcrumbStat);
LOG.debug("Becoming active for " + this);
appClient.becomeActive();
state = State.ACTIVE;
return true;
} catch (Exception e) {
LOG.warn("Exception handling the winning of election", e);
// Caller will handle quitting and rejoining the election.
return false;
}
}
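The order inside becomeActive() is the important part: fence first, then write the breadcrumb, then transition. If fencing throws, nothing else happens and the caller rejoins the election. Below is a toy model of that ordering in which a byte array stands in for the persistent znode. BecomeActiveSketch and its fencingSucceeds flag are hypothetical. The "skip fencing if the breadcrumb holds our own data" branch is an assumption about the elided fenceOldActive() body.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of becomeActive(): fence, write breadcrumb, transition.
class BecomeActiveSketch {
    byte[] breadcrumb;                 // null means the ActiveBreadCrumb znode does not exist
    boolean fencingSucceeds = true;    // knob to simulate a failed fence
    final List<String> log = new ArrayList<>();

    private void fenceOldActive(byte[] myData) {
        if (breadcrumb == null) {
            log.add("no previous active");
        } else if (Arrays.equals(breadcrumb, myData)) {
            log.add("breadcrumb is ours, no fencing needed");
        } else if (fencingSucceeds) {
            log.add("fenced " + new String(breadcrumb, StandardCharsets.UTF_8));
        } else {
            throw new IllegalStateException("could not fence "
                    + new String(breadcrumb, StandardCharsets.UTF_8));
        }
    }

    boolean becomeActive(byte[] myData) {
        try {
            fenceOldActive(myData);
            breadcrumb = myData.clone();   // writeBreadCrumbNode(): create or overwrite
            log.add("transitionToActive");
            return true;
        } catch (IllegalStateException e) {
            log.add("failed: " + e.getMessage());
            return false;                  // caller quits and rejoins the election
        }
    }

    public static void main(String[] args) {
        BecomeActiveSketch s = new BecomeActiveSketch();
        s.breadcrumb = "nn1".getBytes(StandardCharsets.UTF_8);   // left behind by the old active
        s.becomeActive("nn2".getBytes(StandardCharsets.UTF_8));
        System.out.println(s.log); // [fenced nn1, transitionToActive]
    }
}
```

Because the breadcrumb is only overwritten after a successful fence, a failed attempt leaves the old active's data in place for the next contender to fence.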
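If ActiveStandbyElector needs to quit the election, it tries to delete its own breadcrumb node, but only when it is active and stepping down gracefully (needFence is false).
public synchronized void quitElection(boolean needFence) {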
LOG.info("Yielding from election");
if (!needFence && state == State.ACTIVE) {
// If active is gracefully going back to standby mode, remove
// our permanent znode so no one fences us.
tryDeleteOwnBreadCrumbNode();
}
reset();
wantToBeInElection = false;
}
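The effect of needFence shows up only when the next leader reads the breadcrumb. This short, hypothetical model (QuitElectionSketch is not Hadoop code) shows that a graceful step-down removes the breadcrumb so the next active does not fence us, while quitElection(true) leaves it behind. As a simplification, a breadcrumb holding someone else's data is silently kept here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model of quitElection(needFence) and its effect on the next active.
class QuitElectionSketch {
    byte[] breadcrumb;        // contents of the persistent ActiveBreadCrumb znode, or null
    boolean active;
    private final byte[] myData;

    QuitElectionSketch(byte[] myData, boolean active) {
        this.myData = myData.clone();
        this.active = active;
        if (active) {
            breadcrumb = this.myData.clone();   // an active elector has written its breadcrumb
        }
    }

    void quitElection(boolean needFence) {
        if (!needFence && active && breadcrumb != null
                && Arrays.equals(breadcrumb, myData)) {
            breadcrumb = null;                  // like tryDeleteOwnBreadCrumbNode(): only our own data
        }
        active = false;
    }

    // What the next winner would conclude when it reads the breadcrumb.
    boolean nextActiveMustFence(byte[] nextData) {
        return breadcrumb != null && !Arrays.equals(breadcrumb, nextData);
    }

    public static void main(String[] args) {
        byte[] nn1 = "nn1".getBytes(StandardCharsets.UTF_8);
        byte[] nn2 = "nn2".getBytes(StandardCharsets.UTF_8);
        QuitElectionSketch graceful = new QuitElectionSketch(nn1, true);
        graceful.quitElection(false);
        QuitElectionSketch abrupt = new QuitElectionSketch(nn1, true);
        abrupt.quitElection(true);
        System.out.println(graceful.nextActiveMustFence(nn2) + " " + abrupt.nextActiveMustFence(nn2));
        // false true
    }
}
```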
ActiveStandbyElector provides a callback interface for its callers, such as ZKFailoverController for NameNode HA and EmbeddedElectorService for ResourceManager HA.
public interface ActiveStandbyElectorCallback {
//
}
For example, ElectorCallbacks in ZKFailoverController is defined as follows.
/**
* Callbacks from elector
*/
class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
public void becomeActive() throws ServiceFailedException {
ZKFailoverController.this.becomeActive();
}
@Override
public void becomeStandby() {
ZKFailoverController.this.becomeStandby();
}
@Override
public void enterNeutralMode() {
}
@Override
public void notifyFatalError(String errorMessage) {
fatalError(errorMessage);
}
@Override
public void fenceOldActive(byte[] data) {
ZKFailoverController.this.fenceOldActive(data);
}
@Override
public String toString() {
synchronized (ZKFailoverController.this) {
return "Elector callbacks for " + localTarget;
}
}
}
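The delegation pattern of ElectorCallbacks, an inner class forwarding each callback to the enclosing controller, can be tried without Hadoop. In this sketch ElectorCallback is a local copy of the five callback methods, with ServiceFailedException replaced by plain Exception. RecordingController is a hypothetical controller that only records what the elector asked it to do.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the callback methods, so this sketch compiles without Hadoop.
interface ElectorCallback {
    void becomeActive() throws Exception;
    void becomeStandby();
    void enterNeutralMode();
    void notifyFatalError(String errorMessage);
    void fenceOldActive(byte[] oldActiveData);
}

// Hypothetical controller that records the elector's requests, delegating through
// an inner class the same way ZKFailoverController.ElectorCallbacks does.
class RecordingController {
    final List<String> events = new ArrayList<>();

    private void doBecomeActive() { events.add("active"); }
    private void doBecomeStandby() { events.add("standby"); }
    private void doFence(byte[] data) { events.add("fence " + data.length + " bytes"); }

    class Callbacks implements ElectorCallback {
        @Override public void becomeActive() { RecordingController.this.doBecomeActive(); }
        @Override public void becomeStandby() { RecordingController.this.doBecomeStandby(); }
        @Override public void enterNeutralMode() { /* no-op, as in ZKFailoverController */ }
        @Override public void notifyFatalError(String msg) { events.add("fatal: " + msg); }
        @Override public void fenceOldActive(byte[] data) { RecordingController.this.doFence(data); }
    }

    public static void main(String[] args) {
        RecordingController controller = new RecordingController();
        RecordingController.Callbacks cb = controller.new Callbacks();
        cb.fenceOldActive(new byte[3]);
        cb.becomeActive();
        cb.enterNeutralMode();
        cb.becomeStandby();
        System.out.println(controller.events); // [fence 3 bytes, active, standby]
    }
}
```

Keeping the callbacks in an inner class lets the controller expose a narrow interface to the elector without making its own becomeActive()/becomeStandby() methods public.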
After ActiveStandbyElector wins the election in ZooKeeper, becomeActive() in ZKFailoverController issues an RPC to the NameNode so that the corresponding NameNode transitions to the active state.
private synchronized void becomeActive() throws ServiceFailedException {
LOG.info("Trying to make " + localTarget + " active...");
try {
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
conf, FailoverController.getRpcTimeoutToNewActive(conf)), createReqInfo());
String msg = "Successfully transitioned " + localTarget + " to active state";
LOG.info(msg);
serviceState = HAServiceState.ACTIVE;
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
//
}
}
If ActiveStandbyElector decides that the local node should become standby, ZKFailoverController asks the NameNode to transition to standby accordingly.
private synchronized void becomeStandby() {
LOG.info("ZK Election indicated that " + localTarget +
" should become standby");
try {
int timeout = FailoverController.getGracefulFenceTimeout(conf);
localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
LOG.info("Successfully transitioned " + localTarget +
" to standby state");
} catch (Exception e) {
LOG.error("Couldn't transition " + localTarget + " to standby state",
e);
// TODO handle this. It's a likely case since we probably got fenced
// at the same time.
}
serviceState = HAServiceState.STANDBY;
}
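Note the asymmetry between the two methods above. A failed transition to standby is only logged and serviceState is still set to STANDBY. The catch block of becomeActive() is elided here, so this sketch assumes the failure propagates to the elector, which (as its becomeActive() shows) returns false and rejoins the election. ToyHaService and ToyZkfc are hypothetical stand-ins for the NameNode RPC proxy and ZKFailoverController.

```java
// Hypothetical stand-in for the NameNode's HA RPC proxy.
interface ToyHaService {
    void transitionToActive() throws Exception;
    void transitionToStandby() throws Exception;
}

class ToyZkfc {
    enum ServiceState { INITIALIZING, ACTIVE, STANDBY }

    ServiceState serviceState = ServiceState.INITIALIZING;
    private final ToyHaService target;

    ToyZkfc(ToyHaService target) {
        this.target = target;
    }

    // Assumed: a failed transition is surfaced to the caller (the elector),
    // which gives up the lock and rejoins the election.
    synchronized void becomeActive() throws Exception {
        target.transitionToActive();
        serviceState = ServiceState.ACTIVE;
    }

    // As shown above: failure is only logged; the state is recorded as STANDBY anyway.
    synchronized void becomeStandby() {
        try {
            target.transitionToStandby();
        } catch (Exception e) {
            System.err.println("Couldn't transition to standby state: " + e.getMessage());
        }
        serviceState = ServiceState.STANDBY;
    }

    public static void main(String[] args) throws Exception {
        ToyZkfc zkfc = new ToyZkfc(new ToyHaService() {
            @Override public void transitionToActive() { }
            @Override public void transitionToStandby() throws Exception {
                throw new Exception("NameNode unreachable");
            }
        });
        zkfc.becomeActive();
        zkfc.becomeStandby();                   // logs the failure
        System.out.println(zkfc.serviceState);  // STANDBY
    }
}
```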
Once the Hadoop 2.6.0 HA cluster has been built, the next steps are to format ZooKeeper and start automatic active/standby failover for HA.
Format ZooKeeper
bin/hdfs zkfc -formatZK
The relevant part of the bin/hdfs shell script:
COMMAND=$1
#
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif #
This runs the main method of DFSZKFailoverController.
public static void main(String args[]) throws Exception {
if (DFSUtil.parseHelpArgument(args,ZKFailoverController.USAGE, System.out, true)) {
System.exit(0);
}
GenericOptionsParser parser = new GenericOptionsParser(new HdfsConfiguration(), args);
DFSZKFailoverController zkfc = DFSZKFailoverController.create(parser.getConfiguration());
System.exit(zkfc.run(parser.getRemainingArgs()));
}
// Note: HdfsConfiguration extends Configuration
The main method of DFSZKFailoverController calls run() in ZKFailoverController, and run() calls doRun(String[] args), which handles the -formatZK argument.
private int doRun(String[] args) {
try {
initZK();
} catch (KeeperException ke) {
// ......
return ERR_CODE_NO_ZK;
}
if (args.length > 0) {
if ("-formatZK".equals(args[0])) { // hdfs zkfc -formatZK
boolean force = false;
boolean interactive = true;
for (int i = 1; i < args.length; i++) {
if ("-force".equals(args[i])) { // -formatZK -force
force = true;
} else if ("-nonInteractive".equals(args[i])) { // -formatZK -nonInteractive
interactive = false;
} else { // any other option is rejected
badArg(args[i]);
}
}
return formatZK(force, interactive);
} else { // the first argument is not -formatZK
badArg(args[0]);
}
}
//
if (!elector.parentZNodeExists()) {
// ......
return ERR_CODE_NO_PARENT_ZNODE;
}
try {
localTarget.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
// ......
return ERR_CODE_NO_FENCER;
}
initRPC();
initHM();
startRPC();
try {
mainLoop();
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}
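The argument handling at the top of doRun() can be isolated into a small parser. The sketch below is a hypothetical restatement: ZkfcArgsSketch and FormatOptions are made-up names. It throws IllegalArgumentException at roughly the points where the real code calls badArg(), and returns null when there are no arguments, which corresponds to the normal ZKFC startup path.

```java
// Hypothetical parser for "hdfs zkfc [-formatZK [-force] [-nonInteractive]]".
class ZkfcArgsSketch {
    // Parsed form of the -formatZK options (hypothetical holder class).
    static final class FormatOptions {
        final boolean force;
        final boolean interactive;

        FormatOptions(boolean force, boolean interactive) {
            this.force = force;
            this.interactive = interactive;
        }
    }

    // Returns null for no arguments (normal startup); throws where badArg() would be called.
    static FormatOptions parse(String[] args) {
        if (args.length == 0) {
            return null;
        }
        if (!"-formatZK".equals(args[0])) {
            throw new IllegalArgumentException("Bad argument: " + args[0]);
        }
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
            if ("-force".equals(args[i])) {
                force = true;                 // overwrite an existing parent znode
            } else if ("-nonInteractive".equals(args[i])) {
                interactive = false;          // do not prompt the user
            } else {
                throw new IllegalArgumentException("Bad argument: " + args[i]);
            }
        }
        return new FormatOptions(force, interactive);
    }

    public static void main(String[] args) {
        FormatOptions o = parse(new String[] {"-formatZK", "-nonInteractive"});
        System.out.println(o.force + " " + o.interactive); // false false
    }
}
```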
Start zkfc:
sbin/hadoop-daemon.sh start zkfc