Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/backend/commands/createas.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,6 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
save_nestlevel = NewGUCNestLevel();
}

/* into AO/AOCS ?*/
char* am = (into && into->accessMethod) ? into->accessMethod : default_table_access_method;
bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0));

{
/*
* Parse analysis was done already, but we still have to run the rule
Expand All @@ -385,12 +381,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
Assert(query->commandType == CMD_SELECT);

/* plan the query */
if (!intoAO)
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);
else
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_NOT_OK, params);
plan = pg_plan_query(query, pstate->p_sourcetext,
CURSOR_OPT_PARALLEL_OK, params);

/*GPDB: Save the target information in PlannedStmt */
/*
Expand Down
14 changes: 4 additions & 10 deletions src/backend/commands/matview.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct

static int matview_maintenance_depth = 0;

static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO);
static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation);
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
Expand Down Expand Up @@ -125,15 +125,14 @@ SetMatViewPopulatedState(Relation relation, bool newstate)
}

static RefreshClause*
MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO)
MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation)
{
RefreshClause *refreshClause;
refreshClause = makeNode(RefreshClause);

refreshClause->concurrent = concurrent;
refreshClause->skipData = skipData;
refreshClause->relation = relation;
refreshClause->intoAO = intoAO;

return refreshClause;
}
Expand Down Expand Up @@ -340,8 +339,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
dest = CreateTransientRelDestReceiver(OIDNewHeap, matviewOid, concurrent, relpersistence,
stmt->skipData);

bool intoAO = RelationIsAppendOptimized(matviewRel);
refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation, intoAO);
refreshClause = MakeRefreshClause(concurrent, stmt->skipData, stmt->relation);

/*
* Only in dispather role, we should set intoPolicy, else it should remain NULL.
Expand Down Expand Up @@ -473,11 +471,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,

/* Plan the query which will generate data for the refresh. */

/* CBDB_PARALLEL_FIXME: hack here, use cursor_option to disable parallel */
if (!refreshClause->intoAO)
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
else
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_NOT_OK, NULL);
plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);

plan->refreshClause = refreshClause;

Expand Down
10 changes: 7 additions & 3 deletions src/backend/executor/execMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
*/
ExecSlice *currentSlice;
GpExecIdentity exec_identity;
bool amIParallel = false;

/* sanity checks */
Assert(queryDesc != NULL);
Expand All @@ -795,7 +796,10 @@ standard_ExecutorRun(QueryDesc *queryDesc,
{
if (Gp_role == GP_ROLE_EXECUTE ||
sliceRunsOnQD(currentSlice))
currentSliceId = currentSlice->sliceIndex;
{
currentSliceId = currentSlice->sliceIndex;
amIParallel = currentSlice->useMppParallelMode;
Comment thread
avamingli marked this conversation as resolved.
}
}

/*
Expand Down Expand Up @@ -875,7 +879,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,

ExecutePlan(estate,
(PlanState *) motionState,
queryDesc->plannedstmt->parallelModeNeeded,
amIParallel,
CMD_SELECT,
sendTuples,
0,
Expand Down Expand Up @@ -920,7 +924,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
*/
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
amIParallel,
operation,
isParallelRetrieveCursor ? true : sendTuples,
count,
Expand Down
17 changes: 2 additions & 15 deletions src/backend/utils/cache/plancache.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
#include <limits.h>

#include "access/transam.h"
#include "access/tableam.h"
#include "catalog/namespace.h"
#include "executor/executor.h"
#include "miscadmin.h"
Expand Down Expand Up @@ -964,23 +963,11 @@ BuildCachedPlan(CachedPlanSource *plansource, List *qlist,
snapshot_set = true;
}

/*
* CBDB_PARALLEL_FIXME:
* GPDB hack here for IntoClause, see GetCachedPlan().
* Disable parallel if into a AO/AOCS table.
*/
char* am = (intoClause && intoClause->accessMethod) ? intoClause->accessMethod : default_table_access_method;
bool intoAO = ((strcmp(am, "ao_row") == 0) || (strcmp(am, "ao_column") == 0));

/*
* Generate the plan.
*/
if (!intoAO)
plist = pg_plan_queries(qlist, plansource->query_string,
plansource->cursor_options, boundParams);
else
plist = pg_plan_queries(qlist, plansource->query_string,
plansource->cursor_options & ~CURSOR_OPT_PARALLEL_OK, boundParams);
plist = pg_plan_queries(qlist, plansource->query_string,
plansource->cursor_options, boundParams);

/* Release snapshot if we got one */
if (snapshot_set)
Expand Down
2 changes: 0 additions & 2 deletions src/include/nodes/parsenodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -3318,8 +3318,6 @@ typedef struct SecLabelStmt
* of the query are always postponed until execution.
* ----------------------
*/
#define CURSOR_OPT_PARALLEL_NOT_OK 0x0000 /* parallel mode is not OK */

#define CURSOR_OPT_BINARY 0x0001 /* BINARY */
#define CURSOR_OPT_SCROLL 0x0002 /* SCROLL explicitly given */
#define CURSOR_OPT_NO_SCROLL 0x0004 /* NO SCROLL explicitly given */
Expand Down
1 change: 0 additions & 1 deletion src/include/nodes/primnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ typedef struct RefreshClause
bool concurrent; /* allow concurrent access? */
bool skipData;
RangeVar *relation; /* relation to insert into */
bool intoAO; /* is relation to insert into AO/AOCS */
} RefreshClause;


Expand Down
144 changes: 144 additions & 0 deletions src/test/regress/expected/gp_parallel.out
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,150 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a;
Optimizer: Postgres query optimizer
(13 rows)

abort;
--
-- Parallel Refresh AO Materialized View
--
create or replace function refresh_compare(ao_row bool, verbose bool, OUT parallel_is_better bool) as $$
declare
t timestamptz;
dur0 interval;
dur1 interval;
results0 RECORD;
results1 RECORD;
begin
create table t_p(c1 int, c2 int) with(parallel_workers=8) distributed by(c1);
insert into t_p select i, i+1 from generate_series(1, 10000000)i;
analyze t_p;
if ao_row then
create materialized view matv using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
else
create materialized view matv using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p a join t_p b on a.c1 = b.c1 with no data distributed by(c2);
end if;
-- refresh
set enable_parallel=off;
t = clock_timestamp();
refresh materialized view matv;
dur0 = age(clock_timestamp(), t);
select * into results0 from matv;
if refresh_compare.verbose then
raise notice 'Non-parallel refresh duration=%', dur0;
raise notice 'Non-parallel results=%', results0;
end if;
-- parallel refresh
set enable_parallel=on;
t = clock_timestamp();
refresh materialized view matv;
dur1 = age(clock_timestamp(), t);
select * into results1 from matv;
if refresh_compare.verbose then
raise notice 'Parallel refresh duration=%', dur1;
raise notice 'Parallel results=%', results1;
end if;
-- compare
if results0 <> results1 then
raise notice 'results of non-parallel % are not equal to parallel %', results0, results1;
end if;
parallel_is_better = dur0 * 0.8 > dur1; -- Make sure we have significant improvements, given the fluctuations.
if NOT parallel_is_better then
raise notice 'Non-parallel refresh duration=%', dur0;
raise notice 'Parallel refresh duration=%', dur1;
end if;
drop materialized view matv;
drop table t_p;
reset enable_parallel;
end
$$ language plpgsql;
begin;
set local max_parallel_workers_per_gather = 8;
select * from refresh_compare(true, false);
parallel_is_better
--------------------
t
(1 row)

select * from refresh_compare(false, false);
parallel_is_better
--------------------
t
(1 row)

drop function refresh_compare;
reset max_parallel_workers_per_gather;
end;
--
-- Parallel Create AO/AOCO Table AS
--
begin;
create table t_p2(c1 int, c2 int) with(parallel_workers=2) distributed by(c1);
insert into t_p2 select i, i+1 from generate_series(1, 10000000)i;
analyze t_p2;
set local enable_parallel = off;
explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2);
QUERY PLAN
--------------------------------------------------------
Redistribute Motion 1:3 (slice1; segments: 1)
Hash Key: (sum(a.c2))
-> Finalize Aggregate
-> Gather Motion 3:1 (slice2; segments: 3)
-> Partial Aggregate
-> Hash Join
Hash Cond: (a.c1 = b.c1)
-> Seq Scan on t_p2 a
-> Hash
-> Seq Scan on t_p2 b
Optimizer: Postgres query optimizer
(11 rows)

explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2);
QUERY PLAN
--------------------------------------------------------
Redistribute Motion 1:3 (slice1; segments: 1)
Hash Key: (sum(a.c2))
-> Finalize Aggregate
-> Gather Motion 3:1 (slice2; segments: 3)
-> Partial Aggregate
-> Hash Join
Hash Cond: (a.c1 = b.c1)
-> Seq Scan on t_p2 a
-> Hash
-> Seq Scan on t_p2 b
Optimizer: Postgres query optimizer
(11 rows)

set local enable_parallel = on;
explain(costs off) create table ctas_ao using ao_row as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2);
QUERY PLAN
-----------------------------------------------------------------
Redistribute Motion 1:3 (slice1; segments: 1)
Hash Key: (sum(a.c2))
-> Finalize Aggregate
-> Gather Motion 6:1 (slice2; segments: 6)
-> Partial Aggregate
-> Parallel Hash Join
Hash Cond: (a.c1 = b.c1)
-> Parallel Seq Scan on t_p2 a
-> Parallel Hash
-> Parallel Seq Scan on t_p2 b
Optimizer: Postgres query optimizer
(11 rows)

explain(costs off) create table ctas_aoco using ao_column as select sum(a.c2) as c2, avg(b.c1) as c1 from t_p2 a join t_p2 b on a.c1 = b.c1 distributed by(c2);
QUERY PLAN
-----------------------------------------------------------------
Redistribute Motion 1:3 (slice1; segments: 1)
Hash Key: (sum(a.c2))
-> Finalize Aggregate
-> Gather Motion 6:1 (slice2; segments: 6)
-> Partial Aggregate
-> Parallel Hash Join
Hash Cond: (a.c1 = b.c1)
-> Parallel Seq Scan on t_p2 a
Comment thread
my-ship-it marked this conversation as resolved.
-> Parallel Hash
-> Parallel Seq Scan on t_p2 b
Optimizer: Postgres query optimizer
(11 rows)

abort;
-- start_ignore
drop schema test_parallel cascade;
Expand Down
2 changes: 2 additions & 0 deletions src/test/regress/expected/zlib.out
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ create or replace function FuncA()
returns void as
$body$
begin
set enable_parallel=off;
insert into test_zlib values(2387283, 'a');
insert into test_zlib_t1 values(1, 2);
CREATE TEMP table TMP_Q_QR_INSTM_ANL_01 WITH(APPENDONLY=true,COMPRESSLEVEL=5,ORIENTATION=row,COMPRESSTYPE=zlib) on commit drop as
SELECT t1.i from test_zlib as t1 join test_zlib as t2 on t1.i = t2.i;
reset enable_parallel;
EXCEPTION WHEN others THEN
-- do nothing
end
Expand Down
Loading