121 changes: 109 additions & 12 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -795,9 +795,10 @@ class ParquetIOTestBase : public ::testing::Test {

class TestReadDecimals : public ParquetIOTestBase {
public:
void CheckReadFromByteArrays(const std::shared_ptr<const LogicalType>& logical_type,
const std::vector<std::vector<uint8_t>>& values,
const Array& expected) {
void CheckReadFromByteArrays(
const std::shared_ptr<const LogicalType>& logical_type,
const std::vector<std::vector<uint8_t>>& values, const Array& expected,
ArrowReaderProperties properties = default_arrow_reader_properties()) {
std::vector<ByteArray> byte_arrays(values.size());
std::transform(values.begin(), values.end(), byte_arrays.begin(),
[](const std::vector<uint8_t>& bytes) {
@@ -822,7 +823,6 @@ class TestReadDecimals : public ParquetIOTestBase {
// The binary_type setting shouldn't affect the results
for (auto binary_type : {::arrow::Type::BINARY, ::arrow::Type::LARGE_BINARY,
::arrow::Type::BINARY_VIEW}) {
ArrowReaderProperties properties;
properties.set_binary_type(binary_type);
ASSERT_OK_AND_ASSIGN(auto reader, ReaderFromBuffer(buffer, properties));
ReadAndCheckSingleColumnFile(std::move(reader), expected);
@@ -833,6 +833,44 @@
// The Decimal roundtrip tests always go through the FixedLenByteArray path,
// check the ByteArray case manually.

TEST_F(TestReadDecimals, Decimal32ByteArray) {
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
// 123456
{1, 226, 64},
// 987654
{15, 18, 6},
// -123456
{255, 254, 29, 192},
};

ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_smallest_decimal_enabled(true);

auto expected =
ArrayFromJSON(::arrow::decimal32(6, 3), R"(["123.456", "987.654", "-123.456"])");
CheckReadFromByteArrays(LogicalType::Decimal(6, 3), big_endian_decimals, *expected,
properties);
}

TEST_F(TestReadDecimals, Decimal64ByteArray) {
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
// 123456
{1, 226, 64},
// 987654
{15, 18, 6},
// -123456
{255, 255, 255, 255, 255, 254, 29, 192},
};

ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_smallest_decimal_enabled(true);

auto expected =
ArrayFromJSON(::arrow::decimal64(16, 3), R"(["123.456", "987.654", "-123.456"])");
CheckReadFromByteArrays(LogicalType::Decimal(16, 3), big_endian_decimals, *expected,
properties);
}
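
Editor's note, not part of the diff: the byte vectors in the two tests above are the big-endian two's-complement encodings that Parquet's BYTE_ARRAY decimal representation uses. The hypothetical helper below (its name is invented for this sketch) shows how such vectors can be derived from plain integers.

#include <cstdint>
#include <vector>

// Hypothetical helper (not in the test file): derives the big-endian
// two's-complement byte vectors used as test data above.
std::vector<uint8_t> BigEndianTwosComplement(int64_t value, int num_bytes) {
  std::vector<uint8_t> bytes(num_bytes);
  for (int i = num_bytes - 1; i >= 0; --i) {
    bytes[i] = static_cast<uint8_t>(value & 0xFF);  // least significant byte goes last
    value >>= 8;                                    // arithmetic shift preserves the sign
  }
  return bytes;
}

// BigEndianTwosComplement(123456, 3)  == {1, 226, 64}         (0x01E240)
// BigEndianTwosComplement(-123456, 4) == {255, 254, 29, 192}  (0xFFFE1DC0)
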

TEST_F(TestReadDecimals, Decimal128ByteArray) {
const std::vector<std::vector<uint8_t>> big_endian_decimals = {
// 123456
@@ -3044,18 +3082,19 @@ TEST(ArrowReadWrite, NestedRequiredField) {
/*row_group_size=*/8);
}

TEST(ArrowReadWrite, Decimal256) {
using ::arrow::Decimal256;
TEST(ArrowReadWrite, Decimal) {
using ::arrow::field;

auto type = ::arrow::decimal256(8, 4);

const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
"-9999.9999", "9999.9999"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);

for (auto type : {::arrow::decimal32(8, 4), ::arrow::decimal64(8, 4),
::arrow::decimal128(8, 4), ::arrow::decimal256(8, 4)}) {
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);
}
}

TEST(ArrowReadWrite, DecimalStats) {
@@ -5468,6 +5507,64 @@ TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn)
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

template <typename TestType>
class TestIntegerAnnotateSmallestDecimalTypeParquetIO
: public TestIntegerAnnotateDecimalTypeParquetIO<TestType> {
public:
void ReadAndCheckSingleDecimalColumnFile(const Array& values) {
ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_smallest_decimal_enabled(true);

std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader, properties);
this->ReadSingleColumnFile(std::move(reader), &out);

if (values.type()->id() == out->type()->id()) {
AssertArraysEqual(values, *out);
} else {
auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(values.type());

ASSERT_OK_AND_ASSIGN(
const auto expected_values,
::arrow::compute::Cast(values, ::arrow::decimal256(decimal_type->precision(),
decimal_type->scale())));
ASSERT_OK_AND_ASSIGN(
const auto out_values,
::arrow::compute::Cast(*out, ::arrow::decimal256(decimal_type->precision(),
decimal_type->scale())));

ASSERT_EQ(expected_values->length(), out_values->length());
ASSERT_EQ(expected_values->null_count(), out_values->null_count());
ASSERT_TRUE(expected_values->Equals(*out_values));
}
}
};

using SmallestDecimalTestTypes = ::testing::Types<
Decimal32WithPrecisionAndScale<9>, Decimal64WithPrecisionAndScale<9>,
Decimal64WithPrecisionAndScale<18>, Decimal128WithPrecisionAndScale<9>,
Decimal128WithPrecisionAndScale<18>, Decimal256WithPrecisionAndScale<9>,
Decimal256WithPrecisionAndScale<18>>;

TYPED_TEST_SUITE(TestIntegerAnnotateSmallestDecimalTypeParquetIO,
SmallestDecimalTestTypes);

TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO,
SingleNonNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

TYPED_TEST(TestIntegerAnnotateSmallestDecimalTypeParquetIO, SingleNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

template <typename TestType>
class TestBufferedParquetIO : public TestParquetIO<TestType> {
public:
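Editor's note, not part of the diff: the tests above opt into the new behavior through ArrowReaderProperties::set_smallest_decimal_enabled, which this change introduces. Outside the test harness, a reader would typically pass the same property through FileReaderBuilder. The sketch below is a minimal illustration under that assumption; the function name and file path are hypothetical.

#include <memory>
#include <string>
#include "arrow/io/file.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"
#include "parquet/properties.h"

// Hypothetical usage sketch: read a Parquet file so that decimals stored as
// INT32/INT64 come back as Arrow decimal32/decimal64 instead of decimal128.
arrow::Status ReadWithSmallestDecimals(const std::string& path,
                                       std::shared_ptr<arrow::Table>* out) {
  ARROW_ASSIGN_OR_RAISE(auto infile, arrow::io::ReadableFile::Open(path));

  parquet::ArrowReaderProperties arrow_props;
  arrow_props.set_smallest_decimal_enabled(true);  // opt into decimal32/decimal64

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(infile));
  builder.properties(arrow_props);

  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.Build(&reader));
  return reader->ReadTable(out);
}
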
146 changes: 67 additions & 79 deletions cpp/src/parquet/arrow/reader_internal.cc
@@ -69,6 +69,12 @@ using arrow::Decimal128Type;
using arrow::Decimal256;
using arrow::Decimal256Array;
using arrow::Decimal256Type;
using arrow::Decimal32;
using arrow::Decimal32Array;
using arrow::Decimal32Type;
using arrow::Decimal64;
using arrow::Decimal64Array;
using arrow::Decimal64Type;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
@@ -153,7 +159,8 @@ static Status FromInt32Statistics(const Int32Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
ARROW_ASSIGN_OR_RAISE(auto type, FromInt32(logical_type));
ARROW_ASSIGN_OR_RAISE(auto type,
FromInt32(logical_type, default_arrow_reader_properties()));

switch (logical_type.type()) {
case LogicalType::Type::INT:
Expand All @@ -175,7 +182,8 @@ static Status FromInt64Statistics(const Int64Statistics& statistics,
const LogicalType& logical_type,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max) {
ARROW_ASSIGN_OR_RAISE(auto type, FromInt64(logical_type));
ARROW_ASSIGN_OR_RAISE(auto type,
FromInt64(logical_type, default_arrow_reader_properties()));

switch (logical_type.type()) {
case LogicalType::Type::INT:
@@ -600,17 +608,10 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
return ::arrow::Status::OK();
}

template <typename DecimalArrayType>
struct DecimalTypeTrait;

template <>
struct DecimalTypeTrait<::arrow::Decimal128Array> {
using value = ::arrow::Decimal128;
};

template <>
struct DecimalTypeTrait<::arrow::Decimal256Array> {
using value = ::arrow::Decimal256;
template <typename DecimalArrayType,
typename = ::arrow::enable_if_decimal<typename DecimalArrayType::TypeClass>>
struct DecimalTypeTrait {
using value = typename ::arrow::TypeTraits<typename DecimalArrayType::TypeClass>::CType;
};

template <typename DecimalArrayType, typename ParquetType>
@@ -725,16 +726,17 @@ struct DecimalConverter<DecimalArrayType, ByteArrayType> {
/// The parquet spec allows systems to write decimals in int32, int64 if the values are
/// small enough to fit in 4 bytes or 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
typename DecimalArrayType, typename ParquetIntegerType,
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
std::is_same<ParquetIntegerType, Int64Type>::value>>
template <typename DecimalArrayType, typename ParquetIntegerType,
typename = ::arrow::enable_if_t<::arrow::internal::IsOneOf<
ParquetIntegerType, Int32Type, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// specifically distinguish between decimal byte widths.
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 ||
field->type()->id() == ::arrow::Type::DECIMAL256);
using ArrayTypeClass = typename DecimalArrayType::TypeClass;
using DecimalValue = typename DecimalTypeTrait<DecimalArrayType>::value;

// Decimal32, Decimal64, Decimal128 and Decimal256 are only Arrow constructs. Parquet
// does not specifically distinguish between decimal byte widths.
DCHECK(field->type()->id() == ArrayTypeClass::type_id);

const int64_t length = reader->values_written();

@@ -745,25 +747,17 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,

const auto values = reinterpret_cast<const ElementType*>(reader->values());

const auto& decimal_type = checked_cast<const ::arrow::DecimalType&>(*field->type());
const auto& decimal_type = checked_cast<const ArrayTypeClass&>(*field->type());
const int64_t type_length = decimal_type.byte_width();

ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
uint8_t* out_ptr = data->mutable_data();

using ::arrow::bit_util::FromLittleEndian;

for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
// sign/zero extend int32_t values, otherwise a no-op
const auto value = static_cast<int64_t>(values[i]);

if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
} else {
::arrow::Decimal256 decimal(value);
decimal.ToBytes(out_ptr);
}
DecimalValue decimal(value);
decimal.ToBytes(out_ptr);
}

if (reader->nullable_values() && field->nullable()) {
@@ -802,6 +796,33 @@ Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
return Status::OK();
}

template <typename DecimalArrayType, typename... Args>
Status TransferDecimalTo(Type::type physical_type, Args&&... args) {
switch (physical_type) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<DecimalArrayType, Int32Type>;
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
} break;
case ::parquet::Type::INT64: {
auto fn = DecimalIntegerTransfer<DecimalArrayType, Int64Type>;
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = TransferDecimal<DecimalArrayType, ByteArrayType>;
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = TransferDecimal<DecimalArrayType, FLBAType>;
RETURN_NOT_OK(fn(std::forward<Args>(args)...));
} break;
default:
return Status::Invalid(
"Physical type for decimal must be int32, int64, byte array, or fixed length "
"binary");
}
return Status::OK();
}

Status TransferHalfFloat(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
static const auto binary_type = ::arrow::fixed_size_binary(2);
@@ -902,55 +923,22 @@ Status TransferColumnData(RecordReader* reader,
}
RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL32: {
RETURN_NOT_OK(TransferDecimalTo<Decimal32Array>(descr->physical_type(), reader,
pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL64: {
RETURN_NOT_OK(TransferDecimalTo<Decimal64Array>(descr->physical_type(), reader,
pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal128Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal128 must be int32, int64, byte array, or fixed "
"length binary");
}
RETURN_NOT_OK(TransferDecimalTo<Decimal128Array>(descr->physical_type(), reader,
pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL256: {
RETURN_NOT_OK(TransferDecimalTo<Decimal256Array>(descr->physical_type(), reader,
pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL256:
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal256Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
"length binary");
}
break;

case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
checked_cast<::arrow::TimestampType&>(*value_field->type());
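
Editor's note, not part of the diff: the core of the templated DecimalIntegerTransfer shown earlier in reader_internal.cc is a sign-extend-then-re-encode loop. The sketch below restates it without templates, specialized to Decimal128 and with buffer allocation and null handling omitted; the function name is invented and the caller is assumed to have allocated length * 16 output bytes.

#include <cstdint>
#include "arrow/util/decimal.h"

// Sketch only: mirrors the inner loop of DecimalIntegerTransfer for the
// INT32 -> decimal128 case. `out` must hold length * 16 bytes.
void Int32ToDecimal128Bytes(const int32_t* values, int64_t length, uint8_t* out) {
  constexpr int kByteWidth = 16;  // a Decimal128 value occupies a fixed 16 bytes
  for (int64_t i = 0; i < length; ++i, out += kByteWidth) {
    // Sign-extend the stored int32, then write it out in Arrow's decimal layout.
    arrow::Decimal128 decimal(static_cast<int64_t>(values[i]));
    decimal.ToBytes(out);
  }
}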