Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method
// else we will call sync method
CONF_mBool(runtime_filter_use_async_rpc, "true");

} // namespace config

} // namespace doris
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class MemTracker;
class RuntimeState;
class ExprContext;

// The counter will be passed to each scanner.
// Note that this struct is not thread safe.
// So if we support concurrent scan in the future, we need to modify this struct.
struct ScannerCounter {
ScannerCounter() : num_rows_filtered(0), num_rows_unselected(0) {}

Expand Down
6 changes: 2 additions & 4 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ Status BrokerScanner::open_file_reader() {
case TFileType::FILE_HDFS: {
#if defined(__x86_64__)
BufferedReader* file_reader =
new BufferedReader(new HdfsFileReader(range.hdfs_params, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
new BufferedReader(_profile, new HdfsFileReader(range.hdfs_params, range.path, start_offset));
RETURN_IF_ERROR(file_reader->open());
_cur_file_reader = file_reader;
break;
Expand All @@ -188,8 +187,7 @@ Status BrokerScanner::open_file_reader() {
}
case TFileType::FILE_S3: {
BufferedReader* s3_reader =
new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset));
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
Expand Down
27 changes: 25 additions & 2 deletions be/src/exec/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
#include <algorithm>
#include <sstream>

#include "common/config.h"
#include "common/logging.h"

namespace doris {

// buffered reader
BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
: _reader(reader),
BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size)
: _profile(profile),
_reader(reader),
_buffer_size(buffer_size),
_buffer_offset(0),
_buffer_limit(0),
_cur_offset(0) {
if (_buffer_size == -1L) {
_buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
}
_buffer = new char[_buffer_size];
// set the _cur_offset of this reader as same as the inner reader's,
// to make sure the buffer reader will start to read at right position.
Expand All @@ -47,6 +52,14 @@ Status BufferedReader::open() {
ss << "Open buffered reader failed, reader is null";
return Status::InternalError(ss.str());
}

// the macro ADD_XXX is idempotent.
// So although each scanner calls the ADD_XXX method, they all use the same counters.
_read_timer = ADD_TIMER(_profile, "FileReadTime");
_remote_read_timer = ADD_CHILD_TIMER(_profile, "FileRemoteReadTime", "FileReadTime");
_read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
_remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT);

RETURN_IF_ERROR(_reader->open());
return Status::OK();
}
Expand All @@ -68,6 +81,7 @@ Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
}

Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
SCOPED_TIMER(_read_timer);
if (nbytes <= 0) {
*bytes_read = 0;
return Status::OK();
Expand All @@ -92,6 +106,7 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r

Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) {
_read_count++;
// requested bytes missed the local buffer
if (position >= _buffer_limit || position < _buffer_offset) {
// if requested length is larger than the capacity of buffer, do not
Expand Down Expand Up @@ -121,6 +136,7 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt
Status BufferedReader::_fill() {
if (_buffer_offset >= 0) {
int64_t bytes_read = 0;
SCOPED_TIMER(_remote_read_timer);
RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
_buffer_limit = _buffer_offset + bytes_read;
}
Expand All @@ -144,6 +160,13 @@ Status BufferedReader::tell(int64_t* position) {
void BufferedReader::close() {
_reader->close();
SAFE_DELETE_ARRAY(_buffer);

if (_read_counter != nullptr) {
COUNTER_UPDATE(_read_counter, _read_count);
}
if (_remote_read_counter != nullptr) {
COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
}
}

bool BufferedReader::closed() {
Expand Down
17 changes: 16 additions & 1 deletion be/src/exec/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/status.h"
#include "exec/file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"

namespace doris {

Expand All @@ -35,7 +36,8 @@ class BufferedReader : public FileReader {
// If the reader need the file size, set it when construct FileReader.
// There is no other way to set the file size.
// buffered_reader will acquire reader
BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
// -1 means using config buffered_reader_buffer_size_bytes
BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t = -1L);
virtual ~BufferedReader();

virtual Status open() override;
Expand All @@ -56,12 +58,25 @@ class BufferedReader : public FileReader {
Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);

private:
RuntimeProfile* _profile;
std::unique_ptr<FileReader> _reader;
char* _buffer;
int64_t _buffer_size;
int64_t _buffer_offset;
int64_t _buffer_limit;
int64_t _cur_offset;

int64_t _read_count = 0;
int64_t _remote_read_count = 0;

// total time cost in this reader
RuntimeProfile::Counter* _read_timer = nullptr;
// time cost of "_reader", "remote" because "_reader" is always a remote reader
RuntimeProfile::Counter* _remote_read_timer = nullptr;
// counter of calling read()
RuntimeProfile::Counter* _read_counter = nullptr;
// counter of calling "remote read()"
RuntimeProfile::Counter* _remote_read_counter = nullptr;
};

} // namespace doris
3 changes: 1 addition & 2 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ Status JsonScanner::open_file_reader() {
}
case TFileType::FILE_S3: {
BufferedReader* s3_reader =
new BufferedReader(new S3Reader(_params.properties, range.path, start_offset),
config::remote_storage_read_buffer_mb * 1024 * 1024);
new BufferedReader(_profile, new S3Reader(_params.properties, range.path, start_offset));
RETURN_IF_ERROR(s3_reader->open());
_cur_file_reader = s3_reader;
break;
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/orc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,13 @@ Status ORCScanner::open_next_reader() {
if (range.__isset.file_size) {
file_size = range.file_size;
}
file_reader.reset(new BufferedReader(new BrokerReader(_state->exec_env(), _broker_addresses,
file_reader.reset(new BufferedReader(_profile, new BrokerReader(_state->exec_env(), _broker_addresses,
_params.properties, range.path, range.start_offset,
file_size)));
break;
}
case TFileType::FILE_S3: {
file_reader.reset(new BufferedReader(
file_reader.reset(new BufferedReader(_profile,
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/parquet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ Status ParquetScanner::open_next_reader() {
if (range.__isset.file_size) {
file_size = range.file_size;
}
file_reader.reset(new BufferedReader(
file_reader.reset(new BufferedReader(_profile,
new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
range.path, range.start_offset, file_size)));
break;
}
case TFileType::FILE_S3: {
file_reader.reset(new BufferedReader(
file_reader.reset(new BufferedReader(_profile,
new S3Reader(_params.properties, range.path, range.start_offset)));
break;
}
Expand Down
12 changes: 8 additions & 4 deletions be/test/exec/buffered_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class BufferedReaderTest : public testing::Test {
};

TEST_F(BufferedReaderTest, normal_use) {
RuntimeProfile profile("test");
// buffered_reader_test_file 950 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
BufferedReader reader(file_reader, 1024);
BufferedReader reader(&profile, file_reader, 1024);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[1024];
Expand All @@ -50,10 +51,11 @@ TEST_F(BufferedReaderTest, normal_use) {
}

TEST_F(BufferedReaderTest, test_validity) {
RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
BufferedReader reader(file_reader, 64);
BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[10];
Expand Down Expand Up @@ -92,10 +94,11 @@ TEST_F(BufferedReaderTest, test_validity) {
}

TEST_F(BufferedReaderTest, test_seek) {
RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
BufferedReader reader(file_reader, 64);
BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[10];
Expand Down Expand Up @@ -143,10 +146,11 @@ TEST_F(BufferedReaderTest, test_seek) {
}

TEST_F(BufferedReaderTest, test_miss) {
RuntimeProfile profile("test");
// buffered_reader_test_file.txt 45 bytes
auto file_reader = new LocalFileReader(
"./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
BufferedReader reader(file_reader, 64);
BufferedReader reader(&profile, file_reader, 64);
auto st = reader.open();
ASSERT_TRUE(st.ok());
uint8_t buf[128];
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -2620,6 +2620,10 @@ show_param ::=
{:
RESULT = new ShowQueryProfileStmt(queryIdPath);
:}
| KW_LOAD KW_PROFILE STRING_LITERAL:loadIdPath
{:
RESULT = new ShowLoadProfileStmt(loadIdPath);
:}
| KW_ENCRYPTKEYS opt_db:dbName opt_wild_where
{:
RESULT = new ShowEncryptKeysStmt(dbName, parser.wild);
Expand Down
Loading