From 2f4d88e4033b859b5a0d5da0793a14205004c9a2 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 22 Jan 2025 19:05:15 +0800 Subject: [PATCH 1/4] 1 --- .../IncrementalTagStartingScanner.java | 4 +- .../table/source/snapshot/TimeTravelUtil.java | 45 +++++++++--- .../logical/PaimonTableValuedFunctions.scala | 68 +++++++++++++++++-- .../spark/sql/TableValuedFunctionsTest.scala | 27 ++++++++ 4 files changed, 127 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 835b7595a316..388b36fb286b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -57,8 +57,8 @@ public IncrementalTagStartingScanner( snapshotManager.fileIO(), snapshotManager.tablePath(), snapshotManager.branch()), - start.schemaId(), - end.schemaId()); + start, + end); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 4a0f4290df91..d6231721ded7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -128,15 +128,18 @@ private static Snapshot resolveSnapshotByTagName( } public static void checkRescaleBucketForIncrementalTagQuery( - SchemaManager schemaManager, long schemaId1, long schemaId2) { - if (schemaId1 != schemaId2) { - int bucketNumber1 = bucketNumber(schemaManager, schemaId1); - int bucketNumber2 = bucketNumber(schemaManager, schemaId2); - checkArgument( - bucketNumber1 == bucketNumber2, - "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", - bucketNumber1, - bucketNumber2); + SchemaManager schemaManager, Snapshot start, Snapshot end) { + if (start.schemaId() != end.schemaId()) { + int startBucketNumber = bucketNumber(schemaManager, start.schemaId()); + int endBucketNumber = bucketNumber(schemaManager, end.schemaId()); + if (startBucketNumber != endBucketNumber) { + throw new InconsistentTagBucketException( + start.id(), + end.id(), + String.format( + "The bucket number of two tags are different (%s, %s), which is not supported in incremental tag query.", + startBucketNumber, endBucketNumber)); + } } } @@ -144,4 +147,28 @@ private static int bucketNumber(SchemaManager schemaManager, long schemaId) { TableSchema schema = schemaManager.schema(schemaId); return CoreOptions.fromMap(schema.options()).bucket(); } + + /** + * Exception thrown when the bucket number of two tags are different in incremental tag query. + */ + public static class InconsistentTagBucketException extends RuntimeException { + + private final long startSnapshotId; + private final long endSnapshotId; + + public InconsistentTagBucketException( + long startSnapshotId, long endSnapshotId, String message) { + super(message); + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + } + + public long startSnapshotId() { + return startSnapshotId; + } + + public long endSnapshotId() { + return endSnapshotId; + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala index 00759f663d24..368284e69691 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala @@ -19,15 +19,19 @@ package org.apache.paimon.spark.catalyst.plans.logical import org.apache.paimon.CoreOptions +import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._ +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException +import org.apache.spark.sql.PaimonUtils.createDataset import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase import org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -83,11 +87,63 @@ object PaimonTableValuedFunctions { val sparkTable = sparkCatalog.loadTable(ident) val options = tvf.parseArgs(args.tail) - DataSourceV2Relation.create( - sparkTable, - Some(sparkCatalog), - Some(ident), - new CaseInsensitiveStringMap(options.asJava)) + usingSparkIncrementQuery(tvf, sparkTable, options) match { + case Some(snapshotIdPair: (Long, Long)) => + sparkIncrementQuery(spark, sparkTable, sparkCatalog, ident, options, snapshotIdPair) + case _ => + DataSourceV2Relation.create( + sparkTable, + Some(sparkCatalog), + Some(ident), + new CaseInsensitiveStringMap(options.asJava)) + } + } + + private def usingSparkIncrementQuery( + tvf: PaimonTableValueFunction, + sparkTable: Table, + options: Map[String, String]): Option[(Long, Long)] = { + tvf.fnName match { + case INCREMENTAL_QUERY | INCREMENTAL_TO_AUTO_TAG => + sparkTable match { + case SparkTable(fileStoreTable: FileStoreTable) => + try { + fileStoreTable.copy(options.asJava).newScan().plan() + None + } catch { + case e: InconsistentTagBucketException => + Some((e.startSnapshotId, e.endSnapshotId)) + } + } + } + } + + private def sparkIncrementQuery( + spark: SparkSession, + sparkTable: Table, + sparkCatalog: TableCatalog, + ident: Identifier, + options: Map[String, String], + snapshotIdPair: (Long, Long)): LogicalPlan = { + val filteredOptions = + options - CoreOptions.INCREMENTAL_BETWEEN.key - CoreOptions.INCREMENTAL_TO_AUTO_TAG.key + + def datasetOfSnapshot(snapshotId: Long) = { + val updatedOptions = filteredOptions + (CoreOptions.SCAN_VERSION.key() -> snapshotId.toString) + createDataset( + spark, + DataSourceV2Relation.create( + sparkTable, + Some(sparkCatalog), + Some(ident), + new CaseInsensitiveStringMap(updatedOptions.asJava) + )) + } + + datasetOfSnapshot(snapshotIdPair._2) + .except(datasetOfSnapshot(snapshotIdPair._1)) + .queryExecution + .logical } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala index 0aa1829eee3f..fe941fab9e26 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala @@ -189,6 +189,33 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase { } } + test("Table Valued Functions: incremental query with inconsistent tag bucket") { + withTable("t") { + sql(""" + |CREATE TABLE t (a INT, b INT) USING paimon + |TBLPROPERTIES ('primary-key'='a', 'bucket' = '1') + |""".stripMargin) + + val table = loadTable("t") + + sql("INSERT INTO t VALUES (1, 11), (2, 22)") + sql("ALTER TABLE t SET TBLPROPERTIES ('bucket' = '2')") + sql("INSERT OVERWRITE t SELECT * FROM t") + sql("INSERT INTO t VALUES (3, 33)") + + table.createTag("2024-01-01", 1) + table.createTag("2024-01-02", 3) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-02') ORDER BY a, b"), + Seq(Row(3, 33))) + checkAnswer( + sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-02') ORDER BY a, b"), + Seq(Row(3, 33))) + } + } + private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = { spark.read .format("paimon") From f392be8a6eb2f329396996c384a62a56be474acf Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 23 Jan 2025 13:16:23 +0800 Subject: [PATCH 2/4] 1 --- .../catalyst/plans/logical/PaimonTableValuedFunctions.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala index 368284e69691..16c9681d9272 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala @@ -114,7 +114,9 @@ object PaimonTableValuedFunctions { case e: InconsistentTagBucketException => Some((e.startSnapshotId, e.endSnapshotId)) } + case _ => None } + case _ => None } } From 832bc3484e07574eff70e94041846c49ca46ee0f Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 23 Jan 2025 14:01:52 +0800 Subject: [PATCH 3/4] fix test --- .../java/org/apache/paimon/flink/BatchFileStoreITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index d9a5cab1d455..486bfcb69bb0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.SnapshotNotExistException; @@ -676,7 +677,7 @@ public void testIncrementTagQueryWithRescaleBucket() throws Exception { assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s) */", option)) .satisfies( anyCauseMatches( - IllegalArgumentException.class, + TimeTravelUtil.InconsistentTagBucketException.class, "The bucket number of two tags are different (1, 2), which is not supported in incremental tag query.")); } } From bca9d9174f3eeead0f60e3d0a98d193244a18ef6 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 23 Jan 2025 14:54:05 +0800 Subject: [PATCH 4/4] Add audi log case --- .../logical/PaimonTableValuedFunctions.scala | 6 +-- .../spark/sql/TableValuedFunctionsTest.scala | 37 +++++++++++++++++-- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala index 16c9681d9272..7e72abc4c994 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.plans.logical import org.apache.paimon.CoreOptions import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._ -import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.{DataTable, FileStoreTable} import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException import org.apache.spark.sql.PaimonUtils.createDataset @@ -106,9 +106,9 @@ object PaimonTableValuedFunctions { tvf.fnName match { case INCREMENTAL_QUERY | INCREMENTAL_TO_AUTO_TAG => sparkTable match { - case SparkTable(fileStoreTable: FileStoreTable) => + case SparkTable(fileStoreTable: DataTable) => try { - fileStoreTable.copy(options.asJava).newScan().plan() + fileStoreTable.copy(options.asJava).asInstanceOf[DataTable].newScan().plan() None } catch { case e: InconsistentTagBucketException => diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala index fe941fab9e26..addf8461000f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala @@ -199,20 +199,49 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase { val table = loadTable("t") sql("INSERT INTO t VALUES (1, 11), (2, 22)") + table.createTag("2024-01-01", 1) + sql("ALTER TABLE t SET TBLPROPERTIES ('bucket' = '2')") sql("INSERT OVERWRITE t SELECT * FROM t") + sql("INSERT INTO t VALUES (3, 33)") + table.createTag("2024-01-03", 3) - table.createTag("2024-01-01", 1) - table.createTag("2024-01-02", 3) + sql("DELETE FROM t WHERE a = 1") + table.createTag("2024-01-04", 4) + + sql("UPDATE t SET b = 222 WHERE a = 2") + table.createTag("2024-01-05", 5) checkAnswer( sql( - "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-02') ORDER BY a, b"), + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-03') ORDER BY a, b"), Seq(Row(3, 33))) + checkAnswer( - sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-02') ORDER BY a, b"), + sql("SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-01-03') ORDER BY a, b"), + Seq(Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-04') ORDER BY a, b"), Seq(Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('t', '2024-01-01', '2024-01-05') ORDER BY a, b"), + Seq(Row(2, 222), Row(3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('`t$audit_log`', '2024-01-01', '2024-01-04') ORDER BY a, b"), + Seq(Row("-D", 1, 11), Row("+I", 3, 33))) + + checkAnswer( + sql( + "SELECT * FROM paimon_incremental_query('`t$audit_log`', '2024-01-01', '2024-01-05') ORDER BY a, b"), + Seq(Row("-D", 1, 11), Row("+U", 2, 222), Row("+I", 3, 33)) + ) } }