From ec9697545b3df8635a20faa67461cecdf739a858 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Mon, 10 Feb 2025 16:14:52 +0800
Subject: [PATCH] [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS (#47192)

### What problem does this PR solve?

Reading Paimon tables directly through JNI can fail when the Paimon storage is on HDFS with Kerberos authentication enabled.

#### Reproduction Steps

- Create a Paimon catalog stored on an HDFS cluster with Kerberos authentication enabled.
- Execute the command `SET force_jni_scanner=true;`.
- To ensure a clean environment, restart the BE (Backend) service.
- Perform any query on a table within the catalog.

```
2025-01-18 09:25:06 WARN Thread-13 org.apache.doris.paimon.PaimonJniScanner.open(PaimonJniScanner.java:126) - Failed to open paimon_scanner: java.io.UncheckedIOException: org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
com.google.common.util.concurrent.UncheckedExecutionException: java.io.UncheckedIOException: org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2085)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4017)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4040)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4989)
        at org.apache.doris.paimon.PaimonTableCache.getTable(PaimonTableCache.java:84)
        at org.apache.doris.paimon.PaimonJniScanner.initTable(PaimonJniScanner.java:237)
        at org.apache.doris.paimon.PaimonJniScanner.open(PaimonJniScanner.java:122)
Caused by: java.io.UncheckedIOException: org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
        at org.apache.paimon.hive.HiveCatalog.createHiveCatalog(HiveCatalog.java:708)
        at org.apache.paimon.hive.HiveCatalogFactory.create(HiveCatalogFactory.java:48)
        at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:76)
        at org.apache.paimon.catalog.CatalogFactory.createCatalog(CatalogFactory.java:66)
        at org.apache.doris.paimon.PaimonTableCache.createCatalog(PaimonTableCache.java:75)
        at org.apache.doris.paimon.PaimonTableCache.loadTable(PaimonTableCache.java:58)
        at org.apache.doris.paimon.PaimonTableCache.access$000(PaimonTableCache.java:38)
        at org.apache.doris.paimon.PaimonTableCache$1.load(PaimonTableCache.java:51)
        at org.apache.doris.paimon.PaimonTableCache$1.load(PaimonTableCache.java:48)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3574)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2316)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2189)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2079)
        ... 6 more
```

#### Changes

This PR addresses an issue where queries fail when Paimon tables are read directly through JNI in environments where HDFS is the storage backend and Kerberos authentication is enabled. The failure is caused by the JNI scanners not performing Kerberos authentication before accessing HDFS, so access falls back to SIMPLE authentication and is rejected. The fix obtains a cached PreExecutionAuthenticator for the scanner's Hadoop configuration and runs table initialization and batch reads through it, so storage access happens as the authenticated Kerberos user (see the sketch below).
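#### Pattern applied by this patch (illustrative sketch)

The sketch below shows, in rough form, the pattern the patch applies in both scanners: obtain a `PreExecutionAuthenticator` from the new cache (keyed by the scanner's Hadoop options) and run every step that touches HDFS inside `execute()`. Only `PreExecutionAuthenticator` and `PreExecutionAuthenticatorCache` come from this patch; the scanner class and the commented-out setup helpers are placeholders, not the actual Doris classes.

```
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.Map;

// Hypothetical scanner used only to illustrate the pattern; the real changes are
// in HadoopHudiJniScanner and PaimonJniScanner in the diff below.
public class ExampleJniScanner {
    private final PreExecutionAuthenticator preExecutionAuthenticator;

    public ExampleJniScanner(Map<String, String> hadoopOptionParams) {
        // One authenticator per distinct Hadoop configuration, cached so the
        // Kerberos login is not repeated for every scanner instance.
        this.preExecutionAuthenticator =
                PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
    }

    public void open() throws Exception {
        // Everything that reads from (Kerberized) HDFS runs inside execute(),
        // i.e. under the authenticated user instead of falling back to SIMPLE auth.
        preExecutionAuthenticator.execute(() -> {
            // initTable(); initReader();  // scanner-specific setup would go here
            return null;
        });
    }
}
```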
---
 .../doris/hudi/HadoopHudiJniScanner.java      | 55 +++++++-----
 fe/be-java-extensions/paimon-scanner/pom.xml  |  6 --
 .../apache/doris/paimon/PaimonJniScanner.java | 30 +++++--
 .../authentication/AuthenticationConfig.java  | 17 +++-
 .../PreExecutionAuthenticatorCache.java       | 87 +++++++++++++++++++
 5 files changed, 159 insertions(+), 36 deletions(-)
 create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java

diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
index f2b38815a366fe..f163be11aa203f 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@
 import org.apache.doris.common.classloader.ThreadClassLoaderContext;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
     private final int fetchSize;
     private final ClassLoader classLoader;
 
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+
     public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
         this.basePath = params.get("base_path");
         this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
                 LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
             }
         }
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
 
         ZoneId zoneId;
         if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
     @Override
     public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            initRequiredColumnsAndTypes();
-            initTableInfo(requiredTypes, requiredFields, fetchSize);
-            Properties properties = getReaderProperties();
-            initReader(properties);
+            preExecutionAuthenticator.execute(() -> {
+                initRequiredColumnsAndTypes();
+                initTableInfo(requiredTypes, requiredFields, fetchSize);
+                Properties properties = getReaderProperties();
+                initReader(properties);
+                return null;
+            });
+
         } catch (Exception e) {
             close();
             LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@
     @Override
     public int getNext() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-            int numRows = 0;
-            for (; numRows < fetchSize; numRows++) {
-                if (!reader.next(key, value)) {
-                    break;
+            return preExecutionAuthenticator.execute(() -> {
+                NullWritable key = reader.createKey();
+                ArrayWritable value = reader.createValue();
+                int numRows = 0;
+                for (; numRows < fetchSize; numRows++) {
+                    if (!reader.next(key, value)) {
+                        break;
+                    }
+                    Object rowData = deserializer.deserialize(value);
+                    for (int i = 0; i < fields.length; i++) {
+                        Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                        columnValue.setRow(fieldData);
+                        // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+                        //         numRows, i, types[i].getName(), types[i].getType().name(),
+                        //         fieldInspectors[i].getTypeName());
+                        columnValue.setField(types[i], fieldInspectors[i]);
+                        appendData(i, columnValue);
+                    }
                 }
-                Object rowData = deserializer.deserialize(value);
-                for (int i = 0; i < fields.length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
-                    columnValue.setRow(fieldData);
-                    // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
-                    //         numRows, i, types[i].getName(), types[i].getType().name(),
-                    //         fieldInspectors[i].getTypeName());
-                    columnValue.setField(types[i], fieldInspectors[i]);
-                    appendData(i, columnValue);
-                }
-            }
-            return numRows;
+                return numRows;
+            });
         } catch (Exception e) {
             close();
             LOG.warn("failed to get next in hadoop hudi jni scanner", e);
diff --git a/fe/be-java-extensions/paimon-scanner/pom.xml b/fe/be-java-extensions/paimon-scanner/pom.xml
index 0b5136913035a5..3b4eb7acfe39b3 100644
--- a/fe/be-java-extensions/paimon-scanner/pom.xml
+++ b/fe/be-java-extensions/paimon-scanner/pom.xml
@@ -39,12 +39,6 @@ under the License.
             <groupId>org.apache.doris</groupId>
             <artifactId>java-common</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>fe-common</artifactId>
-                    <groupId>org.apache.doris</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 6ffd5f1ad9029a..e6c04a0a2f72b8 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -20,6 +20,8 @@
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
 import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
@@ -74,6 +76,7 @@ public class PaimonJniScanner extends JniScanner {
     private long lastUpdateTime;
     private RecordReader.RecordIterator<InternalRow> recordIterator = null;
     private final ClassLoader classLoader;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
 
     public PaimonJniScanner(int batchSize, Map<String, String> params) {
         this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
@@ -114,12 +118,16 @@ public void open() throws IOException {
             // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
             // so we need to provide a classloader, otherwise it will cause NPE.
             Thread.currentThread().setContextClassLoader(classLoader);
-            initTable();
-            initReader();
+            preExecutionAuthenticator.execute(() -> {
+                initTable();
+                initReader();
+                return null;
+            });
             resetDatetimeV2Precision();
+
         } catch (Throwable e) {
             LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
-            throw e;
+            throw new RuntimeException(e);
         }
     }
 
@@ -137,7 +145,7 @@ private void initReader() throws IOException {
         readBuilder.withFilter(getPredicates());
         reader = readBuilder.newRead().executeFilter().createReader(getSplit());
         paimonDataTypeList =
-            Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
+                Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
     }
 
     private int[] getProjected() {
@@ -183,8 +191,7 @@ public void close() throws IOException {
         }
     }
 
-    @Override
-    protected int getNext() throws IOException {
+    private int readAndProcessNextBatch() throws IOException {
         int rows = 0;
         try {
             if (recordIterator == null) {
@@ -210,13 +217,22 @@ protected int getNext() throws IOException {
         } catch (Exception e) {
             close();
             LOG.warn("Failed to get the next batch of paimon. "
-                    + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
+                            + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
                     getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
             throw new IOException(e);
         }
         return rows;
     }
 
+    @Override
+    protected int getNext() {
+        try {
+            return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected TableSchema parseTableSchema() throws UnsupportedOperationException {
         // do nothing
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index b580f9ecbe0582..2fa8d09b0d71a8 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -23,6 +23,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Map;
+
 public abstract class AuthenticationConfig {
     private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
     public static String HADOOP_USER_NAME = "hadoop.username";
@@ -31,12 +33,24 @@ public abstract class AuthenticationConfig {
     public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
     public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
+    private static final String DEFAULT_HADOOP_USERNAME = "hadoop";
 
     /**
      * @return true if the config is valid, otherwise false.
      */
     public abstract boolean isValid();
 
+    protected static String generalAuthenticationConfigKey(Map<String, String> conf) {
+        String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                null);
+        if (AuthType.KERBEROS.getDesc().equals(authentication)) {
+            return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-"
+                    + conf.getOrDefault(DORIS_KRB5_DEBUG, "false");
+        } else {
+            return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+        }
+    }
+
     /**
      * get kerberos config from hadoop conf
      * @param conf config
@@ -90,7 +104,8 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf,
     private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
         // AuthType.SIMPLE
         SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
-        simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
+        String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+        simpleAuthenticationConfig.setUsername(hadoopUserName);
         return simpleAuthenticationConfig;
     }
 }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
new file mode 100644
index 00000000000000..5b0d1cb70ff989
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
+ * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
+ * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
+ * the maximum size (MAX_CACHE_SIZE).
+ * <p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
+ * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
+ * redundant instantiations.
+ */
+public class PreExecutionAuthenticatorCache {
+    private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
+    private static final int MAX_CACHE_SIZE = 100;
+
+    private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
+            CacheBuilder.newBuilder()
+                    .maximumSize(MAX_CACHE_SIZE)
+                    .expireAfterAccess(60 * 24, TimeUnit.MINUTES)
+                    .build();
+
+    /**
+     * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
+     * This method first checks if the configuration is already cached. If not, it computes a new instance and
+     * caches it for future use.
+     *
+     * @param hadoopConfig The Hadoop configuration (key-value pairs)
+     * @return A PreExecutionAuthenticator instance for the given configuration
+     */
+    public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
+        String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
+        PreExecutionAuthenticator authenticator;
+        try {
+            authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
+                    () -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
+        } catch (ExecutionException exception) {
+            throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
+                    exception);
+        }
+        return authenticator;
+    }
+
+    private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
+            String authenticatorCacheKey) {
+        Configuration conf = new Configuration();
+        hadoopConfig.forEach(conf::set);
+        PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
+                conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+        HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
+                .getHadoopAuthenticator(authenticationConfig);
+        preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+        LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}",
+                authenticatorCacheKey);
+        return preExecutionAuthenticator;
+    }
+
+}
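
#### Usage note (illustrative, not part of the patch)

The cache key produced by `generalAuthenticationConfigKey` is the principal + keytab (+ the `doris.krb5.debug` flag) for Kerberos configurations, or the Hadoop username otherwise, so scanners created with the same Kerberos settings share one cached authenticator. The sketch below only illustrates that behavior; the concrete property-key strings and the principal/keytab values are assumptions, and actually running it would require a reachable KDC and a valid keytab.

```
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.HashMap;
import java.util.Map;

public class AuthenticatorCacheExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Assumed property names; the principal and keytab values are placeholders.
        conf.put("hadoop.security.authentication", "kerberos");
        conf.put("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM");
        conf.put("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");

        PreExecutionAuthenticator a1 = PreExecutionAuthenticatorCache.getAuthenticator(conf);
        PreExecutionAuthenticator a2 = PreExecutionAuthenticatorCache.getAuthenticator(conf);

        // Same principal + keytab (+ debug flag) => same cache key, so the second
        // lookup returns the instance created by the first one.
        System.out.println(a1 == a2); // expected: true
    }
}
```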