Merged
29 commits
f683a18
[feat](mtmv)mtmv support paimon partition refresh (#43959)
zddr Nov 21, 2024
93f74b9
[enhance](mtmv)Enable the MTMVRelatedTableIf interface to support mvc…
zddr Nov 22, 2024
297c0ff
1
zddr Jun 19, 2025
f43e740
[enhance](catalog)External partition prune return partitionName inste…
zddr Nov 25, 2024
ceb038a
[enhance](catalog)Unified external partition prune interface (#44567)
zddr Nov 26, 2024
3123d16
[feat](mtmv)Unified external table interface supporting partition ref…
zddr Nov 28, 2024
df36768
[feat](mtmv)external table support partition rewrite (#44998)
zddr Dec 5, 2024
2f5dfdc
[feat](mtmv)use real snapshot instead of optional.empty() (#45273)
zddr Dec 11, 2024
756d67b
[feat](mtmv)Paimon queries the data in the cache instead of querying …
zddr Dec 11, 2024
e82fc3b
[feature](mtmv)Support iceberg partition refresh. (#44726)
Jibing-Li Dec 16, 2024
9c27a90
[enhance](mtmv)MTMV allow paimon table has multi partition keys (#45652)
zddr Dec 23, 2024
db1dd02
[feature](mtmv)Support iceberg mtmv query. (#45659)
Jibing-Li Dec 24, 2024
4807227
[regression](mtmv)Support show iceberg external table partition. Add …
Jibing-Li Jan 6, 2025
b671c01
[mtmv](test)Add more test case for Iceberg mtmv. (#47026)
Jibing-Li Jan 17, 2025
204dccc
[enhance](mtmv)Add a new class MTMVSnapshotIdSnapshot to make the cod…
zddr Feb 14, 2025
8783bf2
[opt](mtmv) metadata cache of external table no longer be refreshed b…
zddr Mar 5, 2025
d98e5f3
[fix](Hudi-mtmv) Support asynchronous materialized view partition re…
BePPPower Apr 23, 2025
e21a1ec
[Fix](MTMV) mtmv not refresh cache of external table before run task …
BePPPower May 22, 2025
92df750
fix error
zddr Jun 20, 2025
9af68d7
success
zddr Jun 20, 2025
8f9a025
up IcebergUtils to 47782
zddr Jun 20, 2025
913d4a7
format
zddr Jun 20, 2025
039436e
[enhance](mtmv)When obtaining the partition list fails, treat the pai…
zddr Jan 9, 2025
2f0af60
1
zddr Jun 20, 2025
37481a0
resolve conflict
zddr Jun 23, 2025
ef1d589
resolve conflict
zddr Jun 23, 2025
f0ad324
resolve conflict
zddr Jun 23, 2025
e87e611
fix case
zddr Jun 23, 2025
327e499
resolve conflict
zddr Jun 23, 2025
@@ -0,0 +1,106 @@

use demo.test_db;
CREATE TABLE no_partition (
id INT,
name STRING,
create_date DATE
) USING iceberg;
INSERT INTO no_partition VALUES(1, 'Alice', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02');

CREATE TABLE not_support_trans (
id INT,
name STRING,
create_date DATE
) USING iceberg
PARTITIONED BY (bucket(10, create_date));
INSERT INTO not_support_trans VALUES(1, 'Alice', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02');

CREATE TABLE add_partition1 (
id INT,
name STRING,
create_date DATE
) USING iceberg;
INSERT INTO add_partition1 VALUES(1, 'Alice', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02');
ALTER TABLE add_partition1 ADD PARTITION FIELD month(create_date);
INSERT INTO add_partition1 VALUES(3, 'Lara', DATE '2023-12-03');

CREATE TABLE add_partition2 (
id INT,
name STRING,
create_date1 DATE,
create_date2 DATE
) USING iceberg
PARTITIONED BY (month(create_date1));
INSERT INTO add_partition2 VALUES(1, 'Alice', DATE '2023-12-01', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02', DATE '2023-12-02');
ALTER TABLE add_partition2 ADD PARTITION FIELD year(create_date2);
INSERT INTO add_partition2 VALUES(3, 'Lara', DATE '2023-12-03', DATE '2023-12-03');

CREATE TABLE drop_partition1 (
id INT,
name STRING,
create_date DATE
) USING iceberg
PARTITIONED BY (month(create_date));
INSERT INTO drop_partition1 VALUES(1, 'Alice', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02');
ALTER TABLE drop_partition1 DROP PARTITION FIELD month(create_date);

CREATE TABLE drop_partition2 (
id INT,
name STRING,
create_date1 DATE,
create_date2 DATE
) USING iceberg
PARTITIONED BY (month(create_date1), year(create_date2));
INSERT INTO drop_partition2 VALUES(1, 'Alice', DATE '2023-12-01', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02', DATE '2023-12-02');
ALTER TABLE drop_partition2 DROP PARTITION FIELD year(create_date2);
INSERT INTO drop_partition2 VALUES(3, 'Lara', DATE '2023-12-03', DATE '2023-12-03');

CREATE TABLE replace_partition1 (
id INT,
name STRING,
create_date1 DATE,
create_date2 DATE
) USING iceberg
PARTITIONED BY (month(create_date1));
INSERT INTO replace_partition1 VALUES(1, 'Alice', DATE '2023-12-01', DATE '2023-12-01'),(2, 'Bob', DATE '2023-12-02', DATE '2023-12-02');
ALTER TABLE replace_partition1 REPLACE PARTITION FIELD month(create_date1) WITH year(create_date2);
INSERT INTO replace_partition1 VALUES(3, 'Lara', DATE '2023-12-03', DATE '2023-12-03');

CREATE TABLE replace_partition2(
ts TIMESTAMP COMMENT 'ts',
value INT COMMENT 'col1')
USING iceberg
PARTITIONED BY (month(ts));
insert into replace_partition2 values (to_timestamp('2024-10-26 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 1), (to_timestamp('2024-11-27 21:02:03', 'yyyy-MM-dd HH:mm:ss'), 2);
ALTER TABLE replace_partition2 REPLACE PARTITION FIELD ts_month WITH day(ts);
insert into replace_partition2 values (to_timestamp('2024-12-03 14:02:03', 'yyyy-MM-dd HH:mm:ss'), 3);

CREATE TABLE replace_partition3(
ts TIMESTAMP COMMENT 'ts',
value INT COMMENT 'col1')
USING iceberg
PARTITIONED BY (month(ts));
insert into replace_partition3 values (to_timestamp('2024-11-26 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 1);
ALTER TABLE replace_partition3 REPLACE PARTITION FIELD month(ts) WITH day(ts);
insert into replace_partition3 values (to_timestamp('2024-11-02 21:02:03', 'yyyy-MM-dd HH:mm:ss'), 2), (to_timestamp('2024-11-03 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 3), (to_timestamp('2024-12-02 19:02:03', 'yyyy-MM-dd HH:mm:ss'), 4);

CREATE TABLE replace_partition4(
ts TIMESTAMP COMMENT 'ts',
value INT COMMENT 'col1')
USING iceberg
PARTITIONED BY (month(ts));
insert into replace_partition4 values (to_timestamp('2024-10-26 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 1), (to_timestamp('2024-11-26 21:02:03', 'yyyy-MM-dd HH:mm:ss'), 2);
ALTER TABLE replace_partition4 REPLACE PARTITION FIELD month(ts) WITH day(ts);
insert into replace_partition4 values (to_timestamp('2024-11-02 13:02:03', 'yyyy-MM-dd HH:mm:ss'), 3), (to_timestamp('2024-11-03 10:02:03', 'yyyy-MM-dd HH:mm:ss'), 4);

CREATE TABLE replace_partition5(
ts TIMESTAMP COMMENT 'ts',
value INT COMMENT 'col1')
USING iceberg
PARTITIONED BY (month(ts));
insert into replace_partition5 values (to_timestamp('2024-10-26 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 1), (to_timestamp('2024-11-26 13:02:03', 'yyyy-MM-dd HH:mm:ss'), 2);
ALTER TABLE replace_partition5 REPLACE PARTITION FIELD month(ts) WITH day(ts);
insert into replace_partition5 values (to_timestamp('2024-10-12 09:02:03', 'yyyy-MM-dd HH:mm:ss'), 3), (to_timestamp('2024-12-21 21:02:03', 'yyyy-MM-dd HH:mm:ss'), 4);
ALTER TABLE replace_partition5 REPLACE PARTITION FIELD day(ts) WITH hour(ts);
insert into replace_partition5 values (to_timestamp('2024-12-21 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 5);
insert into replace_partition5 values (to_timestamp('2025-01-01 11:02:03', 'yyyy-MM-dd HH:mm:ss'), 6);
@@ -4,9 +4,9 @@ use test_paimon_spark;

drop table if exists test_tb_mix_format;
create table test_tb_mix_format (
id int,
value int,
par string
) PARTITIONED BY (par) TBLPROPERTIES (
'primary-key' = 'id, par',
'bucket'=1000,
@@ -26,9 +26,9 @@ alter table test_tb_mix_format unset TBLPROPERTIES ('file.format');

drop table if exists two_partition;
CREATE TABLE two_partition (
id BIGINT,
create_date STRING,
region STRING
) PARTITIONED BY (create_date,region) TBLPROPERTIES (
'primary-key' = 'create_date,region,id',
'bucket'=10,
@@ -43,8 +43,8 @@ insert into two_partition values(5,'2038-01-02','bj');

drop table if exists null_partition;
CREATE TABLE null_partition (
id BIGINT,
region STRING
) PARTITIONED BY (region) TBLPROPERTIES (
'primary-key' = 'region,id',
'bucket'=10,
@@ -59,8 +59,8 @@ insert into null_partition values(5,'NULL');

drop table if exists date_partition;
CREATE TABLE date_partition (
id BIGINT,
create_date DATE
) PARTITIONED BY (create_date) TBLPROPERTIES (
'primary-key' = 'create_date,id',
'bucket'=10,
@@ -37,6 +37,8 @@
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -128,7 +130,8 @@ public void analyze(Analyzer analyzer) throws UserException {

DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP,
TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE,
TableType.ICEBERG_EXTERNAL_TABLE);

if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).isView()) {
@@ -147,6 +150,13 @@ public void analyze(Analyzer analyzer) throws UserException {
return;
}

if (table instanceof IcebergExternalTable) {
if (!((IcebergExternalTable) table).isValidRelatedTable()) {
throw new AnalysisException("Table " + tblName + " is not a supported partition table");
}
return;
}

table.readLock();
try {
// build proc path
@@ -180,7 +190,7 @@ public void analyzeImpl(Analyzer analyzer) throws UserException {

// disallow unsupported catalog
if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog
|| catalog instanceof MaxComputeExternalCatalog)) {
|| catalog instanceof MaxComputeExternalCatalog || catalog instanceof IcebergExternalCatalog)) {
throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt",
catalog.getType()));
}
@@ -287,6 +297,10 @@ public ShowResultSetMetaData getMetaData() {
for (String col : result.getColumnNames()) {
builder.addColumn(new Column(col, ScalarType.createVarchar(30)));
}
} else if (catalog instanceof IcebergExternalCatalog) {
builder.addColumn(new Column("Partition", ScalarType.createVarchar(60)));
builder.addColumn(new Column("Lower Bound", ScalarType.createVarchar(100)));
builder.addColumn(new Column("Upper Bound", ScalarType.createVarchar(100)));
} else {
builder.addColumn(new Column("Partition", ScalarType.createVarchar(60)));
}
@@ -361,7 +361,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
@@ -34,6 +34,7 @@
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
@@ -293,7 +294,7 @@ public void addPartitionsCache(long catalogId, HMSExternalTable table, List<Stri
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
partitionColumnTypes = table.getPartitionColumnTypes();
partitionColumnTypes = table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
} catch (NotSupportedException e) {
LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
return;
@@ -391,7 +391,7 @@ public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
}

protected Optional<SchemaCacheValue> getSchemaCacheValue() {
public Optional<SchemaCacheValue> getSchemaCacheValue() {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchemaValue(dbName, name);
}
@@ -51,6 +51,7 @@ public class TablePartitionValues {
private long nextPartitionId;
private final Map<Long, PartitionItem> idToPartitionItem;
private final Map<String, Long> partitionNameToIdMap;
private Map<String, Long> partitionNameToLastModifiedMap;
private final Map<Long, String> partitionIdToNameMap;

private Map<Long, List<UniqueId>> idToUniqueIdsMap;
@@ -68,15 +69,12 @@ public TablePartitionValues() {
nextPartitionId = 0;
idToPartitionItem = new HashMap<>();
partitionNameToIdMap = new HashMap<>();
partitionNameToLastModifiedMap = new HashMap<>();
partitionIdToNameMap = new HashMap<>();
}

public TablePartitionValues(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) {
this();
addPartitions(partitionNames, partitionValues, types);
}

public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) {
public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types,
List<Long> partitionLastUpdateTimestamp) {
Preconditions.checkState(partitionNames.size() == partitionValues.size());
List<String> addPartitionNames = new ArrayList<>();
List<PartitionItem> addPartitionItems = new ArrayList<>();
@@ -90,6 +88,7 @@ public void addPartitions(List<String> partitionNames, List<List<String>> partit
addPartitionNames.add(partitionNames.get(i));
addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types));
}
partitionNameToLastModifiedMap.put(partitionNames.get(i), partitionLastUpdateTimestamp.get(i));
}
cleanPartitions();

@@ -123,23 +122,6 @@ private void addPartitionItems(List<String> partitionNames, List<PartitionItem>
partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
}

public void dropPartitions(List<String> partitionNames, List<Type> types) {
partitionNames.forEach(p -> {
Long removedPartition = partitionNameToIdMap.get(p);
if (removedPartition != null) {
idToPartitionItem.remove(removedPartition);
}
});
List<String> remainingPartitionNames = new ArrayList<>();
List<PartitionItem> remainingPartitionItems = new ArrayList<>();
partitionNameToIdMap.forEach((partitionName, partitionId) -> {
remainingPartitionNames.add(partitionName);
remainingPartitionItems.add(idToPartitionItem.get(partitionId));
});
cleanPartitions();
addPartitionItems(remainingPartitionNames, remainingPartitionItems, types);
}

public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
@@ -148,6 +130,10 @@ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}

public Map<String, Long> getPartitionNameToLastModifiedMap() {
return partitionNameToLastModifiedMap;
}

public Lock readLock() {
return readWriteLock.readLock();
}
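Reviewer note: addPartitions now takes a fourth, index-aligned list of per-partition modification times, and both the old list-based constructor and dropPartitions are removed. Below is a minimal caller sketch of the new contract; the class name, partition names, values, and timestamps are hypothetical, and the import for TablePartitionValues is elided because its package is not shown in this hunk.

import java.util.Map;

import org.apache.doris.catalog.Type;

import com.google.common.collect.Lists;

public class AddPartitionsSketch {
    public static void main(String[] args) {
        // The first, second, and fourth arguments are aligned per partition;
        // the third holds one Type per partition column. addPartitions records
        // a last-modified entry for every name passed in, whether the
        // partition is newly added or already known.
        TablePartitionValues values = new TablePartitionValues();
        values.addPartitions(
                Lists.newArrayList("create_date=2023-12-01", "create_date=2023-12-02"),
                Lists.newArrayList(
                        Lists.newArrayList("2023-12-01"),
                        Lists.newArrayList("2023-12-02")),
                Lists.newArrayList(Type.DATE),
                Lists.newArrayList(1701388800000L, 1701475200000L)); // epoch millis

        // MTMV refresh can compare these values to find changed partitions.
        Map<String, Long> lastModified = values.getPartitionNameToLastModifiedMap();
        System.out.println(lastModified);
    }
}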
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVSnapshotIf;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* This abstract class represents a Hive Metastore (HMS) Dla Table and provides a blueprint for
* various operations related to metastore tables in Doris.
*
* Purpose:
* - To encapsulate the common functionality that HMS Dla tables need in order to implement other interfaces
*
* Why needed:
* - To provide a unified way to manage and interact with different kinds of Dla tables
* - To facilitate the implementation of multi-table materialized views (MTMV) by providing necessary
* methods for snapshot and partition management.
* - To abstract out the specific details of HMS table operations, making the code more modular and maintainable.
*/
public abstract class HMSDlaTable implements MTMVBaseTableIf {
protected HMSExternalTable hmsTable;

public HMSDlaTable(HMSExternalTable table) {
this.hmsTable = table;
}

abstract Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException;

abstract PartitionType getPartitionType(Optional<MvccSnapshot> snapshot);

abstract Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot);

abstract List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot);

abstract MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException;

abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException;

abstract boolean isPartitionColumnAllowNull();

@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}
}
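To make the contract above concrete, here is a hedged sketch of a subclass. The class name and all method bodies are placeholders, not this PR's actual implementation, and it assumes MTMVBaseTableIf requires nothing beyond the beforeMTMVRefresh hook that the base class already provides.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVSnapshotIf;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical concrete Dla table: answers MTMV partition and snapshot
// queries for the wrapped HMSExternalTable.
public class HiveDlaTableSketch extends HMSDlaTable {

    public HiveDlaTableSketch(HMSExternalTable table) {
        super(table);
    }

    @Override
    Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
            throws AnalysisException {
        // A real implementation would copy items from the partition cache,
        // pinned to the given snapshot when one is present.
        return Collections.emptyMap();
    }

    @Override
    PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
        // Hive-style partitions surface as LIST partitioning in Doris.
        return PartitionType.LIST;
    }

    @Override
    Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
        return Collections.emptySet();
    }

    @Override
    List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
        return Collections.emptyList();
    }

    @Override
    MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
            Optional<MvccSnapshot> snapshot) throws AnalysisException {
        // A real implementation would derive a per-partition snapshot,
        // e.g. from the partition's last modification time.
        throw new AnalysisException("placeholder: not implemented in this sketch");
    }

    @Override
    MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
            throws AnalysisException {
        throw new AnalysisException("placeholder: not implemented in this sketch");
    }

    @Override
    boolean isPartitionColumnAllowNull() {
        // Hive stores null partition values under __HIVE_DEFAULT_PARTITION__.
        return true;
    }
}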