4646#include "optimizer/tlist.h"
4747#include "parser/parse_clause.h"
4848#include "parser/parsetree.h"
49+ #include "partitioning/partdesc.h"
4950#include "partitioning/partprune.h"
5051#include "utils/lsyscache.h"
5152#include "utils/uri.h"
@@ -102,6 +103,11 @@ typedef struct
102103 bool result ;
103104} contain_motion_walk_context ;
104105
/*
 * Walker state for should_offload_entry_to_qe_plan_walker().
 *
 * The walker sets computeOnSlice once it meets a join, Sort, Agg or
 * WindowAgg node while descending through the Entry-locus part of the plan;
 * the flag is never cleared again during the walk.
 */
typedef struct
{
	/* true once a computation node (Sort, Join, Agg) was seen on the root slice */
	bool		computeOnSlice;
} offload_entry_to_qe_plan_walk_context;
110+
105111static Plan * create_scan_plan (PlannerInfo * root , Path * best_path ,
106112 int flags );
107113static List * build_path_tlist (PlannerInfo * root , Path * path );
@@ -9063,4 +9069,160 @@ push_locus_down_after_elide_motion(Plan* plan)
90639069 plan = plan -> lefttree ;
90649070 }
90659071 }
9066- }
9072+ }
9073+
9074+ /*
9075+ * Restore Entry locus to SingleQE in the root slice.
9076+ * This is simply a reverse of push_locus_down_after_elide_motion.
9077+ * The difference is that it's NOT used when creating a plan but rather
9078+ * after a plan gets created, it's used to modify the plan in offload_entry_to_qe.
9079+ */
9080+ static void
9081+ replace_entry_locus_with_singleqe (Plan * plan )
9082+ {
9083+ while (plan && (plan -> locustype == CdbLocusType_Entry ))
9084+ {
9085+ plan -> locustype = CdbLocusType_SingleQE ;
9086+ switch (nodeTag (plan ))
9087+ {
9088+ case T_Motion :
9089+ return ;
9090+ case T_Append :
9091+ {
9092+ List * subplans = NIL ;
9093+ ListCell * cell ;
9094+ subplans = ((Append * ) (plan ))-> appendplans ;
9095+ foreach (cell , subplans )
9096+ {
9097+ replace_entry_locus_with_singleqe (lfirst (cell ));
9098+ }
9099+ break ;
9100+ }
9101+ case T_SubqueryScan :
9102+ plan = ((SubqueryScan * )(plan ))-> subplan ;
9103+ break ;
9104+ case T_NestLoop :
9105+ case T_MergeJoin :
9106+ case T_HashJoin :
9107+ replace_entry_locus_with_singleqe (plan -> righttree );
9108+ /* FALLTHROUGH */
9109+ default :
9110+ plan = plan -> lefttree ;
9111+ break ;
9112+ }
9113+ }
9114+ }
9115+
9116+ /*
9117+ * Check whether we can safely offload root slice on QD to a QE.
9118+ */
9119+ static bool
9120+ safe_to_offload_entry_to_qe_rte_walker (List * rtes )
9121+ {
9122+ ListCell * lc ;
9123+ foreach (lc , rtes )
9124+ {
9125+ RangeTblEntry * rte = lfirst_node (RangeTblEntry , lc );
9126+ if (rte -> rtekind == RTE_RELATION )
9127+ {
9128+ // check if any partition of a partitioned table is a coordinator-only external/foreign table
9129+ if (rte -> relkind == RELKIND_PARTITIONED_TABLE )
9130+ {
9131+ Relation rel ;
9132+ PartitionDesc desc ;
9133+
9134+ rel = relation_open (rte -> relid , NoLock );
9135+ desc = RelationGetPartitionDesc (rel , true);
9136+ relation_close (rel , NoLock );
9137+ for (int i = 0 ; i < desc -> nparts ; i ++ )
9138+ {
9139+ if (GpPolicyIsEntry (GpPolicyFetch (desc -> oids [i ])))
9140+ return false;
9141+ }
9142+ return true;
9143+ }
9144+ else
9145+ return !GpPolicyIsEntry (GpPolicyFetch (rte -> relid ));
9146+ }
9147+ else if (rte -> rtekind == RTE_SUBQUERY )
9148+ {
9149+ if (!safe_to_offload_entry_to_qe_rte_walker (rte -> subquery -> rtable ))
9150+ return false;
9151+ }
9152+ }
9153+ return true;
9154+ }
9155+
9156+ /*
9157+ * Check if there are multiple Motion in which the root slice contains computation (Sort, Join or Aggregate).
9158+ */
9159+ static bool
9160+ should_offload_entry_to_qe_plan_walker (Plan * plan , offload_entry_to_qe_plan_walk_context * ctx )
9161+ {
9162+ while (plan && plan -> locustype == CdbLocusType_Entry )
9163+ {
9164+ switch (nodeTag (plan ))
9165+ {
9166+ case T_Motion :
9167+ return ctx -> computeOnSlice ;
9168+ case T_SubqueryScan :
9169+ plan = ((SubqueryScan * ) plan )-> subplan ;
9170+ break ;
9171+ /* join */
9172+ case T_Join :
9173+ case T_MergeJoin :
9174+ case T_HashJoin :
9175+ case T_NestLoop :
9176+ ctx -> computeOnSlice = true;
9177+ if (should_offload_entry_to_qe_plan_walker (plan -> righttree , ctx ))
9178+ return true;
9179+ plan = plan -> lefttree ;
9180+ break ;
9181+ /* sort */
9182+ case T_Sort :
9183+ /* aggregates*/
9184+ case T_Agg :
9185+ case T_WindowAgg :
9186+ ctx -> computeOnSlice = true;
9187+ /* FALLTHROUGH */
9188+ default :
9189+ plan = plan -> lefttree ;
9190+ break ;
9191+ }
9192+ }
9193+ return false;
9194+ }
9195+
9196+ Plan *
9197+ offload_entry_to_qe (PlannerInfo * root , Plan * plan , int sendslice_parallel )
9198+ {
9199+ offload_entry_to_qe_plan_walk_context plan_walk_ctx ;
9200+ plan_walk_ctx .computeOnSlice = false;
9201+
9202+ if (root -> parse -> commandType == CMD_SELECT &&
9203+ should_offload_entry_to_qe_plan_walker (plan , & plan_walk_ctx ) &&
9204+ safe_to_offload_entry_to_qe_rte_walker (root -> parse -> rtable ) &&
9205+ !contain_volatile_functions ((Node * ) root -> parse ))
9206+ {
9207+ CdbPathLocus entrylocus ;
9208+ PlanSlice * sendSlice ;
9209+ sendSlice = (PlanSlice * ) palloc0 (sizeof (PlanSlice ));
9210+ sendSlice -> gangType = GANGTYPE_SINGLETON_READER ;
9211+ sendSlice -> numsegments = 1 ;
9212+ sendSlice -> sliceIndex = -1 ;
9213+ sendSlice -> parallel_workers = sendslice_parallel ;
9214+ sendSlice -> segindex = gp_session_id % getgpsegmentCount ();
9215+
9216+ replace_entry_locus_with_singleqe (plan );
9217+
9218+ plan = (Plan * ) make_union_motion (plan );
9219+ ((Motion * ) plan )-> senderSliceInfo = sendSlice ;
9220+
9221+ plan -> locustype = CdbLocusType_Entry ;
9222+ CdbPathLocus_MakeEntry (& entrylocus );
9223+ if (plan -> flow )
9224+ pfree (plan -> flow );
9225+ plan -> flow = cdbpathtoplan_create_flow (root , entrylocus );
9226+ }
9227+ return plan ;
9228+ }
0 commit comments