AntDB Reduceデッドロックと駆動

16580 ワード

AntDB Reduceデッドロックと駆動


AntDBのクラスタ計画(Cluster Plan)は、パラレル計画(Parallel Plan)と同様に、シーケンス化(Serialize)と逆シーケンス化(Restore)によって計画(Plan Statement)を実行し、各ノードの実行計画が一致することを保証するために、各関連ノード(Node)に送信される(基本的に一致し、Restoreではわずかに変更される可能性がある).
AntDBは、データを動的に再分布するためにReduce Planを導入し、Reduce Planの実行が完了するには、ローカルデータ(eof_underlyingによってローカルデータがスキャンされたか否かをマークする)+ネットワークデータ(eof_networkによってネットワークデータがスキャンされたか否かをマークする)の2つの部分が含まれる.
実際、各ノードの実行計画は一致しているが、各ノード自体の状況(機械性能、データサイズ、ネットワークなどの要素を含む)により、各ノードは実行過程において、必然的に前後の区別がある.さらに,Reduce Planは各ノードがEOFメッセージを送信してネットワーク伝送の完了をマークする必要があるため,各ノード間でReduce Planがデッドロックするという問題が発生することが分かった.では,Plan TreeにおけるReduce Planを合理的に駆動してEOFを送信することが必要となる.
以下に、現在知られているデッドロックの例を示す.

通常のデッドロック


問題:通常のデッドロックは、計画実行中に発生し、あるノードのPlan Treeが事前に終了した場合、そのPlan Treeの1つ以上のReduce NodeがEOFメッセージを送信せずに残りのノードに通知した場合に発生するデッドロックです.
方法:ExecProcNode(PlanState*node)が返すSlotがTupIsNull(slot)を満たすと、そのnodeを頂点とするPlan Treeを駆動し、そのPlan Treeの下のReduce PlanがEOFの送信動作を完了することを確保する.

Aggデッドロック


質問:Agg Plan実行中、左ツリーがNULLを返すと、Agg Planが返す結果がTupIsNull(slot)を満たさず、駆動しない可能性があります.
方法:駆動条件新規((AggState*)node)->agg_done)はタイミング駆動です.

CteScanデッドロック


問題:CteScanは比較的特殊なPlanであり、実際に実行されるplanはその上層のあるPlanのinitPlanである.ClusterReduceを駆動する目的は、本ノードに属さないリアルタイムデータを転送し、残りのデータを破棄することであるため、CteScanの実行には合理的ではない.他のPlanはこのCteScanを使用する可能性があるからである.例:
WITH t_onek AS (
        SELECT unique1, two, ten, hundred, twothousand
            , tenthous, even, stringu2
        FROM onek
        WHERE odd < 100
    )
SELECT *
FROM t_onek
WHERE even = 1000
UNION ALL
SELECT *
FROM t_onek
WHERE even < 100;

方法:CteScanのドライバ
static bool
DriveCteScanState(CteScanState *node)
{
    TupleTableSlot *slot = NULL;
    ListCell       *lc = NULL;
    SubPlanState   *sps = NULL;

    Assert(node && IsA(node, CteScanState));

    if (!IsThereClusterReduce((PlanState *) node))
        return false;

    /*
     * Here we do ExecCteScan instead of just driving ClusterReduce,
     * because other plan node may need the results of the CteScan.
     */
    for (;;)
    {
        slot = ExecCteScan((CteScanState *) node);
        if (TupIsNull(slot))
            break;
    }

    /*
     * Do not forget to drive subPlan-s.
     */
    foreach (lc, node->ss.ps.subPlan)
    {
        sps = (SubPlanState *) lfirst(lc);

        Assert(IsA(sps, SubPlanState));
        if (DriveClusterReduceWalker(sps->planstate))
            return true;
    }

    /*
     * Do not forget to drive initPlan-s.
     */
    foreach (lc, node->ss.ps.initPlan)
    {
        sps = (SubPlanState *) lfirst(lc);

        Assert(IsA(sps, SubPlanState));
        if (DriveClusterReduceWalker(sps->planstate))
            return true;
    }

    return false;
}

Planの実行と駆動のインターロック


質問:PlanStateの駆動順序はplanstate_tree_walkerの順序は駆動されるが,この順序は実際のPlanStateの実行順序とは一致しないため,実行と駆動のインターロックが発生する場合がある.例えば、HashJoinデッドロック、HashJoinの左ツリー(Left Tree)と右ツリー(Right Tree)は、実行時に左ツリーを先にしたり、右ツリーを先にしたりするため、デッドロックの実行と駆動を招くことがあります.planstate_tree_walkerの順序は次のとおりです.
  • initPlan-s
  • left tree
  • right tree
  • special child plans
  • subPlan-sが実行します.
    /*
    * planstate_tree_walker --- walk plan state trees
    *
    * The walker has already visited the current node, and so we need only
    * recurse into any sub-nodes it has.
    */
    bool
    planstate_tree_walker(PlanState *planstate,
                      bool (*walker) (),
                      void *context)
    {
    Plan       *plan = planstate->plan;
    ListCell   *lc;
    
    /* initPlan-s */
    if (planstate_walk_subplans(planstate->initPlan, walker, context))
        return true;
    
    /* lefttree */
    if (outerPlanState(planstate))
    {
        if (walker(outerPlanState(planstate), context))
            return true;
    }
    
    /* righttree */
    if (innerPlanState(planstate))
    {
        if (walker(innerPlanState(planstate), context))
            return true;
    }
    
    /* special child plans */
    switch (nodeTag(plan))
    {
        case T_ModifyTable:
            if (planstate_walk_members(((ModifyTable *) plan)->plans,
                                  ((ModifyTableState *) planstate)->mt_plans,
                                       walker, context))
                return true;
            break;
        case T_Append:
            if (planstate_walk_members(((Append *) plan)->appendplans,
                                    ((AppendState *) planstate)->appendplans,
                                       walker, context))
                return true;
            break;
        case T_MergeAppend:
            if (planstate_walk_members(((MergeAppend *) plan)->mergeplans,
                                ((MergeAppendState *) planstate)->mergeplans,
                                       walker, context))
                return true;
            break;
        case T_BitmapAnd:
            if (planstate_walk_members(((BitmapAnd *) plan)->bitmapplans,
                                 ((BitmapAndState *) planstate)->bitmapplans,
                                       walker, context))
                return true;
            break;
        case T_BitmapOr:
            if (planstate_walk_members(((BitmapOr *) plan)->bitmapplans,
                                  ((BitmapOrState *) planstate)->bitmapplans,
                                       walker, context))
                return true;
            break;
        case T_SubqueryScan:
            if (walker(((SubqueryScanState *) planstate)->subplan, context))
                return true;
            break;
        case T_CustomScan:
            foreach(lc, ((CustomScanState *) planstate)->custom_ps)
            {
                if (walker((PlanState *) lfirst(lc), context))
                    return true;
            }
            break;
        default:
            break;
    }
    
    /* subPlan-s */
    if (planstate_walk_subplans(planstate->subPlan, walker, context))
        return true;
    
    return false;
    }
    方法:駆動順序が実行順序と一致するようにplanstate_を追加tree_exec_walker関数、walk順序は次のとおりです.
  • left tree/right tree/special child plans
  • subPlan-s
  • initPlan-s

  • bool
    planstate_tree_exec_walker(PlanState *planstate,
                               bool (*walker) (),
                               void *context)
    {
        Plan       *plan = planstate->plan;
        ListCell   *lc;
    
        switch (nodeTag(plan))
        {
            case T_HashJoin:
                if (planstate_exec_walk_hashjoin((HashJoinState *)planstate,
                                                 walker,
                                                 context))
                    return true;
                break;
            case T_ModifyTable:
                if (planstate_walk_members(((ModifyTable *) plan)->plans,
                                      ((ModifyTableState *) planstate)->mt_plans,
                                           walker, context))
                    return true;
                break;
            case T_Append:
                if (planstate_walk_members(((Append *) plan)->appendplans,
                                        ((AppendState *) planstate)->appendplans,
                                           walker, context))
                    return true;
                break;
            case T_MergeAppend:
                if (planstate_walk_members(((MergeAppend *) plan)->mergeplans,
                                    ((MergeAppendState *) planstate)->mergeplans,
                                           walker, context))
                    return true;
                break;
            case T_BitmapAnd:
                if (planstate_walk_members(((BitmapAnd *) plan)->bitmapplans,
                                     ((BitmapAndState *) planstate)->bitmapplans,
                                           walker, context))
                    return true;
                break;
            case T_BitmapOr:
                if (planstate_walk_members(((BitmapOr *) plan)->bitmapplans,
                                      ((BitmapOrState *) planstate)->bitmapplans,
                                           walker, context))
                    return true;
                break;
            case T_SubqueryScan:
                if (walker(((SubqueryScanState *) planstate)->subplan, context))
                    return true;
                break;
            case T_CustomScan:
                foreach(lc, ((CustomScanState *) planstate)->custom_ps)
                {
                    if (walker((PlanState *) lfirst(lc), context))
                        return true;
                }
                break;
            case T_CteScan:
                if (walker(((CteScanState *) planstate)->cteplanstate, context))
                    return true;
                break;
            default:
                if (outerPlanState(planstate) && walker(outerPlanState(planstate), context))
                    return true;
                if (innerPlanState(planstate) && walker(innerPlanState(planstate), context))
                    return true;
                break;
        }
    
        /* subPlan-s */
        if (planstate_walk_subplans(planstate->subPlan, walker, context))
            return true;
    
        /* initPlan-s */
        if (planstate_walk_subplans(planstate->initPlan, walker, context))
            return true;
    
        return false;
    }

    AntDB Cluster Reduceデッドロック駆動

    static bool
    DriveClusterReduceWalker(PlanState *node)
    {
        EState     *estate;
        int         planid;
        bool        res;
    
        if (node == NULL)
            return false;
    
        estate = node->state;
        planid = PlanNodeID(node->plan);
    
        if (bms_is_member(planid, estate->es_reduce_drived_set))
            return false;
    
        if (IsA(node, ClusterReduceState))
        {
            ClusterReduceState *crs = (ClusterReduceState *) node;
            Assert(crs->port);
    
            if (!crs->eof_network || !crs->eof_underlying)
                elog(LOG, "Drive ClusterReduce(%d) to send EOF message", planid);
    
            /*
             * Drive all ClusterReduce to send slot, discard slot
             * used for local.
             */
            res = DriveClusterReduceState(crs);
        } else
        if (IsA(node, CteScanState))
        {
            res = DriveCteScanState((CteScanState *) node);
        } else
        {
            res = planstate_tree_exec_walker(node, DriveClusterReduceWalker, NULL);
        }
    
        estate->es_reduce_drived_set = bms_add_member(estate->es_reduce_drived_set, planid);
    
        return res;
    }

    認知が限られている限り、ドライバはまだholdできない場合があり、その後、デッドロックcaseに遭遇したときに最適化を続けなければならない.