hadoop 2.7.3ソース解析のdatanode登録と心拍メカニズム
もっと読む datanode登録とときめき datanode登録 datanode心拍数 namenodeは登録と心拍情報を受信します. DatanodeManager簡単紹介 namednoeは登録情報 を受信する..name node受信心拍情報 http://zhangjun5965.iteye.com/admin/blogs/2386384
datanode登録とときめき
hadoopが起動する時、正常な流れは先にnamenoeを起動して、datanodeを起動します.なめなのnodeはdatanodeの登録を受け入れますので、datanodeの登録とときめきはその起動時から始まります.入り口の方法はもちろんdatanodeのmain方法です.
コードを追跡してdatanodeの構造方法の中で発見して、BlockPoolManagerの対象を初期化して、そのblock Pool Manager.refresh Namenodesを通じて(conf);このdatanodeに関するnamenode情報をプロファイルから取得し、登録と心拍情報を生成する.
具体的にはBlockPoolManagerの中のstartAllの方法で、startAllの方法を通じて、datanodeの上のすべてのBPO-Serviceを起動します.
datanode端の登録は比較的簡単で、connectTonAndHandschake方法を追跡することによって、最後に呼び出されたのはDatanodeProtocol Server SideTranslation PB.register Datanode方法です.
ここではDatanodeRegistrationオブジェクトをパラメータとして構成しており、namenodeがdatanodeを検証する必要があるいくつかの基本情報が含まれています.
最後にdatanodeとnamenodeが直接的にインタラクションするプロトコルDatanodeProtocolインターフェースのregister Datanode方法でnamenodeにrpc要求を送信してdatanodeを登録します.
この方法は最後にdatanodeをnamenodeに登録してから戻ってきた結果を処理してから戻る.
datanodeの心拍操作は主にoffer Service方法の中で行われます.この方法はshuldRunがfalseに戻るまでずっと実行されます.心拍操作は、まずnamenodeに心拍の要求を送り、その結果に基づいていくつかの情報を更新し、namenodeから持ち帰った各種コマンド(DatanodeCommand配列)を処理します.
心拍を送る方法はsendHeart Beatで、最終的にDatanodeProtocol Server SideTranslatoPB類のsendHeart beat(RpcController、Heart beat Request Protto)メソッドを呼び出して、心拍の要求を送ります.
DatanodeManager簡単な紹介
まず、DatanodeManagerの重要な変数をいくつか紹介します.
登録とときめきに関わらず、datanodeはrpcでnamenodeの同名の方法を呼び出しました.具体的な実現はNameNodeRpcServeにあります.
register Datanode方法ではFSS Namesystemのregister Datanode方法を呼び出し、最終的な処理方法はDatanodeManager.register Datanodeにあります.
まず下記の2行のコードを通じて登録されたdatanodeのdatanodemanageの中の2つのmapの情報を取得しました.
namenodeが心拍数を処理するのはdatanodeと同名の方法sendHeart beatの中で、最終の処理方法はDatanodeManager.handleHeart beatの方法です.
ときめきの具体的な流れは以下の通りです.
1.datanodeの情報を入手して、接続が許可されているかどうかを判断します.もし許可されていないなら、直接に異常を投げます.2.登録したかどうかを判断し、登録していない場合は、登録コマンド3に直接戻ります.datanodeの情報を更新するのは、主にDatanodeDescriptorの情報を更新することです.例えば、使用空間、空きスペースなど.4.セキュリティモードであるかどうかを確認します.リース状況を確認します.6.コピーコマンドを生成します.7.削除コマンドを生成します.8.キャッシュ関連コマンドを生成します.帯域幅関連コマンドを生成します.10.すべてのコマンドを返します.
関連コードは以下の通りです.
datanode登録とときめき
hadoopが起動する時、正常な流れは先にnamenoeを起動して、datanodeを起動します.なめなのnodeはdatanodeの登録を受け入れますので、datanodeの登録とときめきはその起動時から始まります.入り口の方法はもちろんdatanodeのmain方法です.
コードを追跡してdatanodeの構造方法の中で発見して、BlockPoolManagerの対象を初期化して、そのblock Pool Manager.refresh Namenodesを通じて(conf);このdatanodeに関するnamenode情報をプロファイルから取得し、登録と心拍情報を生成する.
具体的にはBlockPoolManagerの中のstartAllの方法で、startAllの方法を通じて、datanodeの上のすべてのBPO-Serviceを起動します.
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction
BPPServiceActorスレッドは、BPPServiceActorが対応するnamenodeに登録と心拍メッセージを送信するために、BPPOfferServiceのstart方法でループして起動される.//This must be called only by blockPoolManager
void start() {
for (BPServiceActor actor : bpServices) {
actor.start();
}
}
具体的な実現方法は自然にBPPServiceActorのrun方法にあります./**
* , offerService , shouldRun shouldServiceRun false
* No matter what kind of exception we get, keep retrying to offerService().
* That's the loop that connects to the NameNode and provides basic DataNode
* functionality.
*
* Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
* happen either at shutdown or due to refreshNamenodes.
*/
@Override
public void run() {
LOG.info(this + " starting to offer service");
try {
while (true) {
// init stuff
try {
// setup storage
// namenode, datanode
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
.......
}
}
runningState = RunningState.RUNNING;
while (shouldRun()) {
try {
//
offerService();
} catch (Exception ex) {
.....
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
...
} finally {
....
}
}
datanode登録datanode端の登録は比較的簡単で、connectTonAndHandschake方法を追跡することによって、最後に呼び出されたのはDatanodeProtocol Server SideTranslation PB.register Datanode方法です.
ここではDatanodeRegistrationオブジェクトをパラメータとして構成しており、namenodeがdatanodeを検証する必要があるいくつかの基本情報が含まれています.
最後にdatanodeとnamenodeが直接的にインタラクションするプロトコルDatanodeProtocolインターフェースのregister Datanode方法でnamenodeにrpc要求を送信してdatanodeを登録します.
この方法は最後にdatanodeをnamenodeに登録してから戻ってきた結果を処理してから戻る.
@Override
public RegisterDatanodeResponseProto registerDatanode(
RpcController controller, RegisterDatanodeRequestProto request)
throws ServiceException {
DatanodeRegistration registration = PBHelper.convert(request
.getRegistration());
DatanodeRegistration registrationResp;
try {
registrationResp = impl.registerDatanode(registration);
} catch (IOException e) {
throw new ServiceException(e);
}
return RegisterDatanodeResponseProto.newBuilder()
.setRegistration(PBHelper.convert(registrationResp)).build();
}
datanodeときめきdatanodeの心拍操作は主にoffer Service方法の中で行われます.この方法はshuldRunがfalseに戻るまでずっと実行されます.心拍操作は、まずnamenodeに心拍の要求を送り、その結果に基づいていくつかの情報を更新し、namenodeから持ち帰った各種コマンド(DatanodeCommand配列)を処理します.
心拍を送る方法はsendHeart Beatで、最終的にDatanodeProtocol Server SideTranslatoPB類のsendHeart beat(RpcController、Heart beat Request Protto)メソッドを呼び出して、心拍の要求を送ります.
@Override
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
HeartbeatRequestProto request) throws ServiceException {
HeartbeatResponse response;
try {
......
//
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary);
} catch (IOException e) {
throw new ServiceException(e);
}
//
HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
.newBuilder();
DatanodeCommand[] cmds = response.getCommands();
.............
}
具体的に心拍数を送るプロトコルについては、DatanodeProtocol類のsendHeart beat方法を見てみます./**
*
* sendHeartbeat namenode datanode , . namenode datanode , DatanodeCommand 。 DatanodeCommand ,DataNode 、 datanode 。
*
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
* It also gives the NameNode a chance to return
* an array of "DatanodeCommand" objects in HeartbeatResponse.
* A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
* @param registration datanode registration information datanode
* @param reports utilization report per storage datanode (datanode , , 、disk、ssd )
* @param xmitsInProgress number of transfers from this datanode to others
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @param volumeFailureSummary info about volume failures
* @throws IOException on error
*/
@Idempotent
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports,
long dnCacheCapacity,
long dnCacheUsed,
int xmitsInProgress,
int xceiverCount,
int failedVolumes,
VolumeFailureSummary volumeFailureSummary)
throws IOException;
namenodeは登録と心拍情報を受信します.DatanodeManager簡単な紹介
まず、DatanodeManagerの重要な変数をいくつか紹介します.
/**
*
*
* datanodeMap map StorageID DatanodeDescriptor , ,datanode namenode 。
*
* 1. storage id , map 。
* 2. ,
* 3. datanode storage id
*
* Stores the datanode -> block map.
*
* Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
* storage id. In order to keep the storage map consistent it tracks
* all storages ever registered with the namenode.
* A descriptor corresponding to a specific storage id can be
*
* - added to the map if it is a new storage id;
* - updated with a new datanode started as a replacement for the old one
* with the same storage id; and
* - removed if and only if an existing datanode is restarted to serve a
* different storage id.
*
*
* Mapping: StorageID -> DatanodeDescriptor
*/
private final NavigableMap datanodeMap
= new TreeMap();
/**
*
* Cluster network topology
*/
private final NetworkTopology networktopology;
/**
* host DatanodeDescriptor , datanode。
* Host2NodesMap String DatanodeDescriptor[] ,
* datanode map ,
* Host2NodesMap
* Host names to datanode descriptors mapping.
*/
private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
namednoeは登録情報を受信します.登録とときめきに関わらず、datanodeはrpcでnamenodeの同名の方法を呼び出しました.具体的な実現はNameNodeRpcServeにあります.
register Datanode方法ではFSS Namesystemのregister Datanode方法を呼び出し、最終的な処理方法はDatanodeManager.register Datanodeにあります.
まず下記の2行のコードを通じて登録されたdatanodeのdatanodemanageの中の2つのmapの情報を取得しました.
// nodeS datanodeMap datanode
DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
// nodeN host2DatanodeMap
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(nodeReg.getIpAddr(), nodeReg.getXferPort());
次にdatanodemanageのdatanodeMapのコメントの中で言っている3つの状況をそれぞれ処理します.// , ID
if (nodeN != null && nodeN != nodeS) {
NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
//
removeDatanode(nodeN);
// physically remove node from datanodeMap
// , datanode
wipeDatanode(nodeN);
nodeN = null;
}
// ,
if (nodeS != null) {
................
nodeS.updateRegInfo(nodeReg);
nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
nodeS.setDisallowed(false); // Node is in the include list
//
// resolve network location
if(this.rejectUnresolvedTopologyDN) {
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
} else {
nodeS.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
nodeS.setDependentHostNames(
getNetworkDependenciesWithDefault(nodeS));
}
getNetworkTopology().add(nodeS);
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
incrementVersionCount(nodeS.getSoftwareVersion());
startDecommissioningIfExcluded(nodeS);
success = true;
...........................
}
//
// resolve network location
// ,
if(this.rejectUnresolvedTopologyDN) {
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
} else {
nodeDescr.setNetworkLocation(
resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
nodeDescr.setDependentHostNames(
getNetworkDependenciesWithDefault(nodeDescr));
}
networktopology.add(nodeDescr);
nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
// register new datanode
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
heartbeatManager.addDatanode(nodeDescr);
incrementVersionCount(nodeReg.getSoftwareVersion());
startDecommissioningIfExcluded(nodeDescr);
success = true;
上記では、三つの登録の場合にそれぞれ処理を行い、新しいノードに登録する場合には、最終的にaddDatanode方法を呼び出して登録し、主にその二つのmapに該当するdatanode情報を追加し、datanodeをネットワークトポロジに追加する./** Add a datanode. */
void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
// datanodeMap host2DatanodeMap
synchronized(datanodeMap) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
//
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node + " is added to datanodeMap.");
}
}
name node受信心拍数情報namenodeが心拍数を処理するのはdatanodeと同名の方法sendHeart beatの中で、最終の処理方法はDatanodeManager.handleHeart beatの方法です.
ときめきの具体的な流れは以下の通りです.
1.datanodeの情報を入手して、接続が許可されているかどうかを判断します.もし許可されていないなら、直接に異常を投げます.2.登録したかどうかを判断し、登録していない場合は、登録コマンド3に直接戻ります.datanodeの情報を更新するのは、主にDatanodeDescriptorの情報を更新することです.例えば、使用空間、空きスペースなど.4.セキュリティモードであるかどうかを確認します.リース状況を確認します.6.コピーコマンドを生成します.7.削除コマンドを生成します.8.キャッシュ関連コマンドを生成します.帯域幅関連コマンドを生成します.10.すべてのコマンドを返します.
関連コードは以下の通りです.
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
// datanode
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
//
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
//
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// datanode , ,
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
//
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if(namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
//
//check lease recovery
BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
.................................
return new DatanodeCommand[] { brCommand };
}
final List cmds = new ArrayList();
//
//check pending replication
List pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
// ,
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
//
boolean sendingCachingCommands = false;
long nowMs = monotonicNow();
if (shouldSendCachingCommands &&
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {
DatanodeCommand pendingCacheCommand =
getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
DatanodeProtocol.DNA_CACHE, blockPoolId);
if (pendingCacheCommand != null) {
cmds.add(pendingCacheCommand);
sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand =
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
DatanodeProtocol.DNA_UNCACHE, blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
//
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
//
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
}
return new DatanodeCommand[0];
}