ElasticSearch Getフロー
13118 ワード
Getリクエストを処理するActionは
TransportActionクラス継承階層では、
上記のコードでは,協調ノードが解析したノード反復器からノードを順次取得し,そのノードに要求を送信し,使用するactionは
次は
最終的には
TransportGetAction
です.まずTransportActionクラス継承階層を参照してください.TransportGetAction
はTransportSingleShardAction
から継承され、その継承階層は下図のようになります.TransportActionクラス継承階層では、
TransportSingleShardAction
サブクラスが主に単一shardのみの操作を処理し、エラーが発生した場合は他のコピーに続行できます.TransportSingleShardAction.doExecute
関数を直接見てみましょう//TransportSingleShardAction
@Override
protected void doExecute(Task task, Request request, ActionListener listener) {
new AsyncSingleAction(request, listener).start();
}
//TransportSingleShardAction.AsyncSingleAction
private AsyncSingleAction(Request request, ActionListener listener) {
this.listener = listener;
ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}
String concreteSingleIndex;
//resolveIndex TransportGetAction , true
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
//TransportGetAction :
//update the routing
resolveRequest(clusterState, internalRequest);
blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
// TransportGetAction , Iterator
this.shardIt = shards(clusterState, internalRequest);
}
AsyncSingleAction
を構築すると、start
メソッドが呼び出されます.//TransportSingleShardAction.AsyncSingleAction
public void start() {
//
if (shardIt == null) {
// , , Get
// Get , 。
// Get ,
// transportShardAction ShardTransportHandler
// just execute it on the local node
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
} else {
//
perform(null);
}
}
perform
関数の論理は次のとおりです.private void perform(@Nullable final Exception currentFailure) {
// , ,
//( )
Exception lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
// , , shardRouting
// , perform , ShardRouting
final ShardRouting shardRouting = shardIt.nextOrNull();
// ShardRouting
if (shardRouting == null) {
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
}
listener.onFailure(failure);
return;
}
// ShardRouting node
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
// AsyncSingleAction.onFailure, perform
if (node == null) {
onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
···
// , Get ShardIt
// action transportShardAction
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
// listener.onResponse
@Override
public void handleResponse(final Response response) {
listener.onResponse(response);
}
// AsyncSingleAction.onFailure
@Override
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}
});
}
}
}
//AsyncSingleAction.onFailure perform
private void onFailure(ShardRouting shardRouting, Exception e) {
if (e != null) {
logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()), e);
}
perform(e);
}
上記のコードでは,協調ノードが解析したノード反復器からノードを順次取得し,そのノードに要求を送信し,使用するactionは
action transportShardAction
である.TransportSingleShardAction
のコンストラクション関数には、登録されたリクエスト処理handlerが次のように表示されます.//TransportSingleShardAction
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
TransportSingleShardAction.ShardTransportHandler
は次のとおりです.//TransportSingleShardAction.ShardTransportHandler
private class ShardTransportHandler implements TransportRequestHandler {
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
//
asyncShardOperation(request, request.internalShardId, new HandledTransportAction.ChannelActionListener<>(channel,
transportShardAction, request));
}
}
TransportSingleShardAction.asyncShardOperation
はサブクラスTransportGetAction
で書き換えられ、以下のようになる.@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
// Get , realtime=true, ,
//
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
super.asyncShardOperation(request, shardId, listener);
} else {
// realtime=false, listener, ,
// , 。
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}
TransportSingleShardAction.asyncShardOperation
は次のとおりです.//TransportSingleShardAction
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener listener) throws IOException {
threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
//shardOperation , TransportSingleShardAction
//TransportGetAction
listener.onResponse(shardOperation(request, shardId));
}
});
}
次は
TransportGetAction.shardOperation
//TransportGetAction
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}
GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
return new GetResponse(result);
}
最終的には
Engine
でGet操作が処理されます.次に、InternalEngine.get
関数の実装を概説します.//InternalEngine
@Override
public GetResult get(Get get, BiFunction searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
//realtime=true refresh
if (get.realtime()) {
VersionValue versionValue = null;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
// ,
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
//
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
// false
// update
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read form a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
//realtime=true,
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
}
// no version, get the version from the index, we know that we refresh on flush
//
return getFromSearcher(get, searcherFactory, scope);
}
}