ElasticsearchのRoundRobinSupplierについて

7088 ワード

シーケンス
本文は主にElasticsearchのRoundRobinSupplierを研究する
RoundRobinSupplier
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java
final class RoundRobinSupplier implements Supplier {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}
  • RoundRobinSupplierはSupplierインタフェースを実現し、getメソッドはcounter.getAndIncrement() % selectors.lengthを使用してselectors配列の下付き文字を選択し、下付き文字の値
  • を返す.
    NioSelectorGroup
    elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
    public class NioSelectorGroup implements NioGroup {
    
        private final List dedicatedAcceptors;
        private final RoundRobinSupplier acceptorSupplier;
    
        private final List selectors;
        private final RoundRobinSupplier selectorSupplier;
    
        private final AtomicBoolean isOpen = new AtomicBoolean(true);
    
        //......
    
        public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                                int selectorCount, Function, EventHandler> eventHandlerFunction) throws IOException {
            dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
            selectors = new ArrayList<>(selectorCount);
    
            try {
                List> suppliersToSet = new ArrayList<>(selectorCount);
                for (int i = 0; i < selectorCount; ++i) {
                    RoundRobinSupplier supplier = new RoundRobinSupplier<>();
                    suppliersToSet.add(supplier);
                    NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                    selectors.add(selector);
                }
                for (RoundRobinSupplier supplierToSet : suppliersToSet) {
                    supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                    assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
                }
    
                for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                    RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                    NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                    dedicatedAcceptors.add(acceptor);
                }
    
                if (dedicatedAcceptorCount != 0) {
                    acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
                } else {
                    acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                }
                selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
                assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";
    
                startSelectors(selectors, selectorThreadFactory);
                startSelectors(dedicatedAcceptors, acceptorThreadFactory);
            } catch (Exception e) {
                try {
                    close();
                } catch (Exception e1) {
                    e.addSuppressed(e1);
                }
                throw e;
            }
        }
    
        public  S bindServerChannel(InetSocketAddress address, ChannelFactory factory)
            throws IOException {
            ensureOpen();
            return factory.openNioServerSocketChannel(address, acceptorSupplier);
        }
    
        @Override
        public  S openChannel(InetSocketAddress address, ChannelFactory, S> factory) throws IOException {
            ensureOpen();
            return factory.openNioChannel(address, selectorSupplier);
        }    
    
        //......
    }
  • NioSelectorGroupのコンストラクタは、acceptorSupplierとselectorSupplierの2つのRoundRobinSupplierを作成しました.bindServerChannelメソッドはfactoryを実行する.openNioServerSocketChannel(address, acceptorSupplier);OpenChannelメソッドはfactoryを実行する.openNioChannel(address, selectorSupplier)

  • ChannelFactory
    elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java
    public abstract class ChannelFactory {
        //......
    
        public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier supplier) throws IOException {
            ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
            NioSelector selector = supplier.get();
            ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
            scheduleServerChannel(serverChannel, selector);
            return serverChannel;
        }
    
        public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier supplier) throws IOException {
            SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
            NioSelector selector = supplier.get();
            Socket channel = internalCreateChannel(selector, rawChannel);
            scheduleChannel(channel, selector);
            return channel;
        }
    
        //......
    }
  • ChannelFactoryのopenNioServerSocketChannelおよびopenNioChannelメソッドは、Supplierパラメータを受信し、このsupplierによってNioSelector
  • を選択する
    小結
  • RoundRobinSupplierはSupplierインタフェースを実現し、getメソッドはcounter.getAndIncrement() % selectors.lengthを使用してselectors配列の下付き文字を選択し、下付き文字の値
  • を返す.
  • NioSelectorGroupのコンストラクタは、acceptorSupplierとselectorSupplierの2つのRoundRobinSupplierを作成しました.bindServerChannelメソッドはfactoryを実行する.openNioServerSocketChannel(address, acceptorSupplier);OpenChannelメソッドはfactoryを実行する.openNioChannel(address, selectorSupplier)
  • ChannelFactoryのopenNioServerSocketChannelおよびopenNioChannelメソッドは、Supplierパラメータを受信し、このsupplierによってNioSelector
  • を選択する
    doc
  • RoundRobinSupplier