Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,30 @@ set_target_properties(orc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/lib
add_library(cctz STATIC IMPORTED)
set_target_properties(cctz PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libcctz.a)

# add_library(aws-sdk-core STATIC IMPORTED)
# set_target_properties(aws-sdk-core PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-cpp-sdk-core.a)
add_library(aws-sdk-core STATIC IMPORTED)
set_target_properties(aws-sdk-core PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-cpp-sdk-core.a)

# add_library(aws-sdk-s3 STATIC IMPORTED)
# set_target_properties(aws-sdk-s3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-cpp-sdk-s3.a)
add_library(aws-sdk-s3 STATIC IMPORTED)
set_target_properties(aws-sdk-s3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-cpp-sdk-s3.a)

# add_library(aws-c-cal STATIC IMPORTED)
# set_target_properties(aws-c-cal PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-cal.a)
add_library(aws-c-cal STATIC IMPORTED)
set_target_properties(aws-c-cal PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-cal.a)

# add_library(aws-c-common STATIC IMPORTED)
# set_target_properties(aws-c-common PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-common.a)
add_library(aws-c-common STATIC IMPORTED)
set_target_properties(aws-c-common PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-common.a)

# add_library(aws-c-event-stream STATIC IMPORTED)
# set_target_properties(aws-c-event-stream PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-event-stream.a)
add_library(aws-c-event-stream STATIC IMPORTED)
set_target_properties(aws-c-event-stream PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-event-stream.a)

# add_library(aws-c-io STATIC IMPORTED)
# set_target_properties(aws-c-io PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-io.a)
add_library(aws-c-io STATIC IMPORTED)
set_target_properties(aws-c-io PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-c-io.a)

# add_library(aws-checksums STATIC IMPORTED)
# set_target_properties(aws-checksums PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-checksums.a)
add_library(aws-checksums STATIC IMPORTED)
set_target_properties(aws-checksums PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libaws-checksums.a)

add_library(aws-s2n STATIC IMPORTED)
set_target_properties(aws-s2n PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libs2n.a)

# add_library(aws-s2n STATIC IMPORTED)
# set_target_properties(aws-s2n PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libs2n.a)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

Expand Down Expand Up @@ -387,7 +388,7 @@ include_directories(
set(WL_START_GROUP "-Wl,--start-group")
set(WL_END_GROUP "-Wl,--end-group")

# set(AWS_LIBS aws-sdk-s3 aws-sdk-core aws-checksums aws-c-io aws-c-event-stream aws-c-common aws-c-cal aws-s2n)
set(AWS_LIBS aws-sdk-s3 aws-sdk-core aws-checksums aws-c-io aws-c-event-stream aws-c-common aws-c-cal aws-s2n)

# Set Palo libraries
set(DORIS_LINK_LIBS
Expand Down Expand Up @@ -454,7 +455,7 @@ set(DORIS_DEPENDENCIES
orc
odbc
cctz
# ${AWS_LIBS}
${AWS_LIBS}
${WL_END_GROUP}
)

Expand Down
23 changes: 17 additions & 6 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1237,9 +1237,14 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
<< ", job id:" << upload_request.job_id;

std::map<int64_t, std::vector<std::string>> tablet_files;
SnapshotLoader loader(_env, upload_request.job_id, agent_task_req.signature);
Status status = loader.upload(upload_request.src_dest_map, upload_request.broker_addr,
upload_request.broker_prop, &tablet_files);
std::unique_ptr<SnapshotLoader> loader = nullptr;
if (upload_request.__isset.storage_backend && upload_request.storage_backend == TStorageBackendType::S3) {
loader.reset(new SnapshotLoader(_env, upload_request.job_id, agent_task_req.signature, upload_request.broker_prop));
} else {
loader.reset(new SnapshotLoader(_env, upload_request.job_id, agent_task_req.signature, upload_request.broker_addr,
upload_request.broker_prop));
}
Status status = loader->upload(upload_request.src_dest_map, &tablet_files);

TStatusCode::type status_code = TStatusCode::OK;
std::vector<string> error_msgs;
Expand Down Expand Up @@ -1295,9 +1300,15 @@ void TaskWorkerPool::_download_worker_thread_callback() {

// TODO: download
std::vector<int64_t> downloaded_tablet_ids;
SnapshotLoader loader(_env, download_request.job_id, agent_task_req.signature);
Status status = loader.download(download_request.src_dest_map, download_request.broker_addr,
download_request.broker_prop, &downloaded_tablet_ids);

std::unique_ptr<SnapshotLoader> loader = nullptr;
if (download_request.__isset.storage_backend && download_request.storage_backend == TStorageBackendType::S3) {
loader.reset(new SnapshotLoader(_env, download_request.job_id, agent_task_req.signature, download_request.broker_prop));
} else {
loader.reset(new SnapshotLoader(_env, download_request.job_id, agent_task_req.signature, download_request.broker_addr,
download_request.broker_prop));
}
Status status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);

if (!status.ok()) {
status_code = TStatusCode::RUNTIME_ERROR;
Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "common/daemon.h"

#include <signal.h>

#include <aws/core/Aws.h>
#include <gflags/gflags.h>
#include <gperftools/malloc_extension.h>

Expand Down Expand Up @@ -66,6 +68,8 @@ namespace doris {

bool k_doris_exit = false;

Aws::SDKOptions aws_options;

void Daemon::tcmalloc_gc_thread() {
while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) {
size_t used_size = 0;
Expand Down Expand Up @@ -264,6 +268,14 @@ void Daemon::init(int argc, char** argv, const std::vector<StorePath>& paths) {
HllFunctions::init();
HashFunctions::init();
TopNFunctions::init();
// disable EC2 metadata service
setenv("AWS_EC2_METADATA_DISABLED", "true", false);
Aws::Utils::Logging::LogLevel logLevel = Aws::Utils::Logging::LogLevel::Info;
aws_options.loggingOptions.logLevel = logLevel;
aws_options.loggingOptions.logger_create_fn = [logLevel] {
return std::make_shared<DorisAWSLogger>(logLevel);
};
Aws::InitAPI(aws_options);

LOG(INFO) << CpuInfo::debug_string();
LOG(INFO) << DiskInfo::debug_string();
Expand Down Expand Up @@ -313,6 +325,7 @@ void Daemon::stop() {
if (_calculate_metrics_thread) {
_calculate_metrics_thread->join();
}
Aws::ShutdownAPI(aws_options);
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ set(EXEC_FILES
odbc_connector.cpp
json_scanner.cpp
assert_num_rows_node.cpp
s3_reader.cpp
s3_writer.cpp
)

if (WITH_MYSQL)
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/exec_node.h"
#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exprs/expr.h"
Expand Down Expand Up @@ -154,6 +155,12 @@ Status BrokerScanner::open_file_reader() {
_cur_file_reader = broker_reader;
break;
}
case TFileType::FILE_S3: {
S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset);
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
}
case TFileType::FILE_STREAM: {
_stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
if (_stream_load_pipe == nullptr) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "env/env.h"
#include "exec/broker_reader.h"
#include "exec/local_file_reader.h"
#include "exec/s3_reader.h"
#include "exprs/expr.h"
#include "exprs/json_functions.h"
#include "gutil/strings/split.h"
Expand Down Expand Up @@ -119,7 +120,12 @@ Status JsonScanner::open_next_reader() {
file = broker_reader;
break;
}

case TFileType::FILE_S3: {
S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset);
RETURN_IF_ERROR(s3_reader->open());
file = s3_reader;
break;
}
case TFileType::FILE_STREAM: {
_stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
if (_stream_load_pipe == nullptr) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/orc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "exec/orc_scanner.h"

#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/local_file_reader.h"
#include "exec/s3_reader.h"
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -399,6 +401,11 @@ Status ORCScanner::open_next_reader() {
file_size));
break;
}
case TFileType::FILE_S3: {
file_reader.reset(new BufferedReader(
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
default: {
std::stringstream ss;
ss << "Unknown file type, type=" << range.file_type;
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/parquet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "exec/decompressor.h"
#include "exec/local_file_reader.h"
#include "exec/parquet_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exprs/expr.h"
Expand Down Expand Up @@ -127,6 +128,11 @@ Status ParquetScanner::open_next_reader() {
range.path, range.start_offset, file_size)));
break;
}
case TFileType::FILE_S3: {
file_reader.reset(new BufferedReader(
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
#if 0
case TFileType::FILE_STREAM:
{
Expand Down
142 changes: 142 additions & 0 deletions be/src/exec/s3_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/s3_reader.h"

#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>

#include "common/logging.h"
#include "gutil/strings/strcat.h"
#include "util/s3_util.h"

namespace doris {

// Early-return guard for functions that return Status: if `client` evaluates
// to false, the enclosing function returns an InternalError.
// The body is wrapped in do { ... } while (false) so the macro behaves as a
// single statement (no dangling-else surprises when used under an unbraced
// if/else), and the argument is parenthesised so any expression can be passed.
// Call sites must end the invocation with a semicolon.
#ifndef CHECK_S3_CLIENT
#define CHECK_S3_CLIENT(client)                                        \
    do {                                                               \
        if (!(client)) {                                               \
            return Status::InternalError("init aws s3 client error."); \
        }                                                              \
    } while (false)
#endif

// Constructs a reader for the S3 object named by `path`.
//   properties   - handed to create_client() to build the S3 client
//                  (presumably credentials/endpoint settings -- confirm
//                  against util/s3_util.h).
//   path         - kept verbatim in _path (used in status/log messages) and
//                  also used to initialise _uri, which open() parses into a
//                  bucket and key.
//   start_offset - initial value of the read cursor (_cur_offset).
// _file_size starts at 0 and is only filled in by a successful open().
S3Reader::S3Reader(const std::map<std::string, std::string>& properties, const std::string& path,
int64_t start_offset)
: _properties(properties),
_path(path),
_uri(path),
_cur_offset(start_offset),
_file_size(0),
_closed(false) {
_client = create_client(_properties);
// A missing client is only asserted here; open() and readat() check it again
// through CHECK_S3_CLIENT and report it as an error Status.
DCHECK(_client) << "init aws s3 client error.";
}

// Nothing to release explicitly: the destructor body is empty and members are
// torn down by their own destructors.
S3Reader::~S3Reader() = default;

// Validates the reader and records the size of the target object.
// Parses _uri, then issues a HeadObject request for its bucket/key.
// Returns:
//   InternalError   - the S3 client was not created, or HeadObject failed for
//                     any reason other than "not found"
//   InvalidArgument - _uri could not be parsed
//   NotFound        - HeadObject answered with HTTP 404
//   OK              - _file_size now holds the object's content length
Status S3Reader::open() {
    CHECK_S3_CLIENT(_client);
    if (!_uri.parse()) {
        return Status::InvalidArgument("s3 uri is invalid: " + _path);
    }

    Aws::S3::Model::HeadObjectRequest head_request;
    head_request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
    Aws::S3::Model::HeadObjectOutcome outcome = _client->HeadObject(head_request);

    // Handle the failure cases first so the success path falls through.
    if (!outcome.IsSuccess()) {
        const auto& error = outcome.GetError();
        if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
            return Status::NotFound(_path + " not exists!");
        }
        std::stringstream message;
        message << "Error: [" << error.GetExceptionName() << ":" << error.GetMessage();
        return Status::InternalError(message.str());
    }

    _file_size = outcome.GetResult().GetContentLength();
    return Status::OK();
}
// Sequential read starting at the current cursor (_cur_offset).
//   buf     - destination buffer with room for at least *buf_len bytes
//   buf_len - in: maximum number of bytes to read (must be non-zero);
//             out: number of bytes readat() reported as read
//   eof     - set to true when an OK read produced 0 bytes, false otherwise;
//             left untouched when a non-OK status is returned
// Any non-OK status from readat() is returned unchanged. Note that readat()
// reports a cursor at or past the end of the object as Status::EndOfFile, so
// that condition surfaces here as a non-OK status rather than through *eof.
Status S3Reader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
    DCHECK_NE(*buf_len, 0);
    // Use a real int64_t for readat()'s out-parameter instead of casting the
    // caller's size_t* to int64_t*: the two types are not interchangeable
    // through a pointer cast (aliasing, and differing width on some targets).
    int64_t bytes_read = 0;
    Status st = readat(_cur_offset, static_cast<int64_t>(*buf_len), &bytes_read, buf);
    *buf_len = static_cast<size_t>(bytes_read);
    if (!st.ok()) {
        return st;
    }
    *eof = (*buf_len == 0);
    return Status::OK();
}
// Reads up to `nbytes` bytes of the object starting at byte offset `position`
// using a ranged GetObject request.
//   position   - absolute byte offset within the object
//   nbytes     - maximum number of bytes to fetch
//   bytes_read - out: number of bytes copied into `out` (0 on EndOfFile or on
//                a failed request)
//   out        - destination buffer with room for at least `nbytes` bytes
// Returns:
//   InternalError - the S3 client was not created, or GetObject failed
//   EndOfFile     - `position` is at or beyond the size recorded by open()
//   OK            - data was copied; _cur_offset is moved to just past the
//                   last byte delivered
Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
    CHECK_S3_CLIENT(_client);
    if (position >= _file_size) {
        *bytes_read = 0;
        VLOG_FILE << "Read end of file: " + _path;
        return Status::EndOfFile("Read end of file: " + _path);
    }
    if (nbytes <= 0) {
        // Nothing to fetch. Without this guard a bounded range would be built
        // with its last byte in front of its first byte.
        *bytes_read = 0;
        _cur_offset = position;
        return Status::OK();
    }

    Aws::S3::Model::GetObjectRequest request;
    request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
    // HTTP byte ranges are inclusive: "bytes=<first>-<last>". The open-ended
    // form "bytes=<first>-" is kept when the request reaches the end of the
    // object.
    string bytes = StrCat("bytes=", position, "-");
    if (position + nbytes < _file_size) {
        // Assign to the existing variable. Declaring a new `bytes` here would
        // shadow it, leave the range unbounded and make every call request the
        // whole remainder of the object.
        bytes = StrCat(bytes.c_str(), position + nbytes - 1);
    }
    request.SetRange(bytes.c_str());

    auto response = _client->GetObject(request);
    if (!response.IsSuccess()) {
        *bytes_read = 0;
        std::stringstream out;
        out << "Error: [" << response.GetError().GetExceptionName() << ":"
            << response.GetError().GetMessage();
        LOG(INFO) << out.str();
        return Status::InternalError(out.str());
    }

    // Never copy more than the caller asked for, even if the service returned
    // a larger body.
    const int64_t content_length = response.GetResult().GetContentLength();
    const int64_t wanted = nbytes < content_length ? nbytes : content_length;
    auto& body = response.GetResult().GetBody();
    body.read(static_cast<char*>(out), wanted);
    // Report what was actually extracted from the stream, which can be less
    // than `wanted` if the body ended early.
    *bytes_read = body.gcount();
    _cur_offset = position + *bytes_read;
    return Status::OK();
}
// Reads everything from the current cursor to the end of the object into a
// newly allocated buffer.
//   buf    - out: owns the bytes read; reset to empty when nothing remains or
//            when the read fails
//   length - out: number of bytes stored in *buf (0 when nothing remains or
//            when the read fails)
// Returns OK when nothing remains or the read succeeded; otherwise the status
// reported by read().
Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
    const int64_t remaining = size() - _cur_offset;
    if (remaining <= 0) {
        buf->reset();
        *length = 0;
        return Status::OK();
    }

    *length = remaining;
    buf->reset(new uint8_t[remaining]);
    bool eof = false;
    // Propagate read failures instead of returning OK with a buffer whose
    // contents were never filled in.
    Status st = read(buf->get(), length, &eof);
    if (!st.ok()) {
        buf->reset();
        *length = 0;
        return st;
    }
    return Status::OK();
}

// Returns the object size recorded by open(); 0 until open() has succeeded.
int64_t S3Reader::size() {
return _file_size;
}
// Moves the read cursor to `position`. The value is stored as-is (no bounds
// check against the object size) and the call always returns OK; the next
// read() starts from this offset.
Status S3Reader::seek(int64_t position) {
_cur_offset = position;
return Status::OK();
}
// Stores the current read cursor in *position. Always returns OK.
Status S3Reader::tell(int64_t* position) {
*position = _cur_offset;
return Status::OK();
}
// Marks the reader as closed. Only the flag is set; in the code visible here
// nothing but closed() reads it.
void S3Reader::close() {
_closed = true;
}
// Returns true once close() has been called on this reader.
bool S3Reader::closed() {
return _closed;
}

} // end namespace doris
Loading