diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 1ee8a0fcfc6..7757bd4951f 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -363,10 +363,6 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, save_nestlevel = NewGUCNestLevel(); } - /* into AO/AOCS ?*/ - char* am = (into && into->accessMethod) ? into->accessMethod : default_table_access_method; - bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0)); - { /* * Parse analysis was done already, but we still have to run the rule @@ -385,12 +381,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, Assert(query->commandType == CMD_SELECT); /* plan the query */ - if (!intoAO) - plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_OK, params); - else - plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_NOT_OK, params); + plan = pg_plan_query(query, pstate->p_sourcetext, + CURSOR_OPT_PARALLEL_OK, params); /*GPDB: Save the target information in PlannedStmt */ /* diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index cb924e586aa..f7c96efc103 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -69,7 +69,7 @@ typedef struct static int matview_maintenance_depth = 0; -static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO); +static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation); static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); @@ -125,7 +125,7 @@ SetMatViewPopulatedState(Relation relation, bool newstate) } static RefreshClause* -MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO) +MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation) { RefreshClause *refreshClause; refreshClause = makeNode(RefreshClause); @@ -133,7 +133,6 @@ MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoA refreshClause->concurrent = concurrent; refreshClause->skipData = skipData; refreshClause->relation = relation; - refreshClause->intoAO = intoAO; return refreshClause; } @@ -340,8 +339,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, concurrent, relpersistence, stmt->skipData); - bool intoAO = RelationIsAppendOptimized(matviewRel); - refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation, intoAO); + refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation); /* * Only in dispather role, we should set intoPolicy, else it should remain NULL. @@ -473,11 +471,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, /* Plan the query which will generate data for the refresh. */ - /* CBDB_PARALLEL_FIXME: hack here, use cursor_option to disable parallel */ - if (!refreshClause->intoAO) - plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); - else - plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_NOT_OK, NULL); + plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); plan->refreshClause = refreshClause; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b21ef9c9f0f..233411c966d 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -769,6 +769,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, */ ExecSlice *currentSlice; GpExecIdentity exec_identity; + bool amIParallel = false; /* sanity checks */ Assert(queryDesc != NULL); @@ -795,7 +796,10 @@ standard_ExecutorRun(QueryDesc *queryDesc, { if (Gp_role == GP_ROLE_EXECUTE || sliceRunsOnQD(currentSlice)) - currentSliceId = currentSlice->sliceIndex; + { + currentSliceId = currentSlice->sliceIndex; + amIParallel = currentSlice->useMppParallelMode; + } } /* @@ -875,7 +879,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, ExecutePlan(estate, (PlanState *) motionState, - queryDesc->plannedstmt->parallelModeNeeded, + amIParallel, CMD_SELECT, sendTuples, 0, @@ -920,7 +924,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, */ ExecutePlan(estate, queryDesc->planstate, - queryDesc->plannedstmt->parallelModeNeeded, + amIParallel, operation, isParallelRetrieveCursor ? true : sendTuples, count, diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 69201ce6db1..e3e3d4f7360 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,7 +58,6 @@ #include #include "access/transam.h" -#include "access/tableam.h" #include "catalog/namespace.h" #include "executor/executor.h" #include "miscadmin.h" @@ -964,23 +963,11 @@ BuildCachedPlan(CachedPlanSource *plansource, List *qlist, snapshot_set = true; } - /* - * CBDB_PARALLEL_FIXME: - * GPDB hack here for IntoClause, see GetCachedPlan(). - * Disable parallel if into a AO/AOCS table. - */ - char* am = (intoClause && intoClause->accessMethod) ? intoClause->accessMethod : default_table_access_method; - bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0)); - /* * Generate the plan. */ - if (!intoAO) - plist = pg_plan_queries(qlist, plansource->query_string, - plansource->cursor_options, boundParams); - else - plist = pg_plan_queries(qlist, plansource->query_string, - plansource->cursor_options & ~CURSOR_OPT_PARALLEL_OK, boundParams); + plist = pg_plan_queries(qlist, plansource->query_string, + plansource->cursor_options, boundParams); /* Release snapshot if we got one */ if (snapshot_set) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4af6cbc1f9f..294e213ee1c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3318,8 +3318,6 @@ typedef struct SecLabelStmt * of the query are always postponed until execution. * ---------------------- */ -#define CURSOR_OPT_PARALLEL_NOT_OK 0x0000 /* parallel mode is not OK */ - #define CURSOR_OPT_BINARY 0x0001 /* BINARY */ #define CURSOR_OPT_SCROLL 0x0002 /* SCROLL explicitly given */ #define CURSOR_OPT_NO_SCROLL 0x0004 /* NO SCROLL explicitly given */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 39927266c2d..e555a746960 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -142,7 +142,6 @@ typedef struct RefreshClause bool concurrent; /* allow concurrent access? */ bool skipData; RangeVar *relation; /* relation to insert into */ - bool intoAO; /* is relation to insert into AO/AOCS */ } RefreshClause; diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out index 0ead2658f4c..597dbf31e08 100644 --- a/src/test/regress/expected/gp_parallel.out +++ b/src/test/regress/expected/gp_parallel.out @@ -2012,6 +2012,150 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; Optimizer: Postgres query optimizer (13 rows) +abort; +-- +-- Parallel Refresh AO Materialized View +-- +create or replace function refresh_compare(ao_row bool, verbose bool, OUT parallel_is_better bool) as $$ +declare + t timestamptz; + dur0 interval; + dur1 interval; + results0 RECORD; + results1 RECORD; +begin + create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1); + insert into t_p select i, i+1 from generate_series(1, 10000000)i; + analyze t_p; + if ao_row then + create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + else + create materialized view matv using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + end if; + -- refresh + set enable_parallel=off; + t = clock_timestamp(); + refresh materialized view matv; + dur0 = age(clock_timestamp(), t); + select * into results0 from matv; + if refresh_compare.verbose then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Non-parallel results=%', results0; + end if; + -- parallel refresh + set enable_parallel=on; + t = clock_timestamp(); + refresh materialized view matv; + dur1 = age(clock_timestamp(), t); + select * into results1 from matv; + if refresh_compare.verbose then + raise notice 'Parallel refresh duration=%', dur1; + raise notice 'Parallel results=%', results1; + end if; + -- compare + if results0 <> results1 then + raise notice 'results of non-parallel % are not equal to parallel %', results0, results1; + end if; + parallel_is_better = dur0 * 0.8 > dur1; -- Make sure we have significant improvements, given the fluctuations. + if NOT parallel_is_better then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Parallel refresh duration=%', dur1; + end if; + drop materialized view matv; + drop table t_p; + reset enable_parallel; +end +$$ language plpgsql; +begin; +set local max_parallel_workers_per_gather = 8; +select * from refresh_compare(true, false); + parallel_is_better +-------------------- + t +(1 row) + +select * from refresh_compare(false, false); + parallel_is_better +-------------------- + t +(1 row) + +drop function refresh_compare; +reset max_parallel_workers_per_gather; +end; +-- +-- Parallel Create AO/AOCO Table AS +-- +begin; +create table t_p2(c1 int, c2 int) with(parallel_workers=2) distributed by(c1); +insert into t_p2 select i, i+1 from generate_series(1, 10000000)i; +analyze t_p2; +set local enable_parallel = off; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +-------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 3:1 (slice2; segments: 3) + -> Partial Aggregate + -> Hash Join + Hash Cond: (a.c1 = b.c1) + -> Seq Scan on t_p2 a + -> Hash + -> Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +-------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 3:1 (slice2; segments: 3) + -> Partial Aggregate + -> Hash Join + Hash Cond: (a.c1 = b.c1) + -> Seq Scan on t_p2 a + -> Hash + -> Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +set local enable_parallel = on; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +----------------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 6:1 (slice2; segments: 6) + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (a.c1 = b.c1) + -> Parallel Seq Scan on t_p2 a + -> Parallel Hash + -> Parallel Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +----------------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 6:1 (slice2; segments: 6) + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (a.c1 = b.c1) + -> Parallel Seq Scan on t_p2 a + -> Parallel Hash + -> Parallel Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + abort; -- start_ignore drop schema test_parallel cascade; diff --git a/src/test/regress/expected/zlib.out b/src/test/regress/expected/zlib.out index 38bc285d6be..accccf8ad64 100644 --- a/src/test/regress/expected/zlib.out +++ b/src/test/regress/expected/zlib.out @@ -91,10 +91,12 @@ create or replace function FuncA() returns void as $body$ begin + set enable_parallel=off; insert into test_zlib values(2387283, 'a'); insert into test_zlib_t1 values(1, 2); CREATE TEMP table TMP_Q_QR_INSTM_ANL_01 WITH(APPENDONLY=true,COMPRESSLEVEL=5,ORIENTATION=row,COMPRESSTYPE=zlib) on commit drop as SELECT t1.i from test_zlib as t1 join test_zlib as t2 on t1.i = t2.i; + reset enable_parallel; EXCEPTION WHEN others THEN -- do nothing end diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql index 16d2ef34afa..5808cefd0e2 100644 --- a/src/test/regress/sql/gp_parallel.sql +++ b/src/test/regress/sql/gp_parallel.sql @@ -596,6 +596,81 @@ set local enable_parallel_hash=off; set local max_parallel_workers_per_gather= 4; explain(costs off) select * from t1 right join t2 on t1.b = t2.a; abort; +-- +-- Parallel Refresh AO Materialized View +-- +create or replace function refresh_compare(ao_row bool, verbose bool, OUT parallel_is_better bool) as $$ +declare + t timestamptz; + dur0 interval; + dur1 interval; + results0 RECORD; + results1 RECORD; +begin + create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1); + insert into t_p select i, i+1 from generate_series(1, 10000000)i; + analyze t_p; + if ao_row then + create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + else + create materialized view matv using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + end if; + -- refresh + set enable_parallel=off; + t = clock_timestamp(); + refresh materialized view matv; + dur0 = age(clock_timestamp(), t); + select * into results0 from matv; + if refresh_compare.verbose then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Non-parallel results=%', results0; + end if; + -- parallel refresh + set enable_parallel=on; + t = clock_timestamp(); + refresh materialized view matv; + dur1 = age(clock_timestamp(), t); + select * into results1 from matv; + if refresh_compare.verbose then + raise notice 'Parallel refresh duration=%', dur1; + raise notice 'Parallel results=%', results1; + end if; + -- compare + if results0 <> results1 then + raise notice 'results of non-parallel % are not equal to parallel %', results0, results1; + end if; + parallel_is_better = dur0 * 0.8 > dur1; -- Make sure we have significant improvements, given the fluctuations. + if NOT parallel_is_better then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Parallel refresh duration=%', dur1; + end if; + drop materialized view matv; + drop table t_p; + reset enable_parallel; +end +$$ language plpgsql; +begin; +set local max_parallel_workers_per_gather = 8; +select * from refresh_compare(true, false); +select * from refresh_compare(false, false); +drop function refresh_compare; +reset max_parallel_workers_per_gather; +end; + +-- +-- Parallel Create AO/AOCO Table AS +-- +begin; +create table t_p2(c1 int, c2 int) with(parallel_workers=2) distributed by(c1); +insert into t_p2 select i, i+1 from generate_series(1, 10000000)i; +analyze t_p2; +set local enable_parallel = off; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +set local enable_parallel = on; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +abort; -- start_ignore drop schema test_parallel cascade; diff --git a/src/test/regress/sql/zlib.sql b/src/test/regress/sql/zlib.sql index 3631a2e2aaa..22dbf1d7609 100644 --- a/src/test/regress/sql/zlib.sql +++ b/src/test/regress/sql/zlib.sql @@ -59,10 +59,12 @@ create or replace function FuncA() returns void as $body$ begin + set enable_parallel=off; insert into test_zlib values(2387283, 'a'); insert into test_zlib_t1 values(1, 2); CREATE TEMP table TMP_Q_QR_INSTM_ANL_01 WITH(APPENDONLY=true,COMPRESSLEVEL=5,ORIENTATION=row,COMPRESSTYPE=zlib) on commit drop as SELECT t1.i from test_zlib as t1 join test_zlib as t2 on t1.i = t2.i; + reset enable_parallel; EXCEPTION WHEN others THEN -- do nothing end