Elasticsearchクライアントソース分析
16728 ワード
背景
クライアント起動の大まかな流れを簡単に紹介します.ここでクライアントとESクラスタを接続する方法はTCP形式で接続されています.今回の分析はElasticsearch-2.3.4のバージョンに基づいています.
初期化
TCP形式でESクラスタに接続するには、いくつかのパラメータを指定する必要があります. cluster.name:ESクラスタの名前 接続ノードIPおよびポート(複数可) を初期化する. client.transport.sniff:クライアントがクラスタの状態を嗅ぎ、クラスタ内のすべてのノードアドレスをクライアントに追加できるかどうかは、手動ですべてのノードアドレス を指定する必要はありません.
初期化コードは次のとおりです.
ESがbuilderモードでクライアントを初期化していることがわかります.build()メソッドに簡単について、ESがTransportClientをどのように構築しているかを見てみましょう.
ノードの追加
TransportClient.を通って詳細については、次の点を参照してください.
さらに、ノードとの接続を確立する際に、「軽い」と「重い」の接続方法を区別します.軽接続方式:軽接続方式の接続は、クライアントが初期化するときにESクラスタの最初のいくつかのノードを接続するだけで、クラスタ内の他のノード を嗅ぐために使用されることが多い.再接続の方式:タイプを区別する必要があるのは、主にビッグデータ量の要求に対して時間がかかることが多く、一般的な要求に影響を与える可能性があるため、一定の資源隔離 を行う必要がある.
ふかへいこう
「再接続」のノードリストから、ポーリング(Round Robbin)によって要求が送信されます.コードは次のとおりです.
まとめ
ここまで、ESクライアントの起動プロセスを一度整理しました.
クライアント起動の大まかな流れを簡単に紹介します.ここでクライアントとESクラスタを接続する方法はTCP形式で接続されています.今回の分析はElasticsearch-2.3.4のバージョンに基づいています.
初期化
TCP形式でESクラスタに接続するには、いくつかのパラメータを指定する必要があります.
初期化コードは次のとおりです.
Settings settings = Settings.settingsBuilder().put("cluster.name", ES_CLUSTER_NAME).put("client.transport.sniff", true).build();
client = TransportClient.builder().settings(settings).build();
for (String esAddressPerNode : ES_ADDRESS.split("\\,")) {
try {
//
client = ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esAddressPerNode), ES_PORT));
} catch (UnknownHostException e) {
log.warn("client.addTransportAddress error", e);
}
}
log.info("connectedNodes:" + ((TransportClient)client).connectedNodes());
ESがbuilderモードでクライアントを初期化していることがわかります.build()メソッドに簡単について、ESがTransportClientをどのように構築しているかを見てみましょう.
public TransportClient build() {
Settings settings = InternalSettingsPreparer.prepareSettings(this.settings);
settings = settingsBuilder()
.put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval
.put(settings)
.put("network.server", false) // Netty
.put("node.client", true) // ,ES :master、data、ingest(5.0 , index )
.put(CLIENT_TYPE_SETTING, CLIENT_TYPE) // =transport
.build();
PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses);
this.settings = pluginsService.updatedSettings();
Version version = Version.CURRENT;
final ThreadPool threadPool = new ThreadPool(settings); // , ES ,
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
boolean success = false;
try {
//
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
modules.add(new PluginsModule(pluginsService));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule(namedWriteableRegistry));
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportModule(this.settings, namedWriteableRegistry));
modules.add(new SearchModule() {
@Override
protected void configure() {
// noop
}
});
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));
pluginsService.processModules(modules);
Injector injector = modules.createInjector();
// Guice TransportService
final TransportService transportService = injector.getInstance(TransportService.class);
transportService.start(); // transportService, , TransportService.doStart()
transportService.acceptIncomingRequests(); // CountDownLatch
TransportClient transportClient = new TransportClient(injector); // Client
success = true;
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}
TransportService.class {
protected void doStart() {
...
transport.start(); // transport=NettyTransport, NettyTransport
...
}
}
NettyTransport.class {
protected void doStart() {
boolean success = false;
try {
clientBootstrap = createClientBootstrap(); // Netty
if (settings.getAsBoolean("network.server", true)) { // Netty , network.server=false
...
...
}
success = true;
} finally {
if (success == false) {
doStop();
}
}
}
private ClientBootstrap createClientBootstrap() {
if (blockingClient) {
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
} else { // NIO
int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
bossCount,
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
... //Netty
return clientBootstrap; //
}
}
}
// TransportClient
TransportClient.class {
private TransportClient(Injector injector) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
this.injector = injector;
nodesService = injector.getInstance(TransportClientNodesService.class);
proxy = injector.getInstance(TransportProxyClient.class);
}
}
ノードの追加
TransportClient.を通って詳細については、次の点を参照してください.
TransportClient.class {
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
public TransportClient addTransportAddress(TransportAddress transportAddress) {
nodesService.addTransportAddresses(transportAddress);
return this;
}
}
// TransportClientNodesService
TransportClientNodesService.class {
public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
synchronized (mutex) {
List filtered = new ArrayList<>(transportAddresses.length);
for (TransportAddress transportAddress : transportAddresses) {
boolean found = false;
for (DiscoveryNode otherNode : listedNodes) {
if (otherNode.address().equals(transportAddress)) {
found = true;
logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
break;
}
}
if (!found) {
filtered.add(transportAddress);
}
}
if (filtered.isEmpty()) {
return this;
}
List builder = new ArrayList<>();
builder.addAll(listedNodes());
for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
logger.debug("adding address [{}]", node);
builder.add(node);
}
listedNodes = Collections.unmodifiableList(builder);
// ,
nodesSampler.sample();
}
return this;
}
}
// nodesSampler.sample() , SniffNodesSampler.doSample()
class SniffNodesSampler extends NodeSampler {
@Override
protected void doSample() {
//listedNodes
//nodes
Set nodesToPing = Sets.newHashSet();
for (DiscoveryNode node : listedNodes) {
nodesToPing.add(node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.add(node);
}
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap clusterStateResponses = ConcurrentCollections.newConcurrentMap();
for (final DiscoveryNode listedNode : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
try {
if (!transportService.nodeConnected(listedNode)) { //
try {
if (nodes.contains(listedNode)) {
logger.trace("connecting to cluster node [{}]", listedNode);
transportService.connectToNode(listedNode); // ,
} else {
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode); // , ,
}
} catch (Exception e) {
logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
latch.countDown();
return;
}
}
// , :ES Master , Master
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new BaseTransportResponseHandler() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(listedNode, response);
latch.countDown();
}
@Override
public void handleException(TransportException e) {
logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
latch.countDown();
}
});
} catch (Throwable e) {
logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
return;
}
HashSet newNodes = new HashSet<>(); //
HashSet newFilteredNodes = new HashSet<>(); // ES
for (Map.Entry entry : clusterStateResponses.entrySet()) {
if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", entry.getValue().getState().nodes().localNode(), clusterName);
newFilteredNodes.add(entry.getKey());
continue;
}
for (ObjectCursor cursor : entry.getValue().getState().nodes().dataNodes().values()) {
newNodes.add(cursor.value);
}
}
nodes = validateNewNodes(newNodes); // , ,
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
//
}
//TransportClientNodesService , , 5s
class ScheduledNodeSampler implements Runnable {
@Override
public void run() {
try {
nodesSampler.sample(); // SniffNodesSampler.doSample()
} catch (Exception e) {
logger.warn("failed to sample", e);
}
}
}
}
さらに、ノードとの接続を確立する際に、「軽い」と「重い」の接続方法を区別します.
protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connect = clientBootstrap.connect(address);
connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connect.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
}
Channel[] channels = new Channel[1];
channels[0] = connect.getChannel();
channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels, channels, channels);
}
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
//
ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; //size=2
ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; //size=3
ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; //size=6
ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; //size=1
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; //size=1
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectState.length; i++) {
connectState[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}
...
}
ふかへいこう
「再接続」のノードリストから、ポーリング(Round Robbin)によって要求が送信されます.コードは次のとおりです.
private final AtomicInteger randomNodeGenerator = new AtomicInteger();
public void execute(NodeListenerCallback callback, ActionListener listener) {
List nodes = this.nodes;
int index = getNodeNumber();
DiscoveryNode node = nodes.get((index) % nodes.size());
...
}
private int getNodeNumber() {
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
return index;
}
まとめ
ここまで、ESクライアントの起動プロセスを一度整理しました.