About artemis's SharedNothingBackupQuorum


Preface
This article takes a look at artemis's SharedNothingBackupQuorum.
SharedNothingBackupQuorum
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {

   private TransportConfiguration liveTransportConfiguration;

   public enum BACKUP_ACTIVATION {
      FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
   }

   private QuorumManager quorumManager;

   private String targetServerID = "";

   private final NodeManager nodeManager;

   private final StorageManager storageManager;
   private final ScheduledExecutorService scheduledPool;
   private final int quorumSize;

   private final int voteRetries;

   private final long voteRetryWait;

   private final Object voteGuard = new Object();

   private CountDownLatch latch;

   private ClientSessionFactoryInternal sessionFactory;

   private CoreRemotingConnection connection;

   private final NetworkHealthCheck networkHealthCheck;

   private volatile boolean stopped = false;

   private final int quorumVoteWait;

   //......

   @Override
   public void nodeDown(Topology topology, long eventUID, String nodeID) {
      if (targetServerID.equals(nodeID)) {
         decideOnAction(topology);
      }
   }

   @Override
   public void nodeUp(Topology topology) {
      //noop
   }

   /**
    * if the connection to our replicated live goes down then decide on an action
    */
   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
      decideOnAction(sessionFactory.getServerLocator().getTopology());
   }

   @Override
   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
      connectionFailed(me, failedOver);
   }

   @Override
   public void beforeReconnect(ActiveMQException exception) {
      //noop
   }

   public void decideOnAction(Topology topology) {
      //we may get called via multiple paths so need to guard
      synchronized (decisionGuard) {
         if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
            if (networkHealthCheck != null && !networkHealthCheck.check()) {
               signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
            }
            return;
         }
         if (!isLiveDown()) {
            //lost connection but don't know if live is down so restart as backup as we can't replicate any more
            ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
            signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
         } else {
            // live is assumed to be down, backup fails-over
            ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults();
            signal = BACKUP_ACTIVATION.FAIL_OVER;
         }

         /* use NetworkHealthCheck to determine if node is isolated
          * if there are no addresses/urls configured then ignore and rely on quorum vote only
          */
         if (networkHealthCheck != null && !networkHealthCheck.isEmpty()) {
            if (networkHealthCheck.check()) {
               // live is assumed to be down, backup fails-over
               signal = BACKUP_ACTIVATION.FAIL_OVER;
            } else {
               ActiveMQServerLogger.LOGGER.serverIsolatedOnNetwork();
               signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
            }
         }
      }
      latch.countDown();
   }

   private boolean isLiveDown() {
      //lets assume live is not down
      Boolean decision = false;
      int voteAttempts = 0;
      int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;

      synchronized (voteGuard) {
         while (!stopped && voteAttempts++ < voteRetries) {
            //the live is dead so lets vote for quorum
            QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);

            quorumManager.vote(quorumVote);

            try {
               quorumVote.await(quorumVoteWait, TimeUnit.SECONDS);
            } catch (InterruptedException interruption) {
               // No-op. The best the quorum can do now is to return the latest number it has
               ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
            }

            quorumManager.voteComplete(quorumVote);

            decision = quorumVote.getDecision();

            if (decision) {
               return decision;
            }
            try {
               voteGuard.wait(voteRetryWait);
            } catch (InterruptedException e) {
               //nothing to do here
            }
         }
      }

      return decision;
   }

   public synchronized void reset() {
      latch = new CountDownLatch(1);
   }   

   //......
}
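  • Both nodeDown and connectionFailed in SharedNothingBackupQuorum call decideOnAction (nodeDown only when the node that went down is targetServerID). If signal is already BACKUP_ACTIVATION.FAIL_OVER and networkHealthCheck is non-null, the method runs networkHealthCheck.check(); if the check returns false, signal is downgraded to BACKUP_ACTIVATION.FAILURE_REPLICATING. Either way the method then returns, and this early return happens before latch.countDown().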
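  • For any other signal value, the method runs isLiveDown(): if it returns false, signal is set to BACKUP_ACTIVATION.FAILURE_REPLICATING; otherwise it is set to BACKUP_ACTIVATION.FAIL_OVER. Finally, if networkHealthCheck is non-null and not empty, networkHealthCheck.check() runs: true sets signal to BACKUP_ACTIVATION.FAIL_OVER, while false logs serverIsolatedOnNetwork and sets it to BACKUP_ACTIVATION.FAILURE_REPLICATING. A configured health check therefore overrides the vote result. The method ends with latch.countDown(), releasing the latch that reset() created.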
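  • isLiveDown() determines the quorum size (quorumManager.getMaxClusterSize() when quorumSize is -1), creates a QuorumVoteServerConnect, calls quorumManager.vote(quorumVote), waits with quorumVote.await(quorumVoteWait, TimeUnit.SECONDS), and then calls quorumManager.voteComplete(quorumVote). If quorumVote.getDecision() is true it returns immediately; otherwise it calls voteGuard.wait(voteRetryWait) and votes again, for at most voteRetries attempts or until the quorum is stopped.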
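The decision logic described in these bullets can be modeled as a small pure function. The sketch below is a simplified, single-threaded model, not the Artemis code: `Signal`, `DecisionSketch`, and the `BooleanSupplier` parameters are hypothetical stand-ins for BACKUP_ACTIVATION, one QuorumVoteServerConnect round, and NetworkHealthCheck.check(). It omits the `stopped` flag, the locks, and the latch, and it sleeps instead of calling voteGuard.wait().

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for SharedNothingBackupQuorum.BACKUP_ACTIVATION.
enum Signal { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP }

// Simplified model of decideOnAction()/isLiveDown(); not the Artemis API.
//   vote        -> one quorum vote round; true means "the live server is down"
//   healthCheck -> NetworkHealthCheck.check(); null means no health check at all
final class DecisionSketch {

   static boolean isLiveDown(BooleanSupplier vote, int voteRetries, long voteRetryWaitMillis) {
      boolean decision = false;   // assume the live server is not down
      int voteAttempts = 0;
      while (voteAttempts++ < voteRetries) {
         decision = vote.getAsBoolean();
         if (decision) {
            return true;          // one positive vote ends the retries
         }
         try {
            // The original waits on voteGuard; a plain sleep is used here.
            Thread.sleep(voteRetryWaitMillis);
         } catch (InterruptedException e) {
            // Unlike the original (which ignores it), restore the flag and stop retrying.
            Thread.currentThread().interrupt();
            return decision;
         }
      }
      return decision;
   }

   static Signal decide(Signal current, BooleanSupplier vote, int voteRetries, long voteRetryWaitMillis,
                        BooleanSupplier healthCheck, boolean healthCheckEmpty) {
      if (current == Signal.FAIL_OVER) {
         // Already failing over: only a failed health check can downgrade the decision.
         if (healthCheck != null && !healthCheck.getAsBoolean()) {
            return Signal.FAILURE_REPLICATING;
         }
         return current;
      }
      Signal signal = isLiveDown(vote, voteRetries, voteRetryWaitMillis)
            ? Signal.FAIL_OVER
            : Signal.FAILURE_REPLICATING;
      // A configured (non-empty) health check overrides the vote result.
      if (healthCheck != null && !healthCheckEmpty) {
         signal = healthCheck.getAsBoolean() ? Signal.FAIL_OVER : Signal.FAILURE_REPLICATING;
      }
      return signal;
   }
}
```

Passing `healthCheckEmpty = true` models a NetworkHealthCheck with no addresses or URLs configured, in which case the vote alone decides.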
SharedNothingBackupActivation
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
    public final class SharedNothingBackupActivation extends Activation {
    
       //......
    
       public void run() {
          try {
    
             logger.trace("SharedNothingBackupActivation..start");
             synchronized (activeMQServer) {
                activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
             }
    
             //......
    
             synchronized (this) {
                logger.trace("Entered a synchronized");
                if (closed)
                   return;
                backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
                activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
                activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
             }
    
             //......         
    
             SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
             do {
    
                if (closed) {
                   if (logger.isTraceEnabled()) {
                      logger.trace("Activation is closed, so giving up");
                   }
                   return;
                }
    
                if (logger.isTraceEnabled()) {
                   logger.trace("looking up the node through nodeLocator.locateNode()");
                }
                //locate the first live server to try to replicate
                nodeLocator.locateNode();
                Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
                nodeID = nodeLocator.getNodeID();
    
                if (logger.isTraceEnabled()) {
                   logger.trace("nodeID = " + nodeID);
                }
                //in a normal (non failback) scenario if we couldn't find our live server we should fail
                if (!attemptFailBack) {
                   if (logger.isTraceEnabled()) {
                      logger.trace("attemptFailback=false, nodeID=" + nodeID);
                   }
    
                   //this shouldn't happen
                   if (nodeID == null) {
                      logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
                      throw new RuntimeException("Could not establish the connection");
                   }
                   activeMQServer.getNodeManager().setNodeID(nodeID);
                }
    
                if (possibleLive != null) {
                   clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA());
                   if (clusterControl == null) {
                      clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB());
                   }
                } else {
                   clusterControl = null;
                }
                if (clusterControl == null) {
    
                   if (logger.isTraceEnabled()) {
                      logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");
                   }
                   //its ok to retry here since we haven't started replication yet
                   //it may just be the server has gone since discovery
                   Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
                   signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;
                   continue;
                }
    
                activeMQServer.getThreadPool().execute(endpointConnector);
                /**
                 * Wait for a signal from the the quorum manager, at this point if replication has been successful we can
                 * fail over or if there is an error trying to replicate (such as already replicating) we try the
                 * process again on the next live server.  All the action happens inside {@link BackupQuorum}
                 */
                signal = backupQuorum.waitForStatusChange();
    
                if (logger.isTraceEnabled()) {
                   logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");
                }
    
                /**
                 * replicationEndpoint will be holding lots of open files. Make sure they get
                 * closed/sync'ed.
                 */
                ActiveMQServerImpl.stopComponent(replicationEndpoint);
                // time to give up
                if (!activeMQServer.isStarted() || signal == STOP) {
                   if (logger.isTraceEnabled()) {
                      logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal);
                   }
                   return;
                } else if (signal == FAIL_OVER) {
                   // time to fail over
                   if (logger.isTraceEnabled()) {
                      logger.trace("signal == FAIL_OVER, breaking the loop");
                   }
                   break;
                } else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {
                   // something has gone badly run restart from scratch
                   if (logger.isTraceEnabled()) {
                      logger.trace("Starting a new thread to stop the server!");
                   }
    
                   Thread startThread = new Thread(new Runnable() {
                      @Override
                      public void run() {
                         try {
                            if (logger.isTraceEnabled()) {
                               logger.trace("Calling activeMQServer.stop() and start() to restart the server");
                            }
                            activeMQServer.stop();
                            activeMQServer.start();
                         } catch (Exception e) {
                            ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
                         }
                      }
                   });
                   startThread.start();
                   return;
                }
                //ok, this live is no good, let's reset and try again
                //close this session factory, we're done with it
                clusterControl.close();
                backupQuorum.reset();
                if (replicationEndpoint.getChannel() != null) {
                   replicationEndpoint.getChannel().close();
                   replicationEndpoint.setChannel(null);
                }
             }
             while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
    
             //......
    
          } catch (Exception e) {
             if (logger.isTraceEnabled()) {
                logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e);
             }
             if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())
                // do not log these errors if the server is being stopped.
                return;
             ActiveMQServerLogger.LOGGER.initializationError(e);
          }
       }
    
       //......         
    }
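  • The run method of SharedNothingBackupActivation creates a SharedNothingBackupQuorum and registers it, together with a ServerConnectVoteHandler, with the QuorumManager. It then enters a do/while loop: locate a live server and try to connect to it (if that fails, sleep for getRetryIntervalForReplicatedCluster(), set signal to ALREADY_REPLICATING, and retry), start endpointConnector, and block on backupQuorum.waitForStatusChange() for the next signal.
  • On STOP, or if the server is no longer started, run returns. On FAIL_OVER it breaks out of the loop and proceeds to fail over. On FAILURE_REPLICATING it starts a new thread that calls activeMQServer.stop() and then activeMQServer.start(), and returns. For any other signal it closes clusterControl, calls backupQuorum.reset(), closes the replication channel, and repeats the loop while signal is ALREADY_REPLICATING.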
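The interplay between the activation loop and the quorum can be sketched with a `CountDownLatch` handshake. This is a simplified model under stated assumptions, not the Artemis implementation: waitForStatusChange() is elided from the listings above, so its behavior here (block on the latch, treat interruption as STOP) is assumed, and `BackupSignal`, `QuorumSketch`, and `ActivationLoopSketch` are hypothetical names. The "cannot connect, sleep and retry" path and the isStarted() check are left out.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for SharedNothingBackupQuorum.BACKUP_ACTIVATION.
enum BackupSignal { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP }

// Latch handshake: the activation thread blocks in waitForStatusChange()
// until another thread stores a signal and counts the latch down.
final class QuorumSketch {
   private volatile CountDownLatch latch = new CountDownLatch(1);
   private volatile BackupSignal signal;

   // Like SharedNothingBackupQuorum.reset(): arm a fresh latch for the next attempt.
   synchronized void reset() {
      latch = new CountDownLatch(1);
   }

   // Like the end of decideOnAction(): publish the signal, then release the waiter.
   void decide(BackupSignal decided) {
      signal = decided;
      latch.countDown();
   }

   // Assumed behavior: block until a decision arrives; treat interruption as STOP.
   BackupSignal waitForStatusChange() {
      try {
         latch.await();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return BackupSignal.STOP;
      }
      return signal;
   }
}

// Simplified model of the do/while loop in SharedNothingBackupActivation.run().
final class ActivationLoopSketch {

   // Each element of 'decisions' is the signal delivered for one attempt;
   // the list must contain enough elements to end the loop.
   static String run(List<BackupSignal> decisions) {
      QuorumSketch quorum = new QuorumSketch();
      Iterator<BackupSignal> next = decisions.iterator();
      BackupSignal signal;
      do {
         BackupSignal decided = next.next();
         // Stand-in for endpointConnector: the decision arrives on another thread.
         new Thread(() -> quorum.decide(decided)).start();
         signal = quorum.waitForStatusChange();
         if (signal == BackupSignal.STOP) {
            return "stopped";
         } else if (signal == BackupSignal.FAIL_OVER) {
            break;
         } else if (signal == BackupSignal.FAILURE_REPLICATING) {
            return "restarting"; // the original restarts the server on a new thread
         }
         // Any other signal: re-arm the latch and try again.
         quorum.reset();
      } while (signal == BackupSignal.ALREADY_REPLICATING);
      return "failing over";
   }
}
```

For example, `ActivationLoopSketch.run(List.of(BackupSignal.ALREADY_REPLICATING, BackupSignal.FAIL_OVER))` loops once, resets the latch, and then returns "failing over". Re-creating the latch in reset() matters because a `CountDownLatch` cannot be re-armed once it reaches zero.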
Summary
The nodeDown and connectionFailed methods of SharedNothingBackupQuorum both call decideOnAction. If signal is already BACKUP_ACTIVATION.FAIL_OVER, the method only re-runs networkHealthCheck.check() (when networkHealthCheck is non-null), switches signal to BACKUP_ACTIVATION.FAILURE_REPLICATING if the check returns false, and returns. Otherwise it runs isLiveDown() and sets signal to BACKUP_ACTIVATION.FAIL_OVER if the live server is judged down, or to BACKUP_ACTIVATION.FAILURE_REPLICATING if not. Finally, if networkHealthCheck is non-null and not empty, its check() result takes precedence: true means BACKUP_ACTIVATION.FAIL_OVER, false means BACKUP_ACTIVATION.FAILURE_REPLICATING.
doc
  • SharedNothingBackupQuorum