From 6c63fda43562a5347396e7ef1e02c2817f3547a4 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Mon, 16 Dec 2024 15:17:43 +0800
Subject: [PATCH] Include 'data-file.path-directory' in relative bucket paths

Rename FileStorePathFactory#relativePartitionAndBucketPath to
relativeBucketPath and fold the 'data-file.path-directory' prefix into
it. Previously only bucketPath prepended the configured data file
directory, so relative paths built by the Spark commands (deletion
vectors, MERGE INTO) missed it. Add Spark tests covering both cases.
---
 .../paimon/utils/FileStorePathFactory.java    | 21 +++++++-------
 .../paimon/spark/commands/PaimonCommand.scala |  6 ++--
 .../spark/commands/SparkDataFileMeta.scala    |  2 +-
 .../spark/commands/SparkDeletionVectors.scala |  2 +-
 .../paimon/spark/sql/DeletionVectorTest.scala | 16 ++++++++++
 .../spark/sql/MergeIntoTableTestBase.scala    | 29 +++++++++++++++++++
 6 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 8896ec328680..f255762cfd3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -130,20 +130,19 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {
-        Path dataFileRoot = this.root;
-        if (dataFilePathDirectory != null) {
-            dataFileRoot = new Path(dataFileRoot, dataFilePathDirectory);
-        }
-        return new Path(dataFileRoot + "/" + relativePartitionAndBucketPath(partition, bucket));
+        return new Path(root, relativeBucketPath(partition, bucket));
     }
 
-    public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
+    public Path relativeBucketPath(BinaryRow partition, int bucket) {
+        Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucket);
         String partitionPath = getPartitionString(partition);
-        String fullPath =
-                partitionPath.isEmpty()
-                        ? BUCKET_PATH_PREFIX + bucket
-                        : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
-        return new Path(fullPath);
+        if (!partitionPath.isEmpty()) {
+            relativeBucketPath = new Path(partitionPath, relativeBucketPath);
+        }
+        if (dataFilePathDirectory != null) {
+            relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);
+        }
+        return relativeBucketPath;
     }
 
     /** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 87583593e3fe..04118a438307 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -245,12 +245,12 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString
       val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
       val pathFactory = my_table.store().pathFactory()
-      val partitionAndBucket = pathFactory
-        .relativePartitionAndBucketPath(partition, bucket)
+      val relativeBucketPath = pathFactory
+        .relativeBucketPath(partition, bucket)
         .toString
 
       SparkDeletionVectors(
-        partitionAndBucket,
+        relativeBucketPath,
         SerializationUtils.serializeBinaryRow(partition),
         bucket,
         Seq((new Path(filePath).getName, dv.serializeToBytes()))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 9c377b47c4fd..569a84a74cf5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -35,7 +35,7 @@ case class SparkDataFileMeta(
 
   def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
     fileStorePathFactory
-      .relativePartitionAndBucketPath(partition, bucket)
+      .relativeBucketPath(partition, bucket)
       .toUri
       .toString + "/" + dataFileMeta.fileName()
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 9f687e6e3c92..1f908aeb908b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -36,7 +36,7 @@ case class SparkDeletionVectors(
 ) {
   def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] = {
     val prefix = fileStorePathFactory
-      .relativePartitionAndBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
+      .relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
       .toUri
       .toString + "/"
     dataFileAndDeletionVector.map(prefix + _._1)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 46a423b9d699..ec5526f20e1d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -652,6 +652,22 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     assert(dvMeta.cardinality() == 334)
   }
 
+  test("Paimon deletionVector: delete from non-pk table with data file path") {
+    sql(s"""
+           |CREATE TABLE T (id INT)
+           |TBLPROPERTIES (
+           |  'deletion-vectors.enabled' = 'true',
+           |  'bucket-key' = 'id',
+           |  'bucket' = '1',
+           |  'data-file.path-directory' = 'data'
+           |)
+           |""".stripMargin)
+
+    sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)")
+    sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+    checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 8973ea93d8a0..bcd84fdc11da 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -113,6 +113,35 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test(s"Paimon MergeInto: update + insert with data file path") {
+    withTable("source", "target") {
+
+      Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")
+
+      createTable(
+        "target",
+        "a INT, b INT, c STRING",
+        Seq("a"),
+        Seq(),
+        Map("data-file.path-directory" -> "data"))
+      spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+      spark.sql(s"""
+                   |MERGE INTO target
+                   |USING source
+                   |ON target.a = source.a
+                   |WHEN MATCHED THEN
+                   |UPDATE SET a = source.a, b = source.b, c = source.c
+                   |WHEN NOT MATCHED
+                   |THEN INSERT (a, b, c) values (a, b, c)
+                   |""".stripMargin)
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target ORDER BY a, b"),
+        Seq(Row(1, 100, "c11"), Row(2, 20, "c2"), Row(3, 300, "c33")))
+    }
+  }
+
   test(s"Paimon MergeInto: delete + insert") {
     withTable("source", "target") {
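
Note: as an illustration of the relocated prefix logic (a standalone sketch,
not part of the patch; the class name and the string-based helper are
hypothetical, and only the "bucket-" literal matches
FileStorePathFactory.BUCKET_PATH_PREFIX), the renamed relativeBucketPath now
composes "[data-file.path-directory/][partition/]bucket-<n>", so bucketPath
and the relative paths built on the Spark side agree on one layout:

public class RelativeBucketPathSketch {

    // Same literal as FileStorePathFactory.BUCKET_PATH_PREFIX.
    private static final String BUCKET_PATH_PREFIX = "bucket-";

    // Mirrors the patched relativeBucketPath with plain strings instead of
    // Paimon's Path type: start from the bucket directory, prefix the
    // partition path if present, then the data file directory if configured.
    static String relativeBucketPath(String partitionPath, int bucket, String dataFileDir) {
        String path = BUCKET_PATH_PREFIX + bucket;
        if (!partitionPath.isEmpty()) {
            path = partitionPath + "/" + path;
        }
        if (dataFileDir != null) {
            path = dataFileDir + "/" + path;
        }
        return path;
    }

    public static void main(String[] args) {
        // Partitioned table with 'data-file.path-directory' = 'data':
        System.out.println(relativeBucketPath("dt=20240101", 0, "data"));
        // -> data/dt=20240101/bucket-0

        // Unpartitioned table, no data file directory configured:
        System.out.println(relativeBucketPath("", 0, null));
        // -> bucket-0
    }
}

The added tests exercise exactly this prefix: in DeletionVectorTest,
range (1, 50000) inserts 49999 rows and the DELETE removes ids 111..444
inclusive (334 rows), hence the expected count 49999 - 334 = 49665.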