Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
cba31c7
Support decimal32/64 in schema conversion
curioustien Jan 19, 2025
a9398a2
Support decimal32/64 in column writer
curioustien Jan 19, 2025
e1dc023
Restrict column writer with correct decimal types
curioustien Jan 19, 2025
6032b02
Support decimal32/64 in reader & vector kernels & tests
curioustien Jan 19, 2025
290de24
Pyarrow parquet to pandas
curioustien Jan 26, 2025
e5b996e
Address comments
curioustien Feb 15, 2025
44f1adc
Add more tests in arrow_schema_test
curioustien Feb 15, 2025
c017323
Add more tests in arrow_reader_writer_test
curioustien Feb 16, 2025
63d307b
Add more typed tests for small decimals
curioustien Feb 16, 2025
77dd7d3
Document new flag
curioustien Feb 16, 2025
d81cf13
Add decimal32/64 list type support arrow to pandas
curioustien Feb 16, 2025
424472f
Support smallest_decimal_enabled flag in pyarrow
curioustien Feb 16, 2025
d1687a7
Revert writer schema manifest arg passing change
curioustien Mar 9, 2025
1f0fb7b
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Mar 22, 2025
52711d5
Fix lint
curioustien Mar 22, 2025
f64d6d9
Remove extra doc
curioustien Mar 22, 2025
3fb307e
Revert FileReader changes
curioustien Mar 29, 2025
f279349
Delay scratch buffer pointer cast
curioustien Mar 29, 2025
8a78c72
Use ArrowReaderProperties
curioustien Mar 29, 2025
29e98ff
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Mar 29, 2025
d2e1ffa
Revert "Delay scratch buffer pointer cast"
curioustien Apr 4, 2025
a8304f3
Remove mistake include
curioustien Apr 4, 2025
de295e3
Merge remote-tracking branch 'upstream/main' into parquet-decimal-test
curioustien Apr 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ KernelInit GetHashInit(Type::type type_id) {
case Type::DATE32:
case Type::TIME32:
case Type::INTERVAL_MONTHS:
case Type::DECIMAL32:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the changes to the compute kernels required to support Parquet? I can't see why but I might be missing something. Otherwise, we should move adding support for decimal32 and decimal64 to those compute kernels on a different PR and leave this one only with the required parquet changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see now, the description says this is required for some tests:
Allow decimal32/64 in Arrow compute vector hash which is needed for some of the existing Parquet tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm down to split this change to another PR which can cover this support with more tests on the arrow compute side. But yes, there are a few tests in Parquet that hit arrow vector kernel code path

return HashInit<RegularHashKernel<UInt32Type, Action>>;
case Type::INT64:
case Type::UINT64:
Expand All @@ -565,6 +566,7 @@ KernelInit GetHashInit(Type::type type_id) {
case Type::TIMESTAMP:
case Type::DURATION:
case Type::INTERVAL_DAY_TIME:
case Type::DECIMAL64:
return HashInit<RegularHashKernel<UInt64Type, Action>>;
case Type::BINARY:
case Type::STRING:
Expand Down Expand Up @@ -708,7 +710,7 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty)
DCHECK_OK(func->AddKernel(base));
}

for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) {
for (auto t : {Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) {
base.init = GetHashInit<Action>(t);
base.signature = KernelSignature::Make({t}, out_ty);
DCHECK_OK(func->AddKernel(base));
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_selection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ std::shared_ptr<VectorFunction> MakeIndicesNonZeroFunction(std::string name,
AddKernels(NumericTypes());
AddKernels({boolean()});

for (const auto& ty : {Type::DECIMAL128, Type::DECIMAL256}) {
for (const auto& ty :
{Type::DECIMAL32, Type::DECIMAL64, Type::DECIMAL128, Type::DECIMAL256}) {
kernel.signature = KernelSignature::Make({ty}, uint64());
DCHECK_OK(func->AddKernel(kernel));
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
parquet_scan_options.arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options.arrow_reader_properties->io_context());
arrow_properties.set_smallest_decimal_enabled(
parquet_scan_options.arrow_reader_properties->smallest_decimal_enabled());
arrow_properties.set_use_threads(options.use_threads);
return arrow_properties;
}
Expand Down
271 changes: 215 additions & 56 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Large diffs are not rendered by default.

119 changes: 119 additions & 0 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,43 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetAnnotatedFieldsSmallestDecimal) {
  // One row per decimal column: the Parquet primitive node to build and the
  // Arrow type it is expected to convert to when smallest_decimal_enabled is
  // set on the reader properties.
  struct DecimalCase {
    std::string name;
    std::shared_ptr<const LogicalType> logical_type;
    parquet::Type::type physical_type;
    int physical_length;
    std::shared_ptr<::arrow::DataType> expected_type;
  };

  const std::vector<DecimalCase> decimal_cases = {
      {"decimal(8, 2)", LogicalType::Decimal(8, 2), ParquetType::INT32, -1,
       ::arrow::decimal32(8, 2)},
      {"decimal(16, 4)", LogicalType::Decimal(16, 4), ParquetType::INT64, -1,
       ::arrow::decimal64(16, 4)},
      {"decimal(32, 8)", LogicalType::Decimal(32, 8), ParquetType::FIXED_LEN_BYTE_ARRAY,
       16, ::arrow::decimal128(32, 8)},
      {"decimal(73, 38)", LogicalType::Decimal(73, 38), ParquetType::FIXED_LEN_BYTE_ARRAY,
       31, ::arrow::decimal256(73, 38)},
  };

  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;
  parquet_fields.reserve(decimal_cases.size());
  arrow_fields.reserve(decimal_cases.size());
  for (const auto& dc : decimal_cases) {
    parquet_fields.push_back(PrimitiveNode::Make(dc.name, Repetition::OPTIONAL,
                                                 dc.logical_type, dc.physical_type,
                                                 dc.physical_length));
    arrow_fields.push_back(::arrow::field(dc.name, dc.expected_type));
  }

  ArrowReaderProperties reader_props;
  reader_props.set_smallest_decimal_enabled(true);
  ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props));
  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(::arrow::schema(arrow_fields)));
}

TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -354,6 +391,42 @@ TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetSmallestDecimals) {
  // With smallest_decimal_enabled, each Parquet decimal column should map to
  // the narrowest Arrow decimal type whose precision range covers it:
  //   precision <= 9 -> decimal32, <= 18 -> decimal64,
  //   <= 38 -> decimal128, otherwise decimal256.
  // Use the ::arrow::decimalNN() factories for consistency with the other
  // smallest-decimal tests in this file.
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;

  parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
                                               ParquetType::FIXED_LEN_BYTE_ARRAY,
                                               ConvertedType::DECIMAL, 4, 8, 4));
  arrow_fields.push_back(::arrow::field("flba-decimal", ::arrow::decimal32(8, 4)));

  parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
                                               ParquetType::BYTE_ARRAY,
                                               ConvertedType::DECIMAL, -1, 18, 4));
  arrow_fields.push_back(::arrow::field("binary-decimal", ::arrow::decimal64(18, 4)));

  parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
                                               ParquetType::INT32, ConvertedType::DECIMAL,
                                               -1, 38, 4));
  arrow_fields.push_back(::arrow::field("int32-decimal", ::arrow::decimal128(38, 4)));

  parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
                                               ParquetType::INT64, ConvertedType::DECIMAL,
                                               -1, 48, 4));
  arrow_fields.push_back(::arrow::field("int64-decimal", ::arrow::decimal256(48, 4)));

  auto arrow_schema = ::arrow::schema(arrow_fields);
  auto reader_props = ArrowReaderProperties();
  reader_props.set_smallest_decimal_enabled(true);
  ASSERT_OK(ConvertSchema(parquet_fields, nullptr, reader_props));

  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

TEST_F(TestConvertParquetSchema, ParquetMaps) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down Expand Up @@ -1157,6 +1230,52 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
// ASSERT_NO_FATAL_FAILURE();
}

TEST_F(TestConvertArrowSchema, ArrowFieldsStoreSchema) {
  // Verifies that Arrow decimal fields of every width (32/64/128/256) convert
  // to the expected Parquet primitive nodes when store_schema is enabled on
  // the writer properties. Decimals of equal precision but different Arrow
  // widths must produce identical Parquet nodes.
  struct FieldConstructionArguments {
    std::string name;
    std::shared_ptr<::arrow::DataType> datatype;
    std::shared_ptr<const LogicalType> logical_type;
    parquet::Type::type physical_type;
    int physical_length;
  };

  std::vector<FieldConstructionArguments> cases = {
      {"decimal(1, 0)", ::arrow::decimal128(1, 0), LogicalType::Decimal(1, 0),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
      {"decimal(8, 2)", ::arrow::decimal128(8, 2), LogicalType::Decimal(8, 2),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
      {"decimal(16, 4)", ::arrow::decimal128(16, 4), LogicalType::Decimal(16, 4),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
      {"decimal(32, 8)", ::arrow::decimal128(32, 8), LogicalType::Decimal(32, 8),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 14},
      {"decimal(1, 0)", ::arrow::decimal32(1, 0), LogicalType::Decimal(1, 0),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
      {"decimal(8, 2)", ::arrow::decimal32(8, 2), LogicalType::Decimal(8, 2),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
      {"decimal(16, 4)", ::arrow::decimal64(16, 4), LogicalType::Decimal(16, 4),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
      {"decimal(73, 38)", ::arrow::decimal256(73, 38), LogicalType::Decimal(73, 38),
       ParquetType::FIXED_LEN_BYTE_ARRAY, 31}};

  std::vector<std::shared_ptr<Field>> arrow_fields;
  std::vector<NodePtr> parquet_fields;

  for (const FieldConstructionArguments& c : cases) {
    arrow_fields.push_back(::arrow::field(c.name, c.datatype, false));
    parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED,
                                                 c.logical_type, c.physical_type,
                                                 c.physical_length));
  }

  // ArrowWriterProperties::store_schema() is a const getter; the flag must be
  // set through the Builder, otherwise this test would silently run with the
  // default (disabled) setting.
  auto writer_props = ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
  ASSERT_OK(ConvertSchema(arrow_fields, writer_props));
  CheckFlatSchema(parquet_fields);
}

TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
struct FieldConstructionArguments {
std::string name;
Expand Down
90 changes: 80 additions & 10 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <vector>

#include "arrow/array.h"
#include "arrow/array/array_decimal.h"
#include "arrow/compute/api.h"
#include "arrow/datum.h"
#include "arrow/io/memory.h"
Expand All @@ -37,6 +38,7 @@
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/base64.h"
#include "arrow/util/bit_util.h"
Expand Down Expand Up @@ -69,6 +71,12 @@ using arrow::Decimal128Type;
using arrow::Decimal256;
using arrow::Decimal256Array;
using arrow::Decimal256Type;
using arrow::Decimal32;
using arrow::Decimal32Array;
using arrow::Decimal32Type;
using arrow::Decimal64;
using arrow::Decimal64Array;
using arrow::Decimal64Type;
using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
Expand Down Expand Up @@ -590,7 +598,8 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
}

// ----------------------------------------------------------------------
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256
// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY
// -> Decimal32 || Decimal64 || Decimal128 || Decimal256

template <typename DecimalType>
Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
Expand All @@ -603,6 +612,16 @@ Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
template <typename DecimalArrayType>
struct DecimalTypeTrait;

// Maps Decimal32Array to its scalar value type (Decimal32) so the generic
// decimal conversion helpers can look up the right value class from the
// array type alone.
template <>
struct DecimalTypeTrait<::arrow::Decimal32Array> {
  using value = ::arrow::Decimal32;
};

// Maps Decimal64Array to its scalar value type (Decimal64); see the
// Decimal32Array specialization for the rationale.
template <>
struct DecimalTypeTrait<::arrow::Decimal64Array> {
  using value = ::arrow::Decimal64;
};

template <>
struct DecimalTypeTrait<::arrow::Decimal128Array> {
using value = ::arrow::Decimal128;
Expand Down Expand Up @@ -721,7 +740,7 @@ struct DecimalConverter<DecimalArrayType, ByteArrayType> {
}
};

/// \brief Convert an Int32 or Int64 array into a Decimal128Array
/// \brief Convert an Int32 or Int64 array into a Decimal32/64/128/256Array
/// The parquet spec allows systems to write decimals in int32, int64 if the values are
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
Expand All @@ -731,9 +750,11 @@ template <
std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// Decimal32/64/128/256 are only Arrow constructs. Parquet does not
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment seems not correct?

// specifically distinguish between decimal byte widths.
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 ||
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL32 ||
field->type()->id() == ::arrow::Type::DECIMAL64 ||
field->type()->id() == ::arrow::Type::DECIMAL128 ||
field->type()->id() == ::arrow::Type::DECIMAL256);

const int64_t length = reader->values_written();
Expand All @@ -757,7 +778,13 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
// sign/zero extend int32_t values, otherwise a no-op
const auto value = static_cast<int64_t>(values[i]);

if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
if constexpr (std::is_same_v<DecimalArrayType, Decimal32Array>) {
::arrow::Decimal32 decimal(value);
decimal.ToBytes(out_ptr);
} else if constexpr (std::is_same_v<DecimalArrayType, Decimal64Array>) {
::arrow::Decimal64 decimal(value);
decimal.ToBytes(out_ptr);
} else if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
} else {
Expand Down Expand Up @@ -900,14 +927,58 @@ Status TransferColumnData(RecordReader* reader,
}
RETURN_NOT_OK(TransferHalfFloat(reader, pool, value_field, &result));
} break;
case ::arrow::Type::DECIMAL32: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal32Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal32Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal32Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal32 must be int32, byte array, or fixed length "
"binary");
}
} break;
case ::arrow::Type::DECIMAL64: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal64Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = DecimalIntegerTransfer<Decimal64Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal64Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal64Array, FLBAType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
default:
return Status::Invalid(
"Physical type for decimal64 must be int32, int64, byte array, or fixed "
"length binary");
}
} break;
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal128Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
auto fn = DecimalIntegerTransfer<Decimal128Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
Expand All @@ -924,14 +995,14 @@ Status TransferColumnData(RecordReader* reader,
"length binary");
}
} break;
case ::arrow::Type::DECIMAL256:
case ::arrow::Type::DECIMAL256: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal256Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
auto fn = DecimalIntegerTransfer<Decimal256Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
Expand All @@ -947,8 +1018,7 @@ Status TransferColumnData(RecordReader* reader,
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
"length binary");
}
break;

} break;
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
checked_cast<::arrow::TimestampType&>(*value_field->type());
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
length = fixed_size_binary_type.byte_width();
} break;
case ArrowTypeId::DECIMAL32:
case ArrowTypeId::DECIMAL64:
case ArrowTypeId::DECIMAL128:
case ArrowTypeId::DECIMAL256: {
const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
if (properties.store_decimal_as_integer() && 1 <= precision && precision <= 18) {
type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64;
type = precision <= 9 ? ParquetType::INT32 : ParquetType::INT64;
} else {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
length = DecimalType::DecimalSize(precision);
Expand Down
Loading
Loading