Elasticsearchクライアントソース分析

16728 ワード

背景
クライアント起動の大まかな流れを簡単に紹介します.ここでクライアントとESクラスタを接続する方法はTCP形式で接続されています.今回の分析はElasticsearch-2.3.4のバージョンに基づいています.
初期化
TCP形式でESクラスタに接続するには、いくつかのパラメータを指定する必要があります.
  • cluster.name:ESクラスタの名前
  • 接続ノードIPおよびポート(複数可)
  • を初期化する.
  • client.transport.sniff:クライアントがクラスタの状態を嗅ぎ、クラスタ内のすべてのノードアドレスをクライアントに追加できるかどうかは、手動ですべてのノードアドレス
  • を指定する必要はありません.
    初期化コードは次のとおりです.
    Settings settings = Settings.settingsBuilder().put("cluster.name", ES_CLUSTER_NAME).put("client.transport.sniff", true).build();
    client = TransportClient.builder().settings(settings).build();
    for (String esAddressPerNode : ES_ADDRESS.split("\\,")) {
       try {
          //       
          client = ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esAddressPerNode), ES_PORT));
       } catch (UnknownHostException e) {
          log.warn("client.addTransportAddress error", e);
       }
    }
    log.info("connectedNodes:" + ((TransportClient)client).connectedNodes());
    

    ESがbuilderモードでクライアントを初期化していることがわかります.build()メソッドに簡単について、ESがTransportClientをどのように構築しているかを見てみましょう.
    public TransportClient build() {
        Settings settings = InternalSettingsPreparer.prepareSettings(this.settings);
        settings = settingsBuilder()
                .put(NettyTransport.PING_SCHEDULE, "5s") // enable by default the transport schedule ping interval
                .put(settings)
                .put("network.server", false) //      Netty     
                .put("node.client", true) //         ,ES                 :master、data、ingest(5.0      ,      index     )
                .put(CLIENT_TYPE_SETTING, CLIENT_TYPE) //     =transport
                .build();
     
        PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses);
        this.settings = pluginsService.updatedSettings();
     
        Version version = Version.CURRENT;
     
        final ThreadPool threadPool = new ThreadPool(settings); //         ,    ES                ,          
        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
     
        boolean success = false;
        try {
            //               
            ModulesBuilder modules = new ModulesBuilder();
            modules.add(new Version.Module(version));
            // plugin modules must be added here, before others or we can get crazy injection errors...
            for (Module pluginModule : pluginsService.nodeModules()) {
                modules.add(pluginModule);
            }
            modules.add(new PluginsModule(pluginsService));
            modules.add(new SettingsModule(this.settings));
            modules.add(new NetworkModule(namedWriteableRegistry));
            modules.add(new ClusterNameModule(this.settings));
            modules.add(new ThreadPoolModule(threadPool));
            modules.add(new TransportModule(this.settings, namedWriteableRegistry));
            modules.add(new SearchModule() {
                @Override
                protected void configure() {
                    // noop
                }
            });
            modules.add(new ActionModule(true));
            modules.add(new ClientTransportModule());
            modules.add(new CircuitBreakerModule(this.settings));
     
            pluginsService.processModules(modules);
     
            Injector injector = modules.createInjector();
            //  Guice  TransportService
            final TransportService transportService = injector.getInstance(TransportService.class);
            transportService.start(); //  transportService,     ,     TransportService.doStart()
            transportService.acceptIncomingRequests(); //       CountDownLatch
     
            TransportClient transportClient = new TransportClient(injector); //   Client  
            success = true;
            return transportClient;
        } finally {
            if (!success) {
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
    }
      
    TransportService.class {
        protected void doStart() {
            ...
            transport.start();  //   transport=NettyTransport,       NettyTransport      
            ...
        }
    }
      
    NettyTransport.class {
        protected void doStart() {
            boolean success = false;
            try {
                clientBootstrap = createClientBootstrap(); //  Netty       
                if (settings.getAsBoolean("network.server", true)) { //      Netty    ,       network.server=false
                    ...
                    ...
                }
                success = true;
            } finally {
                if (success == false) {
                    doStop();
                }
            }
        }
      
        private ClientBootstrap createClientBootstrap() {
            if (blockingClient) {
                clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings,                 TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
            } else { //    NIO  
                int bossCount = settings.getAsInt("transport.netty.boss_count", 1);
                clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
                    bossCount,
                    new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
                    new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
                ... //Netty       
                return clientBootstrap; //             
            }
        }
    }
    //      TransportClient
    TransportClient.class {
        private TransportClient(Injector injector) {
            super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
            this.injector = injector;
            nodesService = injector.getInstance(TransportClientNodesService.class);
            proxy = injector.getInstance(TransportProxyClient.class);
        }
    }
    

    ノードの追加
    TransportClient.を通って詳細については、次の点を参照してください.
    TransportClient.class {
        private final TransportClientNodesService nodesService;
        private final TransportProxyClient proxy;
        public TransportClient addTransportAddress(TransportAddress transportAddress) {
            nodesService.addTransportAddresses(transportAddress);
            return this;
        }
    }
      
    //    TransportClientNodesService   
    TransportClientNodesService.class {
        public TransportClientNodesService addTransportAddresses(TransportAddress... transportAddresses) {
            synchronized (mutex) {
                List filtered = new ArrayList<>(transportAddresses.length);
                for (TransportAddress transportAddress : transportAddresses) {
                    boolean found = false;
                    for (DiscoveryNode otherNode : listedNodes) {
                        if (otherNode.address().equals(transportAddress)) {
                            found = true;
                            logger.debug("address [{}] already exists with [{}], ignoring...", transportAddress, otherNode);
                            break;
                        }
                    }
                    if (!found) {
                        filtered.add(transportAddress);
                    }
                }
                if (filtered.isEmpty()) {
                    return this;
                }
                List builder = new ArrayList<>();
                builder.addAll(listedNodes());
                for (TransportAddress transportAddress : filtered) {
                    DiscoveryNode node = new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress, minCompatibilityVersion);
                    logger.debug("adding address [{}]", node);
                    builder.add(node);
                }
                listedNodes = Collections.unmodifiableList(builder);
                //                     ,                 
                nodesSampler.sample();
            }
            return this;
        }
    }
      
    //    nodesSampler.sample()   ,   SniffNodesSampler.doSample()
    class SniffNodesSampler extends NodeSampler {
     
        @Override
        protected void doSample() {
            //listedNodes               
            //nodes            
            Set nodesToPing = Sets.newHashSet();
            for (DiscoveryNode node : listedNodes) {
                nodesToPing.add(node);
            }
            for (DiscoveryNode node : nodes) {
                nodesToPing.add(node);
            }
     
            final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
            final ConcurrentMap clusterStateResponses = ConcurrentCollections.newConcurrentMap();
            for (final DiscoveryNode listedNode : nodesToPing) {
                threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            if (!transportService.nodeConnected(listedNode)) { //            
                                try {
                                    if (nodes.contains(listedNode)) {
                                        logger.trace("connecting to cluster node [{}]", listedNode);
                                        transportService.connectToNode(listedNode); //      ,         
                                    } else {
                                        logger.trace("connecting to listed node (light) [{}]", listedNode);
                                        transportService.connectToNodeLight(listedNode); //     ,        ,            
                                    }
                                } catch (Exception e) {
                                    logger.debug("failed to connect to node [{}], ignoring...", e, listedNode);
                                    latch.countDown();
                                    return;
                                }
                            }
                            //             ,        :ES                     Master  ,  Master             
                            transportService.sendRequest(listedNode, ClusterStateAction.NAME,
                                    headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
                                    TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
                                    new BaseTransportResponseHandler() {
     
                                        @Override
                                        public ClusterStateResponse newInstance() {
                                            return new ClusterStateResponse();
                                        }
     
                                        @Override
                                        public String executor() {
                                            return ThreadPool.Names.SAME;
                                        }
     
                                        @Override
                                        public void handleResponse(ClusterStateResponse response) {
                                            clusterStateResponses.put(listedNode, response);
                                            latch.countDown();
                                        }
     
                                        @Override
                                        public void handleException(TransportException e) {
                                            logger.info("failed to get local cluster state for {}, disconnecting...", e, listedNode);
                                            transportService.disconnectFromNode(listedNode);
                                            latch.countDown();
                                        }
                                    });
                        } catch (Throwable e) {
                            logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
                            transportService.disconnectFromNode(listedNode);
                            latch.countDown();
                        }
                    }
                });
            }
     
            try {
                latch.await();
            } catch (InterruptedException e) {
                return;
            }
     
            HashSet newNodes = new HashSet<>(); //          
            HashSet newFilteredNodes = new HashSet<>(); //  ES     
            for (Map.Entry entry : clusterStateResponses.entrySet()) {
                if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
                    logger.warn("node {} not part of the cluster {}, ignoring...", entry.getValue().getState().nodes().localNode(), clusterName);
                    newFilteredNodes.add(entry.getKey());
                    continue;
                }
                for (ObjectCursor cursor : entry.getValue().getState().nodes().dataNodes().values()) {
                    newNodes.add(cursor.value);
                }
            }
     
            nodes = validateNewNodes(newNodes); //        ,                ,     
            filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
            //                                
        }
      
        //TransportClientNodesService                 ,               ,   5s  
        class ScheduledNodeSampler implements Runnable {
            @Override
            public void run() {
                try {
                    nodesSampler.sample(); //     SniffNodesSampler.doSample()
                } catch (Exception e) {
                    logger.warn("failed to sample", e);
                }
            }
        }
    }
    

    さらに、ノードとの接続を確立する際に、「軽い」と「重い」の接続方法を区別します.
  • 軽接続方式:軽接続方式の接続は、クライアントが初期化するときにESクラスタの最初のいくつかのノードを接続するだけで、クラスタ内の他のノード
  • を嗅ぐために使用されることが多い.
    protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        ChannelFuture connect = clientBootstrap.connect(address);
        connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
        if (!connect.isSuccess()) {
            throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connect.getCause());
        }
        Channel[] channels = new Channel[1];
        channels[0] = connect.getChannel();
        channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
        return new NodeChannels(channels, channels, channels, channels, channels);
    }
    
  • 再接続の方式:タイプを区別する必要があるのは、主にビッグデータ量の要求に対して時間がかかることが多く、一般的な要求に影響を与える可能性があるため、一定の資源隔離
  • を行う必要がある.
    protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
        //             
        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; //size=2
        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; //size=3
        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; //size=6
        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; //size=1
        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; //size=1
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i] = clientBootstrap.connect(address);
        }
     
        ...
    }
    

    ふかへいこう
    「再接続」のノードリストから、ポーリング(Round Robbin)によって要求が送信されます.コードは次のとおりです.
    private final AtomicInteger randomNodeGenerator = new AtomicInteger();
      
    public  void execute(NodeListenerCallback callback, ActionListener listener) {
        List nodes = this.nodes;
        int index = getNodeNumber();
        DiscoveryNode node = nodes.get((index) % nodes.size());
        ...
    }
      
    private int getNodeNumber() {
        int index = randomNodeGenerator.incrementAndGet();
        if (index < 0) {
            index = 0;
            randomNodeGenerator.set(0);
        }
        return index;
    }
    

    まとめ
    ここまで、ESクライアントの起動プロセスを一度整理しました.