Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
// lower write amplification, trading off read amplification and space amplification.
CONF_mString(cumulative_compaction_policy, "size_based");
CONF_Validator(cumulative_compaction_policy, [](const std::string config) -> bool {
return config == "size_based" || config == "num_based";
return config == "size_based" || config == "num_based";
});

// In size_based policy, output rowset of cumulative compaction total disk size exceed this config size,
Expand Down Expand Up @@ -289,14 +289,12 @@ CONF_Int32(max_meta_checkpoint_threads, "-1");
CONF_mInt64(total_permits_for_compaction_score, "10000");

// Sleep interval (in ms) after generating compaction tasks.
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10");

// Compaction task number per disk.
// Must be at least 2, because base compaction and cumulative compaction each need at least one thread.
CONF_mInt32(compaction_task_num_per_disk, "2");
CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
return config >= 2;
});
CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool { return config >= 2; });

// How many rounds of cumulative compaction to run for each round of base compaction when generating compaction tasks.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
Expand Down Expand Up @@ -597,9 +595,11 @@ CONF_mInt32(zone_map_row_num_threshold, "20");
// Trace = 6
CONF_Int32(aws_log_level, "3");

// The buffer size (in MB) used when reading data from remote storage such as S3.
CONF_mInt32(remote_storage_read_buffer_mb, "256");

} // namespace config

} // namespace doris

#endif // DORIS_BE_SRC_COMMON_CONFIG_H

10 changes: 5 additions & 5 deletions be/src/exec/broker_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ Status BrokerReader::open() {
}

// Not supported by BrokerReader: always returns Status::NotSupported without
// touching 'buf' or 'length'.
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
return Status::NotSupported("Not support");
}

Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
DCHECK_NE(*buf_len, 0);
RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf));
if (*buf_len == 0) {
Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
DCHECK_NE(buf_len, 0);
RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
if (*bytes_read == 0) {
*eof = true;
} else {
*eof = false;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/broker_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class BrokerReader : public FileReader {
virtual Status open() override;

// Read
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
31 changes: 14 additions & 17 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <sstream>

#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/exec_node.h"
#include "exec/hdfs_file_reader.h"
#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/s3_reader.h"
Expand All @@ -36,14 +38,6 @@
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/tuple.h"
#include "exprs/expr.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exec/plain_text_line_reader.h"
#include "exec/hdfs_file_reader.h"
#include "exec/local_file_reader.h"
#include "exec/broker_reader.h"
#include "exec/decompressor.h"
#include "util/utf8_check.h"

namespace doris {
Expand All @@ -52,7 +46,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<ExprContext*>& pre_filter_ctxs,
const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
Expand Down Expand Up @@ -168,8 +162,9 @@ Status BrokerScanner::open_file_reader() {
break;
}
case TFileType::FILE_HDFS: {
HdfsFileReader* file_reader = new HdfsFileReader(
range.hdfs_params, range.path, start_offset);
BufferedReader* file_reader =
new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
RETURN_IF_ERROR(file_reader->open());
_cur_file_reader = file_reader;
break;
Expand All @@ -183,7 +178,9 @@ Status BrokerScanner::open_file_reader() {
break;
}
case TFileType::FILE_S3: {
S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset);
BufferedReader* s3_reader =
new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
Expand Down Expand Up @@ -320,12 +317,12 @@ void BrokerScanner::close() {

void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos= 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.

// Separator: AAAA
//
//
// curpos
// ▼
// AAAA
Expand All @@ -351,7 +348,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
}
}

CHECK(curpos == line.size) << curpos << " vs " << line.size;
CHECK(curpos == line.size) << curpos << " vs " << line.size;
values->emplace_back(value + start, curpos - start);
}

Expand Down
28 changes: 12 additions & 16 deletions be/src/exec/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,18 @@ Status BufferedReader::open() {
return Status::InternalError(ss.str());
}
RETURN_IF_ERROR(_reader->open());
RETURN_IF_ERROR(_fill());
return Status::OK();
}

// Not supported by BufferedReader: always returns Status::NotSupported without
// touching 'buf' or 'length'.
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
return Status::NotSupported("Not support");

}

Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
DCHECK_NE(*buf_len, 0);
int64_t bytes_read;
RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf));
if (bytes_read == 0) {
Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
DCHECK_NE(buf_len, 0);
RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
if (*bytes_read == 0) {
*eof = true;
} else {
*eof = false;
Expand Down Expand Up @@ -97,7 +94,11 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt
// If the requested length is larger than the buffer capacity, read straight
// into 'out' instead of going through the local buffer.
if (nbytes > _buffer_size) {
return _reader->readat(position, nbytes, bytes_read, out);
auto st = _reader->readat(position, nbytes, bytes_read, out);
if (st.ok()) {
_cur_offset = position + *bytes_read;
}
return st;
}
_buffer_offset = position;
RETURN_IF_ERROR(_fill());
Expand All @@ -116,13 +117,8 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt

Status BufferedReader::_fill() {
if (_buffer_offset >= 0) {
int64_t bytes_read;
// retry for new content
int retry_times = 1;
do {
// fill the buffer
RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
} while (bytes_read == 0 && retry_times++ < 2);
int64_t bytes_read = 0;
RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
_buffer_limit = _buffer_offset + bytes_read;
}
return Status::OK();
Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <stdint.h>

#include <memory>

#include "common/status.h"
Expand All @@ -34,16 +35,16 @@ class BufferedReader : public FileReader {
// If the reader needs the file size, set it when constructing the FileReader.
// There is no other way to set the file size.
// buffered_reader will acquire reader
BufferedReader(FileReader* reader, int64_t = 1024 * 1024);
BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
virtual ~BufferedReader();

virtual Status open() override;

// Read
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
9 changes: 5 additions & 4 deletions be/src/exec/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <stdint.h>

#include <memory>

#include "common/status.h"
Expand All @@ -29,10 +30,10 @@ class FileReader {
virtual ~FileReader() {}
virtual Status open() = 0;
// Read content to 'buf', 'buf_len' is the max size of this buffer.
// Return ok when read success, and 'buf_len' is set to size of read content
// If reach to end of file, the eof is set to true. meanwhile 'buf_len'
// Returns OK on a successful read, with 'bytes_read' set to the size of the content read.
// On reaching end of file, 'eof' is set to true and 'bytes_read'
// is set to zero.
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) = 0;
virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) = 0;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) = 0;

/**
Expand All @@ -41,7 +42,7 @@ class FileReader {
* If EOF is reached, returns Status::OK with length set to 0 and buf set to NULL;
* otherwise returns the bytes that were read.
*/
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) = 0;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) = 0;
virtual int64_t size() = 0;
virtual Status seek(int64_t position) = 0;
virtual Status tell(int64_t* position) = 0;
Expand Down
28 changes: 16 additions & 12 deletions be/src/exec/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
#include "common/logging.h"

namespace doris {
// Constructs a reader for 'path' using the connection settings in 'hdfs_params'.
// The current offset starts at 'start_offset', the file size is initialised to -1
// and both HDFS handles to nullptr. The constructor body only builds the
// "hdfs://<host>:<port>" namenode string; it does not call connect() or open().
HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params,
const std::string& path, int64_t start_offset)
: _hdfs_params(hdfs_params), _path(path), _current_offset(start_offset),
_file_size(-1), _hdfs_fs(nullptr), _hdfs_file(nullptr) {
HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, const std::string& path,
int64_t start_offset)
: _hdfs_params(hdfs_params),
_path(path),
_current_offset(start_offset),
_file_size(-1),
_hdfs_fs(nullptr),
_hdfs_file(nullptr) {
std::stringstream namenode_ss;
namenode_ss << "hdfs://" << _hdfs_params.host<< ":" << _hdfs_params.port;
namenode_ss << "hdfs://" << _hdfs_params.host << ":" << _hdfs_params.port;
_namenode = namenode_ss.str();
}

Expand All @@ -47,7 +51,8 @@ Status HdfsFileReader::connect() {
hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str());
}
if (_hdfs_params.__isset.kerb_ticket_cache_path) {
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _hdfs_params.kerb_ticket_cache_path.c_str());
hdfsBuilderSetKerbTicketCachePath(hdfs_builder,
_hdfs_params.kerb_ticket_cache_path.c_str());
}
// set token
if (_hdfs_params.__isset.token) {
Expand Down Expand Up @@ -107,23 +112,22 @@ bool HdfsFileReader::closed() {
}

// Reads everything from the current offset to the end of the file into a
// freshly allocated buffer.
//
// If nothing remains (size() - _current_offset <= 0), '*buf' is reset to null,
// '*length' is set to 0 and OK is returned. Otherwise '*buf' takes ownership of
// a new array sized to the remaining bytes and '*length' is filled in by read().
//
// The Status from read() is returned to the caller; previously it was
// discarded and OK was returned even when the read had failed.
Status HdfsFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
    const int64_t remaining = size() - _current_offset;
    if (remaining <= 0) {
        buf->reset();
        *length = 0;
        return Status::OK();
    }
    // 'eof' is required by read() but is not needed here.
    bool eof = false;
    buf->reset(new uint8_t[remaining]);
    return read(buf->get(), remaining, length, &eof);
}

Status HdfsFileReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
readat(_current_offset, (int64_t)*buf_len, (int64_t*)buf_len, buf);
if (*buf_len == 0) {
Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
readat(_current_offset, buf_len, bytes_read, buf);
if (*bytes_read == 0) {
*eof = true;
} else {
*eof = false;
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/hdfs_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <hdfs/hdfs.h>

#include "exec/file_reader.h"

#include "gen_cpp/PlanNodes_types.h"

namespace doris {
Expand All @@ -36,10 +35,10 @@ class HdfsFileReader : public FileReader {
// Returns OK on a successful read, with 'bytes_read' set to the size of the content read.
// On reaching end of file, 'eof' is set to true and 'bytes_read'
// is set to zero.
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand All @@ -48,6 +47,7 @@ class HdfsFileReader : public FileReader {

private:
Status connect();

private:
THdfsParams _hdfs_params;
std::string _namenode;
Expand Down
10 changes: 6 additions & 4 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "env/env.h"
#include "exec/broker_reader.h"
#include "exec/buffered_reader.h"
#include "exec/local_file_reader.h"
#include "exec/s3_reader.h"
#include "exprs/expr.h"
Expand All @@ -37,8 +38,7 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter)
const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
Expand Down Expand Up @@ -121,7 +121,9 @@ Status JsonScanner::open_next_reader() {
break;
}
case TFileType::FILE_S3: {
S3Reader* s3_reader = new S3Reader(_params.properties, range.path, start_offset);
BufferedReader* s3_reader =
new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
RETURN_IF_ERROR(s3_reader->open());
file = s3_reader;
break;
Expand Down Expand Up @@ -286,7 +288,7 @@ Status JsonReader::_parse_json_doc(bool* eof) {
// Read a whole message; json_str is a std::unique_ptr<uint8_t[]>, so no manual `delete[]` is needed.
SCOPED_TIMER(_file_read_timer);
std::unique_ptr<uint8_t[]> json_str;
size_t length = 0;
int64_t length = 0;
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
_bytes_read_counter += length;
if (length == 0) {
Expand Down
Loading