Prestoソースコードを読んだ記録(SQL実行プロセス)
31377 ワード
com.facebook.presto
presto-root
0.190-SNAPSHOT
com.facebook.presto.cli.Presto.java
presto-mainのmainメソッドエントリは
com.facebook.presto.server.PrestoServer.java
Client側とServer側の起動は、それぞれこの2つのmainメソッドから始まります。本稿では主に、Client側がServer側にクエリーを送信した後、Server側がどのようにクエリーを実行するかを説明します。
// Reject the request with HTTP 400 (plain-text body) when no SQL text was supplied.
if (isNullOrEmpty(statement)) {
throw new WebApplicationException(Response
.status(Status.BAD_REQUEST)
.type(MediaType.TEXT_PLAIN)
.entity("SQL statement is empty")
.build());
}
// Build the session context from the incoming servlet request.
SessionContext sessionContext = new HttpRequestSessionContext(servletRequest);
// Obtain an exchange client from the supplier; the callback passed in is a no-op.
ExchangeClient exchangeClient = exchangeClientSupplier.get(deltaMemoryInBytes -> {});
// Create the Query object for this statement through the static factory.
Query query = Query.create(
sessionContext,
statement,
queryManager,
sessionPropertyManager,
exchangeClient,
responseExecutor,
timeoutExecutor,
blockEncodingSerde);
// NOTE(review): the line below passes `query` and `dataProcessorExecutor` to the Query constructor,
// names that do not match the code above; it looks like an excerpt from inside Query.create(...)
// rather than a continuation of this fragment — confirm against the original source.
Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);
Queryコンストラクタの実行
// Ask the query manager to create the query from the session context and the query text.
QueryInfo queryInfo = queryManager.createQuery(sessionContext, query);
// start the query in the background
// NOTE(review): `statement`, `queryExecution` and `queryExecutor` are not defined in this excerpt;
// this line presumably comes from a different method than the one above — confirm.
queueManager.submit(statement, queryExecution, queryExecutor);
/**
 * Submits a query execution to the queues selected for its session.
 * <p>
 * The execution is failed, and nothing is enqueued, when queue selection throws a
 * {@code PrestoException} or when any of the selected queues refuses the reservation.
 *
 * @param statement the statement being executed (not read by this method body)
 * @param queryExecution the execution to enqueue; its session drives queue selection
 * @param executor the executor passed on to queue selection and to the queued execution
 */
public void submit(Statement statement, QueryExecution queryExecution, Executor executor)
{
    List<QueryQueue> queues;
    try {
        // Pick the queues this query has to pass through, based on its session.
        queues = selectQueues(queryExecution.getSession(), executor);
    }
    catch (PrestoException e) {
        queryExecution.fail(e);
        return;
    }

    for (QueryQueue queue : queues) {
        if (!queue.reserve(queryExecution)) {
            // The queue refused the reservation: fail the query instead of enqueueing it.
            queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
            return;
        }
    }

    // Enqueue into the first queue; the remaining queues are carried along by the QueuedExecution.
    queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));
}
QueryQueue.enqueue()メソッド
/**
 * Adds a queued execution to this queue.
 * <p>
 * The queue-size counter is incremented first; the execution is then wrapped in a
 * {@code QueueEntry} that receives {@code queryQueueSize::decrementAndGet} as its callback,
 * and the entry is submitted to the async semaphore.
 *
 * @param queuedExecution the execution to enqueue
 */
public void enqueue(QueuedExecution queuedExecution)
{
    queryQueueSize.incrementAndGet();

    QueueEntry queueEntry = new QueueEntry(queuedExecution, queryQueueSize::decrementAndGet);

    // Call dequeue() on the entry once the execution's completion future is done.
    // The listener is run with a direct executor.
    queuedExecution.getCompletionFuture().addListener(queueEntry::dequeue, MoreExecutors.directExecutor());

    // Hand the entry over to the semaphore.
    asyncSemaphore.submit(queueEntry);
}
AsyncSemaphore.submit()メソッド
public ListenableFuture> submit(T task)
{
QueuedTask queuedTask = new QueuedTask<>(task);
// QueueEntry
queuedTasks.add(queuedTask);
//
acquirePermit();
return queuedTask.getCompletionFuture();
}
パーミット(実行許可)を取得してタスクを開始する
// Reusable task handed to the executor; running it invokes runNext().
private final Runnable runNextTask = this::runNext;

/**
 * Increments the counter and, when the new value does not exceed {@code maxPermits},
 * schedules {@code runNextTask} on the submit executor.
 */
private void acquirePermit()
{
    if (counter.incrementAndGet() > maxPermits) {
        // Over the permit limit: nothing is scheduled by this call.
        return;
    }
    submitExecutor.execute(runNextTask);
}
// NOTE(review): this excerpt is cut off after `new FutureCallback`; the callback body and the
// end of the method are not shown here.
private void runNext()
{
// Take the next queued task via poll().
final QueuedTask queuedTask = queuedTasks.poll();
// Submit the task and keep the returned future so a callback can be attached to it.
ListenableFuture> future = submitTask(queuedTask.getTask());
Futures.addCallback(future, new FutureCallback
private final Function<T, ListenableFuture>> submitter;
private ListenableFuture> submitTask(T task)
{
try {
// QueryQueue
ListenableFuture> future = submitter.apply(task);
if (future == null) {
return Futures.immediateFailedFuture(new NullPointerException("Submitter returned a null future for task: " + task));
}
return future;
}
catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
}
QueryQueueオブジェクトは、SqlQueryQueueManagerのselectQueues() → getOrCreateQueues()という呼び出しの流れの中で作成されます。
/**
 * Creates a queue with the given limits.
 *
 * @param queryExecutor executor passed to the async semaphore; must not be null
 * @param maxQueuedQueries maximum number of queued queries; must be greater than zero
 * @param maxConcurrentQueries maximum number of concurrent queries; must be greater than zero
 */
QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
{
    requireNonNull(queryExecutor, "queryExecutor is null");
    checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
    checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

    // Both operands are positive here, so a non-positive sum can only come from int overflow.
    int permits = maxQueuedQueries + maxConcurrentQueries;
    checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);

    this.queuePermits = new AtomicInteger(permits);
    this.asyncSemaphore = new AsyncSemaphore<>(
            maxConcurrentQueries,
            queryExecutor,
            queueEntry -> {
                QueuedExecution dequeued = queueEntry.dequeue();
                if (dequeued == null) {
                    // dequeue() produced nothing to run: report immediate completion.
                    return Futures.immediateFuture(null);
                }
                // Start the dequeued execution and track it through its completion future.
                dequeued.start();
                return dequeued.getCompletionFuture();
            });
}
QueuedExecution.start()メソッド
/**
 * Starts this queued execution, or forwards it to the next queue.
 * <p>
 * Does nothing when the future is already done. When further queues remain, a new
 * {@code QueuedExecution} holding the rest of the queues is enqueued into the first of them;
 * otherwise the query execution is started on the executor.
 */
public void start()
{
    // Only execute if the query is not already completed (e.g. cancelled)
    if (listenableFuture.isDone()) {
        return;
    }

    if (!nextQueues.isEmpty()) {
        // More queues to pass through: enqueue into the next one with the remaining tail.
        nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
        return;
    }

    // No queues left: start the query execution on the executor, under a "Query-<id>" thread name.
    executor.execute(() -> {
        try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
            queryExecution.start();
        }
    });
}
SqlQueryExecution.start()メソッド
// Drives the query from planning to scheduling: transitions the state machine to planning,
// analyzes the query, plans its distribution, transitions to starting, and starts the scheduler.
// Any Throwable fails the query; Errors are rethrown afterwards.
public void start()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try {
// transition to planning
if (!stateMachine.transitionToPlanning()) {
// query already started or finished
return;
}
// analyze the query and obtain the plan root
PlanRoot plan = analyzeQuery();
metadata.beginQuery(getSession(), plan.getConnectors());
// plan distribution of query
planDistribution(plan);
// transition to starting
if (!stateMachine.transitionToStarting()) {
// query already started or finished
return;
}
// if query is not finished, start the scheduler, otherwise cancel it
// NOTE(review): no explicit cancel branch is visible in this excerpt; when the state machine
// is already done the scheduler is simply not started here.
SqlQueryScheduler scheduler = queryScheduler.get();
if (!stateMachine.isDone()) {
scheduler.start();
}
}
catch (Throwable e) {
// fail the query first, then rethrow only if the Throwable is an Error
fail(e);
throwIfInstanceOf(e, Error.class);
}
}
}
/**
 * Runs {@code doAnalyzeQuery()} and converts a stack overflow during analysis into a
 * {@code PrestoException} with error code {@code NOT_SUPPORTED}.
 *
 * @return the plan root produced by {@code doAnalyzeQuery()}
 */
private PlanRoot analyzeQuery()
{
    PlanRoot planRoot;
    try {
        planRoot = doAnalyzeQuery();
    }
    catch (StackOverflowError e) {
        // Keep the original error as the cause.
        throw new PrestoException(NOT_SUPPORTED, "statement is too large (stack overflow during analysis)", e);
    }
    return planRoot;
}
// Analyzes the statement, builds the logical plan and records the plan and its inputs.
// NOTE(review): this excerpt is cut off after `Optional`; the rest of the method (output
// extraction and the return of the PlanRoot) is not shown here.
private PlanRoot doAnalyzeQuery()
{
// time analysis phase
long analysisStart = System.nanoTime();
// analyze query
Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
// run the analysis on the statement; the result also supplies the update type set below
Analysis analysis = analyzer.analyze(statement);
stateMachine.setUpdateType(analysis.getUpdateType());
// plan query
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
Plan plan = logicalPlanner.plan(analysis);
queryPlan.set(plan);
// extract inputs
List inputs = new InputExtractor(metadata, stateMachine.getSession()).extractInputs(plan.getRoot());
stateMachine.setInputs(inputs);
// extract output
Optional
Analyzer.analyze()メソッド:解析を実行する(メソッドの呼び出しスタックが非常に深い)
/**
 * Analyzes a statement with {@code isDescribe} set to {@code false}.
 *
 * @param statement the statement to analyze
 * @return the analysis result
 */
public Analysis analyze(Statement statement)
{
    boolean isDescribe = false;
    return analyze(statement, isDescribe);
}

/**
 * Rewrites the statement, then analyzes the rewritten form with a {@code StatementAnalyzer}.
 *
 * @param statement the statement to analyze
 * @param isDescribe flag passed through to the {@code Analysis} object
 * @return the {@code Analysis} populated by the statement analyzer
 */
public Analysis analyze(Statement statement, boolean isDescribe)
{
    // Rewrite first; everything below works on the rewritten statement.
    Statement rewritten = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);

    Analysis result = new Analysis(rewritten, parameters, isDescribe);

    // The statement analyzer records its findings into `result`; the returned scope is not used here.
    StatementAnalyzer statementAnalyzer = new StatementAnalyzer(result, metadata, sqlParser, accessControl, session);
    statementAnalyzer.analyze(rewritten, Optional.empty());

    return result;
}
StatementAnalyzer.analyze()メソッド。StatementはNodeのサブクラスなので、ここでのNodeはマシンノードではなく、Queryのステートメントを指します。
/**
 * Analyzes a node by processing it with a new {@code Visitor}.
 *
 * @param node the AST node to analyze
 * @param outerQueryScope the scope of the outer query, if any; passed to the visitor
 * @return the scope returned by the visitor
 */
public Scope analyze(Node node, Optional<Scope> outerQueryScope)
{
    // The visitor is started without a context scope.
    return new Visitor(outerQueryScope).process(node, Optional.empty());
}
/**
 * Processes a node through the superclass and validates the resulting scope.
 *
 * @param node the AST node to process
 * @param scope the context scope, if any
 * @return the scope produced for the node
 * @throws IllegalStateException presumably (via {@code checkState}) when the returned scope's
 *         outer query parent differs from this visitor's outer query scope, or when the
 *         context scope is not one of the returned scope's local ancestors
 */
public Scope process(Node node, Optional<Scope> scope)
{
    Scope returnScope = super.process(node, scope);
    checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
    if (scope.isPresent()) {
        // The ancestry check only applies when a context scope was passed in.
        checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
    }
    return returnScope;
}
AstVisitor.process
/**
 * Dispatches to the node's {@code accept} method with this visitor.
 *
 * @param node the node to visit
 * @param context the visitor context; may be null
 * @return whatever the node's {@code accept} returns
 */
public R process(Node node, @Nullable C context)
{
    R result = node.accept(this, context);
    return result;
}
Query.accept
/**
 * Accepts a visitor by calling its {@code visitQuery} method with this node.
 *
 * @param visitor the visitor to dispatch to
 * @param context the visitor context
 * @return the value returned by {@code visitQuery}
 */
public <R, C> R accept(AstVisitor<R, C> visitor, C context)
{
    return visitor.visitQuery(this, context);
}
StatementAnalyzer.visitQuery()メソッド
/**
 * Analyzes a {@code Query} node: WITH first, then the query body, then ORDER BY, and finally
 * records the output expressions and the resulting scope in the analysis.
 *
 * @param node the query node
 * @param scope the context scope, if any
 * @return the scope created for the query; its parent is the WITH scope
 */
protected Scope visitQuery(Query node, Optional<Scope> scope)
{
    // Analyze WITH first; the resulting scope is the context for the query body.
    Scope withScope = analyzeWith(node, scope);
    // Process the query body within the WITH scope.
    Scope queryBodyScope = process(node.getQueryBody(), withScope);
    if (node.getOrderBy().isPresent()) {
        analyzeOrderBy(node, queryBodyScope);
    }
    else {
        // No ORDER BY: record an empty list for this node.
        analysis.setOrderByExpressions(node, emptyList());
    }

    // Input fields == Output fields
    analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope));

    Scope queryScope = Scope.builder()
            .withParent(withScope)
            .withRelationType(RelationId.of(node), queryBodyScope.getRelationType())
            .build();
    analysis.setScope(node, queryScope);
    return queryScope;
}
/**
 * Convenience overload: wraps a non-null scope in an {@code Optional} and delegates to the
 * {@code Optional}-taking {@code process}.
 *
 * @param node the node to process
 * @param scope the context scope; {@code Optional.of} rejects null
 * @return the scope produced for the node
 */
private Scope process(Node node, Scope scope)
{
    Optional<Scope> contextScope = Optional.of(scope);
    return process(node, contextScope);
}
/**
 * Processes a node through the superclass and validates the resulting scope.
 *
 * @param node the AST node to process
 * @param scope the context scope, if any
 * @return the scope produced for the node
 * @throws IllegalStateException presumably (via {@code checkState}) when the returned scope's
 *         outer query parent differs from this visitor's outer query scope, or when the
 *         context scope is not one of the returned scope's local ancestors
 */
public Scope process(Node node, Optional<Scope> scope)
{
    // Delegate to the superclass, then check the scope it produced.
    Scope returnScope = super.process(node, scope);
    checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
    if (scope.isPresent()) {
        // The ancestry check only applies when a context scope was passed in.
        checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
    }
    return returnScope;
}
QuerySpecificationの訪問(SQLが仕様に準拠しているかどうかを確認する)
/**
 * Analyzes a {@code QuerySpecification}: FROM, WHERE, SELECT, GROUP BY, HAVING, the output
 * scope, ORDER BY, grouping operations, aggregations and window functions, in that order.
 * <p>
 * When the legacy ORDER BY session property is enabled the whole analysis is delegated to
 * {@code legacyVisitQuerySpecification}.
 *
 * @param node the query specification
 * @param scope the context scope, if any
 * @return the output scope of the query specification
 */
protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope> scope)
{
    if (SystemSessionProperties.isLegacyOrderByEnabled(session)) {
        return legacyVisitQuerySpecification(node, scope);
    }

    // TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions
    // to pass down to analyzeFrom

    // Analyze the FROM clause; the resulting scope is the source for the remaining clauses.
    Scope sourceScope = analyzeFrom(node, scope);

    node.getWhere().ifPresent(where -> analyzeWhere(node, sourceScope, where));

    List<Expression> outputExpressions = analyzeSelect(node, sourceScope);
    List<List<Expression>> groupByExpressions = analyzeGroupBy(node, sourceScope, outputExpressions);
    analyzeHaving(node, sourceScope);

    Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope);

    List<Expression> orderByExpressions = emptyList();
    Optional<Scope> orderByScope = Optional.empty();
    if (node.getOrderBy().isPresent()) {
        orderByScope = Optional.of(computeAndAssignOrderByScope(node.getOrderBy().get(), sourceScope, outputScope));
        orderByExpressions = analyzeOrderBy(node, orderByScope.get(), outputExpressions);
    }
    else {
        // No ORDER BY: record an empty list for this node.
        analysis.setOrderByExpressions(node, emptyList());
    }

    // Source expressions are the SELECT outputs plus the HAVING predicate, when present.
    List<Expression> sourceExpressions = new ArrayList<>();
    sourceExpressions.addAll(outputExpressions);
    node.getHaving().ifPresent(sourceExpressions::add);

    analyzeGroupingOperations(node, sourceExpressions, orderByExpressions);
    List<FunctionCall> aggregations = analyzeAggregations(node, sourceScope, orderByScope, groupByExpressions, sourceExpressions, orderByExpressions);
    analyzeWindowFunctions(node, outputExpressions, orderByExpressions);

    if (!groupByExpressions.isEmpty() && node.getOrderBy().isPresent()) {
        // Create a different scope for ORDER BY expressions when aggregation is present.
        // This is because planner requires scope in order to resolve names against fields.
        // Original ORDER BY scope "sees" FROM query fields. However, during planning
        // and when aggregation is present, ORDER BY expressions should only be resolvable against
        // output scope, group by expressions and aggregation expressions.
        computeAndAssignOrderByScopeWithAggregation(node.getOrderBy().get(), sourceScope, outputScope, aggregations, groupByExpressions, analysis.getGroupingOperations(node));
    }

    return outputScope;
}
/**
 * Analyzes the FROM clause of a query specification.
 *
 * @param node the query specification
 * @param scope the context scope, if any
 * @return the scope produced by processing the FROM relation, or a scope created from the
 *         context scope via {@code createScope} when there is no FROM clause
 */
private Scope analyzeFrom(QuerySpecification node, Optional<Scope> scope)
{
    if (!node.getFrom().isPresent()) {
        return createScope(scope);
    }
    // Dispatch on the FROM relation through the visitor.
    return process(node.getFrom().get(), scope);
}
com.facebook.presto.sql.tree.Table.accept()メソッド。TableはQueryBodyを継承しています。
/**
 * Accepts a visitor by calling its {@code visitTable} method with this node.
 *
 * @param visitor the visitor to dispatch to
 * @param context the visitor context
 * @return the value returned by {@code visitTable}
 */
public <R, C> R accept(AstVisitor<R, C> visitor, C context)
{
    return visitor.visitTable(this, context);
}
StatementAnalyzer.visitTable()メソッド
// Look the table up through the metadata; when there is no handle, report the most specific
// missing object: the catalog first, then the schema, then the table itself.
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, name);
if (!tableHandle.isPresent()) {
    if (!metadata.getCatalogHandle(session, name.getCatalogName()).isPresent()) {
        throw new SemanticException(MISSING_CATALOG, table, "Catalog %s does not exist", name.getCatalogName());
    }
    if (!metadata.schemaExists(session, new CatalogSchemaName(name.getCatalogName(), name.getSchemaName()))) {
        throw new SemanticException(MISSING_SCHEMA, table, "Schema %s does not exist", name.getSchemaName());
    }
    throw new SemanticException(MISSING_TABLE, table, "Table %s does not exist", name);
}
MetadataManager.getTableHandle()メソッド:テーブルハンドルの取得
/**
 * Looks up the handle of a table through the connector that serves its catalog.
 *
 * @param session the session used for the catalog lookup and converted to a connector session
 * @param table the fully qualified table name; must not be null
 * @return the table handle paired with its connector id, or empty when the catalog metadata
 *         is absent or the connector returns {@code null} for the table
 */
public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
{
    requireNonNull(table, "table is null");

    Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, table.getCatalogName());
    if (catalog.isPresent()) {
        CatalogMetadata catalogMetadata = catalog.get();
        ConnectorId connectorId = catalogMetadata.getConnectorId(table);
        ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);

        // Ask the connector's metadata for the handle, using the schema/table part of the name.
        ConnectorTableHandle tableHandle = metadata.getTableHandle(session.toConnectorSession(connectorId), table.asSchemaTableName());
        if (tableHandle != null) {
            return Optional.of(new TableHandle(connectorId, tableHandle));
        }
    }
    return Optional.empty();
}