diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md index 8a1637d7e96e..54018b073f91 100644 --- a/docs/content/migration/iceberg-compatibility.md +++ b/docs/content/migration/iceberg-compatibility.md @@ -402,6 +402,12 @@ You can use Hive Catalog to connect AWS Glue metastore, you can use set `'metada AWS Athena may use old manifest reader to read Iceberg manifest by names, we should let Paimon producing legacy Iceberg manifest list file, you can enable: `'metadata.iceberg.manifest-legacy-version'`. +## DuckDB + +DuckDB may rely on data files being placed in the `root/data` directory, while Paimon usually places them directly in the `root` +directory, so you can configure this parameter for the table to achieve compatibility: +`'data-file.path-directory' = 'data'`. + ## Trino Iceberg In this example, we use Trino Iceberg connector to access Paimon table through Iceberg Hive catalog. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 7d6bacccb026..42633e2546b4 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -212,6 +212,12 @@ Duration The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication. + +
data-file.path-directory
+ (none) + String + Specify the path directory of data files. +
data-file.prefix
"data-" diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 8aebf2f289a0..df4cdf58a6f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -184,6 +184,12 @@ public class CoreOptions implements Serializable { .defaultValue("data-") .withDescription("Specify the file name prefix of data files."); + public static final ConfigOption DATA_FILE_PATH_DIRECTORY = + key("data-file.path-directory") + .stringType() + .noDefaultValue() + .withDescription("Specify the path directory of data files."); + public static final ConfigOption CHANGELOG_FILE_PREFIX = key("changelog-file.prefix") .stringType() @@ -1632,6 +1638,11 @@ public String dataFilePrefix() { return options.get(DATA_FILE_PREFIX); } + @Nullable + public String dataFilePathDirectory() { + return options.get(DATA_FILE_PATH_DIRECTORY); + } + public String changelogFilePrefix() { return options.get(CHANGELOG_FILE_PREFIX); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 54f554aa46d3..ae149557365d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -105,16 +105,21 @@ protected AbstractFileStore( @Override public FileStorePathFactory pathFactory() { + return pathFactory(options.fileFormat().getFormatIdentifier()); + } + + protected FileStorePathFactory pathFactory(String format) { return new FileStorePathFactory( options.path(), partitionType, options.partitionDefaultName(), - options.fileFormat().getFormatIdentifier(), + format, options.dataFilePrefix(), options.changelogFilePrefix(), options.legacyPartitionName(), options.fileSuffixIncludeCompression(), - options.fileCompression()); + options.fileCompression(), + 
options.dataFilePathDirectory()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 1c87a087e44e..8cf45105c01b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -194,20 +194,7 @@ private Map format2PathFactory() { Map pathFactoryMap = new HashMap<>(); Set formats = new HashSet<>(options.fileFormatPerLevel().values()); formats.add(options.fileFormat().getFormatIdentifier()); - formats.forEach( - format -> - pathFactoryMap.put( - format, - new FileStorePathFactory( - options.path(), - partitionType, - options.partitionDefaultName(), - format, - options.dataFilePrefix(), - options.changelogFilePrefix(), - options.legacyPartitionName(), - options.fileSuffixIncludeCompression(), - options.fileCompression()))); + formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format))); return pathFactoryMap; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index fcdc4634d74a..8896ec328680 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.types.RowType; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.List; @@ -46,6 +47,8 @@ public class FileStorePathFactory { private final boolean fileSuffixIncludeCompression; private final String fileCompression; + @Nullable private final String dataFilePathDirectory; + private final AtomicInteger manifestFileCount; private final AtomicInteger manifestListCount; private final AtomicInteger indexManifestCount; @@ -61,8 +64,10 @@ public 
FileStorePathFactory( String changelogFilePrefix, boolean legacyPartitionName, boolean fileSuffixIncludeCompression, - String fileCompression) { + String fileCompression, + @Nullable String dataFilePathDirectory) { this.root = root; + this.dataFilePathDirectory = dataFilePathDirectory; this.uuid = UUID.randomUUID().toString(); this.partitionComputer = @@ -125,7 +130,11 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu } public Path bucketPath(BinaryRow partition, int bucket) { - return new Path(root + "/" + relativePartitionAndBucketPath(partition, bucket)); + Path dataFileRoot = this.root; + if (dataFilePathDirectory != null) { + dataFileRoot = new Path(dataFileRoot, dataFilePathDirectory); + } + return new Path(dataFileRoot + "/" + relativePartitionAndBucketPath(partition, bucket)); } public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 52d56afad813..b648e2af8972 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -231,7 +231,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); Options options = new Options(); @@ -250,7 +251,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), 
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue())); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null)); return KeyValueFileWriterFactory.builder( fileIO, diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 5e69035ca834..52d82e76be2a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -149,7 +149,8 @@ protected ManifestFile createManifestFile(String pathStr) { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()), + CoreOptions.FILE_COMPRESSION.defaultValue(), + null), Long.MAX_VALUE, null) .create(); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java index 34cca41e61a1..089e11656a99 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java @@ -105,7 +105,8 @@ private ManifestFile createManifestFile(String pathStr) { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); return new ManifestFile.Factory( diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java index ce4f7b8072d2..5bf01f32cb07 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java @@ -109,7 +109,8 @@ private ManifestList createManifestList(String pathStr) { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null) .create(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 922221bb8dc9..01d4e89af95d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -78,6 +78,7 @@ import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.BUCKET_KEY; +import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY; import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; import static org.apache.paimon.io.DataFileTestUtils.row; @@ -143,6 +144,26 @@ public void testBatchReadWrite() throws Exception { "2|21|201|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testReadWriteWithDataDirectory() throws Exception { + Consumer optionsSetter = options -> options.set(DATA_FILE_PATH_DIRECTORY, "data"); + writeData(optionsSetter); + FileStoreTable table = createFileStoreTable(optionsSetter); + + 
assertThat(table.fileIO().exists(new Path(tablePath, "data/pt=1"))).isTrue(); + + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + TableRead read = table.newRead(); + assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "1|11|101|binary|varbinary|mapKey:mapVal|multiset", + "1|12|102|binary|varbinary|mapKey:mapVal|multiset", + "1|11|101|binary|varbinary|mapKey:mapVal|multiset", + "1|12|102|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testBatchRecordsWrite() throws Exception { FileStoreTable table = createFileStoreTable(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java index d4d45b312961..6ca15cf1503d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java @@ -90,7 +90,8 @@ public void testCreateDataFilePathFactoryWithPartition() { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16"); assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default"); @@ -130,6 +131,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); } } diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 85679e5fd30a..17e63700faac 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -110,7 +110,8 @@ public TestChangelogDataReadWrite(String root) { CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root)); this.commitUser = UUID.randomUUID().toString(); } diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index 7da8f2e6b7fe..0360def685b6 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -160,7 +160,8 @@ protected void foreachIndexReader(Consumer consumer) CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(), CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), - CoreOptions.FILE_COMPRESSION.defaultValue()); + CoreOptions.FILE_COMPRESSION.defaultValue(), + null); Table table = fileSystemCatalog.getTable(Identifier.create("db", "T")); ReadBuilder readBuilder = table.newReadBuilder(); diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index d5db52cb03df..80e785353526 100644 --- 
a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam. --> - +