25 changes: 21 additions & 4 deletions be/CMakeLists.txt
@@ -420,9 +420,6 @@ set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libk5crypto.a)
add_library(gssapi_krb5 STATIC IMPORTED)
set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libgssapi_krb5.a)

add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

if (OS_MACOSX)
@@ -770,12 +767,32 @@ set(COMMON_THIRDPARTY
# put this after lz4 to avoid using lz4 lib in librdkafka
librdkafka_cpp
librdkafka
hdfs3
xml2
lzma
simdjson
)

if (ARCH_AMD64 AND OS_LINUX)
add_library(hadoop_hdfs STATIC IMPORTED)
set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/hadoop_hdfs/native/libhdfs.a)

set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
hadoop_hdfs
)
add_definitions(-DUSE_HADOOP_HDFS)
else()
add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)

# TODO: use arm hadoop hdfs to replace this
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
hdfs3
)
add_definitions(-DUSE_LIBHDFS3)
endif()

if (absl_FOUND)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
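This conditional makes the build link exactly one HDFS client and records the choice as a compile definition. A minimal sketch of how C++ code can branch on those definitions; only the macro names USE_HADOOP_HDFS and USE_LIBHDFS3 come from this PR, the helper itself is hypothetical:

#include <string>

// Hypothetical helper: report which HDFS client this binary was built against.
inline std::string hdfs_client_name() {
#ifdef USE_HADOOP_HDFS
    return "hadoop libhdfs";  // amd64 Linux builds
#elif defined(USE_LIBHDFS3)
    return "libhdfs3";        // other platforms, until an ARM libhdfs is usable
#else
    return "none";            // no HDFS support compiled in
#endif
}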
2 changes: 0 additions & 2 deletions be/src/common/config.h
@@ -870,8 +870,6 @@ CONF_Int32(segcompaction_threshold_segment_num, "10");
// The segment whose row number above the threshold will be compacted during segcompaction
CONF_Int32(segcompaction_small_threshold, "1048576");

CONF_String(jvm_max_heap_size, "1024M");

// enable java udf and jdbc scannode
CONF_Bool(enable_java_support, "true");

12 changes: 10 additions & 2 deletions be/src/io/fs/err_utils.cpp
@@ -18,10 +18,11 @@
#include "io/fs/err_utils.h"

#include <fmt/format.h>
#include <hdfs/hdfs.h>

#include <sstream>

#include "io/fs/hdfs.h"

namespace doris {
namespace io {

@@ -37,8 +38,15 @@ std::string errcode_to_str(const std::error_code& ec) {
std::string hdfs_error() {
std::stringstream ss;
char buf[1024];
ss << "(" << errno << "), " << strerror_r(errno, buf, 1024);
ss << "(" << errno << "), " << strerror_r(errno, buf, 1024) << ")";
#ifdef USE_HADOOP_HDFS
char* root_cause = hdfsGetLastExceptionRootCause();
if (root_cause != nullptr) {
ss << ", reason: " << root_cause;
}
#else
ss << ", reason: " << hdfsGetLastError();
#endif
return ss.str();
}

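hdfs_error() now appends the client library's own diagnosis after errno: the Hadoop client exposes the Java exception root cause, libhdfs3 a plain error string. A sketch of a typical call site, assuming a connected hdfsFS handle named fs (hdfsOpenFile exists in both clients with this signature):

#include <fcntl.h>  // O_RDONLY

// Hypothetical caller: open a file for reading and surface a rich error.
hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
if (file == nullptr) {
    return Status::InternalError("failed to open {}, err: {}", path, hdfs_error());
}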
24 changes: 24 additions & 0 deletions be/src/io/fs/hdfs.h
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef USE_HADOOP_HDFS
#include <hadoop_hdfs/hdfs.h>
#else
#include <hdfs/hdfs.h>
#endif
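With this umbrella header the vendor headers are named in exactly one place, and the rest of the tree includes io/fs/hdfs.h instead, as the following hunks show. Code that sticks to the builder API shared by both clients then compiles unchanged either way; a sketch, where connect_default_fs is hypothetical:

#include "io/fs/hdfs.h"  // resolves to hadoop_hdfs/hdfs.h or hdfs/hdfs.h

// Hypothetical example: connect to the namenode configured as "default".
inline hdfsFS connect_default_fs() {
    hdfsBuilder* builder = hdfsNewBuilder();
    hdfsBuilderSetNameNode(builder, "default");
    return hdfsBuilderConnect(builder);  // see ownership note at hdfs_builder.h below
}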
7 changes: 4 additions & 3 deletions be/src/io/fs/hdfs_file_reader.cpp
@@ -17,9 +17,11 @@

#include "io/fs/hdfs_file_reader.h"

#include "io/fs/err_utils.h"
#include "io/fs/hdfs_file_system.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"

namespace doris {
namespace io {
HdfsFileReader::HdfsFileReader(Path path, size_t file_size, const std::string& name_node,
@@ -66,7 +68,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read
int res = hdfsSeek(handle->hdfs_fs, _hdfs_file, offset);
if (res != 0) {
return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
BackendOptions::get_localhost(), offset, hdfsGetLastError());
BackendOptions::get_localhost(), offset, hdfs_error());
}

size_t bytes_req = result.size;
@@ -84,8 +86,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read
if (loop_read < 0) {
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _name_node, _path.string(),
hdfsGetLastError());
BackendOptions::get_localhost(), _name_node, _path.string(), hdfs_error());
}
if (loop_read == 0) {
break;
7 changes: 7 additions & 0 deletions be/src/io/fs/hdfs_file_system.cpp
@@ -68,6 +68,13 @@ class HdfsFileSystemCache {

Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<HdfsFileSystem>* fs) {
#ifdef USE_HADOOP_HDFS
if (!config::enable_java_support) {
return Status::InternalError(
"hdfs file system is not enabled, you can change be config enable_java_support to "
"true.");
}
#endif
(*fs).reset(new HdfsFileSystem(hdfs_params, path));
return (*fs)->connect();
}
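This guard is consistent with the Hadoop client reaching HDFS through JNI: it needs the in-process JVM that enable_java_support governs, whereas libhdfs3 is a pure native client and keeps working with Java support disabled.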
4 changes: 2 additions & 2 deletions be/src/io/fs/hdfs_file_system.h
@@ -18,13 +18,13 @@
#pragma once

#include <gen_cpp/PlanNodes_types.h>
#include <hdfs/hdfs.h>

#include <atomic>

#include "io/fs/hdfs.h"
#include "io/fs/remote_file_system.h"
namespace doris {

namespace doris {
namespace io {

class HdfsFileSystemHandle {
6 changes: 5 additions & 1 deletion be/src/io/hdfs_builder.cpp
@@ -26,6 +26,7 @@
#include "util/string_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"

namespace doris {

Status HDFSCommonBuilder::init_hdfs_builder() {
@@ -35,6 +36,7 @@ Status HDFSCommonBuilder::init_hdfs_builder() {
return Status::InternalError(
"failed to init HDFSCommonBuilder, please check check be/conf/hdfs-site.xml");
}
hdfsBuilderSetForceNewInstance(hdfs_builder);
return Status::OK();
}

@@ -53,7 +55,10 @@ Status HDFSCommonBuilder::run_kinit() {
if (!rc) {
return Status::InternalError("Kinit failed, errMsg: " + msg);
}
#ifdef USE_LIBHDFS3
hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
#endif
return Status::OK();
}

@@ -100,7 +105,6 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder
if (hdfsParams.__isset.hdfs_kerberos_principal) {
builder->need_kinit = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
}
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->need_kinit = true;
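Of the two calls now fenced to the libhdfs3 path, hdfsBuilderSetPrincipal is a libhdfs3 extension with no counterpart in Hadoop's libhdfs, so compiling it unconditionally would break the new build; with the Hadoop client the principal is presumably taken from the ticket cache populated by the kinit step above.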
8 changes: 5 additions & 3 deletions be/src/io/hdfs_builder.h
@@ -17,10 +17,9 @@

#pragma once

#include <hdfs/hdfs.h>

#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/fs/hdfs.h"

namespace doris {

@@ -38,9 +37,12 @@ class HDFSCommonBuilder {
public:
HDFSCommonBuilder() {}
~HDFSCommonBuilder() {
#ifdef USE_LIBHDFS3
// for hadoop hdfs, the hdfs_builder will be freed in hdfsConnect
if (hdfs_builder != nullptr) {
hdfsFreeBuilder(hdfs_builder);
}
#endif
}

// Must call this to init hdfs_builder first.
@@ -51,7 +53,7 @@
Status run_kinit();

private:
hdfsBuilder* hdfs_builder;
hdfsBuilder* hdfs_builder = nullptr;
bool need_kinit {false};
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
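The guarded destructor encodes an ownership difference between the two clients: Hadoop's libhdfs frees the builder inside the connect call, libhdfs3 leaves that to the caller. A standalone sketch of a connect path honoring both conventions (do_connect is hypothetical; in this PR the libhdfs3 builder is instead released by the destructor shown above):

#include "io/fs/hdfs.h"

// Hypothetical wrapper: connect without leaking the builder on either client.
inline hdfsFS do_connect(hdfsBuilder* builder) {
    hdfsFS fs = hdfsBuilderConnect(builder);
#ifdef USE_LIBHDFS3
    hdfsFreeBuilder(builder);  // libhdfs3 does not take ownership
#endif
    // Under USE_HADOOP_HDFS the builder was already freed by hdfsBuilderConnect.
    return fs;
}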
13 changes: 7 additions & 6 deletions be/src/tools/meta_tool.cpp
@@ -142,7 +142,7 @@ void delete_meta(DataDir* data_dir) {

Status init_data_dir(const std::string& dir, std::unique_ptr<DataDir>* ret) {
std::string root_path;
RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(dir, &root_path));
RETURN_IF_ERROR(doris::io::global_local_filesystem()->canonicalize(dir, &root_path));
doris::StorePath path;
auto res = parse_root_path(root_path, &path);
if (!res.ok()) {
@@ -156,8 +156,8 @@ Status init_data_dir(const std::string& dir, std::unique_ptr<DataDir>* ret) {
std::cout << "new data dir failed" << std::endl;
return Status::InternalError("new data dir failed");
}
st = p->init();
if (!st.ok()) {
res = p->init();
if (!res.ok()) {
std::cout << "data_dir load failed" << std::endl;
return Status::InternalError("data_dir load failed");
}
@@ -188,7 +188,7 @@ void batch_delete_meta(const std::string& tablet_file) {
}
// 1. get dir
std::string dir;
Status st = io::global_local_filesystem()->canonicalize(v[0], &dir);
Status st = doris::io::global_local_filesystem()->canonicalize(v[0], &dir);
if (!st.ok()) {
std::cout << "invalid root dir in tablet_file: " << line << std::endl;
err_num++;
@@ -295,7 +295,7 @@ Status get_segment_footer(doris::io::FileReader* file_reader, SegmentFooterPB* footer

void show_segment_footer(const std::string& file_name) {
doris::io::FileReaderSPtr file_reader;
Status st = doris::io::global_local_filesystem()->open_file(file_name, &file_reader);
Status status = doris::io::global_local_filesystem()->open_file(file_name, &file_reader);
if (!status.ok()) {
std::cout << "open file failed: " << status << std::endl;
return;
@@ -327,7 +327,8 @@ int main(int argc, char** argv) {
show_meta();
} else if (FLAGS_operation == "batch_delete_meta") {
std::string tablet_file;
Status st = io::global_local_filesystem()->canonicalize(FLAGS_tablet_file, &tablet_file);
Status st =
doris::io::global_local_filesystem()->canonicalize(FLAGS_tablet_file, &tablet_file);
if (!st.ok()) {
std::cout << "invalid tablet file: " << FLAGS_tablet_file
<< ", error: " << st.to_string() << std::endl;
3 changes: 1 addition & 2 deletions be/src/util/hdfs_util.h
@@ -17,13 +17,12 @@

#pragma once

#include <hdfs/hdfs.h>

#include <map>
#include <memory>
#include <string>

#include "common/status.h"
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
#include "io/hdfs_builder.h"
