From 3040f190f8fb567d32709b43dc8c5e8c897b1f8c Mon Sep 17 00:00:00 2001 From: slothever Date: Thu, 18 Jul 2024 16:04:29 +0800 Subject: [PATCH 1/4] [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#37301) ## Proposed changes optimize kerberos ugi login: 1. support authentication framework for external table 2. cache ugi to avoid conflicts when set configuration for ugi login 3. do ugi login just on creating catalog, which reduces the ugi create times 4. only simple authentication will use getloginUser, which avoids conflicts between simple and kerberos authentication (cherry picked from commit a5fd82400a0d40560c0f9c85da987ec296e9d1df) --- .../kerberos/common/conf/doris-krb5.conf | 2 +- .../authentication/AuthenticationConfig.java | 2 + .../authentication/HadoopAuthenticator.java | 44 +++++ .../HadoopKerberosAuthenticator.java | 163 ++++++++++++++++++ .../HadoopSimpleAuthenticator.java | 47 +++++ .../security/authentication/HadoopUGI.java | 89 ++-------- .../ImpersonatingHadoopAuthenticator.java | 43 +++++ .../KerberosAuthenticationConfig.java | 3 + .../datasource/hive/HMSCachedClient.java | 5 + .../datasource/hive/HMSExternalCatalog.java | 12 +- .../datasource/hive/HiveMetadataOps.java | 7 +- .../hive/ThriftHMSCachedClient.java | 12 +- .../doris/fs/remote/RemoteFileSystem.java | 14 +- .../doris/fs/remote/dfs/DFSFileSystem.java | 48 ++++-- .../kerberos/test_two_hive_kerberos.groovy | 33 ++++ 15 files changed, 428 insertions(+), 96 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java create mode 100644 
fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java diff --git a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf index 7624b94e6ad2a4..36547b8f89d163 100644 --- a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf +++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf @@ -24,7 +24,7 @@ default_realm = LABS.TERADATA.COM dns_lookup_realm = false dns_lookup_kdc = false - ticket_lifetime = 24h + ticket_lifetime = 5s # this setting is causing a Message stream modified (41) error when talking to KDC running on CentOS 7: https://stackoverflow.com/a/60978520 # renew_lifetime = 7d forwardable = true diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java index 32a27b2263a746..875ae4542e1193 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -26,6 +26,7 @@ public abstract class AuthenticationConfig { public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file"; + public static String DORIS_KRB5_DEBUG = "doris.krb5.debug"; /** * @return true if the config is valid, otherwise false. 
@@ -57,6 +58,7 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf, krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey)); krbConfig.setKerberosKeytab(conf.get(krbKeytabKey)); krbConfig.setConf(conf); + krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false"))); return krbConfig; } else { // AuthType.SIMPLE diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java new file mode 100644 index 00000000000000..c3cab5f410be3a --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Supplies the Hadoop {@link UserGroupInformation} that external-catalog operations run as.
+ *
+ * <p>Implementations decide how the UGI is obtained; callers normally use {@link #doAs} to
+ * execute an action under that identity.
+ */
+public interface HadoopAuthenticator {
+
+    /**
+     * Returns the UGI to run actions as.
+     *
+     * @return the user group information of this authenticator
+     * @throws IOException if the UGI cannot be obtained
+     */
+    UserGroupInformation getUGI() throws IOException;
+
+    /**
+     * Runs {@code action} as the user returned by {@link #getUGI()}.
+     *
+     * @param action the action to execute
+     * @param <T> the result type of the action
+     * @return the value returned by the action
+     * @throws IOException if obtaining the UGI or running the action fails, or if the calling thread
+     *         is interrupted while the action runs (the interrupt flag is restored before throwing)
+     */
+    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+        try {
+            return getUGI().doAs(action);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Creates the authenticator matching the given configuration.
+     *
+     * @param config a {@link KerberosAuthenticationConfig} selects the kerberos authenticator; any other
+     *               config is cast to {@link SimpleAuthenticationConfig} and uses simple authentication
+     * @return a new authenticator for {@code config}
+     */
+    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
+        if (config instanceof KerberosAuthenticationConfig) {
+            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
+        } else {
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
+        }
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
new file mode 100644
index 00000000000000..14dace68b2453e
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.base.authentication.KerberosTicketUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+/**
+ * {@link HadoopAuthenticator} that logs in from a keytab through JAAS and exposes the resulting
+ * {@link UserGroupInformation}.
+ *
+ * <p>The UGI is created lazily on the first {@link #getUGI()} call and cached. Once the stored refresh
+ * deadline has passed, a fresh keytab login is performed and its principals and credentials are copied
+ * into the existing {@link Subject}, so the UGI instance handed out earlier keeps being returned.
+ */
+public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopKerberosAuthenticator.class);
+    private final KerberosAuthenticationConfig config;
+    // Subject backing the cached UGI; refreshed in place by getUGI().
+    private Subject subject;
+    // Deadline (compared against System.currentTimeMillis()) after which getUGI() logs in again.
+    private long nextRefreshTime;
+    // Cached UGI, null until the first successful login.
+    private UserGroupInformation ugi;
+
+    public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Sets {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_AUTHORIZATION} to {@code "true"} on the given
+     * configuration and installs it through the static {@link UserGroupInformation#setConfiguration}.
+     *
+     * @param hadoopConf the hadoop configuration to modify and install
+     */
+    public static void initializeAuthConfig(Configuration hadoopConf) {
+        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid other catalog set conf at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
+    }
+
+    /**
+     * Returns the cached kerberos UGI, logging in on first use and renewing the credentials of its
+     * subject once the refresh deadline has passed.
+     *
+     * @return the UGI created by the first login; later refreshes update its subject in place
+     * @throws IOException if creating the UGI from the subject fails
+     * @throws RuntimeException if the JAAS keytab login itself fails
+     */
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            subject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), config.isPrintDebugLog());
+            ugi = Objects.requireNonNull(login(subject), "login result is null");
+            // Schedule the first refresh from the ticket just obtained. Without this the deadline stays 0
+            // and the very next call performs a redundant second keytab login.
+            nextRefreshTime = calculateNextRefreshTime(subject);
+            return ugi;
+        }
+        if (nextRefreshTime < System.currentTimeMillis()) {
+            long lastRefreshTime = nextRefreshTime;
+            Subject existingSubject = subject;
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(subject);
+                LOG.debug("Current ticket expired time is {}", lastTicketEndTime);
+            }
+            // renew subject
+            Subject newSubject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(),
+                    config.isPrintDebugLog());
+            Objects.requireNonNull(login(newSubject), "re-login result is null");
+            // modify UGI instead of returning new UGI
+            existingSubject.getPrincipals().addAll(newSubject.getPrincipals());
+            Set<Object> privateCredentials = existingSubject.getPrivateCredentials();
+            // clear the old credentials
+            synchronized (privateCredentials) {
+                privateCredentials.clear();
+                privateCredentials.addAll(newSubject.getPrivateCredentials());
+            }
+            Set<Object> publicCredentials = existingSubject.getPublicCredentials();
+            synchronized (publicCredentials) {
+                publicCredentials.clear();
+                publicCredentials.addAll(newSubject.getPublicCredentials());
+            }
+            nextRefreshTime = calculateNextRefreshTime(newSubject);
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(newSubject);
+                LOG.debug("Next ticket expired time is {}", lastTicketEndTime);
+                LOG.debug("Refresh kerberos ticket succeeded, last time is {}, next time is {}",
+                        lastRefreshTime, nextRefreshTime);
+            }
+        }
+        return ugi;
+    }
+
+    /**
+     * Installs this authenticator's hadoop configuration and builds a UGI from an already logged-in subject.
+     *
+     * @param subject the subject produced by the JAAS keytab login
+     * @return the UGI created from {@code subject}
+     * @throws IOException if the UGI cannot be created from the subject
+     */
+    private UserGroupInformation login(Subject subject) throws IOException {
+        // login and get ugi when catalog is initialized
+        initializeAuthConfig(config.getConf());
+        String principal = config.getKerberosPrincipal();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by kerberos authentication with principal: {}", principal);
+        }
+        return UserGroupInformation.getUGIFromSubject(subject);
+    }
+
+    /** Returns the refresh time that {@link KerberosTicketUtils} derives from the subject's TGT. */
+    private static long calculateNextRefreshTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject);
+        return KerberosTicketUtils.getRefreshTime(tgtTicket);
+    }
+
+    /** Returns the end time of the subject's TGT; only used for debug logging. */
+    private static Date getTicketEndTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject);
+        return tgtTicket.getEndTime();
+    }
+
+    /**
+     * Performs a JAAS keytab login for {@code principal} and returns the populated subject.
+     *
+     * @throws RuntimeException wrapping the {@link LoginException} if the login fails
+     */
+    private static Subject getSubject(String keytab, String principal, boolean printDebugLog) {
+        Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)),
+                Collections.emptySet(), Collections.emptySet());
+        javax.security.auth.login.Configuration conf = getConfiguration(keytab, principal, printDebugLog);
+        try {
+            LoginContext loginContext = new LoginContext("", subject, null, conf);
+            loginContext.login();
+            return loginContext.getSubject();
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds an in-memory JAAS configuration with a single required {@code Krb5LoginModule} entry that
+     * logs in from the given keytab without prompting.
+     */
+    private static javax.security.auth.login.Configuration getConfiguration(String keytab, String principal,
+                                                                           boolean printDebugLog) {
+        return new javax.security.auth.login.Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
+                        .put("doNotPrompt", "true")
+                        .put("isInitiator", "true")
+                        .put("useKeyTab", "true")
+                        .put("storeKey", "true")
+                        .put("keyTab", keytab)
+                        .put("principal", principal);
+                if (printDebugLog) {
+                    builder.put("debug", "true");
+                }
+                Map<String, String> options = builder.build();
+                return new AppConfigurationEntry[]{
+                        new AppConfigurationEntry(
+                                "com.sun.security.auth.module.Krb5LoginModule",
+                                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                options)};
+            }
+        };
+    }
+}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java new file mode 100644 index 00000000000000..fbe0d0aba7d39f --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link HadoopAuthenticator} for simple (non-kerberos) authentication: the UGI is a remote user
+ * created once, in the constructor, from the configured user name.
+ */
+public class HadoopSimpleAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopSimpleAuthenticator.class);
+    private final UserGroupInformation ugi;
+
+    /**
+     * Creates the remote-user UGI for the user name in {@code config}.
+     *
+     * @param config simple authentication settings; when its user name is null it is set to "hadoop"
+     */
+    public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) {
+        final String userName = resolveUserName(config);
+        this.ugi = UserGroupInformation.createRemoteUser(userName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by proxy user, hadoop.username: {}", userName);
+        }
+    }
+
+    /** Returns the configured user name, writing the default "hadoop" back into the config when unset. */
+    private static String resolveUserName(SimpleAuthenticationConfig config) {
+        String configured = config.getUsername();
+        if (configured != null) {
+            return configured;
+        }
+        String fallback = "hadoop";
+        config.setUsername(fallback);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME);
+        }
+        return fallback;
+    }
+
+    @Override
+    public UserGroupInformation getUGI() {
+        return ugi;
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index 1a86b9e327a2fb..d04d772728bc55 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -18,8 +18,6 @@
 package org.apache.doris.common.security.authentication;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +25,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+@Deprecated
 public class HadoopUGI {
     private static final Logger LOG =
LogManager.getLogger(HadoopUGI.class); @@ -39,82 +38,30 @@ private static UserGroupInformation loginWithUGI(AuthenticationConfig config) { if (config == null || !config.isValid()) { return null; } - UserGroupInformation ugi; if (config instanceof KerberosAuthenticationConfig) { - KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; - Configuration hadoopConf = krbConfig.getConf(); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true"); - UserGroupInformation.setConfiguration(hadoopConf); - String principal = krbConfig.getKerberosPrincipal(); try { - // login hadoop with keytab and try checking TGT - ugi = UserGroupInformation.getLoginUser(); - LOG.debug("Current login user: {}", ugi.getUserName()); - if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) { - // if the current user is logged by kerberos and is the same user - // just use checkTGTAndReloginFromKeytab because this method will only relogin - // when the TGT is expired or is close to expiry - ugi.checkTGTAndReloginFromKeytab(); - return ugi; + // TODO: remove after iceberg and hudi kerberos test case pass + try { + // login hadoop with keytab and try checking TGT + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + LOG.debug("Current login user: {}", ugi.getUserName()); + String principal = ((KerberosAuthenticationConfig) config).getKerberosPrincipal(); + if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) { + // if the current user is logged by kerberos and is the same user + // just use checkTGTAndReloginFromKeytab because this method will only relogin + // when the TGT is expired or is close to expiry + ugi.checkTGTAndReloginFromKeytab(); + return ugi; + } + } catch (IOException e) { + LOG.warn("A SecurityException occurs with kerberos, do login 
immediately.", e); } - } catch (IOException e) { - LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e); - } - try { - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab()); - UserGroupInformation.setLoginUser(ugi); - LOG.debug("Login by kerberos authentication with principal: {}", principal); - return ugi; + return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI(); } catch (IOException e) { throw new RuntimeException(e); } } else { - String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername(); - if (hadoopUserName == null) { - hadoopUserName = "hadoop"; - ((SimpleAuthenticationConfig) config).setUsername(hadoopUserName); - LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop"); - } - - try { - ugi = UserGroupInformation.getLoginUser(); - if (ugi.getUserName().equals(hadoopUserName)) { - return ugi; - } - } catch (IOException e) { - LOG.warn("A SecurityException occurs with simple, do login immediately.", e); - } - - ugi = UserGroupInformation.createRemoteUser(hadoopUserName); - UserGroupInformation.setLoginUser(ugi); - LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName); - return ugi; - } - } - - /** - * use for HMSExternalCatalog to login - * @param config auth config - */ - public static void tryKrbLogin(String catalogName, AuthenticationConfig config) { - if (config instanceof KerberosAuthenticationConfig) { - KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config; - try { - Configuration hadoopConf = krbConfig.getConf(); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true"); - UserGroupInformation.setConfiguration(hadoopConf); - /** - * Because metastore client is created by using - * {@link 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy} - * it will relogin when TGT is expired, so we don't need to relogin manually. - */ - UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(), - krbConfig.getKerberosKeytab()); - } catch (IOException e) { - throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e); - } + return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config).getUGI(); } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java new file mode 100644 index 00000000000000..10e42f4bc67ab0 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * {@link HadoopAuthenticator} that wraps another authenticator and returns a proxy-user UGI for
+ * {@code username} on top of the delegate's UGI.
+ */
+public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator {
+
+    private final HadoopAuthenticator delegate;
+    private final String username;
+    // Delegate UGI the cached proxy was built on; used to detect when the delegate hands out a new one.
+    private UserGroupInformation realUgi;
+    private UserGroupInformation ugi;
+
+    /**
+     * @param delegate authenticator providing the real user's UGI, must not be null
+     * @param username name of the user to impersonate, must not be null
+     */
+    public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String username) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.username = Objects.requireNonNull(username);
+    }
+
+    /**
+     * Returns the proxy-user UGI, creating it on first use and re-creating it only when the delegate
+     * returns a different UGI instance.
+     *
+     * @return the cached proxy UGI for {@code username}
+     * @throws IOException if the delegate fails to provide its UGI
+     */
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        // Always ask the delegate: a kerberos delegate renews its credentials only inside getUGI(),
+        // so caching the proxy without calling it again would stop renewal from ever running.
+        UserGroupInformation currentRealUgi = delegate.getUGI();
+        if (ugi == null || currentRealUgi != realUgi) {
+            realUgi = currentRealUgi;
+            ugi = UserGroupInformation.createProxyUser(username, currentRealUgi);
+        }
+        return ugi;
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
index 722cd0352b7d7d..adf76274386f7c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
@@ -18,14 +18,17 @@
 package org.apache.doris.common.security.authentication;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
     private String kerberosPrincipal;
     private String kerberosKeytab;
     private Configuration conf;
+    private boolean printDebugLog;
 
     @Override
     public boolean isValid() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index
1ac77972053677..a5e0eefb3483aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.DatabaseMetadata; import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -113,6 +114,10 @@ void updatePartitionStatistics( void dropPartition(String dbName, String tableName, List partitionValues, boolean deleteData); + default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + // Ignored by default + } + /** * close the connection, eg, to hms */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 91192b63c24d6e..a22eacaf1e4fc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -24,7 +24,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopUGI; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; @@ -40,7 +40,9 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.transaction.TransactionManagerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; 
+import lombok.Getter; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.logging.log4j.LogManager; @@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16; private ThreadPoolExecutor fileSystemExecutor; + @Getter + private HadoopAuthenticator authenticator; + @VisibleForTesting public HMSExternalCatalog() { catalogProperty = new CatalogProperty(null, null); } @@ -81,6 +86,8 @@ public HMSExternalCatalog(long catalogId, String name, String resource, Map 0, poolSize); @@ -104,6 +106,10 @@ public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) { this.isClosed = false; } + public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { + this.hadoopAuthenticator = hadoopAuthenticator; + } + @Override public void close() { synchronized (clientPool) { @@ -678,7 +684,11 @@ public String getCatalogLocation(String catalogName) { } private T ugiDoAs(PrivilegedExceptionAction action) { - return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); + try { + return hadoopAuthenticator.doAs(action); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 68de3a8fdef86f..3cb8a036c2d5f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -56,7 +57,7 @@ public Status listFiles(String remotePath, boolean recursive, List r try { org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath); Path locatedPath = new 
Path(remotePath); - RemoteIterator locatedFiles = fileSystem.listFiles(locatedPath, recursive); + RemoteIterator locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath); while (locatedFiles.hasNext()) { LocatedFileStatus fileStatus = locatedFiles.next(); RemoteFile location = new RemoteFile( @@ -72,11 +73,16 @@ public Status listFiles(String remotePath, boolean recursive, List r return Status.OK; } + protected RemoteIterator getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return fileSystem.listFiles(locatedPath, recursive); + } + @Override public Status listDirectories(String remotePath, Set result) { try { FileSystem fileSystem = nativeFileSystem(remotePath); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); + FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem); result.addAll( Arrays.stream(fileStatuses) .filter(FileStatus::isDirectory) @@ -88,6 +94,10 @@ public Status listDirectories(String remotePath, Set result) { return Status.OK; } + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return fileSystem.listStatus(new Path(remotePath)); + } + @Override public Status renameDir(String origFilePath, String destFilePath, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 944051e8741318..59fbd73bda78cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -21,7 +21,7 @@ import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopUGI; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import 
org.apache.doris.common.util.URI; import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; @@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem { public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed"; private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); - private HDFSFileOperations operations = null; + private HadoopAuthenticator authenticator = null; public DFSFileSystem(Map properties) { this(StorageBackend.StorageType.HDFS, properties); @@ -80,21 +82,35 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException { for (Map.Entry propEntry : properties.entrySet()) { conf.set(propEntry.getKey(), propEntry.getValue()); } - - dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + try { + dfsFileSystem = authenticator.doAs(() -> { + try { + return FileSystem.get(new Path(remotePath).toUri(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new UserException(e); + } + operations = new HDFSFileOperations(dfsFileSystem); } } } - operations = new HDFSFileOperations(dfsFileSystem); return dfsFileSystem; } + protected 
RemoteIterator getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive)); + } + + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath))); + } + public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) { Configuration hdfsConf = new HdfsConfiguration(); if (fallbackToSimpleAuth) { @@ -266,7 +282,7 @@ public Status exists(String remotePath) { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); FileSystem fileSystem = nativeFileSystem(remotePath); - boolean isPathExist = fileSystem.exists(inputFilePath); + boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath)); if (!isPathExist) { return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); } @@ -381,7 +397,7 @@ public Status rename(String srcPath, String destPath) { FileSystem fileSystem = nativeFileSystem(destPath); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); - boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath); + boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath)); if (!isRenameSuccess) { return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath); } @@ -402,7 +418,7 @@ public Status delete(String remotePath) { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); FileSystem fileSystem = nativeFileSystem(remotePath); - fileSystem.delete(inputFilePath, true); + authenticator.doAs(() -> fileSystem.delete(inputFilePath, true)); } catch (UserException e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } catch (IOException e) { @@ -428,7 +444,7 @@ public Status 
globList(String remotePath, List result, boolean fileN URI pathUri = URI.create(remotePath); FileSystem fileSystem = nativeFileSystem(remotePath); Path pathPattern = new Path(pathUri.getPath()); - FileStatus[] files = fileSystem.globStatus(pathPattern); + FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern)); if (files == null) { LOG.info("no files in path " + remotePath); return Status.OK; @@ -455,7 +471,7 @@ public Status globList(String remotePath, List result, boolean fileN public Status makeDir(String remotePath) { try { FileSystem fileSystem = nativeFileSystem(remotePath); - if (!fileSystem.mkdirs(new Path(remotePath))) { + if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) { LOG.warn("failed to make dir for " + remotePath); return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath); } diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy index a3b39d1221a740..7e7f276236adaa 100644 --- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy +++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy @@ -1,3 +1,5 @@ +import groovyjarjarantlr4.v4.codegen.model.ExceptionClause + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -15,6 +17,8 @@ // specific language governing permissions and limitations // under the License. 
+import org.junit.Assert; + suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { String enabled = context.config.otherConfigs.get("enableKerberosTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { @@ -66,7 +70,36 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_d sql """ use test_krb_hive_db """ order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """ + // 3. multi thread test + Thread thread1 = new Thread(() -> { + try { + for (int i = 0; i < 1000; i++) { + sql """ select * from ${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + + Thread thread2 = new Thread(() -> { + try { + for (int i = 0; i < 1000; i++) { + sql """ select * from other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """ + } + } catch (Exception e) { + log.info(e.getMessage()) + Assert.fail(); + } + }) + sleep(5000L) + thread1.start() + thread2.start() + + thread1.join() + thread2.join() sql """drop catalog ${hms_catalog_name};""" sql """drop catalog other_${hms_catalog_name};""" + // TODO: add tvf case } } From c044bbab05e49cbdd2c8c257dcd9c36cb226eb7e Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 31 Jul 2024 18:05:50 +0800 Subject: [PATCH 2/4] 1 --- .../HadoopKerberosAuthenticator.java | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java index 14dace68b2453e..993d1041ed3b67 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java @@ -20,7 +20,6 @@ import 
com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import io.trino.plugin.base.authentication.KerberosTicketUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; @@ -113,16 +112,48 @@ private UserGroupInformation login(Subject subject) throws IOException { private static long calculateNextRefreshTime(Subject subject) { Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI"); - KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject); - return KerberosTicketUtils.getRefreshTime(tgtTicket); + KerberosTicket tgtTicket = getTicketGrantingTicket(subject); + return getRefreshTime(tgtTicket); } private static Date getTicketEndTime(Subject subject) { Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI"); - KerberosTicket tgtTicket = KerberosTicketUtils.getTicketGrantingTicket(subject); + KerberosTicket tgtTicket = getTicketGrantingTicket(subject); return tgtTicket.getEndTime(); } + public static KerberosTicket getTicketGrantingTicket(Subject subject) { + Set tickets = subject.getPrivateCredentials(KerberosTicket.class); + for (KerberosTicket ticket : tickets) { + if (isOriginalTicketGrantingTicket(ticket)) { + return ticket; + } + } + throw new IllegalArgumentException("kerberos ticket not found in " + subject); + } + + public static boolean isOriginalTicketGrantingTicket(KerberosTicket ticket) + { + return isTicketGrantingServerPrincipal(ticket.getServer()); + } + + private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal principal) + { + if (principal == null) { + return false; + } + if (principal.getName().equals("krbtgt/" + principal.getRealm() + "@" + principal.getRealm())) { + return true; + } + return false; + } + + public static long 
getRefreshTime(KerberosTicket ticket) { + long start = ticket.getStartTime().getTime(); + long end = ticket.getEndTime().getTime(); + return start + (long) ((end - start) * 0.8f); + } + private static Subject getSubject(String keytab, String principal, boolean printDebugLog) { Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)), Collections.emptySet(), Collections.emptySet()); From b15c88c51ff57849e54b4b8fe5d8ce9911f31401 Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 31 Jul 2024 18:53:57 +0800 Subject: [PATCH 3/4] 2 --- .../authentication/HadoopKerberosAuthenticator.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java index 993d1041ed3b67..90c7927f5a6d4d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java @@ -132,13 +132,11 @@ public static KerberosTicket getTicketGrantingTicket(Subject subject) { throw new IllegalArgumentException("kerberos ticket not found in " + subject); } - public static boolean isOriginalTicketGrantingTicket(KerberosTicket ticket) - { + public static boolean isOriginalTicketGrantingTicket(KerberosTicket ticket) { return isTicketGrantingServerPrincipal(ticket.getServer()); } - private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal principal) - { + private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal principal) { if (principal == null) { return false; } From 6d8ad182d83479631ad4e2616bed84fdca4b6b0c Mon Sep 17 00:00:00 2001 From: slothever Date: Wed, 31 Jul 2024 20:34:16 +0800 Subject: [PATCH 4/4] 3 --- 
.../java/org/apache/doris/datasource/hive/HiveMetadataOps.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 73ecf7c3aa99a8..dcfc6d1ad33f90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -32,8 +32,8 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; -import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;