Zoo KeeperのWatchメカニズムを分析します。
17023 ワード
WatchはZoo Keeperのリリース/購読機能を実現する最も核心的なキャラクターです。我々はZoo Keeperのあるノードの変化に対して後続の処理を行う必要がある場合、Watchに使用する必要がある。
Zoo KeeperのWatchメカニズムは、総じて三つの流れに分けられます。client登録Watch、server処理Watch、clientフィードバックWatch。次に、上記の三つの流れからWatchがどのように働いているかを分析します。
Client登録Watch
ClientにはZKWatch Managerが登録されているすべてのWatchを保存します。
1.標準のコンストラクタを使って登録します。
public byte[]getData(String path,boot watch,Stt stat)
このbootlean変数はデフォルトのWatchを登録するかどうかを表します。
public byte[]getData(final String path、Watch Watch、Stt stat)
ClientはこのPacketを送信した後、SendThreadスレッドのreadResponse()によって応答を受けて、finish Packetメソッドを呼び出してpacketからWathcerを提出し、ZKWatch Managerに登録します。
クライアント登録Watchは、完全なWatchを登録するのではなく、Request Headerとrequstの二つの属性をネットワーク転送するだけです。
サービスが最後のステップで要求された処理を行う場合、FinalRequest Processe ort.processRequest()方法は、このときの要求がWatch登録が必要かどうかを判断します。
Watchトリガー
上記の2ステップを通じて、WatchとclientとServerの端に存在しますが、Watchはどうやってclientをリセットしますか?
私たちはWatcher Managerのtrigger Watchに入ります。
Client側でコールバックを実行します。
Client端末はClienntCnxnを通じてクライアントの下のネットワーク通信を管理する。また、内部クラスSendThreadは、メッセージの受信および送信を行うために使用される。
SendThread.readRespose()は、コードの一部を切り取りました。
EventThread.queueEvent
EventThread.run()
1.クライアントにはZKWatcher Managerが登録のためのWatch 2を記憶するためのものがあります。クライアントが登録Watchを含む要求を開始したら、まずこの要求を必要登録Watchと表記してから、Packetとしてパッケージし、サービス端末3に送信します。サービス端末が要求を受信したら、登録する必要があるWatchを読み取ります。WatchのようなServerCnxtをWatch Managerに入れます。4.Watch対応のノードにEventイベントが発生した場合、サービス端末から通知が送信され、クライアントパスにXXEventが発生したと伝えられます。5.クライアントが通知を受け取ったら(SendThreadは通知を受けます)、関連のWatchを探してWatch SetEventPairと包装して、waiting行列に入れます。6.クライアントのEventThreadはRun方法を実行し、Watch SetEventPairをwaitingから取り出し、すべてのWatchを見つけて本格的なコールバック業務の実行を行う。
Zoo KeeperのWatchメカニズムは、総じて三つの流れに分けられます。client登録Watch、server処理Watch、clientフィードバックWatch。次に、上記の三つの流れからWatchがどのように働いているかを分析します。
Client登録Watch
ClientにはZKWatch Managerが登録されているすべてのWatchを保存します。
1.標準のコンストラクタを使って登録します。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
このWatchはセッション全体のデフォルトのWatchとしてdefault Wathcerに保存されます。2.getData、exist方法で登録し、ここでgetDataを例にして分析します。getDataには2つの重負荷の方法があります。public byte[]getData(String path,boot watch,Stt stat)
このbootlean変数はデフォルトのWatchを登録するかどうかを表します。
public byte[]getData(final String path、Watch Watch、Stt stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
//WatchRegistration Watch path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
// request , watcher
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// ReplyHeader
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
Watch RegistrationはWatchとpathの対応関係を保存するために使用されます。Zoo Keeperでは、一番小さい通信ユニットはPacketですので、Watch RegistrationをPacketで包装する必要があります。Packetに包装してから、列に入れて発送を待ちます。ClientはこのPacketを送信した後、SendThreadスレッドのreadResponse()によって応答を受けて、finish Packetメソッドを呼び出してpacketからWathcerを提出し、ZKWatch Managerに登録します。
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// watcher
p.watchRegistration.register(p.replyHeader.getErr());
}
。。。
}
Watch Registration.register方法。// Wathcer, ZKWatcherManager 。
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map> watches = getWatches(rc);
synchronized(watches) {
Set watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
サービスエンド処理Watchクライアント登録Watchは、完全なWatchを登録するのではなく、Request Headerとrequstの二つの属性をネットワーク転送するだけです。
サービスが最後のステップで要求された処理を行う場合、FinalRequest Processe ort.processRequest()方法は、このときの要求がWatch登録が必要かどうかを判断します。
case OpCode.getData: {
...
// Watcher 。
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
...
}
登録が必要なら、Watchに対応したServerCnxnとpathをWatch Managerに格納します。Watchトリガー
上記の2ステップを通じて、WatchとclientとServerの端に存在しますが、Watchはどうやってclientをリセットしますか?
私たちはWatcher Managerのtrigger Watchに入ります。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
// WatchedEvent
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
// Path Watcher
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
// Watcher , Watcher
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// process Watcher
w.process(e);
}
return watchers;
}
私たちはprocessの方法を見て何をしましたか?abstract public void process(WatchedEvent event);
ウォッチのprocessはabstract方法です。私たちはServerCnxnがimplement Watchインターフェースであることを知っています。また、NIOSTER CnxnがServerCnxnを実現しました。だからNIOSTerverCnxnのプロcessを見てみます。synchronized public void process(WatchedEvent event) {
// -1,
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
// WatcherEvent ,
WatcherEvent e = event.getWrapper();
//
sendResponse(h, e, "notification");
}
これにより、processはWatchの業務方法を本当に実行するのではなく、通知の役割を果たしていることが分かります。本当にコールバック業務ロジックを実行するのはClient側です。Client側でコールバックを実行します。
Client端末はClienntCnxnを通じてクライアントの下のネットワーク通信を管理する。また、内部クラスSendThreadは、メッセージの受信および送信を行うために使用される。
SendThread.readRespose()は、コードの一部を切り取りました。
if (replyHdr.getXid() == -1) {
// -1 means notification
//-1 , process
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
// WatcherEvent
event.deserialize(bbia, "response");
// convert from a server path to a client path
// chrootPath ,
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else
event.setPath(serverPath.substring(chrootPath.length()));
}
//WatcherEvent WatchedEvent
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// WatchedEvent eventThread
eventThread.queueEvent( we );
return;
}
これから分かるように、SendThreadは通知を受けるだけで、本当のWatchがReadを実行するのはeventThreadの中です。EventThread.queueEvent
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
// WatchedEvent watchers
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
// waitingEvents
waitingEvents.add(pair);
}
このWatch edventに関するすべてのwatch ersを見つけたら、watch ersをwaitingEventsの列に入れます。EventThreadのrunは常に列から任務を取り出して実行します。EventThread.run()
@Override
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// event
processEvent(event);
}
......
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
// Watcher, process
//
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
....
流れのまとめ1.クライアントにはZKWatcher Managerが登録のためのWatch 2を記憶するためのものがあります。クライアントが登録Watchを含む要求を開始したら、まずこの要求を必要登録Watchと表記してから、Packetとしてパッケージし、サービス端末3に送信します。サービス端末が要求を受信したら、登録する必要があるWatchを読み取ります。WatchのようなServerCnxtをWatch Managerに入れます。4.Watch対応のノードにEventイベントが発生した場合、サービス端末から通知が送信され、クライアントパスにXXEventが発生したと伝えられます。5.クライアントが通知を受け取ったら(SendThreadは通知を受けます)、関連のWatchを探してWatch SetEventPairと包装して、waiting行列に入れます。6.クライアントのEventThreadはRun方法を実行し、Watch SetEventPairをwaitingから取り出し、すべてのWatchを見つけて本格的なコールバック業務の実行を行う。