Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1143,8 +1143,8 @@ DEFINE_Bool(enable_feature_binlog, "false");
// enable set in BitmapValue
DEFINE_Bool(enable_set_in_bitmap_value, "true");

DEFINE_Int64(max_hdfs_file_handle_cache_num, "1000");
DEFINE_Int32(max_hdfs_file_handle_cache_time_sec, "3600");
DEFINE_Int64(max_hdfs_file_handle_cache_num, "20000");
DEFINE_Int32(max_hdfs_file_handle_cache_time_sec, "28800");
DEFINE_Int64(max_external_file_meta_cache_num, "1000");
DEFINE_mInt32(common_obj_lru_cache_stale_sweep_time_sec, "900");
// Apply delete pred in cumu compaction
Expand All @@ -1157,7 +1157,8 @@ DEFINE_mBool(allow_zero_date, "false");
DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
DEFINE_mString(kerberos_ccache_path, "/tmp/");
DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");
DEFINE_mInt32(kerberos_refresh_interval_second, "3600");
// Deprecated
DEFINE_mInt32(kerberos_refresh_interval_second, "43200");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/kerberos/kerberos_ticket_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <thread>

#include "common/config.h"
#include "util/time.h"

namespace doris::kerberos {

Expand Down Expand Up @@ -119,6 +120,8 @@ Status KerberosTicketCache::login() {
}
return status;
}
_ticket_lifetime_sec = static_cast<int64_t>(creds.times.endtime) -
static_cast<int64_t>(creds.times.starttime);

// init ccache file
status = _krb5_interface->cc_initialize(_context, temp_ccache, _principal);
Expand Down Expand Up @@ -214,7 +217,8 @@ void KerberosTicketCache::start_periodic_refresh() {
// ignore and continue
LOG(WARNING) << st.to_string();
} else {
LOG(INFO) << "refresh kerberos ticket cache: " << _ticket_cache_path;
LOG(INFO) << "refresh kerberos ticket cache: " << _ticket_cache_path
<< ", lifetime sec: " << _ticket_lifetime_sec;
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/kerberos/kerberos_ticket_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class KerberosTicketCache : public std::enable_shared_from_this<KerberosTicketCa
// Get detailed information about all credentials in the current ticket cache
virtual std::vector<KerberosTicketInfo> get_ticket_info();

int64_t get_ticket_lifetime_sec() const { return _ticket_lifetime_sec; }

private:
// Initialize the ticket cache file path using principal and keytab information
Status _init_ticket_cache_path();
Expand All @@ -125,6 +127,8 @@ class KerberosTicketCache : public std::enable_shared_from_this<KerberosTicketCa
krb5_ccache _ccache {nullptr};
// Principal handle
krb5_principal _principal {nullptr};
// Ticket lifetime in seconds (credential endtime - starttime), set by login().
// Stays 0 until a login has succeeded, so readers never see an indeterminate value.
int64_t _ticket_lifetime_sec {0};

// Thread for periodic ticket refresh
std::unique_ptr<std::thread> _refresh_thread;
Expand All @@ -133,7 +137,7 @@ class KerberosTicketCache : public std::enable_shared_from_this<KerberosTicketCa
// Flag to control refresh thread execution
std::atomic<bool> _should_stop_refresh {false};
// Sleep time between refresh checks (in milliseconds)
std::chrono::milliseconds _refresh_thread_sleep_time {10};
std::chrono::milliseconds _refresh_thread_sleep_time {5000};

// Interface for KRB5 operations
std::unique_ptr<Krb5Interface> _krb5_interface;
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/fs/file_handle_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
for (FileHandleCachePartition& p : _cache_partitions) {
p.cache.set_capacity(partition_capacity);
}
Status st = init();
if (!st) {
LOG(FATAL) << "failed to start file handle cache thread: " << st.to_string();
}
}

FileHandleCache::~FileHandleCache() {
Expand Down
17 changes: 12 additions & 5 deletions be/src/io/fs/hdfs/hdfs_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,20 @@ void HdfsMgr::_cleanup_loop() {

// Find expired handlers
for (const auto& entry : _fs_handlers) {
if (current_time - entry.second->last_access_time >=
_instance_timeout_seconds) {
bool is_expired = current_time - entry.second->last_access_time >=
_instance_timeout_seconds;
// bool is_krb_expired =
// entry.second->is_kerberos_auth &&
// (current_time - entry.second->create_time >=
// entry.second->ticket_cache->get_ticket_lifetime_sec() / 2);
if (is_expired) {
LOG(INFO) << "Found expired HDFS handler, hash_code=" << entry.first
<< ", last_access_time=" << entry.second->last_access_time
<< ", is_kerberos=" << entry.second->is_kerberos_auth
<< ", principal=" << entry.second->principal
<< ", fs_name=" << entry.second->fs_name;
<< ", fs_name=" << entry.second->fs_name
<< ", is_expired=" << is_expired;
// << ", is_krb_expire=" << is_krb_expired;
to_remove.push_back(entry.first);
handlers_to_cleanup.push_back(entry.second);
}
Expand Down Expand Up @@ -180,8 +187,8 @@ Status HdfsMgr::_create_hdfs_fs_impl(const THdfsParams& hdfs_params, const std::
bool is_kerberos = builder.is_kerberos();
*fs_handler = std::make_shared<HdfsHandler>(
hdfs_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "",
is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name,
builder.get_ticket_cache());
is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name);
// builder.get_ticket_cache());
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace {
Result<FileHandleCache::Accessor> get_file(const hdfsFS& fs, const Path& file, int64_t mtime,
int64_t file_size) {
static FileHandleCache cache(config::max_hdfs_file_handle_cache_num, 16,
config::max_hdfs_file_handle_cache_time_sec * 1000L);
config::max_hdfs_file_handle_cache_time_sec);
bool cache_hit;
FileHandleCache::Accessor accessor;
RETURN_IF_ERROR_RESULT(cache.get_file_handle(fs, file.native(), mtime, file_size, false,
Expand Down
37 changes: 22 additions & 15 deletions be/src/io/hdfs_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,23 @@ void HDFSCommonBuilder::set_hdfs_conf_to_hdfs_builder() {
}
}

// This method is deprecated, will be removed later
Status HDFSCommonBuilder::set_kerberos_ticket_cache() {
kerberos::KerberosConfig config;
config.set_principal_and_keytab(hdfs_kerberos_principal, hdfs_kerberos_keytab);
config.set_krb5_conf_path(config::kerberos_krb5_conf_path);
config.set_refresh_interval(config::kerberos_refresh_interval_second);
config.set_min_time_before_refresh(600);
kerberos::KerberosTicketMgr* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
// ATTN, can't use ticket_cache->get_ticket_cache_path() directly,
// it may cause the kerberos ticket cache path in libhdfs is empty,
kerberos_ticket_path = ticket_cache->get_ticket_cache_path();
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, kerberos_ticket_path.c_str());
hdfsBuilderSetForceNewInstance(hdfs_builder);
LOG(INFO) << "get kerberos ticket path: " << kerberos_ticket_path
<< " with principal: " << hdfs_kerberos_principal;
// kerberos::KerberosConfig config;
// config.set_principal_and_keytab(hdfs_kerberos_principal, hdfs_kerberos_keytab);
// config.set_krb5_conf_path(config::kerberos_krb5_conf_path);
// config.set_refresh_interval(config::kerberos_refresh_interval_second);
// config.set_min_time_before_refresh(600);
// kerberos::KerberosTicketMgr* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
// RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
// // ATTN, can't use ticket_cache->get_ticket_cache_path() directly,
// // it may cause the kerberos ticket cache path in libhdfs is empty,
// kerberos_ticket_path = ticket_cache->get_ticket_cache_path();
// hdfsBuilderSetUserName(hdfs_builder, hdfs_kerberos_principal.c_str());
// hdfsBuilderSetKerbTicketCachePath(hdfs_builder, kerberos_ticket_path.c_str());
// hdfsBuilderSetForceNewInstance(hdfs_builder);
// LOG(INFO) << "get kerberos ticket path: " << kerberos_ticket_path
// << " with principal: " << hdfs_kerberos_principal;
return Status::OK();
}

Expand Down Expand Up @@ -225,7 +227,12 @@ Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_
builder->kerberos_login = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
RETURN_IF_ERROR(builder->set_kerberos_ticket_cache());
hdfsBuilderSetKerb5Conf(builder->get(), doris::config::kerberos_krb5_conf_path.c_str());
hdfsBuilderSetPrincipal(builder->get(), builder->hdfs_kerberos_principal.c_str());
hdfsBuilderSetKeyTabFile(builder->get(), builder->hdfs_kerberos_keytab.c_str());
hdfsBuilderConfSetStr(builder->get(), "hadoop.kerberos.keytab.login.autorenewal.enabled",
"true");
// RETURN_IF_ERROR(builder->set_kerberos_ticket_cache());
} else {
if (hdfsParams.__isset.user) {
builder->hadoop_user = hdfsParams.user;
Expand Down
9 changes: 0 additions & 9 deletions be/src/io/hdfs_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ const std::string HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authenticati
const std::string FALLBACK_TO_SIMPLE_AUTH_ALLOWED = "ipc.client.fallback-to-simple-auth-allowed";
const std::string TRUE_VALUE = "true";

namespace kerberos {
class KerberosTicketCache;
class KerberosConfig;
}; // namespace kerberos

class HDFSCommonBuilder {
friend Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name,
HDFSCommonBuilder* builder);
Expand Down Expand Up @@ -71,8 +66,6 @@ class HDFSCommonBuilder {
std::string get_hdfs_conf_value(const std::string& key, const std::string& default_val) const;
void set_hdfs_conf_to_hdfs_builder();

std::shared_ptr<kerberos::KerberosTicketCache> get_ticket_cache() { return ticket_cache; }

private:
hdfsBuilder* hdfs_builder = nullptr;
bool kerberos_login {false};
Expand All @@ -83,8 +76,6 @@ class HDFSCommonBuilder {
std::string hadoop_user;
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
std::string kerberos_ticket_path;
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;
std::unordered_map<std::string, std::string> hdfs_conf;
};

Expand Down
14 changes: 8 additions & 6 deletions be/src/io/hdfs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <string>
#include <unordered_map>

#include "common/kerberos/kerberos_ticket_cache.h"
// #include "common/kerberos/kerberos_ticket_cache.h"
#include "common/status.h"
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
Expand All @@ -47,19 +47,21 @@ class HdfsHandler {
std::string principal;
std::string keytab_path;
std::string fs_name;
uint64_t create_time;
std::atomic<uint64_t> last_access_time;
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;
// std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;

HdfsHandler(hdfsFS fs, bool is_kerberos, const std::string& principal_,
const std::string& keytab_path_, const std::string& fs_name_,
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache_)
const std::string& keytab_path_, const std::string& fs_name_)
// std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache_)
: hdfs_fs(fs),
is_kerberos_auth(is_kerberos),
principal(principal_),
keytab_path(keytab_path_),
fs_name(fs_name_),
last_access_time(std::time(nullptr)),
ticket_cache(ticket_cache_) {}
create_time(std::time(nullptr)),
last_access_time(std::time(nullptr)) {}
// ticket_cache(ticket_cache_) {}

~HdfsHandler() {
// The ticket_cache will be automatically released when the last reference is gone
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/lru_multi_cache.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ void LruMultiCache<KeyType, ValueType>::release(ValueType_internal* p_value_inte
// Has to be currently not available
DCHECK(!p_value_internal->is_available());

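// DO NOT update timestamp_seconds on release,
// because cache values are supposed to be evicted after a certain period.
// NOTE(review): the assignment below still refreshes the timestamp on release,
// which contradicts this comment — confirm which behavior is intended.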
p_value_internal->timestamp_seconds = MonotonicSeconds();

Container& container = p_value_internal->container;
Expand Down
Loading
Loading