Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions be/src/exec/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
return Status::OK();
}

Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t *buf, int32_t *wbtyes) {
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t *buf, int32_t *wbytes) {
const auto type = std::dynamic_pointer_cast<arrow::TimestampType>(ts_array->type());
// Doris only supports seconds
time_t timestamp = 0;
Expand All @@ -196,13 +196,13 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::Timestam
}
tm* local;
local = localtime(&timestamp);
*wbtyes = (uint32_t)strftime((char*)buf, 64, "%Y-%m-%d %H:%M:%S", local);
*wbytes = (uint32_t)strftime((char*)buf, 64, "%Y-%m-%d %H:%M:%S", local);
return Status::OK();
}

Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, MemPool* mem_pool, bool* eof) {
uint8_t tmp_buf[128] = {0};
int32_t wbtyes = 0;
int32_t wbytes = 0;
const uint8_t *value = nullptr;
int column_index = 0;
try {
Expand All @@ -216,8 +216,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
if (str_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
value = str_array->GetValue(_current_line_of_group, &wbtyes);
fill_slot(tuple, slot_desc, mem_pool, value, wbtyes);
value = str_array->GetValue(_current_line_of_group, &wbytes);
fill_slot(tuple, slot_desc, mem_pool, value, wbytes);
}
break;
}
Expand All @@ -227,8 +227,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
int32_t value = int32_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -238,8 +238,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
int64_t value = int64_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%ld", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%ld", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -249,8 +249,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
uint32_t value = uint32_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%u", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%u", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -260,8 +260,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
uint64_t value = uint64_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%lu", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%lu", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -270,8 +270,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
if (str_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
value = str_array->GetValue(_current_line_of_group, &wbtyes);
fill_slot(tuple, slot_desc, mem_pool, value, wbtyes);
value = str_array->GetValue(_current_line_of_group, &wbytes);
fill_slot(tuple, slot_desc, mem_pool, value, wbytes);
}
break;
}
Expand Down Expand Up @@ -305,8 +305,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
uint8_t value = uint8_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -316,8 +316,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
int8_t value = int8_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -327,8 +327,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
uint16_t value = uint16_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -338,8 +338,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
int16_t value = int16_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%d", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -349,8 +349,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
float value = half_float_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -360,8 +360,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
float value = float_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -371,8 +371,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
float value = double_array->Value(_current_line_of_group);
wbtyes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
wbytes = sprintf((char*)tmp_buf, "%f", value);
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
Expand All @@ -381,8 +381,18 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
if (ts_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
RETURN_IF_ERROR(handle_timestamp(ts_array, tmp_buf, &wbtyes));// convert timestamp to string time
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes);
RETURN_IF_ERROR(handle_timestamp(ts_array, tmp_buf, &wbytes));// convert timestamp to string time
fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes);
}
break;
}
case arrow::Type::type::DECIMAL: {
auto decimal_array = std::dynamic_pointer_cast<arrow::DecimalArray>(_batch->column(column_index));
if (decimal_array->IsNull(_current_line_of_group)) {
RETURN_IF_ERROR(set_field_null(tuple, slot_desc));
} else {
std::string value = decimal_array->FormatValue(_current_line_of_group);
fill_slot(tuple, slot_desc, mem_pool, (const uint8_t*)value.c_str(), value.length());
}
break;
}
Expand Down