diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index b5e90634b8bbb1..690ab88991bd16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -60,9 +60,14 @@ public boolean isDefaultPartition() { @Override public PartitionKeyDesc toPartitionKeyDesc() { - return PartitionKeyDesc.createFixed( + if (partitionKeyRange.hasLowerBound()) { + return PartitionKeyDesc.createFixed( PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()), PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); + } else { + // For null partition value. + return PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index feded88ea326f0..e259399f63740b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -17,9 +17,24 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.analysis.PartitionValue; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -28,13 +43,53 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionsTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.Month; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Comparator; import 
java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; -public class IcebergExternalTable extends ExternalTable { +public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { + + public static final String YEAR = "year"; + public static final String MONTH = "month"; + public static final String DAY = "day"; + public static final String HOUR = "hour"; + public static final String IDENTITY = "identity"; + public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec + + private Table table; + private List partitionColumns; + private boolean isValidRelatedTableCached = false; + private boolean isValidRelatedTable = false; public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE); @@ -51,9 +106,50 @@ protected synchronized void makeSureInitialized() { } } + @VisibleForTesting + public void setTable(Table table) { + this.table = table; + } + + @VisibleForTesting + public void setPartitionColumns(List partitionColumns) { + this.partitionColumns = partitionColumns; + } + @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); + table = IcebergUtils.getIcebergTable(catalog, dbName, name); + List schema = IcebergUtils.getSchema(catalog, dbName, name); + Snapshot snapshot = table.currentSnapshot(); + if (snapshot == null) { + LOG.debug("Table {} is empty", name); + return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null)); + } + long snapshotId = snapshot.snapshotId(); + partitionColumns = null; + IcebergPartitionInfo partitionInfo = null; + if (isValidRelatedTable()) { + PartitionSpec spec = table.spec(); + partitionColumns = Lists.newArrayList(); + + // For iceberg table, we only support table with 1 partition column as RelatedTable. + // So we use spec.fields().get(0) to get the partition column. 
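+            // spec.fields().get(0).sourceId() is the id of the source column in the Iceberg schema;
+            // findField resolves that id so the matching Doris column can be found by name below.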
+            Types.NestedField col = table.schema().findField(spec.fields().get(0).sourceId());
+            for (Column c : schema) {
+                if (c.getName().equalsIgnoreCase(col.name())) {
+                    partitionColumns.add(c);
+                    break;
+                }
+            }
+            Preconditions.checkState(partitionColumns.size() == 1,
+                    "Only 1 partition column is supported for Iceberg table, but found " + partitionColumns.size());
+            try {
+                partitionInfo = loadPartitionInfo();
+            } catch (AnalysisException e) {
+                LOG.warn("Failed to load iceberg table {} partition info.", name, e);
+            }
+        }
+        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns, snapshotId, partitionInfo));
     }
 
     @Override
@@ -90,4 +186,343 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+                .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        makeSureInitialized();
+        return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("Can't find schema cache of table " + name);
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    /**
+     * For now, we only support a single-partition-column Iceberg table as a related table.
+     * The supported transforms are YEAR, MONTH, DAY and HOUR.
+     * And the partition column must not change to another column during partition evolution.
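+     * For example, a table partitioned by day(ts) qualifies, while a bucket or truncate
+     * transform, or a spec with more than one partition field, does not.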
+ */ + @Override + public boolean isValidRelatedTable() { + if (isValidRelatedTableCached) { + return isValidRelatedTable; + } + isValidRelatedTable = false; + Set allFields = Sets.newHashSet(); + for (PartitionSpec spec : table.specs().values()) { + if (spec == null) { + isValidRelatedTableCached = true; + return false; + } + List fields = spec.fields(); + if (fields.size() != 1) { + isValidRelatedTableCached = true; + return false; + } + PartitionField partitionField = spec.fields().get(0); + String transformName = partitionField.transform().toString(); + if (!YEAR.equals(transformName) + && !MONTH.equals(transformName) + && !DAY.equals(transformName) + && !HOUR.equals(transformName)) { + isValidRelatedTableCached = true; + return false; + } + allFields.add(table.schema().findColumnName(partitionField.sourceId())); + } + isValidRelatedTableCached = true; + isValidRelatedTable = allFields.size() == 1; + return isValidRelatedTable; + } + + protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException { + List icebergPartitions = loadIcebergPartition(); + Map nameToPartition = Maps.newHashMap(); + Map nameToPartitionItem = Maps.newHashMap(); + for (IcebergPartition partition : icebergPartitions) { + nameToPartition.put(partition.getPartitionName(), partition); + String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString(); + Range partitionRange = getPartitionRange(partition.getPartitionValues().get(0), transform); + PartitionItem item = new RangePartitionItem(partitionRange); + nameToPartitionItem.put(partition.getPartitionName(), item); + } + Map> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem); + return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap); + } + + public List loadIcebergPartition() { + PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils + .createMetadataTableInstance(table, MetadataTableType.PARTITIONS); + List partitions = Lists.newArrayList(); + try (CloseableIterable tasks = partitionsTable.newScan().planFiles()) { + for (FileScanTask task : tasks) { + CloseableIterable rows = task.asDataTask().rows(); + for (StructLike row : rows) { + partitions.add(generateIcebergPartition(row)); + } + } + } catch (IOException e) { + LOG.warn("Failed to get Iceberg table {} partition info.", name, e); + } + return partitions; + } + + public IcebergPartition generateIcebergPartition(StructLike row) { + // row format : + // 0. partitionData, + // 1. spec_id, + // 2. record_count, + // 3. file_count, + // 4. total_data_file_size_in_bytes, + // 5. position_delete_record_count, + // 6. position_delete_file_count, + // 7. equality_delete_record_count, + // 8. equality_delete_file_count, + // 9. last_updated_at, + // 10. 
last_updated_snapshot_id
+        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partitioned table.");
+        int specId = row.get(1, Integer.class);
+        PartitionSpec partitionSpec = table.specs().get(specId);
+        StructProjection partitionData = row.get(0, StructProjection.class);
+        StringBuilder sb = new StringBuilder();
+        List<String> partitionValues = Lists.newArrayList();
+        List<String> transforms = Lists.newArrayList();
+        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+            PartitionField partitionField = partitionSpec.fields().get(i);
+            Class<?> fieldClass = partitionSpec.javaClasses()[i];
+            int fieldId = partitionField.fieldId();
+            // Iceberg partition field ids start at PARTITION_DATA_ID_START,
+            // so the field index in partitionData is fieldId - PARTITION_DATA_ID_START.
+            int index = fieldId - PARTITION_DATA_ID_START;
+            Object o = partitionData.get(index, fieldClass);
+            String fieldValue = o == null ? null : o.toString();
+            String fieldName = partitionField.name();
+            sb.append(fieldName);
+            sb.append("=");
+            sb.append(fieldValue);
+            sb.append("/");
+            partitionValues.add(fieldValue);
+            transforms.add(partitionField.transform().toString());
+        }
+        if (sb.length() > 0) {
+            sb.delete(sb.length() - 1, sb.length());
+        }
+        String partitionName = sb.toString();
+        long recordCount = row.get(2, Long.class);
+        long fileCount = row.get(3, Integer.class);
+        long fileSizeInBytes = row.get(4, Long.class);
+        long lastUpdateTime = row.get(9, Long.class);
+        long lastUpdateSnapShotId = row.get(10, Long.class);
+        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
+                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+    }
+
+    @VisibleForTesting
+    public Range<PartitionKey> getPartitionRange(String value, String transform)
+            throws AnalysisException {
+        // For a NULL partition value, create a lessThan partition for it.
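+        // A non-null value is the output of the Iceberg transform: an integer offset from
+        // the Unix epoch in units of the transform. For example, value 100 with the "month"
+        // transform denotes 1970-01 plus 100 months, i.e. the range
+        // [1978-05-01 00:00:00, 1978-06-01 00:00:00).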
+        if (value == null) {
+            PartitionKey nullKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-02")), partitionColumns);
+            return Range.lessThan(nullKey);
+        }
+        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+        LocalDateTime target;
+        LocalDateTime lower;
+        LocalDateTime upper;
+        long longValue = Long.parseLong(value);
+        switch (transform) {
+            case HOUR:
+                target = epoch.plusHours(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
+                        target.getHour(), 0, 0);
+                upper = lower.plusHours(1);
+                break;
+            case DAY:
+                target = epoch.plusDays(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
+                upper = lower.plusDays(1);
+                break;
+            case MONTH:
+                target = epoch.plusMonths(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
+                upper = lower.plusMonths(1);
+                break;
+            case YEAR:
+                target = epoch.plusYears(longValue);
+                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
+                upper = lower.plusYears(1);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform " + transform);
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        Column c = partitionColumns.get(0);
+        Preconditions.checkState(c.getDataType().isDateType(), "Only date type partition columns are supported");
+        if (c.getType().isDate() || c.getType().isDateV2()) {
+            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        }
+        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
+        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
+        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
+        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
+        return Range.closedOpen(lowKey, upperKey);
+    }
+
+    /**
+     * Merge overlapping Iceberg partitions into one Doris partition.
+     */
+    public Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry encloses the second one, remove the second entry and record it in the
+            // returned map, so we can track the Iceberg partitions that are contained in one Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by their PartitionItem range [LOW, HIGH).
+     * When comparing two ranges, the one with the smaller LOW value is smaller.
+     * If two ranges have the same LOW value, the one with the larger HIGH value is smaller.
+     *
+     * For now, we only support year, month, day and hour transforms,
+     * so it is impossible to have two partially intersecting partitions.
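+     * (Unit boundaries are aligned: every hour range lies inside a single day range, every
+     * day range inside a single month range, and every month range inside a single year range.)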
+ * One range is either enclosed by another or has no intersection at all with another. + * + * + * For example, we have these 4 ranges: + * [10, 20), [30, 40), [0, 30), [10, 15) + * + * After sort, they become: + * [0, 30), [10, 20), [10, 15), [30, 40) + */ + public List> sortPartitionMap(Map originPartitions) { + List> entries = new ArrayList<>(originPartitions.entrySet()); + entries.sort(new RangeComparator()); + return entries; + } + + public static class RangeComparator implements Comparator> { + @Override + public int compare(Map.Entry p1, Map.Entry p2) { + PartitionItem value1 = p1.getValue(); + PartitionItem value2 = p2.getValue(); + if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) { + Range items1 = value1.getItems(); + Range items2 = value2.getItems(); + if (!items1.hasLowerBound()) { + return -1; + } + if (!items2.hasLowerBound()) { + return 1; + } + PartitionKey upper1 = items1.upperEndpoint(); + PartitionKey lower1 = items1.lowerEndpoint(); + PartitionKey upper2 = items2.upperEndpoint(); + PartitionKey lower2 = items2.lowerEndpoint(); + int compareLow = lower1.compareTo(lower2); + return compareLow == 0 ? upper2.compareTo(upper1) : compareLow; + } + return 0; + } + } + + @VisibleForTesting + public boolean isValidRelatedTableCached() { + return isValidRelatedTableCached; + } + + @VisibleForTesting + public boolean validRelatedTableCache() { + return isValidRelatedTable; + } + + public void setIsValidRelatedTableCached(boolean isCached) { + this.isValidRelatedTableCached = isCached; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java new file mode 100644 index 00000000000000..cccc6244a0d0cc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import java.util.List; + +public class IcebergPartition { + private final String partitionName; + private final List partitionValues; + private final int specId; + private final long recordCount; + private final long fileSizeInBytes; + private final long fileCount; + private final long lastUpdateTime; + private final long lastSnapshotId; + private final List transforms; + + public IcebergPartition(String partitionName, int specId, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime, long lastSnapshotId, List partitionValues, + List transforms) { + this.partitionName = partitionName; + this.specId = specId; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + this.lastSnapshotId = lastSnapshotId; + this.partitionValues = partitionValues; + this.transforms = transforms; + } + + public String getPartitionName() { + return partitionName; + } + + public int getSpecId() { + return specId; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public long getLastSnapshotId() { + return lastSnapshotId; + } + + public List getPartitionValues() { + return partitionValues; + } + + public List getTransforms() { + return transforms; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java new file mode 100644 index 00000000000000..9edb2137f4f389 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.Set; + +public class IcebergPartitionInfo { + private final Map nameToPartitionItem; + private final Map nameToIcebergPartition; + private final Map> nameToIcebergPartitionNames; + + public IcebergPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToIcebergPartition = Maps.newHashMap(); + this.nameToIcebergPartitionNames = Maps.newHashMap(); + } + + public IcebergPartitionInfo(Map nameToPartitionItem, + Map nameToIcebergPartition, + Map> nameToIcebergPartitionNames) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToIcebergPartition = nameToIcebergPartition; + this.nameToIcebergPartitionNames = nameToIcebergPartitionNames; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToIcebergPartition() { + return nameToIcebergPartition; + } + + public long getLatestSnapshotId(String partitionName) { + Set icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName); + if (icebergPartitionNames == null) { + return nameToIcebergPartition.get(partitionName).getLastSnapshotId(); + } + long latestSnapshotId = 0; + long latestUpdateTime = -1; + for (String name : icebergPartitionNames) { + IcebergPartition partition = nameToIcebergPartition.get(name); + long lastUpdateTime = partition.getLastUpdateTime(); + if (latestUpdateTime < lastUpdateTime) { + latestUpdateTime = lastUpdateTime; + latestSnapshotId = partition.getLastSnapshotId(); + } + } + return latestSnapshotId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java new file mode 100644 index 00000000000000..e1fde8049fe1ad --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.SchemaCacheValue; + +import java.util.List; + +public class IcebergSchemaCacheValue extends SchemaCacheValue { + + private final List partitionColumns; + private final IcebergPartitionInfo partitionInfo; + private final long snapshotId; + + public IcebergSchemaCacheValue(List schema, List partitionColumns, + long snapshotId, IcebergPartitionInfo partitionInfo) { + super(schema); + this.partitionColumns = partitionColumns; + this.snapshotId = snapshotId; + this.partitionInfo = partitionInfo; + } + + public List getPartitionColumns() { + return partitionColumns; + } + + public IcebergPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public long getSnapshotId() { + return snapshotId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 353e024ed97348..c1002faf4078b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -43,6 +43,7 @@ import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; +import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.StatementContext; @@ -182,6 +183,12 @@ public void run() throws JobException { this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); beforeMTMVRefresh(); if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + if (!relatedTable.isValidRelatedTable()) { + throw new JobException("MTMV " + mtmv.getName() + "'s related table " + relatedTable.getName() + + " is not a valid related table anymore, stop refreshing." + + " e.g. 
the table may have multiple partition columns or contain unsupported transform functions.");
+            }
             MTMVPartitionUtil.alignMvPartition(mtmv);
         }
         MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 92436b063e0b49..9597378c488cfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -317,7 +317,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt
         if (!relatedTable.needAutoRefresh()) {
             return true;
         }
-        // check if partitions of related table if changed
+        // check if partitions of related table are changed
         Set<String> snapshotPartitions = mtmv.getRefreshSnapshot().getSnapshotPartitions(mtmvPartitionName);
         if (!Objects.equals(relatedPartitionNames, snapshotPartitions)) {
             return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index c4261aa78f10be..821595d7dffd06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -115,4 +115,13 @@ default boolean needAutoRefresh() {
      * @return
      */
     boolean isPartitionColumnAllowNull();
+
+    /**
+     * Whether the table is supported as a related table.
+     * For example, an Iceberg table may become unsupported after partition evolution.
+     * @return
+     */
+    default boolean isValidRelatedTable() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
new file mode 100644
index 00000000000000..80d0a7c2429df3
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import mockit.Expectations; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.transforms.Days; +import org.apache.iceberg.transforms.Hours; +import org.apache.iceberg.transforms.Months; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IcebergExternalTableTest { + + @Test + public void testIsSupportedPartitionTable(@Mocked org.apache.iceberg.Table icebergTable, + @Mocked PartitionSpec spec, + @Mocked PartitionField field, + @Mocked Schema schema) { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Map specs = Maps.newHashMap(); + // Test null + specs.put(0, null); + new Expectations() {{ + icebergTable.specs(); + result = specs; + }}; + table.setTable(icebergTable); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.isValidRelatedTable()); + new Verifications() {{ + icebergTable.specs(); + times = 1; + }}; + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.validRelatedTableCache()); + + // Test spec fields are empty. + specs.put(0, spec); + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + new Expectations() {{ + icebergTable.specs(); + result = specs; + }}; + List fields = Lists.newArrayList(); + new Expectations() {{ + spec.fields(); + result = fields; + }}; + Assertions.assertFalse(table.isValidRelatedTable()); + new Verifications() {{ + spec.fields(); + times = 1; + }}; + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.validRelatedTableCache()); + + // Test true + fields.add(field); + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + new Expectations() { + { + icebergTable.schema(); + result = schema; + + schema.findColumnName(anyInt); + result = "col1"; + } + }; + new Expectations() {{ + field.transform(); + result = new Hours(); + }}; + Assertions.assertTrue(table.isValidRelatedTable()); + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.validRelatedTableCache()); + new Verifications() {{ + schema.findColumnName(anyInt); + times = 1; + }}; + new Expectations() {{ + field.transform(); + result = new Days(); + }}; + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.isValidRelatedTable()); + new Expectations() {{ + field.transform(); + result = new Months(); + }}; + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.isValidRelatedTable()); + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.validRelatedTableCache()); + } + + @Test + public void testGetPartitionRange() throws 
AnalysisException { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Column c = new Column("ts", PrimitiveType.DATETIMEV2); + List partitionColumns = Lists.newArrayList(c); + table.setPartitionColumns(partitionColumns); + + // Test null partition value + Range nullRange = table.getPartitionRange(null, "hour"); + Assertions.assertFalse(nullRange.hasLowerBound()); + Assertions.assertEquals("0000-01-02 00:00:00", + nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0)); + + // Test hour transform. + Range hour = table.getPartitionRange("100", "hour"); + PartitionKey lowKey = hour.lowerEndpoint(); + PartitionKey upKey = hour.upperEndpoint(); + Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test day transform. + Range day = table.getPartitionRange("100", "day"); + lowKey = day.lowerEndpoint(); + upKey = day.upperEndpoint(); + Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test month transform. + Range month = table.getPartitionRange("100", "month"); + lowKey = month.lowerEndpoint(); + upKey = month.upperEndpoint(); + Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test year transform. + Range year = table.getPartitionRange("100", "year"); + lowKey = year.lowerEndpoint(); + upKey = year.upperEndpoint(); + Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("2071-01-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test unsupported transform + Exception exception = Assertions.assertThrows(RuntimeException.class, () -> { + table.getPartitionRange("100", "bucket"); + }); + Assertions.assertEquals("Unsupported transform bucket", exception.getMessage()); + } + + @Test + public void testSortRange() throws AnalysisException { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Column c = new Column("c", PrimitiveType.DATETIMEV2); + table.setPartitionColumns(Lists.newArrayList(c)); + PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour")); + PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year")); + PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year")); + PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month")); + PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month")); + PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month")); + PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day")); + PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day")); + Map map = Maps.newHashMap(); + map.put("nullRange", nullRange); + map.put("year1970", year1970); + map.put("year1971", year1971); + map.put("month197002", month197002); + map.put("month197103", month197103); + map.put("month197204", month197204); + map.put("day19700202", day19700202); + map.put("day19730101", day19730101); + List> entries = table.sortPartitionMap(map); + Assertions.assertEquals(8, 
entries.size()); + Assertions.assertEquals("nullRange", entries.get(0).getKey()); + Assertions.assertEquals("year1970", entries.get(1).getKey()); + Assertions.assertEquals("month197002", entries.get(2).getKey()); + Assertions.assertEquals("day19700202", entries.get(3).getKey()); + Assertions.assertEquals("year1971", entries.get(4).getKey()); + Assertions.assertEquals("month197103", entries.get(5).getKey()); + Assertions.assertEquals("month197204", entries.get(6).getKey()); + Assertions.assertEquals("day19730101", entries.get(7).getKey()); + + Map> stringSetMap = table.mergeOverlapPartitions(map); + Assertions.assertEquals(2, stringSetMap.size()); + Assertions.assertTrue(stringSetMap.containsKey("year1970")); + Assertions.assertTrue(stringSetMap.containsKey("year1971")); + + Set names1970 = stringSetMap.get("year1970"); + Assertions.assertEquals(3, names1970.size()); + Assertions.assertTrue(names1970.contains("year1970")); + Assertions.assertTrue(names1970.contains("month197002")); + Assertions.assertTrue(names1970.contains("day19700202")); + + Set names1971 = stringSetMap.get("year1971"); + Assertions.assertEquals(2, names1971.size()); + Assertions.assertTrue(names1971.contains("year1971")); + Assertions.assertTrue(names1971.contains("month197103")); + + Assertions.assertEquals(5, map.size()); + Assertions.assertTrue(map.containsKey("nullRange")); + Assertions.assertTrue(map.containsKey("year1970")); + Assertions.assertTrue(map.containsKey("year1971")); + Assertions.assertTrue(map.containsKey("month197204")); + Assertions.assertTrue(map.containsKey("day19730101")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java new file mode 100644 index 00000000000000..74c8c3f6954a97 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; + +public class IcebergPartitionInfoTest { + + @Test + public void testGetLatestSnapshotId() { + IcebergPartition p1 = new IcebergPartition("p1", 0, 0, 0, 0, 1, 101, null, null); + IcebergPartition p2 = new IcebergPartition("p2", 0, 0, 0, 0, 2, 102, null, null); + IcebergPartition p3 = new IcebergPartition("p3", 0, 0, 0, 0, 3, 103, null, null); + Map nameToIcebergPartition = Maps.newHashMap(); + nameToIcebergPartition.put(p1.getPartitionName(), p1); + nameToIcebergPartition.put(p2.getPartitionName(), p2); + nameToIcebergPartition.put(p3.getPartitionName(), p3); + Map> nameToIcebergPartitionNames = Maps.newHashMap(); + Set names = Sets.newHashSet(); + names.add("p1"); + names.add("p2"); + nameToIcebergPartitionNames.put("p1", names); + + IcebergPartitionInfo info = new IcebergPartitionInfo(null, nameToIcebergPartition, nameToIcebergPartitionNames); + long snapshot1 = info.getLatestSnapshotId("p1"); + long snapshot2 = info.getLatestSnapshotId("p2"); + long snapshot3 = info.getLatestSnapshotId("p3"); + Assertions.assertEquals(102, snapshot1); + Assertions.assertEquals(102, snapshot2); + Assertions.assertEquals(103, snapshot3); + } +} diff --git a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out index d0b47e27fffae5..c9d9799da81300 100644 --- a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out +++ b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out @@ -5,3 +5,101 @@ -- !mtmv -- 1000 +-- !test_ts_refresh1 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 + +-- !test_ts_refresh2 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 + +-- !test_ts_refresh3 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 + +-- !test_ts_refresh4 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 + +-- !test_ts_refresh5 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 + +-- !test_ts_refresh6 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 + +-- !test_ts_refresh_null -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 +\N 8 + +-- !unpartition -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 +\N 8 + +-- !test_d_refresh1 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 + +-- !test_d_refresh2 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 + +-- !test_d_refresh3 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 + +-- !test_d_refresh4 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 +2024-09-30 6 + +-- !test_d_refresh5 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 +2024-09-30 6 +2024-10-28 7 + diff --git a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
index 3004f1677bded6..59cf1173acb46b 100644
--- a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
@@ -65,5 +65,152 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         sql """drop materialized view if exists ${mvName};"""
         sql """ drop catalog if exists ${catalog_name} """
     }
+
+    // Test partition refresh.
+    // Use the HMS catalog to avoid REST catalog write failures caused by a locked SQLite database file.
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String hivePrefix = "hive2";
+        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+        String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+        String warehouse = "${default_fs}/warehouse"
+
+        String catalog_name = "iceberg_mtmv_catalog_hms";
+        String mvUnpartition = "test_iceberg_unpartition"
+        String mvName1 = "test_iceberg_mtmv_ts"
+        String mvName2 = "test_iceberg_mtmv_d"
+        String dbName = "regression_test_mtmv_partition_p0"
+        String icebergDb = "iceberg_mtmv_partition"
+        String icebergTable1 = "tstable"
+        String icebergTable2 = "dtable"
+        sql """drop catalog if exists ${catalog_name} """
+        sql """create catalog if not exists ${catalog_name} properties (
+            'type'='iceberg',
+            'iceberg.catalog.type'='hms',
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+            'fs.defaultFS' = '${default_fs}',
+            'warehouse' = '${warehouse}',
+            'use_meta_cache' = 'true'
+        );"""
+
+        sql """switch internal"""
+        sql """drop database if exists ${dbName}"""
+        sql """create database if not exists ${dbName}"""
+        sql """use ${dbName}"""
+
+        sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}"""
+        sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}"""
+        sql """create database if not exists ${catalog_name}.${icebergDb}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} (
+                ts DATETIME,
+                value INT)
+            ENGINE=iceberg
+            PARTITION BY LIST (DAY(ts)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)"""
+        sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh1 "select * from ${mvName1} order by value"
+
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh2 """select * from ${mvName1} order by value"""
+
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh3 """select * from ${mvName1} order by value"""
+
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh4 """select * from ${mvName1} order by value"""
+
+        sql """insert into 
${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-28 01:22:03', 7);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh5 """select * from ${mvName1} order by value""" + + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh6 """select * from ${mvName1} order by value""" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values (null, 8);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh_null """select * from ${mvName1} order by value""" + + def showPartitionsResult = sql """show partitions from ${mvName1}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20241026000000_20241027000000")) + assertTrue(showPartitionsResult.toString().contains("p_20241027000000_20241028000000")) + assertTrue(showPartitionsResult.toString().contains("p_20241028000000_20241029000000")) + + sql """drop materialized view if exists ${mvName1};""" + + // Test unpartitioned mtmv + sql """ + CREATE MATERIALIZED VIEW ${mvUnpartition} + BUILD DEFERRED REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}; + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvUnpartition} complete + """ + def jobName = getJobName(dbName, mvUnpartition); + waitingMTMVTaskFinished(jobName) + qt_unpartition "SELECT * FROM ${mvUnpartition} order by value" + + sql """drop materialized view if exists ${mvUnpartition};""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" + + + sql """ + CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable2} ( + d DATE, + value INT) + ENGINE=iceberg + PARTITION BY LIST (MONTH(d)) (); + """ + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-26', 1), ('2024-09-17', 2), ('2024-09-27', 3);""" + sql """CREATE MATERIALIZED VIEW ${mvName2} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`d`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable2}""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} complete""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh1 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-09-01', 4);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} auto""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh2 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-22', 5), ('2024-09-30', 6);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240801_20240901);""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh3 "select * from ${mvName2} order by value" + sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240901_20241001);""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh4 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-10-28', 7);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} auto""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh5 "select * from ${mvName2} order by value" + + showPartitionsResult 
= sql """show partitions from ${mvName2}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20240801_20240901")) + assertTrue(showPartitionsResult.toString().contains("p_20240901_20241001")) + assertTrue(showPartitionsResult.toString().contains("p_20241001_20241101")) + + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" + + sql """ drop catalog if exists ${catalog_name} """ + } }