Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private Pair<BitSet, Long> buildForMv(Plan plan) {
LogicalFilter<?> filter = (LogicalFilter<?>) plan;
Pair<BitSet, Long> child = this.buildForMv(filter.child());
this.addFilter(filter, child);
return Pair.of(new BitSet(), child.second);
return Pair.of(child.first, child.second);
}

// process Other Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,14 @@ protected SplitPredicate predicatesCompensate(
SlotMapping queryToViewMapping = viewToQuerySlotMapping.inverse();
// try to use
boolean valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, cascadesContext);
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, queryStructInfo,
viewStructInfo, cascadesContext);
if (!valid) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(comparisonResult.getQueryAllPulledUpExpressions()));
valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, cascadesContext);
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, cascadesContext);
}
if (!valid) {
return SplitPredicate.INVALID_INSTANCE;
Expand Down Expand Up @@ -801,6 +803,8 @@ protected SplitPredicate predicatesCompensate(
private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
Set<Expression> queryPredicates,
SlotMapping queryToViewMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
CascadesContext cascadesContext) {
Set<Expression> queryPulledUpPredicates = queryPredicates.stream()
.flatMap(expr -> ExpressionUtils.extractConjunction(expr).stream())
Expand All @@ -813,16 +817,37 @@ private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
return expr;
})
.collect(Collectors.toSet());
Set<Expression> nullRejectPredicates = ExpressionUtils.inferNotNull(queryPulledUpPredicates, cascadesContext);
Set<Expression> queryUsedNeedRejectNullSlotsViewBased = nullRejectPredicates.stream()
.map(expression -> TypeUtils.isNotNull(expression).orElse(null))
.filter(Objects::nonNull)
.map(expr -> ExpressionUtils.replace((Expression) expr, queryToViewMapping.toSlotReferenceMap()))
Set<Expression> queryNullRejectPredicates =
ExpressionUtils.inferNotNull(queryPulledUpPredicates, cascadesContext);
if (queryPulledUpPredicates.containsAll(queryNullRejectPredicates)) {
// Query has no null reject predicates, return
return false;
}
// Get query null reject predicate slots
Set<Expression> queryNullRejectSlotSet = new HashSet<>();
for (Expression queryNullRejectPredicate : queryNullRejectPredicates) {
Optional<Slot> notNullSlot = TypeUtils.isNotNull(queryNullRejectPredicate);
if (!notNullSlot.isPresent()) {
continue;
}
queryNullRejectSlotSet.add(notNullSlot.get());
}
// query slot need shuttle to use table slot, avoid alias influence
Set<Expression> queryUsedNeedRejectNullSlotsViewBased = ExpressionUtils.shuttleExpressionWithLineage(
new ArrayList<>(queryNullRejectSlotSet), queryStructInfo.getTopPlan(), new BitSet()).stream()
.map(expr -> ExpressionUtils.replace(expr, queryToViewMapping.toSlotReferenceMap()))
.collect(Collectors.toSet());
// view slot need shuttle to use table slot, avoid alias influence
Set<Set<Slot>> shuttledRequireNoNullableViewSlot = new HashSet<>();
for (Set<Slot> requireNullableSlots : requireNoNullableViewSlot) {
shuttledRequireNoNullableViewSlot.add(
ExpressionUtils.shuttleExpressionWithLineage(new ArrayList<>(requireNullableSlots),
viewStructInfo.getTopPlan(), new BitSet()).stream().map(Slot.class::cast)
.collect(Collectors.toSet()));
}
// query pulledUp predicates should have null reject predicates and contains any require noNullable slot
return !queryPulledUpPredicates.containsAll(nullRejectPredicates)
&& requireNoNullableViewSlot.stream().noneMatch(set ->
Sets.intersection(set, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
return shuttledRequireNoNullableViewSlot.stream().noneMatch(viewRequiredNullSlotSet ->
Sets.intersection(viewRequiredNullSlotSet, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,51 @@
2023-12-11 4 2
2023-12-12 5 4

-- !query10_0_before --
2023-12-09 1 yy 2 4
2023-12-11 2 mm 4 3

-- !query10_0_after --
2023-12-09 1 yy 2 4
2023-12-11 2 mm 4 3

-- !query11_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query11_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,91 @@ suite("outer_join") {
(2, 3, 10, 11.01, 'supply2');
"""

sql """drop table if exists orders_same_col;"""
sql """
CREATE TABLE IF NOT EXISTS orders_same_col (
o_orderkey INTEGER NOT NULL,
o_custkey INTEGER NOT NULL,
o_orderstatus CHAR(1) NOT NULL,
o_totalprice DECIMALV3(15,2) NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority CHAR(15) NOT NULL,
o_clerk CHAR(15) NOT NULL,
o_shippriority INTEGER NOT NULL,
O_COMMENT VARCHAR(79) NOT NULL,
o_code VARCHAR(6) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);"""

sql """
insert into orders_same_col values
(1, 1, 'o', 9.5, '2023-12-08', 'a', 'b', 1, 'yy', '91'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '92'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '93'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '94'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '95'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '96'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '97'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '98'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '99'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '100'),
(3, 1, 'o', 33.5, '2023-12-10', 'a', 'b', 1, 'yy', '101'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '102'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '103'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '104'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '105'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '106'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '107'),
(5, 2, 'o', 1.2, '2023-12-12', 'c','d',2, 'mi', '108');
"""

sql """drop table if exists lineitem_same_col; """
sql """
CREATE TABLE IF NOT EXISTS lineitem_same_col (
l_orderkey INTEGER NOT NULL,
l_partkey INTEGER NOT NULL,
l_suppkey INTEGER NOT NULL,
l_linenumber INTEGER NOT NULL,
l_quantity DECIMALV3(15,2) NOT NULL,
l_extendedprice DECIMALV3(15,2) NOT NULL,
l_discount DECIMALV3(15,2) NOT NULL,
l_tax DECIMALV3(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
o_code VARCHAR(6) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""

sql """
insert into lineitem_same_col values
(1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '91'),
(2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '92'),
(3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '93'),
(4, 3, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-11', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '94'),
(5, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-12-12', '2023-12-12', '2023-12-13', 'c', 'd', 'xxxxxxxxx','95');

"""

sql """analyze table lineitem with sync;"""
sql """analyze table orders with sync;"""
sql """analyze table partsupp with sync;"""
sql """analyze table orders_same_col with sync;"""
sql """analyze table lineitem_same_col with sync;"""

// without filter
def mv1_0 = "select lineitem.L_LINENUMBER, orders.O_CUSTKEY " +
Expand Down Expand Up @@ -606,4 +688,75 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv9_0, query9_0, "mv9_0")
order_qt_query9_0_after "${query9_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv9_0"""


// Test filter which can not push down through join and there is more than two join
def mv10_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
l_orderkey,
l_partkey
from
orders left
join lineitem on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query10_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
l_orderkey,
l_partkey
from
orders left
join lineitem on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l_partkey is null or l_partkey <> 2;
"""

order_qt_query10_0_before "${query10_0}"
async_mv_rewrite_success(db, mv10_0, query10_0, "mv10_0")
order_qt_query10_0_after "${query10_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv10_0"""


// Test where filter contains the same col in both lineitem_same_col and orders_same_col
def mv11_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query11_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code <> '91';
"""

order_qt_query11_0_before "${query11_0}"
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""
}