From 419cf975eb4bb509d708e04fc21c9561678473d2 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 13 Jan 2025 11:32:14 +0800 Subject: [PATCH 01/15] introduce hybyidfileio --- .../generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 3 +- .../org/apache/paimon/fs/HybridFileIO.java | 135 ++++++++++++++++++ .../apache/paimon/fs/HybridFileIOTest.java | 112 +++++++++++++++ .../paimon/table/FileStoreTableFactory.java | 11 +- .../paimon/catalog/CatalogTestBase.java | 5 +- 6 files changed, 255 insertions(+), 13 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 1e7aa59ed869..270dbca340c0 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1024,7 +1024,7 @@
write-buffer-spillable
(none) Boolean - Whether the write buffer can be spillable. Enabled by default when using object storage. + Whether the write buffer can be spillable.
write-manifest-cache
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 93bc23a41f58..6f693903e592 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -475,8 +475,7 @@ public class CoreOptions implements Serializable { key("write-buffer-spillable") .booleanType() .noDefaultValue() - .withDescription( - "Whether the write buffer can be spillable. Enabled by default when using object storage."); + .withDescription("Whether the write buffer can be spillable."); public static final ConfigOption WRITE_BUFFER_FOR_APPEND = key("write-buffer-for-append") diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java new file mode 100644 index 000000000000..71a565a617c2 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.options.Options; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A hybrid implementation of {@link FileIO} that supports multiple file system schemas. It + * dynamically selects the appropriate {@link FileIO} based on the URI scheme of the given path. + */ +public class HybridFileIO implements FileIO { + private static final long serialVersionUID = 1L; + + protected Options options; + + private Map fileIOMap; + private volatile FileIO fallbackFileIO; + + // TODO, how to decide the real fileio is object store or not? + @Override + public boolean isObjectStore() { + return false; + } + + @Override + public void configure(CatalogContext context) { + this.options = context.options(); + this.fileIOMap = new ConcurrentHashMap<>(); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws IOException { + return wrap(() -> fileIO(path).newInputStream(path)); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { + return wrap(() -> fileIO(path).newOutputStream(path, overwrite)); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + return wrap(() -> fileIO(path).getFileStatus(path)); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + return wrap(() -> fileIO(path).listStatus(path)); + } + + @Override + public boolean exists(Path path) throws IOException { + return wrap(() -> fileIO(path).exists(path)); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + return wrap(() -> fileIO(path).delete(path, recursive)); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return wrap(() -> fileIO(path).mkdirs(path)); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return wrap(() -> fileIO(src).rename(src, dst)); + } + + @VisibleForTesting + public FileIO fileIO(Path path) throws IOException { + String scheme = path.toUri().getScheme(); + if (scheme == null) { + if (fallbackFileIO == null) { + synchronized (this) { + if (fallbackFileIO == null) { + CatalogContext catalogContext = CatalogContext.create(options); + fallbackFileIO = FileIO.get(path, catalogContext); + } + } + } + return fallbackFileIO; + } + + if (!fileIOMap.containsKey(scheme)) { + synchronized (this) { + if (!fileIOMap.containsKey(scheme)) { + CatalogContext catalogContext = CatalogContext.create(options); + FileIO fileIO = FileIO.get(path, catalogContext); + fileIOMap.put(path.toUri().getScheme(), fileIO); + } + } + } + return fileIOMap.get(path.toUri().getScheme()); + } + + private T wrap(Func func) throws IOException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(HybridFileIO.class.getClassLoader()); + return func.apply(); + } finally { + Thread.currentThread().setContextClassLoader(cl); + } + } + + /** Apply function with wrapping classloader. */ + @FunctionalInterface + protected interface Func { + T apply() throws IOException; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java new file mode 100644 index 000000000000..c5942774b81e --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.hadoop.HadoopFileIO; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.jupiter.api.Assertions.*; + +public class HybridFileIOTest { + + private HybridFileIO hybridFileIO; + + @BeforeEach + public void setUp() { + hybridFileIO = new HybridFileIO(); + Options options = new Options(); + CatalogContext catalogContext = CatalogContext.create(options); + hybridFileIO.configure(catalogContext); + } + + @Test + public void testFileIONullSchemeReturnsFallbackFileIO() throws IOException { + Path path = new Path("/path/to/file"); + FileIO result = hybridFileIO.fileIO(path); + assertNotNull(result); + assertInstanceOf(LocalFileIO.class, result); + } + + @Test + public void testFileIOReturnsLocalFileIO() throws IOException { + Path path = new Path("file:///path/to/file"); + FileIO result = hybridFileIO.fileIO(path); + assertNotNull(result); + assertInstanceOf(LocalFileIO.class, result); + } + + @Test + public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { + Path path = new Path("hdfs:///path/to/file"); + FileIO result = hybridFileIO.fileIO(path); + assertNotNull(result); + assertInstanceOf(HadoopFileIO.class, result); + } + + @Test + public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Exception { + Path path = new Path("file:///path/to/file"); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + Future future1 = executorService.submit(() -> hybridFileIO.fileIO(path)); + Future future2 = executorService.submit(() -> hybridFileIO.fileIO(path)); + + FileIO result1 = future1.get(); + FileIO result2 = future2.get(); + + assertNotNull(result1); + assertNotNull(result2); + assertEquals(result1, result2); + assertInstanceOf(LocalFileIO.class, result1); + } + + @Test + public void testFileIOMapStoresFileIOInstances() throws IOException { + Path localPath = new Path("file:///path/to/local/file1"); + Path hdfsPath = new Path("hdfs:///path/to/hdfs/file1"); + + // First call should create new instances + FileIO localFileIO = hybridFileIO.fileIO(localPath); + FileIO hdfsFileIO = hybridFileIO.fileIO(hdfsPath); + + assertNotNull(localFileIO); + assertNotNull(hdfsFileIO); + assertInstanceOf(LocalFileIO.class, localFileIO); + assertInstanceOf(HadoopFileIO.class, hdfsFileIO); + + // Second call should return the same instances from fileIOMap + FileIO localFileIOAgain = hybridFileIO.fileIO(new Path("file:///path/to/local/file2")); + FileIO hdfsFileIOAgain = hybridFileIO.fileIO(new Path("hdfs:///path/to/local/file2")); + + assertNotNull(localFileIOAgain); + assertNotNull(hdfsFileIOAgain); + assertEquals(localFileIO, localFileIOAgain); + assertEquals(hdfsFileIO, hdfsFileIOAgain); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index afd6ce7da96b..02d94b7a85f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -21,14 +21,13 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.HybridFileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.StringUtils; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Optional; import static org.apache.paimon.CoreOptions.PATH; @@ -38,12 +37,8 @@ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - FileIO fileIO; - try { - fileIO = FileIO.get(CoreOptions.path(context.options()), context); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + FileIO fileIO = new HybridFileIO(); + fileIO.configure(context); return create(fileIO, context.options()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 6448972cde04..d11a1d806951 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.HybridFileIO; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -85,7 +85,8 @@ public void setUp() throws Exception { Options catalogOptions = new Options(); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); CatalogContext catalogContext = CatalogContext.create(catalogOptions); - fileIO = FileIO.get(new Path(warehouse), catalogContext); + fileIO = new HybridFileIO(); + fileIO.configure(catalogContext); } @AfterEach From 079ce2d7d7792f00807d575253b85bed70b4f503 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 13 Jan 2025 13:08:08 +0800 Subject: [PATCH 02/15] spotless --- .../src/test/java/org/apache/paimon/fs/HybridFileIOTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java index c5942774b81e..7888136ab1c3 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java @@ -31,8 +31,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +/** Tests for {@link HybridFileIO}. */ public class HybridFileIOTest { private HybridFileIO hybridFileIO; From a39faf0b5c33653162ad6a475b58cf03d9b477c1 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 14 Jan 2025 23:59:23 +0800 Subject: [PATCH 03/15] fix review comments --- .../org/apache/paimon/fs/HybridFileIO.java | 6 ++-- .../apache/paimon/fs/HybridFileIOTest.java | 31 ++++++++++++++++--- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java index 71a565a617c2..02c004d4c25e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java @@ -33,7 +33,7 @@ public class HybridFileIO implements FileIO { private static final long serialVersionUID = 1L; - protected Options options; + private Options options; private Map fileIOMap; private volatile FileIO fallbackFileIO; @@ -110,11 +110,11 @@ public FileIO fileIO(Path path) throws IOException { if (!fileIOMap.containsKey(scheme)) { CatalogContext catalogContext = CatalogContext.create(options); FileIO fileIO = FileIO.get(path, catalogContext); - fileIOMap.put(path.toUri().getScheme(), fileIO); + fileIOMap.put(scheme, fileIO); } } } - return fileIOMap.get(path.toUri().getScheme()); + return fileIOMap.get(scheme); } private T wrap(Func func) throws IOException { diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java index 7888136ab1c3..8b763047d0b1 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java @@ -74,11 +74,10 @@ public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { @Test public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Exception { - Path path = new Path("file:///path/to/file"); - + Path fileSchemePath = new Path("file:///path/to/file"); ExecutorService executorService = Executors.newFixedThreadPool(2); - Future future1 = executorService.submit(() -> hybridFileIO.fileIO(path)); - Future future2 = executorService.submit(() -> hybridFileIO.fileIO(path)); + Future future1 = executorService.submit(() -> hybridFileIO.fileIO(fileSchemePath)); + Future future2 = executorService.submit(() -> hybridFileIO.fileIO(fileSchemePath)); FileIO result1 = future1.get(); FileIO result2 = future2.get(); @@ -87,6 +86,30 @@ public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Excepti assertNotNull(result2); assertEquals(result1, result2); assertInstanceOf(LocalFileIO.class, result1); + + Path noSchemePath = new Path("/path/to/file"); + future1 = executorService.submit(() -> hybridFileIO.fileIO(noSchemePath)); + future2 = executorService.submit(() -> hybridFileIO.fileIO(noSchemePath)); + + result1 = future1.get(); + result2 = future2.get(); + + assertNotNull(result1); + assertNotNull(result2); + assertEquals(result1, result2); + assertInstanceOf(LocalFileIO.class, result1); + + Path hdfsSchemePath = new Path("hdfs:///path/to/file"); + future1 = executorService.submit(() -> hybridFileIO.fileIO(hdfsSchemePath)); + future2 = executorService.submit(() -> hybridFileIO.fileIO(hdfsSchemePath)); + + result1 = future1.get(); + result2 = future2.get(); + + assertNotNull(result1); + assertNotNull(result2); + assertEquals(result1, result2); + assertInstanceOf(HadoopFileIO.class, result1); } @Test From c99cfec6d8e09431e46d6b3b3e2f20ffe2bd138f Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 11:44:53 +0800 Subject: [PATCH 04/15] fix review comments --- .../java/org/apache/paimon/CoreOptions.java | 3 +- .../{HybridFileIO.java => LaziedFileIO.java} | 25 +++++++------ ...dFileIOTest.java => LaziedFileIOTest.java} | 36 +++++++++---------- .../paimon/table/FileStoreTableFactory.java | 4 +-- .../paimon/catalog/CatalogTestBase.java | 4 +-- 5 files changed, 38 insertions(+), 34 deletions(-) rename paimon-common/src/main/java/org/apache/paimon/fs/{HybridFileIO.java => LaziedFileIO.java} (83%) rename paimon-common/src/test/java/org/apache/paimon/fs/{HybridFileIOTest.java => LaziedFileIOTest.java} (79%) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 6f693903e592..93bc23a41f58 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -475,7 +475,8 @@ public class CoreOptions implements Serializable { key("write-buffer-spillable") .booleanType() .noDefaultValue() - .withDescription("Whether the write buffer can be spillable."); + .withDescription( + "Whether the write buffer can be spillable. Enabled by default when using object storage."); public static final ConfigOption WRITE_BUFFER_FOR_APPEND = key("write-buffer-for-append") diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java similarity index 83% rename from paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java rename to paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java index 02c004d4c25e..4e12f8eac645 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/HybridFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java @@ -20,20 +20,20 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.options.Options; +import org.apache.paimon.options.CatalogOptions; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * A hybrid implementation of {@link FileIO} that supports multiple file system schemas. It + * A lazy implementation of {@link FileIO} that supports multiple file system schemas. It * dynamically selects the appropriate {@link FileIO} based on the URI scheme of the given path. */ -public class HybridFileIO implements FileIO { +public class LaziedFileIO implements FileIO { private static final long serialVersionUID = 1L; - private Options options; + private CatalogContext context; private Map fileIOMap; private volatile FileIO fallbackFileIO; @@ -41,12 +41,17 @@ public class HybridFileIO implements FileIO { // TODO, how to decide the real fileio is object store or not? @Override public boolean isObjectStore() { - return false; + String warehouse = context.options().get(CatalogOptions.WAREHOUSE); + Path path = new Path(warehouse); + String scheme = path.toUri().getScheme(); + return scheme != null + && !scheme.equalsIgnoreCase("file") + && !scheme.equalsIgnoreCase("hdfs"); } @Override public void configure(CatalogContext context) { - this.options = context.options(); + this.context = context; this.fileIOMap = new ConcurrentHashMap<>(); } @@ -97,8 +102,7 @@ public FileIO fileIO(Path path) throws IOException { if (fallbackFileIO == null) { synchronized (this) { if (fallbackFileIO == null) { - CatalogContext catalogContext = CatalogContext.create(options); - fallbackFileIO = FileIO.get(path, catalogContext); + fallbackFileIO = FileIO.get(path, context); } } } @@ -108,8 +112,7 @@ public FileIO fileIO(Path path) throws IOException { if (!fileIOMap.containsKey(scheme)) { synchronized (this) { if (!fileIOMap.containsKey(scheme)) { - CatalogContext catalogContext = CatalogContext.create(options); - FileIO fileIO = FileIO.get(path, catalogContext); + FileIO fileIO = FileIO.get(path, context); fileIOMap.put(scheme, fileIO); } } @@ -120,7 +123,7 @@ public FileIO fileIO(Path path) throws IOException { private T wrap(Func func) throws IOException { ClassLoader cl = Thread.currentThread().getContextClassLoader(); try { - Thread.currentThread().setContextClassLoader(HybridFileIO.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(LaziedFileIO.class.getClassLoader()); return func.apply(); } finally { Thread.currentThread().setContextClassLoader(cl); diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java similarity index 79% rename from paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java rename to paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java index 8b763047d0b1..e8c110507e27 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/HybridFileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java @@ -35,23 +35,23 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -/** Tests for {@link HybridFileIO}. */ -public class HybridFileIOTest { +/** Tests for {@link LaziedFileIO}. */ +public class LaziedFileIOTest { - private HybridFileIO hybridFileIO; + private LaziedFileIO laziedFileIO; @BeforeEach public void setUp() { - hybridFileIO = new HybridFileIO(); + laziedFileIO = new LaziedFileIO(); Options options = new Options(); CatalogContext catalogContext = CatalogContext.create(options); - hybridFileIO.configure(catalogContext); + laziedFileIO.configure(catalogContext); } @Test public void testFileIONullSchemeReturnsFallbackFileIO() throws IOException { Path path = new Path("/path/to/file"); - FileIO result = hybridFileIO.fileIO(path); + FileIO result = laziedFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(LocalFileIO.class, result); } @@ -59,7 +59,7 @@ public void testFileIONullSchemeReturnsFallbackFileIO() throws IOException { @Test public void testFileIOReturnsLocalFileIO() throws IOException { Path path = new Path("file:///path/to/file"); - FileIO result = hybridFileIO.fileIO(path); + FileIO result = laziedFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(LocalFileIO.class, result); } @@ -67,7 +67,7 @@ public void testFileIOReturnsLocalFileIO() throws IOException { @Test public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { Path path = new Path("hdfs:///path/to/file"); - FileIO result = hybridFileIO.fileIO(path); + FileIO result = laziedFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(HadoopFileIO.class, result); } @@ -76,8 +76,8 @@ public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Exception { Path fileSchemePath = new Path("file:///path/to/file"); ExecutorService executorService = Executors.newFixedThreadPool(2); - Future future1 = executorService.submit(() -> hybridFileIO.fileIO(fileSchemePath)); - Future future2 = executorService.submit(() -> hybridFileIO.fileIO(fileSchemePath)); + Future future1 = executorService.submit(() -> laziedFileIO.fileIO(fileSchemePath)); + Future future2 = executorService.submit(() -> laziedFileIO.fileIO(fileSchemePath)); FileIO result1 = future1.get(); FileIO result2 = future2.get(); @@ -88,8 +88,8 @@ public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Excepti assertInstanceOf(LocalFileIO.class, result1); Path noSchemePath = new Path("/path/to/file"); - future1 = executorService.submit(() -> hybridFileIO.fileIO(noSchemePath)); - future2 = executorService.submit(() -> hybridFileIO.fileIO(noSchemePath)); + future1 = executorService.submit(() -> laziedFileIO.fileIO(noSchemePath)); + future2 = executorService.submit(() -> laziedFileIO.fileIO(noSchemePath)); result1 = future1.get(); result2 = future2.get(); @@ -100,8 +100,8 @@ public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Excepti assertInstanceOf(LocalFileIO.class, result1); Path hdfsSchemePath = new Path("hdfs:///path/to/file"); - future1 = executorService.submit(() -> hybridFileIO.fileIO(hdfsSchemePath)); - future2 = executorService.submit(() -> hybridFileIO.fileIO(hdfsSchemePath)); + future1 = executorService.submit(() -> laziedFileIO.fileIO(hdfsSchemePath)); + future2 = executorService.submit(() -> laziedFileIO.fileIO(hdfsSchemePath)); result1 = future1.get(); result2 = future2.get(); @@ -118,8 +118,8 @@ public void testFileIOMapStoresFileIOInstances() throws IOException { Path hdfsPath = new Path("hdfs:///path/to/hdfs/file1"); // First call should create new instances - FileIO localFileIO = hybridFileIO.fileIO(localPath); - FileIO hdfsFileIO = hybridFileIO.fileIO(hdfsPath); + FileIO localFileIO = laziedFileIO.fileIO(localPath); + FileIO hdfsFileIO = laziedFileIO.fileIO(hdfsPath); assertNotNull(localFileIO); assertNotNull(hdfsFileIO); @@ -127,8 +127,8 @@ public void testFileIOMapStoresFileIOInstances() throws IOException { assertInstanceOf(HadoopFileIO.class, hdfsFileIO); // Second call should return the same instances from fileIOMap - FileIO localFileIOAgain = hybridFileIO.fileIO(new Path("file:///path/to/local/file2")); - FileIO hdfsFileIOAgain = hybridFileIO.fileIO(new Path("hdfs:///path/to/local/file2")); + FileIO localFileIOAgain = laziedFileIO.fileIO(new Path("file:///path/to/local/file2")); + FileIO hdfsFileIOAgain = laziedFileIO.fileIO(new Path("hdfs:///path/to/local/file2")); assertNotNull(localFileIOAgain); assertNotNull(hdfsFileIOAgain); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 02d94b7a85f5..c8be2de426e4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -21,7 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.HybridFileIO; +import org.apache.paimon.fs.LaziedFileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; @@ -37,7 +37,7 @@ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - FileIO fileIO = new HybridFileIO(); + FileIO fileIO = new LaziedFileIO(); fileIO.configure(context); return create(fileIO, context.options()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index d11a1d806951..a4383c12c882 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.HybridFileIO; +import org.apache.paimon.fs.LaziedFileIO; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -85,7 +85,7 @@ public void setUp() throws Exception { Options catalogOptions = new Options(); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); CatalogContext catalogContext = CatalogContext.create(catalogOptions); - fileIO = new HybridFileIO(); + fileIO = new LaziedFileIO(); fileIO.configure(catalogContext); } From ef85c9e41ee31849dec7f2a5f822ae8808561824 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 11:45:25 +0800 Subject: [PATCH 05/15] fix review comments --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 270dbca340c0..1e7aa59ed869 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1024,7 +1024,7 @@
write-buffer-spillable
(none) Boolean - Whether the write buffer can be spillable. + Whether the write buffer can be spillable. Enabled by default when using object storage.
write-manifest-cache
From bbbfdf335510179db009fc8af00335baa551d89e Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 14:18:05 +0800 Subject: [PATCH 06/15] change to ResolvingFileIO --- ...LaziedFileIO.java => ResolvingFileIO.java} | 8 ++-- ...leIOTest.java => ResolvingFileIOTest.java} | 38 ++++++++++--------- .../paimon/table/FileStoreTableFactory.java | 4 +- .../paimon/catalog/CatalogTestBase.java | 4 +- 4 files changed, 28 insertions(+), 26 deletions(-) rename paimon-common/src/main/java/org/apache/paimon/fs/{LaziedFileIO.java => ResolvingFileIO.java} (92%) rename paimon-common/src/test/java/org/apache/paimon/fs/{LaziedFileIOTest.java => ResolvingFileIOTest.java} (75%) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java similarity index 92% rename from paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java rename to paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 4e12f8eac645..51962c53f36e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/LaziedFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -27,10 +27,10 @@ import java.util.concurrent.ConcurrentHashMap; /** - * A lazy implementation of {@link FileIO} that supports multiple file system schemas. It - * dynamically selects the appropriate {@link FileIO} based on the URI scheme of the given path. + * An implementation of {@link FileIO} that supports multiple file system schemas. It dynamically + * selects the appropriate {@link FileIO} based on the URI scheme of the given path. */ -public class LaziedFileIO implements FileIO { +public class ResolvingFileIO implements FileIO { private static final long serialVersionUID = 1L; private CatalogContext context; @@ -123,7 +123,7 @@ public FileIO fileIO(Path path) throws IOException { private T wrap(Func func) throws IOException { ClassLoader cl = Thread.currentThread().getContextClassLoader(); try { - Thread.currentThread().setContextClassLoader(LaziedFileIO.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(ResolvingFileIO.class.getClassLoader()); return func.apply(); } finally { Thread.currentThread().setContextClassLoader(cl); diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/ResolvingFileIOTest.java similarity index 75% rename from paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java rename to paimon-common/src/test/java/org/apache/paimon/fs/ResolvingFileIOTest.java index e8c110507e27..dae270eb873a 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/LaziedFileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/ResolvingFileIOTest.java @@ -35,23 +35,23 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -/** Tests for {@link LaziedFileIO}. */ -public class LaziedFileIOTest { +/** Tests for {@link ResolvingFileIO}. */ +public class ResolvingFileIOTest { - private LaziedFileIO laziedFileIO; + private ResolvingFileIO resolvingFileIO; @BeforeEach public void setUp() { - laziedFileIO = new LaziedFileIO(); + resolvingFileIO = new ResolvingFileIO(); Options options = new Options(); CatalogContext catalogContext = CatalogContext.create(options); - laziedFileIO.configure(catalogContext); + resolvingFileIO.configure(catalogContext); } @Test public void testFileIONullSchemeReturnsFallbackFileIO() throws IOException { Path path = new Path("/path/to/file"); - FileIO result = laziedFileIO.fileIO(path); + FileIO result = resolvingFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(LocalFileIO.class, result); } @@ -59,7 +59,7 @@ public void testFileIONullSchemeReturnsFallbackFileIO() throws IOException { @Test public void testFileIOReturnsLocalFileIO() throws IOException { Path path = new Path("file:///path/to/file"); - FileIO result = laziedFileIO.fileIO(path); + FileIO result = resolvingFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(LocalFileIO.class, result); } @@ -67,7 +67,7 @@ public void testFileIOReturnsLocalFileIO() throws IOException { @Test public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { Path path = new Path("hdfs:///path/to/file"); - FileIO result = laziedFileIO.fileIO(path); + FileIO result = resolvingFileIO.fileIO(path); assertNotNull(result); assertInstanceOf(HadoopFileIO.class, result); } @@ -76,8 +76,10 @@ public void testFileIOWithSchemeReturnsHdfsFileIO() throws IOException { public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Exception { Path fileSchemePath = new Path("file:///path/to/file"); ExecutorService executorService = Executors.newFixedThreadPool(2); - Future future1 = executorService.submit(() -> laziedFileIO.fileIO(fileSchemePath)); - Future future2 = executorService.submit(() -> laziedFileIO.fileIO(fileSchemePath)); + Future future1 = + executorService.submit(() -> resolvingFileIO.fileIO(fileSchemePath)); + Future future2 = + executorService.submit(() -> resolvingFileIO.fileIO(fileSchemePath)); FileIO result1 = future1.get(); FileIO result2 = future2.get(); @@ -88,8 +90,8 @@ public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Excepti assertInstanceOf(LocalFileIO.class, result1); Path noSchemePath = new Path("/path/to/file"); - future1 = executorService.submit(() -> laziedFileIO.fileIO(noSchemePath)); - future2 = executorService.submit(() -> laziedFileIO.fileIO(noSchemePath)); + future1 = executorService.submit(() -> resolvingFileIO.fileIO(noSchemePath)); + future2 = executorService.submit(() -> resolvingFileIO.fileIO(noSchemePath)); result1 = future1.get(); result2 = future2.get(); @@ -100,8 +102,8 @@ public void testFileIOConcurrentAccessInitializesFallbackFileIO() throws Excepti assertInstanceOf(LocalFileIO.class, result1); Path hdfsSchemePath = new Path("hdfs:///path/to/file"); - future1 = executorService.submit(() -> laziedFileIO.fileIO(hdfsSchemePath)); - future2 = executorService.submit(() -> laziedFileIO.fileIO(hdfsSchemePath)); + future1 = executorService.submit(() -> resolvingFileIO.fileIO(hdfsSchemePath)); + future2 = executorService.submit(() -> resolvingFileIO.fileIO(hdfsSchemePath)); result1 = future1.get(); result2 = future2.get(); @@ -118,8 +120,8 @@ public void testFileIOMapStoresFileIOInstances() throws IOException { Path hdfsPath = new Path("hdfs:///path/to/hdfs/file1"); // First call should create new instances - FileIO localFileIO = laziedFileIO.fileIO(localPath); - FileIO hdfsFileIO = laziedFileIO.fileIO(hdfsPath); + FileIO localFileIO = resolvingFileIO.fileIO(localPath); + FileIO hdfsFileIO = resolvingFileIO.fileIO(hdfsPath); assertNotNull(localFileIO); assertNotNull(hdfsFileIO); @@ -127,8 +129,8 @@ public void testFileIOMapStoresFileIOInstances() throws IOException { assertInstanceOf(HadoopFileIO.class, hdfsFileIO); // Second call should return the same instances from fileIOMap - FileIO localFileIOAgain = laziedFileIO.fileIO(new Path("file:///path/to/local/file2")); - FileIO hdfsFileIOAgain = laziedFileIO.fileIO(new Path("hdfs:///path/to/local/file2")); + FileIO localFileIOAgain = resolvingFileIO.fileIO(new Path("file:///path/to/local/file2")); + FileIO hdfsFileIOAgain = resolvingFileIO.fileIO(new Path("hdfs:///path/to/local/file2")); assertNotNull(localFileIOAgain); assertNotNull(hdfsFileIOAgain); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index c8be2de426e4..57741cab58ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -21,8 +21,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.LaziedFileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.ResolvingFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -37,7 +37,7 @@ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - FileIO fileIO = new LaziedFileIO(); + FileIO fileIO = new ResolvingFileIO(); fileIO.configure(context); return create(fileIO, context.options()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index a4383c12c882..0a0da0fd8f9a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -20,7 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.LaziedFileIO; +import org.apache.paimon.fs.ResolvingFileIO; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -85,7 +85,7 @@ public void setUp() throws Exception { Options catalogOptions = new Options(); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); CatalogContext catalogContext = CatalogContext.create(catalogOptions); - fileIO = new LaziedFileIO(); + fileIO = new ResolvingFileIO(); fileIO.configure(catalogContext); } From d207bcee65a2debfaf9bcd1a9f563627db103b27 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 14:47:47 +0800 Subject: [PATCH 07/15] fix null exception --- .../src/main/java/org/apache/paimon/fs/ResolvingFileIO.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 51962c53f36e..146162874460 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -42,6 +42,9 @@ public class ResolvingFileIO implements FileIO { @Override public boolean isObjectStore() { String warehouse = context.options().get(CatalogOptions.WAREHOUSE); + if (warehouse == null) { + return false; + } Path path = new Path(warehouse); String scheme = path.toUri().getScheme(); return scheme != null From 48bf1d51e35b46767a0928fa973ceaad468b3ad9 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 20:50:27 +0800 Subject: [PATCH 08/15] fix review comments --- .../org/apache/paimon/fs/ResolvingFileIO.java | 65 ++++++++++++------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 146162874460..748dd9a98717 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -31,12 +32,12 @@ * selects the appropriate {@link FileIO} based on the URI scheme of the given path. */ public class ResolvingFileIO implements FileIO { + private static final long serialVersionUID = 1L; - private CatalogContext context; + private final Map fileIOMap = new ConcurrentHashMap<>(); - private Map fileIOMap; - private volatile FileIO fallbackFileIO; + private CatalogContext context; // TODO, how to decide the real fileio is object store or not? @Override @@ -55,7 +56,6 @@ public boolean isObjectStore() { @Override public void configure(CatalogContext context) { this.context = context; - this.fileIOMap = new ConcurrentHashMap<>(); } @Override @@ -100,27 +100,16 @@ public boolean rename(Path src, Path dst) throws IOException { @VisibleForTesting public FileIO fileIO(Path path) throws IOException { - String scheme = path.toUri().getScheme(); - if (scheme == null) { - if (fallbackFileIO == null) { - synchronized (this) { - if (fallbackFileIO == null) { - fallbackFileIO = FileIO.get(path, context); + CacheKey cacheKey = new CacheKey(path.toUri().getScheme(), path.toUri().getAuthority()); + return fileIOMap.computeIfAbsent( + cacheKey, + k -> { + try { + return FileIO.get(path, context); + } catch (IOException e) { + throw new RuntimeException(e); } - } - } - return fallbackFileIO; - } - - if (!fileIOMap.containsKey(scheme)) { - synchronized (this) { - if (!fileIOMap.containsKey(scheme)) { - FileIO fileIO = FileIO.get(path, context); - fileIOMap.put(scheme, fileIO); - } - } - } - return fileIOMap.get(scheme); + }); } private T wrap(Func func) throws IOException { @@ -138,4 +127,32 @@ private T wrap(Func func) throws IOException { protected interface Func { T apply() throws IOException; } + + private static class CacheKey { + private final String scheme; + private final String authority; + + private CacheKey(String scheme, String authority) { + this.scheme = scheme; + this.authority = authority; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(scheme, cacheKey.scheme) + && Objects.equals(authority, cacheKey.authority); + } + + @Override + public int hashCode() { + return Objects.hash(scheme, authority); + } + } } From 907c577381c5af5898dc1c26f43c4054b36326ea Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Fri, 17 Jan 2025 21:03:13 +0800 Subject: [PATCH 09/15] fix review comments --- .../src/main/java/org/apache/paimon/fs/ResolvingFileIO.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 748dd9a98717..55c50e383804 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -23,6 +23,7 @@ import org.apache.paimon.options.CatalogOptions; import java.io.IOException; +import java.io.Serializable; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -128,7 +129,7 @@ protected interface Func { T apply() throws IOException; } - private static class CacheKey { + private static class CacheKey implements Serializable { private final String scheme; private final String authority; From c29e715af42f76cc71a4b870ba2f5ac03998bed4 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 20 Jan 2025 10:31:38 +0800 Subject: [PATCH 10/15] add data-file.external-paths.enabled catalogoptions --- .../generated/catalog_configuration.html | 6 ++++++ .../apache/paimon/options/CatalogOptions.java | 8 ++++++++ .../paimon/table/FileStoreTableFactory.java | 18 +++++++++++++++--- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 03efd178b8fc..42fa65ef89ea 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -140,5 +140,11 @@ String The warehouse root path of catalog. + +
data-file.external-paths.enabled
+ true + Boolean + Whether to enable external paths for data files, external paths allow you to specify the path of the data file, if you specify the path of the data file, you should also specify the access key and secret key catalog options. + diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index d0cfbeaf39ed..2cb1d1b9e648 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -149,4 +149,12 @@ public class CatalogOptions { "Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. " + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + " the metastore and need to be manually added as separate partition operations."); + + public static final ConfigOption EXTERNAL_PATH_ENABLED = + ConfigOptions.key("data-file.external-paths.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable external paths for data files, external paths allow you to specify the path of the data file, if you specify the path of the data file, " + + "you should also specify the access key and secret key catalog options."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 57741cab58ce..c1816717628c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -23,11 +23,14 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.utils.StringUtils; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Optional; import static org.apache.paimon.CoreOptions.PATH; @@ -35,10 +38,19 @@ /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { - public static FileStoreTable create(CatalogContext context) { - FileIO fileIO = new ResolvingFileIO(); - fileIO.configure(context); + boolean externalPathEnabled = context.options().get(CatalogOptions.EXTERNAL_PATH_ENABLED); + FileIO fileIO; + if (externalPathEnabled) { + fileIO = new ResolvingFileIO(); + fileIO.configure(context); + } else { + try { + fileIO = FileIO.get(CoreOptions.path(context.options()), context); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } return create(fileIO, context.options()); } From 867dc032bd4d9055396e8eb7eb4c40ea24e31c66 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 20 Jan 2025 11:36:41 +0800 Subject: [PATCH 11/15] fix docs description --- docs/layouts/shortcodes/generated/catalog_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 42fa65ef89ea..3856634ff95b 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -142,7 +142,7 @@
data-file.external-paths.enabled
- true + false Boolean Whether to enable external paths for data files, external paths allow you to specify the path of the data file, if you specify the path of the data file, you should also specify the access key and secret key catalog options. From 7519f5d197bf9ab85d79bae13f19e7018f1258e3 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 20 Jan 2025 15:49:17 +0800 Subject: [PATCH 12/15] rename the options --- .../shortcodes/generated/catalog_configuration.html | 4 ++-- .../java/org/apache/paimon/options/CatalogOptions.java | 9 +++++---- .../org/apache/paimon/table/FileStoreTableFactory.java | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 3856634ff95b..8ccd4bf4a46e 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -141,10 +141,10 @@ The warehouse root path of catalog. -
data-file.external-paths.enabled
+
resolving-fileio.enabled
false Boolean - Whether to enable external paths for data files, external paths allow you to specify the path of the data file, if you specify the path of the data file, you should also specify the access key and secret key catalog options. + Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key. diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 2cb1d1b9e648..dd0cb6c5b218 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -150,11 +150,12 @@ public class CatalogOptions { + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + " the metastore and need to be manually added as separate partition operations."); - public static final ConfigOption EXTERNAL_PATH_ENABLED = - ConfigOptions.key("data-file.external-paths.enabled") + public static final ConfigOption RESOLVING_FILEIO_ENABLED = + ConfigOptions.key("resolving-fileio.enabled") .booleanType() .defaultValue(false) .withDescription( - "Whether to enable external paths for data files, external paths allow you to specify the path of the data file, if you specify the path of the data file, " - + "you should also specify the access key and secret key catalog options."); + "Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, " + + "Paimon can read and write to external storage paths, such as OSS or S3. " + + "In order to access these external paths correctly, you also need to configure the corresponding access key and secret key."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index c1816717628c..1d702b64fad7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -39,9 +39,9 @@ /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - boolean externalPathEnabled = context.options().get(CatalogOptions.EXTERNAL_PATH_ENABLED); + boolean resolvingFileIOEnabled = context.options().get(CatalogOptions.RESOLVING_FILEIO_ENABLED); FileIO fileIO; - if (externalPathEnabled) { + if (resolvingFileIOEnabled) { fileIO = new ResolvingFileIO(); fileIO.configure(context); } else { From abc1f9a384dc5b03d97b19bb639cd81fb09a9142 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 20 Jan 2025 17:06:01 +0800 Subject: [PATCH 13/15] spotless --- .../java/org/apache/paimon/table/FileStoreTableFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 1d702b64fad7..ab3af2774536 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -39,7 +39,8 @@ /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - boolean resolvingFileIOEnabled = context.options().get(CatalogOptions.RESOLVING_FILEIO_ENABLED); + boolean resolvingFileIOEnabled = + context.options().get(CatalogOptions.RESOLVING_FILEIO_ENABLED); FileIO fileIO; if (resolvingFileIOEnabled) { fileIO = new ResolvingFileIO(); From a898543ce3419798dfb73619f2a9fd85b681fd24 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 21 Jan 2025 13:13:59 +0800 Subject: [PATCH 14/15] fix re view comments --- .../main/java/org/apache/paimon/fs/FileIO.java | 11 +++++++++++ .../paimon/table/FileStoreTableFactory.java | 17 ++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 5ba16acfca0f..c557ece03529 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.hadoop.HadoopFileIOLoader; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.CatalogOptions; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; @@ -411,6 +412,16 @@ default Optional readOverwrittenFileUtf8(Path path) throws IOException { * by the given path. */ static FileIO get(Path path, CatalogContext config) throws IOException { + boolean resolvingFileIOEnabled = + config.options().get(CatalogOptions.RESOLVING_FILEIO_ENABLED); + if (resolvingFileIOEnabled) { + FileIO fileIO = new ResolvingFileIO(); + // set to false to avoid infinite loop + config.options().set(CatalogOptions.RESOLVING_FILEIO_ENABLED, false); + fileIO.configure(config); + return fileIO; + } + URI uri = path.toUri(); if (LOG.isDebugEnabled()) { LOG.debug("Getting FileIO by scheme {}.", uri.getScheme()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index ab3af2774536..b8aa211f60a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -22,8 +22,6 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.ResolvingFileIO; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -39,18 +37,11 @@ /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { public static FileStoreTable create(CatalogContext context) { - boolean resolvingFileIOEnabled = - context.options().get(CatalogOptions.RESOLVING_FILEIO_ENABLED); FileIO fileIO; - if (resolvingFileIOEnabled) { - fileIO = new ResolvingFileIO(); - fileIO.configure(context); - } else { - try { - fileIO = FileIO.get(CoreOptions.path(context.options()), context); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + try { + fileIO = FileIO.get(CoreOptions.path(context.options()), context); + } catch (IOException e) { + throw new UncheckedIOException(e); } return create(fileIO, context.options()); } From 2e5ecb5379378ded7c89d9b3016b80158c649255 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 21 Jan 2025 13:15:14 +0800 Subject: [PATCH 15/15] fix review comments --- .../main/java/org/apache/paimon/table/FileStoreTableFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index b8aa211f60a4..afd6ce7da96b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -36,6 +36,7 @@ /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { + public static FileStoreTable create(CatalogContext context) { FileIO fileIO; try {