@@ -20,6 +20,8 @@
import org.apache.doris.common.classloader.ThreadClassLoaderContext;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
private final int fetchSize;
private final ClassLoader classLoader;

private final PreExecutionAuthenticator preExecutionAuthenticator;

public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
this.basePath = params.get("base_path");
this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
}
}
this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);

ZoneId zoneId;
if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
@Override
public void open() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
initRequiredColumnsAndTypes();
initTableInfo(requiredTypes, requiredFields, fetchSize);
Properties properties = getReaderProperties();
initReader(properties);
preExecutionAuthenticator.execute(() -> {
initRequiredColumnsAndTypes();
initTableInfo(requiredTypes, requiredFields, fetchSize);
Properties properties = getReaderProperties();
initReader(properties);
return null;
});

} catch (Exception e) {
close();
LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public void open() throws IOException {
@Override
public int getNext() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
int numRows = 0;
for (; numRows < fetchSize; numRows++) {
if (!reader.next(key, value)) {
break;
return preExecutionAuthenticator.execute(() -> {
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
int numRows = 0;
for (; numRows < fetchSize; numRows++) {
if (!reader.next(key, value)) {
break;
}
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
// LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
// numRows, i, types[i].getName(), types[i].getType().name(),
// fieldInspectors[i].getTypeName());
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
}
}
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
// LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
// numRows, i, types[i].getName(), types[i].getType().name(),
// fieldInspectors[i].getTypeName());
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
}
}
return numRows;
return numRows;
});
} catch (Exception e) {
close();
LOG.warn("failed to get next in hadoop hudi jni scanner", e);
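The preExecutionAuthenticator.execute(...) wrapping added to open() and getNext() above runs the scanner's Hadoop-facing work as the authenticated user. PreExecutionAuthenticator itself is not part of this diff, so the sketch below is only a rough illustration of the pattern under assumed names and internals: a generic execute(Callable<T>) that delegates to UserGroupInformation.doAs when a Kerberos/UGI context is configured. Note how the void open() path is adapted by returning null from the lambda, while getNext() returns its row count through execute().

import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

// Illustrative only: not the actual Doris PreExecutionAuthenticator, whose source is outside this diff.
public class IllustrativeAuthenticator {
    private final UserGroupInformation ugi; // assumed to be supplied by the configured HadoopAuthenticator

    public IllustrativeAuthenticator(UserGroupInformation ugi) {
        this.ugi = ugi;
    }

    public <T> T execute(Callable<T> task) throws Exception {
        if (ugi == null) {
            // simple authentication: no user-context switch needed
            return task.call();
        }
        // Kerberos: run the task as the authenticated Hadoop user
        return ugi.doAs((PrivilegedExceptionAction<T>) task::call);
    }
}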
6 changes: 0 additions & 6 deletions fe/be-java-extensions/paimon-scanner/pom.xml
@@ -39,12 +39,6 @@ under the License.
<groupId>org.apache.doris</groupId>
<artifactId>java-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>fe-common</artifactId>
<groupId>org.apache.doris</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -20,6 +20,8 @@
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;

@@ -74,6 +76,7 @@ public class PaimonJniScanner extends JniScanner {
private long lastUpdateTime;
private RecordReader.RecordIterator<InternalRow> recordIterator = null;
private final ClassLoader classLoader;
private PreExecutionAuthenticator preExecutionAuthenticator;

public PaimonJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@ public PaimonJniScanner(int batchSize, Map<String, String> params) {
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
.collect(Collectors
.toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
}

@Override
@@ -114,12 +118,16 @@ public void open() throws IOException {
// `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
// so we need to provide a classloader, otherwise it will cause NPE.
Thread.currentThread().setContextClassLoader(classLoader);
initTable();
initReader();
preExecutionAuthenticator.execute(() -> {
initTable();
initReader();
return null;
});
resetDatetimeV2Precision();

} catch (Throwable e) {
LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
throw e;
throw new RuntimeException(e);
}
}

@@ -137,7 +145,7 @@ private void initReader() throws IOException {
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}

private int[] getProjected() {
@@ -183,8 +191,7 @@ public void close() throws IOException {
}
}

@Override
protected int getNext() throws IOException {
private int readAndProcessNextBatch() throws IOException {
int rows = 0;
try {
if (recordIterator == null) {
@@ -210,13 +217,22 @@ protected int getNext() throws IOException {
} catch (Exception e) {
close();
LOG.warn("Failed to get the next batch of paimon. "
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
throw new IOException(e);
}
return rows;
}

@Override
protected int getNext() {
try {
return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
// do nothing
@@ -23,6 +23,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

public abstract class AuthenticationConfig {
private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
public static String HADOOP_USER_NAME = "hadoop.username";
@@ -31,12 +33,24 @@ public abstract class AuthenticationConfig {
public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
private static final String DEFAULT_HADOOP_USERNAME = "hadoop";

/**
* @return true if the config is valid, otherwise false.
*/
public abstract boolean isValid();

protected static String generalAuthenticationConfigKey(Map<String, String> conf) {
String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
null);
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-"
+ conf.getOrDefault(DORIS_KRB5_DEBUG, "false");
} else {
return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
}
}

/**
* get kerberos config from hadoop conf
* @param conf config
@@ -90,7 +104,8 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf,
private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
// AuthType.SIMPLE
SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
simpleAuthenticationConfig.setUsername(hadoopUserName);
return simpleAuthenticationConfig;
}
}
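The new generalAuthenticationConfigKey helper above derives the cache key used by PreExecutionAuthenticatorCache: for Kerberos it is principal + "-" + keytab + "-" + krb5-debug flag, otherwise the Hadoop username (defaulting to "hadoop"). A minimal illustration with hypothetical property values follows; since the helper is protected, callers exercise it indirectly through PreExecutionAuthenticatorCache.getAuthenticator(...).

import org.apache.doris.common.security.authentication.AuthenticationConfig;

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

import java.util.HashMap;
import java.util.Map;

// Sketch of the two cache-key shapes; property values are hypothetical.
public class AuthCacheKeyExample {
    public static void main(String[] args) {
        // Kerberos properties -> key "<principal>-<keytab>-<krb5 debug flag>"
        Map<String, String> kerberosConf = new HashMap<>();
        kerberosConf.put(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
        kerberosConf.put(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "doris/_HOST@EXAMPLE.COM");
        kerberosConf.put(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "/etc/doris/doris.keytab");
        // expected key: "doris/_HOST@EXAMPLE.COM-/etc/doris/doris.keytab-false"

        // Simple authentication -> key is hadoop.username, defaulting to "hadoop" when unset
        Map<String, String> simpleConf = new HashMap<>();
        simpleConf.put(AuthenticationConfig.HADOOP_USER_NAME, "etl_user");
        // expected key: "etl_user"
    }
}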
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
* This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
* It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
* the maximum size (MAX_CACHE_SIZE).
* <p>
* The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
* only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
* redundant instantiations.
*/
public class PreExecutionAuthenticatorCache {
private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
private static final int MAX_CACHE_SIZE = 100;

private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(60 * 24, TimeUnit.MINUTES)
.build();

/**
* Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
* This method first checks if the configuration is already cached. If not, it computes a new instance and
* caches it for future use.
*
* @param hadoopConfig The Hadoop configuration (key-value pairs)
* @return A PreExecutionAuthenticator instance for the given configuration
*/
public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
PreExecutionAuthenticator authenticator;
try {
authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
() -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
} catch (ExecutionException exception) {
throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
exception);
}
return authenticator;
}

private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
String authenticatorCacheKey) {
Configuration conf = new Configuration();
hadoopConfig.forEach(conf::set);
PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
.getHadoopAuthenticator(authenticationConfig);
preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}",
authenticatorCacheKey);
return preExecutionAuthenticator;
}

}
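Putting the pieces together, a caller such as the JNI scanners above resolves a cached authenticator from the connector properties and funnels all Hadoop-touching work through it. The sketch below assumes the generic Callable-based execute signature implied by the scanner code; property values are hypothetical.

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.HashMap;
import java.util.Map;

public class AuthenticatorUsageSketch {
    public static void main(String[] args) throws Exception {
        // Connector/Hadoop properties as passed to the JNI scanner (hypothetical values)
        Map<String, String> props = new HashMap<>();
        props.put("hadoop.username", "etl_user");

        // Identical property maps resolve to the same cached instance (LRU, at most 100 entries,
        // evicted 24 hours after last access)
        PreExecutionAuthenticator auth = PreExecutionAuthenticatorCache.getAuthenticator(props);

        // Value-returning tasks (like getNext) return their result; void-style tasks return null
        Integer rows = auth.execute(() -> {
            // open readers / read a batch here
            return 0;
        });
        System.out.println("fetched rows: " + rows);
    }
}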