ActiveMQソース解析(五):activemqのbrokerクラスタについて
7197 ワード
総じてbrokerのクラスタは2種類あり、主従クラスタ(master-slave)と多主クラスタ(network of brokers)があり、apache activemqの公式サイトのドキュメントのclusteringセクションのQueue comsumer clusterをクラスタ配置方式としているのを見て、ドキュメントの意味をよく理解しています.このクラスタはcusumerのクラスタであり、実際の生産環境で一般的に使用されているbrokerクラスタではありません.ここでは,本明細書で述べたactivemqのクラスタがbrokerのクラスタであることを宣言する.
前述したbrokerとbrokerの間の通信も述べたが,brokerとbrokerの間のネットワークブリッジはnetworkパケットの下のクラスによって行われる.
Master-Slave方式
(1)共有ファイルシステムに基づく,(2)共有JDBCに基づく,(3)コピー可能なLevelDBに基づく(zookeeper依存)の3つの方式があり,ここでは第1の方式の構成のみを説明する.
persistenceAdapterの構成:
TransportConnectorの構成:
Broker-CRuster方式
全部で2つの方法がある:(1)静的発見方式(2)動的発見方式
静的検出
broker 1の構成
broker 2の構成
動的検出
broker 1の構成
broker 2の構成
実際の応用では、例えば電子商取引のアーキテクチャでは、複数のmasterがnetwork of brokerを介して接続され、各masterが複数のslaverに接続されているため、メッセージミドルウェアの可用性を大幅に向上させることができるが、ここでは後述しない.
networkconnectorの接続リモートbrokerのコードを添付し、activemqはデフォルトでDiscoveryNetworkConnectorを使用します.
まず、SimpleDiscoveryAgentクラスのstartメソッドを見てみましょう.
次にDiscoveryNetworkConnectorのイベント処理関数onServiceAddを見てみましょう
前述したbrokerとbrokerの間の通信も述べたが,brokerとbrokerの間のネットワークブリッジはnetworkパケットの下のクラスによって行われる.
Master-Slave方式
(1)共有ファイルシステムに基づく,(2)共有JDBCに基づく,(3)コピー可能なLevelDBに基づく(zookeeper依存)の3つの方式があり,ここでは第1の方式の構成のみを説明する.
persistenceAdapterの構成:
TransportConnectorの構成:
Broker-CRuster方式
全部で2つの方法がある:(1)静的発見方式(2)動的発見方式
静的検出
broker 1の構成
broker 2の構成
動的検出
broker 1の構成
broker 2の構成
実際の応用では、例えば電子商取引のアーキテクチャでは、複数のmasterがnetwork of brokerを介して接続され、各masterが複数のslaverに接続されているため、メッセージミドルウェアの可用性を大幅に向上させることができるが、ここでは後述しない.
networkconnectorの接続リモートbrokerのコードを添付し、activemqはデフォルトでDiscoveryNetworkConnectorを使用します.
まず、SimpleDiscoveryAgentクラスのstartメソッドを見てみましょう.
public void start() throws Exception {
taskRunner = new TaskRunnerFactory();
taskRunner.init();
running.set(true);
for (int i = 0; i < services.length; i++) {
// networkconnector add service , service uri
listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
}
}
次にDiscoveryNetworkConnectorのイベント処理関数onServiceAddを見てみましょう
public void onServiceAdd(DiscoveryEvent event) {
// Ignore events once we start stopping.
if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
return;
}
// url
String url = event.getServiceName();
if (url != null) {
URI uri;
try {
uri = new URI(url);
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: {} due to bad URI syntax: ", url, e);
return;
}
// ( )
if (localURI.equals(uri)) {
LOG.debug("not connecting loopback: {}", uri);
return;
}
if (connectionFilter != null && !connectionFilter.connectTo(uri)) {
LOG.debug("connectionFilter disallows connection to: {}", uri);
return;
}
// Should we try to connect to that URI?
if (activeEvents.putIfAbsent(uri, event) != null) {
LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: {}", uri);
return;
}
URI connectUri = uri;
try {
connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: {} to: {}", new Object[]{ parameters, connectUri }, e);
}
LOG.info("Establishing network connection from {} to {}", localURI, connectUri);
Transport remoteTransport;
Transport localTransport;
try {
// Allows the transport to access the broker's ssl configuration.
SslContext.setCurrentSslContext(getBrokerService().getSslContext());
try {
// broker
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
LOG.debug("Connection failure exception: ", e);
activeEvents.remove(uri);
return;
}
try {
// transport
localTransport = createLocalTransport();
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
LOG.debug("Connection failure exception: ", e);
activeEvents.remove(uri);
return;
}
} finally {
SslContext.setCurrentSslContext(null);
}
// , broker broker
NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
try {
synchronized (bridges) {
//
bridges.put(uri, bridge);
}
bridge.start();
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not start network bridge between: {} and: {} due to: {}", new Object[]{ localURI, uri, e.getMessage() });
LOG.debug("Start failure exception: ", e);
try {
// Will remove bridge and active event.
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
LOG.debug("Discovery agent failure while handling failure event: {}", e1.getMessage(), e1);
}
}
}
}