diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 52c3fa074a4cb4..c121e2d1c98972 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -33,7 +33,7 @@ under the License.
         ${basedir}/../../thirdparty
         1
         4.9.3
-        2.17.257
+        2.20.131
         3.1.1-hw-46
         8.2.7
@@ -548,7 +548,11 @@ under the License.
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-aws</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws-bundle</artifactId>
+            <scope>runtime</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-core</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java
index 4977c84e495935..a1be776292da41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java
@@ -23,8 +23,8 @@
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Namespace;
 
 import java.util.List;
@@ -55,7 +55,7 @@ protected void initLocalObjectsImpl() {
         // read from converted s3 endpoint or default by BE s3 endpoint
         String endpoint = catalogProperties.getOrDefault(Constants.ENDPOINT,
                 catalogProperties.get(S3Properties.Env.ENDPOINT));
-        catalogProperties.putIfAbsent(AwsProperties.S3FILEIO_ENDPOINT, endpoint);
+        catalogProperties.putIfAbsent(S3FileIOProperties.ENDPOINT, endpoint);
 
         glueCatalog.initialize(icebergCatalogType, catalogProperties);
         catalog = glueCatalog;
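
For context on the Glue-side change above: Iceberg's `AwsProperties.S3FILEIO_ENDPOINT` constant was superseded by `S3FileIOProperties.ENDPOINT` (both resolve to the `s3.endpoint` key), so the catalog keeps the same behavior under the newer API. A minimal sketch, not Doris code — the catalog name and endpoint value are placeholders, and it assumes AWS region/credentials are resolvable from the default provider chain:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

public class GlueEndpointSketch {
    public static void main(String[] args) {
        Map<String, String> catalogProperties = new HashMap<>();
        // putIfAbsent keeps an endpoint the user configured explicitly and only
        // falls back to the derived default otherwise (placeholder value here).
        catalogProperties.putIfAbsent(S3FileIOProperties.ENDPOINT, "https://s3.us-east-1.amazonaws.com");

        GlueCatalog glueCatalog = new GlueCatalog();
        glueCatalog.initialize("iceberg_glue", catalogProperties);
        System.out.println(glueCatalog.name());
    }
}
```
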
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index 7eed82471f4111..aefdfb65ceaadb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -18,14 +18,16 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.datasource.CatalogProperty;
-import org.apache.doris.datasource.iceberg.rest.DorisIcebergRestResolvedIO;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -33,7 +35,7 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
     public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
-            String comment) {
+                                      String comment) {
         super(catalogId, name, comment);
         props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
@@ -42,18 +44,12 @@ public IcebergRestExternalCatalog(long catalogId, String name, String resource,
     @Override
     protected void initLocalObjectsImpl() {
         icebergCatalogType = ICEBERG_REST;
-        Map<String, String> restProperties = new HashMap<>();
-        String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
-        restProperties.put(CatalogProperties.URI, restUri);
-        restProperties.put(CatalogProperties.FILE_IO_IMPL, DorisIcebergRestResolvedIO.class.getName());
-        restProperties.putAll(catalogProperty.getProperties());
         Configuration conf = replaceS3Properties(getConfiguration());
-        RESTCatalog restCatalog = new RESTCatalog();
-        restCatalog.setConf(conf);
-        restCatalog.initialize(icebergCatalogType, restProperties);
-        catalog = restCatalog;
+        catalog = CatalogUtil.buildIcebergCatalog(icebergCatalogType,
+                convertToRestCatalogProperties(),
+                conf);
     }
 
     private Configuration replaceS3Properties(Configuration conf) {
@@ -71,4 +67,30 @@ private Configuration replaceS3Properties(Configuration conf) {
                 catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
         return conf;
     }
+
+    private Map<String, String> convertToRestCatalogProperties() {
+
+        Map<String, String> props = catalogProperty.getProperties();
+        Map<String, String> restProperties = new HashMap<>(props);
+        restProperties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
+        restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+        String restUri = props.getOrDefault(CatalogProperties.URI, "");
+        restProperties.put(CatalogProperties.URI, restUri);
+        if (props.containsKey(S3Properties.ENDPOINT)) {
+            restProperties.put(S3FileIOProperties.ENDPOINT, props.get(S3Properties.ENDPOINT));
+        }
+        if (props.containsKey(S3Properties.ACCESS_KEY)) {
+            restProperties.put(S3FileIOProperties.ACCESS_KEY_ID, props.get(S3Properties.ACCESS_KEY));
+        }
+        if (props.containsKey(S3Properties.SECRET_KEY)) {
+            restProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, props.get(S3Properties.SECRET_KEY));
+        }
+        if (props.containsKey(S3Properties.REGION)) {
+            restProperties.put(AwsClientProperties.CLIENT_REGION, props.get(S3Properties.REGION));
+        }
+        if (props.containsKey(PropertyConverter.USE_PATH_STYLE)) {
+            restProperties.put(S3FileIOProperties.PATH_STYLE_ACCESS, props.get(PropertyConverter.USE_PATH_STYLE));
+        }
+        return restProperties;
+    }
 }
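
The rewritten `initLocalObjectsImpl` delegates construction to `CatalogUtil.buildIcebergCatalog`, which picks `RESTCatalog` from the `type=rest` property and initializes the configured `S3FileIO` with the translated keys. A self-contained sketch of the same property wiring (not Doris code; the REST URI, MinIO-style endpoint and credentials are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;

public class RestCatalogSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Catalog type and FileIO implementation, as in convertToRestCatalogProperties().
        props.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
        props.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
        props.put(CatalogProperties.URI, "http://127.0.0.1:8181");
        // Doris-style s3.* settings translated to the Iceberg/AWS property keys.
        props.put(S3FileIOProperties.ENDPOINT, "http://127.0.0.1:9000");
        props.put(S3FileIOProperties.ACCESS_KEY_ID, "minio");
        props.put(S3FileIOProperties.SECRET_ACCESS_KEY, "minio123");
        props.put(AwsClientProperties.CLIENT_REGION, "us-east-1");
        props.put(S3FileIOProperties.PATH_STYLE_ACCESS, "true");

        // Selects RESTCatalog based on the "type" property and initializes it
        // (and the S3FileIO) with the map above.
        Catalog catalog = CatalogUtil.buildIcebergCatalog("iceberg_rest", props, new Configuration());
        System.out.println(catalog.name());
    }
}
```

Letting `CatalogUtil` and a stock `S3FileIO` consume these keys directly is what makes the custom `DorisIcebergRestResolvedIO` (deleted further below) unnecessary.
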
"iceberg.hive.metadata-refresh-max-retries", 2); - } - - @Override - protected void doRefresh() { - String metadataLocation = null; - try { - Table table = metaClients.run(client -> client.getTable(database, tableName)); - metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); - } catch (NoSuchObjectException e) { - if (currentMetadataLocation() != null) { - throw new NoSuchTableException("No such table: %s.%s", database, tableName); - } - } catch (TException e) { - String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); - throw new RuntimeException(errMsg, e); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted during refresh", e); - } - refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java deleted file mode 100644 index f55f304d73fc66..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java +++ /dev/null @@ -1,225 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.iceberg.rest; - -import org.apache.doris.common.util.S3Util; -import org.apache.doris.datasource.credentials.CloudCredential; -import org.apache.doris.datasource.property.constants.OssProperties; -import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.datasource.property.constants.S3Properties.Env; - -import com.amazonaws.glue.catalog.util.AWSGlueConfig; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.aws.s3.S3FileIO; -import org.apache.iceberg.hadoop.HadoopConfigurable; -import org.apache.iceberg.hadoop.SerializableConfiguration; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.util.SerializableMap; -import org.apache.iceberg.util.SerializableSupplier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -/** - * FileIO implementation that uses location scheme to choose the correct FileIO implementation. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java
deleted file mode 100644
index f55f304d73fc66..00000000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java
+++ /dev/null
@@ -1,225 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.rest;
-
-import org.apache.doris.common.util.S3Util;
-import org.apache.doris.datasource.credentials.CloudCredential;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.datasource.property.constants.S3Properties.Env;
-
-import com.amazonaws.glue.catalog.util.AWSGlueConfig;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.aws.s3.S3FileIO;
-import org.apache.iceberg.hadoop.HadoopConfigurable;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.util.SerializableMap;
-import org.apache.iceberg.util.SerializableSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-/**
- * FileIO implementation that uses location scheme to choose the correct FileIO implementation.
- * Copy from org.apache.iceberg.io.ResolvingFileIO and only modify the io for s3 (to set region)
- * */
-public class DorisIcebergRestResolvedIO implements FileIO, HadoopConfigurable {
-    private static final Logger LOG = LoggerFactory.getLogger(DorisIcebergRestResolvedIO.class);
-    private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
-    private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
-    private static final Map<String, String> SCHEME_TO_FILE_IO =
-            ImmutableMap.of(
-                    "s3", S3_FILE_IO_IMPL,
-                    "s3a", S3_FILE_IO_IMPL,
-                    "s3n", S3_FILE_IO_IMPL);
-
-    private final Map<String, FileIO> ioInstances = Maps.newHashMap();
-    private SerializableMap<String, String> properties;
-    private SerializableSupplier<Configuration> hadoopConf;
-
-    /**
-     * No-arg constructor to load the FileIO dynamically.
-     *
-     * <p>All fields are initialized by calling {@link DorisIcebergRestResolvedIO#initialize(Map)} later.
-     */
-    public DorisIcebergRestResolvedIO() {}
-
-    @Override
-    public InputFile newInputFile(String location) {
-        return io(location).newInputFile(location);
-    }
-
-    @Override
-    public InputFile newInputFile(String location, long length) {
-        return io(location).newInputFile(location, length);
-    }
-
-    @Override
-    public OutputFile newOutputFile(String location) {
-        return io(location).newOutputFile(location);
-    }
-
-    @Override
-    public void deleteFile(String location) {
-        io(location).deleteFile(location);
-    }
-
-    @Override
-    public Map<String, String> properties() {
-        return properties.immutableMap();
-    }
-
-    @Override
-    public void initialize(Map<String, String> newProperties) {
-        close(); // close and discard any existing FileIO instances
-        this.properties = SerializableMap.copyOf(newProperties);
-    }
-
-    @Override
-    public void close() {
-        List<FileIO> instances = Lists.newArrayList();
-
-        synchronized (ioInstances) {
-            instances.addAll(ioInstances.values());
-            ioInstances.clear();
-        }
-
-        for (FileIO io : instances) {
-            io.close();
-        }
-    }
-
-    @Override
-    public void serializeConfWith(
-            Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
-        this.hadoopConf = confSerializer.apply(hadoopConf.get());
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.hadoopConf = new SerializableConfiguration(conf)::get;
-    }
-
-    @Override
-    public Configuration getConf() {
-        return hadoopConf.get();
-    }
-
-    private FileIO io(String location) {
-        String impl = implFromLocation(location);
-        FileIO io = ioInstances.get(impl);
-        if (io != null) {
-            return io;
-        }
-
-        synchronized (ioInstances) {
-            // double check while holding the lock
-            io = ioInstances.get(impl);
-            if (io != null) {
-                return io;
-            }
-
-            Configuration conf = hadoopConf.get();
-
-            try {
-                if (impl.equals(S3_FILE_IO_IMPL)) {
-                    io = createS3FileIO(properties);
-                } else {
-                    io = CatalogUtil.loadFileIO(impl, properties, conf);
-                }
-            } catch (IllegalArgumentException e) {
-                if (impl.equals(FALLBACK_IMPL)) {
-                    // no implementation to fall back to, throw the exception
-                    throw e;
-                } else {
-                    // couldn't load the normal class, fall back to HadoopFileIO
-                    LOG.warn(
-                            "Failed to load FileIO implementation: {}, falling back to {}",
-                            impl,
-                            FALLBACK_IMPL,
-                            e);
-                    try {
-                        io = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
-                    } catch (IllegalArgumentException suppressed) {
-                        LOG.warn(
-                                "Failed to load FileIO implementation: {} (fallback)", FALLBACK_IMPL, suppressed);
-                        // both attempts failed, throw the original exception with the later exception
-                        // suppressed
-                        e.addSuppressed(suppressed);
-                        throw e;
-                    }
-                }
-            }
-
-            ioInstances.put(impl, io);
-        }
-
-        return io;
-    }
-
-    private static String implFromLocation(String location) {
-        return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
-    }
-
-    private static String scheme(String location) {
-        int colonPos = location.indexOf(":");
-        if (colonPos > 0) {
-            return location.substring(0, colonPos);
-        }
-
-        return null;
-    }
-
-    protected FileIO createS3FileIO(Map<String, String> properties) {
-
-        // get region
-        String region = properties.getOrDefault(S3Properties.REGION,
-                properties.getOrDefault(AWSGlueConfig.AWS_REGION, properties.get(Env.REGION)));
-
-        // get endpoint
-        String s3Endpoint = properties.getOrDefault(S3Properties.ENDPOINT, properties.get(Env.ENDPOINT));
-        URI endpointUri = URI.create(s3Endpoint);
-
-        // set credential
-        CloudCredential credential = new CloudCredential();
-        credential.setAccessKey(properties.getOrDefault(S3Properties.ACCESS_KEY,
-                properties.get(S3Properties.Env.ACCESS_KEY)));
-        credential.setSecretKey(properties.getOrDefault(S3Properties.SECRET_KEY,
-                properties.get(S3Properties.Env.SECRET_KEY)));
-        if (properties.containsKey(OssProperties.SESSION_TOKEN)
-                || properties.containsKey(S3Properties.Env.TOKEN)) {
-            credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
-                    properties.get(S3Properties.Env.TOKEN)));
-        }
-
-        FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
-        io.initialize(properties);
-        return io;
-    }
-}
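
The deleted class was a copy of Iceberg's `ResolvingFileIO` whose only customization was building the S3 client itself to force an endpoint, region and credentials. With the catalog now passing the `s3.*` and `client.region` keys straight to `S3FileIO` (see `convertToRestCatalogProperties()` above), a stock `S3FileIO` can be configured directly. A hedged sketch, not Doris code, with placeholder endpoint, credentials and object path:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.InputFile;

public class S3FileIOSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(S3FileIOProperties.ENDPOINT, "http://127.0.0.1:9000");
        props.put(S3FileIOProperties.ACCESS_KEY_ID, "minio");
        props.put(S3FileIOProperties.SECRET_ACCESS_KEY, "minio123");
        props.put(S3FileIOProperties.PATH_STYLE_ACCESS, "true");
        props.put(AwsClientProperties.CLIENT_REGION, "us-east-1");

        S3FileIO io = new S3FileIO();
        io.initialize(props);
        // The input file is resolved lazily; no request is sent until it is read.
        InputFile file = io.newInputFile("s3://bucket/warehouse/db/tbl/metadata/v1.metadata.json");
        System.out.println(file.location());
        io.close();
    }
}
```
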
diff --git a/fe/pom.xml b/fe/pom.xml
index 3cb5af10e6e48f..a95a62b2c8d6bf 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -302,7 +302,7 @@ under the License.
-        <iceberg.version>1.1.0</iceberg.version>
+        <iceberg.version>1.4.3</iceberg.version>
         3.0.0rc1
         0.45.2-public
         1.11.2
@@ -1288,6 +1288,11 @@ under the License.
                 <artifactId>iceberg-aws</artifactId>
                 <version>${iceberg.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-aws-bundle</artifactId>
+                <version>${iceberg.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.aliyun.odps</groupId>
                 <artifactId>odps-sdk-core</artifactId>
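
One note on the new `iceberg-aws-bundle` artifact: it bundles the AWS SDK v2 classes that `iceberg-aws` expects to find on the classpath at runtime, which is why `fe-core` declares it with runtime scope and `fe/pom.xml` only manages its version. A quick, illustrative classpath check (the class names are the standard SDK/Iceberg ones, nothing Doris-specific):

```java
public class AwsBundleClasspathCheck {
    public static void main(String[] args) {
        // Representative classes that should resolve once iceberg-aws-bundle is on the runtime classpath.
        String[] classes = {
            "software.amazon.awssdk.services.s3.S3Client",
            "software.amazon.awssdk.services.glue.GlueClient",
            "org.apache.iceberg.aws.s3.S3FileIO"
        };
        for (String name : classes) {
            try {
                Class.forName(name);
                System.out.println("found   " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("missing " + name);
            }
        }
    }
}
```
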