diff --git a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
index 7624b94e6ad2a4..36547b8f89d163 100644
--- a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
+++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
@@ -24,7 +24,7 @@
     default_realm = LABS.TERADATA.COM
     dns_lookup_realm = false
     dns_lookup_kdc = false
-    ticket_lifetime = 24h
+    ticket_lifetime = 5s
     # this setting is causing a Message stream modified (41) error when talking to KDC running on CentOS 7: https://stackoverflow.com/a/60978520
     # renew_lifetime = 7d
     forwardable = true
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index 32a27b2263a746..875ae4542e1193 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -26,6 +26,7 @@ public abstract class AuthenticationConfig {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
+    public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
 
     /**
      * @return true if the config is valid, otherwise false.
@@ -57,6 +58,7 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf,
             krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey));
             krbConfig.setKerberosKeytab(conf.get(krbKeytabKey));
             krbConfig.setConf(conf);
+            krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false")));
             return krbConfig;
         } else {
             // AuthType.SIMPLE
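The new doris.krb5.debug key is read straight off the Hadoop Configuration, so a catalog can pass it through as an ordinary property. A minimal sketch, not part of the patch; the hadoop.kerberos.* keys and the paths are placeholder values:

    // Sketch: reading the debug flag into a KerberosAuthenticationConfig.
    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.KerberosAuthenticationConfig;
    import org.apache.hadoop.conf.Configuration;

    public class Krb5DebugFlagSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            conf.set("hadoop.kerberos.principal", "hive/hadoop-master@LABS.TERADATA.COM"); // placeholder
            conf.set("hadoop.kerberos.keytab", "/keytabs/hive.keytab");                   // placeholder
            conf.set(AuthenticationConfig.DORIS_KRB5_DEBUG, "true");

            KerberosAuthenticationConfig config =
                    (KerberosAuthenticationConfig) AuthenticationConfig.getKerberosConfig(conf);
            // the flag ends up as the Krb5LoginModule "debug" option, see
            // HadoopKerberosAuthenticator.getConfiguration further down
            System.out.println(config.isPrintDebugLog()); // true
        }
    }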
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
new file mode 100644
index 00000000000000..c3cab5f410be3a
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+public interface HadoopAuthenticator {
+
+    UserGroupInformation getUGI() throws IOException;
+
+    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+        try {
+            return getUGI().doAs(action);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
+        if (config instanceof KerberosAuthenticationConfig) {
+            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
+        } else {
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
+        }
+    }
+}
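Downstream callers now funnel privileged calls through this interface instead of touching UserGroupInformation directly. A usage sketch, assuming a Configuration already populated with kerberos properties as in the previous example:

    // Sketch: run one HDFS call under the authenticated UGI.
    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DoAsSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // plus kerberos properties
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
            // doAs() re-enters getUGI() on every call, so a ticket refreshed by
            // the kerberos implementation is picked up transparently;
            // InterruptedException is rethrown as IOException
            boolean exists = authenticator.doAs(() -> FileSystem.get(conf).exists(new Path("/tmp")));
            System.out.println(exists);
        }
    }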
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
new file mode 100644
index 00000000000000..90c7927f5a6d4d
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopKerberosAuthenticator.class);
+    private final KerberosAuthenticationConfig config;
+    private Subject subject;
+    private long nextRefreshTime;
+    private UserGroupInformation ugi;
+
+    public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
+        this.config = config;
+    }
+
+    public static void initializeAuthConfig(Configuration hadoopConf) {
+        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid several catalogs setting the UGI configuration at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
+    }
+
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            subject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), config.isPrintDebugLog());
+            ugi = Objects.requireNonNull(login(subject), "login result is null");
+            return ugi;
+        }
+        if (nextRefreshTime < System.currentTimeMillis()) {
+            long lastRefreshTime = nextRefreshTime;
+            Subject existingSubject = subject;
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(subject);
+                LOG.debug("Current ticket expiry time is {}", lastTicketEndTime);
+            }
+            // renew the subject
+            Subject newSubject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(),
+                    config.isPrintDebugLog());
+            Objects.requireNonNull(login(newSubject), "re-login result is null");
+            // modify the existing UGI in place instead of returning a new one
+            existingSubject.getPrincipals().addAll(newSubject.getPrincipals());
+            Set<Object> privateCredentials = existingSubject.getPrivateCredentials();
+            // clear the old credentials
+            synchronized (privateCredentials) {
+                privateCredentials.clear();
+                privateCredentials.addAll(newSubject.getPrivateCredentials());
+            }
+            Set<Object> publicCredentials = existingSubject.getPublicCredentials();
+            synchronized (publicCredentials) {
+                publicCredentials.clear();
+                publicCredentials.addAll(newSubject.getPublicCredentials());
+            }
+            nextRefreshTime = calculateNextRefreshTime(newSubject);
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(newSubject);
+                LOG.debug("Next ticket expiry time is {}", lastTicketEndTime);
+                LOG.debug("Refreshed kerberos ticket successfully, last refresh time: {}, next refresh time: {}",
+                        lastRefreshTime, nextRefreshTime);
+            }
+        }
+        return ugi;
+    }
+
+    private UserGroupInformation login(Subject subject) throws IOException {
+        // login and get ugi when the catalog is initialized
+        initializeAuthConfig(config.getConf());
+        String principal = config.getKerberosPrincipal();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by kerberos authentication with principal: {}", principal);
+        }
+        return UserGroupInformation.getUGIFromSubject(subject);
+    }
+
+    private static long calculateNextRefreshTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return getRefreshTime(tgtTicket);
+    }
+
+    private static Date getTicketEndTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return tgtTicket.getEndTime();
+    }
+
+    public static KerberosTicket getTicketGrantingTicket(Subject subject) {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            if (isOriginalTicketGrantingTicket(ticket)) {
+                return ticket;
+            }
+        }
+        throw new IllegalArgumentException("kerberos ticket not found in " + subject);
+    }
+
+    public static boolean isOriginalTicketGrantingTicket(KerberosTicket ticket) {
+        return isTicketGrantingServerPrincipal(ticket.getServer());
+    }
+
+    private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal principal) {
+        if (principal == null) {
+            return false;
+        }
+        return principal.getName().equals("krbtgt/" + principal.getRealm() + "@" + principal.getRealm());
+    }
+
+    public static long getRefreshTime(KerberosTicket ticket) {
+        long start = ticket.getStartTime().getTime();
+        long end = ticket.getEndTime().getTime();
+        return start + (long) ((end - start) * 0.8f);
+    }
+
+    private static Subject getSubject(String keytab, String principal, boolean printDebugLog) {
+        Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)),
+                Collections.emptySet(), Collections.emptySet());
+        javax.security.auth.login.Configuration conf = getConfiguration(keytab, principal, printDebugLog);
+        try {
+            LoginContext loginContext = new LoginContext("", subject, null, conf);
+            loginContext.login();
+            return loginContext.getSubject();
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static javax.security.auth.login.Configuration getConfiguration(String keytab, String principal,
+            boolean printDebugLog) {
+        return new javax.security.auth.login.Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                builder.put("doNotPrompt", "true")
+                        .put("isInitiator", "true")
+                        .put("useKeyTab", "true")
+                        .put("storeKey", "true")
+                        .put("keyTab", keytab)
+                        .put("principal", principal);
+                if (printDebugLog) {
+                    builder.put("debug", "true");
+                }
+                Map<String, String> options = builder.build();
+                return new AppConfigurationEntry[]{
+                        new AppConfigurationEntry(
+                                "com.sun.security.auth.module.Krb5LoginModule",
+                                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                options)};
+            }
+        };
+    }
+}
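getUGI() deliberately mutates the existing Subject rather than handing out a fresh UGI, so callers that cached the UGI keep working after a refresh. The refresh deadline comes from getRefreshTime(), which fires at 80% of the TGT lifetime. A worked sketch of that arithmetic, with invented timestamps:

    // Worked example of the 80%-of-lifetime refresh rule.
    public class RefreshTimeSketch {
        static long refreshTime(long startMillis, long endMillis) {
            // same formula as HadoopKerberosAuthenticator.getRefreshTime()
            return startMillis + (long) ((endMillis - startMillis) * 0.8f);
        }

        public static void main(String[] args) {
            long start = 0L;                       // ticket issued
            long end = 24L * 60 * 60 * 1000;       // 24h lifetime, as in the old krb5.conf
            // refresh fires at hour 19.2, well before expiry at hour 24
            System.out.println(refreshTime(start, end)); // 69120000
        }
    }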
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
new file mode 100644
index 00000000000000..fbe0d0aba7d39f
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopSimpleAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopSimpleAuthenticator.class);
+    private final UserGroupInformation ugi;
+
+    public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) {
+        String hadoopUserName = config.getUsername();
+        if (hadoopUserName == null) {
+            hadoopUserName = "hadoop";
+            config.setUsername(hadoopUserName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME);
+            }
+        }
+        ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login as remote user, hadoop.username: {}", hadoopUserName);
+        }
+    }
+
+    @Override
+    public UserGroupInformation getUGI() {
+        return ugi;
+    }
+}
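A sketch of the simple-auth path, runnable without a KDC (assuming SimpleAuthenticationConfig is the plain lombok data class its usage here suggests):

    // Sketch: simple authentication falls back to the "hadoop" user when
    // hadoop.username is not configured.
    import org.apache.doris.common.security.authentication.HadoopSimpleAuthenticator;
    import org.apache.doris.common.security.authentication.SimpleAuthenticationConfig;

    public class SimpleAuthSketch {
        public static void main(String[] args) throws Exception {
            SimpleAuthenticationConfig config = new SimpleAuthenticationConfig();
            // no setUsername(...) call: the constructor applies the default
            HadoopSimpleAuthenticator authenticator = new HadoopSimpleAuthenticator(config);
            System.out.println(authenticator.getUGI().getUserName()); // hadoop
        }
    }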
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index 1a86b9e327a2fb..d04d772728bc55 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -18,8 +18,6 @@
 package org.apache.doris.common.security.authentication;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +25,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+@Deprecated
 public class HadoopUGI {
     private static final Logger LOG = LogManager.getLogger(HadoopUGI.class);
 
@@ -39,82 +38,30 @@ private static UserGroupInformation loginWithUGI(AuthenticationConfig config) {
         if (config == null || !config.isValid()) {
             return null;
         }
-        UserGroupInformation ugi;
         if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            Configuration hadoopConf = krbConfig.getConf();
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-            UserGroupInformation.setConfiguration(hadoopConf);
-            String principal = krbConfig.getKerberosPrincipal();
             try {
-                // login hadoop with keytab and try checking TGT
-                ugi = UserGroupInformation.getLoginUser();
-                LOG.debug("Current login user: {}", ugi.getUserName());
-                if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
-                    // if the current user is logged by kerberos and is the same user
-                    // just use checkTGTAndReloginFromKeytab because this method will only relogin
-                    // when the TGT is expired or is close to expiry
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return ugi;
+                // TODO: remove after the iceberg and hudi kerberos test cases pass
+                try {
+                    // login hadoop with keytab and try checking TGT
+                    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                    LOG.debug("Current login user: {}", ugi.getUserName());
+                    String principal = ((KerberosAuthenticationConfig) config).getKerberosPrincipal();
+                    if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
+                        // if the current user is logged in by kerberos and is the same user,
+                        // just use checkTGTAndReloginFromKeytab because this method will only relogin
+                        // when the TGT is expired or is close to expiry
+                        ugi.checkTGTAndReloginFromKeytab();
+                        return ugi;
+                    }
+                } catch (IOException e) {
+                    LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
                 }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
-            }
-            try {
-                ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab());
-                UserGroupInformation.setLoginUser(ugi);
-                LOG.debug("Login by kerberos authentication with principal: {}", principal);
-                return ugi;
+                return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI();
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername();
-            if (hadoopUserName == null) {
-                hadoopUserName = "hadoop";
-                ((SimpleAuthenticationConfig) config).setUsername(hadoopUserName);
-                LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop");
-            }
-
-            try {
-                ugi = UserGroupInformation.getLoginUser();
-                if (ugi.getUserName().equals(hadoopUserName)) {
-                    return ugi;
-                }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with simple, do login immediately.", e);
-            }
-
-            ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
-            UserGroupInformation.setLoginUser(ugi);
-            LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName);
-            return ugi;
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config).getUGI();
         }
     }
-
-    /**
-     * use for HMSExternalCatalog to login
-     * @param config auth config
-     */
-    public static void tryKrbLogin(String catalogName, AuthenticationConfig config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            try {
-                Configuration hadoopConf = krbConfig.getConf();
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-                UserGroupInformation.setConfiguration(hadoopConf);
-                /**
-                 * Because metastore client is created by using
-                 * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
-                 * it will relogin when TGT is expired, so we don't need to relogin manually.
-                 */
-                UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(),
-                        krbConfig.getKerberosKeytab());
-            } catch (IOException e) {
-                throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e);
-            }
-        }
-    }
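HadoopUGI is kept only as a deprecated shim until the iceberg and hudi kerberos test cases pass (per the TODO above); new call sites are expected to hold a HadoopAuthenticator directly. A migration sketch; the config wiring is illustrative:

    // Sketch: moving off the deprecated HadoopUGI entry points.
    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.hadoop.conf.Configuration;

    public class MigrationSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // plus kerberos properties
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
            // before: HadoopUGI re-ran the login dance on every ugiDoAs(...) call
            // after: one authenticator per catalog; login happens once, then the
            // cached UGI is refreshed in place when the TGT nears expiry
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
            String user = authenticator.doAs(() -> authenticator.getUGI().getUserName());
            System.out.println(user);
        }
    }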
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
new file mode 100644
index 00000000000000..10e42f4bc67ab0
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator {
+
+    private final HadoopAuthenticator delegate;
+    private final String username;
+    private UserGroupInformation ugi;
+
+    public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String username) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.username = Objects.requireNonNull(username);
+    }
+
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            ugi = UserGroupInformation.createProxyUser(username, delegate.getUGI());
+        }
+        return ugi;
+    }
+}
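ImpersonatingHadoopAuthenticator layers a Hadoop proxy user on top of any delegate login. A sketch, assuming the service principal is allowed to impersonate via the cluster's hadoop.proxyuser.* settings; the names are placeholders:

    // Sketch: impersonating an end user on top of a kerberos service login.
    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.doris.common.security.authentication.ImpersonatingHadoopAuthenticator;
    import org.apache.hadoop.conf.Configuration;

    public class ImpersonationSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // plus kerberos properties
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
            HadoopAuthenticator service = HadoopAuthenticator.getHadoopAuthenticator(config);
            // all doAs() calls now run as "alice", authenticated via the service TGT
            HadoopAuthenticator asAlice = new ImpersonatingHadoopAuthenticator(service, "alice");
            System.out.println(asAlice.getUGI().getUserName()); // alice
        }
    }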
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
index 722cd0352b7d7d..adf76274386f7c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
@@ -18,14 +18,17 @@
 package org.apache.doris.common.security.authentication;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
     private String kerberosPrincipal;
     private String kerberosKeytab;
     private Configuration conf;
+    private boolean printDebugLog;
 
     @Override
     public boolean isValid() {
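isValid() is the gate used by callers such as the deprecated HadoopUGI.loginWithUGI() before attempting a login. A small sketch; the field values are placeholders:

    // Sketch: a kerberos config missing its keytab is rejected by isValid().
    import org.apache.doris.common.security.authentication.KerberosAuthenticationConfig;

    public class ConfigValiditySketch {
        public static void main(String[] args) {
            KerberosAuthenticationConfig config = new KerberosAuthenticationConfig();
            config.setKerberosPrincipal("hive/hadoop-master@LABS.TERADATA.COM");
            // keytab left unset: loginWithUGI() would bail out and return null
            System.out.println(config.isValid()); // false
        }
    }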
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index 1ac77972053677..a5e0eefb3483aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -113,6 +114,10 @@ void updatePartitionStatistics(
     void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
 
+    default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        // Ignored by default
+    }
+
     /**
      * close the connection, eg, to hms
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 91192b63c24d6e..a22eacaf1e4fc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -24,7 +24,7 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -40,7 +40,9 @@
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
 
+    @Getter
+    private HadoopAuthenticator authenticator;
+    @VisibleForTesting
     public HMSExternalCatalog() {
         catalogProperty = new CatalogProperty(null, null);
     }
@@ -81,6 +86,8 @@ public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
[...]
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
[...]
         Preconditions.checkArgument(poolSize > 0, poolSize);
@@ -104,6 +106,10 @@ public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
         this.isClosed = false;
     }
 
+    public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        this.hadoopAuthenticator = hadoopAuthenticator;
+    }
+
     @Override
     public void close() {
         synchronized (clientPool) {
@@ -678,7 +684,11 @@ public String getCatalogLocation(String catalogName) {
     }
 
     private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
-        return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
+        try {
+            return hadoopAuthenticator.doAs(action);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
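The catalog side is expected to build one authenticator per catalog and hand it to the cached metastore client, so every Thrift RPC runs inside doAs with the in-place-refreshed UGI. A hedged wiring sketch; the HiveConf contents and pool size are placeholders:

    // Sketch of the wiring this diff enables.
    import org.apache.doris.common.security.authentication.AuthenticationConfig;
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class MetastoreWiringSketch {
        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf(); // plus kerberos properties
            AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(hiveConf);
            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
            ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, 2);
            // every subsequent client call goes through ugiDoAs -> authenticator.doAs
            client.setHadoopAuthenticator(authenticator);
        }
    }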
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 68de3a8fdef86f..3cb8a036c2d5f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.RemoteIterator;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +57,7 @@ public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
         try {
             org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath);
             Path locatedPath = new Path(remotePath);
-            RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive);
+            RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath);
             while (locatedFiles.hasNext()) {
                 LocatedFileStatus fileStatus = locatedFiles.next();
                 RemoteFile location = new RemoteFile(
@@ -72,11 +73,16 @@ public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
         return Status.OK;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
+            FileSystem fileSystem, Path locatedPath) throws IOException {
+        return fileSystem.listFiles(locatedPath, recursive);
+    }
+
     @Override
     public Status listDirectories(String remotePath, Set<String> result) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
+            FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem);
             result.addAll(
                     Arrays.stream(fileStatuses)
                             .filter(FileStatus::isDirectory)
@@ -88,6 +94,10 @@ public Status listDirectories(String remotePath, Set<String> result) {
         return Status.OK;
     }
 
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
+        return fileSystem.listStatus(new Path(remotePath));
+    }
+
     @Override
     public Status renameDir(String origFilePath,
             String destFilePath,
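DFSFileSystem (next file) overrides these new protected hooks to wrap the listing calls in doAs; the same seam also makes listings interceptable elsewhere. A hypothetical test double, not part of the patch:

    // Hypothetical: fake listings without touching a real cluster.
    import org.apache.doris.fs.remote.dfs.DFSFileSystem;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.util.Map;

    public class FakeListingFileSystem extends DFSFileSystem {
        public FakeListingFileSystem(Map<String, String> properties) {
            super(properties);
        }

        @Override
        protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) {
            // no RPC: return one canned directory entry
            return new FileStatus[] {
                    new FileStatus(0, true, 1, 0, 0, new Path(remotePath, "part-0"))
            };
        }
    }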
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 944051e8741318..59fbd73bda78cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -21,7 +21,7 @@
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
@@ -35,7 +35,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem {
     public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
     private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
-
     private HDFSFileOperations operations = null;
+    private HadoopAuthenticator authenticator = null;
 
     public DFSFileSystem(Map<String, String> properties) {
         this(StorageBackend.StorageType.HDFS, properties);
     }
@@ -80,21 +82,35 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
                 for (Map.Entry<String, String> propEntry : properties.entrySet()) {
                     conf.set(propEntry.getKey(), propEntry.getValue());
                 }
-
-                dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-                    try {
-                        return FileSystem.get(new Path(remotePath).toUri(), conf);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                });
+                AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                try {
+                    dfsFileSystem = authenticator.doAs(() -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                } catch (Exception e) {
+                    throw new UserException(e);
+                }
+                operations = new HDFSFileOperations(dfsFileSystem);
             }
         }
     }
-        operations = new HDFSFileOperations(dfsFileSystem);
     return dfsFileSystem;
 }
 
+    @Override
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
+            FileSystem fileSystem, Path locatedPath) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
+    }
+
+    @Override
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath)));
+    }
+
     public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
         Configuration hdfsConf = new HdfsConfiguration();
         if (fallbackToSimpleAuth) {
@@ -266,7 +282,7 @@ public Status exists(String remotePath) {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            boolean isPathExist = fileSystem.exists(inputFilePath);
+            boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath));
             if (!isPathExist) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
             }
@@ -381,7 +397,7 @@ public Status rename(String srcPath, String destPath) {
             FileSystem fileSystem = nativeFileSystem(destPath);
             Path srcfilePath = new Path(srcPathUri.getPath());
             Path destfilePath = new Path(destPathUri.getPath());
-            boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
+            boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath));
             if (!isRenameSuccess) {
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
             }
@@ -402,7 +418,7 @@ public Status delete(String remotePath) {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            fileSystem.delete(inputFilePath, true);
+            authenticator.doAs(() -> fileSystem.delete(inputFilePath, true));
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
@@ -428,7 +444,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
             URI pathUri = URI.create(remotePath);
             FileSystem fileSystem = nativeFileSystem(remotePath);
             Path pathPattern = new Path(pathUri.getPath());
-            FileStatus[] files = fileSystem.globStatus(pathPattern);
+            FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern));
             if (files == null) {
                 LOG.info("no files in path " + remotePath);
                 return Status.OK;
@@ -455,7 +471,7 @@ public Status makeDir(String remotePath) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            if (!fileSystem.mkdirs(new Path(remotePath))) {
+            if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) {
                 LOG.warn("failed to make dir for " + remotePath);
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
             }
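A usage sketch of the reworked DFSFileSystem: callers never see UGI or doAs, and the authenticator built in nativeFileSystem() is reused by every later call. The property values and path are placeholders:

    // Sketch: every HDFS call now runs under authenticator.doAs internally.
    import org.apache.doris.fs.remote.dfs.DFSFileSystem;

    import java.util.HashMap;
    import java.util.Map;

    public class DfsUsageSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("hadoop.security.authentication", "kerberos");
            props.put("hadoop.kerberos.principal", "hive/hadoop-master@LABS.TERADATA.COM"); // placeholder
            props.put("hadoop.kerberos.keytab", "/keytabs/hive.keytab");                   // placeholder
            DFSFileSystem fs = new DFSFileSystem(props);
            // exists() resolves the FileSystem once, then wraps the RPC in doAs
            System.out.println(fs.exists("hdfs://hadoop-master:8020/tmp"));
        }
    }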
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
index a3b39d1221a740..7e7f276236adaa 100644
--- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.junit.Assert;
+
 suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") {
     String enabled = context.config.otherConfigs.get("enableKerberosTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -66,7 +68,36 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_d
         sql """ use test_krb_hive_db """
         order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """
 
+        // 3. multi thread test
+        Thread thread1 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from ${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail()
+            }
+        })
+
+        Thread thread2 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail()
+            }
+        })
+        sleep(5000L)
+        thread1.start()
+        thread2.start()
+
+        thread1.join()
+        thread2.join()
         sql """drop catalog ${hms_catalog_name};"""
         sql """drop catalog other_${hms_catalog_name};"""
+        // TODO: add tvf case
     }
 }
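The new multi-thread test hammers two kerberos catalogs while their tickets expire every five seconds (see the ticket_lifetime change at the top of this patch), exercising the in-place refresh under concurrency. The same shape in plain Java, using simple-auth stand-ins so the sketch runs without a KDC:

    // Sketch: concurrent doAs traffic against two independent authenticators.
    import org.apache.doris.common.security.authentication.HadoopAuthenticator;
    import org.apache.doris.common.security.authentication.HadoopSimpleAuthenticator;
    import org.apache.doris.common.security.authentication.SimpleAuthenticationConfig;

    public class ConcurrentDoAsSketch {
        static Thread hammer(HadoopAuthenticator authenticator) {
            return new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    try {
                        // with a kerberos authenticator each iteration re-enters
                        // getUGI(), refreshing the TGT in place when needed
                        authenticator.doAs(() -> authenticator.getUGI().getUserName());
                    } catch (Exception e) {
                        throw new AssertionError(e);
                    }
                }
            });
        }

        public static void main(String[] args) throws InterruptedException {
            SimpleAuthenticationConfig c1 = new SimpleAuthenticationConfig();
            c1.setUsername("user1");
            SimpleAuthenticationConfig c2 = new SimpleAuthenticationConfig();
            c2.setUsername("user2");
            Thread t1 = hammer(new HadoopSimpleAuthenticator(c1));
            Thread t2 = hammer(new HadoopSimpleAuthenticator(c2));
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println("done");
        }
    }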