From 4de42a5a984c32c5ade9742e29c8094ae335b836 Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Mon, 7 Jun 2021 08:43:07 +0100 Subject: [PATCH 1/7] Fixes JIRA issue ARROW-12096. Only the CPP code changes to allow users to define INT96 -> arrow timestamp unit. Tests added. Please enter the commit message for your changes. Lines starting --- .../parquet/arrow/arrow_reader_writer_test.cc | 83 +++++++++++++++++++ cpp/src/parquet/arrow/reader_internal.cc | 41 +++++---- cpp/src/parquet/arrow/schema.cc | 2 +- cpp/src/parquet/arrow/schema_internal.cc | 17 +++- cpp/src/parquet/arrow/schema_internal.h | 12 +++ cpp/src/parquet/properties.h | 9 +- cpp/src/parquet/types.h | 43 ++++++++++ 7 files changed, 185 insertions(+), 22 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 677458ce37e..550d95c1bfa 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1671,6 +1671,89 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); } +TEST(TestArrowReadWrite, DowncastDeprecatedInt96) { + using ::arrow::ArrayFromVector; + using ::arrow::field; + using ::arrow::schema; + + std::vector is_valid = {true, true, true, true}; + + auto t_s = ::arrow::timestamp(TimeUnit::SECOND); + auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); + auto t_us = ::arrow::timestamp(TimeUnit::MICRO); + auto t_ns = ::arrow::timestamp(TimeUnit::NANO); + + std::vector s_values = {1489269, 1489269, 1489269, 1489269}; + std::vector ms_values = {1489269000, 1489269000, + 1489269000, 1489269001}; + std::vector us_values = {1489269000000, 1489269000000, + 1489269000001, 1489269001000}; + std::vector ns_values = {1489269000000000LL, 1489269000000001LL, + 1489269000001000LL, 1489269001000000LL}; + + std::shared_ptr a_s, a_ms, a_us, a_ns; + ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); + + // Create single input table of NS to be written to parquet with INT96 + auto input_schema = schema({field("f", t_ns)}); + auto input = Table::Make(input_schema, {a_ns}); + + // Create an expected schema for each resulting table (one for each "down sampled" ts) + auto ex_schema_s = schema({field("f", t_s)}); + auto ex_schema_ms = schema({field("f", t_ms)}); + auto ex_schema_us = schema({field("f", t_us)}); + + // Create tables + auto ex_result_s = Table::Make(ex_schema_s, {a_s}); + auto ex_result_ms = Table::Make(ex_schema_ms, {a_ms}); + auto ex_result_us = Table::Make(ex_schema_us, {a_us}); + + std::shared_ptr result_s; + std::shared_ptr
result_ms; + std::shared_ptr
result_us; + + ArrowReaderProperties arrow_reader_prop_s, arrow_reader_prop_ms, arrow_reader_prop_us; + arrow_reader_prop_s.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::SECOND); + arrow_reader_prop_ms.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MILLI); + arrow_reader_prop_us.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MICRO); + +// SECOND + ASSERT_NO_FATAL_FAILURE(DoRoundtrip( + input, input->num_rows(), &result_s, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop_s)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_s->schema(), + *result_s->schema(), + /*check_metadata=*/false)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_s, *result_s)); + +// MILLI + ASSERT_NO_FATAL_FAILURE(DoRoundtrip( + input, input->num_rows(), &result_ms, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop_ms)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_ms->schema(), + *result_ms->schema(), + /*check_metadata=*/false)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_ms, *result_ms)); + + // MICRO + ASSERT_NO_FATAL_FAILURE(DoRoundtrip( + input, input->num_rows(), &result_us, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop_us)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_us->schema(), + *result_us->schema(), + /*check_metadata=*/false)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_us, *result_us)); +} + TEST(TestArrowReadWrite, CoerceTimestamps) { using ::arrow::ArrayFromVector; using ::arrow::field; diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 1410a5f89e2..549b0a94945 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) { } Status TransferInt96(RecordReader* reader, MemoryPool* pool, - const std::shared_ptr& type, Datum* out) { + const std::shared_ptr& type, Datum* out, + const ::arrow::TimeUnit::type& int96_arrow_time_unit) { int64_t length = reader->values_written(); auto values = reinterpret_cast(reader->values()); ARROW_ASSIGN_OR_RAISE(auto data, @@ -365,8 +366,16 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool, // isn't representable as a 64-bit Unix timestamp. *data_ptr++ = 0; } else { - *data_ptr++ = Int96GetNanoSeconds(values[i]); - } + switch (int96_arrow_time_unit){ + case ::arrow::TimeUnit::NANO: + *data_ptr++ = Int96GetNanoSeconds(values[i]); break; + case ::arrow::TimeUnit::MICRO: + *data_ptr++ = Int96GetMicroSeconds(values[i]); break; + case ::arrow::TimeUnit::MILLI: + *data_ptr++ = Int96GetMilliSeconds(values[i]); break; + case ::arrow::TimeUnit::SECOND: + *data_ptr++ = Int96GetSeconds(values[i]); break; + } } *out = std::make_shared(type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); @@ -742,20 +751,20 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ case ::arrow::Type::TIMESTAMP: { const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_type); - switch (timestamp_type.unit()) { - case ::arrow::TimeUnit::MILLI: - case ::arrow::TimeUnit::MICRO: { - result = TransferZeroCopy(reader, value_type); - } break; - case ::arrow::TimeUnit::NANO: { - if (descr->physical_type() == ::parquet::Type::INT96) { - RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result)); - } else { + if (descr->physical_type() == ::parquet::Type::INT96) { + RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result, timestamp_type.unit())); + } + else { + switch (timestamp_type.unit()) { + case ::arrow::TimeUnit::SECOND: + case ::arrow::TimeUnit::MILLI: + case ::arrow::TimeUnit::MICRO: + case ::arrow::TimeUnit::NANO: { result = TransferZeroCopy(reader, value_type); - } - } break; - default: - return Status::NotImplemented("TimeUnit not supported"); + } break; + default: + return Status::NotImplemented("TimeUnit not supported"); + } } } break; default: diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 7610ce17605..7e32eff90cb 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -454,7 +454,7 @@ bool IsDictionaryReadSupported(const ArrowType& type) { ::arrow::Result> GetTypeForNode( int column_index, const schema::PrimitiveNode& primitive_node, SchemaTreeContext* ctx) { - ASSIGN_OR_RAISE(std::shared_ptr storage_type, GetArrowType(primitive_node)); + ASSIGN_OR_RAISE(std::shared_ptr storage_type, GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index fbdfa09a040..c87aad60eb1 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -181,7 +181,8 @@ Result> FromInt64(const LogicalType& logical_type) { Result> GetArrowType(Type::type physical_type, const LogicalType& logical_type, - int type_length) { + int type_length, + const ::arrow::TimeUnit::type& int96_arrow_time_unit) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -194,7 +195,7 @@ Result> GetArrowType(Type::type physical_type, case ParquetType::INT64: return FromInt64(logical_type); case ParquetType::INT96: - return ::arrow::timestamp(::arrow::TimeUnit::NANO); + return ::arrow::timestamp(int96_arrow_time_unit); case ParquetType::FLOAT: return ::arrow::float32(); case ParquetType::DOUBLE: @@ -211,14 +212,22 @@ Result> GetArrowType(Type::type physical_type, } } +// ARROW-12096 -- Overloading functions with new input (setting default as NANO) Result> GetArrowType(const schema::PrimitiveNode& primitive) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), - primitive.type_length()); + primitive.type_length(), ::arrow::TimeUnit::NANO); } Result> GetArrowType(const ColumnDescriptor& descriptor) { return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(), - descriptor.type_length()); + descriptor.type_length(), ::arrow::TimeUnit::NANO); +} + +// ARROW-12096 -- Exposing INT96 arrow type definition fromm parquet reader +Result> GetArrowType(const schema::PrimitiveNode& primitive, + const ::arrow::TimeUnit::type& int96_arrow_time_unit) { + return GetArrowType(primitive.physical_type(), *primitive.logical_type(), + primitive.type_length(), int96_arrow_time_unit); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index ec0d9571304..623b7a6d05d 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -39,8 +39,20 @@ Result> GetArrowType(Type::type physical_type const LogicalType& logical_type, int type_length); +// ARROW-12096 Exposing int96 arrow timestamp unit definition +Result> GetArrowType(Type::type physical_type, + const LogicalType& logical_type, + int type_length, + const ::arrow::TimeUnit::type& int96_arrow_time_unit); + Result> GetArrowType( const schema::PrimitiveNode& primitive); + +// ARROW-12096 Exposing int96 arrow timestamp unit definition +Result> GetArrowType( + const schema::PrimitiveNode& primitive, + const ::arrow::TimeUnit::type& int96_arrow_time_unit); + Result> GetArrowType( const ColumnDescriptor& descriptor); diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5018fff9531..6d27bce79b4 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -575,7 +575,8 @@ class PARQUET_EXPORT ArrowReaderProperties { read_dict_indices_(), batch_size_(kArrowDefaultBatchSize), pre_buffer_(false), - cache_options_(::arrow::io::CacheOptions::Defaults()) {} + cache_options_(::arrow::io::CacheOptions::Defaults()), + coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {} void set_use_threads(bool use_threads) { use_threads_ = use_threads; } @@ -620,6 +621,11 @@ class PARQUET_EXPORT ArrowReaderProperties { const ::arrow::io::IOContext& io_context() const { return io_context_; } + /// Set output Arrow format for parquet reader ARROW-12096 + void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) { coerce_int96_timestamp_unit_ = unit; } + + ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const { return coerce_int96_timestamp_unit_; } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -627,6 +633,7 @@ class PARQUET_EXPORT ArrowReaderProperties { bool pre_buffer_; ::arrow::io::IOContext io_context_; ::arrow::io::CacheOptions cache_options_; + ::arrow::TimeUnit::type coerce_int96_timestamp_unit_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 4529dbe6133..507fd8404f9 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -602,6 +602,49 @@ static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { return static_cast(days_since_epoch * kNanosecondsPerDay + nanoseconds); } +// ARROW-12096 +static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) { + // We do the computations in the unsigned domain to avoid unsigned behaviour + // on overflow. + uint64_t days_since_epoch = + i96.value[2] - static_cast(kJulianToUnixEpochDays); + uint64_t nanoseconds = 0; + memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); + + uint64_t microseconds = nanoseconds/static_cast(1000); + + return static_cast(days_since_epoch * kMicrosecondsPerDay + microseconds); +} + +// ARROW-12096 +static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) { + // We do the computations in the unsigned domain to avoid unsigned behaviour + // on overflow. + uint64_t days_since_epoch = + i96.value[2] - static_cast(kJulianToUnixEpochDays); + uint64_t nanoseconds = 0; + memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); + + uint64_t milliseconds = nanoseconds/static_cast(1000000); + + return static_cast(days_since_epoch * kMillisecondsPerDay + milliseconds); +} + +// ARROW-12096 +static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { + // We do the computations in the unsigned domain to avoid unsigned behaviour + // on overflow. + uint64_t days_since_epoch = + i96.value[2] - static_cast(kJulianToUnixEpochDays); + + uint64_t nanoseconds = 0; + memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); + + uint64_t seconds = nanoseconds/(static_cast(1000000000)); + + return static_cast(days_since_epoch * kSecondsPerDay + seconds); +} + static inline std::string Int96ToString(const Int96& a) { std::ostringstream result; std::copy(a.value, a.value + 3, std::ostream_iterator(result, " ")); From eaaf788e17109a0b8074ef733c319d8b2a07933b Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Mon, 7 Jun 2021 08:52:11 +0100 Subject: [PATCH 2/7] Adding comment to test --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 550d95c1bfa..35f10338599 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1671,6 +1671,7 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); } +// Test for added functionality in ARROW-12096 TEST(TestArrowReadWrite, DowncastDeprecatedInt96) { using ::arrow::ArrayFromVector; using ::arrow::field; From 4977c03bb1fe03daaaac89108ee6b7b5eb5cfa56 Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Mon, 7 Jun 2021 08:55:18 +0100 Subject: [PATCH 3/7] another comment - to explain test --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 35f10338599..86098f11fa9 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1672,7 +1672,7 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { } // Test for added functionality in ARROW-12096 -TEST(TestArrowReadWrite, DowncastDeprecatedInt96) { +TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) { using ::arrow::ArrayFromVector; using ::arrow::field; using ::arrow::schema; @@ -1684,6 +1684,7 @@ TEST(TestArrowReadWrite, DowncastDeprecatedInt96) { auto t_us = ::arrow::timestamp(TimeUnit::MICRO); auto t_ns = ::arrow::timestamp(TimeUnit::NANO); + // Values demonstrate loss of resolution when "down sampling" INT96 to units that are not NS std::vector s_values = {1489269, 1489269, 1489269, 1489269}; std::vector ms_values = {1489269000, 1489269000, 1489269000, 1489269001}; From 534bcb1f2a81d45842ddc9c66e406d4e4118b72f Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Mon, 7 Jun 2021 10:06:14 +0100 Subject: [PATCH 4/7] Typo on bracket --- cpp/src/parquet/arrow/reader_internal.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 549b0a94945..9c62921f143 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -376,6 +376,7 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool, case ::arrow::TimeUnit::SECOND: *data_ptr++ = Int96GetSeconds(values[i]); break; } + } } *out = std::make_shared(type, length, std::move(data), reader->ReleaseIsValid(), reader->null_count()); From ed1e92134a6f3f568bd0feb0cb4505ccd2f5871a Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Sun, 13 Jun 2021 08:20:06 +0100 Subject: [PATCH 5/7] Update cpp/src/parquet/types.h Co-authored-by: emkornfield --- cpp/src/parquet/types.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 507fd8404f9..e0031941c77 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -632,7 +632,7 @@ static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) { // ARROW-12096 static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { - // We do the computations in the unsigned domain to avoid unsigned behaviour + // We do the computations in the unsigned domain to avoid undefined behaviour // on overflow. uint64_t days_since_epoch = i96.value[2] - static_cast(kJulianToUnixEpochDays); From 3741d0841f085fc8fc4976540d7362e32cbde63d Mon Sep 17 00:00:00 2001 From: Karik Isichei Date: Sun, 13 Jun 2021 13:47:33 +0100 Subject: [PATCH 6/7] Addressed all but one comment in PR. Will discuss outstanding comment on GitHub. --- .../parquet/arrow/arrow_reader_writer_test.cc | 123 +++++++----------- cpp/src/parquet/arrow/reader_internal.cc | 5 +- cpp/src/parquet/arrow/schema_internal.cc | 6 +- cpp/src/parquet/arrow/schema_internal.h | 5 +- cpp/src/parquet/types.h | 66 ++++------ 5 files changed, 75 insertions(+), 130 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 86098f11fa9..72cb405caf9 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -558,6 +558,35 @@ void ReadSingleColumnFileStatistics(std::unique_ptr file_reader, ASSERT_OK(StatisticsAsScalars(*statistics, min, max)); } +void DownsampleInt96RoundTrip(std::shared_ptr arrow_vector_in, + std::shared_ptr arrow_vector_out, + ::arrow::TimeUnit::type unit) { + + // Create single input table of NS to be written to parquet with INT96 + auto input_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))}); + auto input = Table::Make(input_schema, {arrow_vector_in}); + + // Create an expected schema for each resulting table (one for each "downsampled" ts) + auto ex_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(unit))}); + auto ex_result = Table::Make(ex_schema, {arrow_vector_out}); + + std::shared_ptr
result; + + ArrowReaderProperties arrow_reader_prop; + arrow_reader_prop.set_coerce_int96_timestamp_unit(unit); + + ASSERT_NO_FATAL_FAILURE(DoRoundtrip( + input, input->num_rows(), &result, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result->schema(), + *result->schema(), + /*check_metadata=*/false)); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); +} + // Non-template base class for TestParquetIO, to avoid code duplication class ParquetIOTestBase : public ::testing::Test { public: @@ -1671,89 +1700,25 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); } -// Test for added functionality in ARROW-12096 TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) { - using ::arrow::ArrayFromVector; + using ::arrow::ArrayFromJSON; using ::arrow::field; using ::arrow::schema; - std::vector is_valid = {true, true, true, true}; - - auto t_s = ::arrow::timestamp(TimeUnit::SECOND); - auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); - auto t_us = ::arrow::timestamp(TimeUnit::MICRO); - auto t_ns = ::arrow::timestamp(TimeUnit::NANO); - - // Values demonstrate loss of resolution when "down sampling" INT96 to units that are not NS - std::vector s_values = {1489269, 1489269, 1489269, 1489269}; - std::vector ms_values = {1489269000, 1489269000, - 1489269000, 1489269001}; - std::vector us_values = {1489269000000, 1489269000000, - 1489269000001, 1489269001000}; - std::vector ns_values = {1489269000000000LL, 1489269000000001LL, - 1489269000001000LL, 1489269001000000LL}; - - std::shared_ptr a_s, a_ms, a_us, a_ns; - ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); - ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); - ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); - ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); - - // Create single input table of NS to be written to parquet with INT96 - auto input_schema = schema({field("f", t_ns)}); - auto input = Table::Make(input_schema, {a_ns}); - - // Create an expected schema for each resulting table (one for each "down sampled" ts) - auto ex_schema_s = schema({field("f", t_s)}); - auto ex_schema_ms = schema({field("f", t_ms)}); - auto ex_schema_us = schema({field("f", t_us)}); - - // Create tables - auto ex_result_s = Table::Make(ex_schema_s, {a_s}); - auto ex_result_ms = Table::Make(ex_schema_ms, {a_ms}); - auto ex_result_us = Table::Make(ex_schema_us, {a_us}); - - std::shared_ptr
result_s; - std::shared_ptr
result_ms; - std::shared_ptr
result_us; - - ArrowReaderProperties arrow_reader_prop_s, arrow_reader_prop_ms, arrow_reader_prop_us; - arrow_reader_prop_s.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::SECOND); - arrow_reader_prop_ms.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MILLI); - arrow_reader_prop_us.set_coerce_int96_timestamp_unit(::arrow::TimeUnit::MICRO); - -// SECOND - ASSERT_NO_FATAL_FAILURE(DoRoundtrip( - input, input->num_rows(), &result_s, default_writer_properties(), - ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), - arrow_reader_prop_s)); - - ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_s->schema(), - *result_s->schema(), - /*check_metadata=*/false)); - ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_s, *result_s)); - -// MILLI - ASSERT_NO_FATAL_FAILURE(DoRoundtrip( - input, input->num_rows(), &result_ms, default_writer_properties(), - ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), - arrow_reader_prop_ms)); - - ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_ms->schema(), - *result_ms->schema(), - /*check_metadata=*/false)); - ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_ms, *result_ms)); - - // MICRO - ASSERT_NO_FATAL_FAILURE(DoRoundtrip( - input, input->num_rows(), &result_us, default_writer_properties(), - ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), - arrow_reader_prop_us)); - - ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result_us->schema(), - *result_us->schema(), - /*check_metadata=*/false)); - ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result_us, *result_us)); + // timestamp values at 2000-01-01 00:00:00 then with increment unit of 1ns, 1us, 1ms and 1s + auto a_nano = ArrayFromJSON(timestamp(TimeUnit::NANO), + "[946684800000000000, 946684800000000001, 946684800000001000, 946684800001000000, 946684801000000000]"); + auto a_micro = ArrayFromJSON(timestamp(TimeUnit::MICRO), + "[946684800000000, 946684800000000, 946684800000001, 946684800001000, 946684801000000]"); + auto a_milli = ArrayFromJSON(timestamp(TimeUnit::MILLI), + "[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]"); + auto a_second = ArrayFromJSON(timestamp(TimeUnit::SECOND), + "[946684800, 946684800, 946684800, 946684800, 946684801]"); + + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_nano, TimeUnit::NANO)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_micro, TimeUnit::MICRO)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_milli, TimeUnit::MILLI)); + ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_second, TimeUnit::SECOND)); } TEST(TestArrowReadWrite, CoerceTimestamps) { diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 9c62921f143..4b1f76bf49b 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -354,7 +354,7 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) { Status TransferInt96(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& type, Datum* out, - const ::arrow::TimeUnit::type& int96_arrow_time_unit) { + const ::arrow::TimeUnit::type int96_arrow_time_unit) { int64_t length = reader->values_written(); auto values = reinterpret_cast(reader->values()); ARROW_ASSIGN_OR_RAISE(auto data, @@ -757,14 +757,11 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ } else { switch (timestamp_type.unit()) { - case ::arrow::TimeUnit::SECOND: case ::arrow::TimeUnit::MILLI: case ::arrow::TimeUnit::MICRO: case ::arrow::TimeUnit::NANO: { result = TransferZeroCopy(reader, value_type); } break; - default: - return Status::NotImplemented("TimeUnit not supported"); } } } break; diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index c87aad60eb1..2ebed1b7a3c 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -182,7 +182,7 @@ Result> FromInt64(const LogicalType& logical_type) { Result> GetArrowType(Type::type physical_type, const LogicalType& logical_type, int type_length, - const ::arrow::TimeUnit::type& int96_arrow_time_unit) { + const ::arrow::TimeUnit::type int96_arrow_time_unit) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -212,7 +212,6 @@ Result> GetArrowType(Type::type physical_type, } } -// ARROW-12096 -- Overloading functions with new input (setting default as NANO) Result> GetArrowType(const schema::PrimitiveNode& primitive) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), primitive.type_length(), ::arrow::TimeUnit::NANO); @@ -223,9 +222,8 @@ Result> GetArrowType(const ColumnDescriptor& descript descriptor.type_length(), ::arrow::TimeUnit::NANO); } -// ARROW-12096 -- Exposing INT96 arrow type definition fromm parquet reader Result> GetArrowType(const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type& int96_arrow_time_unit) { + const ::arrow::TimeUnit::type int96_arrow_time_unit) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), primitive.type_length(), int96_arrow_time_unit); } diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 623b7a6d05d..06c34910cc9 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -43,15 +43,14 @@ Result> GetArrowType(Type::type physical_type Result> GetArrowType(Type::type physical_type, const LogicalType& logical_type, int type_length, - const ::arrow::TimeUnit::type& int96_arrow_time_unit); + const ::arrow::TimeUnit::type int96_arrow_time_unit); Result> GetArrowType( const schema::PrimitiveNode& primitive); -// ARROW-12096 Exposing int96 arrow timestamp unit definition Result> GetArrowType( const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type& int96_arrow_time_unit); + const ::arrow::TimeUnit::type int96_arrow_time_unit); Result> GetArrowType( const ColumnDescriptor& descriptor); diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index e0031941c77..3902e333e8a 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -591,58 +591,44 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds)); } -static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { +// ARROW-12096 - Update INT96 conversion to allow users to define arrow timestamp unit +struct DecodedInt96 { + uint64_t days_since_epoch; + uint64_t nanoseconds; +}; + +static inline DecodedInt96 DecodeInt96Timestamp(const parquet::Int96& i96) { // We do the computations in the unsigned domain to avoid unsigned behaviour // on overflow. - uint64_t days_since_epoch = - i96.value[2] - static_cast(kJulianToUnixEpochDays); - uint64_t nanoseconds = 0; + DecodedInt96 result; + result.days_since_epoch = i96.value[2] - static_cast(kJulianToUnixEpochDays); + result.nanoseconds = 0; + + memcpy(&result.nanoseconds, &i96.value, sizeof(uint64_t)); + return result; +} - memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); - return static_cast(days_since_epoch * kNanosecondsPerDay + nanoseconds); +static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { + const auto decoded = DecodeInt96Timestamp(i96); + return static_cast(decoded.days_since_epoch * kNanosecondsPerDay + decoded.nanoseconds); } -// ARROW-12096 static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) { - // We do the computations in the unsigned domain to avoid unsigned behaviour - // on overflow. - uint64_t days_since_epoch = - i96.value[2] - static_cast(kJulianToUnixEpochDays); - uint64_t nanoseconds = 0; - memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); - - uint64_t microseconds = nanoseconds/static_cast(1000); - - return static_cast(days_since_epoch * kMicrosecondsPerDay + microseconds); + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t microseconds = decoded.nanoseconds/static_cast(1000); + return static_cast(decoded.days_since_epoch * kMicrosecondsPerDay + microseconds); } -// ARROW-12096 static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) { - // We do the computations in the unsigned domain to avoid unsigned behaviour - // on overflow. - uint64_t days_since_epoch = - i96.value[2] - static_cast(kJulianToUnixEpochDays); - uint64_t nanoseconds = 0; - memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); - - uint64_t milliseconds = nanoseconds/static_cast(1000000); - - return static_cast(days_since_epoch * kMillisecondsPerDay + milliseconds); + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t milliseconds = decoded.nanoseconds/static_cast(1000000); + return static_cast(decoded.days_since_epoch * kMillisecondsPerDay + milliseconds); } -// ARROW-12096 static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { - // We do the computations in the unsigned domain to avoid undefined behaviour - // on overflow. - uint64_t days_since_epoch = - i96.value[2] - static_cast(kJulianToUnixEpochDays); - - uint64_t nanoseconds = 0; - memcpy(&nanoseconds, &i96.value, sizeof(uint64_t)); - - uint64_t seconds = nanoseconds/(static_cast(1000000000)); - - return static_cast(days_since_epoch * kSecondsPerDay + seconds); + const auto decoded = DecodeInt96Timestamp(i96); + uint64_t seconds = decoded.nanoseconds/static_cast(1000000000); + return static_cast(decoded.days_since_epoch * kSecondsPerDay + seconds); } static inline std::string Int96ToString(const Int96& a) { From ee293fcdf302d18a00b6d602145b0941c63d9180 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 15 Jun 2021 16:24:13 +0200 Subject: [PATCH 7/7] Nit + fix lint + fix compile error --- .../parquet/arrow/arrow_reader_writer_test.cc | 34 +++++++++++-------- cpp/src/parquet/arrow/reader_internal.cc | 26 ++++++++------ cpp/src/parquet/arrow/schema.cc | 4 ++- cpp/src/parquet/arrow/schema_internal.cc | 22 ++++-------- cpp/src/parquet/arrow/schema_internal.h | 14 ++------ cpp/src/parquet/properties.h | 11 ++++-- cpp/src/parquet/types.h | 18 +++++----- 7 files changed, 66 insertions(+), 63 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 72cb405caf9..6c82b8dee78 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -561,24 +561,24 @@ void ReadSingleColumnFileStatistics(std::unique_ptr file_reader, void DownsampleInt96RoundTrip(std::shared_ptr arrow_vector_in, std::shared_ptr arrow_vector_out, ::arrow::TimeUnit::type unit) { - // Create single input table of NS to be written to parquet with INT96 - auto input_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))}); + auto input_schema = + ::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))}); auto input = Table::Make(input_schema, {arrow_vector_in}); // Create an expected schema for each resulting table (one for each "downsampled" ts) auto ex_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(unit))}); auto ex_result = Table::Make(ex_schema, {arrow_vector_out}); - + std::shared_ptr
result; ArrowReaderProperties arrow_reader_prop; arrow_reader_prop.set_coerce_int96_timestamp_unit(unit); ASSERT_NO_FATAL_FAILURE(DoRoundtrip( - input, input->num_rows(), &result, default_writer_properties(), - ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), - arrow_reader_prop)); + input, input->num_rows(), &result, default_writer_properties(), + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(), + arrow_reader_prop)); ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result->schema(), *result->schema(), @@ -1705,15 +1705,21 @@ TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) { using ::arrow::field; using ::arrow::schema; - // timestamp values at 2000-01-01 00:00:00 then with increment unit of 1ns, 1us, 1ms and 1s - auto a_nano = ArrayFromJSON(timestamp(TimeUnit::NANO), - "[946684800000000000, 946684800000000001, 946684800000001000, 946684800001000000, 946684801000000000]"); + // Timestamp values at 2000-01-01 00:00:00, + // then with increment unit of 1ns, 1us, 1ms and 1s. + auto a_nano = + ArrayFromJSON(timestamp(TimeUnit::NANO), + "[946684800000000000, 946684800000000001, 946684800000001000, " + "946684800001000000, 946684801000000000]"); auto a_micro = ArrayFromJSON(timestamp(TimeUnit::MICRO), - "[946684800000000, 946684800000000, 946684800000001, 946684800001000, 946684801000000]"); - auto a_milli = ArrayFromJSON(timestamp(TimeUnit::MILLI), - "[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]"); - auto a_second = ArrayFromJSON(timestamp(TimeUnit::SECOND), - "[946684800, 946684800, 946684800, 946684800, 946684801]"); + "[946684800000000, 946684800000000, 946684800000001, " + "946684800001000, 946684801000000]"); + auto a_milli = ArrayFromJSON( + timestamp(TimeUnit::MILLI), + "[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]"); + auto a_second = + ArrayFromJSON(timestamp(TimeUnit::SECOND), + "[946684800, 946684800, 946684800, 946684800, 946684801]"); ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_nano, TimeUnit::NANO)); ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_micro, TimeUnit::MICRO)); diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 4b1f76bf49b..0ffa3e89970 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -366,15 +366,19 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool, // isn't representable as a 64-bit Unix timestamp. *data_ptr++ = 0; } else { - switch (int96_arrow_time_unit){ + switch (int96_arrow_time_unit) { case ::arrow::TimeUnit::NANO: - *data_ptr++ = Int96GetNanoSeconds(values[i]); break; + *data_ptr++ = Int96GetNanoSeconds(values[i]); + break; case ::arrow::TimeUnit::MICRO: - *data_ptr++ = Int96GetMicroSeconds(values[i]); break; + *data_ptr++ = Int96GetMicroSeconds(values[i]); + break; case ::arrow::TimeUnit::MILLI: - *data_ptr++ = Int96GetMilliSeconds(values[i]); break; + *data_ptr++ = Int96GetMilliSeconds(values[i]); + break; case ::arrow::TimeUnit::SECOND: - *data_ptr++ = Int96GetSeconds(values[i]); break; + *data_ptr++ = Int96GetSeconds(values[i]); + break; } } } @@ -753,15 +757,17 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ const ::arrow::TimestampType& timestamp_type = checked_cast<::arrow::TimestampType&>(*value_type); if (descr->physical_type() == ::parquet::Type::INT96) { - RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result, timestamp_type.unit())); - } - else { + RETURN_NOT_OK( + TransferInt96(reader, pool, value_type, &result, timestamp_type.unit())); + } else { switch (timestamp_type.unit()) { case ::arrow::TimeUnit::MILLI: case ::arrow::TimeUnit::MICRO: - case ::arrow::TimeUnit::NANO: { + case ::arrow::TimeUnit::NANO: result = TransferZeroCopy(reader, value_type); - } break; + break; + default: + return Status::NotImplemented("TimeUnit not supported"); } } } break; diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 7e32eff90cb..eb7fd628dfc 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -454,7 +454,9 @@ bool IsDictionaryReadSupported(const ArrowType& type) { ::arrow::Result> GetTypeForNode( int column_index, const schema::PrimitiveNode& primitive_node, SchemaTreeContext* ctx) { - ASSIGN_OR_RAISE(std::shared_ptr storage_type, GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); + ASSIGN_OR_RAISE( + std::shared_ptr storage_type, + GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 2ebed1b7a3c..064bf4f55cc 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -179,10 +179,9 @@ Result> FromInt64(const LogicalType& logical_type) { } } -Result> GetArrowType(Type::type physical_type, - const LogicalType& logical_type, - int type_length, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { +Result> GetArrowType( + Type::type physical_type, const LogicalType& logical_type, int type_length, + const ::arrow::TimeUnit::type int96_arrow_time_unit) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -212,18 +211,9 @@ Result> GetArrowType(Type::type physical_type, } } -Result> GetArrowType(const schema::PrimitiveNode& primitive) { - return GetArrowType(primitive.physical_type(), *primitive.logical_type(), - primitive.type_length(), ::arrow::TimeUnit::NANO); -} - -Result> GetArrowType(const ColumnDescriptor& descriptor) { - return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(), - descriptor.type_length(), ::arrow::TimeUnit::NANO); -} - -Result> GetArrowType(const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { +Result> GetArrowType( + const schema::PrimitiveNode& primitive, + const ::arrow::TimeUnit::type int96_arrow_time_unit) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), primitive.type_length(), int96_arrow_time_unit); } diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index 06c34910cc9..fb837c3ee6c 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -39,21 +39,13 @@ Result> GetArrowType(Type::type physical_type const LogicalType& logical_type, int type_length); -// ARROW-12096 Exposing int96 arrow timestamp unit definition -Result> GetArrowType(Type::type physical_type, - const LogicalType& logical_type, - int type_length, - const ::arrow::TimeUnit::type int96_arrow_time_unit); - Result> GetArrowType( - const schema::PrimitiveNode& primitive); + Type::type physical_type, const LogicalType& logical_type, int type_length, + ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); Result> GetArrowType( const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type int96_arrow_time_unit); - -Result> GetArrowType( - const ColumnDescriptor& descriptor); + ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6d27bce79b4..d217b8efa52 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -621,10 +621,15 @@ class PARQUET_EXPORT ArrowReaderProperties { const ::arrow::io::IOContext& io_context() const { return io_context_; } - /// Set output Arrow format for parquet reader ARROW-12096 - void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) { coerce_int96_timestamp_unit_ = unit; } + /// Set timestamp unit to use for deprecated INT96-encoded timestamps + /// (default is NANO). + void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) { + coerce_int96_timestamp_unit_ = unit; + } - ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const { return coerce_int96_timestamp_unit_; } + ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const { + return coerce_int96_timestamp_unit_; + } private: bool use_threads_; diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 3902e333e8a..6bd67f1ee5f 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -591,7 +591,6 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds)); } -// ARROW-12096 - Update INT96 conversion to allow users to define arrow timestamp unit struct DecodedInt96 { uint64_t days_since_epoch; uint64_t nanoseconds; @@ -603,31 +602,34 @@ static inline DecodedInt96 DecodeInt96Timestamp(const parquet::Int96& i96) { DecodedInt96 result; result.days_since_epoch = i96.value[2] - static_cast(kJulianToUnixEpochDays); result.nanoseconds = 0; - + memcpy(&result.nanoseconds, &i96.value, sizeof(uint64_t)); return result; } static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { const auto decoded = DecodeInt96Timestamp(i96); - return static_cast(decoded.days_since_epoch * kNanosecondsPerDay + decoded.nanoseconds); + return static_cast(decoded.days_since_epoch * kNanosecondsPerDay + + decoded.nanoseconds); } static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) { const auto decoded = DecodeInt96Timestamp(i96); - uint64_t microseconds = decoded.nanoseconds/static_cast(1000); - return static_cast(decoded.days_since_epoch * kMicrosecondsPerDay + microseconds); + uint64_t microseconds = decoded.nanoseconds / static_cast(1000); + return static_cast(decoded.days_since_epoch * kMicrosecondsPerDay + + microseconds); } static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) { const auto decoded = DecodeInt96Timestamp(i96); - uint64_t milliseconds = decoded.nanoseconds/static_cast(1000000); - return static_cast(decoded.days_since_epoch * kMillisecondsPerDay + milliseconds); + uint64_t milliseconds = decoded.nanoseconds / static_cast(1000000); + return static_cast(decoded.days_since_epoch * kMillisecondsPerDay + + milliseconds); } static inline int64_t Int96GetSeconds(const parquet::Int96& i96) { const auto decoded = DecodeInt96Timestamp(i96); - uint64_t seconds = decoded.nanoseconds/static_cast(1000000000); + uint64_t seconds = decoded.nanoseconds / static_cast(1000000000); return static_cast(decoded.days_since_epoch * kSecondsPerDay + seconds); }