From 338d4337cac38e4f85ebcfb16b142a71f4f7a217 Mon Sep 17 00:00:00 2001 From: slothever Date: Mon, 2 Sep 2024 01:20:56 +0800 Subject: [PATCH] [Improvement](kerberos)refactor ugi login for iceberg hive catalog --- .../iceberg/IcebergMetadataOps.java | 15 +- .../iceberg/hadoop/IcebergHadoopCatalog.java | 7 +- .../iceberg/hadoop/IcebergHadoopFileIO.java | 28 +- .../iceberg/hive/HCachedClientPool.java | 39 +- .../datasource/iceberg/hive/HClientPool.java | 84 ++++ .../iceberg/hive/HiveCompatibleCatalog.java | 14 +- .../iceberg/hive/IcebergHiveCatalog.java | 359 +++++++++++++++++- .../doris/fs/remote/dfs/DFSFileSystem.java | 10 +- .../kerberos/test_two_iceberg_kerberos.out | 14 + .../kerberos/test_two_iceberg_kerberos.groovy | 127 ++++--- 10 files changed, 575 insertions(+), 122 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 9187178c7dca10..63497861042aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; @@ -42,6 +43,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -74,7 +76,16 @@ public void close() { @Override public boolean tableExist(String dbName, String tblName) { - return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + try { + return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + } catch (UndeclaredThrowableException e) { + // avoid to miss exception when get table reflect call + if (e.getCause() instanceof NoSuchObjectException) { + return false; + } else { + throw e; + } + } } public boolean databaseExist(String dbName) { @@ -83,7 +94,7 @@ public boolean databaseExist(String dbName) { public List listDatabaseNames() { return nsCatalog.listNamespaces().stream() - .map(e -> e.toString()) + .map(Namespace::toString) .collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java index 13ec717d7d8153..856ed960888f1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopCatalog.java @@ -90,12 +90,7 @@ protected FileIO initializeFileIO(Map properties, Configuration * so HadoopFileIO is used in the superclass by default * we can add better implementations to derived class just like the implementation in DLFCatalog. 
*/ - FileIO io; - try { - io = new IcebergHadoopFileIO(hadoopConf, this.fs.rawFileSystem()); - } catch (IOException e) { - throw new RuntimeException(e); - } + FileIO io = new IcebergHadoopFileIO(hadoopConf, this.fs); io.initialize(properties); return io; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java index 97912290d2f836..fee0c4d6a1af92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hadoop/IcebergHadoopFileIO.java @@ -17,10 +17,11 @@ package org.apache.doris.datasource.iceberg.hadoop; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; @@ -30,27 +31,35 @@ public class IcebergHadoopFileIO extends HadoopFileIO { - private FileSystem fs; - private Configuration hadoopConf; + private final DFSFileSystem fs; + private final Configuration hadoopConf; - public IcebergHadoopFileIO(Configuration hadoopConf, FileSystem fs) { + public IcebergHadoopFileIO(Configuration hadoopConf, DFSFileSystem fs) { this.hadoopConf = hadoopConf; this.fs = fs; } @Override public InputFile newInputFile(String path) { - return new IcebergHadoopInputFile(this.fs, path, this.hadoopConf); + return new IcebergHadoopInputFile(getFs(), path, this.hadoopConf); + } + + private FileSystem getFs() { + try { + return this.fs.rawFileSystem(); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public InputFile newInputFile(String path, long length) { - return new IcebergHadoopInputFile(this.fs, path, length, this.hadoopConf); + return new IcebergHadoopInputFile(getFs(), path, length, this.hadoopConf); } @Override public OutputFile newOutputFile(String path) { - return new IcebergHadoopOutputFile(this.fs, new Path(path), this.hadoopConf); + return new IcebergHadoopOutputFile(getFs(), new Path(path), this.hadoopConf); } @Override @@ -58,9 +67,8 @@ public void deleteFile(String path) { Path toDelete = new Path(path); try { fs.delete(toDelete, false); - } catch (IOException var5) { - IOException e = var5; - throw new RuntimeIOException(e, "Failed to delete file: %s", path); + } catch (IOException e) { + throw new RuntimeException("Failed to delete file: " + path, e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java index ff8313cab0f403..bfdb59cfc0402e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HCachedClientPool.java @@ -17,27 +17,32 @@ package org.apache.doris.datasource.iceberg.hive; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; + import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ClientPool; -import org.apache.iceberg.hive.HiveClientPool; import org.apache.iceberg.util.PropertyUtil; import shade.doris.hive.org.apache.thrift.TException; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; import java.util.concurrent.TimeUnit; public class HCachedClientPool implements ClientPool { - private static volatile Cache clientPoolCache; + private static volatile Cache clientPoolCache; private static final Object clientPoolCacheLock = new Object(); private final String catalogName; private final Configuration conf; private final int clientPoolSize; private final long evictionInterval; + private final HadoopAuthenticator authenticator; public HCachedClientPool(String catalogName, Configuration conf, Map properties) { this.catalogName = catalogName; @@ -59,25 +64,45 @@ public HCachedClientPool(String catalogName, Configuration conf, Map ((HiveClientPool) value).close()) + .removalListener((key, value, cause) -> ((HClientPool) value).close()) .build(); } } } + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); } - protected HiveClientPool clientPool() { - return clientPoolCache.get(this.catalogName, (k) -> new HiveClientPool(this.clientPoolSize, this.conf)); + protected HClientPool clientPool() { + return clientPoolCache.get(this.catalogName, (k) -> new HClientPool(this.clientPoolSize, this.conf)); } @Override public R run(Action action) throws TException, InterruptedException { - return clientPool().run(action); + try { + return authenticator.doAs(() -> clientPool().run(action)); + } catch (IOException e) { + throw new TException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TException) { + throw (TException) e.getCause(); + } + throw e; + } } @Override public R run(Action action, boolean retry) throws TException, InterruptedException { - return clientPool().run(action, retry); + try { + return authenticator.doAs(() -> clientPool().run(action, retry)); + } catch (IOException e) { + throw new TException(e); + } catch (UndeclaredThrowableException e) { + if (e.getCause() instanceof TException) { + throw (TException) e.getCause(); + } + throw e; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java new file mode 100644 index 00000000000000..a9c7a8e7dd123a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HClientPool.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.iceberg.ClientPoolImpl; +import org.apache.iceberg.hive.RuntimeMetaException; +import shade.doris.hive.org.apache.thrift.TException; +import shade.doris.hive.org.apache.thrift.transport.TTransportException; + +public class HClientPool extends ClientPoolImpl { + private final HiveConf hiveConf; + + public HClientPool(int poolSize, Configuration conf) { + super(poolSize, TTransportException.class, false); + this.hiveConf = new HiveConf(conf, org.apache.iceberg.hive.HiveClientPool.class); + this.hiveConf.addResource(conf); + } + + protected IMetaStoreClient newClient() { + try { + try { + return RetryingMetaStoreClient.getProxy(hiveConf, t -> null, HiveMetaStoreClient.class.getName()); + } catch (RuntimeException e) { + if (e.getCause() instanceof MetaException) { + throw (MetaException) e.getCause(); + } else { + throw e; + } + } + } catch (MetaException e) { + throw new RuntimeMetaException(e, "Failed to connect to Hive Metastore"); + } catch (Throwable t) { + if (t.getMessage().contains("Another instance of Derby may have already booted")) { + throw new RuntimeMetaException(t, + "Embedded Derby supports only one client at a time." + + "To fix this, use a metastore that supports multiple clients."); + } else { + throw new RuntimeMetaException(t, "Failed to connect to Hive Metastore"); + } + } + } + + protected IMetaStoreClient reconnect(IMetaStoreClient client) { + try { + client.close(); + client.reconnect(); + return client; + } catch (MetaException var3) { + MetaException e = var3; + throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore", new Object[0]); + } + } + + protected boolean isConnectionException(Exception e) { + return super.isConnectionException(e) + || e instanceof MetaException + && e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"); + } + + protected void close(IMetaStoreClient client) { + client.close(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java index be3ad97859f40d..6da5e34cbb4f29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/HiveCompatibleCatalog.java @@ -17,8 +17,6 @@ package org.apache.doris.datasource.iceberg.hive; -import org.apache.doris.datasource.iceberg.hadoop.IcebergHadoopFileIO; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -57,17 +55,7 @@ protected FileIO initializeFileIO(Map properties) { * so HadoopFileIO is used in the superclass by default * we can add better implementations to derived class just like the implementation in DLFCatalog. 
*/ - FileIO io; - try { - FileSystem fs = getFileSystem(); - if (fs == null) { - io = new HadoopFileIO(getConf()); - } else { - io = new IcebergHadoopFileIO(getConf(), getFileSystem()); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + FileIO io = new HadoopFileIO(getConf()); io.initialize(properties); return io; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java index 6aaa3965aafd84..55fc6bce85ed5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/hive/IcebergHiveCatalog.java @@ -21,31 +21,53 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import com.google.common.base.Preconditions; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ClientPool; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.hive.HiveHadoopUtil; +import org.apache.iceberg.hive.MetastoreUtil; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.util.LocationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shade.doris.hive.org.apache.thrift.TException; -import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class IcebergHiveCatalog extends HiveCompatibleCatalog { private static final Logger LOG = LoggerFactory.getLogger(IcebergHiveCatalog.class); private ClientPool clients; + private Configuration conf; + private String name; + private FileIO fileIO; private boolean listAllTables = false; public void initialize(String name, Map properties) { + this.name = name; if (this.conf == null) { LOG.warn("No Hadoop Configuration was set, using the default environment Configuration"); this.conf = getConf(); @@ -58,18 +80,23 @@ public void initialize(String name, Map properties) { LocationUtil.stripTrailingSlash(properties.get("warehouse"))); } - this.listAllTables = Boolean.parseBoolean(properties.getOrDefault("list-all-tables", "true")); + this.listAllTables = Boolean.parseBoolean(properties.getOrDefault("list-all-tables", "false")); String fileIOImpl = properties.get("io-impl"); - org.apache.hadoop.fs.FileSystem fs; - try { - fs = new DFSFileSystem(properties).rawFileSystem(); 
- } catch (IOException e) { - throw new RuntimeException(e); - } - FileIO fileIO = fileIOImpl == null ? new IcebergHadoopFileIO(this.conf, fs) + DFSFileSystem fs = new DFSFileSystem(properties); + fileIO = fileIOImpl == null ? new IcebergHadoopFileIO(this.conf, fs) : CatalogUtil.loadFileIO(fileIOImpl, properties, this.conf); this.clients = new HCachedClientPool(name, this.conf, properties); - super.initialize(name, fileIO, clients); + } + + public String name() { + return this.name; + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new IcebergHiveTableOperations(this.conf, this.clients, this.fileIO, this.name(), dbName, tableName); } @Override @@ -101,30 +128,330 @@ private String databaseLocation(String databaseName) { @Override public void createNamespace(Namespace namespace, Map meta) { - super.createNamespace(namespace, meta); + Preconditions.checkArgument(!namespace.isEmpty(), + "Cannot create namespace with invalid name: %s", namespace); + Preconditions.checkArgument(this.isValidateNamespace(namespace), + "Cannot support multi part namespace in Hive Metastore: %s", namespace); + Preconditions.checkArgument(meta.get("hive.metastore.database.owner-type") == null + || meta.get("hive.metastore.database.owner") != null, + "Create namespace setting %s without setting %s is not allowed", + "hive.metastore.database.owner-type", + "hive.metastore.database.owner"); + try { + this.clients.run((client) -> { + client.createDatabase(convertToDatabase(namespace, meta)); + return null; + }); + LOG.info("Created namespace: {}", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to createDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } + + Database convertToDatabase(Namespace namespace, Map meta) { + if (!this.isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else { + Database database = new Database(); + Map parameter = Maps.newHashMap(); + database.setName(namespace.level(0)); + database.setLocationUri(this.databaseLocation(namespace.level(0))); + meta.forEach((key, value) -> { + if (key.equals("comment")) { + database.setDescription(value); + } else if (key.equals("location")) { + database.setLocationUri(value); + } else if (key.equals("hive.metastore.database.owner")) { + database.setOwnerName(value); + } else if (key.equals("hive.metastore.database.owner-type") && value != null) { + database.setOwnerType(PrincipalType.valueOf(value)); + } else if (value != null) { + parameter.put(key, value); + } + }); + if (database.getOwnerName() == null) { + database.setOwnerName(HiveHadoopUtil.currentUser()); + database.setOwnerType(PrincipalType.USER); + } + database.setParameters(parameter); + return database; + } } @Override public List listNamespaces(Namespace namespace) { - return super.listNamespaces(namespace); + if (!this.isValidateNamespace(namespace) && !namespace.isEmpty()) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else if (!namespace.isEmpty()) { + return ImmutableList.of(); + } else { + try { + List namespaces = this.clients.run(IMetaStoreClient::getAllDatabases).stream() + 
.map(Namespace::of).collect(Collectors.toList()); + LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces); + return namespaces; + } catch (TException e) { + throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getAllDatabases() " + + namespace + " in Hive Metastore", e); + } + } } @Override public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { - return super.loadNamespaceMetadata(namespace); + if (!this.isValidateNamespace(namespace)) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } else { + try { + Database database = this.clients.run((client) -> { + try { + return client.getDatabase(namespace.level(0)); + } catch (NoSuchObjectException e) { + return null; + } + }); + if (database == null) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } + Map metadata = this.convertToMetadata(database); + LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet()); + return metadata; + } catch (UnknownDBException | NoSuchObjectException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to list namespace under namespace: " + + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } + } + + private Map convertToMetadata(Database database) { + Map meta = Maps.newHashMap(); + meta.putAll(database.getParameters()); + meta.put("location", database.getLocationUri()); + if (database.getDescription() != null) { + meta.put("comment", database.getDescription()); + } + if (database.getOwnerName() != null) { + meta.put("hive.metastore.database.owner", database.getOwnerName()); + if (database.getOwnerType() != null) { + meta.put("hive.metastore.database.owner-type", database.getOwnerType().name()); + } + } + return meta; } @Override public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { - return super.dropNamespace(namespace); + if (!this.isValidateNamespace(namespace)) { + return false; + } else { + try { + this.clients.run((client) -> { + client.dropDatabase(namespace.level(0), false, false, false); + return null; + }); + LOG.info("Dropped namespace: {}", namespace); + return true; + } catch (InvalidOperationException e) { + throw new NamespaceNotEmptyException(e, + "Namespace %s is not empty. 
One or more tables exist.", namespace); + } catch (NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to drop dropDatabase(name) " + + namespace + " in Hive Metastore", e); + } + } } public boolean dropTable(TableIdentifier identifier, boolean purge) { - return super.dropTable(identifier, purge); + if (!this.isValidIdentifier(identifier)) { + return false; + } else { + String database = identifier.namespace().level(0); + TableOperations ops = this.newTableOps(identifier); + TableMetadata lastMetadata = null; + if (purge) { + try { + lastMetadata = ops.current(); + } catch (NotFoundException e) { + LOG.warn("Failed to load table metadata for table: {}, continuing drop without purge", + identifier, e); + } + } + try { + this.clients.run((client) -> { + client.dropTable(database, identifier.name(), false, false); + return null; + }); + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + } + LOG.info("Dropped table: {}", identifier); + return true; + } catch (NoSuchObjectException | NoSuchTableException e) { + LOG.info("Skipping drop, table does not exist: {}", identifier, e); + return false; + } catch (TException e) { + throw new RuntimeException("Failed to drop " + identifier, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to dropTable", e); + } + } + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier originalTo) { + if (!this.isValidIdentifier(from)) { + throw new NoSuchTableException("Invalid identifier: %s", from); + } else { + TableIdentifier to = this.removeCatalogName(originalTo); + Preconditions.checkArgument(this.isValidIdentifier(to), "Invalid identifier: %s", to); + String toDatabase = to.namespace().level(0); + String fromDatabase = from.namespace().level(0); + String fromName = from.name(); + + try { + Table table = this.clients.run((client) -> client.getTable(fromDatabase, fromName)); + validateTableIsIceberg(table, fullTableName(this.name, from)); + table.setDbName(toDatabase); + table.setTableName(to.name()); + this.clients.run((client) -> { + MetastoreUtil.alterTable(client, fromDatabase, fromName, table); + return null; + }); + LOG.info("Renamed table from {}, to {}", from, to); + } catch (NoSuchObjectException e) { + throw new NoSuchTableException("Table does not exist: %s", from); + } catch (AlreadyExistsException e) { + throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", to); + } catch (TException e) { + throw new RuntimeException("Failed to rename " + from + " to " + to, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to rename", e); + } + } + } + + private TableIdentifier removeCatalogName(TableIdentifier to) { + if (this.isValidIdentifier(to)) { + return to; + } else { + return to.namespace().levels().length == 2 + && this.name().equalsIgnoreCase(to.namespace().level(0)) + ? 
TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name()) : to; + } + } + + static void validateTableIsIceberg(Table table, String fullName) { + String tableType = table.getParameters().get("table_type"); + NoSuchIcebergTableException.check(tableType != null && tableType.equalsIgnoreCase("iceberg"), + "Not an iceberg table: %s (type=%s)", fullName, tableType); + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) { + Preconditions.checkArgument(properties.get("hive.metastore.database.owner-type") == null + && properties.get("hive.metastore.database.owner") == null, + "Setting %s and %s has to be performed together or not at all", + "hive.metastore.database.owner-type", + "hive.metastore.database.owner"); + Map parameter = Maps.newHashMap(); + parameter.putAll(this.loadNamespaceMetadata(namespace)); + parameter.putAll(properties); + Database database = this.convertToDatabase(namespace, parameter); + this.alterHiveDataBase(namespace, database); + LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace); + return true; + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) { + Preconditions.checkArgument(properties.contains("hive.metastore.database.owner-type") + == properties.contains("hive.metastore.database.owner"), + "Removing %s and %s has to be performed together or not at all", + "hive.metastore.database.owner-type", + "hive.metastore.database.owner"); + Map parameter = Maps.newHashMap(); + parameter.putAll(this.loadNamespaceMetadata(namespace)); + properties.forEach((key) -> parameter.put(key, null)); + Database database = this.convertToDatabase(namespace, parameter); + this.alterHiveDataBase(namespace, database); + LOG.debug("Successfully removed properties {} from {}", properties, namespace); + return true; + } + + private void alterHiveDataBase(Namespace namespace, Database database) { + try { + this.clients.run((client) -> { + client.alterDatabase(namespace.level(0), database); + return null; + }); + } catch (UnknownDBException | NoSuchObjectException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to list namespace under namespace: " + + namespace + " in Hive Metastore", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getDatabase(name) " + + namespace + " in Hive Metastore", e); + } } @Override public List listTables(Namespace namespace) { - return super.listTables(namespace); + Preconditions.checkArgument(this.isValidateNamespace(namespace), + "Missing database in namespace: %s", namespace); + String database = namespace.level(0); + try { + List tableNames = this.clients.run((client) -> client.getAllTables(database)); + List tableIdentifiers; + if (this.listAllTables) { + tableIdentifiers = tableNames.stream() + .map((t) -> TableIdentifier.of(namespace, t)) + .collect(Collectors.toList()); + } else { + List tableObjects = this.clients.run((client) -> + client.getTableObjectsByName(database, tableNames)); + tableIdentifiers = tableObjects.stream() + .filter((table) -> + table.getParameters() != null + && "iceberg".equalsIgnoreCase(table.getParameters().get("table_type"))) + .map((table) -> TableIdentifier.of(namespace, table.getTableName())) + .collect(Collectors.toList()); + } + + LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers); + return 
tableIdentifiers; + } catch (UnknownDBException e) { + throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); + } catch (TException e) { + throw new RuntimeException("Failed to list all tables under namespace " + namespace, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listTables", e); + } + } + + private boolean isValidateNamespace(Namespace namespace) { + return namespace.levels().length == 1; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 4018df68fa96e7..aca072eb66c47a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -496,22 +496,22 @@ public FileSystem rawFileSystem() throws IOException { } public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { - return authenticator.doAs(() -> rawFileSystem().listStatus(f, filter)); + return rawFileSystem().listStatus(f, filter); } public RemoteIterator listStatusIterator(Path p) throws IOException { - return authenticator.doAs(() -> rawFileSystem().listStatusIterator(p)); + return rawFileSystem().listStatusIterator(p); } public FileStatus getFileStatus(Path f) throws IOException { - return authenticator.doAs(() -> rawFileSystem().getFileStatus(f)); + return rawFileSystem().getFileStatus(f); } public boolean delete(Path p, boolean recursion) throws IOException { - return authenticator.doAs(() -> rawFileSystem().delete(p, recursion)); + return rawFileSystem().delete(p, recursion); } public boolean mkdirs(Path p) throws IOException { - return authenticator.doAs(() -> rawFileSystem().mkdirs(p)); + return rawFileSystem().mkdirs(p); } } diff --git a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out index fb4f79a0b73d4a..0634bf11cd62b2 100644 --- a/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out +++ b/regression-test/data/external_table_p0/kerberos/test_two_iceberg_kerberos.out @@ -1,8 +1,22 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this +-- !iceberg_q01 -- +1 krb1 2023-05-14 +2 krb2 2023-05-16 +3 krb3 2023-05-17 + +-- !iceberg_q02 -- +2 2023-05-16 +3 2023-05-17 + -- !iceberg_q04 -- 1 2023-05-14 2 2023-05-16 +-- !iceberg_q05 -- +1 krb1 2023-05-14 +2 krb2 2023-05-16 +3 krb3 2023-05-17 + -- !iceberg_q06 -- 1 krb1 2023-05-14 2 krb2 2023-05-16 diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy index 579b3fcca25f88..4c3e6f9b7766c1 100644 --- a/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy +++ b/regression-test/suites/external_table_p0/kerberos/test_two_iceberg_kerberos.groovy @@ -20,6 +20,46 @@ import org.junit.Assert; suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { String enabled = context.config.otherConfigs.get("enableKerberosTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { + // test iceberg hms catalog with kerberos + sql """ + CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl + PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + "hive.metastore.uris" = "thrift://172.31.71.25:9083", + "fs.defaultFS" = "hdfs://hadoop-master:9000", + "hadoop.security.authentication" = "kerberos", + "hadoop.kerberos.min.seconds.before.relogin" = "5", + "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM", + "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", + "hive.metastore.sasl.enabled" = "true", + "hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM", + "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse", + "doris.krb5.debug" = "true" + ); + """ + + // sql """ + // CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl + // PROPERTIES ( + // 'type'='iceberg', + // 'iceberg.catalog.type'='hms', + // "hive.metastore.uris" = "thrift://172.31.71.26:9083", + // "fs.defaultFS" = "hdfs://hadoop-master-2:9000", + // "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse", + // "hadoop.security.authentication" = "kerberos", + // "hadoop.kerberos.min.seconds.before.relogin" = "5", + // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM", + // "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab", + // "hive.metastore.sasl.enabled" = "true", + // "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM", + // "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// + // RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// + // DEFAULT", + // "doris.krb5.debug" = "true" + // ); + // """ + // test iceberg hadoop catalog with kerberos sql """ CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl_hadoop @@ -43,82 +83,43 @@ suite("test_two_iceberg_kerberos", "p0,external,kerberos,external_docker,externa sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ sql """ INSERT INTO hadoop_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ - // order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ + sql """ SWITCH test_krb_iceberg_ctl; """ + sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """ + sql """ USE `test_krb_iceberg_db`; """ + sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + sql """ INSERT INTO 
test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """ + + // sql """ SWITCH other_test_krb_iceberg_ctl; """ + // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """ + // sql """ USE `other_test_krb_iceberg_db`; """ + // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ + // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ + + order_qt_iceberg_q02 """ SELECT id,dd FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl where dd >= '2023-05-16' """ // order_qt_iceberg_q03 """ SELECT id,dd FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl where dd <= '2023-05-16' """ // cross catalog query test - order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ - // order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ + sql """ SWITCH test_krb_iceberg_ctl_hadoop; """ + order_qt_iceberg_q04 """ SELECT id,dd FROM hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl where dd <= '2023-05-16' """ + order_qt_iceberg_q05 """ SELECT * FROM test_krb_iceberg_ctl.test_krb_iceberg_db.test_krb_iceberg_tbl """ order_qt_iceberg_q06 """ SELECT * FROM test_krb_iceberg_ctl_hadoop.hadoop_test_krb_iceberg_db.hadoop_test_krb_iceberg_tbl """ // order_qt_iceberg_q07 """ SELECT * FROM other_test_krb_iceberg_ctl.other_test_krb_iceberg_db.other_test_krb_iceberg_tbl """ - // sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ + sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`.`test_krb_iceberg_tbl`; """ // sql """ DROP TABLE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`.`other_test_krb_iceberg_tbl`; """ sql """ DROP TABLE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`.`hadoop_test_krb_iceberg_tbl`; """ - // sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ + sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl.`test_krb_iceberg_db`; """ // sql """ DROP DATABASE IF EXISTS other_test_krb_iceberg_ctl.`other_test_krb_iceberg_db`; """ sql """ DROP DATABASE IF EXISTS test_krb_iceberg_ctl_hadoop.`hadoop_test_krb_iceberg_db`; """ - // sql """ DROP CATALOG test_krb_iceberg_ctl """ + sql """ DROP CATALOG test_krb_iceberg_ctl """ // sql """ DROP CATALOG other_test_krb_iceberg_ctl """ sql """ DROP CATALOG test_krb_iceberg_ctl_hadoop """ - - // // test iceberg hms catalog with kerberos - // sql """ - // CREATE CATALOG IF NOT EXISTS test_krb_iceberg_ctl - // PROPERTIES ( - // 'type'='iceberg', - // 'iceberg.catalog.type'='hms', - // "hive.metastore.uris" = "thrift://172.31.71.25:9083", - // "fs.defaultFS" = "hdfs://hadoop-master:9000", - // "hadoop.security.authentication" = "kerberos", - // "hadoop.kerberos.min.seconds.before.relogin" = "5", - // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@LABS.TERADATA.COM", - // "hadoop.kerberos.keytab" = "/keytabs/hive-presto-master.keytab", - // "hive.metastore.sasl.enabled" = "true", - // 
"hive.metastore.kerberos.principal" = "hive/_HOST@LABS.TERADATA.COM", - // "hive.metastore.warehouse.dir"="hdfs://hadoop-master:9000/user/hive/warehouse", - // "doris.krb5.debug" = "true" - // ); - // """ - // - // sql """ - // CREATE CATALOG IF NOT EXISTS other_test_krb_iceberg_ctl - // PROPERTIES ( - // 'type'='iceberg', - // 'iceberg.catalog.type'='hms', - // "hive.metastore.uris" = "thrift://172.31.71.26:9083", - // "fs.defaultFS" = "hdfs://hadoop-master-2:9000", - // "hive.metastore.warehouse.dir"="hdfs://hadoop-master-2:9000/user/hive/warehouse", - // "hadoop.security.authentication" = "kerberos", - // "hadoop.kerberos.min.seconds.before.relogin" = "5", - // "hadoop.kerberos.principal"="hive/presto-master.docker.cluster@OTHERREALM.COM", - // "hadoop.kerberos.keytab" = "/keytabs/other-hive-presto-master.keytab", - // "hive.metastore.sasl.enabled" = "true", - // "hive.metastore.kerberos.principal" = "hive/_HOST@OTHERREALM.COM", - // "hadoop.security.auth_to_local" ="RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// - // RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// - // DEFAULT", - // "doris.krb5.debug" = "true" - // ); - // """ - // - // sql """ SWITCH test_krb_iceberg_ctl; """ - // sql """ CREATE DATABASE IF NOT EXISTS `test_krb_iceberg_db`; """ - // sql """ USE `test_krb_iceberg_db`; """ - // sql """ CREATE TABLE IF NOT EXISTS test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ - // sql """ INSERT INTO test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ - // order_qt_iceberg_q01 """ SELECT * FROM test_krb_iceberg_tbl """ - // - // sql """ SWITCH other_test_krb_iceberg_ctl; """ - // sql """ CREATE DATABASE IF NOT EXISTS `other_test_krb_iceberg_db`; """ - // sql """ USE `other_test_krb_iceberg_db`; """ - // sql """ CREATE TABLE IF NOT EXISTS other_test_krb_iceberg_tbl (id int, str string, dd date) engine = iceberg; """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(1, 'krb1', '2023-05-14') """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(2, 'krb2', '2023-05-16') """ - // sql """ INSERT INTO other_test_krb_iceberg_tbl values(3, 'krb3', '2023-05-17') """ } }