25 changes: 21 additions & 4 deletions be/CMakeLists.txt
@@ -398,9 +398,6 @@ set_target_properties(k5crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/li
add_library(gssapi_krb5 STATIC IMPORTED)
set_target_properties(gssapi_krb5 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libgssapi_krb5.a)

add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

if (OS_MACOSX)
@@ -719,12 +716,32 @@ set(COMMON_THIRDPARTY
# put this after lz4 to avoid using lz4 lib in librdkafka
librdkafka_cpp
librdkafka
hdfs3
xml2
lzma
simdjson
)

if (ARCH_AMD64 AND OS_LINUX)
add_library(hadoop_hdfs STATIC IMPORTED)
set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/hadoop_hdfs/native/libhdfs.a)

set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
hadoop_hdfs
)
add_definitions(-DUSE_HADOOP_HDFS)
else()
add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)

# TODO: use arm hadoop hdfs to replace this
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
hdfs3
)
add_definitions(-DUSE_LIBHDFS3)
endif()

if (OS_MACOSX)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
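The two defines above are mutually exclusive by construction in this CMake branch, but nothing enforces that where they are consumed. A one-line guard one could place next to the consuming code (illustrative, not part of this change):

#if defined(USE_HADOOP_HDFS) == defined(USE_LIBHDFS3)
#error "exactly one of USE_HADOOP_HDFS / USE_LIBHDFS3 must be defined"
#endif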
16 changes: 11 additions & 5 deletions be/src/common/config.h
@@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
// port for brpc
CONF_Int32(brpc_port, "8060");

// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1");

// port to brpc server for single replica load
@@ -395,8 +396,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
CONF_Int64(load_data_reserve_hours, "4");
// log error log will be removed after this time
CONF_mInt64(load_error_log_reserve_hours, "48");
CONF_Int32(number_tablet_writer_threads, "16");
CONF_Int32(number_slave_replica_download_threads, "64");

// be brpc interfaces are classified into two categories: light and heavy,
// and each category is served by a different number of threads.
// threads to handle heavy api interfaces, such as transmit_data/transmit_block etc.
CONF_Int32(brpc_heavy_work_pool_threads, "192");
// threads to handle light api interfaces, such as exec_plan_fragment_prepare/exec_plan_fragment_start
CONF_Int32(brpc_light_work_pool_threads, "32");
CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");

// The maximum amount of data that can be processed by a stream load
CONF_mInt64(streaming_load_max_mb, "10240");
@@ -898,8 +906,6 @@ CONF_Int32(segcompaction_threshold_segment_num, "10");
// The segment whose row number is above the threshold will be compacted during segcompaction
CONF_Int32(segcompaction_small_threshold, "1048576");

CONF_String(jvm_max_heap_size, "1024M");

// enable java udf and jdbc scannode
CONF_Bool(enable_java_support, "true");

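The four new brpc_*_work_pool options split request handling into a heavy and a light pool, so a burst of data-plane calls cannot queue ahead of cheap control-plane calls. A minimal sketch of that dispatch policy, with hypothetical names (WorkPool, dispatch) rather than the actual Doris internals:

#include <functional>
#include <string>

// Two pools sized from the new config values; submit() runs tasks inline here
// as a stand-in for a real bounded-queue thread pool.
struct WorkPool {
    int num_threads;
    int max_queue_size;
    void submit(std::function<void()> task) { task(); }
};

WorkPool heavy_pool{/*num_threads=*/192, /*max_queue_size=*/10240};
WorkPool light_pool{/*num_threads=*/32, /*max_queue_size=*/10240};

void dispatch(const std::string& method, std::function<void()> handler) {
    // Heavy methods carry row batches; everything else is control traffic.
    if (method == "transmit_data" || method == "transmit_block") {
        heavy_pool.submit(std::move(handler));
    } else {
        light_pool.submit(std::move(handler));
    }
}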
1 change: 1 addition & 0 deletions be/src/io/CMakeLists.txt
@@ -34,6 +34,7 @@ set(IO_FILES
local_file_writer.cpp
s3_reader.cpp
s3_writer.cpp
fs/err_utils.cpp
fs/file_system_map.cpp
fs/local_file_reader.cpp
fs/local_file_system.cpp
53 changes: 53 additions & 0 deletions be/src/io/fs/err_utils.cpp
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/err_utils.h"

#include <fmt/format.h>

#include <sstream>

#include "io/fs/hdfs.h"

namespace doris {
namespace io {

std::string errno_to_str() {
char buf[1024];
return fmt::format("({}), {}", errno, strerror_r(errno, buf, 1024));
}

std::string errcode_to_str(const std::error_code& ec) {
return fmt::format("({}), {}", ec.value(), ec.message());
}

std::string hdfs_error() {
std::stringstream ss;
char buf[1024];
ss << "(" << errno << "), " << strerror_r(errno, buf, 1024) << ")";
#ifdef USE_HADOOP_HDFS
char* root_cause = hdfsGetLastExceptionRootCause();
if (root_cause != nullptr) {
ss << ", reason: " << root_cause;
}
#else
ss << ", reason: " << hdfsGetLastError();
#endif
return ss.str();
}

} // namespace io
} // namespace doris
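A hedged usage sketch of the new helpers; read_all and the include paths are illustrative, not code from this change. Note that streaming the result of strerror_r, as errno_to_str() and hdfs_error() do, assumes the GNU variant that returns char*, which is what glibc provides on the Linux builds targeted here.

#include <unistd.h>

#include "common/status.h" // assumed location of doris::Status
#include "io/fs/err_utils.h"

doris::Status read_all(int fd, char* buf, size_t len) {
    if (::read(fd, buf, len) < 0) {
        // errno_to_str() renders "(<errno>), <message>", matching hdfs_error()
        return doris::Status::IOError("read failed, error: {}",
                                      doris::io::errno_to_str());
    }
    return doris::Status::OK();
}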
31 changes: 31 additions & 0 deletions be/src/io/fs/err_utils.h
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>
#include <system_error>

namespace doris {
namespace io {

std::string errno_to_str();
std::string errcode_to_str(const std::error_code& ec);
std::string hdfs_error();

} // namespace io
} // namespace doris
24 changes: 24 additions & 0 deletions be/src/io/fs/hdfs.h
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef USE_HADOOP_HDFS
#include <hadoop_hdfs/hdfs.h>
#else
#include <hdfs/hdfs.h>
#endif
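With this shim, call sites include "io/fs/hdfs.h" and never a vendor header directly, so the same source builds against either client. A small sketch using only builder calls present in both libhdfs and libhdfs3 ("default" stands in for a namenode resolved from configuration):

#include "io/fs/hdfs.h"

hdfsFS connect_default_fs() {
    hdfsBuilder* builder = hdfsNewBuilder();
    hdfsBuilderSetNameNode(builder, "default"); // resolve fs.defaultFS from hdfs-site.xml
    // With Hadoop's libhdfs the builder is freed inside hdfsBuilderConnect;
    // with libhdfs3 the caller frees it (see ~HDFSCommonBuilder below).
    return hdfsBuilderConnect(builder);
}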
7 changes: 6 additions & 1 deletion be/src/io/hdfs_builder.cpp
@@ -18,6 +18,7 @@
#include "io/hdfs_builder.h"

#include <fmt/format.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <fstream>

@@ -26,6 +27,7 @@
#include "util/string_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"

namespace doris {

Status HDFSCommonBuilder::init_hdfs_builder() {
@@ -36,6 +38,7 @@ Status HDFSCommonBuilder::init_hdfs_builder() {
return Status::InternalError(
"failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml and be.out");
}
hdfsBuilderSetForceNewInstance(hdfs_builder);
return Status::OK();
}

@@ -54,7 +57,10 @@ Status HDFSCommonBuilder::run_kinit() {
if (!rc) {
return Status::InternalError("Kinit failed, errMsg: " + msg);
}
#ifdef USE_LIBHDFS3
hdfsBuilderSetPrincipal(hdfs_builder, hdfs_kerberos_principal.c_str());
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
#endif
return Status::OK();
}

@@ -97,7 +103,6 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build
if (hdfsParams.__isset.hdfs_kerberos_principal) {
builder->need_kinit = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
}
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->need_kinit = true;
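The new #ifdef reflects an API gap, stated here as an assumption about the two clients: hdfsBuilderSetPrincipal is a libhdfs3 extension with no counterpart in Hadoop's libhdfs, which instead picks up the credentials that the preceding kinit wrote to the ticket cache. A compressed view of the two paths (setup_kerberos is a hypothetical helper):

#include <string>

#include "io/fs/hdfs.h"

void setup_kerberos(hdfsBuilder* hdfs_builder, const std::string& principal,
                    const std::string& ticket_path) {
#ifdef USE_LIBHDFS3
    // libhdfs3 takes the principal and ticket cache directly on the builder.
    hdfsBuilderSetPrincipal(hdfs_builder, principal.c_str());
    hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
#else
    // Hadoop libhdfs: the JVM-side client reads the ticket cache produced by
    // `kinit`, so no builder call is needed here.
    (void)hdfs_builder; (void)principal; (void)ticket_path;
#endif
}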
8 changes: 5 additions & 3 deletions be/src/io/hdfs_builder.h
@@ -17,10 +17,9 @@

#pragma once

#include <hdfs/hdfs.h>

#include "gen_cpp/PlanNodes_types.h"
#include "io/file_reader.h"
#include "io/fs/hdfs.h"

namespace doris {

@@ -38,9 +37,12 @@ class HDFSCommonBuilder {
public:
HDFSCommonBuilder() = default;
~HDFSCommonBuilder() {
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
// for hadoop hdfs, the hdfs_builder will be freed in hdfsConnect
if (hdfs_builder != nullptr) {
hdfsFreeBuilder(hdfs_builder);
}
#endif
}

// Must call this to init hdfs_builder first.
@@ -51,7 +53,7 @@ class HDFSCommonBuilder {
Status run_kinit();

private:
hdfsBuilder* hdfs_builder;
hdfsBuilder* hdfs_builder = nullptr;
bool need_kinit {false};
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
14 changes: 8 additions & 6 deletions be/src/io/hdfs_file_reader.cpp
@@ -14,11 +14,13 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/hdfs_file_reader.h"

#include <sys/stat.h>
#include <unistd.h>

#include "io/fs/err_utils.h"
#include "service/backend_options.h"

namespace doris {
@@ -100,12 +102,12 @@ Status HdfsFileReader::open() {
if (_hdfs_fs == nullptr) {
return Status::InternalError(
"open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
BackendOptions::get_localhost(), _namenode, _path, io::hdfs_error());
}
} else {
return Status::InternalError("open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, _path,
hdfsGetLastError());
io::hdfs_error());
}
}
VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path;
@@ -174,7 +176,7 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
if (loop_read < 0) {
return Status::InternalError(
"Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
BackendOptions::get_localhost(), _namenode, _path, io::hdfs_error());
}
if (loop_read == 0) {
break;
@@ -192,7 +194,7 @@ int64_t HdfsFileReader::size() {
hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
if (file_info == nullptr) {
return Status::IOError("failed to get path info, path: {}, error: {}", _path,
hdfsGetLastError());
io::hdfs_error());
}
_file_size = file_info->mSize;
hdfsFreeFileInfo(file_info, 1);
@@ -205,7 +207,7 @@ Status HdfsFileReader::seek(int64_t position) {
int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
if (res != 0) {
return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
BackendOptions::get_localhost(), position, hdfsGetLastError());
BackendOptions::get_localhost(), position, io::hdfs_error());
}
_current_offset = position;
return Status::OK();
@@ -223,7 +225,7 @@ Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, hdfsFS* fs) {
RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::InternalError("connect to hdfs failed. error: {}", hdfsGetLastError());
return Status::InternalError("connect to hdfs failed. error: {}", io::hdfs_error());
}
*fs = hdfs_fs;
return Status::OK();