diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 835b7595a316..388b36fb286b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -57,8 +57,8 @@ public IncrementalTagStartingScanner( snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch()), - start.schemaId(), - end.schemaId()); + start, + end); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 4a0f4290df91..d6231721ded7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -128,15 +128,18 @@ private static Snapshot resolveSnapshotByTagName( } public static void checkRescaleBucketForIncrementalTagQuery( - SchemaManager schemaManager, long schemaId1, long schemaId2) { - if (schemaId1 != schemaId2) { - int bucketNumber1 = bucketNumber(schemaManager, schemaId1); - int bucketNumber2 = bucketNumber(schemaManager, schemaId2); - checkArgument( - bucketNumber1 == bucketNumber2, - "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", - bucketNumber1, - bucketNumber2); + SchemaManager schemaManager, Snapshot start, Snapshot end) { + if (start.schemaId() != end.schemaId()) { + int startBucketNumber = bucketNumber(schemaManager, start.schemaId()); + int endBucketNumber = bucketNumber(schemaManager, end.schemaId()); + if (startBucketNumber != endBucketNumber) { + throw new InconsistentTagBucketException( + start.id(), + end.id(), + String.format( + "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", + startBucketNumber, endBucketNumber)); + } } } @@ -144,4 +147,28 @@ private static int bucketNumber(SchemaManager schemaManager, long schemaId) { TableSchema schema = schemaManager.schema(schemaId); return CoreOptions.fromMap(schema.options()).bucket(); } + + /** + * Exception thrown when the bucket number of two tags are different in incremental tag query. + */ + public static class InconsistentTagBucketException extends RuntimeException { + + private final long startSnapshotId; + private final long endSnapshotId; + + public InconsistentTagBucketException( + long startSnapshotId, long endSnapshotId, String message) { + super(message); + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + } + + public long startSnapshotId() { + return startSnapshotId; + } + + public long endSnapshotId() { + return endSnapshotId; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index d9a5cab1d455..486bfcb69bb0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotNotExistException; @@ -676,7 +677,7 @@ public void testIncrementTagQueryWithRescaleBucket() throws Exception { assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option)) .satisfies( anyCauseMatches( - IllegalArgumentException.class, + TimeTravelUtil.InconsistentTagBucketException.class, "The bucket number of two tags are different (1, 2), which is not supported in incremental tag query.")); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala index 00759f663d24..7e72abc4c994 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala @@ -19,15 +19,19 @@ package org.apache.paimon.spark.catalyst.plans.logical import org.apache.paimon.CoreOptions +import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._ +import org.apache.paimon.table.{DataTable, FileStoreTable} +import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException +import org.apache.spark.sql.PaimonUtils.createDataset import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase import org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -83,11 +87,65 @@ object PaimonTableValuedFunctions { val sparkTable = sparkCatalog.loadTable(ident) val options = tvf.parseArgs(args.tail) - DataSourceV2Relation.create( - sparkTable, - Some(sparkCatalog), - Some(ident), - new CaseInsensitiveStringMap(options.asJava)) + usingSparkIncrementQuery(tvf, sparkTable, options) match { + case Some(snapshotIdPair: (Long, Long)) => + sparkIncrementQuery(spark, sparkTable, sparkCatalog, ident, options, snapshotIdPair) + case _ => + DataSourceV2Relation.create( + sparkTable, + Some(sparkCatalog), + Some(ident), + new CaseInsensitiveStringMap(options.asJava)) + } + } + + private def usingSparkIncrementQuery( + tvf: PaimonTableValueFunction, + sparkTable: Table, + options: Map[String, String]): Option[(Long, Long)] = { + tvf.fnName match { + case INCREMENTAL_QUERY | INCREMENTAL_TO_AUTO_TAG => + sparkTable match { + case SparkTable(fileStoreTable: DataTable) => + try { + fileStoreTable.copy(options.asJava).asInstanceOf[DataTable].newScan().plan() + None + } catch { + case e: InconsistentTagBucketException => + Some((e.startSnapshotId, e.endSnapshotId)) + } + case _ => None + } + case _ => None + } + } + + private def sparkIncrementQuery( + spark: SparkSession, + sparkTable: Table, + sparkCatalog: TableCatalog, + ident: Identifier, + options: Map[String, String], + snapshotIdPair: (Long, Long)): LogicalPlan = { + val filteredOptions = + options - CoreOptions.INCREMENTAL_BETWEEN.key - CoreOptions.INCREMENTAL_TO_AUTO_TAG.key + + def datasetOfSnapshot(snapshotId: Long) = { + val updatedOptions = filteredOptions + (CoreOptions.SCAN_VERSION.key() -> snapshotId.toString) + createDataset( + spark, + DataSourceV2Relation.create( + sparkTable, + Some(sparkCatalog), + Some(ident), + new CaseInsensitiveStringMap(updatedOptions.asJava) + )) + } + + datasetOfSnapshot(snapshotIdPair._2) + .except(datasetOfSnapshot(snapshotIdPair._1)) + .queryExecution + .logical } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala index 0aa1829eee3f..addf8461000f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala @@ -189,6 +189,62 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase { } } + test("Table Valued Functions: incremental query with inconsistent tag bucket") { + withTable("t") { + sql(""" + |CREATE TABLE t (a INT, b INT) USING paimon + |TBLPROPERTIES ('primary-key'='a', 'bucket' = '1') + |""".stripMargin) + + val table = loadTable("t") + + sql("INSERT INTO t VALUES (1, 11), (2, 22)") + table.createTag("2024-01-01", 1) + + sql("ALTER TABLE t SET TBLPROPERTIES ('bucket' = '2')") + sql("INSERT OVERWRITE t SELECT * FROM t") + + sql("INSERT INTO t VALUES (3, 33)") + table.createTag("2024-01-03", 3) + + sql("DELETE FROM t WHERE a = 1") + table.createTag("2024-01-04", 4) + + sql("UPDATE t SET b = 222 WHERE a = 2") + table.createTag("2024-01-05", 5) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-03') ORDER BY a, b"), + Seq(Row(3, 33))) + + checkAnswer( + sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-03') ORDER BY a, b"), + Seq(Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-04') ORDER BY a, b"), + Seq(Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-05') ORDER BY a, b"), + Seq(Row(2, 222), Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('`t$audit_log`', '2024-01-01', '2024-01-04') ORDER BY a, b"), + Seq(Row("-D", 1, 11), Row("+I", 3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('`t$audit_log`', '2024-01-01', '2024-01-05') ORDER BY a, b"), + Seq(Row("-D", 1, 11), Row("+U", 2, 222), Row("+I", 3, 33)) + ) + } + } + private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = { spark.read .format("paimon")