Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions be/src/exec/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,25 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
if (_current_line_of_group == 0) {// the first read
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
// read batch
_reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
arrow::Status status = _rb_batch->ReadNext(&_batch);
arrow::Status status = _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
if (!status.ok()) {
LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
return Status::InternalError(status.ToString());
}
status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
LOG(WARNING) << "The first read record. " << status.ToString();
return Status::InternalError(status.ToString());
}
//save column type
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
for (auto index : _parquet_column_ids) {
_parquet_column_type.emplace_back(field_schema->field(index)->type()->id());
for (int i = 0; i < _parquet_column_ids.size(); i++) {
std::shared_ptr<arrow::Field> field = field_schema->field(i);
if (!field) {
LOG(WARNING) << "Get filed schema failed. Column order:" << i;
return Status::InternalError(status.ToString());
}
_parquet_column_type.emplace_back(field->type()->id());
}
}
return Status::OK();
Expand Down Expand Up @@ -126,7 +135,10 @@ Status ParquetReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tup

inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc) {
if (!slot_desc->is_nullable()) {
Status::RuntimeError("Null is not allowed, but Parquet field is NULL.");
std::stringstream str_error;
str_error << "The field name("<< slot_desc->col_name() <<") is not allowed null, but Parquet field is NULL.";
LOG(WARNING) << str_error.str();
return Status::RuntimeError(str_error.str());
}
tuple->set_null(slot_desc->null_indicator_offset());
return Status::OK();
Expand All @@ -143,8 +155,11 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
_current_line_of_group = 0;
_rows_of_group = _file_metadata->RowGroup(_current_group)->num_rows(); //get rows of the current row group
// read batch
_reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
arrow::Status status = _rb_batch->ReadNext(&_batch);
arrow::Status status = _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
if (!status.ok()) {
return Status::InternalError("Get RecordBatchReader Failed.");
}
status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");
}
Expand Down Expand Up @@ -191,7 +206,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
size_t slots = tuple_slot_descs.size();
for (size_t i = 0; i < slots; ++i) {
auto slot_desc = tuple_slot_descs[i];
column_index = _parquet_column_ids[i];// column index with Parquet Field
column_index = i;// column index in batch record
switch (_parquet_column_type[i]) {
case arrow::Type::type::STRING: {
auto str_array = std::dynamic_pointer_cast<arrow::StringArray>(_batch->column(column_index));
Expand Down