Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ FUNCTION(ADD_BE_TEST TEST_NAME)

ADD_EXECUTABLE(${TEST_FILE_NAME} ${TEST_NAME}.cpp ${ADDITIONAL_FILES})
TARGET_LINK_LIBRARIES(${TEST_FILE_NAME} ${TEST_LINK_LIBS})
SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control")
SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control" ENABLE_EXPORTS 1)
if (NOT "${TEST_DIR_NAME}" STREQUAL "")
SET_TARGET_PROPERTIES(${TEST_FILE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}/${TEST_DIR_NAME}")
endif()
Expand Down
52 changes: 38 additions & 14 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,28 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
: BaseScanner(state, profile, params, pre_filter_ctxs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
// _splittable(params.splittable),
_value_separator(static_cast<char>(params.column_separator)),
_line_delimiter(static_cast<char>(params.line_delimiter)),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
_next_range(0),
_cur_line_reader_eof(false),
_scanner_eof(false),
_skip_next_line(false) {}
_skip_next_line(false) {
if (params.__isset.column_separator_length && params.column_separator_length > 1) {
_value_separator = params.column_separator_str;
_value_separator_length = params.column_separator_length;
} else {
_value_separator.push_back(static_cast<char>(params.column_separator));
_value_separator_length = 1;
}
if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
_line_delimiter = params.line_delimiter_str;
_line_delimiter_length = params.line_delimiter_length;
} else {
_line_delimiter.push_back(static_cast<char>(params.line_delimiter));
_line_delimiter_length = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_line_delimiter_length seems useless.

}
}

BrokerScanner::~BrokerScanner() {
close();
Expand Down Expand Up @@ -255,7 +267,7 @@ Status BrokerScanner::open_line_reader() {
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
_cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
size, _line_delimiter);
size, _line_delimiter, _line_delimiter_length);
break;
default: {
std::stringstream ss;
Expand Down Expand Up @@ -292,16 +304,24 @@ void BrokerScanner::close() {
}

// Splits `line` into column values and appends one Slice per column to
// `values`. Columns are separated by the first `_value_separator_length`
// bytes of `_value_separator`, which may be longer than one byte.
//
// The appended Slices point into `line.data`; nothing is copied. A line with
// N separator occurrences always produces N + 1 values, so an empty line
// yields a single empty value and a trailing separator yields a trailing
// empty value.
void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
    const size_t sep_len = static_cast<size_t>(_value_separator_length);
    const char* const line_end = line.data + line.size;
    // Start of the column currently being scanned.
    const char* value = line.data;
    size_t i = 0;
    // TODO improve the performance
    // A separator can only begin at a position that still has sep_len bytes
    // left, so the scan stops once fewer than that remain.
    while (i + sep_len <= line.size) {
        if (_value_separator.compare(0, sep_len, line.data + i, sep_len) == 0) {
            values->emplace_back(value, line.data + i - value);
            // Resume right after the separator so that its bytes are never
            // re-examined as the start of another match.
            i += sep_len;
            value = line.data + i;
        } else {
            ++i;
        }
    }
    // The last column always extends to the end of the line. Measuring it up
    // to `i` instead would drop the final (sep_len - 1) bytes whenever the
    // scan stops early with a multi-byte separator, e.g. "a||bc" split on
    // "||" must yield "a" and "bc", not "a" and "b".
    values->emplace_back(value, line_end - value);
}

void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool, char** new_value_p,
Expand Down Expand Up @@ -413,15 +433,19 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is less than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
<< "actual number: " << values.size() << " column separator: ["
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
return false;
} else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is more than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
<< "actual number: " << values.size() << " column separator: ["
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size), error_msg.str());
_counter->num_rows_filtered++;
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ class BrokerScanner : public BaseScanner {

std::unique_ptr<TextConverter> _text_converter;

char _value_separator;
char _line_delimiter;
std::string _value_separator;
std::string _line_delimiter;
int _value_separator_length;
int _line_delimiter_length;

// Reader
FileReader* _cur_file_reader;
Expand Down
19 changes: 11 additions & 8 deletions be/src/exec/plain_text_line_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ namespace doris {

PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
Decompressor* decompressor, size_t length,
uint8_t line_delimiter)
const std::string& line_delimiter, size_t line_delimiter_length)
: _profile(profile),
_file_reader(file_reader),
_decompressor(decompressor),
_min_length(length),
_total_read_bytes(0),
_line_delimiter(line_delimiter),
_line_delimiter_length(line_delimiter_length),
_input_buf(new uint8_t[INPUT_CHUNK]),
_input_buf_size(INPUT_CHUNK),
_input_buf_pos(0),
Expand Down Expand Up @@ -92,7 +93,7 @@ inline bool PlainTextLineReader::update_eof() {
uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start,
size_t len) {
// TODO: meanwhile find and save field pos
return (uint8_t*)memmem(start, len, &_line_delimiter, 1);
return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length);
}

// extend input buf if necessary only when _more_input_bytes > 0
Expand Down Expand Up @@ -195,10 +196,12 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e

if (pos == nullptr) {
// didn't find line delimiter, read more data from decompressor
// 1. point 'offset' to _output_buf_limit
offset = output_buf_read_remaining();

// 2. read from file reader
// for multi bytes delimiter we cannot set offset to avoid incomplete
// delimiter
// read from file reader
if (_line_delimiter_length == 1) {
offset = output_buf_read_remaining();
}
extend_output_buf();
if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
// we still have data in input which is not decompressed.
Expand Down Expand Up @@ -266,7 +269,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e

if (_decompressor != nullptr) {
SCOPED_TIMER(_decompress_timer);
// 2. decompress
// decompress
size_t input_read_bytes = 0;
size_t decompressed_len = 0;
_more_input_bytes = 0;
Expand Down Expand Up @@ -316,7 +319,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
// we found a complete line
// ready to return
offset = pos - cur_ptr;
found_line_delimiter = 1;
found_line_delimiter = _line_delimiter_length;
break;
}
} // while (!done())
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class Status;
class PlainTextLineReader : public LineReader {
public:
PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
Decompressor* decompressor, size_t length, uint8_t line_delimiter);
Decompressor* decompressor, size_t length,
const std::string& line_delimiter, size_t line_delimiter_length);

virtual ~PlainTextLineReader();

Expand Down Expand Up @@ -61,7 +62,8 @@ class PlainTextLineReader : public LineReader {
Decompressor* _decompressor;
size_t _min_length;
size_t _total_read_bytes;
uint8_t _line_delimiter;
std::string _line_delimiter;
size_t _line_delimiter_length;

// save the data read from file reader
uint8_t* _input_buf;
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
}
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
request.__set_partitions(http_req->header(HTTP_PARTITIONS));
request.__set_isTempPartition(false);
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ static const std::string HTTP_FORMAT_KEY = "format";
static const std::string HTTP_COLUMNS = "columns";
static const std::string HTTP_WHERE = "where";
static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
Expand Down
12 changes: 6 additions & 6 deletions be/test/exec/plain_text_line_reader_bzip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_normal_use) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit2) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand All @@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit3) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit4) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, bzip2_test_limit5) {
st = Decompressor::create_decompressor(CompressType::BZIP2, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down
14 changes: 7 additions & 7 deletions be/test/exec/plain_text_line_reader_gzip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, gzip_normal_use) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -98,7 +98,7 @@ TEST_F(PlainTextLineReaderTest, uncompressed_no_newline) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -133,7 +133,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -169,7 +169,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit2) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand All @@ -194,7 +194,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit3) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -224,7 +224,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit4) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -254,7 +254,7 @@ TEST_F(PlainTextLineReaderTest, gzip_test_limit5) {
st = Decompressor::create_decompressor(CompressType::GZIP, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down
12 changes: 6 additions & 6 deletions be/test/exec/plain_text_line_reader_lz4frame_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(PlainTextLineReaderTest, lz4_normal_use) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, -1, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -97,7 +97,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 8, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -134,7 +134,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit2) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 6, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand All @@ -158,7 +158,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit3) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -188,7 +188,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit4) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 7, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down Expand Up @@ -218,7 +218,7 @@ TEST_F(PlainTextLineReaderTest, lz4_test_limit5) {
st = Decompressor::create_decompressor(CompressType::LZ4FRAME, &decompressor);
ASSERT_TRUE(st.ok());

PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, '\n');
PlainTextLineReader line_reader(&_profile, &file_reader, decompressor, 0, "\n", 1);
const uint8_t* ptr;
size_t size;
bool eof;
Expand Down
Loading