JetcdにおけるKeepAliveの実現と使用について

6112 ワード

前言
EtcdのJavaクライアントには多くのオープンソース実装があり、JetcdはEtcd公式倉庫のJavaクライアントであり、全体apiインタフェースの設計実装は公式goクライアントと類似しており、簡潔で使いやすい.このうち、リース継続のインタフェースは、keepAliveOnceとkeepAliveの2つを提供します.機能は名前の通り、keepAliveOnceは単一の継続的なインタフェースであり、リースを維持するには手動でこのインタフェースをトリガする必要があるため、このインタフェースは基本的に使用されません.keepAliveは自動的に契約を継続して生存するインタフェースです.ほとんどのシーンではkeepAliveを使用すればよいが、異なるシーンに対しては、レンタルttlの設定やkeepAlive異常時の処理など、いくつかの問題を考慮する必要がある.Jetcdプロジェクトの住所:https://github.com/etcd-io/jetcd
背景の問題
mysqlベースのbinlogサブスクリプションデータ変更のアプリケーションがあり、オンライン上で非常に重要なアプリケーションがこのサービスに基づいている.単一の障害があるため、jetcdのlock+keepAliveのメカニズムを使用してプライマリ・サービスの秒レベル切替の機能を実現した.具体的には「etcd選択プライマリ・障害プライマリ・秒レベル切替高可用性アーキテクチャ」を参照し、システムがオンライン上で実行された後、binlogのサービス切替はしばしばプライマリ・スタンバイ切替が発生するが、実際にはbinlogのサービスは非常に安定しており、プライマリ・スタンバイ切替サービスをオンラインにする前に、オンラインbinlogサービスがダウンすることは一度も発生したことがない.最後にリースTTLの設定に問題があったことを明らかにした.ここではまず問題と位置づけを投げ出して、次にJetcdのkeepAliveの具体的な実現を見て、それからなぜこの問題を招いたのかを分析します.
KeepAlive実装
まずkeepAliveの使い方を見てみましょう
    private long acquireActiveLease() throws InterruptedException, ExecutionException {
        long leaseId = leaseClient.grant(leaseTTL).get().getID();
        logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
        this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
                cancelTask();
            }
            @Override
            public void onCompleted() {
                logger.info("LeaderSelector lease renewal completed! start canceling task.");
                cancelTask();
            }
        });
        return leaseId;
    }

リースインプリメンテーションはすべてLeaseImplクラスにあり、EtcdClientでLeaseImplインスタンスを取得した後、まずgrantメソッドでttlでリースを取得するidを設定し、次にリースを入参呼び出しkeepAliveメソッドとし、2番目の入参は観察者オブジェクトであり、3つのインタフェースが内蔵されている.それぞれonNext:次のリースの継続時間を決定した後にトリガーされ、onError:継続異常時にトリガーされる.onCompleted:リースが期限切れになった後にトリガーされます.
keepAliveメソッドコード:
  public synchronized CloseableClient keepAlive(long leaseId, StreamObserver observer) {
    if (this.closed) {
      throw newClosedLeaseClientException();
    }

    KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
    keepAlive.addObserver(observer);

    if (!this.hasKeepAliveServiceStarted) {
      this.hasKeepAliveServiceStarted = true;
      this.start();
    }

    return new CloseableClient() {
      @Override
      public void close() {
        keepAlive.removeObserver(observer);
      }
    };
  }

LeaseImpl内部ではLeaseIdをkey,KeepAliveオブジェクトをvalueとするmapがメンテナンスされており,KeepAliveのクラスではStreamObserver集合,期限deadLine,次回の継続時間nextKeepAlive,継続leaseIdがメンテナンスされている.keepAliveメソッドを最初に呼び出すとstartがトリガーされ、継続スレッド(sendKeepAliveExecutor()が起動され、期限切れのスレッド(deadLineExecutor()がチェックされます.
  private void sendKeepAliveExecutor() {
    this.keepAliveResponseObserver = Observers.observer(
      response -> processKeepAliveResponse(response),
      error -> processOnError()
    );
    this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
    this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            // send keep alive req to the leases whose next keep alive is before now.
            this.keepAlives.entrySet().stream()
                .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
                .map(Entry::getKey)
                .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
                .forEach(keepAliveRequestObserver::onNext);
        },
        0,
        500,
        TimeUnit.MILLISECONDS
    );
  }

sendKeepAliveExecutorメソッドはkeepAlive機能全体の実装の核心であり、このメソッドはLeaseImplインスタンスで1回だけトリガーされ、500ミリ秒間隔のタイミングタスクスケジューリングが開始される.keepAlivesからnextkeepAlive時間が現在の時間より小さいKeepAliveオブジェクトをフィルタリングするたびに、再契約がトリガーされます.nextkeepAlive初期化値は、KeepAliveインスタンスを作成する現在の時間であり、その後、継続的な応答フローオブザーバインスタンスでは、processKeepAliveResponseメソッドが実行され、ここでKeepAliveオブジェクトのnextkeepAliveが維持される.
private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
    if (this.closed) {
      return;
    }
    final long leaseID = leaseKeepAliveResponse.getID();
    final long ttl = leaseKeepAliveResponse.getTTL();
    final KeepAlive ka = this.keepAlives.get(leaseID);
    if (ka == null) {
      // return if the corresponding keep alive has closed.
      return;
    }
    if (ttl > 0) {
      long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3;
      ka.setNextKeepAlive(nextKeepAlive);
      ka.setDeadLine(System.currentTimeMillis() + ttl * 1000);
      ka.onNext(leaseKeepAliveResponse);
    } else {
      // lease expired; close all keep alive
      this.removeKeepAlive(leaseID);
      ka.onError(
          newEtcdException(
            ErrorCode.NOT_FOUND,
            "etcdserver: requested lease not found"
          )
      );
    }
  }

最初の継続後の応答処理では、nextKeepAliveが現在の時間にttlの1/3時間を加えた後、すなわち、keyの有効期限を6 sに設定すると、keepAliveを使用する場合の継続期間の間隔は、2 sごとに継続期間が実行されることがわかる.ttlがゼロ未満の場合、keyが期限切れになって削除されたことを示すと、onErrorが直接トリガーされ、requested lease not foundの異常オブジェクトが渡されます.
文末のまとめ
一番上のbinlogのホストが頻繁に切り替える問題に戻ります.ttlの時間を5 s小さく設定しすぎたからです.クライアントとetcdサービスが5 s以上接続されていない限り、期間中に様々な理由でkeepAliveが正常に契約を継続していない可能性があり、プライマリ・スタンバイ・スイッチがトリガーされます.この時binlogサービス自体は何の問題もないが、リーダーシップを失ったため、自殺を選ぶ.後でttlを20 sに調整すると、プライマリ・スタンバイ・スイッチングはそれほど敏感ではありません.もう1つのシーンでは、etcdをサービス登録センターとして使用する場合もkeepAliveを使用し、ttlが20 sに設定されていても、継続契約がない可能性があり、登録されたサービスが期限切れになって削除された場合もあります.この場合、私たちのサービスプロセスは健康です.このシーンでは、onError、onCompletedイベントでリースを再取得し、新しいKeepAliveを追加する必要があります.