Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1387,12 +1387,14 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;
std::string root_path =
param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";

if (!existed_fs) {
// No such FS instance on BE
auto res = io::HdfsFileSystem::create(param.hdfs_storage_param,
param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr);
auto res = io::HdfsFileSystem::create(
param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr, std::move(root_path));
if (!res.has_value()) {
st = std::move(res).error();
} else {
Expand All @@ -1410,7 +1412,8 @@ void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPt
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
.tag("resource_name", param.name)
.tag("root_path", fs->root_path().string());
put_storage_resource(param.id, {std::move(fs), param.version});
}
}
Expand Down
15 changes: 8 additions & 7 deletions be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,31 @@ namespace doris::io {

Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(
const std::map<std::string, std::string>& properties, std::string fs_name, std::string id,
RuntimeProfile* profile) {
RuntimeProfile* profile, std::string root_path) {
return HdfsFileSystem::create(parse_properties(properties), std::move(fs_name), std::move(id),
profile);
profile, std::move(root_path));
}

Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams& hdfs_params,
std::string fs_name, std::string id,
RuntimeProfile* profile) {
RuntimeProfile* profile,
std::string root_path) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return ResultError(Status::InternalError(
"hdfs file system is not enabled, you can change be config enable_java_support to "
"true."));
}
#endif
std::shared_ptr<HdfsFileSystem> fs(
new HdfsFileSystem(hdfs_params, std::move(fs_name), std::move(id), profile));
std::shared_ptr<HdfsFileSystem> fs(new HdfsFileSystem(
hdfs_params, std::move(fs_name), std::move(id), profile, std::move(root_path)));
RETURN_IF_ERROR_RESULT(fs->init());
return fs;
}

HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile)
: RemoteFileSystem("", std::move(id), FileSystemType::HDFS),
RuntimeProfile* profile, std::string root_path)
: RemoteFileSystem(root_path, std::move(id), FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_fs_name(std::move(fs_name)),
_profile(profile) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/hdfs_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ class HdfsFileSystem final : public RemoteFileSystem {
public:
static Result<std::shared_ptr<HdfsFileSystem>> create(const THdfsParams& hdfs_params,
std::string fs_name, std::string id,
RuntimeProfile* profile);
RuntimeProfile* profile,
std::string root_path = "");

static Result<std::shared_ptr<HdfsFileSystem>> create(
const std::map<std::string, std::string>& properties, std::string fs_name,
std::string id, RuntimeProfile* profile);
std::string id, RuntimeProfile* profile, std::string root_path = "");

~HdfsFileSystem() override;

Expand Down Expand Up @@ -84,7 +85,7 @@ class HdfsFileSystem final : public RemoteFileSystem {
private:
friend class HdfsFileWriter;
HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile);
RuntimeProfile* profile, std::string root_path);
const THdfsParams& _hdfs_params; // Only used in init, so we can use reference here
std::string _fs_name;
// do not use std::shared_ptr or std::unique_ptr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
public class HdfsResource extends Resource {
public static final String HADOOP_FS_PREFIX = "dfs.";
public static String HADOOP_FS_NAME = "fs.defaultFS";
public static String HADOOP_FS_ROOT_PATH = "root_path";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
Expand Down Expand Up @@ -106,6 +107,8 @@ public static THdfsParams generateHdfsParam(Map<String, String> properties) {
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
tHdfsParams.setFsName(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_FS_ROOT_PATH)) {
tHdfsParams.setRootPath(property.getValue());
} else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) {
tHdfsParams.setUser(property.getValue());
} else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL)) {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ struct THdfsParams {
3: optional string hdfs_kerberos_principal
4: optional string hdfs_kerberos_keytab
5: optional list<THdfsConf> hdfs_conf
// Used for Cold Heat Separation to specify the root path
6: optional string root_path
}

// One broker range information.
Expand Down