diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 24ec0dd24ee..5b3ffb67b22 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1189,65 +1189,116 @@ void MakeDateTimeTypesTable(std::shared_ptr* out, bool nanos_as_micros = auto f0 = field("f0", ::arrow::date32()); auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI)); auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO)); - std::shared_ptr<::arrow::Field> f3; - if (nanos_as_micros) { - f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO)); - } else { - f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO)); - } + auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO; + auto f3 = field("f3", ::arrow::timestamp(f3_unit)); auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI)); auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO)); + std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5})); std::vector t32_values = {1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}; - std::vector t64_values = {1489269000000, 1489270000000, 1489271000000, - 1489272000000, 1489272000000, 1489273000000}; + std::vector t64_ns_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000000, 1489272000000, 1489273000000}; std::vector t64_us_values = {1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}; + std::vector t64_ms_values = {1489269, 1489270, 1489271, + 1489272, 1489272, 1489273}; std::shared_ptr a0, a1, a2, a3, a4, a5; ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); - ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1); - ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2); - if (nanos_as_micros) { - ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values, - 
&a3); - } else { - ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values, - &a3); - } + ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values, + &a1); + ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values, + &a2); + auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values; + ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3); ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4); - ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5); + ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5); std::vector> columns = { std::make_shared("f0", a0), std::make_shared("f1", a1), std::make_shared("f2", a2), std::make_shared("f3", a3), std::make_shared("f4", a4), std::make_shared("f5", a5)}; + *out = Table::Make(schema, columns); } TEST(TestArrowReadWrite, DateTimeTypes) { - std::shared_ptr
table; + std::shared_ptr
table, result; MakeDateTimeTypesTable(&table); - // Use deprecated INT96 type - std::shared_ptr
result; - ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( - table, false /* use_threads */, table->num_rows(), {}, &result, - ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build())); - - ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result)); - // Cast nanaoseconds to microseconds and use INT64 physical type ASSERT_NO_FATAL_FAILURE( DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result)); - std::shared_ptr
expected; MakeDateTimeTypesTable(&table, true); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result)); } +TEST(TestArrowReadWrite, UseDeprecatedInt96) { + using ::arrow::ArrayFromVector; + using ::arrow::field; + using ::arrow::schema; + + std::vector is_valid = {true, true, true, false, true, true}; + + auto t_s = ::arrow::timestamp(TimeUnit::SECOND); + auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); + auto t_us = ::arrow::timestamp(TimeUnit::MICRO); + auto t_ns = ::arrow::timestamp(TimeUnit::NANO); + + std::vector s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; + std::vector ms_values = {1489269000, 1489270000, 1489271000, + 1489272001, 1489272000, 1489273000}; + std::vector us_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000001, 1489272000000, 1489273000000}; + std::vector ns_values = {1489269000000000LL, 1489270000000000LL, + 1489271000000000LL, 1489272000000001LL, + 1489272000000000LL, 1489273000000000LL}; + + std::shared_ptr a_s, a_ms, a_us, a_ns; + ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); + + // Each input is typed with a unique TimeUnit + auto input_schema = schema( + {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)}); + auto input = Table::Make( + input_schema, + {std::make_shared("f_s", a_s), std::make_shared("f_ms", a_ms), + std::make_shared("f_us", a_us), std::make_shared("f_ns", a_ns)}); + + // When reading parquet files, all int96 schema fields are converted to + // timestamp nanoseconds + auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns), + field("f_ns", t_ns)}); + auto ex_result = Table::Make( + ex_schema, + {std::make_shared("f_s", 
a_ns), std::make_shared("f_ms", a_ns), + std::make_shared("f_us", a_ns), std::make_shared("f_ns", a_ns)}); + + std::shared_ptr
result; + ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( + input, false /* use_threads */, input->num_rows(), {}, &result, + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build())); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); + + // Ensure enable_deprecated_int96_timestamps has precedence over + // coerce_timestamps. + ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */, + input->num_rows(), {}, &result, + ArrowWriterProperties::Builder() + .enable_deprecated_int96_timestamps() + ->coerce_timestamps(TimeUnit::MILLI) + ->build())); + + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); +} + TEST(TestArrowReadWrite, CoerceTimestamps) { using ::arrow::ArrayFromVector; using ::arrow::field; @@ -1293,6 +1344,12 @@ TEST(TestArrowReadWrite, CoerceTimestamps) { {std::make_shared("f_s", a_ms), std::make_shared("f_ms", a_ms), std::make_shared("f_us", a_ms), std::make_shared("f_ns", a_ms)}); + std::shared_ptr
milli_result; + ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( + input, false /* use_threads */, input->num_rows(), {}, &milli_result, + ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build())); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result)); + // Result when coercing to microseconds auto s3 = std::shared_ptr<::arrow::Schema>( new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us), @@ -1302,13 +1359,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) { {std::make_shared("f_s", a_us), std::make_shared("f_ms", a_us), std::make_shared("f_us", a_us), std::make_shared("f_ns", a_us)}); - std::shared_ptr
milli_result; - ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( - input, false /* use_threads */, input->num_rows(), {}, &milli_result, - ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build())); - - ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result)); - std::shared_ptr
micro_result; ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( input, false /* use_threads */, input->num_rows(), {}, µ_result, @@ -1453,65 +1503,6 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result)); } -// Regression for ARROW-2802 -TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) { - using ::arrow::Column; - using ::arrow::default_memory_pool; - using ::arrow::Field; - using ::arrow::Schema; - using ::arrow::Table; - using ::arrow::TimestampBuilder; - using ::arrow::TimestampType; - using ::arrow::TimeUnit; - - auto timestamp_type = std::make_shared(TimeUnit::NANO); - - TimestampBuilder builder(timestamp_type, default_memory_pool()); - for (std::int64_t ii = 0; ii < 10; ++ii) { - ASSERT_OK(builder.Append(1000000000L * ii)); - } - std::shared_ptr values; - ASSERT_OK(builder.Finish(&values)); - - std::vector> fields; - auto field = std::make_shared("nanos", timestamp_type); - fields.emplace_back(field); - - auto schema = std::make_shared(fields); - - std::vector> columns; - auto column = std::make_shared("nanos", values); - columns.emplace_back(column); - - auto table = Table::Make(schema, columns); - - auto arrow_writer_properties = ArrowWriterProperties::Builder() - .coerce_timestamps(TimeUnit::MICRO) - ->enable_deprecated_int96_timestamps() - ->build(); - - std::shared_ptr
result; - DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result, - arrow_writer_properties); - - ASSERT_EQ(table->num_columns(), result->num_columns()); - ASSERT_EQ(table->num_rows(), result->num_rows()); - - auto actual_column = result->column(0); - auto data = actual_column->data(); - auto expected_values = - static_cast<::arrow::NumericArray*>(values.get())->raw_values(); - for (int ii = 0; ii < data->num_chunks(); ++ii) { - auto chunk = - static_cast<::arrow::NumericArray*>(data->chunk(ii).get()); - auto values = chunk->raw_values(); - for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) { - // Check that the nanos have been converted to micros - ASSERT_EQ(*expected_values / 1000, values[jj]); - } - } -} - void MakeDoubleTable(int num_columns, int num_rows, int nchunks, std::shared_ptr
* out) { std::shared_ptr<::arrow::Column> column; @@ -2284,11 +2275,13 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) { INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead, ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL)); -TEST(TestImpalaConversion, NanosecondToImpala) { +TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) { // June 20, 2017 16:32:56 and 123456789 nanoseconds int64_t nanoseconds = INT64_C(1497976376123456789); - Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}}; + Int96 calculated; + + Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}}; internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated); ASSERT_EQ(expected, calculated); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 6273fda4640..a886148b3d7 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -70,18 +70,6 @@ namespace arrow { using ::arrow::BitUtil::BytesForBits; -constexpr int64_t kJulianToUnixEpochDays = 2440588LL; -constexpr int64_t kMillisecondsInADay = 86400000LL; -constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL; - -static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) { - int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays; - int64_t nanoseconds = 0; - - memcpy(&nanoseconds, &impala_timestamp.value, sizeof(int64_t)); - return days_since_epoch * kNanosecondsInADay + nanoseconds; -} - template using ArrayType = typename ::arrow::TypeTraits::ArrayType; @@ -1001,7 +989,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> { auto data_ptr = reinterpret_cast(data->mutable_data()); for (int64_t i = 0; i < length; i++) { - *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]); + *data_ptr++ = Int96GetNanoSeconds(values[i]); } if (reader->nullable_values()) { @@ -1029,7 +1017,7 @@ struct TransferFunctor<::arrow::Date64Type, 
Int32Type> { auto out_ptr = reinterpret_cast(data->mutable_data()); for (int64_t i = 0; i < length; i++) { - *out_ptr++ = static_cast(values[i]) * kMillisecondsInADay; + *out_ptr++ = static_cast(values[i]) * kMillisecondsPerDay; } if (reader->nullable_values()) { diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index d0014a6f3aa..af9fbc91a50 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -423,45 +423,66 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type, return Status::OK(); } +static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) { + switch (time_unit) { + case ::arrow::TimeUnit::MILLI: + return LogicalType::TIMESTAMP_MILLIS; + case ::arrow::TimeUnit::MICRO: + return LogicalType::TIMESTAMP_MICROS; + case ::arrow::TimeUnit::SECOND: + case ::arrow::TimeUnit::NANO: + // No equivalent parquet logical type. + break; + } + + return LogicalType::NONE; +} + static Status GetTimestampMetadata(const ::arrow::TimestampType& type, const ArrowWriterProperties& properties, ParquetType::type* physical_type, LogicalType::type* logical_type) { - auto unit = type.unit(); - *physical_type = ParquetType::INT64; + const bool coerce = properties.coerce_timestamps_enabled(); + const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit(); - if (properties.coerce_timestamps_enabled()) { - auto coerce_unit = properties.coerce_timestamps_unit(); - if (coerce_unit == ::arrow::TimeUnit::MILLI) { - *logical_type = LogicalType::TIMESTAMP_MILLIS; - } else if (coerce_unit == ::arrow::TimeUnit::MICRO) { - *logical_type = LogicalType::TIMESTAMP_MICROS; - } else { - return Status::NotImplemented( - "Can only coerce Arrow timestamps to milliseconds" - " or microseconds"); + // The user is explicitly asking for Impala int96 encoding, there is no + // logical type. 
+ if (properties.support_deprecated_int96_timestamps()) { + *physical_type = ParquetType::INT96; + return Status::OK(); + } + + *physical_type = ParquetType::INT64; + *logical_type = LogicalTypeFromArrowTimeUnit(unit); + + // The user is requesting that all timestamp columns are casted to a specific + // type. Only 2 TimeUnit are supported by arrow-parquet. + if (coerce) { + switch (unit) { + case ::arrow::TimeUnit::MILLI: + case ::arrow::TimeUnit::MICRO: + break; + case ::arrow::TimeUnit::NANO: + case ::arrow::TimeUnit::SECOND: + return Status::NotImplemented( + "Can only coerce Arrow timestamps to milliseconds" + " or microseconds"); } + return Status::OK(); } - if (unit == ::arrow::TimeUnit::MILLI) { - *logical_type = LogicalType::TIMESTAMP_MILLIS; - } else if (unit == ::arrow::TimeUnit::MICRO) { + // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to + // int64 microseconds when deprecated int96 is not requested. + if (type.unit() == ::arrow::TimeUnit::NANO) *logical_type = LogicalType::TIMESTAMP_MICROS; - } else if (unit == ::arrow::TimeUnit::NANO) { - if (properties.support_deprecated_int96_timestamps()) { - *physical_type = ParquetType::INT96; - // No corresponding logical type - } else { - *logical_type = LogicalType::TIMESTAMP_MICROS; - } - } else { + else if (type.unit() == ::arrow::TimeUnit::SECOND) return Status::NotImplemented( "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with " "Parquet."); - } + return Status::OK(); -} +} // namespace arrow Status FieldToNode(const std::shared_ptr& field, const WriterProperties& properties, @@ -698,7 +719,7 @@ int32_t DecimalSize(int32_t precision) { } DCHECK(false); return -1; -} +} // namespace arrow } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 402cbf0f202..bce9f37026c 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -386,7 +386,11 @@ class 
ArrowColumnWriter { Status WriteBatch(int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const typename ParquetType::c_type* values) { - auto typed_writer = static_cast*>(writer_); + auto typed_writer = + ::arrow::internal::checked_cast*>(writer_); + // WriteBatch was called with type mismatching the writer_'s type. This + // could be a schema conversion problem. + DCHECK(typed_writer); PARQUET_CATCH_NOT_OK( typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values)); return Status::OK(); @@ -397,7 +401,11 @@ class ArrowColumnWriter { const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const typename ParquetType::c_type* values) { - auto typed_writer = static_cast*>(writer_); + auto typed_writer = + ::arrow::internal::checked_cast*>(writer_); + // WriteBatchSpaced was called with type mismatching the writer_'s type. This + // could be a schema conversion problem. + DCHECK(typed_writer); PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced( num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values)); return Status::OK(); @@ -570,20 +578,42 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) +#define CONV_CASE_LOOP(ConversionFunction) \ + for (int64_t i = 0; i < num_values; i++) \ + ConversionFunction(arrow_values[i], &output[i]); + +static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values, + int64_t num_values, + ::arrow::TimeUnit ::type unit_type, + Int96* output) { + switch (unit_type) { + case TimeUnit::NANO: + CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp); + break; + case TimeUnit::MICRO: + CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp); + break; + case TimeUnit::MILLI: + CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp); + break; + case TimeUnit::SECOND: + 
CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp); + break; + } +} + +#undef CONV_CASE_LOOP + template <> Status ArrowColumnWriter::WriteNullableBatch( const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const int64_t* values) { - Int96* buffer; + Int96* buffer = nullptr; RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - if (type.unit() == TimeUnit::NANO) { - for (int i = 0; i < num_values; i++) { - internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]); - } - } else { - return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing"); - } + + ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer); + return WriteBatchSpaced(num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer); } @@ -592,15 +622,11 @@ template <> Status ArrowColumnWriter::WriteNonNullableBatch( const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) { - Int96* buffer; + Int96* buffer = nullptr; RETURN_NOT_OK(ctx_->GetScratchData(num_values, &buffer)); - if (type.unit() == TimeUnit::NANO) { - for (int i = 0; i < num_values; i++) { - internal::NanosecondsToImpalaTimestamp(values[i], buffer + i); - } - } else { - return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing"); - } + + ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer); + return WriteBatch(num_levels, def_levels, rep_levels, buffer); } @@ -611,21 +637,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level const bool is_nanosecond = type.unit() == TimeUnit::NANO; - // In the case where support_deprecated_int96_timestamps was specified - // and coerce_timestamps_enabled was specified, a nanosecond column - // will have a physical type of int64. 
In that case, we fall through - // to the else if below. - // - // See https://issues.apache.org/jira/browse/ARROW-2082 - if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps() && - !ctx_->properties->coerce_timestamps_enabled()) { + if (ctx_->properties->support_deprecated_int96_timestamps()) { + // The user explicitly required to use Int96 storage. return TypedWriteBatch(values, num_levels, def_levels, rep_levels); } else if (is_nanosecond || (ctx_->properties->coerce_timestamps_enabled() && (type.unit() != ctx_->properties->coerce_timestamps_unit()))) { // Casting is required. This covers several cases - // * Nanoseconds -> cast to microseconds + // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved) // * coerce_timestamps_enabled_, cast all timestamps to requested unit return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values, num_levels, def_levels, rep_levels); diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 2538c028002..50cb4cfea7d 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -45,19 +45,19 @@ class PARQUET_EXPORT ArrowWriterProperties { class Builder { public: Builder() - : write_nanos_as_int96_(false), + : write_timestamps_as_int96_(false), coerce_timestamps_enabled_(false), coerce_timestamps_unit_(::arrow::TimeUnit::SECOND), truncated_timestamps_allowed_(false) {} virtual ~Builder() {} Builder* disable_deprecated_int96_timestamps() { - write_nanos_as_int96_ = false; + write_timestamps_as_int96_ = false; return this; } Builder* enable_deprecated_int96_timestamps() { - write_nanos_as_int96_ = true; + write_timestamps_as_int96_ = true; return this; } @@ -79,19 +79,19 @@ class PARQUET_EXPORT ArrowWriterProperties { std::shared_ptr build() { return std::shared_ptr(new ArrowWriterProperties( - write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, + write_timestamps_as_int96_, 
coerce_timestamps_enabled_, coerce_timestamps_unit_, truncated_timestamps_allowed_)); } private: - bool write_nanos_as_int96_; + bool write_timestamps_as_int96_; bool coerce_timestamps_enabled_; ::arrow::TimeUnit::type coerce_timestamps_unit_; bool truncated_timestamps_allowed_; }; - bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; } + bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; } ::arrow::TimeUnit::type coerce_timestamps_unit() const { @@ -105,12 +105,12 @@ class PARQUET_EXPORT ArrowWriterProperties { bool coerce_timestamps_enabled, ::arrow::TimeUnit::type coerce_timestamps_unit, bool truncated_timestamps_allowed) - : write_nanos_as_int96_(write_nanos_as_int96), + : write_timestamps_as_int96_(write_nanos_as_int96), coerce_timestamps_enabled_(coerce_timestamps_enabled), coerce_timestamps_unit_(coerce_timestamps_unit), truncated_timestamps_allowed_(truncated_timestamps_allowed) {} - const bool write_nanos_as_int96_; + const bool write_timestamps_as_int96_; const bool coerce_timestamps_enabled_; const ::arrow::TimeUnit::type coerce_timestamps_unit_; const bool truncated_timestamps_allowed_; @@ -208,24 +208,52 @@ namespace internal { * Timestamp conversion constants */ constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); -constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000); -/** - * Converts nanosecond timestamps to Impala (Int96) format - */ -inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, - Int96* impala_timestamp) { - int64_t julian_days = (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays; +template +inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { + int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; (*impala_timestamp).value[2] = (uint32_t)julian_days; - int64_t last_day_nanos = nanoseconds % 
kNanosecondsPerDay; + int64_t last_day_units = time % UnitPerDay; int64_t* impala_last_day_nanos = reinterpret_cast(impala_timestamp); - *impala_last_day_nanos = last_day_nanos; + *impala_last_day_nanos = last_day_units * NanosecondsPerUnit; +} + +constexpr int64_t kSecondsInNanos = INT64_C(1000000000); + +inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp(seconds, + impala_timestamp); +} + +constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000); + +inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + milliseconds, impala_timestamp); +} + +constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000); + +inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + microseconds, impala_timestamp); +} + +constexpr int64_t kNanosecondsInNanos = INT64_C(1); + +inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, + Int96* impala_timestamp) { + ArrowTimestampToImpalaTimestamp( + nanoseconds, impala_timestamp); } } // namespace internal } // namespace arrow + } // namespace parquet #endif // PARQUET_ARROW_WRITER_H diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index b27718027b0..1812f5547ab 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -175,6 +175,19 @@ struct FixedLenByteArray { using FLBA = FixedLenByteArray; +// Julian day at unix epoch. 
+// +// The Julian Day Number (JDN) is the integer assigned to a whole solar day in +// the Julian day count starting from noon Universal time, with Julian day +// number 0 assigned to the day starting at noon on Monday, January 1, 4713 BC, +// proleptic Julian calendar (November 24, 4714 BC, in the proleptic Gregorian +// calendar), +constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588); +constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24); +constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000); +constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000); +constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000); + MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; }; STRUCT_END(Int96, 12); @@ -192,6 +205,14 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds)); } +static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) { + int64_t days_since_epoch = i96.value[2] - kJulianToUnixEpochDays; + int64_t nanoseconds = 0; + + memcpy(&nanoseconds, &i96.value, sizeof(int64_t)); + return days_since_epoch * kNanosecondsPerDay + nanoseconds; +} + static inline std::string Int96ToString(const Int96& a) { std::ostringstream result; std::copy(a.value, a.value + 3, std::ostream_iterator(result, " ")); diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 3ebfc8c0517..7350aee8d11 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -285,8 +285,8 @@ def _sanitize_table(table, new_schema, flavor): Specify if we should use dictionary encoding in general or only for some columns. use_deprecated_int96_timestamps : boolean, default None - Write nanosecond resolution timestamps to INT96 Parquet - format. Defaults to False unless enabled by flavor argument + Write timestamps to INT96 Parquet format. Defaults to False unless enabled + by flavor argument. 
This takes priority over the coerce_timestamps option. coerce_timestamps : string, default None Cast timestamps a particular resolution. Valid values: {None, 'ms', 'us'} diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 89d32245804..e8ce2ab0469 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -844,7 +844,7 @@ def test_date_time_types(): a2 = pa.array(data2, type=t2) t3 = pa.timestamp('us') - start = pd.Timestamp('2000-01-01').value / 1000 + start = pd.Timestamp('2001-01-01').value / 1000 data3 = np.array([start, start + 1, start + 2], dtype='int64') a3 = pa.array(data3, type=t3) @@ -892,8 +892,9 @@ def test_date_time_types(): # date64 as date32 # time32[s] to time32[ms] + # 'timestamp[ms]' is saved as INT96 timestamp # 'timestamp[ns]' is saved as INT96 timestamp - expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7], + expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7], ['date32', 'date64', 'timestamp[us]', 'time32[s]', 'time64[us]', 'time32_from64[s]',