消費者ハートビートスレッド
14193 ワード
kafka brokerは消費者をグループ管理し,消費者が生きているかどうかを知る必要があるため,クライアントは心拍スレッドを持って心拍数を送信する.
3つの関連パラメータ:
session.timeout.msheartbeat.interval.msmax.poll.interval.ms
AbstractCoordinator.HeartbeatThread
ハートビートコールバック
正常に戻ると、受信心拍数の時間を記録し、Errorsに戻る.REBALANCE_IN_PROGRESSまたはErrors.ILLEGAL_GENERATIONはグループに再加入する必要があります.
3つの関連パラメータ:
session.timeout.msheartbeat.interval.msmax.poll.interval.ms
AbstractCoordinator.HeartbeatThread
@Override
public void run() {
try {
log.debug("Heartbeat thread started");
while (true) {
//
synchronized (AbstractCoordinator.this) {
if (closed)
return;
if (!enabled) {
// enabled false, ,
AbstractCoordinator.this.wait();
continue;
}
if (state != MemberState.STABLE) {
// enabled false
disable();
continue;
}
// io
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// now - lastPoll > maxPollInterval
//
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
//
heartbeat.sentHeartbeat(now);
sendHeartbeatRequest().addListener(new RequestFutureListener() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat(time.milliseconds());
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat(time.milliseconds());
} else {
heartbeat.failHeartbeat();
// wake up the thread if it's sleeping to reschedule the heartbeat
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
} catch (AuthenticationException e) {
log.error("An authentication error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (GroupAuthorizationException e) {
log.error("A group authorization error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread", e);
this.failed.set(new RuntimeException(e));
} catch (Throwable e) {
log.error("Heartbeat thread failed due to unexpected error", e);
if (e instanceof RuntimeException)
this.failed.set((RuntimeException) e);
else
this.failed.set(new RuntimeException(e));
} finally {
log.debug("Heartbeat thread has closed");
}
}
ハートビートコールバック
synchronized RequestFuture sendHeartbeatRequest() {
log.debug("Sending Heartbeat request to coordinator {}", coordinator);
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler());
}
private class HeartbeatResponseHandler extends CoordinatorResponseHandler {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
Errors error = heartbeatResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful Heartbeat response");
future.complete(null);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
coordinator());
markCoordinatorUnknown();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("Attempt to heartbeat failed since group is rebalancing");
requestRejoin();
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION) {
log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
resetGeneration();
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
resetGeneration();
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
正常に戻ると、受信心拍数の時間を記録し、Errorsに戻る.REBALANCE_IN_PROGRESSまたはErrors.ILLEGAL_GENERATIONはグループに再加入する必要があります.