From 865fb47b7be19942868c589ed6b7a6a62edf77da Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Thu, 10 Aug 2023 10:31:49 +0800 Subject: [PATCH] Parallel REFRESH MATERIALIZED VIEW and CTAS for AO/AOCS Make the SELECT part of REFRESH parallel for AO/AOCS storage MATERIALIZED VIEW. Make the SELECT part of CREATE TABLE AS parallel for AO/AOCS storage table. Parallel processes couldn't have writeable operations, assertions like below are added by PG: 'cannot update tuples during a parallel operation'. It's not a problem for PG as workers are launched by Gather node, and the SELECT part of Refresh MV/CTAS could be parallel. However, AO/AOCS will require batches of Row Numbers generated from gp_fastquence which will in-place update catalog. And CBDB will EnterParallelMode() anyway when ExecutePlan in QE if there is parallel across the whole plan. Use EnterParallelMode() only for the slices who have multiple parallel workers, in theory, slices execute the SELECT part of a parallel plan. Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/commands/createas.c | 12 +- src/backend/commands/matview.c | 14 +-- src/backend/executor/execMain.c | 10 +- src/backend/utils/cache/plancache.c | 17 +-- src/include/nodes/parsenodes.h | 2 - src/include/nodes/primnodes.h | 1 - src/test/regress/expected/gp_parallel.out | 144 ++++++++++++++++++++++ src/test/regress/expected/zlib.out | 2 + src/test/regress/sql/gp_parallel.sql | 75 +++++++++++ src/test/regress/sql/zlib.sql | 2 + 10 files changed, 238 insertions(+), 41 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 1ee8a0fcfc6..7757bd4951f 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -363,10 +363,6 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, save_nestlevel = NewGUCNestLevel(); } - /* into AO/AOCS ?*/ - char* am = (into && into->accessMethod) ? into->accessMethod : default_table_access_method; - bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0)); - { /* * Parse analysis was done already, but we still have to run the rule @@ -385,12 +381,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, Assert(query->commandType == CMD_SELECT); /* plan the query */ - if (!intoAO) - plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_OK, params); - else - plan = pg_plan_query(query, pstate->p_sourcetext, - CURSOR_OPT_PARALLEL_NOT_OK, params); + plan = pg_plan_query(query, pstate->p_sourcetext, + CURSOR_OPT_PARALLEL_OK, params); /*GPDB: Save the target information in PlannedStmt */ /* diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index cb924e586aa..f7c96efc103 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -69,7 +69,7 @@ typedef struct static int matview_maintenance_depth = 0; -static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO); +static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation); static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); @@ -125,7 +125,7 @@ SetMatViewPopulatedState(Relation relation, bool newstate) } static RefreshClause* -MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO) +MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation) { RefreshClause *refreshClause; refreshClause = makeNode(RefreshClause); @@ -133,7 +133,6 @@ MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoA refreshClause->concurrent = concurrent; refreshClause->skipData = skipData; refreshClause->relation = relation; - refreshClause->intoAO = intoAO; return refreshClause; } @@ -340,8 +339,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, concurrent, relpersistence, stmt->skipData); - bool intoAO = RelationIsAppendOptimized(matviewRel); - refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation, intoAO); + refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation); /* * Only in dispather role, we should set intoPolicy, else it should remain NULL. @@ -473,11 +471,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, /* Plan the query which will generate data for the refresh. */ - /* CBDB_PARALLEL_FIXME: hack here, use cursor_option to disable parallel */ - if (!refreshClause->intoAO) - plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); - else - plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_NOT_OK, NULL); + plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); plan->refreshClause = refreshClause; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b21ef9c9f0f..233411c966d 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -769,6 +769,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, */ ExecSlice *currentSlice; GpExecIdentity exec_identity; + bool amIParallel = false; /* sanity checks */ Assert(queryDesc != NULL); @@ -795,7 +796,10 @@ standard_ExecutorRun(QueryDesc *queryDesc, { if (Gp_role == GP_ROLE_EXECUTE || sliceRunsOnQD(currentSlice)) - currentSliceId = currentSlice->sliceIndex; + { + currentSliceId = currentSlice->sliceIndex; + amIParallel = currentSlice->useMppParallelMode; + } } /* @@ -875,7 +879,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, ExecutePlan(estate, (PlanState *) motionState, - queryDesc->plannedstmt->parallelModeNeeded, + amIParallel, CMD_SELECT, sendTuples, 0, @@ -920,7 +924,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, */ ExecutePlan(estate, queryDesc->planstate, - queryDesc->plannedstmt->parallelModeNeeded, + amIParallel, operation, isParallelRetrieveCursor ? true : sendTuples, count, diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 69201ce6db1..e3e3d4f7360 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,7 +58,6 @@ #include #include "access/transam.h" -#include "access/tableam.h" #include "catalog/namespace.h" #include "executor/executor.h" #include "miscadmin.h" @@ -964,23 +963,11 @@ BuildCachedPlan(CachedPlanSource *plansource, List *qlist, snapshot_set = true; } - /* - * CBDB_PARALLEL_FIXME: - * GPDB hack here for IntoClause, see GetCachedPlan(). - * Disable parallel if into a AO/AOCS table. - */ - char* am = (intoClause && intoClause->accessMethod) ? intoClause->accessMethod : default_table_access_method; - bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0)); - /* * Generate the plan. */ - if (!intoAO) - plist = pg_plan_queries(qlist, plansource->query_string, - plansource->cursor_options, boundParams); - else - plist = pg_plan_queries(qlist, plansource->query_string, - plansource->cursor_options & ~CURSOR_OPT_PARALLEL_OK, boundParams); + plist = pg_plan_queries(qlist, plansource->query_string, + plansource->cursor_options, boundParams); /* Release snapshot if we got one */ if (snapshot_set) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4af6cbc1f9f..294e213ee1c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3318,8 +3318,6 @@ typedef struct SecLabelStmt * of the query are always postponed until execution. * ---------------------- */ -#define CURSOR_OPT_PARALLEL_NOT_OK 0x0000 /* parallel mode is not OK */ - #define CURSOR_OPT_BINARY 0x0001 /* BINARY */ #define CURSOR_OPT_SCROLL 0x0002 /* SCROLL explicitly given */ #define CURSOR_OPT_NO_SCROLL 0x0004 /* NO SCROLL explicitly given */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 39927266c2d..e555a746960 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -142,7 +142,6 @@ typedef struct RefreshClause bool concurrent; /* allow concurrent access? */ bool skipData; RangeVar *relation; /* relation to insert into */ - bool intoAO; /* is relation to insert into AO/AOCS */ } RefreshClause; diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out index 0ead2658f4c..597dbf31e08 100644 --- a/src/test/regress/expected/gp_parallel.out +++ b/src/test/regress/expected/gp_parallel.out @@ -2012,6 +2012,150 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; Optimizer: Postgres query optimizer (13 rows) +abort; +-- +-- Parallel Refresh AO Materialized View +-- +create or replace function refresh_compare(ao_row bool, verbose bool, OUT parallel_is_better bool) as $$ +declare + t timestamptz; + dur0 interval; + dur1 interval; + results0 RECORD; + results1 RECORD; +begin + create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1); + insert into t_p select i, i+1 from generate_series(1, 10000000)i; + analyze t_p; + if ao_row then + create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + else + create materialized view matv using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + end if; + -- refresh + set enable_parallel=off; + t = clock_timestamp(); + refresh materialized view matv; + dur0 = age(clock_timestamp(), t); + select * into results0 from matv; + if refresh_compare.verbose then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Non-parallel results=%', results0; + end if; + -- parallel refresh + set enable_parallel=on; + t = clock_timestamp(); + refresh materialized view matv; + dur1 = age(clock_timestamp(), t); + select * into results1 from matv; + if refresh_compare.verbose then + raise notice 'Parallel refresh duration=%', dur1; + raise notice 'Parallel results=%', results1; + end if; + -- compare + if results0 <> results1 then + raise notice 'results of non-parallel % are not equal to parallel %', results0, results1; + end if; + parallel_is_better = dur0 * 0.8 > dur1; -- Make sure we have significant improvements, given the fluctuations. + if NOT parallel_is_better then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Parallel refresh duration=%', dur1; + end if; + drop materialized view matv; + drop table t_p; + reset enable_parallel; +end +$$ language plpgsql; +begin; +set local max_parallel_workers_per_gather = 8; +select * from refresh_compare(true, false); + parallel_is_better +-------------------- + t +(1 row) + +select * from refresh_compare(false, false); + parallel_is_better +-------------------- + t +(1 row) + +drop function refresh_compare; +reset max_parallel_workers_per_gather; +end; +-- +-- Parallel Create AO/AOCO Table AS +-- +begin; +create table t_p2(c1 int, c2 int) with(parallel_workers=2) distributed by(c1); +insert into t_p2 select i, i+1 from generate_series(1, 10000000)i; +analyze t_p2; +set local enable_parallel = off; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +-------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 3:1 (slice2; segments: 3) + -> Partial Aggregate + -> Hash Join + Hash Cond: (a.c1 = b.c1) + -> Seq Scan on t_p2 a + -> Hash + -> Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +-------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 3:1 (slice2; segments: 3) + -> Partial Aggregate + -> Hash Join + Hash Cond: (a.c1 = b.c1) + -> Seq Scan on t_p2 a + -> Hash + -> Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +set local enable_parallel = on; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +----------------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 6:1 (slice2; segments: 6) + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (a.c1 = b.c1) + -> Parallel Seq Scan on t_p2 a + -> Parallel Hash + -> Parallel Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); + QUERY PLAN +----------------------------------------------------------------- + Redistribute Motion 1:3 (slice1; segments: 1) + Hash Key: (sum(a.c2)) + -> Finalize Aggregate + -> Gather Motion 6:1 (slice2; segments: 6) + -> Partial Aggregate + -> Parallel Hash Join + Hash Cond: (a.c1 = b.c1) + -> Parallel Seq Scan on t_p2 a + -> Parallel Hash + -> Parallel Seq Scan on t_p2 b + Optimizer: Postgres query optimizer +(11 rows) + abort; -- start_ignore drop schema test_parallel cascade; diff --git a/src/test/regress/expected/zlib.out b/src/test/regress/expected/zlib.out index 38bc285d6be..accccf8ad64 100644 --- a/src/test/regress/expected/zlib.out +++ b/src/test/regress/expected/zlib.out @@ -91,10 +91,12 @@ create or replace function FuncA() returns void as $body$ begin + set enable_parallel=off; insert into test_zlib values(2387283, 'a'); insert into test_zlib_t1 values(1, 2); CREATE TEMP table TMP_Q_QR_INSTM_ANL_01 WITH(APPENDONLY=true,COMPRESSLEVEL=5,ORIENTATION=row,COMPRESSTYPE=zlib) on commit drop as SELECT t1.i from test_zlib as t1 join test_zlib as t2 on t1.i = t2.i; + reset enable_parallel; EXCEPTION WHEN others THEN -- do nothing end diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql index 16d2ef34afa..5808cefd0e2 100644 --- a/src/test/regress/sql/gp_parallel.sql +++ b/src/test/regress/sql/gp_parallel.sql @@ -596,6 +596,81 @@ set local enable_parallel_hash=off; set local max_parallel_workers_per_gather= 4; explain(costs off) select * from t1 right join t2 on t1.b = t2.a; abort; +-- +-- Parallel Refresh AO Materialized View +-- +create or replace function refresh_compare(ao_row bool, verbose bool, OUT parallel_is_better bool) as $$ +declare + t timestamptz; + dur0 interval; + dur1 interval; + results0 RECORD; + results1 RECORD; +begin + create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1); + insert into t_p select i, i+1 from generate_series(1, 10000000)i; + analyze t_p; + if ao_row then + create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + else + create materialized view matv using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2); + end if; + -- refresh + set enable_parallel=off; + t = clock_timestamp(); + refresh materialized view matv; + dur0 = age(clock_timestamp(), t); + select * into results0 from matv; + if refresh_compare.verbose then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Non-parallel results=%', results0; + end if; + -- parallel refresh + set enable_parallel=on; + t = clock_timestamp(); + refresh materialized view matv; + dur1 = age(clock_timestamp(), t); + select * into results1 from matv; + if refresh_compare.verbose then + raise notice 'Parallel refresh duration=%', dur1; + raise notice 'Parallel results=%', results1; + end if; + -- compare + if results0 <> results1 then + raise notice 'results of non-parallel % are not equal to parallel %', results0, results1; + end if; + parallel_is_better = dur0 * 0.8 > dur1; -- Make sure we have significant improvements, given the fluctuations. + if NOT parallel_is_better then + raise notice 'Non-parallel refresh duration=%', dur0; + raise notice 'Parallel refresh duration=%', dur1; + end if; + drop materialized view matv; + drop table t_p; + reset enable_parallel; +end +$$ language plpgsql; +begin; +set local max_parallel_workers_per_gather = 8; +select * from refresh_compare(true, false); +select * from refresh_compare(false, false); +drop function refresh_compare; +reset max_parallel_workers_per_gather; +end; + +-- +-- Parallel Create AO/AOCO Table AS +-- +begin; +create table t_p2(c1 int, c2 int) with(parallel_workers=2) distributed by(c1); +insert into t_p2 select i, i+1 from generate_series(1, 10000000)i; +analyze t_p2; +set local enable_parallel = off; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +set local enable_parallel = on; +explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2); +abort; -- start_ignore drop schema test_parallel cascade; diff --git a/src/test/regress/sql/zlib.sql b/src/test/regress/sql/zlib.sql index 3631a2e2aaa..22dbf1d7609 100644 --- a/src/test/regress/sql/zlib.sql +++ b/src/test/regress/sql/zlib.sql @@ -59,10 +59,12 @@ create or replace function FuncA() returns void as $body$ begin + set enable_parallel=off; insert into test_zlib values(2387283, 'a'); insert into test_zlib_t1 values(1, 2); CREATE TEMP table TMP_Q_QR_INSTM_ANL_01 WITH(APPENDONLY=true,COMPRESSLEVEL=5,ORIENTATION=row,COMPRESSTYPE=zlib) on commit drop as SELECT t1.i from test_zlib as t1 join test_zlib as t2 on t1.i = t2.i; + reset enable_parallel; EXCEPTION WHEN others THEN -- do nothing end