From be7916574fcf59facf9f3c8bb7012aa031f8309c Mon Sep 17 00:00:00 2001 From: slothever Date: Tue, 6 Aug 2024 01:56:23 +0800 Subject: [PATCH 1/3] split iceberg --- .../authentication/Authenticator.java | 33 ++ .../authentication/HadoopAuthenticator.java | 2 +- .../doris/datasource/ExternalCatalog.java | 19 ++ .../datasource/hive/HMSExternalCatalog.java | 11 +- .../hive/HiveMetaStoreClientHelper.java | 7 - .../iceberg/IcebergExternalCatalog.java | 24 ++ .../iceberg/IcebergHadoopCatalog.java | 305 ++++++++++++++++++ .../iceberg/IcebergHadoopExternalCatalog.java | 32 +- .../iceberg/IcebergMetadataCache.java | 27 +- .../iceberg/IcebergMetadataOps.java | 2 +- .../datasource/iceberg/IcebergUtils.java | 31 +- .../iceberg/hadoop/IcebergHadoopFileIO.java | 71 ++++ .../hadoop/IcebergHadoopInputFile.java | 104 ++++++ .../hadoop/IcebergHadoopOutputFile.java | 73 +++++ .../hadoop/IcebergHadoopTableOperations.java | 48 +++ .../hadoop/IcebergPositionOutputStream.java | 62 ++++ .../hadoop/IcebergSeekableInputStream.java | 62 ++++ .../iceberg/source/IcebergScanNode.java | 7 +- .../doris/fs/remote/dfs/DFSFileSystem.java | 49 ++- .../kerberos/test_two_iceberg_kerberos.groovy | 124 +++++++ 20 files changed, 1045 insertions(+), 48 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java create mode 100644 regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java new file mode 100644 index 00000000000000..2e6c1602f0f4fe --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.security.authentication;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+public interface Authenticator {
+
+    <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException;
+
+    default void doAsNoReturn(Runnable action) throws IOException {
+        doAs(() -> {
+            action.run();
+            return null;
+        });
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410be3a..dce3eb8e7b509f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -22,7 +22,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
-public interface HadoopAuthenticator {
+public interface HadoopAuthenticator extends Authenticator {
 
     UserGroupInformation getUGI() throws IOException;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index d99ac76c7b9f13..fbea197b078d0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -37,6 +37,7 @@
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.security.authentication.Authenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
@@ -78,6 +79,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -157,6 +159,23 @@ public Configuration getConfiguration() {
         return conf;
     }
 
+    /**
+     * get authenticator for catalog
+     * return a dummy authenticator by default
+     */
+    public synchronized Authenticator getAuthenticator() {
+        return new Authenticator() {
+            @Override
+            public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+                try {
+                    return action.run();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        };
+    }
+
     /**
      * set some default properties when creating catalog
      * @return list of database names in this catalog
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index be9bf388adbd31..22f1ab049d2cad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -42,7 +42,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -70,7 +69,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
 
     private ThreadPoolExecutor fileSystemExecutor;
-    @Getter
    private HadoopAuthenticator authenticator;
 
     @VisibleForTesting
@@ -263,7 +261,12
@@ public String getHiveMetastoreUris() { return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); } - public String getHiveVersion() { - return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); + @Override + public synchronized HadoopAuthenticator getAuthenticator() { + if (authenticator == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + } + return authenticator; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index db6019eda97eef..352e00a76208bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -31,7 +31,6 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.ArrayType; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -41,7 +40,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopUGI; -import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.TExprOpcode; @@ -813,11 +811,6 @@ public static Schema getHudiTableSchema(HMSExternalTable table) { return hudiSchema; } - public static T ugiDoAs(long catalogId, PrivilegedExceptionAction action) { - return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(), - action); - } - public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction action) { // if hive config is not ready, then use hadoop kerberos to login AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 6f79afd5de5d7f..a9029499ad8849 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -26,8 +26,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,4 +86,25 @@ protected void initS3Param(Configuration conf) { Map properties = catalogProperty.getHadoopProperties(); conf.set(Constants.AWS_CREDENTIALS_PROVIDER, PropertyConverter.getAWSCredentialsProviders(properties)); } + + public Table loadTable(TableIdentifier of) { + Table tbl = getCatalog().loadTable(of); + Map extProps = getProperties(); + initIcebergTableFileIO(tbl, extProps); + return tbl; + } + + public static void initIcebergTableFileIO(Table table, Map props) { + Map ioConf = new HashMap<>(); + table.properties().forEach((key, value) -> { + if (key.startsWith("io.")) { + ioConf.put(key, value); + } + }); + + // This 
`initialize` method will directly override the properties as a whole, + // so we need to merge the table's io-related properties with the doris's catalog-related properties + props.putAll(ioConf); + table.io().initialize(props); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java new file mode 100644 index 00000000000000..c344ba84b0ebd3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java @@ -0,0 +1,305 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopTableOperations; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.LockManager; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.LockManagers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IcebergHadoopCatalog extends HadoopCatalog { + private static final Logger LOG = LoggerFactory.getLogger(IcebergHadoopCatalog.class); + private String warehouseLocation; + private DFSFileSystem fs; + private CloseableGroup closeableGroup; + private 
LockManager lockManager; + protected FileIO fileIO; + protected String uid; + private static final Joiner SLASH = Joiner.on("/"); + + public void initialize(String name, Map properties) { + super.initialize(name, properties); + String inputWarehouseLocation = properties.get("warehouse"); + Preconditions.checkArgument(inputWarehouseLocation != null && inputWarehouseLocation.length() > 0, + "Cannot initialize HadoopCatalog because warehousePath must not be null or empty"); + this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation); + this.fs = new DFSFileSystem(properties); + this.fileIO = initializeFileIO(properties, getConf()); + this.lockManager = LockManagers.from(properties); + this.closeableGroup = new CloseableGroup(); + this.closeableGroup.addCloseable(this.lockManager); + this.closeableGroup.setSuppressCloseFailure(true); + } + + protected FileIO initializeFileIO(Map properties, Configuration hadoopConf) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + /* when use the S3FileIO, we need some custom configurations, + * so HadoopFileIO is used in the superclass by default + * we can add better implementations to derived class just like the implementation in DLFCatalog. + */ + FileIO io; + try { + io = new IcebergHadoopFileIO(hadoopConf, this.fs.rawFileSystem()); + } catch (IOException e) { + throw new RuntimeException(e); + } + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier identifier) { + return new IcebergHadoopTableOperations(new Path(this.defaultWarehouseLocation(identifier)), + this.fileIO, getConf(), this.lockManager, this.fs); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + String tableName = tableIdentifier.name(); + StringBuilder sb = new StringBuilder(); + sb.append(this.warehouseLocation).append('/'); + String[] levels = tableIdentifier.namespace().levels(); + for (String level : levels) { + sb.append(level).append('/'); + } + sb.append(tableName); + return sb.toString(); + } + + @Override + public void createNamespace(Namespace namespace, Map meta) { + Preconditions.checkArgument(!namespace.isEmpty(), + "Cannot create namespace with invalid name: %s", namespace); + if (!meta.isEmpty()) { + throw new UnsupportedOperationException("Cannot create namespace " + namespace + + ": metadata is not supported"); + } else { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath)) { + throw new AlreadyExistsException("Namespace already exists: %s", namespace); + } else { + try { + this.fs.mkdirs(nsPath); + } catch (IOException e) { + throw new RuntimeIOException(e, "Create namespace failed: %s", namespace); + } + } + } + } + + @Override + public List listNamespaces(Namespace namespace) { + Path nsPath = namespace.isEmpty() + ? new Path(warehouseLocation) + : new Path(warehouseLocation, SLASH.join(namespace.levels())); + if (!isNamespace(nsPath)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + + try { + // using the iterator listing allows for paged downloads + // from HDFS and prefetching from object storage. 
+ List namespaces = new ArrayList<>(); + RemoteIterator it = fs.listStatusIterator(nsPath); + while (it.hasNext()) { + Path path = it.next().getPath(); + if (isNamespace(path)) { + namespaces.add(append(namespace, path.getName())); + } + } + return namespaces; + } catch (IOException ioe) { + throw new RuntimeIOException(ioe, "Failed to list namespace under: %s", namespace); + } + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath) && !namespace.isEmpty()) { + return ImmutableMap.of("location", nsPath.toString()); + } else { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + Path nsPath = new Path(this.warehouseLocation, SLASH.join(namespace.levels())); + if (this.isNamespace(nsPath) && !namespace.isEmpty()) { + try { + if (this.fs.listStatusIterator(nsPath).hasNext()) { + throw new NamespaceNotEmptyException("Namespace %s is not empty.", new Object[]{namespace}); + } else { + return this.fs.delete(nsPath, false); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Namespace delete failed: %s", new Object[]{namespace}); + } + } else { + return false; + } + } + + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!this.isValidIdentifier(identifier)) { + throw new NoSuchTableException("Invalid identifier: %s", identifier); + } else { + Path tablePath = new Path(this.defaultWarehouseLocation(identifier)); + TableOperations ops = this.newTableOps(identifier); + TableMetadata lastMetadata = ops.current(); + try { + if (lastMetadata == null) { + LOG.debug("Not an iceberg table: {}", identifier); + return false; + } else { + if (purge) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + return this.fs.delete(tablePath, true); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to delete file: %s", tablePath); + } + } + } + + private Namespace append(Namespace ns, String name) { + String[] levels = Arrays.copyOfRange(ns.levels(), 0, ns.levels().length + 1); + levels[ns.levels().length] = name; + return Namespace.of(levels); + } + + private boolean isNamespace(Path path) { + return isDirectory(path) && !isTableDir(path); + } + + @Override + public List listTables(Namespace namespace) { + Preconditions.checkArgument(namespace.levels().length >= 1, + "Missing database in table identifier: %s", namespace); + Path nsPath = new Path(this.warehouseLocation, Joiner.on("/").join(namespace.levels())); + Set tblIdents = Sets.newHashSet(); + + try { + if (!isDirectory(nsPath)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + RemoteIterator it = this.fs.listStatusIterator(nsPath); + while (it.hasNext()) { + FileStatus status = it.next(); + if (status.isDirectory()) { + Path path = status.getPath(); + if (isTableDir(path)) { + TableIdentifier tblIdent = TableIdentifier.of(namespace, path.getName()); + tblIdents.add(tblIdent); + } + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to list tables under: %s", namespace); + } + + return Lists.newArrayList(tblIdents); + } + + private boolean isTableDir(Path path) { + Path metadataPath = new Path(path, "metadata"); + PathFilter tableFilter = (filterPath) -> filterPath.getName().endsWith(".metadata.json"); + try { + return 
fs.listStatus(metadataPath, tableFilter).length >= 1; + } catch (FileNotFoundException f) { + return false; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private boolean isDirectory(Path path) { + try { + return fs.getFileStatus(path).isDirectory(); + } catch (FileNotFoundException e) { + return false; + } catch (IOException e) { + LOG.warn("Unable to list directory {}", path, e); + throw new UncheckedIOException(e); + } + } + + @Override + public void close() throws IOException { + this.closeableGroup.close(); + } + + @Override + public void setConf(Configuration configuration) { + super.setConf(configuration); + } + + @Override + public Configuration getConf() { + Configuration conf = super.getConf(); + if (conf == null) { + return new HdfsConfiguration(); + } + return conf; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java index 01eee31d3b08d3..9309354beca441 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -18,6 +18,8 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.property.PropertyConverter; @@ -25,12 +27,16 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import java.io.IOException; import java.util.Map; public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog { + private HadoopAuthenticator authenticator; + public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map props, String comment) { super(catalogId, name, comment); @@ -49,10 +55,19 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource } } + @Override + public synchronized HadoopAuthenticator getAuthenticator() { + if (authenticator == null) { + AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration()); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(config); + } + return authenticator; + } + @Override protected void initCatalog() { icebergCatalogType = ICEBERG_HADOOP; - HadoopCatalog hadoopCatalog = new HadoopCatalog(); + IcebergHadoopCatalog hadoopCatalog = new IcebergHadoopCatalog(); Configuration conf = getConfiguration(); initS3Param(conf); // initialize hadoop catalog @@ -63,4 +78,17 @@ protected void initCatalog() { hadoopCatalog.initialize(icebergCatalogType, catalogProperties); catalog = hadoopCatalog; } + + @Override + public Table loadTable(TableIdentifier of) { + Table tbl; + try { + tbl = getAuthenticator().doAs(() -> getCatalog().loadTable(of)); + } catch (IOException e) { + throw new RuntimeException(e); + } + Map extProps = getProperties(); + initIcebergTableFileIO(tbl, extProps); + return tbl; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index 13bd9650978f81..214b9cdeeee3af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -23,7 +23,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.hive.HMSExternalCatalog; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.thrift.TIcebergMetadataParams; @@ -41,6 +40,7 @@ import org.apache.iceberg.hive.HiveCatalog; import org.jetbrains.annotations.NotNull; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -111,21 +111,30 @@ private List loadSnapshots(IcebergMetadataCacheKey key) { @NotNull private Table loadTable(IcebergMetadataCacheKey key) { Catalog icebergCatalog; + Table icebergTable; if (key.catalog instanceof HMSExternalCatalog) { HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog; icebergCatalog = createIcebergHiveCatalog( - ctg.getHiveMetastoreUris(), - ctg.getCatalogProperty().getHadoopProperties(), - ctg.getProperties()); + ctg.getHiveMetastoreUris(), + ctg.getCatalogProperty().getHadoopProperties(), + ctg.getProperties()); + try { + icebergTable = ctg.getAuthenticator().doAs(() -> { + Table tbl = icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)); + IcebergExternalCatalog.initIcebergTableFileIO(tbl, key.catalog.getProperties()); + return tbl; + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return icebergTable; } else if (key.catalog instanceof IcebergExternalCatalog) { - icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog(); + IcebergExternalCatalog ctg = ((IcebergExternalCatalog) key.catalog); + icebergTable = ctg.loadTable(TableIdentifier.of(key.dbName, key.tableName)); + return icebergTable; } else { throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table"); } - Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(), - () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName))); - initIcebergTableFileIO(icebergTable, key.catalog.getProperties()); - return icebergTable; } public void invalidateCatalogCache(long catalogId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index edb83ac5bc8970..fba7f46ce0b469 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -156,7 +156,7 @@ public boolean createTable(CreateTableStmt stmt) throws UserException { PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema); catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties); db.setUnInitialized(true); - return false; + return true; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index f7280f5721f79d..3372d9033ea3d2 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -46,7 +46,6 @@ import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; @@ -69,6 +68,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -570,21 +570,24 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog * Get iceberg schema from catalog and convert them to doris schema */ public static List getSchema(ExternalCatalog catalog, String dbName, String name) { - return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> { - org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name); - Schema schema = icebergTable.schema(); - List columns = schema.columns(); - List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - for (Types.NestedField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), - IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, - schema.caseInsensitiveFindField(field.name()).fieldId())); - } - return tmpSchema; - }); + try { + return catalog.getAuthenticator().doAs(() -> { + Table icebergTable = getIcebergTable(catalog, dbName, name); + Schema schema = icebergTable.schema(); + List columns = schema.columns(); + List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), + IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, + schema.caseInsensitiveFindField(field.name()).fieldId())); + } + return tmpSchema; + }); + } catch (IOException e) { + throw new RuntimeException(e); + } } - /** * Estimate iceberg table row count. * Get the row count by adding all task file recordCount. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java new file mode 100644 index 00000000000000..97912290d2f836 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +import java.io.IOException; + +public class IcebergHadoopFileIO extends HadoopFileIO { + + private FileSystem fs; + private Configuration hadoopConf; + + public IcebergHadoopFileIO(Configuration hadoopConf, FileSystem fs) { + this.hadoopConf = hadoopConf; + this.fs = fs; + } + + @Override + public InputFile newInputFile(String path) { + return new IcebergHadoopInputFile(this.fs, path, this.hadoopConf); + } + + @Override + public InputFile newInputFile(String path, long length) { + return new IcebergHadoopInputFile(this.fs, path, length, this.hadoopConf); + } + + @Override + public OutputFile newOutputFile(String path) { + return new IcebergHadoopOutputFile(this.fs, new Path(path), this.hadoopConf); + } + + @Override + public void deleteFile(String path) { + Path toDelete = new Path(path); + try { + fs.delete(toDelete, false); + } catch (IOException var5) { + IOException e = var5; + throw new RuntimeIOException(e, "Failed to delete file: %s", path); + } + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { + // TODO + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java new file mode 100644 index 00000000000000..cbfee48beb8f30 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.FileNotFoundException; +import java.io.IOException; + +public class IcebergHadoopInputFile implements InputFile { + + private String location; + private final FileSystem fs; + private final Path path; + private final Configuration conf; + private FileStatus stat = null; + private Long length; + + public IcebergHadoopInputFile(FileSystem fs, String location, Configuration conf) { + this(fs, location, null, conf); + } + + public IcebergHadoopInputFile(FileSystem fs, Path path, Configuration conf) { + this.fs = fs; + this.path = path; + this.conf = conf; + } + + public IcebergHadoopInputFile(FileSystem fs, String location, Long length, Configuration conf) { + this.fs = fs; + this.location = location; + this.path = new Path(location); + this.conf = conf; + this.length = length; + } + + @Override + public long getLength() { + return length; + } + + public SeekableInputStream newStream() { + try { + return new IcebergSeekableInputStream(openFile()); + } catch (FileNotFoundException e) { + throw new NotFoundException(e, "Failed to open input stream for file: %s", this.path); + } catch (IOException ex) { + throw new RuntimeIOException(ex, "Failed to open input stream for file: %s", this.path); + } + } + + private FSDataInputStream openFile() throws IOException { + return fs.open(path); + } + + @Override + public String location() { + return location; + } + + public boolean exists() { + try { + return this.lazyStat() != null; + } catch (NotFoundException e) { + return false; + } + } + + private FileStatus lazyStat() { + if (this.stat == null) { + try { + this.stat = this.fs.getFileStatus(this.path); + } catch (FileNotFoundException e) { + throw new NotFoundException(e, "File does not exist: %s", this.path); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to get status for file: %s", this.path); + } + } + return this.stat; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java new file mode 100644 index 00000000000000..ca99f2b4590495 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopOutputFile.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; + +public class IcebergHadoopOutputFile implements OutputFile { + + private final FileSystem fs; + private final Path path; + private final Configuration conf; + + public IcebergHadoopOutputFile(FileSystem fs, Path path, Configuration hadoopConf) { + this.fs = fs; + this.path = path; + this.conf = hadoopConf; + } + + @Override + public PositionOutputStream create() { + try { + return new IcebergPositionOutputStream(this.fs.create(this.path, false)); + } catch (FileAlreadyExistsException e) { + throw new AlreadyExistsException(e, "Path already exists: %s", this.path); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create file: %s", this.path); + } + } + + @Override + public PositionOutputStream createOrOverwrite() { + try { + return new IcebergPositionOutputStream(this.fs.create(this.path, true)); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to create file: %s", this.path); + } + } + + @Override + public String location() { + return this.path.toString(); + } + + @Override + public InputFile toInputFile() { + return new IcebergHadoopInputFile(this.fs, this.path, this.conf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java new file mode 100644 index 00000000000000..187a1cdfcf42c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopTableOperations.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.LockManager; +import org.apache.iceberg.hadoop.HadoopTableOperations; +import org.apache.iceberg.io.FileIO; + +import java.io.IOException; + +public class IcebergHadoopTableOperations extends HadoopTableOperations { + private final DFSFileSystem fileSystem; + + public IcebergHadoopTableOperations(Path location, FileIO fileIO, Configuration conf, + LockManager lockManager, DFSFileSystem fileSystem) { + super(location, fileIO, conf, lockManager); + this.fileSystem = fileSystem; + } + + @Override + protected FileSystem getFileSystem(Path path, Configuration hadoopConf) { + try { + return fileSystem.rawFileSystem(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java new file mode 100644 index 00000000000000..7e18200fa80d11 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergPositionOutputStream.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.iceberg.io.DelegatingOutputStream; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +public class IcebergPositionOutputStream extends PositionOutputStream implements DelegatingOutputStream { + + private final FSDataOutputStream stream; + + IcebergPositionOutputStream(FSDataOutputStream stream) { + this.stream = stream; + } + + public OutputStream getDelegate() { + return this.stream; + } + + public long getPos() throws IOException { + return this.stream.getPos(); + } + + public void write(int b) throws IOException { + this.stream.write(b); + } + + public void write(byte[] b) throws IOException { + this.stream.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.stream.write(b, off, len); + } + + public void flush() throws IOException { + this.stream.flush(); + } + + public void close() throws IOException { + this.stream.close(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java new file mode 100644 index 00000000000000..501524fb497354 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergSeekableInputStream.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hadoop; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.IOException; + +public class IcebergSeekableInputStream extends SeekableInputStream { + private final FSDataInputStream stream; + + public IcebergSeekableInputStream(FSDataInputStream stream) { + this.stream = stream; + } + + @Override + public long getPos() throws IOException { + return stream.getPos(); + } + + @Override + public void seek(long position) throws IOException { + stream.seek(position); + } + + @Override + public int read() throws IOException { + return stream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return stream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return stream.read(b, off, len); + } + + @Override + public void close() throws IOException { + stream.close(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 2ca51298fe664b..b822304c153ae1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -31,7 +31,6 @@ import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HMSExternalTable; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; @@ -172,7 +171,11 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli @Override public List getSplits() throws UserException { - return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits); + try { + return source.getCatalog().getAuthenticator().doAs(this::doGetSplits); + } catch (IOException e) { + throw new UserException(e); + } } private List doGetSplits() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 59fbd73bda78cf..4018df68fa96e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; @@ -85,13 +86,7 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException { AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); try { - dfsFileSystem = authenticator.doAs(() -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + dfsFileSystem = authenticator.doAs(() -> FileSystem.get(new Path(remotePath).toUri(), conf)); } catch (Exception e) { throw new 
UserException(e); } @@ -394,7 +389,7 @@ public Status rename(String srcPath, String destPath) { if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system"); } - FileSystem fileSystem = nativeFileSystem(destPath); + FileSystem fileSystem = rawFileSystem(); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath)); @@ -481,4 +476,42 @@ public Status makeDir(String remotePath) { } return Status.OK; } + + public FileSystem rawFileSystem() throws IOException { + if (dfsFileSystem == null) { + synchronized (this) { + if (dfsFileSystem == null) { + Configuration conf = getHdfsConf(ifNotSetFallbackToSimpleAuth()); + for (Map.Entry propEntry : properties.entrySet()) { + conf.set(propEntry.getKey(), propEntry.getValue()); + } + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + dfsFileSystem = authenticator.doAs(() -> FileSystem.get(conf)); + operations = new HDFSFileOperations(dfsFileSystem); + } + } + } + return authenticator.doAs(() -> dfsFileSystem); + } + + public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { + return authenticator.doAs(() -> rawFileSystem().listStatus(f, filter)); + } + + public RemoteIterator listStatusIterator(Path p) throws IOException { + return authenticator.doAs(() -> rawFileSystem().listStatusIterator(p)); + } + + public FileStatus getFileStatus(Path f) throws IOException { + return authenticator.doAs(() -> rawFileSystem().getFileStatus(f)); + } + + public boolean delete(Path p, boolean recursion) throws IOException { + return authenticator.doAs(() -> rawFileSystem().delete(p, recursion)); + } + + public boolean mkdirs(Path p) throws IOException { + return authenticator.doAs(() -> rawFileSystem().mkdirs(p)); + } } diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy new file mode 100644 index 00000000000000..5ebd8c17c2d426 --- /dev/null +++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.junit.Assert; + +suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { + String enabled = context.config.otherConfigs.get("enableKerberosTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // test iceberg hadoop catalog with kerberos + sql """ + CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop + PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='hadoop', + "warehouse" = "hdfs://hadoop-master:9000/user/hive/warehouse", + "fs.defaultFS" = "hdfs://hadoop-master:9000", + "hadoop.security.authentication" = "kerberos", + "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM", + "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", + "doris.krb5.debug" = "true" + ); + """ + + sql """ SWITCH test_krb_iceberg_ctl_hadoop; """ + sql """ CREATE DATABASE IF NOT EXISTS hadoop_test_krb_iceberg_db; """ + sql """ USE hadoop_test_krb_iceberg_db; """ + sql """ CREATE TABLE IF NOT EXISTS hadoop_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + + order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ + order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + + // cross catalog query test + order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ + order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """ + order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """ + + sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ + sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """ + sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """ + + sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ + sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """ + sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """ + + sql """ DROP CATALOG test_krb_iceberg_ctl """ + sql """ DROP CATALOG other_test_krb_iceberg_ctl """ + sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """ + + // // test iceberg hms catalog with kerberos + // sql """ + // CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl + // PROPERTIES ( + // 'type'='iceberg', + // 'iceberg.catalog.type'='hms', + // "hive.metastore.uris" = "thrift://172.31.71.25:9083", + // "fs.defaultFS" = "hdfs://hadoop-master:9000", + // "hadoop.security.authentication" = "kerberos", + // "hadoop.kerberos.min.seconds.before.relogin" = "5", + // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM", + // "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", + // "hive.metastore.sasl.enabled" = "true", + // 
"hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM", + // "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse", + // "doris.krb5.debug" = "true" + // ); + // """ + // + // sql """ + // CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl + // PROPERTIES ( + // 'type'='iceberg', + // 'iceberg.catalog.type'='hms', + // "hive.metastore.uris" = "thrift://172.31.71.26:9083", + // "fs.defaultFS" = "hdfs://hadoop-master-2:9000", + // "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse", + // "hadoop.security.authentication" = "kerberos", + // "hadoop.kerberos.min.seconds.before.relogin" = "5", + // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM", + // "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab", + // "hive.metastore.sasl.enabled" = "true", + // "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM", + // "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// + // RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// + // DEFAULT", + // "doris.krb5.debug" = "true" + // ); + // """ + // + // sql """ SWITCH test_krb_iceberg_ctl; """ + // sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """ + // sql """ USE `test_krb_iceberg_db`; """ + // sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + // sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + // order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """ + // + // sql """ SWITCH other_test_krb_iceberg_ctl; """ + // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """ + // sql """ USE `other_test_krb_iceberg_db`; """ + // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + } +} From f3c663c2ff703d462b6380bdfcff8109f3c5de56 Mon Sep 17 00:00:00 2001 From: slothever Date: Mon, 26 Aug 2024 23:59:25 +0800 Subject: [PATCH 2/3] fix --- .../hadoop/IcebergHadoopInputFile.java | 7 ++++-- .../kerberos/test_two_iceberg_kerberos.out | 9 ++++++++ .../kerberos/test_two_iceberg_kerberos.groovy | 22 +++++++++---------- 3 files changed, 25 insertions(+), 13 deletions(-) create mode 100644 regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java index cbfee48beb8f30..105d83d4084800 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopInputFile.java @@ -30,7 +30,7 @@ import java.io.FileNotFoundException; import java.io.IOException; -public class IcebergHadoopInputFile implements InputFile { +public class IcebergHadoopInputFile implements InputFile { private String location; private final FileSystem fs; @@ -59,7 +59,10 @@ public 
IcebergHadoopInputFile(FileSystem fs, String location, Long length, Confi @Override public long getLength() { - return length; + if (this.length == null) { + this.length = this.lazyStat().getLen(); + } + return this.length; } public SeekableInputStream newStream() { diff --git a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out new file mode 100644 index 00000000000000..fb4f79a0b73d4a --- /dev/null +++ b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !iceberg_q04 -- +1 2023-05-14 +2 2023-05-16 + +-- !iceberg_q06 -- +1 krb1 2023-05-14 +2 krb2 2023-05-16 +3 krb3 2023-05-17 diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy index 5ebd8c17c2d426..579b3fcca25f88 100644 --- a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy +++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy @@ -43,25 +43,25 @@ suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,externa sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ - order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ - order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + // order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ + // order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """ // cross catalog query test - order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ - order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ + order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + // order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """ - order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """ + // order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """ - sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ - sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """ + // sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ + // sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """ sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """ - sql """ DROP DATABASE IF EXISTS 
test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ - sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """ + // sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ + // sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """ sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """ - sql """ DROP CATALOG test_krb_iceberg_ctl """ - sql """ DROP CATALOG other_test_krb_iceberg_ctl """ + // sql """ DROP CATALOG test_krb_iceberg_ctl """ + // sql """ DROP CATALOG other_test_krb_iceberg_ctl """ sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """ // // test iceberg hms catalog with kerberos From 2117a8b6ce6dd56488949366cb6faa4fc6008f3f Mon Sep 17 00:00:00 2001 From: slothever Date: Mon, 2 Sep 2024 01:20:56 +0800 Subject: [PATCH 3/3] [Improvement](kerberos)refactor ugi login for iceberg hive catalog --- .../iceberg/HiveCompatibleCatalog.java | 181 ------- .../iceberg/IcebergHMSExternalCatalog.java | 4 +- .../iceberg/IcebergHadoopExternalCatalog.java | 1 + .../iceberg/IcebergMetadataOps.java | 15 +- .../datasource/iceberg/dlf/DLFCatalog.java | 118 ++++- .../{ => hadoop}/IcebergHadoopCatalog.java | 11 +- .../iceberg/hadoop/IcebergHadoopFileIO.java | 28 +- .../iceberg/hive/HCachedClientPool.java | 108 +++++ .../datasource/iceberg/hive/HClientPool.java | 84 ++++ .../iceberg/hive/HiveCompatibleCatalog.java | 89 ++++ .../iceberg/hive/IcebergHiveCatalog.java | 452 ++++++++++++++++++ .../IcebergHiveTableOperations.java} | 16 +- .../doris/fs/remote/dfs/DFSFileSystem.java | 10 +- .../kerberos/test_two_iceberg_kerberos.out | 14 + .../kerberos/test_two_iceberg_kerberos.groovy | 127 ++--- 15 files changed, 966 insertions(+), 292 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java rename fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/{ => hadoop}/IcebergHadoopCatalog.java (96%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java rename fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/{dlf/DLFTableOperations.java => hive/IcebergHiveTableOperations.java} (70%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java deleted file mode 100644 index 6431b02308b7a2..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java +++ /dev/null @@ -1,181 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.iceberg; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.iceberg.BaseMetastoreCatalog; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.CatalogUtil; -import org.apache.iceberg.ClientPool; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.NamespaceNotEmptyException; -import org.apache.iceberg.exceptions.NoSuchNamespaceException; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hadoop.HadoopFileIO; -import org.apache.iceberg.io.FileIO; -import shade.doris.hive.org.apache.thrift.TException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable { - - protected Configuration conf; - protected ClientPool clients; - protected FileIO fileIO; - protected String uid; - - public void initialize(String name, FileIO fileIO, - ClientPool clients) { - this.uid = name; - this.fileIO = fileIO; - this.clients = clients; - } - - protected FileIO initializeFileIO(Map properties, Configuration hadoopConf) { - String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); - if (fileIOImpl == null) { - /* when use the S3FileIO, we need some custom configurations, - * so HadoopFileIO is used in the superclass by default - * we can add better implementations to derived class just like the implementation in DLFCatalog. 
- */ - FileIO io = new HadoopFileIO(hadoopConf); - io.initialize(properties); - return io; - } else { - return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); - } - } - - @Override - protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { - return null; - } - - @Override - protected boolean isValidIdentifier(TableIdentifier tableIdentifier) { - return tableIdentifier.namespace().levels().length == 1; - } - - protected boolean isValidNamespace(Namespace namespace) { - return namespace.levels().length != 1; - } - - @Override - public List listTables(Namespace namespace) { - if (isValidNamespace(namespace)) { - throw new NoSuchTableException("Invalid namespace: %s", namespace); - } - String dbName = namespace.level(0); - try { - return clients.run(client -> client.getAllTables(dbName)) - .stream() - .map(tbl -> TableIdentifier.of(dbName, tbl)) - .collect(Collectors.toList()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { - throw new UnsupportedOperationException( - "Cannot drop table " + tableIdentifier + " : dropTable is not supported"); - } - - @Override - public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) { - throw new UnsupportedOperationException( - "Cannot rename table " + sourceTbl + " : renameTable is not supported"); - } - - @Override - public void createNamespace(Namespace namespace, Map props) { - throw new UnsupportedOperationException( - "Cannot create namespace " + namespace + " : createNamespace is not supported"); - } - - @Override - public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - if (isValidNamespace(namespace) && !namespace.isEmpty()) { - throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); - } - if (!namespace.isEmpty()) { - return new ArrayList<>(); - } - List namespaces = new ArrayList<>(); - List databases; - try { - databases = clients.run(client -> client.getAllDatabases()); - for (String database : databases) { - namespaces.add(Namespace.of(database)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return namespaces; - } - - @Override - public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { - if (isValidNamespace(namespace)) { - throw new NoSuchTableException("Invalid namespace: %s", namespace); - } - String dbName = namespace.level(0); - try { - return clients.run(client -> client.getDatabase(dbName)).getParameters(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { - throw new UnsupportedOperationException( - "Cannot drop namespace " + namespace + " : dropNamespace is not supported"); - } - - @Override - public boolean setProperties(Namespace namespace, Map props) throws NoSuchNamespaceException { - throw new UnsupportedOperationException( - "Cannot set namespace properties " + namespace + " : setProperties is not supported"); - } - - @Override - public boolean removeProperties(Namespace namespace, Set pNames) throws NoSuchNamespaceException { - throw new UnsupportedOperationException( - "Cannot remove properties " + namespace + " : removeProperties is not supported"); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } -} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index 34e6f0c187e5ba..e04593b688bf12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -18,11 +18,11 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.iceberg.hive.IcebergHiveCatalog; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.hive.HiveCatalog; import java.util.Map; @@ -38,7 +38,7 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M @Override protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; - HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + IcebergHiveCatalog hiveCatalog = new IcebergHiveCatalog(); hiveCatalog.setConf(getConfiguration()); // initialize hive catalog Map catalogProperties = catalogProperty.getProperties(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java index 9309354beca441..778438f9a80875 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopCatalog; import org.apache.doris.datasource.property.PropertyConverter; import com.google.common.base.Preconditions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index fba7f46ce0b469..36925563a366da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; @@ -42,6 +43,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -74,7 +76,16 @@ public void close() { @Override public boolean tableExist(String dbName, String tblName) { - return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + try { + return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + } catch (UndeclaredThrowableException e) { + // avoid to miss exception when get table reflect call + if (e.getCause() instanceof NoSuchObjectException) { + return false; + } else { + throw 
e; + } + } } public boolean databaseExist(String dbName) { @@ -83,7 +94,7 @@ public boolean databaseExist(String dbName) { public List listDatabaseNames() { return nsCatalog.listNamespaces().stream() - .map(e -> e.toString()) + .map(Namespace::toString) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java index e9c406715c1357..ddbf7ebeac5596 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java @@ -19,37 +19,36 @@ import org.apache.doris.common.credentials.CloudCredential; import org.apache.doris.common.util.S3Util; -import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog; import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool; +import org.apache.doris.datasource.iceberg.hive.HiveCompatibleCatalog; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.OssProperties; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.aliyun.oss.Constants; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class DLFCatalog extends HiveCompatibleCatalog { @Override public void initialize(String name, Map properties) { - super.initialize(name, initializeFileIO(properties, conf), new DLFCachedClientPool(this.conf, properties)); + super.initialize(name, initializeFileIO(properties), new DLFCachedClientPool(this.conf, properties)); } - @Override - protected TableOperations newTableOps(TableIdentifier tableIdentifier) { - String dbName = tableIdentifier.namespace().level(0); - String tableName = tableIdentifier.name(); - return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName); - } - - protected FileIO initializeFileIO(Map properties, Configuration hadoopConf) { + protected FileIO initializeFileIO(Map properties) { // read from converted properties or default by old s3 aws properties String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT)); CloudCredential credential = new CloudCredential(); @@ -72,4 +71,99 @@ protected FileIO initializeFileIO(Map properties, Configuration io.initialize(properties); return io; } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + return null; + } + + protected boolean isValidNamespace(Namespace namespace) { + return namespace.levels().length != 1; + } + + @Override + public List listTables(Namespace namespace) { + if (isValidNamespace(namespace)) { + throw new NoSuchTableException("Invalid namespace: %s", namespace); + } + String dbName = namespace.level(0); + try { + return clients.run(client -> client.getAllTables(dbName)) + .stream() + .map(tbl -> TableIdentifier.of(dbName, 
tbl)) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) { + throw new UnsupportedOperationException( + "Cannot drop table " + tableIdentifier + " : dropTable is not supported"); + } + + @Override + public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) { + throw new UnsupportedOperationException( + "Cannot rename table " + sourceTbl + " : renameTable is not supported"); + } + + @Override + public void createNamespace(Namespace namespace, Map props) { + throw new UnsupportedOperationException( + "Cannot create namespace " + namespace + " : createNamespace is not supported"); + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + if (isValidNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + if (!namespace.isEmpty()) { + return new ArrayList<>(); + } + List namespaces = new ArrayList<>(); + List databases; + try { + databases = clients.run(client -> client.getAllDatabases()); + for (String database : databases) { + namespaces.add(Namespace.of(database)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return namespaces; + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + if (isValidNamespace(namespace)) { + throw new NoSuchTableException("Invalid namespace: %s", namespace); + } + String dbName = namespace.level(0); + try { + return clients.run(client -> client.getDatabase(dbName)).getParameters(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + throw new UnsupportedOperationException( + "Cannot drop namespace " + namespace + " : dropNamespace is not supported"); + } + + @Override + public boolean setProperties(Namespace namespace, Map props) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot set namespace properties " + namespace + " : setProperties is not supported"); + } + + @Override + public boolean removeProperties(Namespace namespace, Set pNames) throws NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Cannot remove properties " + namespace + " : removeProperties is not supported"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java rename to fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java index c344ba84b0ebd3..856ed960888f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource.iceberg; +package org.apache.doris.datasource.iceberg.hadoop; -import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; -import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopTableOperations; import org.apache.doris.fs.remote.dfs.DFSFileSystem; import com.google.common.base.Joiner; @@ -92,12 +90,7 @@ protected FileIO initializeFileIO(Map properties, Configuration * so HadoopFileIO is used in the superclass by default * we can add better implementations to derived class just like the implementation in DLFCatalog. */ - FileIO io; - try { - io = new IcebergHadoopFileIO(hadoopConf, this.fs.rawFileSystem()); - } catch (IOException e) { - throw new RuntimeException(e); - } + FileIO io = new IcebergHadoopFileIO(hadoopConf, this.fs); io.initialize(properties); return io; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java index 97912290d2f836..fee0c4d6a1af92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java @@ -17,10 +17,11 @@ package org.apache.doris.datasource.iceberg.hadoop; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; @@ -30,27 +31,35 @@ public class IcebergHadoopFileIO extends HadoopFileIO { - private FileSystem fs; - private Configuration hadoopConf; + private final DFSFileSystem fs; + private final Configuration hadoopConf; - public IcebergHadoopFileIO(Configuration hadoopConf, FileSystem fs) { + public IcebergHadoopFileIO(Configuration hadoopConf, DFSFileSystem fs) { this.hadoopConf = hadoopConf; this.fs = fs; } @Override public InputFile newInputFile(String path) { - return new IcebergHadoopInputFile(this.fs, path, this.hadoopConf); + return new IcebergHadoopInputFile(getFs(), path, this.hadoopConf); + } + + private FileSystem getFs() { + try { + return this.fs.rawFileSystem(); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public InputFile newInputFile(String path, long length) { - return new IcebergHadoopInputFile(this.fs, path, length, this.hadoopConf); + return new IcebergHadoopInputFile(getFs(), path, length, this.hadoopConf); } @Override public OutputFile newOutputFile(String path) { - return new IcebergHadoopOutputFile(this.fs, new Path(path), this.hadoopConf); + return new IcebergHadoopOutputFile(getFs(), new Path(path), this.hadoopConf); } @Override @@ -58,9 +67,8 @@ public void deleteFile(String path) { Path toDelete = new Path(path); try { fs.delete(toDelete, false); - } catch (IOException var5) { - IOException e = var5; - throw new RuntimeIOException(e, "Failed to delete file: %s", path); + } catch (IOException e) { + throw new RuntimeException("Failed to delete file: " + path, e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java new file mode 100644 index 00000000000000..bfdb59cfc0402e --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.util.PropertyUtil; +import shade.doris.hive.org.apache.thrift.TException; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class HCachedClientPool implements ClientPool { + + private static volatile Cache clientPoolCache; + private static final Object clientPoolCacheLock = new Object(); + private final String catalogName; + private final Configuration conf; + private final int clientPoolSize; + private final long evictionInterval; + private final HadoopAuthenticator authenticator; + + public HCachedClientPool(String catalogName, Configuration conf, Map properties) { + this.catalogName = catalogName; + this.conf = conf; + this.clientPoolSize = + PropertyUtil.propertyAsInt( + properties, + CatalogProperties.CLIENT_POOL_SIZE, + CatalogProperties.CLIENT_POOL_SIZE_DEFAULT); + this.evictionInterval = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, + CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT); + + if (clientPoolCache == null) { + synchronized (clientPoolCacheLock) { + if (clientPoolCache == null) { + clientPoolCache = + Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener((key, value, cause) -> ((HClientPool) value).close()) + .build(); + } + } + } + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + } + + protected HClientPool clientPool() { + return clientPoolCache.get(this.catalogName, (k) -> new HClientPool(this.clientPoolSize, this.conf)); + } + + @Override + public R run(Action action) throws TException, InterruptedException { + try { + return authenticator.doAs(() -> clientPool().run(action)); + } catch (IOException e) { + throw new TException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TException) { + throw (TException) e.getCause(); + } + throw e; + } + } + + @Override + public R run(Action 
action, boolean retry) + throws TException, InterruptedException { + try { + return authenticator.doAs(() -> clientPool().run(action, retry)); + } catch (IOException e) { + throw new TException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TException) { + throw (TException) e.getCause(); + } + throw e; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java new file mode 100644 index 00000000000000..a9c7a8e7dd123a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.iceberg.ClientPoolImpl; +import org.apache.iceberg.hive.RuntimeMetaException; +import shade.doris.hive.org.apache.thrift.TException; +import shade.doris.hive.org.apache.thrift.transport.TTransportException; + +public class HClientPool extends ClientPoolImpl { + private final HiveConf hiveConf; + + public HClientPool(int poolSize, Configuration conf) { + super(poolSize, TTransportException.class, false); + this.hiveConf = new HiveConf(conf, org.apache.iceberg.hive.HiveClientPool.class); + this.hiveConf.addResource(conf); + } + + protected IMetaStoreClient newClient() { + try { + try { + return RetryingMetaStoreClient.getProxy(hiveConf, t -> null, HiveMetaStoreClient.class.getName()); + } catch (RuntimeException e) { + if (e.getCause() instanceof MetaException) { + throw (MetaException) e.getCause(); + } else { + throw e; + } + } + } catch (MetaException e) { + throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore"); + } catch (Throwable t) { + if (t.getMessage().contains("Another instance of Derby may have already booted")) { + throw new RuntimeMetaException(t, + "Embedded Derby supports only one client at a time." 
+ + "To fix this, use a metastore that supports multiple clients."); + } else { + throw new RuntimeMetaException(t, "Failed to connect to Hive Metastore"); + } + } + } + + protected IMetaStoreClient reconnect(IMetaStoreClient client) { + try { + client.close(); + client.reconnect(); + return client; + } catch (MetaException var3) { + MetaException e = var3; + throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore", new Object[0]); + } + } + + protected boolean isConnectionException(Exception e) { + return super.isConnectionException(e) + || e instanceof MetaException + && e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"); + } + + protected void close(IMetaStoreClient client) { + client.close(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java new file mode 100644 index 00000000000000..6da5e34cbb4f29 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.io.FileIO; +import shade.doris.hive.org.apache.thrift.TException; + +import java.io.IOException; +import java.util.Map; + +public abstract class HiveCompatibleCatalog extends HiveCatalog { + + protected Configuration conf; + protected ClientPool clients; + protected FileIO fileIO; + protected String uid; + + public void initialize(String name, FileIO fileIO, + ClientPool clients) { + this.uid = name; + this.fileIO = fileIO; + this.clients = clients; + } + + protected FileIO initializeFileIO(Map properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + /* when use the S3FileIO, we need some custom configurations, + * so HadoopFileIO is used in the superclass by default + * we can add better implementations to derived class just like the implementation in DLFCatalog. 
+ */ + FileIO io = new HadoopFileIO(getConf()); + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, getConf()); + } + } + + public FileSystem getFileSystem() throws IOException { + return null; + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new IcebergHiveTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + if (conf == null) { + return new HdfsConfiguration(); + } + return conf; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java new file mode 100644 index 00000000000000..cda9c06bdc2c11 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java @@ -0,0 +1,452 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.hive.HiveHadoopUtil; +import org.apache.iceberg.hive.MetastoreUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.LocationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import shade.doris.hive.org.apache.thrift.TException; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class IcebergHiveCatalog extends HiveCompatibleCatalog { + private static final Logger LOG = LoggerFactory.getLogger(IcebergHiveCatalog.class); + private ClientPool clients; + private Configuration conf; + private String name; + private FileIO fileIO; + private boolean listAllTables = false; + + public void initialize(String name, Map properties) { + this.name = name; + if (this.conf == null) { + LOG.warn("No Hadoop Configuration was set, using the default environment Configuration"); + this.conf = getConf(); + } + if (properties.containsKey("uri")) { + this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get("uri")); + } + if (properties.containsKey("warehouse")) { + this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + LocationUtil.stripTrailingSlash(properties.get("warehouse"))); + } + + this.listAllTables = Boolean.parseBoolean(properties.getOrDefault("list-all-tables", "false")); + String fileIOImpl = properties.get("io-impl"); + DFSFileSystem fs = new DFSFileSystem(properties); + fileIO = fileIOImpl == null ? 
new IcebergHadoopFileIO(this.conf, fs) + : CatalogUtil.loadFileIO(fileIOImpl, properties, this.conf); + this.clients = new HCachedClientPool(name, this.conf, properties); + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new IcebergHiveTableOperations(this.conf, this.clients, this.fileIO, this.name(), dbName, tableName); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + try { + Database databaseData = this.clients.run((client) -> + client.getDatabase(tableIdentifier.namespace().levels()[0])); + if (databaseData.getLocationUri() != null) { + return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name()); + } + } catch (TException e) { + throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during commit", e); + } + + String databaseLocation = this.databaseLocation(tableIdentifier.namespace().levels()[0]); + return String.format("%s/%s", databaseLocation, tableIdentifier.name()); + } + + private String databaseLocation(String databaseName) { + String warehouseLocation = this.conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname); + Preconditions.checkNotNull(warehouseLocation, + "Warehouse location is not set: hive.metastore.warehouse.dir=null"); + warehouseLocation = LocationUtil.stripTrailingSlash(warehouseLocation); + return String.format("%s/%s.db", warehouseLocation, databaseName); + } + + @Override + public void createNamespace(Namespace namespace, Map meta) { + Preconditions.checkArgument(!namespace.isEmpty(), + "Cannot create namespace with invalid name: %s", namespace); + Preconditions.checkArgument(this.isValidateNamespace(namespace), + "Cannot support multi part namespace in Hive Metastore: %s", namespace); + Preconditions.checkArgument(meta.get("hive.metastore.database.owner-type") == null + || meta.get("hive.metastore.database.owner") != null, + "Create namespace setting %s without setting %s is not allowed", + "hive.metastore.database.owner-type", + "hive.metastore.database.owner"); + try { + this.clients.run((client) -> { + client.createDatabase(convertToDatabase(namespace, meta)); + return null; + }); + LOG.info("Created namespace: {}", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to createDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } + + Database convertToDatabase(Namespace namespace, Map meta) { + if (!this.isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else { + Database database = new Database(); + Map parameter = Maps.newHashMap(); + database.setName(namespace.level(0)); + database.setLocationUri(this.databaseLocation(namespace.level(0))); + meta.forEach((key, value) -> { + if (key.equals("comment")) { + database.setDescription(value); + } else if (key.equals("location")) { + database.setLocationUri(value); + } else if (key.equals("hive.metastore.database.owner")) { + database.setOwnerName(value); + } else if (key.equals("hive.metastore.database.owner-type") && value != null) { + 
database.setOwnerType(PrincipalType.valueOf(value)); + } else if (value != null) { + parameter.put(key, value); + } + }); + if (database.getOwnerName() == null) { + database.setOwnerName(HiveHadoopUtil.currentUser()); + database.setOwnerType(PrincipalType.USER); + } + database.setParameters(parameter); + return database; + } + } + + @Override + public List listNamespaces(Namespace namespace) { + if (!this.isValidateNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else if (!namespace.isEmpty()) { + return ImmutableList.of(); + } else { + try { + List namespaces = this.clients.run(IMetaStoreClient::getAllDatabases).stream() + .map(Namespace::of).collect(Collectors.toList()); + LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces); + return namespaces; + } catch (TException e) { + throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getAllDatabases() " + + namespace + " in Hive Metastore", e); + } + } + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + if (!this.isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else { + try { + Database database = this.clients.run((client) -> { + try { + return client.getDatabase(namespace.level(0)); + } catch (NoSuchObjectException e) { + return null; + } + }); + if (database == null) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + Map metadata = this.convertToMetadata(database); + LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet()); + return metadata; + } catch (UnknownDBException | NoSuchObjectException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to list namespace under namespace: " + + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } + } + + private Map convertToMetadata(Database database) { + Map meta = Maps.newHashMap(); + meta.putAll(database.getParameters()); + meta.put("location", database.getLocationUri()); + if (database.getDescription() != null) { + meta.put("comment", database.getDescription()); + } + if (database.getOwnerName() != null) { + meta.put("hive.metastore.database.owner", database.getOwnerName()); + if (database.getOwnerType() != null) { + meta.put("hive.metastore.database.owner-type", database.getOwnerType().name()); + } + } + return meta; + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + if (!this.isValidateNamespace(namespace)) { + return false; + } else { + try { + this.clients.run((client) -> { + client.dropDatabase(namespace.level(0), false, false, false); + return null; + }); + LOG.info("Dropped namespace: {}", namespace); + return true; + } catch (InvalidOperationException e) { + throw new NamespaceNotEmptyException(e, + "Namespace %s is not empty. 
One or more tables exist.", namespace); + } catch (NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to drop dropDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } + } + + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!this.isValidIdentifier(identifier)) { + return false; + } else { + String database = identifier.namespace().level(0); + TableOperations ops = this.newTableOps(identifier); + TableMetadata lastMetadata = null; + if (purge) { + try { + lastMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn("Failed to load table metadata for table: {}, continuing drop without purge", + identifier, e); + } + } + try { + this.clients.run((client) -> { + client.dropTable(database, identifier.name(), false, false); + return null; + }); + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + LOG.info("Dropped table: {}", identifier); + return true; + } catch (NoSuchObjectException | NoSuchTableException e) { + LOG.info("Skipping drop, table does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropTable", e); + } + } + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier originalTo) { + if (!this.isValidIdentifier(from)) { + throw new NoSuchTableException("Invalid identifier: %s", from); + } else { + TableIdentifier to = this.removeCatalogName(originalTo); + Preconditions.checkArgument(this.isValidIdentifier(to), "Invalid identifier: %s", to); + String toDatabase = to.namespace().level(0); + String fromDatabase = from.namespace().level(0); + String fromName = from.name(); + + try { + Table table = this.clients.run((client) -> client.getTable(fromDatabase, fromName)); + validateTableIsIceberg(table, fullTableName(this.name, from)); + table.setDbName(toDatabase); + table.setTableName(to.name()); + this.clients.run((client) -> { + MetastoreUtil.alterTable(client, fromDatabase, fromName, table); + return null; + }); + LOG.info("Renamed table from {}, to {}", from, to); + } catch (NoSuchObjectException e) { + throw new NoSuchTableException("Table does not exist: %s", from); + } catch (AlreadyExistsException e) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", to); + } catch (TException e) { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to rename", e); + } + } + } + + private TableIdentifier removeCatalogName(TableIdentifier to) { + if (this.isValidIdentifier(to)) { + return to; + } else { + return to.namespace().levels().length == 2 + && this.name().equalsIgnoreCase(to.namespace().level(0)) + ? 
TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name()) : to;
+        }
+    }
+
+    static void validateTableIsIceberg(Table table, String fullName) {
+        String tableType = table.getParameters().get("table_type");
+        NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase("iceberg"),
+                "Not an iceberg table: %s (type=%s)", fullName, tableType);
+    }
+
+    @Override
+    public boolean setProperties(Namespace namespace, Map<String, String> properties) {
+        Preconditions.checkArgument(properties.get("hive.metastore.database.owner-type") == null
+                        && properties.get("hive.metastore.database.owner") == null,
+                "Setting %s and %s has to be performed together or not at all",
+                "hive.metastore.database.owner-type",
+                "hive.metastore.database.owner");
+        Map<String, String> parameter = Maps.newHashMap();
+        parameter.putAll(this.loadNamespaceMetadata(namespace));
+        parameter.putAll(properties);
+        Database database = this.convertToDatabase(namespace, parameter);
+        this.alterHiveDataBase(namespace, database);
+        LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);
+        return true;
+    }
+
+    @Override
+    public boolean removeProperties(Namespace namespace, Set<String> properties) {
+        Preconditions.checkArgument(properties.contains("hive.metastore.database.owner-type")
+                        == properties.contains("hive.metastore.database.owner"),
+                "Removing %s and %s has to be performed together or not at all",
+                "hive.metastore.database.owner-type",
+                "hive.metastore.database.owner");
+        Map<String, String> parameter = Maps.newHashMap();
+        parameter.putAll(this.loadNamespaceMetadata(namespace));
+        properties.forEach((key) -> parameter.put(key, null));
+        Database database = this.convertToDatabase(namespace, parameter);
+        this.alterHiveDataBase(namespace, database);
+        LOG.debug("Successfully removed properties {} from {}", properties, namespace);
+        return true;
+    }
+
+    private void alterHiveDataBase(Namespace namespace, Database database) {
+        try {
+            this.clients.run((client) -> {
+                client.alterDatabase(namespace.level(0), database);
+                return null;
+            });
+        } catch (UnknownDBException | NoSuchObjectException e) {
+            throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list namespace under namespace: "
+                    + namespace + " in Hive Metastore", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to getDatabase(name) "
+                    + namespace + " in Hive Metastore", e);
+        }
+    }
+
+    @Override
+    public List<TableIdentifier> listTables(Namespace namespace) {
+        Preconditions.checkArgument(this.isValidateNamespace(namespace),
+                "Missing database in namespace: %s", namespace);
+        String database = namespace.level(0);
+        try {
+            List<String> tableNames = this.clients.run((client) -> client.getAllTables(database));
+            List<TableIdentifier> tableIdentifiers;
+            if (this.listAllTables) {
+                tableIdentifiers = tableNames.stream()
+                        .map((t) -> TableIdentifier.of(namespace, t))
+                        .collect(Collectors.toList());
+            } else {
+                List<Table> tableObjects = this.clients.run((client) ->
+                        client.getTableObjectsByName(database, tableNames));
+                tableIdentifiers = tableObjects.stream()
+                        .filter((table) ->
+                                table.getParameters() != null
+                                        && "iceberg".equalsIgnoreCase(table.getParameters().get("table_type")))
+                        .map((table) -> TableIdentifier.of(namespace, table.getTableName()))
+                        .collect(Collectors.toList());
+            }
+
+            LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
+            return tableIdentifiers;
+        } catch (UnknownDBException e) {
+            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all tables under namespace " + namespace, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in call to listTables", e);
+        }
+    }
+
+    private boolean isValidateNamespace(Namespace namespace) {
+        return namespace.levels().length == 1;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
similarity index 70%
rename from fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java
rename to fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
index 2aab8e754ca2ea..ea444e7870be87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFTableOperations.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveTableOperations.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.iceberg.dlf;
+package org.apache.doris.datasource.iceberg.hive;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -24,14 +24,14 @@ import org.apache.iceberg.io.FileIO;
 
 import shade.doris.hive.org.apache.thrift.TException;
 
-public class DLFTableOperations extends HiveTableOperations {
+public class IcebergHiveTableOperations extends HiveTableOperations {
 
-    public DLFTableOperations(Configuration conf,
-                              ClientPool<IMetaStoreClient, TException> metaClients,
-                              FileIO fileIO,
-                              String catalogName,
-                              String database,
-                              String table) {
+    public IcebergHiveTableOperations(Configuration conf,
+                                      ClientPool<IMetaStoreClient, TException> metaClients,
+                                      FileIO fileIO,
+                                      String catalogName,
+                                      String database,
+                                      String table) {
         super(conf, metaClients, fileIO, catalogName, database, table);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 4018df68fa96e7..aca072eb66c47a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -496,22 +496,22 @@ public FileSystem rawFileSystem() throws IOException {
     }
 
     public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
-        return authenticator.doAs(() -> rawFileSystem().listStatus(f, filter));
+        return rawFileSystem().listStatus(f, filter);
     }
 
     public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException {
-        return authenticator.doAs(() -> rawFileSystem().listStatusIterator(p));
+        return rawFileSystem().listStatusIterator(p);
     }
 
     public FileStatus getFileStatus(Path f) throws IOException {
-        return authenticator.doAs(() -> rawFileSystem().getFileStatus(f));
+        return rawFileSystem().getFileStatus(f);
     }
 
     public boolean delete(Path p, boolean recursion) throws IOException {
-        return authenticator.doAs(() -> rawFileSystem().delete(p, recursion));
+        return rawFileSystem().delete(p, recursion);
     }
 
     public boolean mkdirs(Path p) throws IOException {
-        return authenticator.doAs(() -> rawFileSystem().mkdirs(p));
+        return rawFileSystem().mkdirs(p);
     }
 }
diff --git a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out
index fb4f79a0b73d4a..0634bf11cd62b2 100644
--- a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out
+++ b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out
@@ -1,8 +1,22 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
+-- !iceberg_q01 --
+1	krb1	2023-05-14
+2	krb2	2023-05-16
+3	krb3	2023-05-17
+
+-- !iceberg_q02 --
+2	2023-05-16
+3	2023-05-17
+
 -- !iceberg_q04 --
 1	2023-05-14
 2	2023-05-16
 
+-- !iceberg_q05 --
+1	krb1	2023-05-14
+2	krb2	2023-05-16
+3	krb3	2023-05-17
+
 -- !iceberg_q06 --
 1	krb1	2023-05-14
 2	krb2	2023-05-16
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
index 579b3fcca25f88..4e58dd323590c8 100644
--- a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
@@ -20,6 +20,46 @@ import org.junit.Assert;
 suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") {
     String enabled = context.config.otherConfigs.get("enableKerberosTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // test iceberg hms catalog with kerberos
+        sql """
+            CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl
+            PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hms',
+                "hive.metastore.uris" = "thrift://172.31.71.25:9083",
+                "fs.defaultFS" = "hdfs://hadoop-master:9000",
+                "hadoop.security.authentication" = "kerberos",
+                "hadoop.kerberos.min.seconds.before.relogin" = "5",
+                "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM",
+                "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab",
+                "hive.metastore.sasl.enabled" = "true",
+                "hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM",
+                "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse",
+                "doris.krb5.debug" = "true"
+            );
+        """
+
+        // sql """
+        //     CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl
+        //     PROPERTIES (
+        //         'type'='iceberg',
+        //         'iceberg.catalog.type'='hms',
+        //         "hive.metastore.uris" = "thrift://172.31.71.26:9083",
+        //         "fs.defaultFS" = "hdfs://hadoop-master-2:9000",
+        //         "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse",
+        //         "hadoop.security.authentication" = "kerberos",
+        //         "hadoop.kerberos.min.seconds.before.relogin" = "5",
+        //         "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM",
+        //         "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab",
+        //         "hive.metastore.sasl.enabled" = "true",
+        //         "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM",
+        //         "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*//
+        //                                           RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//
+        //                                           DEFAULT",
+        //         "doris.krb5.debug" = "true"
+        //     );
+        // """
+
         // test iceberg hadoop catalog with kerberos
         sql """
             CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop
@@ -43,82 +83,43 @@ suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,externa
         sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
         sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
 
-        // order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """
+        sql """ SWITCH test_krb_iceberg_ctl; """
+        sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """
+        sql """ USE `test_krb_iceberg_db`; """
+        sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
+        order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """
+
+        // sql """ SWITCH other_test_krb_iceberg_ctl; """
+        // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """
+        // sql """ USE `other_test_krb_iceberg_db`; """
+        // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """
+        // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
+        // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
+//
+        order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """
         // order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """
 
         // cross catalog query test
-        order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """
-        // order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """
+        sql """ SWITCH test_krb_iceberg_ctl_hadoop; """
+        order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """
+        order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """
         order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """
         // order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """
 
-        // sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """
+        sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """
         // sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """
         sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """
-        // sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """
+        sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """
        // sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """
         sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """
-        // sql """ DROP CATALOG test_krb_iceberg_ctl """
+        sql """ DROP CATALOG test_krb_iceberg_ctl """
         // sql """ DROP CATALOG other_test_krb_iceberg_ctl """
         sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """
-
-        // // test iceberg hms catalog with kerberos
-        // sql """
-        //     CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl
-        //     PROPERTIES (
-        //         'type'='iceberg',
-        //         'iceberg.catalog.type'='hms',
-        //         "hive.metastore.uris" = "thrift://172.31.71.25:9083",
- // "fs.defaultFS" = "hdfs://hadoop-master:9000", - // "hadoop.security.authentication" = "kerberos", - // "hadoop.kerberos.min.seconds.before.relogin" = "5", - // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM", - // "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", - // "hive.metastore.sasl.enabled" = "true", - // "hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM", - // "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse", - // "doris.krb5.debug" = "true" - // ); - // """ - // - // sql """ - // CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl - // PROPERTIES ( - // 'type'='iceberg', - // 'iceberg.catalog.type'='hms', - // "hive.metastore.uris" = "thrift://172.31.71.26:9083", - // "fs.defaultFS" = "hdfs://hadoop-master-2:9000", - // "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse", - // "hadoop.security.authentication" = "kerberos", - // "hadoop.kerberos.min.seconds.before.relogin" = "5", - // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM", - // "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab", - // "hive.metastore.sasl.enabled" = "true", - // "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM", - // "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// - // RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// - // DEFAULT", - // "doris.krb5.debug" = "true" - // ); - // """ - // - // sql """ SWITCH test_krb_iceberg_ctl; """ - // sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """ - // sql """ USE `test_krb_iceberg_db`; """ - // sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ - // order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """ - // - // sql """ SWITCH other_test_krb_iceberg_ctl; """ - // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """ - // sql """ USE `other_test_krb_iceberg_db`; """ - // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ } }