Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/content/migration/iceberg-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ You can use Hive Catalog to connect AWS Glue metastore, you can use set `'metada
AWS Athena may use an old manifest reader that reads Iceberg manifests by name, so Paimon should produce the legacy
Iceberg manifest list file; you can enable this with: `'metadata.iceberg.manifest-legacy-version'`.

## DuckDB

DuckDB may rely on data files placed in the `root/data` directory, while Paimon usually places them directly in the
`root` directory. You can configure this option on the table to achieve compatibility:
`'data-file.path-directory' = 'data'`.

## Trino Iceberg

In this example, we use Trino Iceberg connector to access Paimon table through Iceberg Hive catalog.
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
</tr>
<tr>
<td><h5>data-file.path-directory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the path directory of data files.</td>
</tr>
<tr>
<td><h5>data-file.prefix</h5></td>
<td style="word-wrap: break-word;">"data-"</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ public class CoreOptions implements Serializable {
.defaultValue("data-")
.withDescription("Specify the file name prefix of data files.");

public static final ConfigOption<String> DATA_FILE_PATH_DIRECTORY =
key("data-file.path-directory")
.stringType()
.noDefaultValue()
.withDescription("Specify the path directory of data files.");

public static final ConfigOption<String> CHANGELOG_FILE_PREFIX =
key("changelog-file.prefix")
.stringType()
Expand Down Expand Up @@ -1632,6 +1638,11 @@ public String dataFilePrefix() {
return options.get(DATA_FILE_PREFIX);
}

/**
 * Directory under the table root where data files are stored, or {@code null} (the default,
 * since the option has no default value) to place data files directly under the root.
 */
@Nullable
public String dataFilePathDirectory() {
return options.get(DATA_FILE_PATH_DIRECTORY);
}

public String changelogFilePrefix() {
return options.get(CHANGELOG_FILE_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,21 @@ protected AbstractFileStore(

/** Returns a path factory configured for the table's primary file format. */
@Override
public FileStorePathFactory pathFactory() {
return pathFactory(options.fileFormat().getFormatIdentifier());
}

/**
 * Creates a {@link FileStorePathFactory} for the given file format identifier.
 *
 * <p>All other parameters (root path, partition layout, file name prefixes, compression
 * settings and the optional data-file sub-directory) come from the table options; only the
 * format varies. This lets callers build one factory per format (see format2PathFactory).
 *
 * @param format file format identifier used in data file names
 */
protected FileStorePathFactory pathFactory(String format) {
return new FileStorePathFactory(
options.path(),
partitionType,
options.partitionDefaultName(),
format,
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression(),
options.dataFilePathDirectory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
Set<String> formats = new HashSet<>(options.fileFormatPerLevel().values());
formats.add(options.fileFormat().getFormatIdentifier());
formats.forEach(
format ->
pathFactoryMap.put(
format,
new FileStorePathFactory(
options.path(),
partitionType,
options.partitionDefaultName(),
format,
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression())));
formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format)));
return pathFactoryMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import java.util.List;
Expand All @@ -46,6 +47,8 @@ public class FileStorePathFactory {
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;

@Nullable private final String dataFilePathDirectory;

private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
private final AtomicInteger indexManifestCount;
Expand All @@ -61,8 +64,10 @@ public FileStorePathFactory(
String changelogFilePrefix,
boolean legacyPartitionName,
boolean fileSuffixIncludeCompression,
String fileCompression) {
String fileCompression,
@Nullable String dataFilePathDirectory) {
this.root = root;
this.dataFilePathDirectory = dataFilePathDirectory;
this.uuid = UUID.randomUUID().toString();

this.partitionComputer =
Expand Down Expand Up @@ -125,7 +130,11 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
}

/**
 * Returns the bucket directory for the given partition and bucket number. When a data-file
 * directory is configured, data files live under that sub-directory of the table root;
 * otherwise they sit directly under the root.
 */
public Path bucketPath(BinaryRow partition, int bucket) {
    Path base =
            dataFilePathDirectory == null
                    ? this.root
                    : new Path(this.root, dataFilePathDirectory);
    return new Path(base + "/" + relativePartitionAndBucketPath(partition, bucket));
}

public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
Expand All @@ -250,7 +251,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()));
CoreOptions.FILE_COMPRESSION.defaultValue(),
null));

return KeyValueFileWriterFactory.builder(
fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ protected ManifestFile createManifestFile(String pathStr) {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
CoreOptions.FILE_COMPRESSION.defaultValue(),
null),
Long.MAX_VALUE,
null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ private ManifestFile createManifestFile(String pathStr) {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ private ManifestList createManifestList(String pathStr) {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
import static org.apache.paimon.io.DataFileTestUtils.row;
Expand Down Expand Up @@ -143,6 +144,26 @@ public void testBatchReadWrite() throws Exception {
"2|21|201|binary|varbinary|mapKey:mapVal|multiset"));
}

/**
 * Verifies that setting {@code 'data-file.path-directory' = 'data'} places data files under
 * the "data" sub-directory of the table root, and that reads resolve them transparently.
 */
@Test
public void testReadWriteWithDataDirectory() throws Exception {
Consumer<Options> optionsSetter = options -> options.set(DATA_FILE_PATH_DIRECTORY, "data");
writeData(optionsSetter);
FileStoreTable table = createFileStoreTable(optionsSetter);

// Partition directory must appear under root/data, not directly under root.
assertThat(table.fileIO().exists(new Path(tablePath, "data/pt=1"))).isTrue();

List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
TableRead read = table.newRead();
// NOTE(review): hasSameElementsAs ignores ordering and duplicate counts, so the repeated
// rows below do not assert multiplicity — confirm that is intended.
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
"1|10|100|binary|varbinary|mapKey:mapVal|multiset",
"1|11|101|binary|varbinary|mapKey:mapVal|multiset",
"1|12|102|binary|varbinary|mapKey:mapVal|multiset",
"1|11|101|binary|varbinary|mapKey:mapVal|multiset",
"1|12|102|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testBatchRecordsWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public void testCreateDataFilePathFactoryWithPartition() {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
Expand Down Expand Up @@ -130,6 +131,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public TestChangelogDataReadWrite(String root) {
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
CoreOptions.FILE_COMPRESSION.defaultValue(),
null);

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down
2 changes: 1 addition & 1 deletion tools/maven/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ This file is based on the checkstyle file of Apache Beam.
-->

<module name="FileLength">
<property name="max" value="3000"/>
<property name="max" value="4000"/>
</module>

<!-- All Java AST specific tests live under TreeWalker module. -->
Expand Down
Loading