From 2897a7278d2bd05b88b595530a5b5832d691cf58 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?=
Date: Fri, 14 Dec 2018 10:27:06 -0500
Subject: [PATCH] ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps for all timestamp fields.

This changes the behavior of use_deprecated_int96_timestamps so that it
applies to all timestamp fields regardless of their time unit. Previously the
conversion was only applied to fields with nanosecond resolution.

This option is only needed when writing for systems that support nothing but
INT96 timestamps; systems that can also read INT64 timestamps at other
resolutions do not need it.

A notable API change is that this option now takes precedence over the
coerce_timestamps option.
---
 .../parquet/arrow/arrow-reader-writer-test.cc | 185 +++++++++---------
 cpp/src/parquet/arrow/reader.cc               |  16 +-
 cpp/src/parquet/arrow/schema.cc               |  73 ++++---
 cpp/src/parquet/arrow/writer.cc               |  74 ++++---
 cpp/src/parquet/arrow/writer.h                |  62 ++++--
 cpp/src/parquet/types.h                       |  21 ++
 python/pyarrow/parquet.py                     |   4 +-
 python/pyarrow/tests/test_parquet.py          |   5 +-
 8 files changed, 256 insertions(+), 184 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 24ec0dd24ee..5b3ffb67b22 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1189,65 +1189,116 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
   auto f0 = field("f0", ::arrow::date32());
   auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
   auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
-  std::shared_ptr<::arrow::Field> f3;
-  if (nanos_as_micros) {
-    f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
-  } else {
-    f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
-  }
+  auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
+  auto f3 = field("f3", ::arrow::timestamp(f3_unit));
   auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
   auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+
   std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
 
   std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
                                      1489272000, 1489272000, 1489273000};
-  std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
-                                     1489272000000, 1489272000000, 1489273000000};
+  std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000,
+                                        1489272000000, 1489272000000, 1489273000000};
   std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000,
                                         1489272000, 1489272000, 1489273000};
+  std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
+                                        1489272, 1489272, 1489273};
 
   std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
   ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
-  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
-  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
-  if (nanos_as_micros) {
-    ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values,
-                                                     &a3);
-  } else {
-    ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values,
-                                                     &a3);
-  }
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
+                                                   &a1);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
+                                                   &a2);
+  auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
   ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
-  ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
 
   std::vector<std::shared_ptr<Column>> columns = {
       std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
       std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
      std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
+
   *out = Table::Make(schema, columns);
 }
 
 TEST(TestArrowReadWrite, DateTimeTypes) {
-  std::shared_ptr<Table> table;
+  std::shared_ptr<Table> table, result;
   MakeDateTimeTypesTable(&table);
 
-  // Use deprecated INT96 type
-  std::shared_ptr<Table> result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      table, false /* use_threads */, table->num_rows(), {}, &result,
-      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
-
-  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
-
   // Cast nanaoseconds to microseconds and use INT64 physical type
   ASSERT_NO_FATAL_FAILURE(
       DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
-  std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
 }
 
+TEST(TestArrowReadWrite, UseDeprecatedInt96) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+  std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
+                                    1489272001, 1489272000, 1489273000};
+  std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
+                                    1489272000001, 1489272000000, 1489273000000};
+  std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
+                                    1489271000000000LL, 1489272000000001LL,
+                                    1489272000000000LL, 1489273000000000LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
+
+  // Each input is typed with a unique TimeUnit
+  auto input_schema = schema(
+      {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
+  auto input = Table::Make(
+      input_schema,
+      {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
+       std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
+
+  // When reading parquet files, all int96 schema fields are converted to
+  // timestamp nanoseconds
+  auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
+                           field("f_ns", t_ns)});
+  auto ex_result = Table::Make(
+      ex_schema,
+      {std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
+       std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
+
+  std::shared_ptr<Table> result;
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+      input, false /* use_threads */, input->num_rows(), {}, &result,
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+
+  // Ensure enable_deprecated_int96_timestamps takes precedence over
+  // coerce_timestamps.
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
+                                            input->num_rows(), {}, &result,
+                                            ArrowWriterProperties::Builder()
+                                                .enable_deprecated_int96_timestamps()
+                                                ->coerce_timestamps(TimeUnit::MILLI)
+                                                ->build()));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+}
+
 TEST(TestArrowReadWrite, CoerceTimestamps) {
   using ::arrow::ArrayFromVector;
   using ::arrow::field;
@@ -1293,6 +1344,12 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
       {std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
        std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});
 
+  std::shared_ptr<Table> milli_result;
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
+
   // Result when coercing to microseconds
   auto s3 = std::shared_ptr<::arrow::Schema>(
       new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
@@ -1302,13 +1359,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
       {std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
        std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});
 
-  std::shared_ptr<Table> milli_result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
-      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
-
-  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
-
   std::shared_ptr<Table> micro_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
       input, false /* use_threads */, input->num_rows(), {}, &micro_result,
@@ -1453,65 +1503,6 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
 }
 
-// Regression for ARROW-2802
-TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
-  using ::arrow::Column;
-  using ::arrow::default_memory_pool;
-  using ::arrow::Field;
-  using ::arrow::Schema;
-  using ::arrow::Table;
-  using ::arrow::TimestampBuilder;
-  using ::arrow::TimestampType;
-  using ::arrow::TimeUnit;
-
-  auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
-
-  TimestampBuilder builder(timestamp_type, default_memory_pool());
-  for (std::int64_t ii = 0; ii < 10; ++ii) {
-    ASSERT_OK(builder.Append(1000000000L * ii));
-  }
-  std::shared_ptr<Array> values;
-  ASSERT_OK(builder.Finish(&values));
-
-  std::vector<std::shared_ptr<Field>> fields;
-  auto field = std::make_shared<Field>("nanos", timestamp_type);
-  fields.emplace_back(field);
-
-  auto schema = std::make_shared<Schema>(fields);
-
-  std::vector<std::shared_ptr<Column>> columns;
-  auto column = std::make_shared<Column>("nanos", values);
-  columns.emplace_back(column);
-
-  auto table = Table::Make(schema, columns);
-
-  auto arrow_writer_properties = ArrowWriterProperties::Builder()
-                                     .coerce_timestamps(TimeUnit::MICRO)
-                                     ->enable_deprecated_int96_timestamps()
-                                     ->build();
-
-  std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
-                    arrow_writer_properties);
-
-  ASSERT_EQ(table->num_columns(), result->num_columns());
-  ASSERT_EQ(table->num_rows(), result->num_rows());
-
-  auto actual_column = result->column(0);
-  auto data = actual_column->data();
-  auto expected_values =
-      static_cast<::arrow::NumericArray<TimestampType>*>(values.get())->raw_values();
-  for (int ii = 0; ii < data->num_chunks(); ++ii) {
-    auto chunk =
-        static_cast<::arrow::NumericArray<TimestampType>*>(data->chunk(ii).get());
-    auto values = chunk->raw_values();
-    for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) {
-      // Check that the nanos have been converted to micros
-      ASSERT_EQ(*expected_values / 1000, values[jj]);
-    }
-  }
-}
-
 void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
                      std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
@@ -2284,11 +2275,13 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
 INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
                         ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
 
-TEST(TestImpalaConversion, NanosecondToImpala) {
+TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
   // June 20, 2017 16:32:56 and 123456789 nanoseconds
   int64_t nanoseconds = INT64_C(1497976376123456789);
 
-  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
+  Int96 calculated;
+
+  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
   internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
   ASSERT_EQ(expected, calculated);
 }
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 6273fda4640..a886148b3d7 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -70,18 +70,6 @@ namespace arrow {
 
 using ::arrow::BitUtil::BytesForBits;
 
-constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kMillisecondsInADay = 86400000LL;
-constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
-
-static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
-  int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
-  int64_t nanoseconds = 0;
-
-  memcpy(&nanoseconds, &impala_timestamp.value, sizeof(int64_t));
-  return days_since_epoch * kNanosecondsInADay + nanoseconds;
-}
-
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
@@ -1001,7 +989,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
     auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
 
     for (int64_t i = 0; i < length; i++) {
-      *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+      *data_ptr++ = Int96GetNanoSeconds(values[i]);
     }
 
     if (reader->nullable_values()) {
@@ -1029,7 +1017,7 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
     auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
 
     for (int64_t i = 0; i < length; i++) {
-      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
     }
 
     if (reader->nullable_values()) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index d0014a6f3aa..af9fbc91a50 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -423,45 +423,66 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
   return Status::OK();
 }
 
+static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) {
+  switch (time_unit) {
+    case ::arrow::TimeUnit::MILLI:
+      return LogicalType::TIMESTAMP_MILLIS;
+    case ::arrow::TimeUnit::MICRO:
+      return LogicalType::TIMESTAMP_MICROS;
+    case ::arrow::TimeUnit::SECOND:
+    case ::arrow::TimeUnit::NANO:
+      // No equivalent parquet logical type.
+      break;
+  }
+
+  return LogicalType::NONE;
+}
+
 static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
                                    const ArrowWriterProperties& properties,
                                    ParquetType::type* physical_type,
                                    LogicalType::type* logical_type) {
-  auto unit = type.unit();
-  *physical_type = ParquetType::INT64;
+  const bool coerce = properties.coerce_timestamps_enabled();
+  const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit();
 
-  if (properties.coerce_timestamps_enabled()) {
-    auto coerce_unit = properties.coerce_timestamps_unit();
-    if (coerce_unit == ::arrow::TimeUnit::MILLI) {
-      *logical_type = LogicalType::TIMESTAMP_MILLIS;
-    } else if (coerce_unit == ::arrow::TimeUnit::MICRO) {
-      *logical_type = LogicalType::TIMESTAMP_MICROS;
-    } else {
-      return Status::NotImplemented(
-          "Can only coerce Arrow timestamps to milliseconds"
-          " or microseconds");
+  // The user is explicitly asking for Impala int96 encoding; there is no
+  // corresponding logical type.
+  if (properties.support_deprecated_int96_timestamps()) {
+    *physical_type = ParquetType::INT96;
+    return Status::OK();
+  }
+
+  *physical_type = ParquetType::INT64;
+  *logical_type = LogicalTypeFromArrowTimeUnit(unit);
+
+  // The user is requesting that all timestamp columns be cast to a specific
+  // type. Only 2 TimeUnit values are supported by arrow-parquet.
+  if (coerce) {
+    switch (unit) {
+      case ::arrow::TimeUnit::MILLI:
+      case ::arrow::TimeUnit::MICRO:
+        break;
+      case ::arrow::TimeUnit::NANO:
+      case ::arrow::TimeUnit::SECOND:
+        return Status::NotImplemented(
+            "Can only coerce Arrow timestamps to milliseconds"
+            " or microseconds");
     }
+
+    return Status::OK();
+  }
 
-  if (unit == ::arrow::TimeUnit::MILLI) {
-    *logical_type = LogicalType::TIMESTAMP_MILLIS;
-  } else if (unit == ::arrow::TimeUnit::MICRO) {
+  // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to
+  // int64 microseconds when deprecated int96 is not requested.
+  if (type.unit() == ::arrow::TimeUnit::NANO)
     *logical_type = LogicalType::TIMESTAMP_MICROS;
-  } else if (unit == ::arrow::TimeUnit::NANO) {
-    if (properties.support_deprecated_int96_timestamps()) {
-      *physical_type = ParquetType::INT96;
-      // No corresponding logical type
-    } else {
-      *logical_type = LogicalType::TIMESTAMP_MICROS;
-    }
-  } else {
+  else if (type.unit() == ::arrow::TimeUnit::SECOND)
     return Status::NotImplemented(
         "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
         "Parquet.");
-  }
+
   return Status::OK();
-}
+}  // namespace arrow
 
 Status FieldToNode(const std::shared_ptr<Field>& field,
                    const WriterProperties& properties,
@@ -698,7 +719,7 @@ int32_t DecimalSize(int32_t precision) {
   }
   DCHECK(false);
   return -1;
-}
+}  // namespace arrow
 
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 402cbf0f202..bce9f37026c 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -386,7 +386,11 @@ class ArrowColumnWriter {
   Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
                     const int16_t* rep_levels,
                     const typename ParquetType::c_type* values) {
-    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    auto typed_writer =
+        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    // WriteBatch was called with a type that does not match writer_'s type.
+    // This could be a schema conversion problem.
+    DCHECK(typed_writer);
     PARQUET_CATCH_NOT_OK(
         typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
     return Status::OK();
@@ -397,7 +401,11 @@ class ArrowColumnWriter {
                           const int16_t* rep_levels, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
                           const typename ParquetType::c_type* values) {
-    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    auto typed_writer =
+        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    // WriteBatchSpaced was called with a type that does not match writer_'s
+    // type. This could be a schema conversion problem.
+    DCHECK(typed_writer);
     PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
         num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
     return Status::OK();
@@ -570,20 +578,42 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 
+#define CONV_CASE_LOOP(ConversionFunction) \
+  for (int64_t i = 0; i < num_values; i++) \
+    ConversionFunction(arrow_values[i], &output[i]);
+
+static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
+                                                int64_t num_values,
+                                                ::arrow::TimeUnit ::type unit_type,
+                                                Int96* output) {
+  switch (unit_type) {
+    case TimeUnit::NANO:
+      CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::MICRO:
+      CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::MILLI:
+      CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::SECOND:
+      CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
+      break;
+  }
+}
+
+#undef CONV_CASE_LOOP
+
 template <>
 Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
     const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
     int64_t valid_bits_offset, const int64_t* values) {
-  Int96* buffer;
+  Int96* buffer = nullptr;
   RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-  if (type.unit() == TimeUnit::NANO) {
-    for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
-    }
-  } else {
-    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
-  }
+
+  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
   return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits,
                           valid_bits_offset, buffer);
 }
@@ -592,15 +622,11 @@ template <>
 Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
     const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
-  Int96* buffer;
+  Int96* buffer = nullptr;
   RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-  if (type.unit() == TimeUnit::NANO) {
-    for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
-    }
-  } else {
-    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
-  }
+
+  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
   return WriteBatch(num_levels, def_levels, rep_levels, buffer);
 }
@@ -611,21 +637,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
 
   const bool is_nanosecond = type.unit() == TimeUnit::NANO;
 
-  // In the case where support_deprecated_int96_timestamps was specified
-  // and coerce_timestamps_enabled was specified, a nanosecond column
-  // will have a physical type of int64. In that case, we fall through
-  // to the else if below.
-  //
-  // See https://issues.apache.org/jira/browse/ARROW-2082
-  if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps() &&
-      !ctx_->properties->coerce_timestamps_enabled()) {
+  if (ctx_->properties->support_deprecated_int96_timestamps()) {
+    // The user explicitly requested Int96 storage.
     return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                               def_levels, rep_levels);
   } else if (is_nanosecond ||
              (ctx_->properties->coerce_timestamps_enabled() &&
               (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
     // Casting is required. This covers several cases
-    // * Nanoseconds -> cast to microseconds
+    // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
     // * coerce_timestamps_enabled_, cast all timestamps to requested unit
     return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(),
                                  values, num_levels, def_levels, rep_levels);
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2538c028002..50cb4cfea7d 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -45,19 +45,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
   class Builder {
    public:
     Builder()
-        : write_nanos_as_int96_(false),
+        : write_timestamps_as_int96_(false),
           coerce_timestamps_enabled_(false),
           coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
           truncated_timestamps_allowed_(false) {}
     virtual ~Builder() {}
 
     Builder* disable_deprecated_int96_timestamps() {
-      write_nanos_as_int96_ = false;
+      write_timestamps_as_int96_ = false;
       return this;
     }
 
     Builder* enable_deprecated_int96_timestamps() {
-      write_nanos_as_int96_ = true;
+      write_timestamps_as_int96_ = true;
       return this;
     }
 
@@ -79,19 +79,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
 
     std::shared_ptr<ArrowWriterProperties> build() {
       return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
-          write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+          write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
          truncated_timestamps_allowed_));
     }
 
   private:
-    bool write_nanos_as_int96_;
+    bool write_timestamps_as_int96_;
    bool coerce_timestamps_enabled_;
    ::arrow::TimeUnit::type coerce_timestamps_unit_;
    bool truncated_timestamps_allowed_;
  };
 
-  bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
+  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
 
  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
@@ -105,12 +105,12 @@ class PARQUET_EXPORT ArrowWriterProperties {
                        bool coerce_timestamps_enabled,
                        ::arrow::TimeUnit::type coerce_timestamps_unit,
                        bool truncated_timestamps_allowed)
-      : write_nanos_as_int96_(write_nanos_as_int96),
+      : write_timestamps_as_int96_(write_nanos_as_int96),
        coerce_timestamps_enabled_(coerce_timestamps_enabled),
        coerce_timestamps_unit_(coerce_timestamps_unit),
        truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
 
-  const bool write_nanos_as_int96_;
+  const bool write_timestamps_as_int96_;
  const bool coerce_timestamps_enabled_;
  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
  const bool truncated_timestamps_allowed_;
@@ -208,24 +208,52 @@ namespace internal {
  * Timestamp conversion constants
  */
 constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
-constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
 
-/**
- * Converts nanosecond timestamps to Impala (Int96) format
- */
-inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
-                                         Int96* impala_timestamp) {
-  int64_t julian_days = (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays;
+template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
+inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
+  int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
   (*impala_timestamp).value[2] = (uint32_t)julian_days;
 
-  int64_t last_day_nanos = nanoseconds % kNanosecondsPerDay;
+  int64_t last_day_units = time % UnitPerDay;
   int64_t* impala_last_day_nanos = reinterpret_cast<int64_t*>(impala_timestamp);
-  *impala_last_day_nanos = last_day_nanos;
+  *impala_last_day_nanos = last_day_units * NanosecondsPerUnit;
+}
+
+constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
+
+inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
+                                                                   impala_timestamp);
+}
+
+constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
+
+inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
+      milliseconds, impala_timestamp);
+}
+
+constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
+
+inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
+      microseconds, impala_timestamp);
+}
+
+constexpr int64_t kNanosecondsInNanos = INT64_C(1);
+
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+                                         Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
+      nanoseconds, impala_timestamp);
 }
 
 }  // namespace internal
 }  // namespace arrow
+
 }  // namespace parquet
 
 #endif  // PARQUET_ARROW_WRITER_H
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index b27718027b0..1812f5547ab 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -175,6 +175,19 @@ struct FixedLenByteArray {
 
 using FLBA = FixedLenByteArray;
 
+// Julian day at unix epoch.
+//
+// The Julian Day Number (JDN) is the integer assigned to a whole solar day in
+// the Julian day count starting from noon Universal time, with Julian day
+// number 0 assigned to the day starting at noon on Monday, January 1, 4713 BC,
+// proleptic Julian calendar (November 24, 4714 BC, in the proleptic Gregorian
+// calendar).
+constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
+constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
+constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
+constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
+constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);
+
 MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
 STRUCT_END(Int96, 12);
 
@@ -192,6 +205,14 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds)
   std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
 }
 
+static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
+  int64_t days_since_epoch = i96.value[2] - kJulianToUnixEpochDays;
+  int64_t nanoseconds = 0;
+
+  memcpy(&nanoseconds, &i96.value, sizeof(int64_t));
+  return days_since_epoch * kNanosecondsPerDay + nanoseconds;
+}
+
 static inline std::string Int96ToString(const Int96& a) {
   std::ostringstream result;
   std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " "));
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 3ebfc8c0517..7350aee8d11 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -285,8 +285,8 @@ def _sanitize_table(table, new_schema, flavor):
     Specify if we should use dictionary encoding in general or only for
     some columns.
 use_deprecated_int96_timestamps : boolean, default None
-    Write nanosecond resolution timestamps to INT96 Parquet
-    format. Defaults to False unless enabled by flavor argument
+    Write timestamps to INT96 Parquet format. Defaults to False unless enabled
+    by flavor argument. This takes priority over the coerce_timestamps option.
 coerce_timestamps : string, default None
     Cast timestamps a particular resolution.
     Valid values: {None, 'ms', 'us'}
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 89d32245804..e8ce2ab0469 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -844,7 +844,7 @@ def test_date_time_types():
     a2 = pa.array(data2, type=t2)
 
     t3 = pa.timestamp('us')
-    start = pd.Timestamp('2000-01-01').value / 1000
+    start = pd.Timestamp('2001-01-01').value / 1000
     data3 = np.array([start, start + 1, start + 2], dtype='int64')
     a3 = pa.array(data3, type=t3)
 
@@ -892,8 +892,9 @@ def test_date_time_types():
     # date64 as date32
     # time32[s] to time32[ms]
+    # 'timestamp[ms]' is saved as INT96 timestamp
     # 'timestamp[ns]' is saved as INT96 timestamp
-    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
+    expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7],
                                     ['date32', 'date64', 'timestamp[us]',
                                      'time32[s]', 'time64[us]',
                                      'time32_from64[s]',
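
Usage sketch (not part of the patch): assuming a pyarrow build that includes this
change, the new behavior looks roughly like the following from Python. The file
name and column names are illustrative only.

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Two timestamp columns with different resolutions.
    ts_ms = pa.array([1489269000], type=pa.timestamp('ms'))
    ts_ns = pa.array([1489269000000000], type=pa.timestamp('ns'))
    table = pa.Table.from_arrays([ts_ms, ts_ns], ['ts_ms', 'ts_ns'])

    # With this change, both columns are written as INT96, regardless of their
    # original unit, and the option takes precedence over coerce_timestamps.
    pq.write_table(table, 'example.parquet',
                   use_deprecated_int96_timestamps=True,
                   coerce_timestamps='ms')

    # INT96 columns are read back as nanosecond-resolution timestamps.
    result = pq.read_table('example.parquet')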