diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 950cea7515bfed..a60295b0fab111 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -67,16 +67,25 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector if (_current_line_of_group == 0) {// the first read RETURN_IF_ERROR(column_indices(tuple_slot_descs)); // read batch - _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch); - arrow::Status status = _rb_batch->ReadNext(&_batch); + arrow::Status status = _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch); + if (!status.ok()) { + LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString(); + return Status::InternalError(status.ToString()); + } + status = _rb_batch->ReadNext(&_batch); if (!status.ok()) { LOG(WARNING) << "The first read record. " << status.ToString(); return Status::InternalError(status.ToString()); } //save column type std::shared_ptr field_schema = _batch->schema(); - for (auto index : _parquet_column_ids) { - _parquet_column_type.emplace_back(field_schema->field(index)->type()->id()); + for (int i = 0; i < _parquet_column_ids.size(); i++) { + std::shared_ptr field = field_schema->field(i); + if (!field) { + LOG(WARNING) << "Get filed schema failed. Column order:" << i; + return Status::InternalError(status.ToString()); + } + _parquet_column_type.emplace_back(field->type()->id()); } } return Status::OK(); @@ -126,7 +135,10 @@ Status ParquetReaderWrap::column_indices(const std::vector& tup inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc) { if (!slot_desc->is_nullable()) { - Status::RuntimeError("Null is not allowed, but Parquet field is NULL."); + std::stringstream str_error; + str_error << "The field name("<< slot_desc->col_name() <<") is not allowed null, but Parquet field is NULL."; + LOG(WARNING) << str_error.str(); + return Status::RuntimeError(str_error.str()); } tuple->set_null(slot_desc->null_indicator_offset()); return Status::OK(); @@ -143,8 +155,11 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& _current_line_of_group = 0; _rows_of_group = _file_metadata->RowGroup(_current_group)->num_rows(); //get rows of the current row group // read batch - _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch); - arrow::Status status = _rb_batch->ReadNext(&_batch); + arrow::Status status = _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch); + if (!status.ok()) { + return Status::InternalError("Get RecordBatchReader Failed."); + } + status = _rb_batch->ReadNext(&_batch); if (!status.ok()) { return Status::InternalError("Read Batch Error With Libarrow."); } @@ -191,7 +206,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& size_t slots = tuple_slot_descs.size(); for (size_t i = 0; i < slots; ++i) { auto slot_desc = tuple_slot_descs[i]; - column_index = _parquet_column_ids[i];// column index with Parquet Field + column_index = i;// column index in batch record switch (_parquet_column_type[i]) { case arrow::Type::type::STRING: { auto str_array = std::dynamic_pointer_cast(_batch->column(column_index));