From 7ec73c1530ae8df4b48e0ce0b96548a1ee7d3fb8 Mon Sep 17 00:00:00 2001 From: James Date: Mon, 16 Dec 2024 15:34:39 +0800 Subject: [PATCH] [feature](mtmv)Support iceberg partition refresh. (#44726) ### What problem does this PR solve? Previously, when using Iceberg to create MTMV, it was not possible to perceive changes in partition lists and data, so only ```refresh materialized view mv1 complete``` could be used to force a full refresh. This PR obtains the partition list of Iceberg, the last update time of the partition, and the latest snapshotId of the table. Therefore, MTMV can be partitioned based on Iceberg tables and perceive changes in data, automatically refreshing partitions. For now, we only support tables with a single partition column, and the partition transform must be one of hour, day, month or year. Will support Identity transform soon. Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None --- .../doris/catalog/RangePartitionItem.java | 7 +- .../iceberg/IcebergExternalTable.java | 439 +++++++++++++++++- .../datasource/iceberg/IcebergPartition.java | 82 ++++ .../iceberg/IcebergPartitionInfo.java | 71 +++ .../iceberg/IcebergSchemaCacheValue.java | 50 ++ .../doris/job/extensions/mtmv/MTMVTask.java | 7 + .../apache/doris/mtmv/MTMVPartitionUtil.java | 2 +- .../apache/doris/mtmv/MTMVRelatedTableIf.java | 9 + .../iceberg/IcebergExternalTableTest.java | 238 ++++++++++ .../iceberg/IcebergPartitionInfoTest.java | 53 +++ .../data/mtmv_p0/test_iceberg_mtmv.out | 98 ++++ .../suites/mtmv_p0/test_iceberg_mtmv.groovy | 147 ++++++ 12 files changed, 1199 insertions(+), 4 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java create mode 100644 
fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index e7f2a9cab5dbf9..2015b2b9ea3eef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -61,9 +61,14 @@ public boolean isDefaultPartition() { @Override public PartitionKeyDesc toPartitionKeyDesc() { - return PartitionKeyDesc.createFixed( + if (partitionKeyRange.hasLowerBound()) { + return PartitionKeyDesc.createFixed( PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()), PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); + } else { + // For null partition value. + return PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index feded88ea326f0..e259399f63740b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -17,9 +17,24 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.analysis.PartitionValue; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.RangePartitionItem; +import 
org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -28,13 +43,53 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionsTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.Month; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; -public class IcebergExternalTable extends ExternalTable { 
+public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { + + public static final String YEAR = "year"; + public static final String MONTH = "month"; + public static final String DAY = "day"; + public static final String HOUR = "hour"; + public static final String IDENTITY = "identity"; + public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec + + private Table table; + private List partitionColumns; + private boolean isValidRelatedTableCached = false; + private boolean isValidRelatedTable = false; public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE); @@ -51,9 +106,50 @@ protected synchronized void makeSureInitialized() { } } + @VisibleForTesting + public void setTable(Table table) { + this.table = table; + } + + @VisibleForTesting + public void setPartitionColumns(List partitionColumns) { + this.partitionColumns = partitionColumns; + } + @Override public Optional initSchema() { - return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); + table = IcebergUtils.getIcebergTable(catalog, dbName, name); + List schema = IcebergUtils.getSchema(catalog, dbName, name); + Snapshot snapshot = table.currentSnapshot(); + if (snapshot == null) { + LOG.debug("Table {} is empty", name); + return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null)); + } + long snapshotId = snapshot.snapshotId(); + partitionColumns = null; + IcebergPartitionInfo partitionInfo = null; + if (isValidRelatedTable()) { + PartitionSpec spec = table.spec(); + partitionColumns = Lists.newArrayList(); + + // For iceberg table, we only support table with 1 partition column as RelatedTable. + // So we use spec.fields().get(0) to get the partition column. 
+ Types.NestedField col = table.schema().findField(spec.fields().get(0).sourceId()); + for (Column c : schema) { + if (c.getName().equalsIgnoreCase(col.name())) { + partitionColumns.add(c); + break; + } + } + Preconditions.checkState(partitionColumns.size() == 1, + "Support 1 partition column for iceberg table, but found " + partitionColumns.size()); + try { + partitionInfo = loadPartitionInfo(); + } catch (AnalysisException e) { + LOG.warn("Failed to load iceberg table {} partition info.", name, e); + } + } + return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns, snapshotId, partitionInfo)); } @Override @@ -90,4 +186,343 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map getAndCopyPartitionItems(Optional snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional snapshot) { + makeSureInitialized(); + return isValidRelatedTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set getPartitionColumnNames(Optional snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List getPartitionColumns(Optional snapshot) { + return getPartitionColumnsFromCache(); + } + + private List getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException { + long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName); + if (latestSnapshotId <= 0) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(latestSnapshotId); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("Can't find schema cache of table " + name); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + /** + * For now, we only support single partition column Iceberg table as related table. + * The supported transforms now are YEAR, MONTH, DAY and HOUR. + * And the column couldn't change to another column during partition evolution. 
+ */ + @Override + public boolean isValidRelatedTable() { + if (isValidRelatedTableCached) { + return isValidRelatedTable; + } + isValidRelatedTable = false; + Set allFields = Sets.newHashSet(); + for (PartitionSpec spec : table.specs().values()) { + if (spec == null) { + isValidRelatedTableCached = true; + return false; + } + List fields = spec.fields(); + if (fields.size() != 1) { + isValidRelatedTableCached = true; + return false; + } + PartitionField partitionField = spec.fields().get(0); + String transformName = partitionField.transform().toString(); + if (!YEAR.equals(transformName) + && !MONTH.equals(transformName) + && !DAY.equals(transformName) + && !HOUR.equals(transformName)) { + isValidRelatedTableCached = true; + return false; + } + allFields.add(table.schema().findColumnName(partitionField.sourceId())); + } + isValidRelatedTableCached = true; + isValidRelatedTable = allFields.size() == 1; + return isValidRelatedTable; + } + + protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException { + List icebergPartitions = loadIcebergPartition(); + Map nameToPartition = Maps.newHashMap(); + Map nameToPartitionItem = Maps.newHashMap(); + for (IcebergPartition partition : icebergPartitions) { + nameToPartition.put(partition.getPartitionName(), partition); + String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString(); + Range partitionRange = getPartitionRange(partition.getPartitionValues().get(0), transform); + PartitionItem item = new RangePartitionItem(partitionRange); + nameToPartitionItem.put(partition.getPartitionName(), item); + } + Map> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem); + return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap); + } + + public List loadIcebergPartition() { + PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils + .createMetadataTableInstance(table, MetadataTableType.PARTITIONS); + List partitions = 
Lists.newArrayList(); + try (CloseableIterable tasks = partitionsTable.newScan().planFiles()) { + for (FileScanTask task : tasks) { + CloseableIterable rows = task.asDataTask().rows(); + for (StructLike row : rows) { + partitions.add(generateIcebergPartition(row)); + } + } + } catch (IOException e) { + LOG.warn("Failed to get Iceberg table {} partition info.", name, e); + } + return partitions; + } + + public IcebergPartition generateIcebergPartition(StructLike row) { + // row format : + // 0. partitionData, + // 1. spec_id, + // 2. record_count, + // 3. file_count, + // 4. total_data_file_size_in_bytes, + // 5. position_delete_record_count, + // 6. position_delete_file_count, + // 7. equality_delete_record_count, + // 8. equality_delete_file_count, + // 9. last_updated_at, + // 10. last_updated_snapshot_id + Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table."); + int specId = row.get(1, Integer.class); + PartitionSpec partitionSpec = table.specs().get(specId); + StructProjection partitionData = row.get(0, StructProjection.class); + StringBuilder sb = new StringBuilder(); + List partitionValues = Lists.newArrayList(); + List transforms = Lists.newArrayList(); + for (int i = 0; i < partitionSpec.fields().size(); ++i) { + PartitionField partitionField = partitionSpec.fields().get(i); + Class fieldClass = partitionSpec.javaClasses()[i]; + int fieldId = partitionField.fieldId(); + // Iceberg partition field id starts at PARTITION_DATA_ID_START, + // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START + int index = fieldId - PARTITION_DATA_ID_START; + Object o = partitionData.get(index, fieldClass); + String fieldValue = o == null ? 
null : o.toString(); + String fieldName = partitionField.name(); + sb.append(fieldName); + sb.append("="); + sb.append(fieldValue); + sb.append("/"); + partitionValues.add(fieldValue); + transforms.add(partitionField.transform().toString()); + } + if (sb.length() > 0) { + sb.delete(sb.length() - 1, sb.length()); + } + String partitionName = sb.toString(); + long recordCount = row.get(2, Long.class); + long fileCount = row.get(3, Integer.class); + long fileSizeInBytes = row.get(4, Long.class); + long lastUpdateTime = row.get(9, Long.class); + long lastUpdateSnapShotId = row.get(10, Long.class); + return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount, + lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms); + } + + @VisibleForTesting + public Range getPartitionRange(String value, String transform) + throws AnalysisException { + // For NULL value, create a lessThan partition for it. + if (value == null) { + PartitionKey nullKey = PartitionKey.createPartitionKey( + Lists.newArrayList(new PartitionValue("0000-01-02")), partitionColumns); + return Range.lessThan(nullKey); + } + LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime(); + LocalDateTime target; + LocalDateTime lower; + LocalDateTime upper; + long longValue = Long.parseLong(value); + switch (transform) { + case HOUR: + target = epoch.plusHours(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), + target.getHour(), 0, 0); + upper = lower.plusHours(1); + break; + case DAY: + target = epoch.plusDays(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0); + upper = lower.plusDays(1); + break; + case MONTH: + target = epoch.plusMonths(longValue); + lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0); + upper = lower.plusMonths(1); + break; + case YEAR: + target = epoch.plusYears(longValue); + lower = 
LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0); + upper = lower.plusYears(1); + break; + default: + throw new RuntimeException("Unsupported transform " + transform); + } + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + Column c = partitionColumns.get(0); + Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column"); + if (c.getType().isDate() || c.getType().isDateV2()) { + formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + } + PartitionValue lowerValue = new PartitionValue(lower.format(formatter)); + PartitionValue upperValue = new PartitionValue(upper.format(formatter)); + PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns); + PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns); + return Range.closedOpen(lowKey, upperKey); + } + + /** + * Merge overlapped iceberg partitions into one Doris partition. + */ + public Map> mergeOverlapPartitions(Map originPartitions) { + List> entries = sortPartitionMap(originPartitions); + Map> map = Maps.newHashMap(); + for (int i = 0; i < entries.size() - 1; i++) { + Range firstValue = entries.get(i).getValue().getItems(); + String firstKey = entries.get(i).getKey(); + Range secondValue = entries.get(i + 1).getValue().getItems(); + String secondKey = entries.get(i + 1).getKey(); + // If the first entry enclose the second one, remove the second entry and keep a record in the return map. + // So we can track the iceberg partitions those contained by one Doris partition. 
+ while (i < entries.size() && firstValue.encloses(secondValue)) { + originPartitions.remove(secondKey); + map.putIfAbsent(firstKey, Sets.newHashSet(firstKey)); + String finalSecondKey = secondKey; + map.computeIfPresent(firstKey, (key, value) -> { + value.add(finalSecondKey); + return value; + }); + i++; + if (i >= entries.size() - 1) { + break; + } + secondValue = entries.get(i + 1).getValue().getItems(); + secondKey = entries.get(i + 1).getKey(); + } + } + return map; + } + + /** + * Sort the given map entries by PartitionItem Range(LOW, HIGH) + * When comparing two ranges, the one with smaller LOW value is smaller than the other one. + * If two ranges have same values of LOW, the one with larger HIGH value is smaller. + * + * For now, we only support year, month, day and hour, + * so it is impossible to have two partially intersect partitions. + * One range is either enclosed by another or has no intersection at all with another. + * + * + * For example, we have these 4 ranges: + * [10, 20), [30, 40), [0, 30), [10, 15) + * + * After sort, they become: + * [0, 30), [10, 20), [10, 15), [30, 40) + */ + public List> sortPartitionMap(Map originPartitions) { + List> entries = new ArrayList<>(originPartitions.entrySet()); + entries.sort(new RangeComparator()); + return entries; + } + + public static class RangeComparator implements Comparator> { + @Override + public int compare(Map.Entry p1, Map.Entry p2) { + PartitionItem value1 = p1.getValue(); + PartitionItem value2 = p2.getValue(); + if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) { + Range items1 = value1.getItems(); + Range items2 = value2.getItems(); + if (!items1.hasLowerBound()) { + return -1; + } + if (!items2.hasLowerBound()) { + return 1; + } + PartitionKey upper1 = items1.upperEndpoint(); + PartitionKey lower1 = items1.lowerEndpoint(); + PartitionKey upper2 = items2.upperEndpoint(); + PartitionKey lower2 = items2.lowerEndpoint(); + int compareLow = 
lower1.compareTo(lower2); + return compareLow == 0 ? upper2.compareTo(upper1) : compareLow; + } + return 0; + } + } + + @VisibleForTesting + public boolean isValidRelatedTableCached() { + return isValidRelatedTableCached; + } + + @VisibleForTesting + public boolean validRelatedTableCache() { + return isValidRelatedTable; + } + + public void setIsValidRelatedTableCached(boolean isCached) { + this.isValidRelatedTableCached = isCached; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java new file mode 100644 index 00000000000000..cccc6244a0d0cc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartition.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import java.util.List; + +public class IcebergPartition { + private final String partitionName; + private final List partitionValues; + private final int specId; + private final long recordCount; + private final long fileSizeInBytes; + private final long fileCount; + private final long lastUpdateTime; + private final long lastSnapshotId; + private final List transforms; + + public IcebergPartition(String partitionName, int specId, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime, long lastSnapshotId, List partitionValues, + List transforms) { + this.partitionName = partitionName; + this.specId = specId; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + this.lastSnapshotId = lastSnapshotId; + this.partitionValues = partitionValues; + this.transforms = transforms; + } + + public String getPartitionName() { + return partitionName; + } + + public int getSpecId() { + return specId; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public long getLastSnapshotId() { + return lastSnapshotId; + } + + public List getPartitionValues() { + return partitionValues; + } + + public List getTransforms() { + return transforms; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java new file mode 100644 index 00000000000000..9edb2137f4f389 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfo.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.Set; + +public class IcebergPartitionInfo { + private final Map nameToPartitionItem; + private final Map nameToIcebergPartition; + private final Map> nameToIcebergPartitionNames; + + public IcebergPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToIcebergPartition = Maps.newHashMap(); + this.nameToIcebergPartitionNames = Maps.newHashMap(); + } + + public IcebergPartitionInfo(Map nameToPartitionItem, + Map nameToIcebergPartition, + Map> nameToIcebergPartitionNames) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToIcebergPartition = nameToIcebergPartition; + this.nameToIcebergPartitionNames = nameToIcebergPartitionNames; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToIcebergPartition() { + return nameToIcebergPartition; + } + + public long getLatestSnapshotId(String partitionName) { + Set icebergPartitionNames = nameToIcebergPartitionNames.get(partitionName); + if (icebergPartitionNames == null) { + return nameToIcebergPartition.get(partitionName).getLastSnapshotId(); + } + long 
latestSnapshotId = 0; + long latestUpdateTime = -1; + for (String name : icebergPartitionNames) { + IcebergPartition partition = nameToIcebergPartition.get(name); + long lastUpdateTime = partition.getLastUpdateTime(); + if (latestUpdateTime < lastUpdateTime) { + latestUpdateTime = lastUpdateTime; + latestSnapshotId = partition.getLastSnapshotId(); + } + } + return latestSnapshotId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java new file mode 100644 index 00000000000000..e1fde8049fe1ad --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.SchemaCacheValue; + +import java.util.List; + +public class IcebergSchemaCacheValue extends SchemaCacheValue { + + private final List partitionColumns; + private final IcebergPartitionInfo partitionInfo; + private final long snapshotId; + + public IcebergSchemaCacheValue(List schema, List partitionColumns, + long snapshotId, IcebergPartitionInfo partitionInfo) { + super(schema); + this.partitionColumns = partitionColumns; + this.snapshotId = snapshotId; + this.partitionInfo = partitionInfo; + } + + public List getPartitionColumns() { + return partitionColumns; + } + + public IcebergPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public long getSnapshotId() { + return snapshotId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 3d1d001d61ab4c..a98ac06f67f187 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -39,6 +39,7 @@ import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot; +import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.StatementContext; @@ -173,6 +174,12 @@ public void run() throws JobException { this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); beforeMTMVRefresh(); if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { + MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); + if (!relatedTable.isValidRelatedTable()) { + throw new JobException("MTMV " + mtmv.getName() + "'s 
related table " + relatedTable.getName() + + " is not a valid related table anymore, stop refreshing." + + " e.g. Table has multiple partition columns or including not supported transform functions."); + } MTMVPartitionUtil.alignMvPartition(mtmv); } MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index e2775970ab6ab8..002047d8fb10f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -317,7 +317,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt if (!relatedTable.needAutoRefresh()) { return true; } - // check if partitions of related table if changed + // check if partitions of related table is changed Set snapshotPartitions = mtmv.getRefreshSnapshot().getSnapshotPartitions(mtmvPartitionName); if (!Objects.equals(relatedPartitionNames, snapshotPartitions)) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index c4261aa78f10be..821595d7dffd06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -115,4 +115,13 @@ default boolean needAutoRefresh() { * @return */ boolean isPartitionColumnAllowNull(); + + /** + * If the table is supported as related table. + * For example, an Iceberg table may become unsupported after partition revolution. 
+ * @return + */ + default boolean isValidRelatedTable() { + return true; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java new file mode 100644 index 00000000000000..80d0a7c2429df3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.common.AnalysisException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import mockit.Expectations; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.transforms.Days; +import org.apache.iceberg.transforms.Hours; +import org.apache.iceberg.transforms.Months; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IcebergExternalTableTest { + + @Test + public void testIsSupportedPartitionTable(@Mocked org.apache.iceberg.Table icebergTable, + @Mocked PartitionSpec spec, + @Mocked PartitionField field, + @Mocked Schema schema) { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Map specs = Maps.newHashMap(); + // Test null + specs.put(0, null); + new Expectations() {{ + icebergTable.specs(); + result = specs; + }}; + table.setTable(icebergTable); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.isValidRelatedTable()); + new Verifications() {{ + icebergTable.specs(); + times = 1; + }}; + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.validRelatedTableCache()); + + // Test spec fields are empty. 
+ specs.put(0, spec); + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + new Expectations() {{ + icebergTable.specs(); + result = specs; + }}; + List fields = Lists.newArrayList(); + new Expectations() {{ + spec.fields(); + result = fields; + }}; + Assertions.assertFalse(table.isValidRelatedTable()); + new Verifications() {{ + spec.fields(); + times = 1; + }}; + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertFalse(table.validRelatedTableCache()); + + // Test true + fields.add(field); + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + new Expectations() { + { + icebergTable.schema(); + result = schema; + + schema.findColumnName(anyInt); + result = "col1"; + } + }; + new Expectations() {{ + field.transform(); + result = new Hours(); + }}; + Assertions.assertTrue(table.isValidRelatedTable()); + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.validRelatedTableCache()); + new Verifications() {{ + schema.findColumnName(anyInt); + times = 1; + }}; + new Expectations() {{ + field.transform(); + result = new Days(); + }}; + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.isValidRelatedTable()); + new Expectations() {{ + field.transform(); + result = new Months(); + }}; + table.setIsValidRelatedTableCached(false); + Assertions.assertFalse(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.isValidRelatedTable()); + Assertions.assertTrue(table.isValidRelatedTableCached()); + Assertions.assertTrue(table.validRelatedTableCache()); + } + + @Test + public void testGetPartitionRange() throws AnalysisException { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Column c = new Column("ts", PrimitiveType.DATETIMEV2); + List partitionColumns = Lists.newArrayList(c); + 
table.setPartitionColumns(partitionColumns); + + // Test null partition value + Range nullRange = table.getPartitionRange(null, "hour"); + Assertions.assertFalse(nullRange.hasLowerBound()); + Assertions.assertEquals("0000-01-02 00:00:00", + nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0)); + + // Test hour transform. + Range hour = table.getPartitionRange("100", "hour"); + PartitionKey lowKey = hour.lowerEndpoint(); + PartitionKey upKey = hour.upperEndpoint(); + Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test day transform. + Range day = table.getPartitionRange("100", "day"); + lowKey = day.lowerEndpoint(); + upKey = day.upperEndpoint(); + Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test month transform. + Range month = table.getPartitionRange("100", "month"); + lowKey = month.lowerEndpoint(); + upKey = month.upperEndpoint(); + Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test year transform. 
+ Range year = table.getPartitionRange("100", "year"); + lowKey = year.lowerEndpoint(); + upKey = year.upperEndpoint(); + Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("2071-01-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); + + // Test unsupported transform + Exception exception = Assertions.assertThrows(RuntimeException.class, () -> { + table.getPartitionRange("100", "bucket"); + }); + Assertions.assertEquals("Unsupported transform bucket", exception.getMessage()); + } + + @Test + public void testSortRange() throws AnalysisException { + IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); + Column c = new Column("c", PrimitiveType.DATETIMEV2); + table.setPartitionColumns(Lists.newArrayList(c)); + PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour")); + PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year")); + PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year")); + PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month")); + PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month")); + PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month")); + PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day")); + PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day")); + Map map = Maps.newHashMap(); + map.put("nullRange", nullRange); + map.put("year1970", year1970); + map.put("year1971", year1971); + map.put("month197002", month197002); + map.put("month197103", month197103); + map.put("month197204", month197204); + map.put("day19700202", day19700202); + map.put("day19730101", day19730101); + List> entries = table.sortPartitionMap(map); + Assertions.assertEquals(8, entries.size()); + 
Assertions.assertEquals("nullRange", entries.get(0).getKey()); + Assertions.assertEquals("year1970", entries.get(1).getKey()); + Assertions.assertEquals("month197002", entries.get(2).getKey()); + Assertions.assertEquals("day19700202", entries.get(3).getKey()); + Assertions.assertEquals("year1971", entries.get(4).getKey()); + Assertions.assertEquals("month197103", entries.get(5).getKey()); + Assertions.assertEquals("month197204", entries.get(6).getKey()); + Assertions.assertEquals("day19730101", entries.get(7).getKey()); + + Map> stringSetMap = table.mergeOverlapPartitions(map); + Assertions.assertEquals(2, stringSetMap.size()); + Assertions.assertTrue(stringSetMap.containsKey("year1970")); + Assertions.assertTrue(stringSetMap.containsKey("year1971")); + + Set names1970 = stringSetMap.get("year1970"); + Assertions.assertEquals(3, names1970.size()); + Assertions.assertTrue(names1970.contains("year1970")); + Assertions.assertTrue(names1970.contains("month197002")); + Assertions.assertTrue(names1970.contains("day19700202")); + + Set names1971 = stringSetMap.get("year1971"); + Assertions.assertEquals(2, names1971.size()); + Assertions.assertTrue(names1971.contains("year1971")); + Assertions.assertTrue(names1971.contains("month197103")); + + Assertions.assertEquals(5, map.size()); + Assertions.assertTrue(map.containsKey("nullRange")); + Assertions.assertTrue(map.containsKey("year1970")); + Assertions.assertTrue(map.containsKey("year1971")); + Assertions.assertTrue(map.containsKey("month197204")); + Assertions.assertTrue(map.containsKey("day19730101")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java new file mode 100644 index 00000000000000..74c8c3f6954a97 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergPartitionInfoTest.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; + +public class IcebergPartitionInfoTest { + + @Test + public void testGetLatestSnapshotId() { + IcebergPartition p1 = new IcebergPartition("p1", 0, 0, 0, 0, 1, 101, null, null); + IcebergPartition p2 = new IcebergPartition("p2", 0, 0, 0, 0, 2, 102, null, null); + IcebergPartition p3 = new IcebergPartition("p3", 0, 0, 0, 0, 3, 103, null, null); + Map nameToIcebergPartition = Maps.newHashMap(); + nameToIcebergPartition.put(p1.getPartitionName(), p1); + nameToIcebergPartition.put(p2.getPartitionName(), p2); + nameToIcebergPartition.put(p3.getPartitionName(), p3); + Map> nameToIcebergPartitionNames = Maps.newHashMap(); + Set names = Sets.newHashSet(); + names.add("p1"); + names.add("p2"); + nameToIcebergPartitionNames.put("p1", names); + + IcebergPartitionInfo info = new IcebergPartitionInfo(null, nameToIcebergPartition, nameToIcebergPartitionNames); + long snapshot1 = info.getLatestSnapshotId("p1"); + long snapshot2 = info.getLatestSnapshotId("p2"); + long 
snapshot3 = info.getLatestSnapshotId("p3"); + Assertions.assertEquals(102, snapshot1); + Assertions.assertEquals(102, snapshot2); + Assertions.assertEquals(103, snapshot3); + } +} diff --git a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out index d0b47e27fffae5..c9d9799da81300 100644 --- a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out +++ b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out @@ -5,3 +5,101 @@ -- !mtmv -- 1000 +-- !test_ts_refresh1 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 + +-- !test_ts_refresh2 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 + +-- !test_ts_refresh3 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 + +-- !test_ts_refresh4 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 + +-- !test_ts_refresh5 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 + +-- !test_ts_refresh6 -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 + +-- !test_ts_refresh_null -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 +\N 8 + +-- !unpartition -- +2024-10-26T01:02:03 1 +2024-10-27T01:02:03 2 +2024-10-27T21:02:03 3 +2024-10-26T21:02:03 4 +2024-10-26T01:22:03 5 +2024-10-27T01:12:03 6 +2024-10-28T01:22:03 7 +\N 8 + +-- !test_d_refresh1 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 + +-- !test_d_refresh2 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 + +-- !test_d_refresh3 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 + 
+-- !test_d_refresh4 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 +2024-09-30 6 + +-- !test_d_refresh5 -- +2024-08-26 1 +2024-09-17 2 +2024-09-27 3 +2024-09-01 4 +2024-08-22 5 +2024-09-30 6 +2024-10-28 7 + diff --git a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy index 3004f1677bded6..59cf1173acb46b 100644 --- a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy @@ -65,5 +65,152 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_ sql """drop materialized view if exists ${mvName};""" sql """ drop catalog if exists ${catalog_name} """ } + + // Test partition refresh. + // Use an HMS catalog to avoid REST catalog write failures caused by a locked sqlite database file. + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String hivePrefix = "hive2"; + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") + String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" + String warehouse = "${default_fs}/warehouse" + + String catalog_name = "iceberg_mtmv_catalog_hms"; + String mvUnpartition = "test_iceberg_unpartition" + String mvName1 = "test_iceberg_mtmv_ts" + String mvName2 = "test_iceberg_mtmv_d" + String dbName = "regression_test_mtmv_partition_p0" + String icebergDb = "iceberg_mtmv_partition" + String icebergTable1 = "tstable" + String icebergTable2 = "dtable" + sql """drop catalog if exists ${catalog_name} """ + sql """create catalog if not exists ${catalog_name} properties ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + 'fs.defaultFS' = '${default_fs}', + 'warehouse' = '${warehouse}', + 'use_meta_cache' = 'true' + );""" + + sql """switch internal""" + sql """drop database if exists ${dbName}""" + sql 
"""create database if not exists ${dbName}""" + sql """use ${dbName}""" + + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" + sql """create database if not exists ${catalog_name}.${icebergDb}""" + sql """ + CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} ( + ts DATETIME, + value INT) + ENGINE=iceberg + PARTITION BY LIST (DAY(ts)) (); + """ + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)""" + sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh1 "select * from ${mvName1} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh2 """select * from ${mvName1} order by value""" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh3 """select * from ${mvName1} order by value""" + + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh4 """select * from ${mvName1} order by value""" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-28 01:22:03', 7);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} 
partitions(p_20241026000000_20241027000000);""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh5 """select * from ${mvName1} order by value""" + + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh6 """select * from ${mvName1} order by value""" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values (null, 8);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} auto""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh_null """select * from ${mvName1} order by value""" + + def showPartitionsResult = sql """show partitions from ${mvName1}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20241026000000_20241027000000")) + assertTrue(showPartitionsResult.toString().contains("p_20241027000000_20241028000000")) + assertTrue(showPartitionsResult.toString().contains("p_20241028000000_20241029000000")) + + sql """drop materialized view if exists ${mvName1};""" + + // Test unpartitioned mtmv + sql """ + CREATE MATERIALIZED VIEW ${mvUnpartition} + BUILD DEFERRED REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}; + """ + sql """ + REFRESH MATERIALIZED VIEW ${mvUnpartition} complete + """ + def jobName = getJobName(dbName, mvUnpartition); + waitingMTMVTaskFinished(jobName) + qt_unpartition "SELECT * FROM ${mvUnpartition} order by value" + + sql """drop materialized view if exists ${mvUnpartition};""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" + + + sql """ + CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable2} ( + d DATE, + value INT) + ENGINE=iceberg + PARTITION BY LIST (MONTH(d)) (); + """ + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-26', 1), ('2024-09-17', 2), ('2024-09-27', 3);""" + sql 
"""CREATE MATERIALIZED VIEW ${mvName2} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`d`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable2}""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} complete""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh1 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-09-01', 4);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} auto""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh2 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-22', 5), ('2024-09-30', 6);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240801_20240901);""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh3 "select * from ${mvName2} order by value" + sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240901_20241001);""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh4 "select * from ${mvName2} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-10-28', 7);""" + sql """REFRESH MATERIALIZED VIEW ${mvName2} auto""" + waitingMTMVTaskFinishedByMvName(mvName2) + qt_test_d_refresh5 "select * from ${mvName2} order by value" + + showPartitionsResult = sql """show partitions from ${mvName2}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20240801_20240901")) + assertTrue(showPartitionsResult.toString().contains("p_20240901_20241001")) + assertTrue(showPartitionsResult.toString().contains("p_20241001_20241101")) + + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" + + sql """ drop catalog if exists ${catalog_name} """ + } }