From 95dce769df57e7dc97c6fcc316cef324a1b166bf Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sat, 18 Jan 2025 09:33:50 +0800 Subject: [PATCH 1/5] [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS. --- fe/be-java-extensions/paimon-scanner/pom.xml | 6 --- .../apache/doris/paimon/PaimonJniScanner.java | 43 ++++++++++++++++--- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/fe/be-java-extensions/paimon-scanner/pom.xml b/fe/be-java-extensions/paimon-scanner/pom.xml index 603ee65eaadea5..70993a0dd418a9 100644 --- a/fe/be-java-extensions/paimon-scanner/pom.xml +++ b/fe/be-java-extensions/paimon-scanner/pom.xml @@ -39,12 +39,6 @@ under the License. org.apache.doris java-common ${project.version} - - - fe-common - org.apache.doris - - diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 6ffd5f1ad9029a..c33e35cc6c908a 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -20,9 +20,13 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; import org.apache.doris.paimon.PaimonTableCache.TableExt; +import org.apache.hadoop.conf.Configuration; import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; @@ -74,6 +78,7 @@ public class PaimonJniScanner extends JniScanner { private long lastUpdateTime; private RecordReader.RecordIterator recordIterator = null; private final ClassLoader classLoader; + private PreExecutionAuthenticator preExecutionAuthenticator; public PaimonJniScanner(int batchSize, Map params) { this.classLoader = this.getClass().getClassLoader(); @@ -104,6 +109,14 @@ public PaimonJniScanner(int batchSize, Map params) { .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) .collect(Collectors .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue())); + Configuration conf = new Configuration(); + hadoopOptionParams.forEach(conf::set); + preExecutionAuthenticator = new PreExecutionAuthenticator(); + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf, + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); + HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); + preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); } @Override @@ -114,12 +127,20 @@ public void open() throws IOException { // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)` // so we need to provide a classloader, otherwise it will cause NPE. 
Thread.currentThread().setContextClassLoader(classLoader); - initTable(); - initReader(); + preExecutionAuthenticator.execute(() -> { + initTable(); + initReader(); + return null; + }); resetDatetimeV2Precision(); + } catch (Throwable e) { LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e); - throw e; + try { + throw e; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } } @@ -137,7 +158,7 @@ private void initReader() throws IOException { readBuilder.withFilter(getPredicates()); reader = readBuilder.newRead().executeFilter().createReader(getSplit()); paimonDataTypeList = - Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); + Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); } private int[] getProjected() { @@ -183,8 +204,7 @@ public void close() throws IOException { } } - @Override - protected int getNext() throws IOException { + private int readAndProcessNextBatch() throws IOException { int rows = 0; try { if (recordIterator == null) { @@ -210,13 +230,22 @@ protected int getNext() throws IOException { } catch (Exception e) { close(); LOG.warn("Failed to get the next batch of paimon. " - + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", + + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e); throw new IOException(e); } return rows; } + @Override + protected int getNext() { + try { + return preExecutionAuthenticator.execute(this::readAndProcessNextBatch); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { // do nothing From 0799fb388c1c83efc94e13439c76c6137a8437f7 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 24 Jan 2025 10:12:53 +0800 Subject: [PATCH 2/5] add cache --- .../doris/hudi/HadoopHudiJniScanner.java | 55 ++++--- .../apache/doris/paimon/PaimonJniScanner.java | 13 +- .../PreExecutionAuthenticatorCache.java | 135 ++++++++++++++++++ 3 files changed, 170 insertions(+), 33 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java index f2b38815a366fe..f163be11aa203f 100644 --- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java @@ -20,6 +20,8 @@ import org.apache.doris.common.classloader.ThreadClassLoaderContext; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner { private final int fetchSize; private final ClassLoader classLoader; + private final PreExecutionAuthenticator preExecutionAuthenticator; + public HadoopHudiJniScanner(int fetchSize, Map params) { this.basePath = 
params.get("base_path"); this.dataFilePath = params.get("data_file_path"); @@ -120,6 +124,7 @@ public HadoopHudiJniScanner(int fetchSize, Map params) { LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue()); } } + this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps); ZoneId zoneId; if (Strings.isNullOrEmpty(params.get("time_zone"))) { @@ -135,10 +140,14 @@ public HadoopHudiJniScanner(int fetchSize, Map params) { @Override public void open() throws IOException { try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { - initRequiredColumnsAndTypes(); - initTableInfo(requiredTypes, requiredFields, fetchSize); - Properties properties = getReaderProperties(); - initReader(properties); + preExecutionAuthenticator.execute(() -> { + initRequiredColumnsAndTypes(); + initTableInfo(requiredTypes, requiredFields, fetchSize); + Properties properties = getReaderProperties(); + initReader(properties); + return null; + }); + } catch (Exception e) { close(); LOG.warn("failed to open hadoop hudi jni scanner", e); @@ -149,25 +158,27 @@ public void open() throws IOException { @Override public int getNext() throws IOException { try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { - NullWritable key = reader.createKey(); - ArrayWritable value = reader.createValue(); - int numRows = 0; - for (; numRows < fetchSize; numRows++) { - if (!reader.next(key, value)) { - break; + return preExecutionAuthenticator.execute(() -> { + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + int numRows = 0; + for (; numRows < fetchSize; numRows++) { + if (!reader.next(key, value)) { + break; + } + Object rowData = deserializer.deserialize(value); + for (int i = 0; i < fields.length; i++) { + Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); + columnValue.setRow(fieldData); + // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}", + // numRows, i, types[i].getName(), types[i].getType().name(), + // fieldInspectors[i].getTypeName()); + columnValue.setField(types[i], fieldInspectors[i]); + appendData(i, columnValue); + } } - Object rowData = deserializer.deserialize(value); - for (int i = 0; i < fields.length; i++) { - Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); - columnValue.setRow(fieldData); - // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}", - // numRows, i, types[i].getName(), types[i].getType().name(), - // fieldInspectors[i].getTypeName()); - columnValue.setField(types[i], fieldInspectors[i]); - appendData(i, columnValue); - } - } - return numRows; + return numRows; + }); } catch (Exception e) { close(); LOG.warn("failed to get next in hadoop hudi jni scanner", e); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index c33e35cc6c908a..2c8d669452c4e8 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -20,13 +20,11 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.TableSchema; -import org.apache.doris.common.security.authentication.AuthenticationConfig; 
-import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; import org.apache.doris.paimon.PaimonTableCache.TableExt; -import org.apache.hadoop.conf.Configuration; import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; @@ -109,14 +107,7 @@ public PaimonJniScanner(int batchSize, Map params) { .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) .collect(Collectors .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue())); - Configuration conf = new Configuration(); - hadoopOptionParams.forEach(conf::set); - preExecutionAuthenticator = new PreExecutionAuthenticator(); - AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf, - AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, - AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); - HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); - preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); + this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams); } @Override diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java new file mode 100644 index 00000000000000..b33c4766ba7555 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.security.authentication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations. + * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration. + * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds + * the maximum size (MAX_CACHE_SIZE). + *
<p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs), + * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing + * redundant instantiations. + */ +public class PreExecutionAuthenticatorCache { + private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class); + private static final int MAX_CACHE_SIZE = 100; + private static final Map preExecutionAuthenticatorCache = + new LinkedHashMap(MAX_CACHE_SIZE, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_CACHE_SIZE; + } + }; + + /** + * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist. + * This method first checks if the configuration is already cached. If not, it computes a new instance and + * caches it for future use. + * + * @param hadoopConfig The Hadoop configuration (key-value pairs) + * @return A PreExecutionAuthenticator instance for the given configuration + */ + public static PreExecutionAuthenticator getAuthenticator(Map hadoopConfig) { + + HadoopConfigWrapper hadoopConfigWrapper = new HadoopConfigWrapper(hadoopConfig); + PreExecutionAuthenticator cachedAuthenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper); + if (cachedAuthenticator != null) { + return cachedAuthenticator; + } + return preExecutionAuthenticatorCache.computeIfAbsent(hadoopConfigWrapper, config -> { + Configuration conf = new Configuration(); + hadoopConfig.forEach(conf::set); + PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator(); + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf, + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); + HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); + preExecutionAuthenticator.setHadoopAuthenticator(authenticator); + LOG.info("Created new authenticator for configuration: " + hadoopConfigWrapper); + return preExecutionAuthenticator; + }); + } + + + /** + * Hadoop configuration wrapper class that wraps a Map configuration. + * This class overrides the equals() and hashCode() methods to enable comparison of + * the configurations in the cache, ensuring that identical configurations (with the same key-value pairs) + * are considered equal and can reuse the same cached PreExecutionAuthenticator instance. + *
<p>
+ * The purpose of this class is to ensure that in the cache, if two configurations are identical + * (i.e., they have the same key-value pairs), only one instance of PreExecutionAuthenticator is created and cached. + * By implementing custom equals() and hashCode() methods, we ensure that even if different Map instances + * hold the same configuration data, they are considered equal in the cache. + */ + private static class HadoopConfigWrapper { + private final Map config; + + /** + * Constructor that takes a Map configuration. + * + * @param config The Hadoop configuration, typically a Map containing configuration key-value + * pairs + */ + public HadoopConfigWrapper(Map config) { + this.config = new HashMap<>(config); + } + + /** + * Checks if two HadoopConfigWrapper objects are equal. + * Two objects are considered equal if their wrapped Map configurations are identical + * (i.e., the key-value pairs are the same). + * + * @param obj The object to compare with the current object + * @return true if the two HadoopConfigWrapper objects have the same wrapped configuration; false otherwise + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + HadoopConfigWrapper that = (HadoopConfigWrapper) obj; + return config.equals(that.config); + } + + /** + * Generates a hash code based on the Hadoop configuration. + * Objects with the same configuration will generate the same hash code, ensuring + * that they can be correctly matched in a Map. + * + * @return The hash code of the Hadoop configuration + */ + @Override + public int hashCode() { + return config.hashCode(); + } + } +} From b27c7f43b4be0c7a9dc3da78c0b7c2dbef46a68f Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sun, 26 Jan 2025 22:46:16 +0800 Subject: [PATCH 3/5] fix --- .../PreExecutionAuthenticatorCache.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java index b33c4766ba7555..f07b09ab0b3f8a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java @@ -17,13 +17,16 @@ package org.apache.doris.common.security.authentication; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations. 
@@ -38,13 +41,12 @@ public class PreExecutionAuthenticatorCache { private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class); private static final int MAX_CACHE_SIZE = 100; - private static final Map preExecutionAuthenticatorCache = - new LinkedHashMap(MAX_CACHE_SIZE, 0.75f, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > MAX_CACHE_SIZE; - } - }; + + private static final Cache preExecutionAuthenticatorCache = + CacheBuilder.newBuilder() + .maximumSize(MAX_CACHE_SIZE) + .expireAfterAccess(60 * 24, TimeUnit.MINUTES) + .build(); /** * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist. @@ -57,22 +59,26 @@ protected boolean removeEldestEntry(Map.Entry hadoopConfig) { HadoopConfigWrapper hadoopConfigWrapper = new HadoopConfigWrapper(hadoopConfig); - PreExecutionAuthenticator cachedAuthenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper); - if (cachedAuthenticator != null) { - return cachedAuthenticator; + + PreExecutionAuthenticator authenticator = null; + try { + authenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper, () -> { + Configuration conf = new Configuration(); + hadoopConfig.forEach(conf::set); + PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator(); + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig( + conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); + HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator + .getHadoopAuthenticator(authenticationConfig); + preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); + LOG.info("Created new authenticator for configuration: " + hadoopConfigWrapper); + return preExecutionAuthenticator; + }); + } catch (ExecutionException exception) { + throw new RuntimeException(exception.getCause().getMessage(), exception); } - return preExecutionAuthenticatorCache.computeIfAbsent(hadoopConfigWrapper, config -> { - Configuration conf = new Configuration(); - hadoopConfig.forEach(conf::set); - PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator(); - AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf, - AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, - AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); - HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig); - preExecutionAuthenticator.setHadoopAuthenticator(authenticator); - LOG.info("Created new authenticator for configuration: " + hadoopConfigWrapper); - return preExecutionAuthenticator; - }); + return authenticator; } From f1946daa03c3da14639a076ab7e99071db131b37 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 5 Feb 2025 17:14:41 +0800 Subject: [PATCH 4/5] fix --- .../authentication/AuthenticationConfig.java | 30 ++++++ .../PreExecutionAuthenticatorCache.java | 98 +++++-------------- 2 files changed, 52 insertions(+), 76 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java index b580f9ecbe0582..8bde71b17935f8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java +++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -23,6 +23,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; + public abstract class AuthenticationConfig { private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class); public static String HADOOP_USER_NAME = "hadoop.username"; @@ -31,12 +33,24 @@ public abstract class AuthenticationConfig { public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal"; public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file"; public static String DORIS_KRB5_DEBUG = "doris.krb5.debug"; + private static final String DEFAULT_HADOOP_USERNAME = "hadoop"; /** * @return true if the config is valid, otherwise false. */ public abstract boolean isValid(); + protected static String generalAuthenticationConfigKey(Map conf) { + String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + null); + if (AuthType.KERBEROS.getDesc().equals(authentication)) { + return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-" + + conf.getOrDefault(DORIS_KRB5_DEBUG, "false"); + } + // kerberos config + return getHadoopUserName(conf); + } + /** * get kerberos config from hadoop conf * @param conf config @@ -91,6 +105,22 @@ private static AuthenticationConfig createSimpleAuthenticationConfig(Configurati // AuthType.SIMPLE SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig(); simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME)); + String hadoopUserName = conf.get(HADOOP_USER_NAME); + if (hadoopUserName == null) { + hadoopUserName = DEFAULT_HADOOP_USERNAME; + simpleAuthenticationConfig.setUsername(hadoopUserName); + if (LOG.isDebugEnabled()) { + LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME); + } + } return simpleAuthenticationConfig; } + + private static String getHadoopUserName(Map conf) { + String hadoopUserName = conf.get(HADOOP_USER_NAME); + if (hadoopUserName == null) { + hadoopUserName = DEFAULT_HADOOP_USERNAME; + } + return hadoopUserName; + } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java index f07b09ab0b3f8a..5b0d1cb70ff989 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -42,7 +41,7 @@ public class PreExecutionAuthenticatorCache { private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class); private static final int MAX_CACHE_SIZE = 100; - private static final Cache preExecutionAuthenticatorCache = + private static final Cache preExecutionAuthenticatorCache = CacheBuilder.newBuilder() .maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(60 * 24, TimeUnit.MINUTES) @@ -57,85 +56,32 @@ public class PreExecutionAuthenticatorCache { * @return A PreExecutionAuthenticator instance for the given 
configuration */ public static PreExecutionAuthenticator getAuthenticator(Map hadoopConfig) { - - HadoopConfigWrapper hadoopConfigWrapper = new HadoopConfigWrapper(hadoopConfig); - - PreExecutionAuthenticator authenticator = null; + String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig); + PreExecutionAuthenticator authenticator; try { - authenticator = preExecutionAuthenticatorCache.get(hadoopConfigWrapper, () -> { - Configuration conf = new Configuration(); - hadoopConfig.forEach(conf::set); - PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator(); - AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig( - conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, - AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); - HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator - .getHadoopAuthenticator(authenticationConfig); - preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); - LOG.info("Created new authenticator for configuration: " + hadoopConfigWrapper); - return preExecutionAuthenticator; - }); + authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey, + () -> createAuthenticator(hadoopConfig, authenticatorCacheKey)); } catch (ExecutionException exception) { - throw new RuntimeException(exception.getCause().getMessage(), exception); + throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey, + exception); } return authenticator; } - - /** - * Hadoop configuration wrapper class that wraps a Map configuration. - * This class overrides the equals() and hashCode() methods to enable comparison of - * the configurations in the cache, ensuring that identical configurations (with the same key-value pairs) - * are considered equal and can reuse the same cached PreExecutionAuthenticator instance. - *
<p>
- * The purpose of this class is to ensure that in the cache, if two configurations are identical - * (i.e., they have the same key-value pairs), only one instance of PreExecutionAuthenticator is created and cached. - * By implementing custom equals() and hashCode() methods, we ensure that even if different Map instances - * hold the same configuration data, they are considered equal in the cache. - */ - private static class HadoopConfigWrapper { - private final Map config; - - /** - * Constructor that takes a Map configuration. - * - * @param config The Hadoop configuration, typically a Map containing configuration key-value - * pairs - */ - public HadoopConfigWrapper(Map config) { - this.config = new HashMap<>(config); - } - - /** - * Checks if two HadoopConfigWrapper objects are equal. - * Two objects are considered equal if their wrapped Map configurations are identical - * (i.e., the key-value pairs are the same). - * - * @param obj The object to compare with the current object - * @return true if the two HadoopConfigWrapper objects have the same wrapped configuration; false otherwise - */ - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - HadoopConfigWrapper that = (HadoopConfigWrapper) obj; - return config.equals(that.config); - } - - /** - * Generates a hash code based on the Hadoop configuration. - * Objects with the same configuration will generate the same hash code, ensuring - * that they can be correctly matched in a Map. - * - * @return The hash code of the Hadoop configuration - */ - @Override - public int hashCode() { - return config.hashCode(); - } + private static PreExecutionAuthenticator createAuthenticator(Map hadoopConfig, + String authenticatorCacheKey) { + Configuration conf = new Configuration(); + hadoopConfig.forEach(conf::set); + PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator(); + AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig( + conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB); + HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator + .getHadoopAuthenticator(authenticationConfig); + preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); + LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}", + authenticatorCacheKey); + return preExecutionAuthenticator; } + } From 67228fac10fd952b14f0a014ab9027bcd91db0b7 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 10 Feb 2025 12:01:33 +0800 Subject: [PATCH 5/5] simplify logic of getting hadoop user name --- .../apache/doris/paimon/PaimonJniScanner.java | 6 +---- .../authentication/AuthenticationConfig.java | 23 ++++--------------- 2 files changed, 5 insertions(+), 24 deletions(-) diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 2c8d669452c4e8..e6c04a0a2f72b8 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -127,11 +127,7 @@ public void open() throws IOException { } catch (Throwable e) { LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e); - try { - throw e; - } catch (Exception ex) { - throw 
new RuntimeException(ex); - } + throw new RuntimeException(e); } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java index 8bde71b17935f8..2fa8d09b0d71a8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java @@ -46,9 +46,9 @@ protected static String generalAuthenticationConfigKey(Map conf) if (AuthType.KERBEROS.getDesc().equals(authentication)) { return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-" + conf.getOrDefault(DORIS_KRB5_DEBUG, "false"); + } else { + return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME); } - // kerberos config - return getHadoopUserName(conf); } /** @@ -104,23 +104,8 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf, private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) { // AuthType.SIMPLE SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig(); - simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME)); - String hadoopUserName = conf.get(HADOOP_USER_NAME); - if (hadoopUserName == null) { - hadoopUserName = DEFAULT_HADOOP_USERNAME; - simpleAuthenticationConfig.setUsername(hadoopUserName); - if (LOG.isDebugEnabled()) { - LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME); - } - } + String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME); + simpleAuthenticationConfig.setUsername(hadoopUserName); return simpleAuthenticationConfig; } - - private static String getHadoopUserName(Map conf) { - String hadoopUserName = conf.get(HADOOP_USER_NAME); - if (hadoopUserName == null) { - hadoopUserName = DEFAULT_HADOOP_USERNAME; - } - return hadoopUserName; - } }
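
Reviewer note (not part of the patches): a minimal usage sketch of the API this series ends up with. PreExecutionAuthenticatorCache.getAuthenticator(Map) and PreExecutionAuthenticator.execute(...) are taken from the diffs above; the harness class, the sample option values, and the literal key strings ("hadoop.kerberos.principal", "hadoop.kerberos.keytab") are illustrative assumptions, not code from the PR.

    import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
    import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical harness mirroring how PaimonJniScanner/HadoopHudiJniScanner drive the API.
    public class AuthenticatorUsageSketch {
        public static void main(String[] args) throws Exception {
            // Hadoop options as a scanner receives them from BE params (illustrative values).
            Map<String, String> hadoopOptions = new HashMap<>();
            hadoopOptions.put("hadoop.security.authentication", "kerberos");
            hadoopOptions.put("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM");
            hadoopOptions.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

            // Repeated calls with an equivalent map return the same cached instance,
            // so Kerberos login happens once per principal/keytab pair, not per scanner.
            PreExecutionAuthenticator auth =
                    PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptions);

            // Every HDFS-touching step runs inside execute(), as open() and getNext() do above.
            Integer rows = auth.execute(() -> {
                // initTable(); initReader(); read one batch ...
                return 0;
            });
            System.out.println("rows read: " + rows);
        }
    }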
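
A second sketch, covering the cache-key scheme from patches 4-5: generalAuthenticationConfigKey folds only the auth-relevant entries into the key (principal + "-" + keytab + "-" + krb5-debug flag under Kerberos, otherwise the Hadoop user name), so option maps that differ only in unrelated settings share one cached authenticator. A side effect worth noting is that the underlying Configuration is built from whichever map populated the cache first. The check below is illustrative, not a test from the PR:

    import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
    import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical check; key strings and values are illustrative.
    public class CacheKeySketch {
        public static void main(String[] args) {
            Map<String, String> a = new HashMap<>();
            a.put("hadoop.security.authentication", "kerberos");
            a.put("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM");
            a.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

            Map<String, String> b = new HashMap<>(a);
            b.put("dfs.client.read.shortcircuit", "true"); // ignored by the cache key

            PreExecutionAuthenticator first = PreExecutionAuthenticatorCache.getAuthenticator(a);
            PreExecutionAuthenticator second = PreExecutionAuthenticatorCache.getAuthenticator(b);

            // Same Kerberos identity => same cache key => same shared instance.
            System.out.println(first == second); // expected: true
        }
    }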