Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void run() throws JobException {
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionNames,
.generatePartitionSnapshots(mtmv, relation.getBaseTablesOneLevel(), execPartitionNames,
partitionMappings);
exec(ctx, execPartitionNames, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
Expand Down Expand Up @@ -288,7 +288,7 @@ public void before() throws JobException {
* @throws DdlException
*/
private void refreshHmsTable() throws AnalysisException, DdlException {
for (BaseTableInfo tableInfo : relation.getBaseTables()) {
for (BaseTableInfo tableInfo : relation.getBaseTablesOneLevel()) {
TableIf tableIf = MTMVUtil.getTable(tableInfo);
if (tableIf instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) tableIf;
Expand Down Expand Up @@ -450,7 +450,8 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(),
boolean fresh = MTMVPartitionUtil.isMTMVSync(mtmv, relation.getBaseTablesOneLevel(),
mtmv.getExcludedTriggerTables(),
partitionMappings);
if (fresh) {
return Lists.newArrayList();
Expand All @@ -461,7 +462,8 @@ public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> part
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables(), partitionMappings);
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTablesOneLevel(),
partitionMappings);
}

public MTMVTaskContext getTaskContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static boolean isMTMVSync(MTMV mtmv) {
return false;
}
try {
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), mtmv.calculatePartitionMappings());
return isMTMVSync(mtmv, mtmvRelation.getBaseTablesOneLevel(), Sets.newHashSet(),
mtmv.calculatePartitionMappings());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
Expand Down Expand Up @@ -254,7 +255,7 @@ private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partition
Set<String> relatedPartitionNames)
throws AnalysisException {
List<String> res = Lists.newArrayList();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTablesOneLevel()) {
TableIf table = MTMVUtil.getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
partitionMappings = mtmv.calculatePartitionMappings();
}
if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTables(),
partitionMappings.get(partition.getName()), mtmvRelation.getBaseTablesOneLevel(),
Sets.newHashSet())) {
res.add(partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF
* @return
*/
public static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTablesOneLevel();
for (BaseTableInfo baseTableInfo : baseTables) {
if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

relation.getBaseTables();
relation.getBaseTablesOneLevel();
minTimes = 0;
result = baseTables;

Expand Down
6 changes: 6 additions & 0 deletions regression-test/data/mtmv_p0/test_multi_level_mtmv.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
-- !mv2 --
1 1

-- !mv1_should_one_partition --
["p_2"]

-- !mv2_should_one_partition --
["p_2"]

-- !status1 --
multi_level_mtmv1 SCHEMA_CHANGE SUCCESS

Expand Down
31 changes: 25 additions & 6 deletions regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ suite("test_multi_level_mtmv") {
k1 int,
k2 int
)
PARTITION BY LIST(`k1`)
(
PARTITION `p1` VALUES IN ('1'),
PARTITION `p2` VALUES IN ('2')
)
DISTRIBUTED BY HASH(k1) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
Expand All @@ -40,34 +45,48 @@ suite("test_multi_level_mtmv") {

sql """
CREATE MATERIALIZED VIEW ${mv1}
BUILD DEFERRED REFRESH COMPLETE ON MANUAL
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(k1)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT * FROM ${tableName};
"""
def jobName1 = getJobName("regression_test_mtmv_p0", mv1);
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinished(jobName1)
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1 "select * from ${mv1}"

sql """
CREATE MATERIALIZED VIEW ${mv2}
BUILD DEFERRED REFRESH COMPLETE ON MANUAL
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(k1)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT * FROM ${mv1};
"""
def jobName2 = getJobName("regression_test_mtmv_p0", mv2);
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinished(jobName2)
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2 "select * from ${mv2}"

sql """
INSERT INTO ${tableName} VALUES(2,2);
"""
sql """
REFRESH MATERIALIZED VIEW ${mv1} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv1)
order_qt_mv1_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1"
sql """
REFRESH MATERIALIZED VIEW ${mv2} AUTO
"""
waitingMTMVTaskFinishedByMvName(mv2)
order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1"

// drop table
sql """
drop table ${tableName}
Expand Down