artemisのHAManagerについて

16414 ワード

シーケンス
本文は主にartemisのHAManagerを研究する
HAManager
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAManager.java
public interface HAManager extends ActiveMQComponent {

   /**
    * return the current backup servers
    *
    * @return the backups
    */
   Map getBackupServers();
}
  • HAManagerはActiveMQComponentインタフェースを継承し、getBackupServerメソッド
  • を定義します.
    StandaloneHAManager
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/StandaloneHAManager.java
    public class StandaloneHAManager implements HAManager {
    
       Map servers = new HashMap<>();
    
       boolean isStarted = false;
    
       @Override
       public Map getBackupServers() {
          return servers;
       }
    
       @Override
       public void start() throws Exception {
          if (isStarted)
             return;
          isStarted = true;
       }
    
       @Override
       public void stop() throws Exception {
          if (!isStarted)
             return;
          isStarted = false;
       }
    
       @Override
       public boolean isStarted() {
          return isStarted;
       }
    }
  • StandaloneHAManagerはHAManagerインタフェースを実現し、getBackupServerメソッドは空のmap
  • を返します.
    ColocatedHAManager
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java
    public class ColocatedHAManager implements HAManager {
    
       private final ColocatedPolicy haPolicy;
    
       private final ActiveMQServer server;
    
       private final Map backupServers = new HashMap<>();
    
       private boolean started;
    
       public ColocatedHAManager(ColocatedPolicy haPolicy, ActiveMQServer activeMQServer) {
          this.haPolicy = haPolicy;
          server = activeMQServer;
       }
    
       /**
        * starts the HA manager.
        */
       @Override
       public void start() {
          if (started)
             return;
    
          server.getActivation().haStarted();
    
          started = true;
       }
    
       /**
        * stop any backups
        */
       @Override
       public void stop() {
          for (ActiveMQServer activeMQServer : backupServers.values()) {
             try {
                activeMQServer.stop();
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
             }
          }
          backupServers.clear();
          started = false;
       }
    
       @Override
       public boolean isStarted() {
          return started;
       }
    
       public synchronized boolean activateBackup(int backupSize,
                                                  String journalDirectory,
                                                  String bindingsDirectory,
                                                  String largeMessagesDirectory,
                                                  String pagingDirectory,
                                                  SimpleString nodeID) throws Exception {
          if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) {
             return false;
          }
          if (haPolicy.getBackupPolicy().isSharedStore()) {
             return activateSharedStoreBackup(journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory);
          } else {
             return activateReplicatedBackup(nodeID);
          }
       }
    
       /**
        * return the current backup servers
        *
        * @return the backups
        */
       @Override
       public Map getBackupServers() {
          return backupServers;
       }
    
       /**
        * send a request to a live server to start a backup for us
        *
        * @param connectorPair the connector for the node to request a backup from
        * @param backupSize    the current size of the requested nodes backups
        * @param replicated
        * @return true if the request wa successful.
        * @throws Exception
        */
       public boolean requestBackup(Pair connectorPair,
                                    int backupSize,
                                    boolean replicated) throws Exception {
          ClusterController clusterController = server.getClusterManager().getClusterController();
          try
             (
                ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA());
             ) {
             clusterControl.authorize();
             if (replicated) {
                return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID());
             } else {
                return clusterControl.requestSharedStoreBackup(backupSize, server.getConfiguration().getJournalLocation().getAbsolutePath(), server.getConfiguration().getBindingsLocation().getAbsolutePath(), server.getConfiguration().getLargeMessagesLocation().getAbsolutePath(), server.getConfiguration().getPagingLocation().getAbsolutePath());
    
             }
          }
       }
    
       private synchronized boolean activateSharedStoreBackup(String journalDirectory,
                                                              String bindingsDirectory,
                                                              String largeMessagesDirectory,
                                                              String pagingDirectory) throws Exception {
          Configuration configuration = server.getConfiguration().copy();
          ActiveMQServer backup = server.createBackupServer(configuration);
          try {
             int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
             String name = "colocated_backup_" + backupServers.size() + 1;
             //make sure we don't restart as we are colocated
             haPolicy.getBackupPolicy().setRestartBackup(false);
             //set the backup policy
             backup.setHAPolicy(haPolicy.getBackupPolicy());
             updateSharedStoreConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory, haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
    
             backupServers.put(configuration.getName(), backup);
             backup.start();
          } catch (Exception e) {
             backup.stop();
             ActiveMQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e);
             return false;
          }
          ActiveMQServerLogger.LOGGER.activatingSharedStoreSlave();
          return true;
       }
    
       /**
        * activate a backup server replicating from a specified node.
        *
        * decline and the requesting server can cast a re vote
        *
        * @param nodeID the id of the node to replicate from
        * @return true if the server was created and started
        * @throws Exception
        */
       private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception {
          final TopologyMember member;
          try {
             member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
             if (!Objects.equals(member.getBackupGroupName(), haPolicy.getBackupPolicy().getBackupGroupName())) {
                return false;
             }
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
             return false;
          }
          Configuration configuration = server.getConfiguration().copy();
          ActiveMQServer backup = server.createBackupServer(configuration);
          try {
             int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
             String name = "colocated_backup_" + backupServers.size() + 1;
             //make sure we don't restart as we are colocated
             haPolicy.getBackupPolicy().setRestartBackup(false);
             //set the backup policy
             backup.setHAPolicy(haPolicy.getBackupPolicy());
             updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
             backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
             backupServers.put(configuration.getName(), backup);
             backup.start();
          } catch (Exception e) {
             backup.stop();
             ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
             return false;
          }
          ActiveMQServerLogger.LOGGER.activatingReplica(nodeID);
          return true;
       }
    
       //......
    }
  • ColocatedHAManagerはHAManagerインタフェースを実現し、getBackupServerメソッドはbackupServerを返します.a c t i v a t e SharedStoreBackupメソッドおよびa c t i v a t e Replicated Backupメソッドは、server.createBackupServer(configuration)によってbackupを作成し、backupServerに追加します.activateBackupメソッドは、haPolicy.getBackupPolicy()に従ってactivateSharedStoreBackupまたはactivateReplicatedBackupメソッド
  • を実行することを選択する.
    ColocatedPolicy
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
    public class ColocatedPolicy implements HAPolicy {
    
       /*live stuff*/
       private boolean requestBackup = ActiveMQDefaultConfiguration.isDefaultHapolicyRequestBackup();
    
       private int backupRequestRetries = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();
    
       private long backupRequestRetryInterval = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();
    
       private int maxBackups = ActiveMQDefaultConfiguration.getDefaultHapolicyMaxBackups();
    
       private int backupPortOffset = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();
    
       /*backup stuff*/
       private List excludedConnectors = new ArrayList<>();
    
       private BackupPolicy backupPolicy;
    
       private HAPolicy livePolicy;
    
       //......
    }
  • ColocatedPolicyは、backupPolicy属性
  • を定義するHAPolicyインターフェースを実現する
    BackupPolicy
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
    public abstract class BackupPolicy implements HAPolicy {
    
       protected ScaleDownPolicy scaleDownPolicy;
       protected boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
    
       public ScaleDownPolicy getScaleDownPolicy() {
          return scaleDownPolicy;
       }
    
       public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) {
          this.scaleDownPolicy = scaleDownPolicy;
       }
    
       @Override
       public boolean isBackup() {
          return true;
       }
    
       @Override
       public String getScaleDownClustername() {
          return null;
       }
    
       @Override
       public String getScaleDownGroupName() {
          return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null;
       }
    
       public boolean isRestartBackup() {
          return restartBackup;
       }
    
       public void setRestartBackup(boolean restartBackup) {
          this.restartBackup = restartBackup;
       }
    }
  • BackupPolicy宣言は、isBackupメソッドがtrueを返すHAPolicyインタフェースを実現した.SharedStoreSlavePolicyとReplicaPolicyの2つの実装クラスがあります.
    SharedStoreSlavePolicy
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java
    public class SharedStoreSlavePolicy extends BackupPolicy {
    
       private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();
    
       private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
    
       private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();
    
       //this is how we act once we have failed over
       private SharedStoreMasterPolicy sharedStoreMasterPolicy;
    
       public SharedStoreSlavePolicy() {
       }
    
       public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
                                     boolean restartBackup,
                                     boolean allowAutoFailBack,
                                     ScaleDownPolicy scaleDownPolicy) {
          this.failoverOnServerShutdown = failoverOnServerShutdown;
          this.restartBackup = restartBackup;
          this.allowAutoFailBack = allowAutoFailBack;
          this.scaleDownPolicy = scaleDownPolicy;
       }
    
       @Deprecated
       public long getFailbackDelay() {
          return -1;
       }
    
       @Deprecated
       public void setFailbackDelay(long failbackDelay) {
       }
    
       public boolean isFailoverOnServerShutdown() {
          return failoverOnServerShutdown;
       }
    
       public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) {
          this.failoverOnServerShutdown = failoverOnServerShutdown;
       }
    
       public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
          if (sharedStoreMasterPolicy == null) {
             sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
          }
          return sharedStoreMasterPolicy;
       }
    
       public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy) {
          this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
       }
    
       @Override
       public boolean isSharedStore() {
          return true;
       }
    
       @Override
       public boolean canScaleDown() {
          return scaleDownPolicy != null;
       }
    
       public boolean isAllowAutoFailBack() {
          return allowAutoFailBack;
       }
    
       public void setAllowAutoFailBack(boolean allowAutoFailBack) {
          this.allowAutoFailBack = allowAutoFailBack;
       }
    
       public void setIsWaitForActivation(boolean isWaitForActivation) {
          this.isWaitForActivation = isWaitForActivation;
       }
    
       @Override
       public Activation createActivation(ActiveMQServerImpl server,
                                          boolean wasLive,
                                          Map activationParams,
                                          ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
          return new SharedStoreBackupActivation(server, this);
       }
    
       @Override
       public String getBackupGroupName() {
          return null;
       }
    }
  • SharedStoreSlavePolicyはBackupPolicyを継承し、isSharedStoreメソッドはtrue
  • を返します.
    ReplicaPolicy
    activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
    public class ReplicaPolicy extends BackupPolicy {
    
       private String clusterName;
    
       private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();
    
       private String groupName = null;
    
       private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();
    
       //used if we create a replicated policy for when we become live.
       private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
    
       private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
    
       /*
       * what quorum size to use for voting
       * */
       private int quorumSize;
    
       /*
       * whether or not this live broker should vote to remain live
       * */
       private boolean voteOnReplicationFailure;
    
       private ReplicatedPolicy replicatedPolicy;
    
       private final NetworkHealthCheck networkHealthCheck;
    
       private int voteRetries;
    
       private long voteRetryWait;
    
       private final int quorumVoteWait;
    
       private long retryReplicationWait;
    
       //......
    
       @Override
       public boolean isRestartBackup() {
          return restartBackup;
       }
    
       @Override
       public void setRestartBackup(boolean restartBackup) {
          this.restartBackup = restartBackup;
       }
    
       @Override
       public boolean isSharedStore() {
          return false;
       }
    
       //......   
    }
  • ReplicaPolicyはBackupPolicyを継承し、isSharedStoreメソッドはfalse
  • を返します.
    小結
    HAManagerはActiveMQComponentインタフェースを継承し、getBackupServerメソッドを定義します.StandaloneHAManagerはHAManagerインタフェースを実現し、getBackupServerメソッドは空のmapを返します.ColocatedHAManagerはHAManagerインタフェースを実現し、getBackupServerメソッドがbackupServerに戻る.a c t i v a t e SharedStoreBackupメソッドおよびa c t i v a t e Replicated Backupメソッドは、server.createBackupServer(configuration)によってbackupを作成し、backupServerに追加します.activateBackupメソッドではhaPolicy.getBackupPolicy()に従ってactivateSharedStoreBackupまたはactivateReplicatedBackupメソッドを選択します
    doc
  • HAManager