Prestoソースコードリーディング記録(SQL実行フロー)

31377 ワード

  • はじめに: Prestoのソースコードは主に2つのモジュールから読み始める。presto-cliとpresto-mainは、それぞれクライアント側とサーバー側のエントリーポイントとなるモジュールである。対象バージョンは以下の通り:
    groupId: com.facebook.presto
    artifactId: presto-root
    version: 0.190-SNAPSHOT
  • presto-cliのmainメソッドは次のファイルにある:
    com.facebook.presto.cli.Presto.java
    presto-mainのmainメソッドは次のファイルにある:
    com.facebook.presto.server.PrestoServer.java
    クライアント側とサーバー側は、それぞれこの2つのmainメソッドから起動する。本記事では主に、クライアントがサーバーにクエリーを送信した後、サーバー側でそのクエリーがどのように実行されるかを説明する。
  • StatementResource.createQuery()メソッド
  • クライアントがHTTPリクエスト(例: http://localhost/v1/statement)を送信すると、StatementResourceクラスのcreateQueryメソッドが呼ばれる。
        // Reject an empty SQL statement with HTTP 400 (BAD_REQUEST) before doing any work.
        if (isNullOrEmpty(statement)) {
                throw new WebApplicationException(Response
                        .status(Status.BAD_REQUEST)
                        .type(MediaType.TEXT_PLAIN)
                        .entity("SQL statement is empty")
                        .build());
            }
    
            // Build the session context from the incoming servlet request.
            SessionContext sessionContext = new HttpRequestSessionContext(servletRequest);
    
            // Exchange client from the supplier; the memory-delta callback passed here is a no-op.
            ExchangeClient exchangeClient = exchangeClientSupplier.get(deltaMemoryInBytes -> {});    
            // Create the Query object for this statement, handing it the managers and executors it needs.
            Query query = Query.create(
                    sessionContext,
                    statement,
                    queryManager,
                    sessionPropertyManager,
                    exchangeClient,
                    responseExecutor,
                    timeoutExecutor,
                    blockEncodingSerde);
  • Query.create()メソッド
  • Query.create()の中でQueryオブジェクトを生成する:
      // Query.create() constructs the Query instance from the request's session context and collaborators.
      Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);

    Queryのコンストラクタでは、クエリーマネージャーにクエリーを登録する:
    // Inside the Query constructor: register the query with the query manager, which returns a QueryInfo.
    QueryInfo queryInfo = queryManager.createQuery(sessionContext, query);
  • SqlQueryManager.createQuery()メソッド
  • クエリーをキューマネージャー(queueManager)に投入する:
     // Start the query in the background: hand the statement and its QueryExecution
     // to the queue manager, which will run it on queryExecutor.
     queueManager.submit(statement, queryExecution, queryExecutor);
  • SqlQueryQueueManager.submit()メソッド
  • public void submit(Statement statement, QueryExecution queryExecution, Executor executor)
    {
        // Reserve capacity in every matching queue up front, then enqueue into the first one.
        List<QueryQueue> queues;
        try {
            // Select the queues this query must pass through, based on its session
            queues = selectQueues(queryExecution.getSession(), executor);
        }
        catch (PrestoException e) {
            queryExecution.fail(e);
            return;
        }
        for (QueryQueue queue : queues) {
            if (!queue.reserve(queryExecution)) {
                // A queue has no room left: fail the query immediately
                // instead of waiting for capacity to free up.
                queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
                return;
            }
        }
        // Enqueue into the first queue. The remaining queues travel with the
        // QueuedExecution and are entered one after another as each is passed.
        queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));
    }

    QueryQueue.enqueue()メソッド
    // Adds a query to this queue: bump the queue-size counter, then hand an entry to the semaphore.
    public void enqueue(QueuedExecution queuedExecution)
    {
        queryQueueSize.incrementAndGet();
         // Wrap the execution in a QueueEntry that is given queryQueueSize::decrementAndGet
         // as a callback. A listener on the execution's completion future calls
         // entry.dequeue() when the execution completes.
        QueueEntry entry = new QueueEntry(queuedExecution, queryQueueSize::decrementAndGet);
        queuedExecution.getCompletionFuture().addListener(entry::dequeue, MoreExecutors.directExecutor());
    // Submit the QueueEntry to the async semaphore, which starts it once a permit is available
        asyncSemaphore.submit(entry);
    }
    

    AsyncSemaphore.submit()メソッド
    public ListenableFuture> submit(T task)
    {
        QueuedTask queuedTask = new QueuedTask<>(task);
        //        QueueEntry
        queuedTasks.add(queuedTask);
        //    
        acquirePermit();
        return queuedTask.getCompletionFuture();
    }

    パーミット(実行許可)を取得してタスクを開始する:
    // Method reference reused every time a queued task needs to be started.
    private final Runnable runNextTask = this::runNext;
    private void acquirePermit()
    {
     // Count this task; it may start now only if the count stays within maxPermits
        if (counter.incrementAndGet() <= maxPermits) {
            // A permit is free: run runNextTask (i.e. runNext()) on submitExecutor
            submitExecutor.execute(runNextTask);
        }
    }
    // Takes the next queued task, starts it, and releases its permit once the
    // task's future completes, whether it succeeds or fails.
    private void runNext()
    {
        final QueuedTask<T> queuedTask = queuedTasks.poll();
        // Start the task via the submitter function
        ListenableFuture<?> future = submitTask(queuedTask.getTask());
        Futures.addCallback(future, new FutureCallback<Object>()
        {
            @Override
            public void onSuccess(Object result)
            {
                queuedTask.markCompleted();
                releasePermit();
            }

            @Override
            public void onFailure(Throwable t)
            {
                queuedTask.markFailure(t);
                releasePermit();
            }
        });
    }
    private final Function<T, ListenableFuture>> submitter;
    private ListenableFuture> submitTask(T task)
    {
         try {
          //       QueryQueue
             ListenableFuture> future = submitter.apply(task);
             if (future == null) {
                 return Futures.immediateFailedFuture(new NullPointerException("Submitter returned a null future for task: " + task));
             }
             return future;
         }
         catch (Exception e) {
             return Futures.immediateFailedFuture(e);
         }
     }

    QueryQueueオブジェクトは、SqlQueryQueueManager.selectQueues()から呼ばれるgetOrCreateQueues()で作成される。以下はQueryQueueのコンストラクタ:
    // Creates a queue whose AsyncSemaphore allows maxConcurrentQueries entries to run at once;
    // queuePermits is initialized to maxQueuedQueries + maxConcurrentQueries.
    QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
    {
        requireNonNull(queryExecutor, "queryExecutor is null");
        checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
        checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");
    
        int permits = maxQueuedQueries + maxConcurrentQueries;
        // Check for overflow
        checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);
    
        this.queuePermits = new AtomicInteger(permits);
        this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
                queryExecutor,
                queueEntry -> {
                    QueuedExecution queuedExecution = queueEntry.dequeue();
                    if (queuedExecution != null) {
                        // Start the dequeued execution; the semaphore releases this permit
                        // when the returned completion future completes.
                        queuedExecution.start();
                        return queuedExecution.getCompletionFuture();
                    }
                    return Futures.immediateFuture(null);
                });
    }

    QueuedExecution.start()メソッド
    // Either runs the query (when no queues remain) or moves it into the next queue.
    public void start()
    {
          // Only execute if the query is not already completed (e.g. cancelled)
          if (listenableFuture.isDone()) {
              return;
          }
          if (nextQueues.isEmpty()) {
              executor.execute(() -> {
                  try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
                      // All queues passed: actually start the query (SqlQueryExecution.start() in this walkthrough)
                      queryExecution.start();
                  }
              });
          }
          else {
              // More queues remain: enqueue into the next one, carrying the rest and the same completion future
              nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
          }
      }

    SqlQueryExecution.start()メソッド
    // Runs the query: analyze and plan it, plan its distribution, then start the
    // scheduler unless the query is already done.
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // transition to planning
                if (!stateMachine.transitionToPlanning()) {
                    // query already started or finished
                    return;
                }
    
                // Analyze the statement and build the fragmented plan
                PlanRoot plan = analyzeQuery();
    
                metadata.beginQuery(getSession(), plan.getConnectors());
    
                // plan distribution of query
                planDistribution(plan);
    
                // transition to starting
                if (!stateMachine.transitionToStarting()) {
                    // query already started or finished
                    return;
                }
    
                // if query is not finished, start the scheduler, otherwise cancel it
                SqlQueryScheduler scheduler = queryScheduler.get();
    
                if (!stateMachine.isDone()) {
                    scheduler.start();
                }
            }
            catch (Throwable e) {
                // Any failure fails the query; Errors are additionally rethrown
                fail(e);
                throwIfInstanceOf(e, Error.class);
            }
        }
    }
     // Analyzes and plans the query; a StackOverflowError during analysis is reported
     // as a NOT_SUPPORTED "statement is too large" PrestoException.
     private PlanRoot analyzeQuery()
     {
          try {
              // Delegate to doAnalyzeQuery()
              return doAnalyzeQuery();
          }
          catch (StackOverflowError e) {
              throw new PrestoException(NOT_SUPPORTED, "statement is too large (stack overflow during analysis)", e);
          }
      }
    // Analyzes the statement, builds the logical plan, records inputs/output and the
    // fragmented plan on the state machine, and returns the root of the fragmented plan.
    private PlanRoot doAnalyzeQuery()
    {
        // time analysis phase
        long analysisStart = System.nanoTime();

        // analyze query
        Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
        // Semantic analysis of the parsed SQL statement
        Analysis analysis = analyzer.analyze(statement);

        stateMachine.setUpdateType(analysis.getUpdateType());

        // plan query
        PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
        LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
        Plan plan = logicalPlanner.plan(analysis);
        queryPlan.set(plan);

        // extract inputs
        List<Input> inputs = new InputExtractor(metadata, stateMachine.getSession()).extractInputs(plan.getRoot());
        stateMachine.setInputs(inputs);

        // extract output
        Optional<Output> output = new OutputExtractor().extractOutput(plan.getRoot());
        stateMachine.setOutput(output);

        // fragment the plan
        SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, plan);
        stateMachine.setPlan(planFlattener.flatten(fragmentedPlan, stateMachine.getSession()));

        // record analysis time
        stateMachine.recordAnalysisTime(analysisStart);

        // For EXPLAIN ANALYZE, the second PlanRoot argument is false
        boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze();
        return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
    }

    Analyzer.analyze()メソッド: ここから解析処理が始まる(呼び出しスタックがかなり深くなる)
    // Analyzes a statement that is not part of a DESCRIBE.
    public Analysis analyze(Statement statement)
    {
        return analyze(statement, false);
    }
    
    // Rewrites the statement, then analyzes the rewritten form; results are recorded into the returned Analysis.
    public Analysis analyze(Statement statement, boolean isDescribe)
    {
         Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);
         Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe);
         // Statement analyzer that writes its results into analysis
         StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session);
         // Walk the statement tree; the top-level statement has no outer query scope
         analyzer.analyze(rewrittenStatement, Optional.empty());
         return analysis;
     }

    StatementAnalyzer.analyze()メソッド: StatementはNodeのサブクラスである。ここでのNodeはマシン(クラスタ)のノードではなく構文木のノードで、今回はQuery文である。
    // Analyzes a syntax-tree node with a fresh Visitor bound to the given outer query scope.
    public Scope analyze(Node node, Optional<Scope> outerQueryScope)
    {
        return new Visitor(outerQueryScope).process(node, Optional.empty());
    }
    
    // Visits the node via the superclass dispatch, then checks the invariants of the returned scope.
    @Override
    public Scope process(Node node, Optional<Scope> scope)
    {
        Scope returnScope = super.process(node, scope);
        checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
        if (scope.isPresent()) {
            checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
        }
        return returnScope;
    }

    AstVisitor.process()メソッド
    // Double dispatch: the node calls back the visitor method that matches its own type
    // (e.g. Query.accept calls visitQuery).
    public R process(Node node, @Nullable C context)
    {
        return node.accept(this, context);
    }

    Query.accept()メソッド
    // Dispatches this Query node to the visitor's visitQuery callback.
    @Override
    public <R, C> R accept(AstVisitor<R, C> visitor, C context)
    {
        return visitor.visitQuery(this, context);
    }

    StatementAnalyzer.visitQuery()メソッド
    // Analyzes a Query node: its WITH clause, its body and its ORDER BY; records the output
    // expressions and returns the query's scope (a child of the WITH scope).
    @Override
    protected Scope visitQuery(Query node, Optional<Scope> scope)
    {
        // Analyze the WITH clause (if any); its scope is the parent for the query body
        Scope withScope = analyzeWith(node, scope);
        // Analyze the query body within the WITH scope
        Scope queryBodyScope = process(node.getQueryBody(), withScope);
        if (node.getOrderBy().isPresent()) {
            analyzeOrderBy(node, queryBodyScope);
        }
        else {
            analysis.setOrderByExpressions(node, emptyList());
        }

        // Input fields == Output fields
        analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope));

        Scope queryScope = Scope.builder()
                .withParent(withScope)
                .withRelationType(RelationId.of(node), queryBodyScope.getRelationType())
                .build();
        analysis.setScope(node, queryScope);
        return queryScope;
    }
    // Convenience overload: processes the node with the given scope wrapped in Optional.of().
    private Scope process(Node node, Scope scope)
    {
         return process(node, Optional.of(scope));
     }
    // Visits the node via the superclass dispatch (which calls the matching visitXxx method),
    // then checks the invariants of the returned scope.
    @Override
    public Scope process(Node node, Optional<Scope> scope)
    {
        Scope returnScope = super.process(node, scope);
        checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
        if (scope.isPresent()) {
            checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
        }
        return returnScope;
    }

    StatementAnalyzer.visitQuerySpecification()メソッド: SELECT文の本体(QuerySpecification)を解析し、SQLが仕様に準拠しているかを検査する
    // Analyzes one SELECT body (FROM, WHERE, SELECT, GROUP BY, HAVING, ORDER BY, grouping
    // operations, aggregations and window functions) and returns its output scope.
    @Override
    protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope> scope)
    {
        if (SystemSessionProperties.isLegacyOrderByEnabled(session)) {
            return legacyVisitQuerySpecification(node, scope);
        }

        // TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions
        // to pass down to analyzeFrom
        // Analyze FROM first: WHERE, SELECT, GROUP BY and HAVING are resolved against its scope
        Scope sourceScope = analyzeFrom(node, scope);

        node.getWhere().ifPresent(where -> analyzeWhere(node, sourceScope, where));

        List<Expression> outputExpressions = analyzeSelect(node, sourceScope);
        List<List<Expression>> groupByExpressions = analyzeGroupBy(node, sourceScope, outputExpressions);
        analyzeHaving(node, sourceScope);

        Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope);

        List<Expression> orderByExpressions = emptyList();
        Optional<Scope> orderByScope = Optional.empty();
        if (node.getOrderBy().isPresent()) {
            orderByScope = Optional.of(computeAndAssignOrderByScope(node.getOrderBy().get(), sourceScope, outputScope));
            orderByExpressions = analyzeOrderBy(node, orderByScope.get(), outputExpressions);
        }
        else {
            analysis.setOrderByExpressions(node, emptyList());
        }

        List<Expression> sourceExpressions = new ArrayList<>();
        sourceExpressions.addAll(outputExpressions);
        node.getHaving().ifPresent(sourceExpressions::add);

        analyzeGroupingOperations(node, sourceExpressions, orderByExpressions);
        List<FunctionCall> aggregations = analyzeAggregations(node, sourceScope, orderByScope, groupByExpressions, sourceExpressions, orderByExpressions);
        analyzeWindowFunctions(node, outputExpressions, orderByExpressions);

        if (!groupByExpressions.isEmpty() && node.getOrderBy().isPresent()) {
            // Create a different scope for ORDER BY expressions when aggregation is present.
            // This is because planner requires scope in order to resolve names against fields.
            // Original ORDER BY scope "sees" FROM query fields. However, during planning
            // and when aggregation is present, ORDER BY expressions should only be resolvable against
            // output scope, group by expressions and aggregation expressions.
            computeAndAssignOrderByScopeWithAggregation(node.getOrderBy().get(), sourceScope, outputScope, aggregations, groupByExpressions, analysis.getGroupingOperations(node));
        }

        return outputScope;
    }
    // Returns the scope produced by the FROM clause, or a new scope from createScope(scope)
    // when the query has no FROM clause.
    private Scope analyzeFrom(QuerySpecification node, Optional<Scope> scope)
    {
        if (node.getFrom().isPresent()) {
            // Analyze the FROM relation; a plain table dispatches to visitTable
            return process(node.getFrom().get(), scope);
        }

        return createScope(scope);
    }
    

    com.facebook.presto.sql.tree.Table.accept()メソッド: TableはQueryBodyを継承している
    // Dispatches this Table node to the visitor's visitTable callback.
    @Override
    public <R, C> R accept(AstVisitor<R, C> visitor, C context)
    {
        return visitor.visitTable(this, context);
    }

    StatementAnalyzer.visitTable()メソッド
    // Look up the table through metadata. If it is missing, report the most specific
    // cause: missing catalog first, then missing schema, then missing table.
    Optional<TableHandle> tableHandle = metadata.getTableHandle(session, name);
    if (!tableHandle.isPresent()) {
        if (!metadata.getCatalogHandle(session, name.getCatalogName()).isPresent()) {
            throw new SemanticException(MISSING_CATALOG, table, "Catalog %s does not exist", name.getCatalogName());
        }
        if (!metadata.schemaExists(session, new CatalogSchemaName(name.getCatalogName(), name.getSchemaName()))) {
            throw new SemanticException(MISSING_SCHEMA, table, "Schema %s does not exist", name.getSchemaName());
        }
        throw new SemanticException(MISSING_TABLE, table, "Table %s does not exist", name);
    }

    MetadataManager.getTableHandle()メソッド: テーブルハンドルを取得する
    // Resolves a fully qualified table name to a TableHandle by asking the owning connector.
    // Returns Optional.empty() if the catalog is unknown or the connector returns no handle.
    @Override
    public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
    {
        requireNonNull(table, "table is null");

        Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, table.getCatalogName());
        if (catalog.isPresent()) {
            CatalogMetadata catalogMetadata = catalog.get();
            ConnectorId connectorId = catalogMetadata.getConnectorId(table);
            ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
            // Ask the connector's ConnectorMetadata for its own table handle
            ConnectorTableHandle tableHandle = metadata.getTableHandle(session.toConnectorSession(connectorId), table.asSchemaTableName());
            if (tableHandle != null) {
                return Optional.of(new TableHandle(connectorId, tableHandle));
            }
        }
        return Optional.empty();
    }