@@ -130,20 +130,19 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {
-        Path dataFileRoot = this.root;
-        if (dataFilePathDirectory != null) {
-            dataFileRoot = new Path(dataFileRoot, dataFilePathDirectory);
-        }
-        return new Path(dataFileRoot + "/" + relativePartitionAndBucketPath(partition, bucket));
+        return new Path(root, relativeBucketPath(partition, bucket));
     }
 
-    public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
+    public Path relativeBucketPath(BinaryRow partition, int bucket) {
+        Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucket);
         String partitionPath = getPartitionString(partition);
-        String fullPath =
-                partitionPath.isEmpty()
-                        ? BUCKET_PATH_PREFIX + bucket
-                        : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
-        return new Path(fullPath);
+        if (!partitionPath.isEmpty()) {
+            relativeBucketPath = new Path(partitionPath, relativeBucketPath);
+        }
+        if (dataFilePathDirectory != null) {
+            relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);
+        }
+        return relativeBucketPath;
     }
 
     /** IMPORTANT: This method is NOT THREAD SAFE. */
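To illustrate the directory layout produced by the reworked relativeBucketPath, here is a minimal, self-contained sketch. It deliberately avoids Paimon's FileStorePathFactory and Path types; the class name, the plain-string joining, the "bucket-" prefix value, and the sample partition string are assumptions for illustration only, not the project's API.

// Illustrative sketch only: mimics the composition order of the new
// relativeBucketPath() with plain strings instead of Paimon's Path type.
// BUCKET_PATH_PREFIX = "bucket-" and the sample inputs below are assumptions.
public class RelativeBucketPathSketch {

    static final String BUCKET_PATH_PREFIX = "bucket-";

    static String relativeBucketPath(String dataFilePathDirectory, String partitionPath, int bucket) {
        String path = BUCKET_PATH_PREFIX + bucket;
        if (!partitionPath.isEmpty()) {
            // partition directories sit above the bucket directory
            path = partitionPath + "/" + path;
        }
        if (dataFilePathDirectory != null) {
            // the configured data file directory becomes the outermost prefix
            path = dataFilePathDirectory + "/" + path;
        }
        return path;
    }

    public static void main(String[] args) {
        // prints "data/dt=20240101/bucket-0" when the directory option is set to "data"
        System.out.println(relativeBucketPath("data", "dt=20240101", 0));
        // prints "bucket-0" for an unpartitioned table without the option
        System.out.println(relativeBucketPath(null, "", 0));
    }
}

Under these assumptions, the option simply prepends its value to the relative partition-and-bucket path, so bucketPath and the relative-path callers in the Spark code below resolve the same locations.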
@@ -245,12 +245,12 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString
       val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
       val pathFactory = my_table.store().pathFactory()
-      val partitionAndBucket = pathFactory
-        .relativePartitionAndBucketPath(partition, bucket)
+      val relativeBucketPath = pathFactory
+        .relativeBucketPath(partition, bucket)
         .toString
 
       SparkDeletionVectors(
-        partitionAndBucket,
+        relativeBucketPath,
         SerializationUtils.serializeBinaryRow(partition),
         bucket,
         Seq((new Path(filePath).getName, dv.serializeToBytes()))
@@ -35,7 +35,7 @@ case class SparkDataFileMeta(
 
   def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
     fileStorePathFactory
-      .relativePartitionAndBucketPath(partition, bucket)
+      .relativeBucketPath(partition, bucket)
       .toUri
       .toString + "/" + dataFileMeta.fileName()
   }
@@ -36,7 +36,7 @@ case class SparkDeletionVectors(
 ) {
   def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] = {
     val prefix = fileStorePathFactory
-      .relativePartitionAndBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
+      .relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
       .toUri
       .toString + "/"
     dataFileAndDeletionVector.map(prefix + _._1)
@@ -652,6 +652,22 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     assert(dvMeta.cardinality() == 334)
   }
 
+  test("Paimon deletionVector: delete from non-pk table with data file path") {
+    sql(s"""
+           |CREATE TABLE T (id INT)
+           |TBLPROPERTIES (
+           |  'deletion-vectors.enabled' = 'true',
+           |  'bucket-key' = 'id',
+           |  'bucket' = '1',
+           |  'data-file.path-directory' = 'data'
+           |)
+           |""".stripMargin)
+
+    sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)")
+    sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+    checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }
@@ -113,6 +113,35 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test(s"Paimon MergeInto: update + insert with data file path") {
+    withTable("source", "target") {
+
+      Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")
+
+      createTable(
+        "target",
+        "a INT, b INT, c STRING",
+        Seq("a"),
+        Seq(),
+        Map("data-file.path-directory" -> "data"))
+      spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+      spark.sql(s"""
+                   |MERGE INTO target
+                   |USING source
+                   |ON target.a = source.a
+                   |WHEN MATCHED THEN
+                   |UPDATE SET a = source.a, b = source.b, c = source.c
+                   |WHEN NOT MATCHED
+                   |THEN INSERT (a, b, c) values (a, b, c)
+                   |""".stripMargin)
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target ORDER BY a, b"),
+        Seq(Row(1, 100, "c11"), Row(2, 20, "c2"), Row(3, 300, "c33")))
+    }
+  }
+
   test(s"Paimon MergeInto: delete + insert") {
     withTable("source", "target") {
 