Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ DEFINE_Bool(enable_feature_binlog, "false");
DEFINE_Bool(enable_set_in_bitmap_value, "false");

DEFINE_Int64(max_hdfs_file_handle_cache_num, "20000");
DEFINE_Int32(max_hdfs_file_handle_cache_time_sec, "3600");
DEFINE_Int64(max_external_file_meta_cache_num, "20000");
// Apply delete pred in cumu compaction
DEFINE_mBool(enable_delete_when_cumu_compaction, "false");
Expand All @@ -1070,7 +1071,8 @@ DEFINE_mBool(enable_delete_when_cumu_compaction, "false");
DEFINE_Int32(rocksdb_max_write_buffer_number, "5");

DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");
DEFINE_mString(kerberos_ccache_path, "");
DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
Expand Down
10 changes: 6 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ DECLARE_Bool(enable_set_in_bitmap_value);

// max number of hdfs file handle in cache
DECLARE_Int64(max_hdfs_file_handle_cache_num);
DECLARE_Int32(max_hdfs_file_handle_cache_time_sec);

// max number of meta info of external files, such as parquet footer
DECLARE_Int64(max_external_file_meta_cache_num);
// Apply delete pred in cumu compaction
Expand All @@ -1111,10 +1113,10 @@ DECLARE_Int32(rocksdb_max_write_buffer_number);

// Allow invalid decimalv2 literal for compatible with old version. Recommend set it false strongly.
DECLARE_mBool(allow_invalid_decimalv2_literal);
// the max expiration time of the kerberos ticket.
// If an HDFS filesystem with kerberos authentication lives longer
// than this time, it will be expired.
DECLARE_mInt64(kerberos_expiration_time_seconds);
// Allow to specify kerberos credentials cache path.
DECLARE_mString(kerberos_ccache_path);
// set krb5.conf path, use "/etc/krb5.conf" by default
DECLARE_mString(kerberos_krb5_conf_path);

// Values include `none`, `glog`, `boost`, `glibc`, `libunwind`
DECLARE_mString(get_stack_trace_tool);
Expand Down
17 changes: 8 additions & 9 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ class HdfsFileSystemCache {
HdfsFileSystemCache() = default;

uint64 _hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& fs_name);
Status _create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, hdfsFS* fs,
bool* is_kerberos);
Status _create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, hdfsFS* fs);
void _clean_invalid();
void _clean_oldest();
};
Expand All @@ -105,7 +104,9 @@ class HdfsFileHandleCache {

private:
FileHandleCache _cache;
HdfsFileHandleCache() : _cache(config::max_hdfs_file_handle_cache_num, 16, 3600 * 1000L) {}
HdfsFileHandleCache()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]

be/src/io/fs/hdfs_file_system.cpp:108:

-                      config::max_hdfs_file_handle_cache_time_sec * 1000L) {};
+                      config::max_hdfs_file_handle_cache_time_sec * 1000L) = default;;

: _cache(config::max_hdfs_file_handle_cache_num, 16,
config::max_hdfs_file_handle_cache_time_sec * 1000L) {};
};

Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs, const Path& file,
Expand Down Expand Up @@ -390,10 +391,9 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;

Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, const std::string& fs_name,
hdfsFS* fs, bool* is_kerberos) {
hdfsFS* fs) {
HDFSCommonBuilder builder;
RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder));
*is_kerberos = builder.is_need_kinit();
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::IOError("failed to connect to hdfs {}: {}", fs_name, hdfs_error());
Expand Down Expand Up @@ -448,20 +448,19 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
// not find in cache, or fs handle is invalid
// create a new one and try to put it into cache
hdfsFS hdfs_fs = nullptr;
bool is_kerberos = false;
RETURN_IF_ERROR(_create_fs(hdfs_params, fs_name, &hdfs_fs, &is_kerberos));
RETURN_IF_ERROR(_create_fs(hdfs_params, fs_name, &hdfs_fs));
if (_cache.size() >= MAX_CACHE_HANDLE) {
_clean_invalid();
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
std::unique_ptr<HdfsFileSystemHandle> handle =
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
handle->inc_ref();
*fs_handle = handle.get();
_cache[hash_code] = std::move(handle);
} else {
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
*fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
}
}
return Status::OK();
Expand Down
10 changes: 2 additions & 8 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ struct FileInfo;

class HdfsFileSystemHandle {
public:
HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
HdfsFileSystemHandle(hdfsFS fs, bool cached)
: hdfs_fs(fs),
from_cache(cached),
_is_kerberos(is_kerberos),
_ref_cnt(0),
_create_time(_now()),
_last_access_time(0),
Expand Down Expand Up @@ -75,11 +74,7 @@ class HdfsFileSystemHandle {

int ref_cnt() { return _ref_cnt; }

bool invalid() {
return _invalid ||
(_is_kerberos &&
_now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000 / 2);
}
bool invalid() { return _invalid; }

void set_invalid() { _invalid = true; }

Expand All @@ -89,7 +84,6 @@ class HdfsFileSystemHandle {
const bool from_cache;

private:
const bool _is_kerberos;
// the number of referenced client
std::atomic<int> _ref_cnt;
// For kerberos authentication, we need to save create time so that
Expand Down
64 changes: 23 additions & 41 deletions be/src/io/hdfs_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,19 @@ Status HDFSCommonBuilder::init_hdfs_builder() {
return Status::OK();
}

Status HDFSCommonBuilder::run_kinit() {
Status HDFSCommonBuilder::check_krb_params() {
std::string ticket_path = doris::config::kerberos_ccache_path;
if (!ticket_path.empty()) {
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
return Status::OK();
}
// we should check that hdfs_kerberos_principal and hdfs_kerberos_keytab are non-empty before logging in to the KDC.
if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
}
std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string();
const char* krb_home = getenv("KRB_HOME");
std::string krb_home_str(krb_home ? krb_home : "");
fmt::memory_buffer kinit_command;
if (krb_home_str.empty()) {
fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path,
hdfs_kerberos_keytab, hdfs_kerberos_principal);
} else {
// Assign kerberos home in env, get kinit in kerberos home
fmt::format_to(kinit_command, krb_home_str + "/bin/kinit -c {} -R -t {} -k {}", ticket_path,
hdfs_kerberos_keytab, hdfs_kerberos_principal);
}
VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command);
std::string msg;
AgentUtils util;
bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg);
if (!rc) {
return Status::InternalError("Kinit failed, errMsg: " + msg);
}
#ifdef USE_LIBHDFS3
hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
#endif
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
LOG(INFO) << "finished to run kinit command: " << fmt::to_string(kinit_command);
// enable auto-renew thread
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
return Status::OK();
}

Expand Down Expand Up @@ -114,22 +98,23 @@ Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_
RETURN_IF_ERROR(builder->init_hdfs_builder());
hdfsBuilderSetNameNode(builder->get(), fs_name.c_str());
// set kerberos conf
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->kerberos_login = true;
builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(builder->get(), doris::config::kerberos_krb5_conf_path.c_str());
hdfsBuilderSetKeyTabFile(builder->get(), hdfsParams.hdfs_kerberos_keytab.c_str());
#endif
}
if (hdfsParams.__isset.hdfs_kerberos_principal) {
builder->need_kinit = true;
builder->kerberos_login = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
hdfsBuilderSetUserName(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
} else if (hdfsParams.__isset.user) {
hdfsBuilderSetUserName(builder->get(), hdfsParams.user.c_str());
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(builder->get(), nullptr);
hdfsBuilderSetKeyTabFile(builder->get(), nullptr);
#endif
}
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->need_kinit = true;
builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKeyTabFile(builder->get(), hdfsParams.hdfs_kerberos_keytab.c_str());
#endif
}
// set other conf
Expand All @@ -145,13 +130,10 @@ Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_
#endif
}
}

hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");

if (builder->is_need_kinit()) {
RETURN_IF_ERROR(builder->run_kinit());
if (builder->is_kerberos()) {
RETURN_IF_ERROR(builder->check_krb_params());
}

hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");
return Status::OK();
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/io/hdfs_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ class HDFSCommonBuilder {
Status init_hdfs_builder();

hdfsBuilder* get() { return hdfs_builder; }
bool is_need_kinit() const { return need_kinit; }
Status run_kinit();
bool is_kerberos() const { return kerberos_login; }
Status check_krb_params();

private:
hdfsBuilder* hdfs_builder = nullptr;
bool need_kinit {false};
bool kerberos_login {false};
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
};
Expand Down
28 changes: 14 additions & 14 deletions bin/start_be.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,6 @@ if [[ "${MAX_FILE_COUNT}" -lt 60000 ]]; then
exit 1
fi

# add java libs
preload_jars=("preload-extensions")
preload_jars+=("java-udf")

for preload_jar_dir in "${preload_jars[@]}"; do
for f in "${DORIS_HOME}/lib/java_extensions/${preload_jar_dir}"/*.jar; do
if [[ -z "${DORIS_CLASSPATH}" ]]; then
export DORIS_CLASSPATH="${f}"
else
export DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
fi
done
done

if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
# add hadoop libs
for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
Expand All @@ -118,6 +104,20 @@ if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
done
fi

# add java libs
preload_jars=("preload-extensions")
preload_jars+=("java-udf")

for preload_jar_dir in "${preload_jars[@]}"; do
for f in "${DORIS_HOME}/lib/java_extensions/${preload_jar_dir}"/*.jar; do
if [[ -z "${DORIS_CLASSPATH}" ]]; then
export DORIS_CLASSPATH="${f}"
else
export DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
fi
done
done

# add custom_libs to CLASSPATH
if [[ -d "${DORIS_HOME}/custom_lib" ]]; then
for f in "${DORIS_HOME}/custom_lib"/*.jar; do
Expand Down