On MasterElection in nacos RaftCore

Preface
This article takes a look at MasterElection in nacos RaftCore.
RaftCore
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
@Component
public class RaftCore {
    
    //......

    @PostConstruct
    public void init() throws Exception {

        Loggers.RAFT.info("initializing Raft sub-system");

        // start the notifier so it can process the tasks queued while loading
        executor.submit(notifier);

        long start = System.currentTimeMillis();

        // load persisted datums from disk, then restore the term from the stored meta (0 if absent)
        raftStore.loadDatums(notifier, datums);

        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

        // poll once per second until the notifier's task queue is empty
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
            Thread.sleep(1000L);
        }

        initialized = true;

        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

        // only now start the periodic leader-election and heartbeat timers
        GlobalExecutor.registerMasterElection(new MasterElection());
        GlobalExecutor.registerHeartbeat(new HeartBeat());

        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }

    //......
}
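  • Once the data on disk has been loaded and the notifier's task queue has drained, RaftCore's init method registers MasterElection via GlobalExecutor.registerMasterElection(new MasterElection()) (and HeartBeat via GlobalExecutor.registerHeartbeat).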
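The ordering inside init() matters: the election timer is registered only after the load tasks have been drained. Below is a minimal sketch of that drain-then-start pattern, assuming a plain BlockingQueue stands in for the notifier's task queue. The class InitOrderSketch and its fields are hypothetical, not nacos code, and it polls every 10 ms instead of every second so it finishes quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of init()'s ordering: drain pending load tasks, then start the timer.
class InitOrderSketch {
    final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    volatile boolean initialized = false;
    volatile boolean timerStarted = false;

    void init(ScheduledExecutorService timer, Runnable masterElection) {
        // Background consumer standing in for the notifier.
        Thread notifier = new Thread(() -> {
            try {
                while (true) {
                    tasks.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        notifier.setDaemon(true);
        notifier.start();

        // Stand-in for raftStore.loadDatums(...), which queues work for the notifier.
        for (int i = 0; i < 3; i++) {
            tasks.add(() -> { });
        }

        // Poll until the queue is empty, like the while (true) loop in init().
        // Like the original, this checks emptiness, not that the last task has finished.
        while (!tasks.isEmpty()) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // give up without marking the component initialized
            }
        }
        initialized = true;

        // Only now register the periodic election check (500 ms, as TICK_PERIOD_MS).
        timer.scheduleAtFixedRate(masterElection, 0, 500L, TimeUnit.MILLISECONDS);
        timerStarted = true;
    }
}
```

Starting the timer last means no election round can run against a half-loaded datum cache.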
GlobalExecutor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java
public class GlobalExecutor {

    //......

    public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);

    public static void registerMasterElection(Runnable runnable) {
        executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    //......
}

  • The registerMasterElection method schedules the runnable at a fixed rate with no initial delay, once every TICK_PERIOD_MS (500) milliseconds.
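registerMasterElection relies on ScheduledExecutorService.scheduleAtFixedRate. Per the JDK documentation, if any execution of a fixed-rate task throws, subsequent executions are suppressed, which is one reason MasterElection.run() wraps its whole body in try/catch. The hypothetical FixedRateSketch below demonstrates this with a deliberately failing task and a short period so it finishes quickly; it is an illustration, not nacos code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: why a fixed-rate task like MasterElection.run() catches its own exceptions.
class FixedRateSketch {
    // Same value as GlobalExecutor.TICK_PERIOD_MS in the excerpt.
    static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);

    // Schedules a task that throws on every run, optionally wrapped in try/catch,
    // and returns how many times it actually ran during the observation window.
    static int countRuns(boolean guarded, long periodMs, long windowMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task = guarded ? () -> {
            try {
                failing.run();
            } catch (Exception e) {
                // swallow and keep going, as MasterElection.run() logs and continues
            }
        } : failing;
        // Same call shape as registerMasterElection: no initial delay, fixed rate.
        executor.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(windowMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }
}
```

With guarded = false the task runs exactly once and is never rescheduled; with guarded = true it keeps ticking. The JDK also guarantees that a fixed-rate task never runs concurrently with itself: a run that overruns its period only delays the next one.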
MasterElection
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java
    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {

                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                // count the election timeout down by one tick
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

                if (local.leaderDueMs > 0) {
                    return;
                }

                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();

                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }

        }

        public void sendVote() {

            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
                JSON.toJSONString(getLeader()), local.term);

            peers.reset();

            // start a new term as a candidate that votes for itself
            local.term.incrementAndGet();
            local.voteFor = local.ip;
            local.state = RaftPeer.State.CANDIDATE;

            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JSON.toJSONString(local));
            // send the vote request to every other peer asynchronously
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server, API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                                return 1;
                            }

                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);

                            Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));

                            // re-evaluate the leader using this peer's reply
                            peers.decideLeader(peer);

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
    }

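  • MasterElection implements the Runnable interface. Its run method returns immediately unless peers is ready; otherwise it subtracts TICK_PERIOD_MS from local.leaderDueMs, and once the result is less than or equal to 0 an election starts. It first calls resetLeaderDue and resetHeartbeatDue, then executes sendVote. The sendVote method first resets peers, increments the local peer's term, sets voteFor to itself, and updates state to RaftPeer.State.CANDIDATE; finally it iterates over peers.allServersWithoutMySelf() and asynchronously POSTs its own vote information to each of the other peers. If a peer responds with HTTP 200, peers.decideLeader(peer) is executed and the handler returns 0; otherwise the error is logged and the handler returns 1.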
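The per-tick logic of run() and the local half of sendVote() can be modeled without any networking. This is a simplified sketch, not nacos's RaftPeer: the class ElectionSketch is hypothetical, LEADER_TIMEOUT_MS and the 0 to 5000 ms jitter in resetLeaderDue() are assumed values, and hasMajority() shows the usual Raft majority rule (more than half of the cluster) rather than the actual body of peers.decideLeader, which the excerpt does not show.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical model of one peer's election timer (not nacos's RaftPeer).
class ElectionSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static final long TICK_PERIOD_MS = 500L;       // same as GlobalExecutor.TICK_PERIOD_MS
    static final long LEADER_TIMEOUT_MS = 15000L;  // assumed base election timeout
    static final long JITTER_MS = 5000L;           // assumed upper bound of the random jitter

    final String ip;
    final AtomicLong term = new AtomicLong(0L);
    long leaderDueMs;
    String voteFor;
    State state = State.FOLLOWER;

    ElectionSketch(String ip) {
        this.ip = ip;
        resetLeaderDue();
    }

    // Base timeout plus random jitter in [0, JITTER_MS).
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(0L, JITTER_MS);
    }

    // One invocation of run(): count down by one tick and start a vote once the timer expires.
    // Returns true if a new vote round was started.
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue();
        startVote();
        return true;
    }

    // Local state changes made by sendVote() before any request is sent.
    void startVote() {
        term.incrementAndGet();
        voteFor = ip;
        state = State.CANDIDATE;
    }

    // Raft majority rule: a candidate needs votes from more than half of the cluster.
    static boolean hasMajority(List<String> votesFor, String candidate, int clusterSize) {
        long approvals = votesFor.stream().filter(candidate::equals).count();
        return approvals >= clusterSize / 2 + 1;
    }
}
```

The random jitter is the usual Raft technique for making it unlikely that all peers time out in the same tick and split the vote.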
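Summary
RaftCore's init method registers MasterElection via GlobalExecutor.registerMasterElection(new MasterElection()). The registerMasterElection method schedules MasterElection every TICK_PERIOD_MS milliseconds. MasterElection implements Runnable; once peers is ready, its run method subtracts TICK_PERIOD_MS from local.leaderDueMs on each tick, and when the value drops to 0 or below an election starts: it first executes resetLeaderDue and resetHeartbeatDue, then the sendVote method. sendVote increments the local term, votes for itself as a CANDIDATE, and asynchronously sends its vote to every other peer, passing each successful reply to peers.decideLeader.
doc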
  • RaftCore