消費者ハートビートスレッド

14193 ワード

Kafka broker（グループコーディネーター）はコンシューマーをグループ単位で管理しており、各コンシューマーが生存しているかどうかを把握する必要がある。そのため、クライアント側はハートビートスレッドを持ち、定期的にハートビートを送信する。
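関連するパラメータは次の3つ:
- session.timeout.ms: 最後にハートビートが成功してからこの時間を超えると、クライアントはコーディネーターを不明（unknown）とみなす。
- heartbeat.interval.ms: ハートビートを送信する間隔。
- max.poll.interval.ms: poll() 呼び出しの間隔の上限。これを超えるとコンシューマーはグループから離脱する。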
AbstractCoordinator.HeartbeatThread
/**
 * Main loop of the consumer heartbeat thread.
 *
 * <p>Each iteration runs while holding the {@code AbstractCoordinator} monitor and does exactly
 * one of the following:
 * <ul>
 *   <li>exits (coordinator closed);</li>
 *   <li>parks (heartbeating disabled);</li>
 *   <li>disables heartbeating (member not STABLE);</li>
 *   <li>re-discovers the coordinator;</li>
 *   <li>reacts to an expired session or poll timeout;</li>
 *   <li>waits for the next heartbeat slot;</li>
 *   <li>sends a heartbeat asynchronously.</li>
 * </ul>
 * Any error that escapes the loop is logged, stored in {@code failed}, and terminates the thread.
 */
@Override
public void run() {
    try {
        log.debug("Heartbeat thread started");
        while (true) {
            // All checks and actions below run under the coordinator's monitor; the same monitor
            // is used for wait()/notify() so other threads can wake this one up.
            synchronized (AbstractCoordinator.this) {
                if (closed)
                    return;

                if (!enabled) {
                    // Heartbeating is disabled: block until the coordinator monitor is notified,
                    // then re-evaluate every condition from the top of the loop.
                    AbstractCoordinator.this.wait();
                    continue;
                }

                if (state != MemberState.STABLE) {
                    // Not a stable group member: turn heartbeating off (presumably disable()
                    // sets enabled = false, so the next iteration parks in the branch above).
                    disable();
                    continue;
                }

                // Drive pending I/O on the network client before checking the timers.
                client.pollNoWakeup();
                long now = time.milliseconds();

                if (coordinatorUnknown()) {
                    // No known coordinator: back off if a lookup is already in flight or a newly
                    // started lookup failed immediately.
                    if (findCoordinatorFuture != null || lookupCoordinator().failed())
                        AbstractCoordinator.this.wait(retryBackoffMs);
                } else if (heartbeat.sessionTimeoutExpired(now)) {
                    // now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout:
                    // no successful heartbeat for too long, so treat the coordinator as unknown.
                    markCoordinatorUnknown();
                } else if (heartbeat.pollTimeoutExpired(now)) {
                    // now - lastPoll > maxPollInterval: the consumer has not polled within
                    // max.poll.interval.ms, so proactively leave the group.
                    maybeLeaveGroup();
                } else if (!heartbeat.shouldHeartbeat(now)) {
                    // Not yet time for the next heartbeat: sleep briefly. A failed heartbeat
                    // (see onFailure below) calls notify() to wake this thread early.
                    AbstractCoordinator.this.wait(retryBackoffMs);
                } else {
                    // A heartbeat is due: record the send time, then send it asynchronously.
                    heartbeat.sentHeartbeat(now);

                    sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            synchronized (AbstractCoordinator.this) {
                                heartbeat.receiveHeartbeat(time.milliseconds());
                            }
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            synchronized (AbstractCoordinator.this) {
                                if (e instanceof RebalanceInProgressException) {
                                    // it is valid to continue heartbeating while the group is rebalancing. This
                                    // ensures that the coordinator keeps the member in the group for as long
                                    // as the duration of the rebalance timeout. If we stop sending heartbeats,
                                    // however, then the session timeout may expire before we can rejoin.
                                    heartbeat.receiveHeartbeat(time.milliseconds());
                                } else {
                                    heartbeat.failHeartbeat();

                                    // wake up the thread if it's sleeping to reschedule the heartbeat
                                    AbstractCoordinator.this.notify();
                                }
                            }
                        }
                    });
                }
            }
        }
    } catch (AuthenticationException e) {
        log.error("An authentication error occurred in the heartbeat thread", e);
        this.failed.set(e);
    } catch (GroupAuthorizationException e) {
        log.error("A group authorization error occurred in the heartbeat thread", e);
        this.failed.set(e);
    } catch (InterruptedException | InterruptException e) {
        // Clear the interrupt flag; the thread is terminating and records the interrupt as a failure.
        Thread.interrupted();
        log.error("Unexpected interrupt received in heartbeat thread", e);
        this.failed.set(new RuntimeException(e));
    } catch (Throwable e) {
        log.error("Heartbeat thread failed due to unexpected error", e);
        if (e instanceof RuntimeException)
            this.failed.set((RuntimeException) e);
        else
            this.failed.set(new RuntimeException(e));
    } finally {
        log.debug("Heartbeat thread has closed");
    }
}

ハートビートコールバック
/**
 * Sends a Heartbeat request for the current generation (group id, generation id, member id)
 * to the current coordinator.
 *
 * @return a future that is completed with {@code null} when the heartbeat succeeds, or raised
 *     with the error mapped by {@code HeartbeatResponseHandler}
 */
synchronized RequestFuture<Void> sendHeartbeatRequest() {
    log.debug("Sending Heartbeat request to coordinator {}", coordinator);
    HeartbeatRequest.Builder requestBuilder =
            new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
    // compose() adapts the raw client response into the typed heartbeat outcome.
    return client.send(coordinator, requestBuilder)
            .compose(new HeartbeatResponseHandler());
}

/**
 * Maps a {@link HeartbeatResponse} onto the heartbeat future.
 *
 * <p>It also updates coordinator/group state for the error codes that need it:
 * <ul>
 *   <li>coordinator errors mark the coordinator unknown;</li>
 *   <li>a rebalance in progress requests a rejoin;</li>
 *   <li>a stale generation or unknown member id resets the generation.</li>
 * </ul>
 */
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        // Latency is recorded for every heartbeat response, successful or not.
        sensors.heartbeatLatency.record(response.requestLatencyMs());
        Errors error = heartbeatResponse.error();
        if (error == Errors.NONE) {
            log.debug("Received successful Heartbeat response");
            future.complete(null);
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR) {
            // Forget the coordinator so that the next lookup re-discovers it.
            log.debug("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
                    coordinator());
            markCoordinatorUnknown();
            future.raise(error);
        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
            log.debug("Attempt to heartbeat failed since group is rebalancing");
            requestRejoin();
            future.raise(Errors.REBALANCE_IN_PROGRESS);
        } else if (error == Errors.ILLEGAL_GENERATION) {
            log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
            resetGeneration();
            future.raise(Errors.ILLEGAL_GENERATION);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            log.debug("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
            resetGeneration();
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
        }
    }
}

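レスポンスが正常（Errors.NONE）であれば、ハートビートの受信時刻を記録する。Errors.REBALANCE_IN_PROGRESS が返った場合は requestRejoin() を呼び出す。Errors.ILLEGAL_GENERATION（および Errors.UNKNOWN_MEMBER_ID）が返った場合は resetGeneration() で世代情報をリセットする。いずれの場合も、コンシューマーはグループに再参加する必要がある。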