From a41b525fde99671242c5db782c120c134f20725b Mon Sep 17 00:00:00 2001 From: ByteYue Date: Thu, 21 Mar 2024 18:53:09 +0800 Subject: [PATCH 1/3] specify the root path for hdfs resource --- .../src/main/java/org/apache/doris/catalog/HdfsResource.java | 3 +++ gensrc/thrift/PlanNodes.thrift | 2 ++ 2 files changed, 5 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java index 6d441707420ce8..c9cb77fbd93ff8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java @@ -45,6 +45,7 @@ public class HdfsResource extends Resource { public static final String HADOOP_FS_PREFIX = "dfs."; public static String HADOOP_FS_NAME = "fs.defaultFS"; + public static String HADOOP_FS_ROOT_PATH = "root_path"; public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit"; public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path"; public static String DSF_NAMESERVICES = "dfs.nameservices"; @@ -106,6 +107,8 @@ public static THdfsParams generateHdfsParam(Map properties) { for (Map.Entry property : properties.entrySet()) { if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) { tHdfsParams.setFsName(property.getValue()); + } else if (property.getKey().equalsIgnoreCase(HADOOP_FS_ROOT_PATH)) { + tHdfsParams.setRootPath(property.getValue()); } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) { tHdfsParams.setUser(property.getValue()); } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL)) { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 148db2b9a1709b..2fadcdae538795 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -154,6 +154,8 @@ struct THdfsParams { 3: optional string hdfs_kerberos_principal 4: optional string 
hdfs_kerberos_keytab 5: optional list hdfs_conf + // Used for cold/hot data separation to specify the root path + 6: optional string root_path } // One broker range information. From b9fd18010e0d5ab0478396317df0a40649af65e3 Mon Sep 17 00:00:00 2001 From: ByteYue Date: Fri, 22 Mar 2024 10:08:16 +0800 Subject: [PATCH 2/3] pass root_path through HdfsFileSystem::create to RemoteFileSystem --- be/src/agent/task_worker_pool.cpp | 2 ++ be/src/io/fs/hdfs_file_system.cpp | 15 ++++++++------- be/src/io/fs/hdfs_file_system.h | 7 ++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 46f05f32de1171..1d5cb301ca98dc 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1387,6 +1387,8 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) { Status st; io::RemoteFileSystemSPtr fs; + const std::string root_path = + param.hdfs_storage_param.__isset.root_path ?
param.hdfs_storage_param.root_path : ""; if (!existed_fs) { // No such FS instance on BE diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index f5d2eff648c8e6..7d760c755ff764 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -54,14 +54,15 @@ namespace doris::io { Result> HdfsFileSystem::create( const std::map& properties, std::string fs_name, std::string id, - RuntimeProfile* profile) { + RuntimeProfile* profile, std::string root_path) { return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id), - profile); + profile, std::move(root_path)); } Result> HdfsFileSystem::create(const THdfsParams& hdfs_params, std::string fs_name, std::string id, - RuntimeProfile* profile) { + RuntimeProfile* profile, + std::string root_path) { #ifdef USE_HADOOP_HDFS if (!config::enable_java_support) { return ResultError(Status::InternalError( @@ -69,15 +70,15 @@ Result> HdfsFileSystem::create(const THdfsParams "true.")); } #endif - std::shared_ptr fs( - new HdfsFileSystem(hdfs_params, std::move(fs_name), std::move(id), profile)); + std::shared_ptr fs(new HdfsFileSystem( + hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path))); RETURN_IF_ERROR_RESULT(fs->init()); return fs; } HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id, - RuntimeProfile* profile) - : RemoteFileSystem("", std::move(id), FileSystemType::HDFS), + RuntimeProfile* profile, std::string root_path) + : RemoteFileSystem(root_path, std::move(id), FileSystemType::HDFS), _hdfs_params(hdfs_params), _fs_name(std::move(fs_name)), _profile(profile) { diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index ec6401964eea45..23ae65b0820ef4 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -48,11 +48,12 @@ class HdfsFileSystem final : public RemoteFileSystem { public: static Result> 
create(const THdfsParams& hdfs_params, std::string fs_name, std::string id, - RuntimeProfile* profile); + RuntimeProfile* profile, + std::string root_path = ""); static Result> create( const std::map& properties, std::string fs_name, - std::string id, RuntimeProfile* profile); + std::string id, RuntimeProfile* profile, std::string root_path = ""); ~HdfsFileSystem() override; @@ -84,7 +85,7 @@ class HdfsFileSystem final : public RemoteFileSystem { private: friend class HdfsFileWriter; HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id, - RuntimeProfile* profile); + RuntimeProfile* profile, std::string root_path); const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here std::string _fs_name; // do not use std::shared_ptr or std::unique_ptr From c00225bef8cb45d35daa16e817ae589b4d099fc6 Mon Sep 17 00:00:00 2001 From: ByteYue Date: Fri, 22 Mar 2024 10:24:40 +0800 Subject: [PATCH 3/3] use root path --- be/src/agent/task_worker_pool.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1d5cb301ca98dc..2476ec6c77fa68 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1387,14 +1387,14 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) { Status st; io::RemoteFileSystemSPtr fs; - const std::string root_path = + std::string root_path = param.hdfs_storage_param.__isset.root_path ? 
param.hdfs_storage_param.root_path : ""; if (!existed_fs) { // No such FS instance on BE - auto res = io::HdfsFileSystem::create(param.hdfs_storage_param, - param.hdfs_storage_param.fs_name, - std::to_string(param.id), nullptr); + auto res = io::HdfsFileSystem::create( + param.hdfs_storage_param, param.hdfs_storage_param.fs_name, + std::to_string(param.id), nullptr, std::move(root_path)); if (!res.has_value()) { st = std::move(res).error(); } else { @@ -1412,7 +1412,8 @@ void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPt } else { LOG_INFO("successfully update hdfs resource") .tag("resource_id", param.id) - .tag("resource_name", param.name); + .tag("resource_name", param.name) + .tag("root_path", fs->root_path().string()); put_storage_resource(param.id, {std::move(fs), param.version}); } }