Elasticsearchコードリーディング その1
社内Elasticsearch/Lucene勉強会を実施中
これは何
愚直にHTTPリクエストを受け取って各種の処理が走る流れと、そのセットアップを起動のエントリポイントから追ってみた
(大きなOSSで処理の頭から追うのはあまり効率がよくないという説もある気がするけど、ひとまず行けるところまで行ってみる作戦)
対象
https://github.com/elastic/elasticsearch
v7.9.2
(今回は GET /
エンドポイント(MainAction)まわりぐらいを読んだ段階での調査結果なので、過剰な一般化をしている懸念あり)
先にまとめ
3行で
- RESTのエンドポイントからコードを置いたかったら
server/src/main/java/org/elasticsearch/action/ActionModule.java
から読めば良さそう
- RESTエンドポイントのHTTPサーバはnettyだけどplugin化されてる
-
server/src/main/java/org/elasticsearch/node/Node.java
の初期化処理とか、そこから呼ばれる server/src/main/java/org/elasticsearch/plugins/PluginsService.java
を眺めておくと、プラグインの読み込みの雰囲気が感じられる
前提知識
- ESは9200番台のHTTP(Rest)のエンドポイントと、9300番台のTransportのエンドポイントがある
読み取れた構成
- HTTPサーバは(標準構成では) Netty
- plugin/module化されていて差し替えも可能そう
- ソースでは
/modules/transport-netty4
配下
- 実行時にESインストールディレクトリの
./modules/transport-netty4
から読み込まれる
- 混乱した「HTTP vs Transport」「サーバ側 vs クライアント側」まわりの整理(当初「clientってどこのクライアントよ...」)
- HTTP
- クライアント
client/
- Javaのクライアントライブラリ
- サーバ
- 各種処理:
server/src/main/java/org/elasticsearch/rest/action/*/Rest*Action
- URLのメソッド、パスなんかはここで定義してある
- Transport
- クライアント
-
server/src/main/java/org/elasticsearch/client/node/NodeClient
- ローカルで実行できるタスク用っぽい感じ
- TransportActionを解釈するが実際にはTransportは挟まず直接ロジックを呼び出す
-
server/src/main/java/org/elasticsearch/client/transport/TransportClient
- 他ノードに処理を投げる用・・・・
- ・・・と思いきや deprecateされてる
- TransportClient is deprecated in favour of the High Level REST client and will be removed in Elasticsearch 8.0
- Transportレイヤで処理を受け付けて他のノードに実際の処理を飛ばす流れがあるはずでは・・・? => 未解読
- サーバ
- コントローラ: 未解読
- 各種処理:
server/src/main/java/org/elasticsearch/action/*/Transport*Action.java
- 実際のロジック
-
server/src/main/java/org/elasticsearch/cluster/ClusterService
とか
-
server/src/main/java/org/elasticsearch/*/*Service
になってる?
- ここはかなり処理ごとに違うかも
- RESTのエンドポイントから実際の処理を探したかったら、
- 1. 該当のメソッド、パスの
Rest*Action
を見つける
- 2. そのRestActionのActionTypeを見る
- 2.
ActionModule#setupActions
を見てそのActionTypeで実行されるTransportActionを調べる
- とすれば良さそう
解読に難航したところ
- Nettyまわりはプラグイン化されているのでソースコードの流れには直接は出てこない
- actionsのDI
injector.getInstance(new Key<Map<ActionType, TransportAction>>() {})
を注入箇所がどこか => A. ActionModule#configure
- DIゴリゴリのソースの解読辛いです
解読ログ
HTTPサーバの起動まで
server/src/main/java/org/elasticsearch/action/ActionModule.java
から読めば良さそうserver/src/main/java/org/elasticsearch/node/Node.java
の初期化処理とか、そこから呼ばれる server/src/main/java/org/elasticsearch/plugins/PluginsService.java
を眺めておくと、プラグインの読み込みの雰囲気が感じられる- plugin/module化されていて差し替えも可能そう
- ソースでは
/modules/transport-netty4
配下 - 実行時にESインストールディレクトリの
./modules/transport-netty4
から読み込まれる
- HTTP
- クライアント
client/
- Javaのクライアントライブラリ
- サーバ
- 各種処理:
server/src/main/java/org/elasticsearch/rest/action/*/Rest*Action
- URLのメソッド、パスなんかはここで定義してある
- 各種処理:
- クライアント
- Transport
- クライアント
-
server/src/main/java/org/elasticsearch/client/node/NodeClient
- ローカルで実行できるタスク用っぽい感じ
- TransportActionを解釈するが実際にはTransportは挟まず直接ロジックを呼び出す
-
server/src/main/java/org/elasticsearch/client/transport/TransportClient
- 他ノードに処理を投げる用・・・・
- ・・・と思いきや deprecateされてる
- TransportClient is deprecated in favour of the High Level REST client and will be removed in Elasticsearch 8.0
- Transportレイヤで処理を受け付けて他のノードに実際の処理を飛ばす流れがあるはずでは・・・? => 未解読
-
- サーバ
- コントローラ: 未解読
- 各種処理:
server/src/main/java/org/elasticsearch/action/*/Transport*Action.java
- クライアント
- 実際のロジック
-
server/src/main/java/org/elasticsearch/cluster/ClusterService
とか -
server/src/main/java/org/elasticsearch/*/*Service
になってる? - ここはかなり処理ごとに違うかも
-
- 1. 該当のメソッド、パスの
Rest*Action
を見つける - 2. そのRestActionのActionTypeを見る
- 2.
ActionModule#setupActions
を見てそのActionTypeで実行されるTransportActionを調べる - とすれば良さそう
injector.getInstance(new Key<Map<ActionType, TransportAction>>() {})
を注入箇所がどこか => A. ActionModule#configure
- DIゴリゴリのソースの解読辛いです
HTTPサーバの起動まで
起動のエントリポイント
/usr/share/elasticsearchbin/elasticsearch
( distribution/src/bin/elasticsearch
)
exec \
"$JAVA" \
"$XSHARE" \
$ES_JAVA_OPTS \
-Des.path.home="$ES_HOME" \
-Des.path.conf="$ES_PATH_CONF" \
-Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
-Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
-Des.bundled_jdk="$ES_BUNDLED_JDK" \
-cp "$ES_CLASSPATH" \
org.elasticsearch.bootstrap.Elasticsearch \
"$@"
org.elasticsearch.bootstrap.Elasticsearch
のメイン関数から読み始める
server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java
public static void main(final String[] args)
Bootstrap
へ
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
setup()したのちstart()
setupは Node
のコンストラクタ、startは Node
のstart()
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
INSTANCE = new Bootstrap();
/* ... */
INSTANCE.setup(true, environment);
/* ... */
INSTANCE.start();
}
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}
private void start() throws NodeValidationException {
node.start();
keepAliveThread.start();
}
起動時のログの最初のヤツがたしかにこの辺で出力されている
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
/* ... */
logger.info(
"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Build.CURRENT.getQualifiedVersion(),
jvmInfo.pid(),
Build.CURRENT.flavor().displayName(),
Build.CURRENT.type().displayName(),
Build.CURRENT.hash(),
Build.CURRENT.date(),
Constants.OS_NAME,
Constants.OS_VERSION,
Constants.OS_ARCH,
Constants.JVM_VENDOR,
Constants.JVM_NAME,
Constants.JAVA_VERSION,
Constants.JVM_VERSION);
logger.info("JVM home [{}]", System.getProperty("java.home"));
logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
version[7.9.2], pid[10], build[default/docker/d34da0ea4a966c4e49417f2da2f244e3e97b4e6e/2020-09-23T00:45:33.626720Z], OS[Linux/4.9.125-linuxkit/amd64], JVM[AdoptOpenJDK/OpenJDK 64-Bit Server VM/15/15+36]
JVM home [/usr/share/elasticsearch/jdk]
JVM arguments [-Xshare:auto, -Des.networkaddress.cache.ttl=60, -Des.networkaddress.cache.negative.ttl=10, -XX:+AlwaysPreTouch, -Xss1m, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djna.nosys=true, -XX:-OmitStackTraceInFastThrow, -XX:+ShowCodeDetailsInExceptionMessages, -Dio.netty.noUnsafe=true, -Dio.netty.noKeySetOptimization=true, -Dio.netty.recycler.maxCapacityPerThread=0, -Dio.netty.allocator.numDirectArenas=0, -Dlog4j.shutdownHookEnabled=false, -Dlog4j2.disable.jmx=true, -Djava.locale.providers=SPI,COMPAT, -Xms1g, -Xmx1g, -XX:+UseG1GC, -XX:G1ReservePercent=25, -XX:InitiatingHeapOccupancyPercent=30, -Djava.io.tmpdir=/tmp/elasticsearch-5717270145267454348, -XX:+HeapDumpOnOutOfMemoryError, -XX:HeapDumpPath=data, -XX:ErrorFile=logs/hs_err_pid%p.log, -Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m, -Des.cgroups.hierarchy.override=/, -XX:MaxDirectMemorySize=536870912, -Des.path.home=/usr/share/elasticsearch, -Des.path.conf=/usr/share/elasticsearch/config, -Des.distribution.flavor=default, -Des.distribution.type=docker, -Des.bundled_jdk=true]
この辺で第一回迷子のはじまり
injector.getInstance(HttpServerTransport.class).start();
のインジェクトがどこかわからん
networkModule
から取得するようなのだが・・・
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
/* ... */
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
/* ... */
}
protected HttpServerTransport newHttpTransport(NetworkModule networkModule) {
return networkModule.getHttpServerTransportSupplier().get();
}
settings
で設定されてるファクトリ。じゃあ settings
はどこで
public Supplier<HttpServerTransport> getHttpServerTransportSupplier() {
final String name;
if (HTTP_TYPE_SETTING.exists(settings)) {
name = HTTP_TYPE_SETTING.get(settings);
} else {
name = HTTP_DEFAULT_TYPE_SETTING.get(settings);
}
final Supplier<HttpServerTransport> factory = transportHttpFactories.get(name);
if (factory == null) {
throw new IllegalStateException("Unsupported http.type [" + name + "]");
}
return factory;
}
正解は PluginsService
だった
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
/* ... */
this.pluginsService = new PluginsService(/* ... */)
/* ... */
}
moduleもplugin(言い方?)
ログを見るとたしかに読み込まれている。pluginは何も入れていないので何も読み込まれていない。
List<Tuple<PluginInfo, Plugin>> loaded = loadBundles(seenBundles);
pluginsLoaded.addAll(loaded);
loaded module [aggs-matrix-stats]
loaded module [analysis-common]
...
loaded module [transport-netty4]
...
no plugins loaded
ディレクトリを見て見るとたしかに読み込まれたやつらが居る
$ ls modules/
aggs-matrix-stats
analysis-common
constant-keyword
flattened
frozen-indices
ingest-common
ingest-geoip
ingest-user-agent
kibana
lang-expression
lang-mustache
lang-painless
mapper-extras
parent-join
percolator
rank-eval
reindex
repository-url
search-business-rules
searchable-snapshots
spatial
tasks
transform
transport-netty4
vectors
wildcard
x-pack-analytics
x-pack-async
x-pack-async-search
x-pack-autoscaling
x-pack-ccr
x-pack-core
x-pack-data-streams
x-pack-deprecation
x-pack-enrich
x-pack-eql
x-pack-graph
x-pack-identity-provider
x-pack-ilm
x-pack-logstash
x-pack-ml
x-pack-monitoring
x-pack-ql
x-pack-rollup
x-pack-security
x-pack-sql
x-pack-stack
x-pack-voting-only-node
x-pack-watcher
クラスは org.elasticsearch.transport.Netty4Plugin
$ cat modules/transport-netty4/plugin-descriptor.properties
...
classname=org.elasticsearch.transport.Netty4Plugin
...
ついに setting
箇所
public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";
@Override
public Settings additionalSettings() {
return Settings.builder()
// here we set the netty4 transport and http transport as the default. This is a set once setting
// ie. if another plugin does that as well the server will fail - only one default network can exist!
.put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), NETTY_HTTP_TRANSPORT_NAME)
.put(NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING.getKey(), NETTY_TRANSPORT_NAME)
.build();
}
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
}
@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings, getSharedGroupFactory(settings)));
}
迷子復帰。
setup後、 Nodeのstartでnettyもstart
public Node start() throws NodeValidationException {
TransportService transportService = injector.getInstance(TransportService.class);
/* ... */
transportService.start();
/* ... */
injector.getInstance(HttpServerTransport.class).start();
/* ... */
logger.info("started");
}
この辺はnettyの使い方通りに起動(使ったことないけど)
@Override
protected void doStart() {
try {
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap();
/* ... */
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
/* ... */
bindServer();
}
public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings);
}
で、HTTPサーバ起動ログの箇所まで把握
protected void bindServer() {
// Bind and start to accept incoming connections.
/* ... */
logger.info("{}", boundAddress);
}
publish_address {172.17.0.2:9200}, bound_addresses {0.0.0.0:9200}
started
HTTPリクエストのハンドリング
nettyのハンドラから
private final Netty4HttpServerTransport serverTransport;
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
try {
serverTransport.incomingRequest(httpRequest, channel);
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
}
}
早々にHTTPサーバの実装に非依存の抽象レイヤに
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
}
追っていくと dispatcher.dispatchRequest
してる
dispatcher
:isdare:
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
if (badRequestCause != null) {
dispatcher.dispatchBadRequest(channel, threadContext, badRequestCause);
} else {
dispatcher.dispatchRequest(restRequest, channel, threadContext);
}
}
}
dispatcher
= RestController
NetworkModuleの初期化引数として渡される
ActionModule actionModule = new ActionModule(/* transportClient: */ false, /* ... */);
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController, clusterService.getClusterSettings());
RestController
はそのまんま RestController
if (transportClient) {
restController = null;
} else {
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
}
dispatchRequest
はここにくる
allHandlers
をイテレートして処理できるものを探して handler.handleRequest()
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
/* ... */
tryAllHandlers(request, channel, threadContext);
/* ... */
}
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
try {
while (allHandlers.hasNext()) {
final RestHandler handler;
final MethodHandlers handlers = allHandlers.next();
if (handlers == null) {
handler = null;
} else {
handler = handlers.getHandler(requestMethod);
}
if (handler == null) {
if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
return;
}
} else {
dispatchRequest(request, channel, handler);
return;
}
}
}
}
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
/* ... */
handler.handleRequest(request, responseChannel, client);
/* ... */
}
allHandlers
はどこから
actionModule.initRestHandlers()
で
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
/* ... */
actionModule.initRestHandlers(() -> clusterService.state().nodes());
logger.info("initialized");
/* ... */
}
ずらっと列挙されている
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler -> {
if (handler instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) handler);
}
restController.registerHandler(handler);
};
registerHandler.accept(new RestAddVotingConfigExclusionAction());
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestMainAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
registerHandler.accept(new RestNodesHotThreadsAction());
registerHandler.accept(new RestClusterAllocationExplainAction());
registerHandler.accept(new RestClusterStatsAction());
registerHandler.accept(new RestClusterStateAction(settingsFilter));
registerHandler.accept(new RestClusterHealthAction());
registerHandler.accept(new RestClusterUpdateSettingsAction());
registerHandler.accept(new RestClusterGetSettingsAction(settings, clusterSettings, settingsFilter));
registerHandler.accept(new RestClusterRerouteAction(settingsFilter));
registerHandler.accept(new RestClusterSearchShardsAction());
registerHandler.accept(new RestPendingClusterTasksAction());
registerHandler.accept(new RestPutRepositoryAction());
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
registerHandler.accept(new RestDeleteRepositoryAction());
registerHandler.accept(new RestVerifyRepositoryAction());
registerHandler.accept(new RestCleanupRepositoryAction());
registerHandler.accept(new RestGetSnapshotsAction());
registerHandler.accept(new RestCreateSnapshotAction());
registerHandler.accept(new RestRestoreSnapshotAction());
registerHandler.accept(new RestDeleteSnapshotAction());
registerHandler.accept(new RestSnapshotsStatusAction());
registerHandler.accept(new RestGetIndicesAction());
registerHandler.accept(new RestIndicesStatsAction());
registerHandler.accept(new RestIndicesSegmentsAction());
registerHandler.accept(new RestIndicesShardStoresAction());
registerHandler.accept(new RestGetAliasesAction());
registerHandler.accept(new RestIndexDeleteAliasesAction());
registerHandler.accept(new RestIndexPutAliasAction());
registerHandler.accept(new RestIndicesAliasesAction());
registerHandler.accept(new RestCreateIndexAction());
registerHandler.accept(new RestResizeHandler.RestShrinkIndexAction());
registerHandler.accept(new RestResizeHandler.RestSplitIndexAction());
registerHandler.accept(new RestResizeHandler.RestCloneIndexAction());
registerHandler.accept(new RestRolloverIndexAction());
registerHandler.accept(new RestDeleteIndexAction());
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
registerHandler.accept(new RestAnalyzeAction());
registerHandler.accept(new RestGetIndexTemplateAction());
registerHandler.accept(new RestPutIndexTemplateAction());
registerHandler.accept(new RestDeleteIndexTemplateAction());
registerHandler.accept(new RestPutComponentTemplateAction());
registerHandler.accept(new RestGetComponentTemplateAction());
registerHandler.accept(new RestDeleteComponentTemplateAction());
registerHandler.accept(new RestPutComposableIndexTemplateAction());
registerHandler.accept(new RestGetComposableIndexTemplateAction());
registerHandler.accept(new RestDeleteComposableIndexTemplateAction());
registerHandler.accept(new RestSimulateIndexTemplateAction());
registerHandler.accept(new RestSimulateTemplateAction());
registerHandler.accept(new RestPutMappingAction());
registerHandler.accept(new RestGetMappingAction());
registerHandler.accept(new RestGetFieldMappingAction());
registerHandler.accept(new RestRefreshAction());
registerHandler.accept(new RestFlushAction());
registerHandler.accept(new RestSyncedFlushAction());
registerHandler.accept(new RestForceMergeAction());
registerHandler.accept(new RestUpgradeAction());
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());
registerHandler.accept(new RestResolveIndexAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
registerHandler.accept(new RestGetAction());
registerHandler.accept(new RestGetSourceAction());
registerHandler.accept(new RestMultiGetAction(settings));
registerHandler.accept(new RestDeleteAction());
registerHandler.accept(new RestCountAction());
registerHandler.accept(new RestTermVectorsAction());
registerHandler.accept(new RestMultiTermVectorsAction());
registerHandler.accept(new RestBulkAction(settings));
registerHandler.accept(new RestUpdateAction());
registerHandler.accept(new RestSearchAction());
registerHandler.accept(new RestSearchScrollAction());
registerHandler.accept(new RestClearScrollAction());
registerHandler.accept(new RestMultiSearchAction(settings));
registerHandler.accept(new RestValidateQueryAction());
registerHandler.accept(new RestExplainAction());
registerHandler.accept(new RestRecoveryAction());
registerHandler.accept(new RestReloadSecureSettingsAction());
// Scripts API
registerHandler.accept(new RestGetStoredScriptAction());
registerHandler.accept(new RestPutStoredScriptAction());
registerHandler.accept(new RestDeleteStoredScriptAction());
registerHandler.accept(new RestGetScriptContextAction());
registerHandler.accept(new RestGetScriptLanguageAction());
registerHandler.accept(new RestFieldCapabilitiesAction());
// Tasks API
registerHandler.accept(new RestListTasksAction(nodesInCluster));
registerHandler.accept(new RestGetTaskAction());
registerHandler.accept(new RestCancelTasksAction(nodesInCluster));
// Ingest API
registerHandler.accept(new RestPutPipelineAction());
registerHandler.accept(new RestGetPipelineAction());
registerHandler.accept(new RestDeletePipelineAction());
registerHandler.accept(new RestSimulatePipelineAction());
// Dangling indices API
registerHandler.accept(new RestListDanglingIndicesAction());
registerHandler.accept(new RestImportDanglingIndexAction());
registerHandler.accept(new RestDeleteDanglingIndexAction());
// CAT API
registerHandler.accept(new RestAllocationAction());
registerHandler.accept(new RestShardsAction());
registerHandler.accept(new RestMasterAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestSegmentsAction());
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
registerHandler.accept(new RestCatRecoveryAction());
registerHandler.accept(new RestHealthAction());
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction());
registerHandler.accept(new RestAliasAction());
registerHandler.accept(new RestThreadPoolAction());
registerHandler.accept(new RestPluginsAction());
registerHandler.accept(new RestFielddataAction());
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestTemplatesAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
registerHandler.accept(handler);
}
}
registerHandler.accept(new RestCatAction(catActions));
}
各Rest◯◯Actionの中身
エンドポイントのメソッドやパスもここで定義してある
ActionType
を指定して client.execute()
ここで RestAction
(というクラスはないが)から ActionType
に変換
public class RestMainAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return unmodifiableList(asList(
new Route(GET, "/"),
new Route(HEAD, "/")));
}
@Override
public String getName() {
return "main_action";
}
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) {
@Override
public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception {
return convertMainResponse(mainResponse, request, builder);
}
});
}
static BytesRestResponse convertMainResponse(MainResponse response, RestRequest request, XContentBuilder builder) throws IOException {
// Default to pretty printing, but allow ?pretty=false to disable
if (request.hasParam("pretty") == false) {
builder.prettyPrint().lfAtEnd();
}
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
}
簡単そうな例として GET /
= MainAction
の処理を実際に見ていってみる
NodeClient
とくに基底クラスではない
private final NodeClient client;
protected Node(final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
client = new NodeClient(settings, threadPool);
/* ... */
ActionModule actionModule = new ActionModule(/* ... , */ client /* , ... */);
/* ... */
}
MainActionの client.execute()
はこの辺にくる
@Override
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
}
public < Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
return transportAction(action).execute(request, listener);
}
ActionType
からTransportAction
に変換される
private < Request extends ActionRequest,
Response extends ActionResponse
> TransportAction<Request, Response> transportAction(ActionType<Response> action) {
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
TransportAction<Request, Response> transportAction = actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
return transportAction;
}
第2回迷子
actions
は Node
のコンストラクタで client.initialize()
の引数で渡される
が、この injector.getInstance(new Key<Map<ActionType, TransportAction>>() {})
がどこで注入されているのか
private final NodeClient client;
protected Node(final Environment initialEnvironment,
/* ... */
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
/* ... */
}
正解は ActionModule#configure()
だった
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}
public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}
public <Request extends ActionRequest, Response extends ActionResponse> void register(
ActionType<Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}
ActionRegistry actions = new ActionRegistry();
actions.register(MainAction.INSTANCE, TransportMainAction.class);
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class);
actions.register(ClearVotingConfigExclusionsAction.INSTANCE, TransportClearVotingConfigExclusionsAction.class);
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class);
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class);
actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
actions.register(AddIndexBlockAction.INSTANCE, TransportAddIndexBlockAction.class);
actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
TransportGetFieldMappingsIndexAction.class);
actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
actions.register(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
actions.register(PutComponentTemplateAction.INSTANCE, TransportPutComponentTemplateAction.class);
actions.register(GetComponentTemplateAction.INSTANCE, TransportGetComponentTemplateAction.class);
actions.register(DeleteComponentTemplateAction.INSTANCE, TransportDeleteComponentTemplateAction.class);
actions.register(PutComposableIndexTemplateAction.INSTANCE, TransportPutComposableIndexTemplateAction.class);
actions.register(GetComposableIndexTemplateAction.INSTANCE, TransportGetComposableIndexTemplateAction.class);
actions.register(DeleteComposableIndexTemplateAction.INSTANCE, TransportDeleteComposableIndexTemplateAction.class);
actions.register(SimulateIndexTemplateAction.INSTANCE, TransportSimulateIndexTemplateAction.class);
actions.register(SimulateTemplateAction.INSTANCE, TransportSimulateTemplateAction.class);
actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
actions.register(FlushAction.INSTANCE, TransportFlushAction.class);
actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
actions.register(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
actions.register(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
actions.register(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
actions.register(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
actions.register(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
actions.register(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
actions.register(IndexAction.INSTANCE, TransportIndexAction.class);
actions.register(GetAction.INSTANCE, TransportGetAction.class);
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportShardMultiTermsVectorAction.class);
actions.register(DeleteAction.INSTANCE, TransportDeleteAction.class);
actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);
actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
actions.register(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class);
actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class);
actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class);
actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
TransportFieldCapabilitiesIndexAction.class);
actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class);
actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);
// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);
// retention leases
actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class);
actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class);
actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class);
// Dangling indices
actions.register(ListDanglingIndicesAction.INSTANCE, TransportListDanglingIndicesAction.class);
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
return unmodifiableMap(actions.getRegistry());
}
@Override
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
bind(DestructiveOperations.class).toInstance(destructiveOperations);
bind(new TypeLiteral<RequestValidators<PutMappingRequest>>() {}).toInstance(mappingRequestValidators);
bind(new TypeLiteral<RequestValidators<IndicesAliasesRequest>>() {}).toInstance(indicesAliasesRequestRequestValidators);
if (false == transportClient) {
// Supporting classes only used when not a transport client
bind(AutoCreateIndex.class).toInstance(autoCreateIndex);
bind(TransportLivenessAction.class).asEagerSingleton();
// register ActionType -> transportAction Map used by NodeClient
@SuppressWarnings("rawtypes")
MapBinder<ActionType, TransportAction> transportActionsBinder
= MapBinder.newMapBinder(binder(), ActionType.class, TransportAction.class);
for (ActionHandler<?, ?> action : actions.values()) {
// bind the action as eager singleton, so the map binder one will reuse it
bind(action.getTransportAction()).asEagerSingleton();
transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
for (Class<?> supportAction : action.getSupportTransportActions()) {
bind(supportAction).asEagerSingleton();
}
}
}
}
actions.register(MainAction.INSTANCE, TransportMainAction.class);
とされているので TransportMainAction
へ
clusterService.state()
とか使いながら必要な情報を取得
@Override
protected void doExecute(Task task, MainRequest request, ActionListener<MainResponse> listener) {
ClusterState clusterState = clusterService.state();
listener.onResponse(
new MainResponse(nodeName, Version.CURRENT, clusterState.getClusterName(),
clusterState.metadata().clusterUUID(), Build.CURRENT));
}
親の顔より見たレスポンス〜
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("name", nodeName);
builder.field("cluster_name", clusterName.value());
builder.field("cluster_uuid", clusterUuid);
builder.startObject("version")
.field("number", build.getQualifiedVersion())
.field("build_flavor", build.flavor().displayName())
.field("build_type", build.type().displayName())
.field("build_hash", build.hash())
.field("build_date", build.date())
.field("build_snapshot", build.isSnapshot())
.field("lucene_version", version.luceneVersion.toString())
.field("minimum_wire_compatibility_version", version.minimumCompatibilityVersion().toString())
.field("minimum_index_compatibility_version", version.minimumIndexCompatibilityVersion().toString())
.endObject();
builder.field("tagline", "You Know, for Search");
builder.endObject();
return builder;
}
$ curl -XGET -H 'Content-Type: application/json' 'localhost:9200?pretty'
{
"name" : "b6265436b22d",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "zh_3Ueg-ToK64zBq9UXvww",
"version" : {
"number" : "7.9.2",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e",
"build_date" : "2020-09-23T00:45:33.626720Z",
"build_snapshot" : false,
"lucene_version" : "8.6.2",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
おわり
MainActionだから実際の処理の部分がとてもシンプル
これで読み方は完全に理解したと思って次にCreateIndexActionあたり行ってみたら見事に玉砕したので次回
Author And Source
この問題について(Elasticsearchコードリーディング その1), 我々は、より多くの情報をここで見つけました https://qiita.com/yokomotod/items/7e24b1db0ba45b3bce68著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .