Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,18 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool*
}
return true;
}

void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path) {
// values of columns from path can not be null
for (int i = 0; i < columns_from_path.size(); ++i) {
auto slot_desc = _src_slot_descs.at(i + start);
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
const std::string& column_from_path = columns_from_path[i];
str_slot->ptr = const_cast<char*>(column_from_path.c_str());
str_slot->len = column_from_path.size();
}
}

}
2 changes: 2 additions & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class BaseScanner {
virtual void close() = 0;
bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool);

void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path);

protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
Expand Down
19 changes: 12 additions & 7 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace doris {

BrokerScanner::BrokerScanner(RuntimeState* state,
RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter) : BaseScanner(state, profile, params, counter),
Expand Down Expand Up @@ -73,7 +73,7 @@ Status BrokerScanner::open() {

Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
SCOPED_TIMER(_read_timer);
// Get one line
// Get one line
while (!_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
RETURN_IF_ERROR(open_next_reader());
Expand Down Expand Up @@ -120,7 +120,7 @@ Status BrokerScanner::open_next_reader() {
RETURN_IF_ERROR(open_file_reader());
RETURN_IF_ERROR(open_line_reader());
_next_range++;

return Status::OK();
}

Expand Down Expand Up @@ -379,8 +379,8 @@ bool BrokerScanner::check_decimal_input(
}

bool is_null(const Slice& slice) {
return slice.size == 2 &&
slice.data[0] == '\\' &&
return slice.size == 2 &&
slice.data[0] == '\\' &&
slice.data[1] == 'N';
}

Expand Down Expand Up @@ -469,7 +469,10 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
split_line(line, &values);
}

if (values.size() < _src_slot_descs.size()) {
// range of current file
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
const std::vector<std::string>& columns_from_path = range.columns_from_path;
if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is less than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
Expand All @@ -478,7 +481,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
error_msg.str());
_counter->num_rows_filtered++;
return false;
} else if (values.size() > _src_slot_descs.size()) {
} else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is more than schema column number. "
<< "actual number: " << values.size() << " sep: " << _value_separator << ", "
Expand All @@ -503,6 +506,8 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
str_slot->len = value.size;
}

fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);

return true;
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BrokerScanner : public BaseScanner {
BrokerScanner(
RuntimeState* state,
RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
ScannerCounter* counter);
Expand Down Expand Up @@ -123,7 +123,7 @@ private:;

bool _scanner_eof;

// When we fetch range doesn't start from 0,
// When we fetch range doesn't start from 0,
// we will read to one ahead, and skip the first line
bool _skip_next_line;

Expand Down
43 changes: 37 additions & 6 deletions be/src/exec/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ namespace doris {

// Broker

ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader) :
_total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) {
ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file) :
_num_of_columns_from_file(num_of_columns_from_file), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) {
_parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
_properties = parquet::ReaderProperties();
_properties.enable_buffered_stream();
Expand Down Expand Up @@ -122,16 +122,18 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc
Status ParquetReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs)
{
_parquet_column_ids.clear();
for (auto slot_desc : tuple_slot_descs) {
for (int i = 0; i < _num_of_columns_from_file; i++) {
auto slot_desc = tuple_slot_descs.at(i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
auto slot_desc = tuple_slot_descs.at(i);
auto slot_desc = tuple_slot_descs[i];

// Get the Column Reader for the boolean column
auto iter = _map_column.find(slot_desc->col_name());
if (iter == _map_column.end()) {
if (iter != _map_column.end()) {
_parquet_column_ids.emplace_back(iter->second);
} else {
std::stringstream str_error;
str_error << "Invalid Column Name:" << slot_desc->col_name();
LOG(WARNING) << str_error.str();
return Status::InvalidArgument(str_error.str());
}
_parquet_column_ids.emplace_back(iter->second);
}
return Status::OK();
}
Expand Down Expand Up @@ -206,7 +208,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
const uint8_t *value = nullptr;
int column_index = 0;
try {
size_t slots = tuple_slot_descs.size();
size_t slots = _parquet_column_ids.size();
for (size_t i = 0; i < slots; ++i) {
auto slot_desc = tuple_slot_descs[i];
column_index = i;// column index in batch record
Expand Down Expand Up @@ -396,6 +398,35 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
}
break;
}
case arrow::Type::type::DATE32: {
auto ts_array = std::dynamic_pointer_cast<arrow::Date32Array>(_batch->column(column_index));
if (ts_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60);
struct tm local;
localtime_r(&timestamp, &local);
char* to = reinterpret_cast<char*>(&tmp_buf);
wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d", &local);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
case arrow::Type::type::DATE64: {
auto ts_array = std::dynamic_pointer_cast<arrow::Date64Array>(_batch->column(column_index));
if (ts_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
// convert milliseconds to seconds
time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) / 1000);
struct tm local;
localtime_r(&timestamp, &local);
char* to = reinterpret_cast<char*>(&tmp_buf);
wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d %H:%M:%S", &local);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
default: {
// other type not support.
std::stringstream str_error;
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/parquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ParquetFile : public arrow::io::RandomAccessFile {
// Reader of broker parquet file
class ParquetReaderWrap {
public:
ParquetReaderWrap(FileReader *file_reader);
ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file);
virtual ~ParquetReaderWrap();

// Read
Expand All @@ -85,6 +85,7 @@ class ParquetReaderWrap {
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t *buf, int32_t *wbtyes);

private:
const int32_t _num_of_columns_from_file;
parquet::ReaderProperties _properties;
std::shared_ptr<ParquetFile> _parquet;

Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/parquet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
_cur_file_eof = false;
}
RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof));
// range of current file
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
const TBrokerRangeDesc& range = _ranges[_next_range - 1];

fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
{
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
Expand Down Expand Up @@ -141,7 +144,7 @@ Status ParquetScanner::open_next_reader() {
file_reader->close();
continue;
}
_cur_file_reader = new ParquetReaderWrap(file_reader.release());
_cur_file_reader = new ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file);
Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs);
if (status.is_end_of_file()) {
continue;
Expand Down
77 changes: 68 additions & 9 deletions be/test/exec/broker_scan_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,33 @@ void BrokerScanNodeTest::init_desc_table() {
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "k3";
slot_desc.slotIdx = 2;
slot_desc.slotIdx = 3;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}
// k4(partitioned column)
{
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 0;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::INT);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 1;
slot_desc.byteOffset = 12;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "k4";
slot_desc.slotIdx = 4;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
Expand All @@ -164,7 +190,7 @@ void BrokerScanNodeTest::init_desc_table() {
// TTupleDescriptor dest
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 0;
t_tuple_desc.byteSize = 12;
t_tuple_desc.byteSize = 16;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
Expand Down Expand Up @@ -251,7 +277,34 @@ void BrokerScanNodeTest::init_desc_table() {
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "k3";
slot_desc.slotIdx = 2;
slot_desc.slotIdx = 3;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
}
// k4(partitioned column)
{
TSlotDescriptor slot_desc;

slot_desc.id = next_slot_id++;
slot_desc.parent = 1;
TTypeDesc type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::VARCHAR);
scalar_type.__set_len(65535);
node.__set_scalar_type(scalar_type);
type.types.push_back(node);
}
slot_desc.slotType = type;
slot_desc.columnPos = 1;
slot_desc.byteOffset = 48;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "k4";
slot_desc.slotIdx = 4;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
Expand All @@ -261,7 +314,7 @@ void BrokerScanNodeTest::init_desc_table() {
// TTupleDescriptor source
TTupleDescriptor t_tuple_desc;
t_tuple_desc.id = 1;
t_tuple_desc.byteSize = 48;
t_tuple_desc.byteSize = 60;
t_tuple_desc.numNullBytes = 0;
t_tuple_desc.tableId = 0;
t_tuple_desc.__isset.tableId = true;
Expand All @@ -276,7 +329,7 @@ void BrokerScanNodeTest::init_desc_table() {
void BrokerScanNodeTest::init() {
_params.column_separator = ',';
_params.line_delimiter = '\n';

TTypeDesc int_type;
{
TTypeNode node;
Expand All @@ -297,7 +350,7 @@ void BrokerScanNodeTest::init() {
varchar_type.types.push_back(node);
}

for (int i = 0; i < 3; ++i) {
for (int i = 0; i < 4; ++i) {
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = int_type;
Expand All @@ -319,15 +372,15 @@ void BrokerScanNodeTest::init() {
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = 4 + i;
slot_ref.slot_ref.slot_id = 5 + i;
slot_ref.slot_ref.tuple_id = 1;

TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);

_params.expr_of_dest_slot.emplace(i + 1, expr);
_params.src_slot_ids.push_back(4 + i);
_params.src_slot_ids.push_back(5 + i);
}
// _params.__isset.expr_of_dest_slot = true;
_params.__set_dest_tuple_id(0);
Expand Down Expand Up @@ -367,6 +420,9 @@ TEST_F(BrokerScanNodeTest, normal) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
range.splittable = true;
std::vector<std::string> columns_from_path{"1"};
range.__set_columns_from_path(columns_from_path);
range.__set_num_of_columns_from_file(3);
broker_scan_range.ranges.push_back(range);

scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
Expand All @@ -386,6 +442,9 @@ TEST_F(BrokerScanNodeTest, normal) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
range.splittable = true;
std::vector<std::string> columns_from_path{"2"};
range.__set_columns_from_path(columns_from_path);
range.__set_num_of_columns_from_file(3);
broker_scan_range.ranges.push_back(range);

scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
Expand All @@ -394,7 +453,7 @@ TEST_F(BrokerScanNodeTest, normal) {
}

scan_node.set_scan_ranges(scan_ranges);

status = scan_node.open(&_runtime_state);
ASSERT_TRUE(status.ok());

Expand Down
Loading