Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,21 @@ Status CsvReader::init_reader(bool is_load) {
_options.converted_from_string = _trim_double_quotes;
_not_trim_enclose = (!_trim_double_quotes && _enclose == '\"');

if (_state != nullptr) {
_keep_cr = _state->query_options().keep_carriage_return;
}

std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
if (_enclose == 0) {
text_line_reader_ctx =
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length);
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);

_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape, _keep_cr);

_fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
_trim_tailing_spaces, !_not_trim_enclose,
Expand Down Expand Up @@ -878,20 +882,24 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
_options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
}

if (_state != nullptr) {
_keep_cr = _state->query_options().keep_carriage_return;
}

// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());
std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
if (_enclose == 0) {
text_line_reader_ctx =
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length);
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);
_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, _trim_double_quotes, _value_separator,
_value_separator_length);
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape, _keep_cr);
_fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
_trim_tailing_spaces, false,
std::static_pointer_cast<EncloseCsvLineReaderContext>(text_line_reader_ctx),
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ class CsvReader : public GenericReader {
bool _trim_tailing_spaces = false;
// `_not_trim_enclose` covers the case where the user does not want double quotes trimmed but the enclose character is itself a double quote
bool _not_trim_enclose = true;
bool _keep_cr = false;

io::IOContext* _io_ctx = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <glog/logging.h>
#include <string.h>

#ifdef __AVX2__
#include <immintrin.h>
#endif
#include <algorithm>
#include <cstddef>
#include <cstring>
Expand All @@ -42,7 +45,6 @@
// leave these 2 size small for debugging

namespace doris {

const uint8_t* EncloseCsvLineReaderContext::read_line_impl(const uint8_t* start,
const size_t length) {
_total_len = length;
Expand Down Expand Up @@ -82,12 +84,11 @@ void EncloseCsvLineReaderContext::on_col_sep_found(const uint8_t* start,
}

size_t EncloseCsvLineReaderContext::update_reading_bound(const uint8_t* start) {
_result = (uint8_t*)memmem(start + _idx, _total_len - _idx, line_delimiter.c_str(),
line_delimiter_len);
_result = call_find_line_sep(start + _idx, _total_len - _idx);
if (_result == nullptr) {
return _total_len;
}
return _result - start + line_delimiter_len;
return _result - start + line_delimiter_length();
}

template <bool SingleChar>
Expand Down
106 changes: 96 additions & 10 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class TextLineReaderContextIf {
// Info about the current line may be recorded in the ctx, like column separator positions.
/// @return line delimiter pos if found, otherwise return nullptr.
virtual const uint8_t* read_line(const uint8_t* start, const size_t len) = 0;

/// @return length of line delimiter
[[nodiscard]] virtual size_t line_delimiter_length() const = 0;

Expand All @@ -62,30 +61,117 @@ class BaseTextLineReaderContext : public TextLineReaderContextIf {

public:
explicit BaseTextLineReaderContext(const std::string& line_delimiter_,
const size_t line_delimiter_len_)
: line_delimiter(line_delimiter_), line_delimiter_len(line_delimiter_len_) {}
const size_t line_delimiter_len_, const bool keep_cr_)
: line_delimiter(line_delimiter_),
line_delimiter_len(line_delimiter_len_),
keep_cr(keep_cr_) {
use_memmem = line_delimiter_len != 1 || line_delimiter != "\n" || keep_cr;
if (use_memmem) {
find_line_delimiter_func = &BaseTextLineReaderContext::find_multi_char_line_sep;
} else {
find_line_delimiter_func = &BaseTextLineReaderContext::find_lf_crlf_line_sep;
}
}

/// Forwards to the derived context's `read_line_impl` via a static downcast to `Ctx`.
/// @return whatever `Ctx::read_line_impl(start, len)` returns.
inline const uint8_t* read_line(const uint8_t* start, const size_t len) final {
    auto* derived = static_cast<Ctx*>(this);
    return derived->read_line_impl(start, len);
}

[[nodiscard]] inline size_t line_delimiter_length() const final { return line_delimiter_len; }
[[nodiscard]] inline size_t line_delimiter_length() const final {
return line_delimiter_len + line_crlf;
}

/// Forwards to the derived context's `refresh_impl` via a static downcast to `Ctx`.
inline void refresh() final { static_cast<Ctx*>(this)->refresh_impl(); }

/// Searches [start, start + length) for the first occurrence of the configured
/// delimiter (`line_delimiter`, `line_delimiter_len` bytes) with memmem.
/// @return pointer to the first byte of the match, or nullptr when there is none.
inline const uint8_t* find_multi_char_line_sep(const uint8_t* start, const size_t length) {
    void* match = memmem(start, length, line_delimiter.data(), line_delimiter_len);
    return static_cast<uint8_t*>(match);
}

/// Finds the first line terminator in [start, start + length), accepting both
/// "\n" and "\r\n".
///
/// The terminator is located by the first '\n' in the buffer. When the byte
/// directly before that '\n' is '\r', the pair is reported as a single "\r\n"
/// terminator. A '\r' that is not immediately followed by '\n' is ordinary data.
/// `line_crlf` is reset on every call and set to true only for a "\r\n" match.
///
/// @param start  buffer to scan; may be nullptr.
/// @param length number of readable bytes at `start`.
/// @return pointer to the first byte of the terminator ('\r' for "\r\n",
///         otherwise '\n'), or nullptr when the buffer is null/empty or holds
///         no '\n'.
const uint8_t* find_lf_crlf_line_sep(const uint8_t* start, const size_t length) {
    line_crlf = false;
    if (start == nullptr || length == 0) {
        return nullptr;
    }
    // Only the first '\n' decides the result, so a single memchr replaces the
    // hand-rolled chunked scan. Working on pointers also avoids narrowing byte
    // offsets to `int`, which breaks once the buffer exceeds INT32_MAX bytes.
    const auto* lf = static_cast<const uint8_t*>(memchr(start, '\n', length));
    if (lf == nullptr) {
        return nullptr;
    }
    // Look one byte back (never before `start`) to recognise "\r\n".
    if (lf != start && lf[-1] == '\r') {
        line_crlf = true;
        return lf - 1;
    }
    return lf;
}
/// Invokes the delimiter finder currently stored in `find_line_delimiter_func`.
/// @return the finder's result for [start, start + length).
const uint8_t* call_find_line_sep(const uint8_t* start, const size_t length) {
    const auto finder = find_line_delimiter_func;
    return (this->*finder)(start, length);
}

protected:
const std::string line_delimiter;
const size_t line_delimiter_len;
bool keep_cr = false;
bool line_crlf = false;
bool use_memmem = true;
using FindLineDelimiterFunc = const uint8_t* (BaseTextLineReaderContext::*)(const uint8_t*,
size_t);
FindLineDelimiterFunc find_line_delimiter_func;
};

class PlainTextLineReaderCtx final : public BaseTextLineReaderContext<PlainTextLineReaderCtx> {
public:
explicit PlainTextLineReaderCtx(const std::string& line_delimiter_,
const size_t line_delimiter_len_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_) {}
const size_t line_delimiter_len_, const bool keep_cr_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_) {}

inline const uint8_t* read_line_impl(const uint8_t* start, const size_t length) {
return (uint8_t*)memmem(start, length, line_delimiter.c_str(), line_delimiter_len);
return call_find_line_sep(start, length);
}

inline void refresh_impl() {}
Expand Down Expand Up @@ -119,8 +205,8 @@ class EncloseCsvLineReaderContext final
const size_t line_delimiter_len_,
const std::string& column_sep_,
const size_t column_sep_len_, size_t col_sep_num,
const char enclose, const char escape)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_),
const char enclose, const char escape, const bool keep_cr_)
: BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_, keep_cr_),
_enclose(enclose),
_escape(escape),
_column_sep_len(column_sep_len_),
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,9 @@ Status NewJsonReader::_open_line_reader() {
}
_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, _decompressor.get(),
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length), size,
_current_offset);
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length,
false),
size, _current_offset);
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";

public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return";

public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";

// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
Expand Down Expand Up @@ -1755,6 +1757,12 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
"The maximum number of partitions created during table creation"})
public int createTablePartitionMaxNum = 10000;


// Session variable behind KEEP_CARRIAGE_RETURN; it is copied into TQueryOptions by toThrift().
// Backslashes in both descriptions are doubled so the help text shows the literal
// sequences "\n" / "\r\n" rather than embedding raw control characters.
@VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN,
        description = {"在同时处理\\n和\\r\\n作为CSV的行分隔符时,是否保留\\r",
                "When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;

@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
private boolean forceJniScanner = false;
Expand Down Expand Up @@ -3133,6 +3141,14 @@ public void setEnableUnicodeNameSupport(boolean enableUnicodeNameSupport) {
this.enableUnicodeNameSupport = enableUnicodeNameSupport;
}

/** Returns the current value of the {@code keepCarriageReturn} session variable. */
public boolean isKeepCarriageReturn() {
return keepCarriageReturn;
}

/**
 * Sets the {@code keepCarriageReturn} session variable.
 *
 * @param keepCarriageReturn the new value to store
 */
public void setKeepCarriageReturn(boolean keepCarriageReturn) {
this.keepCarriageReturn = keepCarriageReturn;
}

public boolean isDropTableIfCtasFailed() {
return dropTableIfCtasFailed;
}
Expand Down Expand Up @@ -3395,6 +3411,7 @@ public TQueryOptions toThrift() {

tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());
tResult.setKeepCarriageReturn(keepCarriageReturn);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ struct TQueryOptions {
117: optional bool read_csv_empty_line_as_null = false;

118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS;

119: optional bool keep_carriage_return = false; // CSV: when false and the line delimiter is \n, both \n and \r\n end a line; when true, \r is kept as data.
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
}
Expand Down
Loading