ElasticSearch Getフロー

13118 ワード

Getリクエストを処理するActionはTransportGetActionです。まずTransportActionクラスの継承階層を参照してください。TransportGetActionはTransportSingleShardActionを継承しており、その継承階層は下図のようになります。
TransportActionクラスの継承階層のうち、TransportSingleShardActionのサブクラスは主に単一のshardだけを対象とする操作を処理し、エラーが発生した場合は他のコピー(レプリカ)で処理を続行できます。TransportSingleShardAction.doExecute関数を直接見てみましょう。
//TransportSingleShardAction
/**
 * Entry point for a single-shard request: builds an {@code AsyncSingleAction} for the
 * given request/listener pair and starts it right away. The {@code task} argument is
 * not used by this method.
 */
@Override
protected void doExecute(Task task, Request request, ActionListener listener) {
    final AsyncSingleAction singleAction = new AsyncSingleAction(request, listener);
    singleAction.start();
}

//TransportSingleShardAction.AsyncSingleAction
/**
 * Captures the current cluster state and prepares everything {@code start()} needs:
 * the node set, the concrete index name, the internal request and the shard iterator.
 *
 * A non-null {@code ClusterBlockException} returned by {@code checkGlobalBlock} or
 * {@code checkRequestBlock} is thrown straight from the constructor.
 */
private AsyncSingleAction(Request request, ActionListener listener) {
    this.listener = listener;

    final ClusterState state = clusterService.state();
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] based on cluster state version [{}]", request, state.version());
    }
    nodes = state.nodes();

    // Cluster-wide block check comes first, before any index resolution.
    final ClusterBlockException globalBlock = checkGlobalBlock(state);
    if (globalBlock != null) {
        throw globalBlock;
    }

    // resolveIndex(request) selects between resolving the name through the index name
    // expression resolver and using the request's index verbatim.
    // NOTE(review): for TransportGetAction this presumably evaluates to true - confirm.
    final String indexName = resolveIndex(request)
        ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName()
        : request.index();
    this.internalRequest = new InternalRequest(request, indexName);

    // update the routing
    resolveRequest(state, internalRequest);

    // Request-level block check, performed on the resolved internal request.
    final ClusterBlockException requestBlock = checkRequestBlock(state, internalRequest);
    if (requestBlock != null) {
        throw requestBlock;
    }

    // Shard iterator that perform() walks through.
    // NOTE(review): shards(...) is presumably supplied by the concrete action
    // (e.g. TransportGetAction) - confirm.
    this.shardIt = shards(state, internalRequest);
}
AsyncSingleActionを構築すると、startメソッドが呼び出されます.
//TransportSingleShardAction.AsyncSingleAction
/**
 * Starts execution of the request.
 *
 * When a shard iterator was resolved, the shard copies are tried through
 * {@code perform(null)}. Otherwise the request is sent to the local node under the
 * {@code transportShardAction} name, and the response or failure is handed straight to
 * the listener with no retry.
 */
public void start() {
    if (shardIt != null) {
        // A shard iterator is available: try its shard copies, starting with no prior failure.
        perform(null);
        return;
    }

    // No shard iterator: just execute it on the local node.
    // The request still goes through the transport layer under transportShardAction.
    transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler() {
        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }

        @Override
        public Response newInstance() {
            return newResponse();
        }

        @Override
        public void handleException(TransportException exp) {
            // Local execution failed: report it directly, there is nothing to fall back to.
            listener.onFailure(exp);
        }

        @Override
        public void handleResponse(final Response response) {
            listener.onResponse(response);
        }
    });
}
perform関数の論理は次のとおりです.
/**
 * Tries the next shard copy from {@code shardIt}.
 *
 * If the iterator is exhausted, the listener is failed with the remembered failure, or
 * with a {@code NoShardAvailableActionException} when there is no failure or the failure
 * is a shard-not-available one. If the node of the next shard copy is unknown, or the
 * transport call fails, {@code onFailure} is invoked, which calls this method again.
 *
 * @param currentFailure the failure from the previous attempt, or {@code null} on the first call
 */
private void perform(@Nullable final Exception currentFailure) {
        // Decide which failure to remember: the new one replaces the stored one when nothing
        // is stored yet or when TransportActions.isReadOverrideException accepts it.
        Exception lastFailure = this.lastFailure;
        if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
            lastFailure = currentFailure;
            this.lastFailure = currentFailure;
        }
        // Advance the shard iterator. Each (re-)entry into perform consumes one more
        // ShardRouting, so a retry always targets the next shard copy.
        final ShardRouting shardRouting = shardIt.nextOrNull();
        // No ShardRouting left: every copy has been tried (or none existed), so fail the listener.
        if (shardRouting == null) {
            Exception failure = lastFailure;
            if (failure == null || isShardNotAvailableException(failure)) {
                failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
            } else {
                logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
            }
            listener.onFailure(failure);
            return;
        }
        // Look up the node that currently hosts this ShardRouting.
        DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
        // Unknown node: go through AsyncSingleAction.onFailure, which calls perform again.
        if (node == null) {
            onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
        } else {
            ···
            // Send the request to the node hosting the shard copy taken from shardIt.
            // The action name used is transportShardAction.
            transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler() {

                @Override
                public Response newInstance() {
                    return newResponse();
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
                // Success: hand the response to listener.onResponse.
                @Override
                public void handleResponse(final Response response) {
                    listener.onResponse(response);
                }
                // Failure: delegate to AsyncSingleAction.onFailure, which retries via perform.
                @Override
                public void handleException(TransportException exp) {
                    onFailure(shardRouting, exp);
                }
            });
        }
    }
}

//AsyncSingleAction.onFailure logs the failure (if any) and calls perform again
/**
 * Handles a failed attempt against one shard copy: trace-logs the exception when one is
 * given, then calls {@code perform} with it.
 *
 * @param shardRouting the shard copy the failed attempt was aimed at (used for logging only)
 * @param e            the failure; may be {@code null}, in which case nothing is logged
 */
private void onFailure(ShardRouting shardRouting, Exception e) {
    final boolean hasFailure = e != null;
    if (hasFailure) {
        logger.trace(
            () -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()),
            e);
    }
    perform(e);
}

上記のコードでは、協調ノードが解決したshardイテレータからShardRoutingを順に取得し、そのshardを保持するノードへリクエストを送信します。使用するactionはtransportShardActionです。TransportSingleShardActionのコンストラクタでは、このactionに対するリクエスト処理handlerが次のように登録されています。
//TransportSingleShardAction
// Registers a ShardTransportHandler for requests sent under the transportShardAction name.
// NOTE(review): ThreadPool.Names.SAME is presumably the executor name for the handler -
// confirm against TransportService.registerRequestHandler.
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
TransportSingleShardAction.ShardTransportHandlerは次のとおりです.
//TransportSingleShardAction.ShardTransportHandler
/**
 * Transport-level handler for shard requests: passes each incoming request to
 * {@code asyncShardOperation} together with a channel-backed listener.
 */
private class ShardTransportHandler implements TransportRequestHandler {
    @Override
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        // Shard the request is addressed to, as carried on the request itself.
        final ShardId targetShard = request.internalShardId;
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, targetShard);
        }
        // Run the shard operation; the outcome is reported through a listener built around the channel.
        asyncShardOperation(request, targetShard,
            new HandledTransportAction.ChannelActionListener<>(channel, transportShardAction, request));
    }
}
TransportSingleShardAction.asyncShardOperationはサブクラスTransportGetActionでオーバーライドされており、以下のようになります。
/**
 * Get-specific override: realtime requests are passed to the parent implementation
 * immediately, while non-realtime requests are passed on from the callback given to
 * {@code IndexShard.awaitShardSearchActive}.
 */
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException {
    // Resolve the shard up front in both cases, so a lookup failure surfaces here.
    final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

    if (!request.realtime()) {
        // Non-realtime: defer to the awaitShardSearchActive callback.
        // NOTE(review): presumably this waits for the shard to become search-active - confirm in IndexShard.
        shard.awaitShardSearchActive(ignored -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception failure) {
                // The callback cannot propagate the exception, so route it to the listener.
                listener.onFailure(failure);
            }
        });
        return;
    }

    // we are not tied to a refresh cycle here anyway
    super.asyncShardOperation(request, shardId, listener);
}
TransportSingleShardAction.asyncShardOperationは次のとおりです.
//TransportSingleShardAction
/**
 * Runs {@code shardOperation} on the executor selected by {@code getExecutor(request, shardId)}
 * and reports the result, or any exception, to the listener.
 */
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException {
    final AbstractRunnable shardTask = new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            // shardOperation is provided by the concrete action; TransportGetAction overrides it.
            listener.onResponse(shardOperation(request, shardId));
        }

        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }
    };
    threadPool.executor(getExecutor(request, shardId)).execute(shardTask);
}

次はTransportGetAction.shardOperation
//TransportGetAction
/**
 * Executes the Get against the target shard and wraps the outcome in a {@code GetResponse}.
 * The shard is refreshed first only when the request asks for a refresh and is not realtime.
 */
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

    // An explicit refresh applies only to non-realtime gets.
    final boolean refreshFirst = request.refresh() && !request.realtime();
    if (refreshFirst) {
        shard.refresh("refresh_flag_get");
    }

    final GetResult getResult = shard.getService().get(
        request.type(), request.id(), request.storedFields(),
        request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(getResult);
}


最終的にはEngineでGet操作が処理されます.次に、InternalEngine.get関数の実装を概説します.
//InternalEngine
/**
 * Looks up a document under the engine read lock.
 *
 * For a realtime get the version map is consulted first. An entry marked as deleted
 * yields {@code GetResult.NOT_EXISTS}, and a version conflict throws
 * {@code VersionConflictEngineException}. When {@code get.isReadFromTranslog()} is set
 * and the entry has a translog location, the operation is read from the translog.
 * Otherwise the engine is refreshed with INTERNAL scope before falling through.
 * Every remaining case is answered by {@code getFromSearcher}, with INTERNAL scope for
 * realtime gets and EXTERNAL scope otherwise.
 *
 * @throws EngineException if reading the operation from the translog fails with an IOException
 */
@Override
public GetResult get(Get get, BiFunction searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        SearcherScope scope;
        // realtime=true: check the version map first; a hit may trigger a refresh below
        if (get.realtime()) {
            VersionValue versionValue = null;
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                // the version map entry is a delete: report the document as not existing
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                // version conflict check against the version requested by the caller
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                        get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                // reading from the translog is opt-in through get.isReadFromTranslog();
                // per the note below it is only used by the update path
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read from a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                    .getIndexSettings().getIndexVersionCreated());
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
                                    new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        // no translog location on this entry: set the trackTranslogLocation flag
                        // NOTE(review): presumably makes later operations record their location - confirm
                        trackTranslogLocation.set(true);
                    }
                }
                // realtime=true with a version map hit: refresh with INTERNAL scope before the searcher lookup
                refresh("realtime_get", SearcherScope.INTERNAL);
            }
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;
        }

        // no version, get the version from the index, we know that we refresh on flush
        // read the document through a searcher of the chosen scope
        return getFromSearcher(get, searcherFactory, scope);
    }
}