ActiveMQソース解析(五):activemqのbrokerクラスタについて


総じてbrokerのクラスタは2種類あり、主従クラスタ(master-slave)と多主クラスタ(network of brokers)があり、apache activemqの公式サイトのドキュメントのclusteringセクションのQueue comsumer clusterをクラスタ配置方式としているのを見て、ドキュメントの意味をよく理解しています.このクラスタはcusumerのクラスタであり、実際の生産環境で一般的に使用されているbrokerクラスタではありません.ここでは,本明細書で述べたactivemqのクラスタがbrokerのクラスタであることを宣言する.
前述したbrokerとbrokerの間の通信も述べたが,brokerとbrokerの間のネットワークブリッジはnetworkパケットの下のクラスによって行われる.
Master-Slave方式
       
(1)共有ファイルシステムに基づく,(2)共有JDBCに基づく,(3)コピー可能なLevelDBに基づく(zookeeper依存)の3つの方式があり,ここでは第1の方式の構成のみを説明する.
persistenceAdapterの構成:

    
       
TransportConnectorの構成:

   

Broker-CRuster方式
全部で2つの方法がある:(1)静的発見方式(2)動的発見方式
静的検出
broker 1の構成

     


    
     
broker 2の構成
 
    


      

動的検出
broker 1の構成

    


    

broker 2の構成

    


    

実際の応用では、例えば電子商取引のアーキテクチャでは、複数のmasterがnetwork of brokerを介して接続され、各masterが複数のslaverに接続されているため、メッセージミドルウェアの可用性を大幅に向上させることができるが、ここでは後述しない.
networkconnectorの接続リモートbrokerのコードを添付し、activemqはデフォルトでDiscoveryNetworkConnectorを使用します.
まず、SimpleDiscoveryAgentクラスのstartメソッドを見てみましょう.
public void start() throws Exception {
        taskRunner = new TaskRunnerFactory();
        taskRunner.init();

        running.set(true);
        for (int i = 0; i < services.length; i++) {
            //     networkconnector add service  ,   service   uri
            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
        }
    }

 
次にDiscoveryNetworkConnectorのイベント処理関数onServiceAddを見てみましょう
public void onServiceAdd(DiscoveryEvent event) {
        // Ignore events once we start stopping.
        if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
            return;
        }
        //  url
        String url = event.getServiceName();
        if (url != null) {
            URI uri;
            try {
                uri = new URI(url);
            } catch (URISyntaxException e) {
                LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e);
                return;
            }
            //             (     )
            if (localURI.equals(uri)) {
                LOG.debug("not connecting loopback: {}", uri);
                return;
            }

            if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
                LOG.debug("connectionFilter disallows connection to: {}", uri);
                return;
            }

            // Should we try to connect to that URI?
            if (activeEvents.putIfAbsent(uri, event) != null) {
                LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: {}", uri);
                return;
            }

            URI connectUri = uri;
            try {
                connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
            } catch (URISyntaxException e) {
                LOG.warn("could not apply query parameters: {} to: {}", new Object[]{ parameters, connectUri }, e);
            }

            LOG.info("Establishing network connection from {} to {}", localURI, connectUri);

            Transport remoteTransport;
            Transport localTransport;
            try {
                // Allows the transport to access the broker's ssl configuration.
                SslContext.setCurrentSslContext(getBrokerService().getSslContext());
                try {
                    //    broker
                    remoteTransport = TransportFactory.connect(connectUri);
                } catch (Exception e) {
                    LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
                    LOG.debug("Connection failure exception: ", e);
                    activeEvents.remove(uri);
                    return;
                }
                try {
                    //    transport  
                    localTransport = createLocalTransport();
                } catch (Exception e) {
                    ServiceSupport.dispose(remoteTransport);
                    LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
                    LOG.debug("Connection failure exception: ", e);
                    activeEvents.remove(uri);
                    return;
                }
            } finally {
                SslContext.setCurrentSslContext(null);
            }
            //    ,    broker   broker   
            NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
            try {
                synchronized (bridges) {
                    //           
                    bridges.put(uri, bridge);
                }
                bridge.start();
            } catch (Exception e) {
                ServiceSupport.dispose(localTransport);
                ServiceSupport.dispose(remoteTransport);
                LOG.warn("Could not start network bridge between: {} and: {} due to: {}", new Object[]{ localURI, uri, e.getMessage() });
                LOG.debug("Start failure exception: ", e);
                try {
                    // Will remove bridge and active event.
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e1) {
                    LOG.debug("Discovery agent failure while handling failure event: {}", e1.getMessage(), e1);
                }
            }
        }
    }