diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java
new file mode 100644
index 00000000000000..2e6c1602f0f4fe
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/Authenticator.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+public interface Authenticator {
+
+    <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException;
+
+    default void doAsNoReturn(Runnable action) throws IOException {
+        doAs(() -> {
+            action.run();
+            return null;
+        });
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index c3cab5f410be3a..dce3eb8e7b509f 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -22,7 +22,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
-public interface HadoopAuthenticator {
+public interface HadoopAuthenticator extends Authenticator {
 
     UserGroupInformation getUGI() throws IOException;
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index d04d772728bc55..27facf03c999d3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -40,7 +40,7 @@ private static UserGroupInformation loginWithUGI(AuthenticationConfig config) {
         }
         if (config instanceof KerberosAuthenticationConfig) {
             try {
-                // TODO: remove after iceberg and hudi kerberos test case pass
+                // TODO: remove after hudi kerberos test case passes
                 try {
                     // login hadoop with keytab and try checking TGT
                     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
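Note: the `doAs`/`doAsNoReturn` pair is the whole contract here — implementations only decide which identity the action runs under. As a minimal sketch of a Kerberos-aware implementation (assuming Hadoop's `UserGroupInformation`; the class name `UgiBackedAuthenticator` is illustrative and not part of this patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiBackedAuthenticator implements Authenticator {
        private final UserGroupInformation ugi;

        public UgiBackedAuthenticator(UserGroupInformation ugi) {
            this.ugi = ugi;
        }

        @Override
        public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
            try {
                // Run the action with this UGI's credentials bound to the calling thread.
                return ugi.doAs(action);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }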
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java
index d202417afc8e33..0c3d8bd79836be 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/SimpleAuthenticationConfig.java
@@ -17,10 +17,12 @@
 
 package org.apache.doris.common.security.authentication;
 
+import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 
 @Data
 public class SimpleAuthenticationConfig extends AuthenticationConfig {
+    @SerializedName(value = "username")
     private String username;
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 4ba5a2ebd96281..ba6777f4a4ceaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -37,6 +37,7 @@
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.security.authentication.Authenticator;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
@@ -78,6 +79,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -158,6 +160,23 @@ public Configuration getConfiguration() {
         return conf;
     }
 
+    /**
+     * Get the authenticator for this catalog.
+     * Returns a dummy pass-through authenticator by default.
+     */
+    public synchronized Authenticator getAuthenticator() {
+        return new Authenticator() {
+            @Override
+            public <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+                try {
+                    return action.run();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        };
+    }
+
     /**
      * set some default properties when creating catalog
      * @return list of database names in this catalog
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index a22eacaf1e4fc1..f450b3d398d127 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -42,7 +42,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -70,7 +69,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
-    @Getter
+
     private HadoopAuthenticator authenticator;
 
     @VisibleForTesting
@@ -86,8 +85,15 @@ public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
-    public static <T> T ugiDoAs(long catalogId, PrivilegedExceptionAction<T> action) {
-        return ugiDoAs(((ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId)).getConfiguration(),
-                action);
-    }
-
     public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
         // if hive config is not ready, then use hadoop kerberos to login
         AuthenticationConfig krbConfig = AuthenticationConfig.getKerberosConfig(conf,
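Note: the base-class authenticator is intentionally a pass-through (it just calls `action.run()` inline), so every call site can be written uniformly against `getAuthenticator()` and only Kerberos-enabled subclasses pay the login cost. An illustrative call site (not from this patch; `fetchDatabaseNames` is a placeholder for real metastore access):

    static List<String> listRemoteDatabases(ExternalCatalog catalog) throws IOException {
        // Pass-through for plain catalogs; runs under the logged-in UGI for Kerberos catalogs.
        return catalog.getAuthenticator().doAs(() -> fetchDatabaseNames(catalog));
    }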
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 6f79afd5de5d7f..ee521359bb44a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -26,8 +26,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,7 +63,7 @@ protected void initLocalObjectsImpl() {
 
     public Catalog getCatalog() {
         makeSureInitialized();
-        return ((IcebergMetadataOps) metadataOps).getCatalog();
+        return catalog;
     }
 
     public String getIcebergCatalogType() {
@@ -83,4 +87,25 @@ protected void initS3Param(Configuration conf) {
         Map<String, String> properties = catalogProperty.getHadoopProperties();
         conf.set(Constants.AWS_CREDENTIALS_PROVIDER, PropertyConverter.getAWSCredentialsProviders(properties));
     }
+
+    public Table loadTable(TableIdentifier of) {
+        Table tbl = getCatalog().loadTable(of);
+        Map<String, String> extProps = getProperties();
+        initIcebergTableFileIO(tbl, extProps);
+        return tbl;
+    }
+
+    public static void initIcebergTableFileIO(Table table, Map<String, String> props) {
+        Map<String, String> ioConf = new HashMap<>();
+        table.properties().forEach((key, value) -> {
+            if (key.startsWith("io.")) {
+                ioConf.put(key, value);
+            }
+        });
+
+        // This `initialize` method will directly override the properties as a whole,
+        // so we need to merge the table's io-related properties with Doris's catalog-related properties.
+        props.putAll(ioConf);
+        table.io().initialize(props);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index 34e6f0c187e5ba..2fc3e935dccfda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,17 +17,25 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
 
+import java.io.IOException;
 import java.util.Map;
 
 public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
 
+    private HadoopAuthenticator authenticator;
+
     public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                                      String comment) {
         super(catalogId, name, comment);
@@ -35,6 +43,15 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
         catalogProperty = new CatalogProperty(resource, props);
     }
 
+    @Override
+    public synchronized HadoopAuthenticator getAuthenticator() {
+        if (authenticator == null) {
+            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+            authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+        }
+        return authenticator;
+    }
+
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
@@ -44,8 +61,32 @@ protected void initCatalog() {
         Map<String, String> catalogProperties = catalogProperty.getProperties();
         String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
         catalogProperties.put(CatalogProperties.URI, metastoreUris);
-        hiveCatalog.initialize(icebergCatalogType, catalogProperties);
+        try {
+            getAuthenticator().doAsNoReturn(() -> hiveCatalog.initialize(icebergCatalogType, catalogProperties));
+        } catch (IOException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
         catalog = hiveCatalog;
     }
+
+    public Table loadTable(TableIdentifier of) {
+        // TODO: build a BaseTable from HiveOperations directly, e.g.
+        // HiveOperations operations = new HiveOperations(
+        //         FileSystemFactory.get()),
+        //         catalog.getMetastore(),
+        //         database,
+        //         table,
+        //         location)
+        // return new BaseTable(operations, of.toString());
+        Table tbl;
+        try {
+            tbl = getAuthenticator().doAs(() -> getCatalog().loadTable(of));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Map<String, String> extProps = getProperties();
+        initIcebergTableFileIO(tbl, extProps);
+        return tbl;
+    }
 }
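Note: in `initIcebergTableFileIO` the merge order matters: `props.putAll(ioConf)` runs last, so a table-level `io.*` property overrides a catalog-level value for the same key before `table.io().initialize(props)` rebuilds the FileIO. A small worked example (property values invented for illustration):

    Map<String, String> props = new HashMap<>();       // catalog-level properties
    props.put("io.manifest.cache-enabled", "false");
    Map<String, String> ioConf = new HashMap<>();      // the table's "io."-prefixed properties
    ioConf.put("io.manifest.cache-enabled", "true");
    props.putAll(ioConf);                              // table-level value wins
    // table.io().initialize(props) now sees io.manifest.cache-enabled=true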
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
index 01eee31d3b08d3..41e5080e3918bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
@@ -18,19 +18,27 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 
+import java.io.IOException;
 import java.util.Map;
 
 public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
 
+    private HadoopAuthenticator authenticator;
+
     public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                                         String comment) {
         super(catalogId, name, comment);
@@ -49,6 +57,15 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
         }
     }
 
+    @Override
+    public synchronized HadoopAuthenticator getAuthenticator() {
+        if (authenticator == null) {
+            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+            authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
+        }
+        return authenticator;
+    }
+
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HADOOP;
@@ -60,7 +77,32 @@ protected void initCatalog() {
         String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
         hadoopCatalog.setConf(conf);
         catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
-        hadoopCatalog.initialize(icebergCatalogType, catalogProperties);
+        try {
+            getAuthenticator().doAsNoReturn(() -> hadoopCatalog.initialize(icebergCatalogType, catalogProperties));
+        } catch (IOException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
         catalog = hadoopCatalog;
     }
+
+    @Override
+    public Table loadTable(TableIdentifier of) {
+        // TODO: build a BaseTable from FileOperations directly, e.g.
+        // FileOperations operations = new FileOperations(
+        //         FileSystemFactory.get()),
+        //         catalog.getMetastore(),
+        //         database,
+        //         table,
+        //         location);
+        // return new BaseTable(operations, of.toString());
+        Table tbl;
+        try {
+            tbl = getAuthenticator().doAs(() -> getCatalog().loadTable(of));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Map<String, String> extProps = getProperties();
+        initIcebergTableFileIO(tbl, extProps);
+        return tbl;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index dc11a6cacc24cf..0e561025944949 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -21,9 +21,8 @@
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.TIcebergMetadataParams;
@@ -41,6 +40,7 @@
 import org.apache.iceberg.hive.HiveCatalog;
 import org.jetbrains.annotations.NotNull;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,7 +60,7 @@ public IcebergMetadataCache(ExecutorService executor) {
                 Config.max_hive_table_cache_num,
                 false,
                 null);
-        this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), null, executor);
+        this.snapshotListCache = snapshotListCacheFactory.buildCache(this::loadSnapshots, null, executor);
 
         CacheFactory tableCacheFactory = new CacheFactory(
                 OptionalLong.of(86400L),
@@ -68,11 +68,11 @@ public IcebergMetadataCache(ExecutorService executor) {
                 Config.max_hive_table_cache_num,
                 false,
                 null);
-        this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), null, executor);
+        this.tableCache = tableCacheFactory.buildCache(this::loadTable, null, executor);
     }
 
     public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) throws UserException {
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog());
+        ExternalCatalog catalog = (ExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog());
         if (catalog == null) {
             throw new UserException("The specified catalog does not exist:" + params.getCatalog());
         }
@@ -81,12 +81,12 @@ public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) throws UserException {
         return snapshotListCache.get(key);
     }
 
-    public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
+    public Table getIcebergTable(ExternalCatalog catalog, String dbName, String tbName) {
         IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
         return tableCache.get(key);
     }
 
-    public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
+    public Table getAndCloneTable(ExternalCatalog catalog, String dbName, String tbName) {
         Table restTable;
         synchronized (this) {
             Table table = getIcebergTable(catalog, dbName, tbName);
@@ -95,7 +95,7 @@ public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
         return restTable;
     }
 
-    public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) {
+    public Table getRemoteTable(ExternalCatalog catalog, String dbName, String tbName) {
         IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
         return loadTable(key);
     }
@@ -111,21 +111,30 @@ private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
 
     @NotNull
     private Table loadTable(IcebergMetadataCacheKey key) {
         Catalog icebergCatalog;
+        Table icebergTable;
         if (key.catalog instanceof HMSExternalCatalog) {
             HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog;
             icebergCatalog = createIcebergHiveCatalog(
                     ctg.getHiveMetastoreUris(),
                     ctg.getCatalogProperty().getHadoopProperties(),
                     ctg.getProperties());
+            try {
+                icebergTable = ctg.getAuthenticator().doAs(() -> {
+                    Table tbl = icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName));
+                    IcebergExternalCatalog.initIcebergTableFileIO(tbl, key.catalog.getProperties());
+                    return tbl;
+                });
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return icebergTable;
         } else if (key.catalog instanceof IcebergExternalCatalog) {
-            icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog();
+            IcebergExternalCatalog ctg = ((IcebergExternalCatalog) key.catalog);
+            icebergTable = ctg.loadTable(TableIdentifier.of(key.dbName, key.tableName));
+            return icebergTable;
         } else {
             throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
-        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(),
-                () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)));
-        initIcebergTableFileIO(icebergTable, key.catalog.getProperties());
-        return icebergTable;
     }
 
     public void invalidateCatalogCache(long catalogId) {
@@ -198,32 +207,18 @@ private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf,
         return hiveCatalog;
     }
 
-    private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
-        Map<String, String> ioConf = new HashMap<>();
-        table.properties().forEach((key, value) -> {
-            if (key.startsWith("io.")) {
-                ioConf.put(key, value);
-            }
-        });
-
-        // This `initialize` method will directly override the properties as a whole,
-        // so we need to merge the table's io-related properties with the doris's catalog-related properties
-        props.putAll(ioConf);
-        table.io().initialize(props);
-    }
-
     static class IcebergMetadataCacheKey {
-        CatalogIf catalog;
+        ExternalCatalog catalog;
         String dbName;
         String tableName;
 
-        public IcebergMetadataCacheKey(CatalogIf catalog, String dbName, String tableName) {
+        public IcebergMetadataCacheKey(ExternalCatalog catalog, String dbName, String tableName) {
             this.catalog = catalog;
             this.dbName = dbName;
             this.tableName = tableName;
         }
 
-        static IcebergMetadataCacheKey of(CatalogIf catalog, String dbName, String tableName) {
+        static IcebergMetadataCacheKey of(ExternalCatalog catalog, String dbName, String tableName) {
             return new IcebergMetadataCacheKey(catalog, dbName, tableName);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 3236367afc003e..d9fc6ed7da37b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -42,6 +42,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -74,23 +76,38 @@ public void close() {
 
     @Override
     public boolean tableExist(String dbName, String tblName) {
-        return catalog.tableExists(TableIdentifier.of(dbName, tblName));
+        return doAs(() -> catalog.tableExists(TableIdentifier.of(dbName, tblName)));
     }
 
     public boolean databaseExist(String dbName) {
-        return nsCatalog.namespaceExists(Namespace.of(dbName));
+        return doAs(() -> nsCatalog.namespaceExists(Namespace.of(dbName)));
     }
 
     public List<String> listDatabaseNames() {
-        return nsCatalog.listNamespaces().stream()
-                .map(e -> e.toString())
-                .collect(Collectors.toList());
+        return doAs(() -> nsCatalog.listNamespaces().stream()
+                .map(Namespace::toString)
+                .collect(Collectors.toList()));
+    }
+
+    public void doAsNoReturn(Runnable action) {
+        try {
+            dorisCatalog.getAuthenticator().doAsNoReturn(action);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
+    public <T> T doAs(PrivilegedExceptionAction<T> action) {
+        try {
+            return dorisCatalog.getAuthenticator().doAs(action);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public List<String> listTableNames(String dbName) {
-        List<TableIdentifier> tableIdentifiers = catalog.listTables(Namespace.of(dbName));
+        List<TableIdentifier> tableIdentifiers = doAs(() -> catalog.listTables(Namespace.of(dbName)));
         return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
     }
 
@@ -107,7 +124,7 @@ public void createDb(CreateDbStmt stmt) throws DdlException {
                 ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
             }
         }
-        nsCatalog.createNamespace(Namespace.of(dbName), properties);
+        doAsNoReturn(() -> nsCatalog.createNamespace(Namespace.of(dbName), properties));
         dorisCatalog.onRefresh(true);
     }
 
@@ -123,7 +140,7 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
             }
         }
         SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
-        nsCatalog.dropNamespace(Namespace.of(dbName));
+        doAsNoReturn(() -> nsCatalog.dropNamespace(Namespace.of(dbName)));
         dorisCatalog.onRefresh(true);
     }
 
@@ -154,7 +171,8 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
         Map<String, String> properties = stmt.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
         PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
-        catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
+        doAsNoReturn(() -> catalog.createTable(TableIdentifier.of(dbName, tableName),
+                schema, partitionSpec, properties));
         db.setUnInitialized(true);
         return false;
     }
 
@@ -175,7 +193,7 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
                 ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
             }
         }
-        catalog.dropTable(TableIdentifier.of(dbName, tableName));
+        doAsNoReturn(() -> catalog.dropTable(TableIdentifier.of(dbName, tableName)));
         db.setUnInitialized(true);
     }
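Note: `doAs`/`doAsNoReturn` in `IcebergMetadataOps` wrap the checked `IOException` in a `RuntimeException` so the existing metadata method signatures stay unchanged. A caller that needs the checked exception back can unwrap the cause; a hypothetical helper (not part of this patch):

    static boolean tableExistChecked(IcebergMetadataOps ops, String db, String tbl) throws IOException {
        try {
            return ops.tableExist(db, tbl);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();    // surface the original auth/IO failure
            }
            throw e;
        }
    }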
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index a3a978ccd7a16e..33561c06842304 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -70,7 +70,7 @@ public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
     public void beginInsert(SimpleTableInfo tableInfo) {
         this.tableInfo = tableInfo;
         this.table = getNativeTable(tableInfo);
-        this.transaction = table.newTransaction();
+        this.transaction = ops.doAs(() -> table.newTransaction());
     }
 
     public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) {
@@ -148,7 +148,7 @@ private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
 
     private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
         // To be compatible with iceberg format V1.
         AppendFiles appendFiles = table.newAppend();
@@ -183,7 +182,6 @@ private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
                     "Should have no referenced data files for append.");
             Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
         }
-        appendFiles.commit();
+        ops.doAsNoReturn(appendFiles::commit);
     }
-
 }
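Note: transaction begin and commit both route through `ops.doAs`/`ops.doAsNoReturn` because committing writes manifest and metadata files to the (possibly Kerberized) file system, which requires a valid TGT. The general shape, as a hedged sketch (not the PR's exact code; `dataFiles` stands in for the collected write results, and the usual `org.apache.iceberg` imports are assumed):

    static void commitAppend(IcebergMetadataOps ops, Table table, List<DataFile> dataFiles) {
        ops.doAsNoReturn(() -> {
            AppendFiles append = table.newAppend();
            dataFiles.forEach(append::appendFile);
            append.commit();    // writes manifests/metadata under the catalog's identity
        });
    }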
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 512e6a3ee93087..78f8ca67f8efee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -46,7 +46,7 @@
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
@@ -69,6 +69,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -570,18 +571,22 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog,
      * Get iceberg schema from catalog and convert them to doris schema
      */
     public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
-        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
-            org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
-            Schema schema = icebergTable.schema();
-            List<Types.NestedField> columns = schema.columns();
-            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
-            for (Types.NestedField field : columns) {
-                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
-                        IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
-                        schema.caseInsensitiveFindField(field.name()).fieldId()));
-            }
-            return tmpSchema;
-        });
+        try {
+            return catalog.getAuthenticator().doAs(() -> {
+                Table icebergTable = getIcebergTable(catalog, dbName, name);
+                Schema schema = icebergTable.schema();
+                List<Types.NestedField> columns = schema.columns();
+                List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+                for (Types.NestedField field : columns) {
+                    tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                            IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                            schema.caseInsensitiveFindField(field.name()).fieldId()));
+                }
+                return tmpSchema;
+            });
+        } catch (IOException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 56ff188f964fe9..b66ac6e2c32d19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -21,17 +21,12 @@
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
 
 import org.apache.iceberg.Table;
 
-import java.util.Map;
-
 /**
  * Get metadata from iceberg api (all iceberg table like hive, rest, glue...)
  */
@@ -42,8 +37,7 @@ public class IcebergApiSource implements IcebergSource {
 
     private final TupleDescriptor desc;
 
-    public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
-            Map<String, ColumnRange> columnNameToRange) {
+    public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc) {
         this.icebergExtTable = table;
 
         this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
@@ -74,11 +68,6 @@ public TableIf getTargetTable() {
         return icebergExtTable;
     }
 
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        return new TFileAttributes();
-    }
-
     @Override
     public ExternalCatalog getCatalog() {
         return icebergExtTable.getCatalog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 5e9860171d0fbe..71440403e641a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -22,29 +22,19 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
-
-import java.util.Map;
 
 public class IcebergHMSSource implements IcebergSource {
 
     private final HMSExternalTable hmsTable;
     private final TupleDescriptor desc;
-    private final Map<String, ColumnRange> columnNameToRange;
     private final org.apache.iceberg.Table icebergTable;
 
-    public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
-                            Map<String, ColumnRange> columnNameToRange) {
+    public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc) {
         this.hmsTable = hmsTable;
         this.desc = desc;
-        this.columnNameToRange = columnNameToRange;
         this.icebergTable =
                 Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
                         .getIcebergTable(hmsTable.getCatalog(),
@@ -70,18 +60,6 @@ public TableIf getTargetTable() {
         return hmsTable;
     }
 
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-        textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
-                .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, HiveScanNode.DEFAULT_FIELD_DELIMITER));
-        textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
-        TFileAttributes fileAttributes = new TFileAttributes();
-        fileAttributes.setTextParams(textParams);
-        fileAttributes.setHeaderType("");
-        return fileAttributes;
-    }
-
     @Override
     public ExternalCatalog getCatalog() {
         return hmsTable.getCatalog();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index b85e93680bff03..16307d5f59d8c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -32,7 +32,6 @@
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
@@ -104,7 +103,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
 
         ExternalTable table = (ExternalTable) desc.getTable();
         if (table instanceof HMSExternalTable) {
-            source = new IcebergHMSSource((HMSExternalTable) table, desc, columnNameToRange);
+            source = new IcebergHMSSource((HMSExternalTable) table, desc);
         } else if (table instanceof IcebergExternalTable) {
             String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType();
             switch (catalogType) {
@@ -113,7 +112,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
                 case IcebergExternalCatalog.ICEBERG_DLF:
                 case IcebergExternalCatalog.ICEBERG_GLUE:
                 case IcebergExternalCatalog.ICEBERG_HADOOP:
-                    source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
+                    source = new IcebergApiSource((IcebergExternalTable) table, desc);
                     break;
                 default:
                     Preconditions.checkState(false, "Unknown iceberg catalog type: " + catalogType);
@@ -179,7 +178,11 @@ public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
 
     @Override
     public List<Split> getSplits() throws UserException {
-        return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits);
+        try {
+            return source.getCatalog().getAuthenticator().doAs(this::doGetSplits);
+        } catch (IOException e) {
+            throw new UserException(e);
+        }
     }
 
     private List<Split> doGetSplits() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
index b4b1bf2a805d12..be1ce7521061d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
@@ -21,9 +21,7 @@
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.thrift.TFileAttributes;
 
 public interface IcebergSource {
 
@@ -33,8 +31,6 @@ public interface IcebergSource {
 
     TableIf getTargetTable();
 
-    TFileAttributes getFileAttributes() throws UserException;
-
     ExternalCatalog getCatalog();
 
     String getFileFormat() throws DdlException, MetaNotFoundException;
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
index 7e7f276236adaa..fa3d250fb3212c 100644
--- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
@@ -1,5 +1,3 @@
-import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -98,8 +96,8 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") {
         thread1.join()
         thread2.join()
 
+        sql """drop catalog ${hms_catalog_name};"""
         sql """drop catalog other_${hms_catalog_name};"""
-        // TODO: add tvf case
     }
 }
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
new file mode 100644
index 00000000000000..49ae08804b8fa3
--- /dev/null
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") {
+    String enabled = context.config.otherConfigs.get("enableKerberosTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // test iceberg hms catalog with kerberos
+        sql """
+            CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl
+            PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hms',
+                "hive.metastore.uris" = "thrift://172.31.71.25:9083",
+                "fs.defaultFS" = "hdfs://hadoop-master:9000",
+                "hadoop.security.authentication" = "kerberos",
+                "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM",
+                "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab",
+                "hive.metastore.sasl.enabled" = "true",
+                "hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM",
+                "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse",
+                "doris.krb5.debug" = "true"
+            );
+        """
+
+        sql """
+            CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl
+            PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hms',
+                "hive.metastore.uris" = "thrift://172.31.71.26:9083",
+                "fs.defaultFS" = "hdfs://hadoop-master-2:9000",
+                "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse",
+                "hadoop.security.authentication" = "kerberos",
+                "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM",
+                "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab",
+                "hive.metastore.sasl.enabled" = "true",
+                "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM",
+                "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*//
+                                                  RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//
+                                                  DEFAULT",
+                "doris.krb5.debug" = "true"
+            );
+        """
+
+        sql """ SWITCH test_krb_iceberg_ctl; """
+        sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """
+        sql """ USE `test_krb_iceberg_db`; """
+        sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
+        sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
+        order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """
+
+        sql """ SWITCH other_test_krb_iceberg_ctl; """
+        sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """
+        sql """ USE `other_test_krb_iceberg_db`; """
+        sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """
+        sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
+        sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
+
+        // test iceberg hadoop catalog with kerberos
+        sql """
+            CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop
+            PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hadoop',
+                "warehouse" = "hdfs://hadoop-master:9000/user/hive/",
+                "fs.defaultFS" = "hdfs://hadoop-master:9000",
+                "hadoop.security.authentication" = "kerberos",
+                "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM",
+                "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab",
+                "doris.krb5.debug" = "true"
+            );
+        """
+
+        sql """ SWITCH test_krb_iceberg_ctl_hadoop; """
+        sql """ CREATE DATABASE IF NOT EXISTS hadoop_test_krb_iceberg_db; """
+        sql """ USE hadoop_test_krb_iceberg_db; """
+        sql """ CREATE TABLE IF NOT EXISTS hadoop_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """
+        sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """
+
+        order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """
+        order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """
+
+        // cross catalog query test
+        order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """
+        order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """
+        order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """
+        order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """
+
+        sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """
+        sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """
+        sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """
+
+        sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """
+        sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """
+        sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """
+
+        sql """ DROP CATALOG test_krb_iceberg_ctl """
+        sql """ DROP CATALOG other_test_krb_iceberg_ctl """
+        sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """
+    }
+}