AntDB Reduceデッドロックと駆動
16580 ワード
AntDB Reduceデッドロックと駆動
AntDBのクラスタ計画(Cluster Plan)は、パラレル計画(Parallel Plan)と同様に、シーケンス化(Serialize)と逆シーケンス化(Restore)によって計画(Plan Statement)を実行し、各ノードの実行計画が一致することを保証するために、各関連ノード(Node)に送信される(基本的に一致し、Restoreではわずかに変更される可能性がある).
AntDBは、データを動的に再分布するためにReduce Planを導入し、Reduce Planの実行が完了するには、ローカルデータ(eof_underlyingによってローカルデータがスキャンされたか否かをマークする)+ネットワークデータ(eof_networkによってネットワークデータがスキャンされたか否かをマークする)の2つの部分が含まれる.
実際、各ノードの実行計画は一致しているが、各ノード自体の状況(機械性能、データサイズ、ネットワークなどの要素を含む)により、各ノードは実行過程において、必然的に前後の区別がある.さらに,Reduce Planは各ノードがEOFメッセージを送信してネットワーク伝送の完了をマークする必要があるため,各ノード間でReduce Planがデッドロックするという問題が発生することが分かった.では,Plan TreeにおけるReduce Planを合理的に駆動してEOFを送信することが必要となる.
以下に、現在知られているデッドロックの例を示す.
通常のデッドロック
問題:通常のデッドロックは、計画実行中に発生し、あるノードのPlan Treeが事前に終了した場合、そのPlan Treeの1つ以上のReduce NodeがEOFメッセージを送信せずに残りのノードに通知した場合に発生するデッドロックです.
方法:ExecProcNode(PlanState*node)が返すSlotがTupIsNull(slot)を満たすと、そのnodeを頂点とするPlan Treeを駆動し、そのPlan Treeの下のReduce PlanがEOFの送信動作を完了することを確保する.
Aggデッドロック
質問:Agg Plan実行中、左ツリーがNULLを返すと、Agg Planが返す結果がTupIsNull(slot)を満たさず、駆動しない可能性があります.
方法:駆動条件新規((AggState*)node)->agg_done)はタイミング駆動です.
CteScanデッドロック
問題:CteScanは比較的特殊なPlanであり、実際に実行されるplanはその上層のあるPlanのinitPlanである.ClusterReduceを駆動する目的は、本ノードに属さないリアルタイムデータを転送し、残りのデータを破棄することであるため、CteScanの実行には合理的ではない.他のPlanはこのCteScanを使用する可能性があるからである.例:
WITH t_onek AS (
SELECT unique1, two, ten, hundred, twothousand
, tenthous, even, stringu2
FROM onek
WHERE odd < 100
)
SELECT *
FROM t_onek
WHERE even = 1000
UNION ALL
SELECT *
FROM t_onek
WHERE even < 100;
方法:CteScanのドライバ
static bool
DriveCteScanState(CteScanState *node)
{
TupleTableSlot *slot = NULL;
ListCell *lc = NULL;
SubPlanState *sps = NULL;
Assert(node && IsA(node, CteScanState));
if (!IsThereClusterReduce((PlanState *) node))
return false;
/*
* Here we do ExecCteScan instead of just driving ClusterReduce,
* because other plan node may need the results of the CteScan.
*/
for (;;)
{
slot = ExecCteScan((CteScanState *) node);
if (TupIsNull(slot))
break;
}
/*
* Do not forget to drive subPlan-s.
*/
foreach (lc, node->ss.ps.subPlan)
{
sps = (SubPlanState *) lfirst(lc);
Assert(IsA(sps, SubPlanState));
if (DriveClusterReduceWalker(sps->planstate))
return true;
}
/*
* Do not forget to drive initPlan-s.
*/
foreach (lc, node->ss.ps.initPlan)
{
sps = (SubPlanState *) lfirst(lc);
Assert(IsA(sps, SubPlanState));
if (DriveClusterReduceWalker(sps->planstate))
return true;
}
return false;
}
Planの実行と駆動のインターロック
質問:PlanStateの駆動順序はplanstate_tree_walkerの順序は駆動されるが,この順序は実際のPlanStateの実行順序とは一致しないため,実行と駆動のインターロックが発生する場合がある.例えば、HashJoinデッドロック、HashJoinの左ツリー(Left Tree)と右ツリー(Right Tree)は、実行時に左ツリーを先にしたり、右ツリーを先にしたりするため、デッドロックの実行と駆動を招くことがあります.planstate_tree_walkerの順序は次のとおりです.
/*
* planstate_tree_walker --- walk plan state trees
*
* The walker has already visited the current node, and so we need only
* recurse into any sub-nodes it has.
*/
bool
planstate_tree_walker(PlanState *planstate,
bool (*walker) (),
void *context)
{
Plan *plan = planstate->plan;
ListCell *lc;
/* initPlan-s */
if (planstate_walk_subplans(planstate->initPlan, walker, context))
return true;
/* lefttree */
if (outerPlanState(planstate))
{
if (walker(outerPlanState(planstate), context))
return true;
}
/* righttree */
if (innerPlanState(planstate))
{
if (walker(innerPlanState(planstate), context))
return true;
}
/* special child plans */
switch (nodeTag(plan))
{
case T_ModifyTable:
if (planstate_walk_members(((ModifyTable *) plan)->plans,
((ModifyTableState *) planstate)->mt_plans,
walker, context))
return true;
break;
case T_Append:
if (planstate_walk_members(((Append *) plan)->appendplans,
((AppendState *) planstate)->appendplans,
walker, context))
return true;
break;
case T_MergeAppend:
if (planstate_walk_members(((MergeAppend *) plan)->mergeplans,
((MergeAppendState *) planstate)->mergeplans,
walker, context))
return true;
break;
case T_BitmapAnd:
if (planstate_walk_members(((BitmapAnd *) plan)->bitmapplans,
((BitmapAndState *) planstate)->bitmapplans,
walker, context))
return true;
break;
case T_BitmapOr:
if (planstate_walk_members(((BitmapOr *) plan)->bitmapplans,
((BitmapOrState *) planstate)->bitmapplans,
walker, context))
return true;
break;
case T_SubqueryScan:
if (walker(((SubqueryScanState *) planstate)->subplan, context))
return true;
break;
case T_CustomScan:
foreach(lc, ((CustomScanState *) planstate)->custom_ps)
{
if (walker((PlanState *) lfirst(lc), context))
return true;
}
break;
default:
break;
}
/* subPlan-s */
if (planstate_walk_subplans(planstate->subPlan, walker, context))
return true;
return false;
}
方法:駆動順序が実行順序と一致するようにplanstate_を追加tree_exec_walker関数、walk順序は次のとおりです.bool
planstate_tree_exec_walker(PlanState *planstate,
bool (*walker) (),
void *context)
{
Plan *plan = planstate->plan;
ListCell *lc;
switch (nodeTag(plan))
{
case T_HashJoin:
if (planstate_exec_walk_hashjoin((HashJoinState *)planstate,
walker,
context))
return true;
break;
case T_ModifyTable:
if (planstate_walk_members(((ModifyTable *) plan)->plans,
((ModifyTableState *) planstate)->mt_plans,
walker, context))
return true;
break;
case T_Append:
if (planstate_walk_members(((Append *) plan)->appendplans,
((AppendState *) planstate)->appendplans,
walker, context))
return true;
break;
case T_MergeAppend:
if (planstate_walk_members(((MergeAppend *) plan)->mergeplans,
((MergeAppendState *) planstate)->mergeplans,
walker, context))
return true;
break;
case T_BitmapAnd:
if (planstate_walk_members(((BitmapAnd *) plan)->bitmapplans,
((BitmapAndState *) planstate)->bitmapplans,
walker, context))
return true;
break;
case T_BitmapOr:
if (planstate_walk_members(((BitmapOr *) plan)->bitmapplans,
((BitmapOrState *) planstate)->bitmapplans,
walker, context))
return true;
break;
case T_SubqueryScan:
if (walker(((SubqueryScanState *) planstate)->subplan, context))
return true;
break;
case T_CustomScan:
foreach(lc, ((CustomScanState *) planstate)->custom_ps)
{
if (walker((PlanState *) lfirst(lc), context))
return true;
}
break;
case T_CteScan:
if (walker(((CteScanState *) planstate)->cteplanstate, context))
return true;
break;
default:
if (outerPlanState(planstate) && walker(outerPlanState(planstate), context))
return true;
if (innerPlanState(planstate) && walker(innerPlanState(planstate), context))
return true;
break;
}
/* subPlan-s */
if (planstate_walk_subplans(planstate->subPlan, walker, context))
return true;
/* initPlan-s */
if (planstate_walk_subplans(planstate->initPlan, walker, context))
return true;
return false;
}
AntDB Cluster Reduceデッドロック駆動
static bool
DriveClusterReduceWalker(PlanState *node)
{
EState *estate;
int planid;
bool res;
if (node == NULL)
return false;
estate = node->state;
planid = PlanNodeID(node->plan);
if (bms_is_member(planid, estate->es_reduce_drived_set))
return false;
if (IsA(node, ClusterReduceState))
{
ClusterReduceState *crs = (ClusterReduceState *) node;
Assert(crs->port);
if (!crs->eof_network || !crs->eof_underlying)
elog(LOG, "Drive ClusterReduce(%d) to send EOF message", planid);
/*
* Drive all ClusterReduce to send slot, discard slot
* used for local.
*/
res = DriveClusterReduceState(crs);
} else
if (IsA(node, CteScanState))
{
res = DriveCteScanState((CteScanState *) node);
} else
{
res = planstate_tree_exec_walker(node, DriveClusterReduceWalker, NULL);
}
estate->es_reduce_drived_set = bms_add_member(estate->es_reduce_drived_set, planid);
return res;
}
認知が限られている限り、ドライバはまだholdできない場合があり、その後、デッドロックcaseに遭遇したときに最適化を続けなければならない.