Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
01787cc
Enable partition discovery when loading data from parquet file
yuanlihan Jul 31, 2019
cefe179
Fix bug that replicas of a tablet may be located on same host (#1517)
morningman Aug 1, 2019
6c21a5a
Switch MAKE_TEST off in build.sh (#1579)
imay Aug 3, 2019
938c6d4
Thrown TabletQuorumFailedException in commitTxn (#1575)
EmmyMiao87 Aug 4, 2019
81cbffd
Read date type column from parquet
yuanlihan Aug 4, 2019
0ab22e4
Enable partition discovery for broker loading
yuanlihan Aug 4, 2019
bf8e081
Update comment
yuanlihan Aug 4, 2019
93a3577
Support multi partition column when creating table (#1574)
morningman Aug 5, 2019
eda55a7
Fix bug that unable to delete replica if version is missing (#1585)
morningman Aug 5, 2019
d938f9a
Implement the initial version of BetaRowset (#1568)
Aug 6, 2019
ec7b9e4
Acquire tablet map write lock during tablet gc (#1588)
yiguolei Aug 6, 2019
b2e678d
Support Segment for BetaRowset (#1577)
imay Aug 6, 2019
343b913
Fix a serious bug that will cause all replicas being deleted. (#1589)
morningman Aug 6, 2019
f7a05d8
Support setting timezone variable in FE (#1587)
Youngwb Aug 7, 2019
9402456
Fix parquet directory have empty file (#1593)
HangyuanLiu Aug 7, 2019
dc4a5e6
Support Decimal Type when load Parquet File (#1595)
worker24h Aug 7, 2019
41cbedf
Manage tablet by partition id (#1591)
yiguolei Aug 7, 2019
4c2a3d6
Merge Help document to documentation (#1586)
xy720 Aug 7, 2019
60d997f
Fix errors when ES username and passwd is empty (#1601)
wuyunfeng Aug 8, 2019
b937887
Include header file for ‘preadv' which caused break build on ubuntu …
lenmom Aug 8, 2019
133e90e
Enable partition discovery when loading data from parquet file
yuanlihan Jul 31, 2019
067c0df
Read date type column from parquet
yuanlihan Aug 4, 2019
7da20bc
Enable partition discovery for broker loading
yuanlihan Aug 4, 2019
66b9ffd
Update comment
yuanlihan Aug 4, 2019
2f11132
Enable parsing columns from file path for Broker Load
yuanlihan Aug 11, 2019
3ed9248
Update docs
yuanlihan Aug 11, 2019
3e553c2
merge conflicts
yuanlihan Aug 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/env/env_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/uio.h>

#include "common/logging.h"
#include "gutil/macros.h"
Expand Down
47 changes: 31 additions & 16 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state,
_cur_decompressor(nullptr),
_next_range(0),
_cur_line_reader_eof(false),
_columns_from_path(),
_scanner_eof(false),
_skip_next_line(false) {
}
Expand All @@ -73,7 +74,7 @@ Status BrokerScanner::open() {

Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
SCOPED_TIMER(_read_timer);
// Get one line
// Get one line
while (!_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
RETURN_IF_ERROR(open_next_reader());
Expand Down Expand Up @@ -120,7 +121,7 @@ Status BrokerScanner::open_next_reader() {
RETURN_IF_ERROR(open_file_reader());
RETURN_IF_ERROR(open_line_reader());
_next_range++;

return Status::OK();
}

Expand Down Expand Up @@ -236,6 +237,9 @@ Status BrokerScanner::open_line_reader() {
// _decompressor may be NULL if this is not a compressed file
RETURN_IF_ERROR(create_decompressor(range.format_type));

// set partitioned columns
_columns_from_path = range.columns_from_path;

// open line reader
switch (range.format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
Expand Down Expand Up @@ -379,8 +383,8 @@ bool BrokerScanner::check_decimal_input(
}

bool is_null(const Slice& slice) {
return slice.size == 2 &&
slice.data[0] == '\\' &&
return slice.size == 2 &&
slice.data[0] == '\\' &&
slice.data[1] == 'N';
}

Expand Down Expand Up @@ -452,6 +456,17 @@ bool BrokerScanner::convert_one_row(
return fill_dest_tuple(line, tuple, tuple_pool);
}

inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& value) {
if (slot_desc->is_nullable() && is_null(value)) {
_src_tuple->set_null(slot_desc->null_indicator_offset());
}
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
str_slot->ptr = value.data;
str_slot->len = value.size;
}

// Convert one row to this tuple
bool BrokerScanner::line_to_src_tuple(const Slice& line) {

Expand All @@ -469,7 +484,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
split_line(line, &values);
}

if (values.size() < _src_slot_descs.size()) {
if (values.size() + _columns_from_path.size() < _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is less than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
Expand All @@ -478,7 +493,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
error_msg.str());
_counter->num_rows_filtered++;
return false;
} else if (values.size() > _src_slot_descs.size()) {
} else if (values.size() + _columns_from_path.size() > _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is more than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
Expand All @@ -489,18 +504,18 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
return false;
}

for (int i = 0; i < values.size(); ++i) {
int file_column_index = 0;
for (int i = 0; i < _src_slot_descs.size(); ++i) {
auto slot_desc = _src_slot_descs[i];
const Slice& value = values[i];
if (slot_desc->is_nullable() && is_null(value)) {
_src_tuple->set_null(slot_desc->null_indicator_offset());
continue;
auto iter = _columns_from_path.find(slot_desc->col_name());
if (iter != _columns_from_path.end()) {
std::string partitioned_field = iter->second;
const Slice value = Slice(partitioned_field.c_str(), partitioned_field.size());
fill_slot(slot_desc, value);
} else {
const Slice& value = values[file_column_index++];
fill_slot(slot_desc, value);
}
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
str_slot->ptr = value.data;
str_slot->len = value.size;
}

return true;
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class BrokerScanner : public BaseScanner {
//Status init_expr_ctxes();

Status line_to_src_tuple();
void fill_slot(SlotDescriptor* slot_desc, const Slice& value);
bool line_to_src_tuple(const Slice& line);
private:;
const std::vector<TBrokerRangeDesc>& _ranges;
Expand All @@ -120,6 +121,7 @@ private:;
Decompressor* _cur_decompressor;
int _next_range;
bool _cur_line_reader_eof;
std::map<std::string, std::string> _columns_from_path;

bool _scanner_eof;

Expand Down
Loading