From b58c5948167af49870fa53013a0362f9baaf888a Mon Sep 17 00:00:00 2001
From: "xinwei.chen"
Date: Tue, 26 Mar 2024 14:56:25 +0800
Subject: [PATCH 1/2] [filesystems] Support Tencent Cloud COS filesystem

---
 docs/content/filesystems/cosn.md              | 102 ++++++
 docs/content/filesystems/overview.md          |   1 +
 docs/themes/book                              |   2 +-
 paimon-filesystems/paimon-cosn-impl/pom.xml   | 120 ++++++++
 .../org/apache/paimon/cosn/COSNFileIO.java    | 158 ++++++++++
 .../paimon/cosn/HadoopCompliantFileIO.java    | 290 ++++++++++++++++++
 paimon-filesystems/paimon-cosn/pom.xml        | 125 ++++++++
 .../org/apache/paimon/cosn/COSNLoader.java    |  90 ++++++
 .../org.apache.paimon.fs.FileIOLoader         |  16 +
 paimon-filesystems/pom.xml                    |   3 +
 10 files changed, 906 insertions(+), 1 deletion(-)
 create mode 100644 docs/content/filesystems/cosn.md
 create mode 100644 paimon-filesystems/paimon-cosn-impl/pom.xml
 create mode 100644 paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/COSNFileIO.java
 create mode 100644 paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java
 create mode 100644 paimon-filesystems/paimon-cosn/pom.xml
 create mode 100644 paimon-filesystems/paimon-cosn/src/main/java/org/apache/paimon/cosn/COSNLoader.java
 create mode 100644 paimon-filesystems/paimon-cosn/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader

diff --git a/docs/content/filesystems/cosn.md b/docs/content/filesystems/cosn.md
new file mode 100644
index 000000000000..0da5e334d254
--- /dev/null
+++ b/docs/content/filesystems/cosn.md
@@ -0,0 +1,102 @@
+---
+title: "COSN"
+weight: 3
+type: docs
+aliases:
+- /filesystems/cosn.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# COSN
+
+COSN is the Hadoop-compatible filesystem for Tencent Cloud COS (Cloud Object Storage).
+
+{{< stable >}}
+
+Download [paimon-cosn-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-cosn/{{< version >}}/paimon-cosn-{{< version >}}.jar).
+
+{{< /stable >}}
+
+{{< unstable >}}
+
+Download [paimon-cosn-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-cosn/{{< version >}}/).
+
+{{< /unstable >}}
+
+{{< tabs "cosn" >}}
+
+{{< tab "Flink" >}}
+
+Put `paimon-cosn-{{< version >}}.jar` into the `lib` directory of your Flink home, then create a catalog:
+
+```sql
+CREATE CATALOG my_catalog WITH (
+    'type' = 'paimon',
+    'warehouse' = 'cosn://<bucket>/<path>',
+    'fs.cosn.bucket.endpoint_suffix' = 'cos.ap-beijing.myqcloud.com',
+    'fs.cosn.userinfo.secretId' = 'AKIDxxxx',
+    'fs.cosn.userinfo.secretKey' = 'yyy'
+);
+```
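+
+Once the catalog is created, you can switch to it and work with tables as usual. The following is
+a minimal, illustrative sketch; the table name and schema are placeholders, not part of this patch:
+
+```sql
+USE CATALOG my_catalog;
+
+CREATE TABLE test_table (
+    id INT,
+    name STRING,
+    PRIMARY KEY (id) NOT ENFORCED
+);
+
+INSERT INTO test_table VALUES (1, 'paimon');
+SELECT * FROM test_table;
+```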
+
+{{< /tab >}}
+
+{{< tab "Spark" >}}
+
+{{< hint info >}}
+If you have already configured COSN access through Spark (via Hadoop FileSystem), you can skip the following configuration.
+{{< /hint >}}
+
+Place `paimon-cosn-{{< version >}}.jar` together with `paimon-spark-{{< version >}}.jar` under Spark's `jars` directory, then start `spark-sql` like this:
+
+```shell
+spark-sql \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.warehouse=cosn://<bucket>/<path> \
+    --conf spark.sql.catalog.paimon.fs.cosn.bucket.endpoint_suffix=cos.ap-beijing.myqcloud.com \
+    --conf spark.sql.catalog.paimon.fs.cosn.userinfo.secretId=AKIDxxxx \
+    --conf spark.sql.catalog.paimon.fs.cosn.userinfo.secretKey=yyy
+```
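+
+Once the shell is up, the catalog registered above can be used directly. A minimal, illustrative
+sketch (the database and table names are placeholders, not part of this patch):
+
+```sql
+USE paimon.default;
+CREATE TABLE test_table (id INT, name STRING);
+INSERT INTO test_table VALUES (1, 'paimon');
+SELECT * FROM test_table;
+```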
+
+{{< /tab >}}
+
+{{< tab "Hive" >}}
+
+{{< hint info >}}
+If you have already configured COSN access through Hive (via Hadoop FileSystem), you can skip the following configuration.
+{{< /hint >}}
+
+NOTE: You need to ensure that the Hive metastore can access `cosn`.
+
+Place `paimon-cosn-{{< version >}}.jar` together with `paimon-hive-connector-{{< version >}}.jar` under Hive's auxlib directory, then set the following session properties:
+
+```sql
+SET paimon.fs.cosn.bucket.endpoint_suffix=cos.ap-beijing.myqcloud.com;
+SET paimon.fs.cosn.userinfo.secretId=AKIDxxxx;
+SET paimon.fs.cosn.userinfo.secretKey=yyy;
+```
+
+Then you can read tables from the Hive metastore. The tables can be created by Flink or Spark, see [Catalog with Hive Metastore]({{< ref "how-to/creating-catalogs" >}}).
+
+```sql
+SELECT * FROM test_table;
+SELECT COUNT(1) FROM test_table;
+```
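+
+For reference, the table queried above might be created from Flink with a Hive-metastore-backed
+Paimon catalog along these lines (an illustrative sketch; the metastore URI, table name, and
+schema are placeholders, not part of this patch):
+
+```sql
+CREATE CATALOG my_hive_catalog WITH (
+    'type' = 'paimon',
+    'metastore' = 'hive',
+    'uri' = 'thrift://<hive-metastore-host>:9083',
+    'warehouse' = 'cosn://<bucket>/<path>',
+    'fs.cosn.bucket.endpoint_suffix' = 'cos.ap-beijing.myqcloud.com',
+    'fs.cosn.userinfo.secretId' = 'AKIDxxxx',
+    'fs.cosn.userinfo.secretKey' = 'yyy'
+);
+
+USE CATALOG my_hive_catalog;
+CREATE TABLE test_table (a INT, b STRING);
+INSERT INTO test_table VALUES (1, 'paimon');
+```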
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/filesystems/overview.md b/docs/content/filesystems/overview.md
index 3de3c2ec500a..347c76ecb034 100644
--- a/docs/content/filesystems/overview.md
+++ b/docs/content/filesystems/overview.md
@@ -40,6 +40,7 @@ FileSystem pluggable jars for user to query tables from Spark/Hive side.
 | Local File System | file:// | N | Built-in Support |
 | HDFS | hdfs:// | N | Built-in Support, ensure that the cluster is in the hadoop environment |
 | Aliyun OSS | oss:// | Y | |
+| Tencent Cloud COS | cosn:// | Y | |
 | S3 | s3:// | Y | |
 
 ## Dependency
diff --git a/docs/themes/book b/docs/themes/book
index a486adf8462c..2dffe0bc7a5c 160000
--- a/docs/themes/book
+++ b/docs/themes/book
@@ -1 +1 @@
-Subproject commit a486adf8462c0abfc9034436ddd72927d6656809
+Subproject commit 2dffe0bc7a5caac3e49bf2abe943ca412d5f4333
diff --git a/paimon-filesystems/paimon-cosn-impl/pom.xml b/paimon-filesystems/paimon-cosn-impl/pom.xml
new file mode 100644
index 000000000000..2f8aa827e309
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn-impl/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-filesystems</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-cosn-impl</artifactId>
+    <name>Paimon : FileSystems : COSN : Impl</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <fs.cos.qcloud.version>3.13.2</fs.cos.qcloud.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hadoop-shaded</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.qcloud.cos</groupId>
+            <artifactId>hadoop-cos</artifactId>
+            <version>${fs.cos.qcloud.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>${jaxb.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*</artifact>
+                                    <excludes>
+                                        <exclude>.gitkeep</exclude>
+                                        <exclude>mime.types</exclude>
+                                        <exclude>mozilla/**</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/COSNFileIO.java b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/COSNFileIO.java
new file mode 100644
index 000000000000..8c59c67773ad
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/COSNFileIO.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cosn;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CosFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** COSN {@link FileIO}. */
+public class COSNFileIO extends HadoopCompliantFileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(COSNFileIO.class);
+
+    /**
+     * To keep things simple, Paimon COSN configuration keys are the same as those of the Hadoop
+     * COS module, so every configuration key with the prefix `fs.cosn` in the Paimon conf is
+     * added to the Hadoop conf.
+     */
+    private static final String[] CONFIG_PREFIXES = {"fs.cosn."};
+
+    private static final String COSN_SECRET_ID = "fs.cosn.userinfo.secretId";
+    private static final String COSN_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+
+    private static final Map<String, String> CASE_SENSITIVE_KEYS =
+            new HashMap<String, String>() {
+                {
+                    put(COSN_SECRET_ID.toLowerCase(), COSN_SECRET_ID);
+                    put(COSN_SECRET_KEY.toLowerCase(), COSN_SECRET_KEY);
+                }
+            };
+
+    /**
+     * Cache of {@link CosFileSystem} instances. At present there is no good mechanism to ensure
+     * that a file system will be shut down, so this cache is used to avoid resource leakage.
+     */
+    private static final Map<CacheKey, CosFileSystem> CACHE = new ConcurrentHashMap<>();
+
+    private Options hadoopOptions;
+
+    @Override
+    public boolean isObjectStore() {
+        return true;
+    }
+
+    @Override
+    public void configure(CatalogContext context) {
+        hadoopOptions = new Options();
+        // read all configuration keys that start with one of CONFIG_PREFIXES
+        for (String key : context.options().keySet()) {
+            for (String prefix : CONFIG_PREFIXES) {
+                if (key.startsWith(prefix)) {
+                    String value = context.options().get(key);
+                    if (CASE_SENSITIVE_KEYS.containsKey(key.toLowerCase())) {
+                        key = CASE_SENSITIVE_KEYS.get(key.toLowerCase());
+                    }
+                    hadoopOptions.set(key, value);
+                    LOG.debug(
+                            "Adding config entry for {} as {} to Hadoop config",
+                            key,
+                            hadoopOptions.get(key));
+                }
+            }
+        }
+    }
+
+    @Override
+    protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
+        final String scheme = path.toUri().getScheme();
+        final String authority = path.toUri().getAuthority();
+        return CACHE.computeIfAbsent(
+                new CacheKey(hadoopOptions, scheme, authority),
+                key -> {
+                    Configuration hadoopConf = new Configuration();
+                    key.options.toMap().forEach(hadoopConf::set);
+                    URI fsUri = path.toUri();
+                    if (scheme == null && authority == null) {
+                        fsUri = FileSystem.getDefaultUri(hadoopConf);
+                    } else if (scheme != null && authority == null) {
+                        URI defaultUri = FileSystem.getDefaultUri(hadoopConf);
+                        if (scheme.equals(defaultUri.getScheme())
+                                && defaultUri.getAuthority() != null) {
+                            fsUri = defaultUri;
+                        }
+                    }
+
+                    CosFileSystem fs = new CosFileSystem();
+                    try {
+                        fs.initialize(fsUri, hadoopConf);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                    return fs;
+                });
+    }
+
+    private static class CacheKey {
+
+        private final Options options;
+        private final String scheme;
+        private final String authority;
+
+        private CacheKey(Options options, String scheme, String authority) {
+            this.options = options;
+            this.scheme = scheme;
+            this.authority = authority;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CacheKey cacheKey = (CacheKey) o;
+            return Objects.equals(options, cacheKey.options)
+                    && Objects.equals(scheme, cacheKey.scheme)
+                    && Objects.equals(authority, cacheKey.authority);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(options, scheme, authority);
+        }
+    }
+}
diff --git a/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java
new file mode 100644
index 000000000000..c628ee47b222
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cosn;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hadoop {@link FileIO}.
+ *
+ * <p>Important: this class is copied from HadoopFileIO to avoid class loader conflicts.
+ */
+public abstract class HadoopCompliantFileIO implements FileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    protected transient volatile Map<String, FileSystem> fsMap;
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopSeekableInputStream(getFileSystem(hadoopPath).open(hadoopPath));
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopPositionOutputStream(
+                getFileSystem(hadoopPath).create(hadoopPath, overwrite));
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath));
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        FileStatus[] statuses = new FileStatus[0];
+        org.apache.hadoop.fs.FileStatus[] hadoopStatuses =
+                getFileSystem(hadoopPath).listStatus(hadoopPath);
+        if (hadoopStatuses != null) {
+            statuses = new FileStatus[hadoopStatuses.length];
+            for (int i = 0; i < hadoopStatuses.length; i++) {
+                statuses[i] = new HadoopFileStatus(hadoopStatuses[i]);
+            }
+        }
+        return statuses;
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).exists(hadoopPath);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).delete(hadoopPath, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).mkdirs(hadoopPath);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        org.apache.hadoop.fs.Path hadoopSrc = path(src);
+        org.apache.hadoop.fs.Path hadoopDst = path(dst);
+        return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
+    }
+
+    private org.apache.hadoop.fs.Path path(Path path) {
+        return new org.apache.hadoop.fs.Path(path.toUri());
+    }
+
+    private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+        if (fsMap == null) {
+            synchronized (this) {
+                if (fsMap == null) {
+                    fsMap = new ConcurrentHashMap<>();
+                }
+            }
+        }
+
+        Map<String, FileSystem> map = fsMap;
+
+        String authority = path.toUri().getAuthority();
+        if (authority == null) {
+            authority = "DEFAULT";
+        }
+        FileSystem fs = map.get(authority);
+        if (fs == null) {
+            fs = createFileSystem(path);
+            map.put(authority, fs);
+        }
+        return fs;
+    }
+
+    protected abstract FileSystem createFileSystem(org.apache.hadoop.fs.Path path)
+            throws IOException;
+
+    private static class HadoopSeekableInputStream extends SeekableInputStream {
+
+        /**
+         * Minimum amount of bytes to skip forward before we issue a seek instead of discarding
+         * read.
+         *
+         * <p>The current value is just a magic number. In the long run, this value could become
+         * configurable, but for now it is a conservative, relatively small value that should
+         * bring safe improvements for small skips (e.g. when reading metadata) that would hurt
+         * the most with frequent seeks.
+         *
+         * <p>The optimal value depends on the DFS implementation and configuration plus the
+         * underlying filesystem. For now, this number is chosen "big enough" to provide
+         * improvements for smaller seeks, and "small enough" to avoid disadvantages over real
+         * seeks. While the minimum should be the page size, a true optimum per system would be
+         * the amount of bytes that can be consumed sequentially within the seek time.
+         * Unfortunately, the seek time is not constant and devices, OS, and DFS potentially also
+         * use read buffers and read-ahead.
+         */
+        private static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+        private final FSDataInputStream in;
+
+        private HadoopSeekableInputStream(FSDataInputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public void seek(long seekPos) throws IOException {
+            // We do some optimizations to avoid that some implementations of distributed FS
+            // perform expensive seeks when they are actually not needed.
+            long delta = seekPos - getPos();
+
+            if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+                // Instead of a small forward seek, we skip over the gap
+                skipFully(delta);
+            } else if (delta != 0L) {
+                // For larger gaps and backward seeks, we do a real seek
+                forceSeek(seekPos);
+            } // Do nothing if delta is zero.
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return in.getPos();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        /**
+         * Positions the stream to the given location. In contrast to {@link #seek(long)}, this
+         * method will always issue a "seek" command to the dfs and may not replace it by {@link
+         * #skip(long)} for small seeks.
+         *
+         * <p>Notice that the underlying DFS implementation can still decide to do skip instead
+         * of seek.
+         *
+         * @param seekPos the position to seek to.
+         */
+        public void forceSeek(long seekPos) throws IOException {
+            in.seek(seekPos);
+        }
+
+        /**
+         * Skips over a given amount of bytes in the stream.
+         *
+         * @param bytes the number of bytes to skip.
+         */
+        public void skipFully(long bytes) throws IOException {
+            while (bytes > 0) {
+                bytes -= in.skip(bytes);
+            }
+        }
+    }
+
+    private static class HadoopPositionOutputStream extends PositionOutputStream {
+
+        private final FSDataOutputStream out;
+
+        private HadoopPositionOutputStream(FSDataOutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public long getPos() {
+            return out.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            out.hflush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            out.close();
+        }
+    }
+
+    private static class HadoopFileStatus implements FileStatus {
+
+        private final org.apache.hadoop.fs.FileStatus status;
+
+        private HadoopFileStatus(org.apache.hadoop.fs.FileStatus status) {
+            this.status = status;
+        }
+
+        @Override
+        public long getLen() {
+            return status.getLen();
+        }
+
+        @Override
+        public boolean isDir() {
+            return status.isDirectory();
+        }
+
+        @Override
+        public Path getPath() {
+            return new Path(status.getPath().toUri());
+        }
+
+        @Override
+        public long getModificationTime() {
+            return status.getModificationTime();
+        }
+    }
+}
diff --git a/paimon-filesystems/paimon-cosn/pom.xml b/paimon-filesystems/paimon-cosn/pom.xml
new file mode 100644
index 000000000000..75bb045a6036
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-filesystems</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-cosn</artifactId>
+    <name>Paimon : FileSystems : COSN</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-cosn-impl</artifactId>
+            <version>${project.version}</version>
+            <scope>runtime</scope>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-cos-classes</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.paimon</groupId>
+                                    <artifactId>paimon-cosn-impl</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.build.directory}/classes/paimon-plugin-cosn</outputDirectory>
+                                    <excludes>META-INF/**</excludes>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.paimon</groupId>
+                                    <artifactId>paimon-cosn-impl</artifactId>
+                                    <version>${project.version}</version>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.build.directory}/classes/paimon-plugin-cosn</outputDirectory>
+                                    <includes>META-INF/services/**,META-INF/versions/**</includes>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.paimon:paimon-cosn-impl</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.paimon:paimon-cosn-impl</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/**</exclude>
+                                    </excludes>
+                                    <includes>
+                                        <include>META-INF/services/**</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-filesystems/paimon-cosn/src/main/java/org/apache/paimon/cosn/COSNLoader.java b/paimon-filesystems/paimon-cosn/src/main/java/org/apache/paimon/cosn/COSNLoader.java
new file mode 100644
index 000000000000..b4a90d1cd659
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn/src/main/java/org/apache/paimon/cosn/COSNLoader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cosn;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PluginFileIO;
+import org.apache.paimon.plugin.PluginLoader;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link PluginLoader} to load cosn. */
+public class COSNLoader implements FileIOLoader {
+
+    private static final String COSN_CLASSES_DIR = "paimon-plugin-cosn";
+
+    private static final String COSN_CLASS = "org.apache.paimon.cosn.COSNFileIO";
+
+    // Singleton lazy initialization
+
+    private static PluginLoader loader;
+
+    private static synchronized PluginLoader getLoader() {
+        if (loader == null) {
+            // Avoid NoClassDefFoundError without cause by exception
+            loader = new PluginLoader(COSN_CLASSES_DIR);
+        }
+        return loader;
+    }
+
+    @Override
+    public String getScheme() {
+        return "cosn";
+    }
+
+    @Override
+    public List<String[]> requiredOptions() {
+        List<String[]> options = new ArrayList<>();
+        options.add(new String[] {"fs.cosn.bucket.endpoint_suffix"});
+        options.add(new String[] {"fs.cosn.userinfo.secretId"});
+        options.add(new String[] {"fs.cosn.userinfo.secretKey"});
+        return options;
+    }
+
+    @Override
+    public FileIO load(Path path) {
+        return new CosnPluginFileIO();
+    }
+
+    private static class CosnPluginFileIO extends PluginFileIO {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isObjectStore() {
+            return true;
+        }
+
+        @Override
+        protected FileIO createFileIO(Path path) {
+            FileIO fileIO = getLoader().newInstance(COSN_CLASS);
+            fileIO.configure(CatalogContext.create(options));
+            return fileIO;
+        }
+
+        @Override
+        protected ClassLoader pluginClassLoader() {
+            return getLoader().submoduleClassLoader();
+        }
+    }
+}
diff --git a/paimon-filesystems/paimon-cosn/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader b/paimon-filesystems/paimon-cosn/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 000000000000..80d859fbdd3c
--- /dev/null
+++ b/paimon-filesystems/paimon-cosn/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.cosn.COSNLoader
diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml
index 53aee6d44b24..d19bdc448a6d 100644
--- a/paimon-filesystems/pom.xml
+++ b/paimon-filesystems/pom.xml
@@ -33,6 +33,8 @@
 
         <module>paimon-hadoop-shaded</module>
+        <module>paimon-cosn</module>
+        <module>paimon-cosn-impl</module>
         <module>paimon-oss</module>
         <module>paimon-oss-impl</module>
         <module>paimon-s3</module>
@@ -41,6 +43,7 @@
 
         <fs.hadoopshaded.version>3.3.4</fs.hadoopshaded.version>
+        <fs.cos.qcloud.version>3.3.0-8.3.5</fs.cos.qcloud.version>
         <fs.s3.aws.version>1.12.319</fs.s3.aws.version>
         <fs.s3.commons.beanutils.version>1.9.4</fs.s3.commons.beanutils.version>

From f97c4e87b61dcf5bc7c549e77744962c49569890 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E6=96=B0=E4=BC=9F?=
Date: Wed, 27 Mar 2024 15:16:34 +0800
Subject: [PATCH 2/2] [filesystems] Fix NoClassDefFoundError caused by missing
 commons-codec in COSN

---
 paimon-filesystems/paimon-cosn-impl/pom.xml | 7 +++++++
 paimon-filesystems/paimon-cosn/pom.xml      | 2 +-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/paimon-filesystems/paimon-cosn-impl/pom.xml b/paimon-filesystems/paimon-cosn-impl/pom.xml
index 2f8aa827e309..654b878f6b62 100644
--- a/paimon-filesystems/paimon-cosn-impl/pom.xml
+++ b/paimon-filesystems/paimon-cosn-impl/pom.xml
@@ -78,6 +78,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- required by the COS SDK at runtime; fixes NoClassDefFoundError -->
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.16.0</version>
+        </dependency>
+
     </dependencies>
diff --git a/paimon-filesystems/paimon-cosn/pom.xml b/paimon-filesystems/paimon-cosn/pom.xml
index 75bb045a6036..ca3905317242 100644
--- a/paimon-filesystems/paimon-cosn/pom.xml
+++ b/paimon-filesystems/paimon-cosn/pom.xml
@@ -61,7 +61,7 @@
                 <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>copy-cos-classes</id>
+                        <id>copy-cosn-classes</id>
                         <phase>prepare-package</phase>
                         <goals>
                             <goal>unpack</goal>