From c749ef7064d35e28514a651e719915bcee894e22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 16:00:19 +0800 Subject: [PATCH 01/10] [core] CoreOptions.fileFormat is CPU expensive, because initializing FileFormat is costly. Reduce the call rate as much as we can --- .../org/apache/paimon/format/FileFormat.java | 30 +++++++++++++------ .../org/apache/paimon/AbstractFileStore.java | 7 ++++- .../apache/paimon/AppendOnlyFileStore.java | 2 ++ .../org/apache/paimon/KeyValueFileStore.java | 3 +- .../operation/AppendOnlyFileStoreWrite.java | 3 +- .../AppendOnlyFixedBucketFileStoreWrite.java | 3 ++ ...AppendOnlyUnawareBucketFileStoreWrite.java | 3 ++ .../operation/KeyValueFileStoreWrite.java | 4 ++- .../source/TestChangelogDataReadWrite.java | 1 + 9 files changed, 43 insertions(+), 13 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 9d138d800680..7c1b64546aaf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -25,6 +25,8 @@ import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + import javax.annotation.Nullable; import java.util.ArrayList; @@ -41,6 +43,19 @@ */ public abstract class FileFormat { + private static final Map FORMAT_MAP; + + static { + Map formatMap = new HashMap<>(); + ServiceLoader serviceLoader = + ServiceLoader.load(FileFormatFactory.class, FileFormat.class.getClassLoader()); + for (FileFormatFactory factory : serviceLoader) { + formatMap.put(factory.identifier().toLowerCase(), factory); + } + + FORMAT_MAP = ImmutableMap.copyOf(formatMap); + } + protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -88,7 +103,7 @@ public static FileFormat 
fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return fromIdentifier(identifier, context, FileFormat.class.getClassLoader()) + return getFileFormatFromLoadedCache(identifier, context) .orElseThrow( () -> new RuntimeException( @@ -97,14 +112,11 @@ public static FileFormat fromIdentifier(String identifier, FormatContext context identifier))); } - private static Optional fromIdentifier( - String formatIdentifier, FormatContext context, ClassLoader classLoader) { - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, classLoader); - for (FileFormatFactory factory : serviceLoader) { - if (factory.identifier().equals(formatIdentifier.toLowerCase())) { - return Optional.of(factory.create(context)); - } + private static Optional getFileFormatFromLoadedCache( + String formatIdentifier, FormatContext context) { + FORMAT_MAP.get(formatIdentifier.toLowerCase()); + if (FORMAT_MAP.containsKey(formatIdentifier.toLowerCase())) { + return Optional.of(FORMAT_MAP.get(formatIdentifier.toLowerCase()).create(context)); } return Optional.empty(); diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 1caff252a654..198883948956 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.HashIndexFile; @@ -77,6 +78,8 @@ abstract class AbstractFileStore implements FileStore { protected final String tableName; protected final CoreOptions 
options; protected final RowType partitionType; + protected final FileFormat fileFormat; + protected final FileStorePathFactory pathFactory; private final CatalogEnvironment catalogEnvironment; @Nullable private final SegmentsCache writeManifestCache; @@ -101,11 +104,13 @@ protected AbstractFileStore( this.writeManifestCache = SegmentsCache.create( options.pageSize(), options.writeManifestCache(), Long.MAX_VALUE); + this.fileFormat = options.fileFormat(); + this.pathFactory = pathFactory(fileFormat.getFormatIdentifier()); } @Override public FileStorePathFactory pathFactory() { - return pathFactory(options.fileFormat().getFormatIdentifier()); + return pathFactory; } protected FileStorePathFactory pathFactory(String format) { diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index a06b98d7b30c..dae262686438 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -106,6 +106,7 @@ public AppendOnlyFileStoreWrite newWrite( schema.id(), rowType, partitionType, + fileFormat, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -120,6 +121,7 @@ public AppendOnlyFileStoreWrite newWrite( commitUser, rowType, partitionType, + fileFormat, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 8cf45105c01b..1d832b10265b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -179,6 +179,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma () -> UserDefinedSeqComparator.create(valueType, options), logDedupEqualSupplier, 
mfFactory, + fileFormat, pathFactory(), format2PathFactory(), snapshotManager(), @@ -193,7 +194,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma private Map format2PathFactory() { Map pathFactoryMap = new HashMap<>(); Set formats = new HashSet<>(options.fileFormatPerLevel().values()); - formats.add(options.fileFormat().getFormatIdentifier()); + formats.add(fileFormat.getFormatIdentifier()); formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format))); return pathFactoryMap; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 4a6196453df6..5176f1d1ef9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -81,6 +81,7 @@ public AppendOnlyFileStoreWrite( long schemaId, RowType rowType, RowType partitionType, + FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -92,7 +93,7 @@ public AppendOnlyFileStoreWrite( this.read = read; this.schemaId = schemaId; this.rowType = rowType; - this.fileFormat = options.fileFormat(); + this.fileFormat = fileFormat; this.pathFactory = pathFactory; this.statsCollectors = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java index c58bad9a9796..9bf4fbc7d45a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVector; import 
org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -50,6 +51,7 @@ public AppendOnlyFixedBucketFileStoreWrite( String commitUser, RowType rowType, RowType partitionType, + FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -62,6 +64,7 @@ public AppendOnlyFixedBucketFileStoreWrite( schemaId, rowType, partitionType, + fileFormat, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java index e509b589944d..63b9d1ec349f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -47,6 +48,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( long schemaId, RowType rowType, RowType partitionType, + FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -59,6 +61,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( schemaId, rowType, partitionType, + fileFormat, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index d061e181618b..5d4a541c0281 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -32,6 +32,7 @@ import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.IndexMaintainer; @@ -123,6 +124,7 @@ public KeyValueFileStoreWrite( Supplier udsComparatorSupplier, Supplier logDedupEqualSupplier, MergeFunctionFactory mfFactory, + FileFormat fileFormat, FileStorePathFactory pathFactory, Map format2PathFactory, SnapshotManager snapshotManager, @@ -165,7 +167,7 @@ public KeyValueFileStoreWrite( schema.id(), keyType, valueType, - options.fileFormat(), + fileFormat, format2PathFactory, options.targetFileSize(true)); this.keyComparatorSupplier = keyComparatorSupplier; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index d2bb9eb98274..066dcf519190 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -186,6 +186,7 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc () -> null, () -> EQUALISER, DeduplicateMergeFunction.factory(), + options.fileFormat(), pathFactory, pathFactoryMap, snapshotManager, From 91240526373e397b7287d71cdacc78964961b5fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 16:59:48 +0800 Subject: [PATCH 02/10] fix comment --- 
.../org/apache/paimon/format/FileFormat.java | 39 +++---------------- .../paimon/format/FileFormatFactory.java | 5 +-- 2 files changed, 8 insertions(+), 36 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 7c1b64546aaf..a114ab459e72 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -19,14 +19,13 @@ package org.apache.paimon.format; import org.apache.paimon.CoreOptions; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; - import javax.annotation.Nullable; import java.util.ArrayList; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.ServiceLoader; /** * Factory class which creates reader and writer factories for specific file format. @@ -43,19 +41,6 @@ */ public abstract class FileFormat { - private static final Map FORMAT_MAP; - - static { - Map formatMap = new HashMap<>(); - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, FileFormat.class.getClassLoader()); - for (FileFormatFactory factory : serviceLoader) { - formatMap.put(factory.identifier().toLowerCase(), factory); - } - - FORMAT_MAP = ImmutableMap.copyOf(formatMap); - } - protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -103,23 +88,11 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. 
*/ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return getFileFormatFromLoadedCache(identifier, context) - .orElseThrow( - () -> - new RuntimeException( - String.format( - "Could not find a FileFormatFactory implementation class for %s format", - identifier))); - } - - private static Optional getFileFormatFromLoadedCache( - String formatIdentifier, FormatContext context) { - FORMAT_MAP.get(formatIdentifier.toLowerCase()); - if (FORMAT_MAP.containsKey(formatIdentifier.toLowerCase())) { - return Optional.of(FORMAT_MAP.get(formatIdentifier.toLowerCase()).create(context)); - } - - return Optional.empty(); + return FactoryUtil.discoverFactory( + FileFormat.class.getClassLoader(), + FileFormatFactory.class, + identifier.toLowerCase()) + .create(context); } protected Options getIdentifierPrefixOptions(Options options) { diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index b726a84f24a2..2b7f9cd7b852 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -19,15 +19,14 @@ package org.apache.paimon.format; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.factories.Factory; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import javax.annotation.Nullable; /** Factory to create {@link FileFormat}. 
*/ -public interface FileFormatFactory { - - String identifier(); +public interface FileFormatFactory extends Factory { FileFormat create(FormatContext formatContext); From 83084d3011a3ca37672de162005c2804f7bb5aea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 17:50:58 +0800 Subject: [PATCH 03/10] fix comment --- .../java/org/apache/paimon/CoreOptions.java | 4 ++ .../paimon/factories/BaseFactoryUtil.java | 64 ++++++++++++++++++ .../apache/paimon/factories/FactoryUtil.java | 42 +----------- .../paimon/factories/FormatFactoryUtil.java | 65 +++++++++++++++++++ .../org/apache/paimon/format/FileFormat.java | 8 +-- .../paimon/format/FileFormatFactory.java | 5 +- .../org/apache/paimon/AbstractFileStore.java | 7 +- .../apache/paimon/AppendOnlyFileStore.java | 4 +- .../org/apache/paimon/KeyValueFileStore.java | 4 +- 9 files changed, 146 insertions(+), 57 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 6e1e9bba076b..a6248da404fe 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1583,6 +1583,10 @@ public FileFormat fileFormat() { return createFileFormat(options, FILE_FORMAT); } + public String fileFormatString() { + return normalizeFileFormat(options.get(FILE_FORMAT)); + } + public FileFormat manifestFormat() { return createFileFormat(options, MANIFEST_FORMAT); } diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java new file mode 100644 index 000000000000..46a85a6904d2 --- /dev/null +++ 
b/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.factories; + +import org.apache.paimon.format.FileFormatFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; + +/** Base Factory Util. */ +public class BaseFactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(BaseFactoryUtil.class); + + public static List discoverFactories(ClassLoader classLoader, Class klass) { + final Iterator serviceLoaderIterator = ServiceLoader.load(klass, classLoader).iterator(); + + final List loadResults = new ArrayList<>(); + while (true) { + try { + // error handling should also be applied to the hasNext() call because service + // loading might cause problems here as well + if (!serviceLoaderIterator.hasNext()) { + break; + } + + loadResults.add(serviceLoaderIterator.next()); + } catch (Throwable t) { + if (t instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a {}. 
This is expected when trying to load factory but no implementation is loaded.", + FileFormatFactory.class.getCanonicalName(), + t); + } else { + throw new RuntimeException( + "Unexpected error when trying to load service provider.", t); + } + } + } + + return loadResults; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java index 1213168e16ae..90ed2d589697 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java @@ -21,19 +21,11 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.ServiceLoader; import java.util.stream.Collectors; /** Utility for working with {@link Factory}s. 
*/ -public class FactoryUtil { - - private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); +public class FactoryUtil extends BaseFactoryUtil { private static final Cache> FACTORIES = Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); @@ -101,36 +93,6 @@ public static List discoverIdentifiers( } private static List getFactories(ClassLoader classLoader) { - return FACTORIES.get(classLoader, FactoryUtil::discoverFactories); - } - - private static List discoverFactories(ClassLoader classLoader) { - final Iterator serviceLoaderIterator = - ServiceLoader.load(Factory.class, classLoader).iterator(); - - final List loadResults = new ArrayList<>(); - while (true) { - try { - // error handling should also be applied to the hasNext() call because service - // loading might cause problems here as well - if (!serviceLoaderIterator.hasNext()) { - break; - } - - loadResults.add(serviceLoaderIterator.next()); - } catch (Throwable t) { - if (t instanceof NoClassDefFoundError) { - LOG.debug( - "NoClassDefFoundError when loading a {}. This is expected when trying to load factory but no implementation is loaded.", - Factory.class.getCanonicalName(), - t); - } else { - throw new RuntimeException( - "Unexpected error when trying to load service provider.", t); - } - } - } - - return loadResults; + return FACTORIES.get(classLoader, s -> discoverFactories(classLoader, Factory.class)); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java new file mode 100644 index 000000000000..f3f97efc6931 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.factories; + +import org.apache.paimon.format.FileFormatFactory; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.util.List; +import java.util.stream.Collectors; + +/** Utility for working with {@link FileFormatFactory}s. */ +public class FormatFactoryUtil extends BaseFactoryUtil { + + private static final Cache> FACTORIES = + Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); + + /** Discovers a factory using the given factory base class and identifier. 
*/ + @SuppressWarnings("unchecked") + public static T discoverFactory( + ClassLoader classLoader, String identifier) { + final List foundFactories = getFactories(classLoader); + + final List matchingFactories = + foundFactories.stream() + .filter(f -> f.identifier().equals(identifier)) + .collect(Collectors.toList()); + + if (matchingFactories.isEmpty()) { + throw new FactoryException( + String.format( + "Could not find any factory for identifier '%s' that implements FileFormatFactory in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "%s", + identifier, + foundFactories.stream() + .map(FileFormatFactory::identifier) + .collect(Collectors.joining("\n")))); + } + + return (T) matchingFactories.get(0); + } + + private static List getFactories(ClassLoader classLoader) { + return FACTORIES.get( + classLoader, s -> discoverFactories(classLoader, FileFormatFactory.class)); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index a114ab459e72..e1391e7f5396 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -19,7 +19,7 @@ package org.apache.paimon.format; import org.apache.paimon.CoreOptions; -import org.apache.paimon.factories.FactoryUtil; +import org.apache.paimon.factories.FormatFactoryUtil; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -88,10 +88,8 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. 
*/ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return FactoryUtil.discoverFactory( - FileFormat.class.getClassLoader(), - FileFormatFactory.class, - identifier.toLowerCase()) + return FormatFactoryUtil.discoverFactory( + FileFormat.class.getClassLoader(), identifier.toLowerCase()) .create(context); } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index 2b7f9cd7b852..b726a84f24a2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -19,14 +19,15 @@ package org.apache.paimon.format; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.factories.Factory; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import javax.annotation.Nullable; /** Factory to create {@link FileFormat}. 
*/ -public interface FileFormatFactory extends Factory { +public interface FileFormatFactory { + + String identifier(); FileFormat create(FormatContext formatContext); diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 198883948956..58cc6f47a9af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -20,7 +20,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; -import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.HashIndexFile; @@ -78,8 +77,6 @@ abstract class AbstractFileStore implements FileStore { protected final String tableName; protected final CoreOptions options; protected final RowType partitionType; - protected final FileFormat fileFormat; - protected final FileStorePathFactory pathFactory; private final CatalogEnvironment catalogEnvironment; @Nullable private final SegmentsCache writeManifestCache; @@ -104,13 +101,11 @@ protected AbstractFileStore( this.writeManifestCache = SegmentsCache.create( options.pageSize(), options.writeManifestCache(), Long.MAX_VALUE); - this.fileFormat = options.fileFormat(); - this.pathFactory = pathFactory(fileFormat.getFormatIdentifier()); } @Override public FileStorePathFactory pathFactory() { - return pathFactory; + return pathFactory(options.fileFormatString()); } protected FileStorePathFactory pathFactory(String format) { diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index dae262686438..3d3a99d82c21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -106,7 
+106,7 @@ public AppendOnlyFileStoreWrite newWrite( schema.id(), rowType, partitionType, - fileFormat, + options.fileFormat(), pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -121,7 +121,7 @@ public AppendOnlyFileStoreWrite newWrite( commitUser, rowType, partitionType, - fileFormat, + options.fileFormat(), pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 1d832b10265b..f77d78ca9e1c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -179,7 +179,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma () -> UserDefinedSeqComparator.create(valueType, options), logDedupEqualSupplier, mfFactory, - fileFormat, + options.fileFormat(), pathFactory(), format2PathFactory(), snapshotManager(), @@ -194,7 +194,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma private Map format2PathFactory() { Map pathFactoryMap = new HashMap<>(); Set formats = new HashSet<>(options.fileFormatPerLevel().values()); - formats.add(fileFormat.getFormatIdentifier()); + formats.add(options.fileFormatString()); formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format))); return pathFactoryMap; } From 7520236f0341300ba93bb0c3dfad4faa4dc56b47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 17:53:26 +0800 Subject: [PATCH 04/10] fix comment --- .../java/org/apache/paimon/factories/FormatFactoryUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java index 
f3f97efc6931..64bcf7ee43c5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java @@ -32,7 +32,7 @@ public class FormatFactoryUtil extends BaseFactoryUtil { private static final Cache> FACTORIES = Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); - /** Discovers a factory using the given factory base class and identifier. */ + /** Discovers a file format factory. */ @SuppressWarnings("unchecked") public static T discoverFactory( ClassLoader classLoader, String identifier) { From da85214d8c0679e167863be78eda0dd9924da78e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 18:06:20 +0800 Subject: [PATCH 05/10] fix comment --- .../java/org/apache/paimon/factories/BaseFactoryUtil.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java index 46a85a6904d2..35eddaa21522 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java @@ -33,6 +33,14 @@ public class BaseFactoryUtil { private static final Logger LOG = LoggerFactory.getLogger(BaseFactoryUtil.class); + /** + * Discover factories. 
+ * + * @param classLoader the class loader + * @param klass the klass + * @param the type of the factory + * @return the list of factories + */ public static List discoverFactories(ClassLoader classLoader, Class klass) { final Iterator serviceLoaderIterator = ServiceLoader.load(klass, classLoader).iterator(); From fc9dd44888a23567164e6a1fad070269aa35eeb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 19:27:20 +0800 Subject: [PATCH 06/10] fix comment --- .../paimon/factories/BaseFactoryUtil.java | 72 ------------------- .../apache/paimon/factories/FactoryUtil.java | 49 ++++++++++++- .../paimon/factories/FormatFactoryUtil.java | 4 +- .../apache/paimon/AppendOnlyFileStore.java | 2 - .../operation/AppendOnlyFileStoreWrite.java | 3 +- .../AppendOnlyFixedBucketFileStoreWrite.java | 3 - ...AppendOnlyUnawareBucketFileStoreWrite.java | 3 - 7 files changed, 52 insertions(+), 84 deletions(-) delete mode 100644 paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java deleted file mode 100644 index 35eddaa21522..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/factories/BaseFactoryUtil.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.factories; - -import org.apache.paimon.format.FileFormatFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.ServiceLoader; - -/** Base Factory Util. */ -public class BaseFactoryUtil { - - private static final Logger LOG = LoggerFactory.getLogger(BaseFactoryUtil.class); - - /** - * Discover factories. - * - * @param classLoader the class loader - * @param klass the klass - * @param the type of the factory - * @return the list of factories - */ - public static List discoverFactories(ClassLoader classLoader, Class klass) { - final Iterator serviceLoaderIterator = ServiceLoader.load(klass, classLoader).iterator(); - - final List loadResults = new ArrayList<>(); - while (true) { - try { - // error handling should also be applied to the hasNext() call because service - // loading might cause problems here as well - if (!serviceLoaderIterator.hasNext()) { - break; - } - - loadResults.add(serviceLoaderIterator.next()); - } catch (Throwable t) { - if (t instanceof NoClassDefFoundError) { - LOG.debug( - "NoClassDefFoundError when loading a {}. 
This is expected when trying to load factory but no implementation is loaded.", - FileFormatFactory.class.getCanonicalName(), - t); - } else { - throw new RuntimeException( - "Unexpected error when trying to load service provider.", t); - } - } - } - - return loadResults; - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java index 90ed2d589697..f4a55d477dbf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java @@ -18,14 +18,24 @@ package org.apache.paimon.factories; +import org.apache.paimon.format.FileFormatFactory; + import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.ServiceLoader; import java.util.stream.Collectors; /** Utility for working with {@link Factory}s. */ -public class FactoryUtil extends BaseFactoryUtil { +public class FactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); private static final Cache> FACTORIES = Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); @@ -95,4 +105,41 @@ public static List discoverIdentifiers( private static List getFactories(ClassLoader classLoader) { return FACTORIES.get(classLoader, s -> discoverFactories(classLoader, Factory.class)); } + + /** + * Discover factories. 
+ * + * @param classLoader the class loader + * @param klass the klass + * @param the type of the factory + * @return the list of factories + */ + public static List discoverFactories(ClassLoader classLoader, Class klass) { + final Iterator serviceLoaderIterator = ServiceLoader.load(klass, classLoader).iterator(); + + final List loadResults = new ArrayList<>(); + while (true) { + try { + // error handling should also be applied to the hasNext() call because service + // loading might cause problems here as well + if (!serviceLoaderIterator.hasNext()) { + break; + } + + loadResults.add(serviceLoaderIterator.next()); + } catch (Throwable t) { + if (t instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a {}. This is expected when trying to load factory but no implementation is loaded.", + FileFormatFactory.class.getCanonicalName(), + t); + } else { + throw new RuntimeException( + "Unexpected error when trying to load service provider.", t); + } + } + } + + return loadResults; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java index 64bcf7ee43c5..083b775461aa 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java @@ -26,8 +26,10 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.paimon.factories.FactoryUtil.discoverFactories; + /** Utility for working with {@link FileFormatFactory}s. 
*/ -public class FormatFactoryUtil extends BaseFactoryUtil { +public class FormatFactoryUtil { private static final Cache> FACTORIES = Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 3d3a99d82c21..a06b98d7b30c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -106,7 +106,6 @@ public AppendOnlyFileStoreWrite newWrite( schema.id(), rowType, partitionType, - options.fileFormat(), pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -121,7 +120,6 @@ public AppendOnlyFileStoreWrite newWrite( commitUser, rowType, partitionType, - options.fileFormat(), pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 5176f1d1ef9a..4a6196453df6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -81,7 +81,6 @@ public AppendOnlyFileStoreWrite( long schemaId, RowType rowType, RowType partitionType, - FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -93,7 +92,7 @@ public AppendOnlyFileStoreWrite( this.read = read; this.schemaId = schemaId; this.rowType = rowType; - this.fileFormat = fileFormat; + this.fileFormat = options.fileFormat(); this.pathFactory = pathFactory; this.statsCollectors = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java index 9bf4fbc7d45a..c58bad9a9796 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java @@ -26,7 +26,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; -import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -51,7 +50,6 @@ public AppendOnlyFixedBucketFileStoreWrite( String commitUser, RowType rowType, RowType partitionType, - FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -64,7 +62,6 @@ public AppendOnlyFixedBucketFileStoreWrite( schemaId, rowType, partitionType, - fileFormat, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java index 63b9d1ec349f..e509b589944d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java @@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; -import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -48,7 +47,6 @@ public AppendOnlyUnawareBucketFileStoreWrite( long schemaId, RowType rowType, RowType partitionType, - FileFormat fileFormat, 
FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -61,7 +59,6 @@ public AppendOnlyUnawareBucketFileStoreWrite( schemaId, rowType, partitionType, - fileFormat, pathFactory, snapshotManager, scan, From fcaf649b0d96eca87cbf9e5f3082426a687a8874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 19:31:53 +0800 Subject: [PATCH 07/10] fix comment --- .../main/java/org/apache/paimon/factories/FactoryUtil.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java index f4a55d477dbf..a492aeece7a8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java @@ -18,8 +18,6 @@ package org.apache.paimon.factories; -import org.apache.paimon.format.FileFormatFactory; - import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -131,7 +129,7 @@ public static List discoverFactories(ClassLoader classLoader, Class kl if (t instanceof NoClassDefFoundError) { LOG.debug( "NoClassDefFoundError when loading a {}. 
This is expected when trying to load factory but no implementation is loaded.", - FileFormatFactory.class.getCanonicalName(), + Factory.class.getCanonicalName(), t); } else { throw new RuntimeException( From 2166dfa0f92c683b951d6138855ba43c90471a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 19:36:53 +0800 Subject: [PATCH 08/10] fix comment --- .../src/main/java/org/apache/paimon/KeyValueFileStore.java | 3 +-- .../org/apache/paimon/operation/KeyValueFileStoreWrite.java | 2 -- .../apache/paimon/flink/source/TestChangelogDataReadWrite.java | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index f77d78ca9e1c..8cf45105c01b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -179,7 +179,6 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma () -> UserDefinedSeqComparator.create(valueType, options), logDedupEqualSupplier, mfFactory, - options.fileFormat(), pathFactory(), format2PathFactory(), snapshotManager(), @@ -194,7 +193,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma private Map format2PathFactory() { Map pathFactoryMap = new HashMap<>(); Set formats = new HashSet<>(options.fileFormatPerLevel().values()); - formats.add(options.fileFormatString()); + formats.add(options.fileFormat().getFormatIdentifier()); formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format))); return pathFactoryMap; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 5d4a541c0281..387c000b211a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java 
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -32,7 +32,6 @@ import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; -import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.IndexMaintainer; @@ -124,7 +123,6 @@ public KeyValueFileStoreWrite( Supplier udsComparatorSupplier, Supplier logDedupEqualSupplier, MergeFunctionFactory mfFactory, - FileFormat fileFormat, FileStorePathFactory pathFactory, Map format2PathFactory, SnapshotManager snapshotManager, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 066dcf519190..d2bb9eb98274 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -186,7 +186,6 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc () -> null, () -> EQUALISER, DeduplicateMergeFunction.factory(), - options.fileFormat(), pathFactory, pathFactoryMap, snapshotManager, From eb74330f43d811e6cccb146dc00b0014855a3de4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 19:38:08 +0800 Subject: [PATCH 09/10] fix comment --- .../org/apache/paimon/operation/KeyValueFileStoreWrite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 387c000b211a..d061e181618b 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -165,7 +165,7 @@ public KeyValueFileStoreWrite( schema.id(), keyType, valueType, - fileFormat, + options.fileFormat(), format2PathFactory, options.targetFileSize(true)); this.keyComparatorSupplier = keyComparatorSupplier; From d1fc8056a48d2f8245f355026708e7b3a65def30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 26 Dec 2024 19:39:52 +0800 Subject: [PATCH 10/10] fix comment --- .../src/main/java/org/apache/paimon/KeyValueFileStore.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 8cf45105c01b..a969fca03777 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -193,7 +193,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma private Map format2PathFactory() { Map pathFactoryMap = new HashMap<>(); Set formats = new HashSet<>(options.fileFormatPerLevel().values()); - formats.add(options.fileFormat().getFormatIdentifier()); + formats.add(options.fileFormatString()); formats.forEach(format -> pathFactoryMap.put(format, pathFactory(format))); return pathFactoryMap; }