diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 502fa0ef51c53f..b91d14713364fd 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -170,7 +170,7 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& return Status::OK(); } -Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes) { +Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbytes) { const auto type = std::dynamic_pointer_cast(ts_array->type()); // Doris only supports seconds time_t timestamp = 0; @@ -196,13 +196,13 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& tuple_slot_descs, MemPool* mem_pool, bool* eof) { uint8_t tmp_buf[128] = {0}; - int32_t wbtyes = 0; + int32_t wbytes = 0; const uint8_t *value = nullptr; int column_index = 0; try { @@ -216,8 +216,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& if (str_array->IsNull(_current_line_of_group)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { - value = str_array->GetValue(_current_line_of_group, &wbtyes); - fill_slot(tuple, slot_desc, mem_pool, value, wbtyes); + value = str_array->GetValue(_current_line_of_group, &wbytes); + fill_slot(tuple, slot_desc, mem_pool, value, wbytes); } break; } @@ -227,8 +227,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { int32_t value = int32_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%d", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%d", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -238,8 +238,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { int64_t value = int64_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%ld", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%ld", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -249,8 +249,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { uint32_t value = uint32_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%u", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%u", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -260,8 +260,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { uint64_t value = uint64_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%lu", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%lu", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -270,8 +270,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& if (str_array->IsNull(_current_line_of_group)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { - value = str_array->GetValue(_current_line_of_group, &wbtyes); - fill_slot(tuple, slot_desc, mem_pool, value, wbtyes); + value = str_array->GetValue(_current_line_of_group, &wbytes); + fill_slot(tuple, slot_desc, mem_pool, value, wbytes); } break; } @@ -305,8 +305,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { uint8_t value = uint8_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%d", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%d", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -316,8 +316,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { int8_t value = int8_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%d", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%d", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -327,8 +327,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { uint16_t value = uint16_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%d", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%d", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -338,8 +338,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { int16_t value = int16_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%d", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%d", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -349,8 +349,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { float value = half_float_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%f", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%f", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -360,8 +360,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { float value = float_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%f", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%f", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -371,8 +371,8 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { float value = double_array->Value(_current_line_of_group); - wbtyes = sprintf((char*)tmp_buf, "%f", value); - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + wbytes = sprintf((char*)tmp_buf, "%f", value); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; } @@ -381,8 +381,18 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& if (ts_array->IsNull(_current_line_of_group)) { RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { - RETURN_IF_ERROR(handle_timestamp(ts_array, tmp_buf, &wbtyes));// convert timestamp to string time - fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbtyes); + RETURN_IF_ERROR(handle_timestamp(ts_array, tmp_buf, &wbytes));// convert timestamp to string time + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); + } + break; + } + case arrow::Type::type::DECIMAL: { + auto decimal_array = std::dynamic_pointer_cast(_batch->column(column_index)); + if (decimal_array->IsNull(_current_line_of_group)) { + RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); + } else { + std::string value = decimal_array->FormatValue(_current_line_of_group); + fill_slot(tuple, slot_desc, mem_pool, (const uint8_t*)value.c_str(), value.length()); } break; }