From 0042d6cc0f98f793621647361b4538ccc396f635 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Tue, 7 Jan 2025 14:56:57 +0800
Subject: [PATCH] v1

---
 docs/content/spark/sql-query.md                |  10 +-
 .../generated/core_configuration.html          |  44 +++----
 .../java/org/apache/paimon/CoreOptions.java    |   9 +-
 paimon-docs/README.md                          |   2 +-
 .../logical/PaimonTableValuedFunctions.scala   |  65 ++++++++--
 .../paimon/spark/PaimonHiveTestBase.scala      |   5 +
 .../paimon/spark/PaimonSparkTestBase.scala     |   4 +-
 .../spark/sql/TableValuedFunctionsTest.scala   | 112 ++++++++++++++++++
 8 files changed, 211 insertions(+), 40 deletions(-)

diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md
index e118b8418bc3..c97b6d3341b7 100644
--- a/docs/content/spark/sql-query.md
+++ b/docs/content/spark/sql-query.md
@@ -77,11 +77,17 @@ You can also force specifying `'incremental-between-scan-mode'`.
 
 Paimon supports using Spark SQL to do incremental queries, implemented by Spark Table Valued Functions.
 
-you can use `paimon_incremental_query` in query to extract the incremental data:
-
 ```sql
 -- read the incremental data between snapshot id 12 and snapshot id 20.
 SELECT * FROM paimon_incremental_query('tableName', 12, 20);
+
+-- read the incremental data between ts 1692169000000 and ts 1692169900000.
+SELECT * FROM paimon_incremental_between_timestamp('tableName', '1692169000000', '1692169900000');
+
+-- read the incremental data to tag '2024-12-04'.
+-- Paimon will find an earlier tag and return changes between them.
+-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
+SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');
 ```
 
 In Batch SQL, the `DELETE` records are not allowed to be returned, so records of `-D` will be dropped.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 003d5ba4c2cb..148d742c3805 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -218,6 +218,24 @@
             <td>Duration</td>
             <td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
         </tr>
+        <tr>
+            <td><h5>data-file.external-paths</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The external paths where the data of this table will be written, multiple elements separated by commas.</td>
+        </tr>
+        <tr>
+            <td><h5>data-file.external-paths.specific-fs</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The specific file system of the external path when data-file.external-paths.strategy is set to specific-fs, should be the prefix scheme of the external path, now supported are s3 and oss.</td>
+        </tr>
+        <tr>
+            <td><h5>data-file.external-paths.strategy</h5></td>
+            <td style="word-wrap: break-word;">none</td>
+            <td><p>Enum</p></td>
+            <td>The strategy of selecting an external path when writing data.<br /><br />Possible values:</td>
+        </tr>
         <tr>
             <td><h5>data-file.path-directory</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -378,25 +396,25 @@
             <td><h5>incremental-between</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Read incremental changes between start snapshot (exclusive) and end snapshot, for example, '5,10' means changes between snapshot 5 and snapshot 10.</td>
+            <td>Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive), for example, '5,10' means changes between snapshot 5 and snapshot 10.</td>
         </tr>
         <tr>
             <td><h5>incremental-between-scan-mode</h5></td>
             <td style="word-wrap: break-word;">auto</td>
             <td><p>Enum</p></td>
-            <td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot.<br /><br />Possible values:</td>
+            <td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive).<br /><br />Possible values:</td>
         </tr>
         <tr>
             <td><h5>incremental-between-timestamp</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
+            <td>Read incremental changes between start timestamp (exclusive) and end timestamp (inclusive), for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
         </tr>
         <tr>
             <td><h5>incremental-to-auto-tag</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Used to specify the auto-created tag to reading incremental changes.</td>
+            <td>Used to specify the end tag (inclusive), and Paimon will find an earlier tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty.</td>
         </tr>
         <tr>
             <td><h5>local-merge-buffer-size</h5></td>
@@ -1026,23 +1044,5 @@
             <td>Integer</td>
             <td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
         </tr>
-        <tr>
-            <td><h5>data-file.external-paths</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>The external paths where the data of this table will be written, multiple elements separated by commas.</td>
-        </tr>
-        <tr>
-            <td><h5>data-file.external-paths.strategy</h5></td>
-            <td style="word-wrap: break-word;">none</td>
-            <td><p>Enum</p></td>
-            <td>The strategy of selecting an external path when writing data.<br /><br />Possible values:</td>
-        </tr>
-        <tr>
-            <td><h5>data-file.external-paths.specific-fs</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>The specific file system of the external path when data-file.external-paths.strategy is set to specific-fs, should be the prefix scheme of the external path, now supported are s3 and oss.</td>
-        </tr>
     </tbody>
 </table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0beea0a8f2de..c033754ca1d1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1089,7 +1089,7 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start snapshot (exclusive) and end snapshot, "
+                            "Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive), "
                                     + "for example, '5,10' means changes between snapshot 5 and snapshot 10.");
 
     public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
@@ -1097,14 +1097,14 @@ public class CoreOptions implements Serializable {
                     .enumType(IncrementalBetweenScanMode.class)
                     .defaultValue(IncrementalBetweenScanMode.AUTO)
                     .withDescription(
-                            "Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. ");
+                            "Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive). ");
 
     public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
             key("incremental-between-timestamp")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start timestamp (exclusive) and end timestamp, "
+                            "Read incremental changes between start timestamp (exclusive) and end timestamp (inclusive), "
                                     + "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");
 
     public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
@@ -1112,7 +1112,8 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Used to specify the auto-created tag to reading incremental changes.");
+                            "Used to specify the end tag (inclusive), and Paimon will find an earlier tag and return changes between them. "
+                                    + "If the tag doesn't exist or the earlier tag doesn't exist, return empty. ");
 
     public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE =
             key("end-input.check-partition-expire")
diff --git a/paimon-docs/README.md b/paimon-docs/README.md
index 76d922c882f2..cb1d4758692d 100644
--- a/paimon-docs/README.md
+++ b/paimon-docs/README.md
@@ -28,7 +28,7 @@ The `@ConfigGroups` annotation can be used to generate multiple files from a sin
 
 To integrate an `*Options` class from another package, add another module-package argument pair to `ConfigOptionsDocGenerator#LOCATIONS`.
 
-The files can be generated by running `mvn clean install -DskipTests` and `mvn package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests`, and can be integrated into the documentation using `{{< include generated/<file-name> >}}`.
+The files can be generated by running `mvn package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests -am`, and can be integrated into the documentation using `{{< include generated/<file-name> >}}`.
 
 **NOTE:** You need to make sure that the changed jar has been installed in the local maven repository.
 
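[Editor's aside] The option descriptions changed above also apply to the plain DataFrame read path. The sketch below is a rough, non-authoritative illustration, assuming an existing SparkSession with a Paimon catalog configured; the table name `test_db.t` and the timestamp values are placeholders. It mirrors the `incrementalDF` helper that appears in the test changes further down.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: a SparkSession with the Paimon catalog already configured.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Changes between snapshot 5 (exclusive) and snapshot 10 (inclusive).
val bySnapshot: DataFrame = spark.read
  .format("paimon")
  .option("incremental-between", "5,10")
  .table("test_db.t")

// Changes between two epoch-millisecond timestamps (exclusive start, inclusive end).
val byTimestamp: DataFrame = spark.read
  .format("paimon")
  .option("incremental-between-timestamp", "1692169000000,1692169900000")
  .table("test_db.t")

// Changes up to the auto-created tag '2024-12-04'; Paimon locates the earlier tag itself
// and returns an empty result if either tag is missing.
val toAutoTag: DataFrame = spark.read
  .format("paimon")
  .option("incremental-to-auto-tag", "2024-12-04")
  .table("test_db.t")
```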
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 6edbf533cbbc..00759f663d24 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
@@ -35,8 +36,11 @@ import scala.collection.JavaConverters._
 object PaimonTableValuedFunctions {
 
   val INCREMENTAL_QUERY = "paimon_incremental_query"
+  val INCREMENTAL_BETWEEN_TIMESTAMP = "paimon_incremental_between_timestamp"
+  val INCREMENTAL_TO_AUTO_TAG = "paimon_incremental_to_auto_tag"
 
-  val supportedFnNames: Seq[String] = Seq(INCREMENTAL_QUERY)
+  val supportedFnNames: Seq[String] =
+    Seq(INCREMENTAL_QUERY, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_TO_AUTO_TAG)
 
   private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
 
@@ -44,6 +48,10 @@ object PaimonTableValuedFunctions {
     val (info, builder) = fnName match {
       case INCREMENTAL_QUERY =>
         FunctionRegistryBase.build[IncrementalQuery](fnName, since = None)
+      case INCREMENTAL_BETWEEN_TIMESTAMP =>
+        FunctionRegistryBase.build[IncrementalBetweenTimestamp](fnName, since = None)
+      case INCREMENTAL_TO_AUTO_TAG =>
+        FunctionRegistryBase.build[IncrementalToAutoTag](fnName, since = None)
       case _ =>
         throw new Exception(s"Function $fnName isn't a supported table valued function.")
     }
@@ -58,12 +66,23 @@ object PaimonTableValuedFunctions {
 
     val sessionState = spark.sessionState
     val catalogManager = sessionState.catalogManager
-    val sparkCatalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
-    val tableId = sessionState.sqlParser.parseTableIdentifier(args.head.eval().toString)
-    val namespace = tableId.database.map(Array(_)).getOrElse(catalogManager.currentNamespace)
-    val ident = Identifier.of(namespace, tableId.table)
+
+    val identifier = args.head.eval().toString
+    val (catalogName, dbName, tableName) = {
+      sessionState.sqlParser.parseMultipartIdentifier(identifier) match {
+        case Seq(table) =>
+          (catalogManager.currentCatalog.name(), catalogManager.currentNamespace.head, table)
+        case Seq(db, table) => (catalogManager.currentCatalog.name(), db, table)
+        case Seq(catalog, db, table) => (catalog, db, table)
+        case _ => throw new RuntimeException(s"Invalid table identifier: $identifier")
+      }
+    }
+
+    val sparkCatalog = catalogManager.catalog(catalogName).asInstanceOf[TableCatalog]
+    val ident: Identifier = Identifier.of(Array(dbName), tableName)
     val sparkTable = sparkCatalog.loadTable(ident)
     val options = tvf.parseArgs(args.tail)
+
     DataSourceV2Relation.create(
       sparkTable,
       Some(sparkCatalog),
@@ -87,20 +106,46 @@ abstract class PaimonTableValueFunction(val fnName: String) extends LeafNode {
   val args: Seq[Expression]
 
   def parseArgs(args: Seq[Expression]): Map[String, String]
-
 }
 
-/** Plan for the "paimon_incremental_query" function */
+/** Plan for the [[INCREMENTAL_QUERY]] function */
 case class IncrementalQuery(override val args: Seq[Expression])
-  extends PaimonTableValueFunction(PaimonTableValuedFunctions.INCREMENTAL_QUERY) {
+  extends PaimonTableValueFunction(INCREMENTAL_QUERY) {
 
   override def parseArgs(args: Seq[Expression]): Map[String, String] = {
     assert(
-      args.size >= 1 && args.size <= 2,
-      "paimon_incremental_query needs two parameters: startSnapshotId, and endSnapshotId.")
+      args.size == 2,
+      s"$INCREMENTAL_QUERY needs two parameters: startSnapshotId, and endSnapshotId.")
 
     val start = args.head.eval().toString
     val end = args.last.eval().toString
     Map(CoreOptions.INCREMENTAL_BETWEEN.key -> s"$start,$end")
   }
 }
+
+/** Plan for the [[INCREMENTAL_BETWEEN_TIMESTAMP]] function */
+case class IncrementalBetweenTimestamp(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_BETWEEN_TIMESTAMP) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(
+      args.size == 2,
+      s"$INCREMENTAL_BETWEEN_TIMESTAMP needs two parameters: startTimestamp, and endTimestamp.")
+
+    val start = args.head.eval().toString
+    val end = args.last.eval().toString
+    Map(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key -> s"$start,$end")
+  }
+}
+
+/** Plan for the [[INCREMENTAL_TO_AUTO_TAG]] function */
+case class IncrementalToAutoTag(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_TO_AUTO_TAG) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(args.size == 1, s"$INCREMENTAL_TO_AUTO_TAG needs one parameter: endTagName.")
+
+    val endTagName = args.head.eval().toString
+    Map(CoreOptions.INCREMENTAL_TO_AUTO_TAG.key -> endTagName)
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 6d2ffea04df5..d4d888f2c1fd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.hive.TestHiveMetastore
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
@@ -78,6 +79,10 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
     spark.sql(s"USE $sparkCatalogName")
     spark.sql(s"USE $hiveDbName")
   }
+
+  override def loadTable(tableName: String): FileStoreTable = {
+    loadTable(hiveDbName, tableName)
+  }
 }
 
 object PaimonHiveTestBase {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 9a6719010e36..9a1647da8140 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -38,7 +38,7 @@
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
-import java.util.TimeZone
+import java.util.{TimeZone, UUID}
 
 import scala.util.Random
@@ -48,6 +48,8 @@ class PaimonSparkTestBase
   with WithTableOptions
   with SparkVersionSupport {
 
+  protected lazy val commitUser: String = UUID.randomUUID.toString
+
   protected lazy val fileIO: FileIO = LocalFileIO.create
 
   protected lazy val tempDBDir: File = Utils.createTempDir
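[Editor's aside] For context on the identifier handling added in `makeTableRelation` above, a hedged sketch of how the new functions can be invoked with one-, two-, and three-part table names; `spark` is assumed to be a SparkSession with the Paimon Spark extensions enabled, and the catalog, database, and table names are placeholders.

```scala
// One-part name: resolved against the current catalog and current namespace.
spark.sql("SELECT * FROM paimon_incremental_query('t', 12, 20)").show()

// Two-part name: database-qualified, matching the Seq(db, table) case.
spark.sql(
  "SELECT * FROM paimon_incremental_between_timestamp('test_db.t', '1692169000000', '1692169900000')"
).show()

// Three-part name: catalog-qualified, matching the Seq(catalog, db, table) case.
spark.sql(
  "SELECT * FROM paimon_incremental_to_auto_tag('paimon_catalog.test_db.t', '2024-12-04')"
).show()
```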
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index b9c187b83a25..0aa1829eee3f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.data.{BinaryString, GenericRow, Timestamp}
+import org.apache.paimon.manifest.ManifestCommittable
 import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.{DataFrame, Row}
 
+import java.time.LocalDateTime
+import java.util.Collections
+
 class TableValuedFunctionsTest extends PaimonHiveTestBase {
 
   withPk.foreach {
@@ -91,10 +96,117 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Table Valued Functions: paimon_incremental_between_timestamp") {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        sql(s"USE $catalogName")
+        val dbName = "test_tvf_db"
+        withDatabase(dbName) {
+          sql(s"CREATE DATABASE $dbName")
+          withTable("t") {
+            sql(s"USE $dbName")
+            sql("CREATE TABLE t (id INT) USING paimon")
+
+            sql("INSERT INTO t VALUES 1")
+            Thread.sleep(100)
+            val t1 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 2")
+            Thread.sleep(100)
+            val t2 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 3")
+            sql("INSERT INTO t VALUES 4")
+            Thread.sleep(100)
+            val t3 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 5")
+
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('t', '$t1', '$t2') ORDER BY id"),
+              Seq(Row(2)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('$dbName.t', '$t2', '$t3') ORDER BY id"),
+              Seq(Row(3), Row(4)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('$catalogName.$dbName.t', '$t1', '$t3') ORDER BY id"),
+              Seq(Row(2), Row(3), Row(4)))
+          }
+        }
+    }
+  }
+
+  test("Table Valued Functions: paimon_incremental_to_auto_tag") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (a INT, b STRING) USING paimon
+            |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1', 'tag.automatic-creation'='watermark', 'tag.creation-period'='daily')
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      val write = table.newWrite(commitUser)
+      val commit = table.newCommit(commitUser).ignoreEmptyCommit(false)
+
+      write.write(GenericRow.of(1, BinaryString.fromString("a")))
+      var commitMessages = write.prepareCommit(false, 0)
+      commit.commit(
+        new ManifestCommittable(
+          0,
+          utcMills("2024-12-02T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(2, BinaryString.fromString("b")))
+      commitMessages = write.prepareCommit(false, 1)
+      commit.commit(
+        new ManifestCommittable(
+          1,
+          utcMills("2024-12-03T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(3, BinaryString.fromString("c")))
+      commitMessages = write.prepareCommit(false, 2)
+      commit.commit(
+        new ManifestCommittable(
+          2,
+          utcMills("2024-12-05T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-01') ORDER BY a"),
+        Seq())
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-02') ORDER BY a"),
a"), + Seq(Row(2, "b"))) + checkAnswer( + sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-03') ORDER BY a"), + Seq()) + checkAnswer( + sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-04') ORDER BY a"), + Seq(Row(3, "c"))) + } + } + private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = { spark.read .format("paimon") .option("incremental-between", s"$start,$end") .table(tableIdent) } + + private def utcMills(timestamp: String) = + Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond + + object GenericRow { + def of(values: Any*): GenericRow = { + val row = new GenericRow(values.length) + values.zipWithIndex.foreach { + case (value, index) => + row.setField(index, value) + } + row + } + } }