diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 21f6f0405f7..5f4e21365b4 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -87,58 +87,76 @@ static constexpr int LARGE_SIZE = 10000; static constexpr uint32_t kDefaultSeed = 0; -LogicalType::type get_logical_type(const ::DataType& type) { +std::shared_ptr get_logical_annotation(const ::DataType& type) { switch (type.id()) { case ArrowId::UINT8: - return LogicalType::UINT_8; + return LogicalAnnotation::Int(8, false); case ArrowId::INT8: - return LogicalType::INT_8; + return LogicalAnnotation::Int(8, true); case ArrowId::UINT16: - return LogicalType::UINT_16; + return LogicalAnnotation::Int(16, false); case ArrowId::INT16: - return LogicalType::INT_16; + return LogicalAnnotation::Int(16, true); case ArrowId::UINT32: - return LogicalType::UINT_32; + return LogicalAnnotation::Int(32, false); case ArrowId::INT32: - return LogicalType::INT_32; + return LogicalAnnotation::Int(32, true); case ArrowId::UINT64: - return LogicalType::UINT_64; + return LogicalAnnotation::Int(64, false); case ArrowId::INT64: - return LogicalType::INT_64; + return LogicalAnnotation::Int(64, true); case ArrowId::STRING: - return LogicalType::UTF8; + return LogicalAnnotation::String(); case ArrowId::DATE32: - return LogicalType::DATE; + return LogicalAnnotation::Date(); case ArrowId::DATE64: - return LogicalType::DATE; + return LogicalAnnotation::Date(); case ArrowId::TIMESTAMP: { const auto& ts_type = static_cast(type); + const bool adjusted_to_utc = !(ts_type.timezone().empty()); switch (ts_type.unit()) { case TimeUnit::MILLI: - return LogicalType::TIMESTAMP_MILLIS; + return LogicalAnnotation::Timestamp(adjusted_to_utc, + LogicalAnnotation::TimeUnit::MILLIS); case TimeUnit::MICRO: - return LogicalType::TIMESTAMP_MICROS; + return LogicalAnnotation::Timestamp(adjusted_to_utc, + LogicalAnnotation::TimeUnit::MICROS); + case TimeUnit::NANO: + return LogicalAnnotation::Timestamp(adjusted_to_utc, + LogicalAnnotation::TimeUnit::NANOS); default: - DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps " - "with Parquet."; + DCHECK(false) + << "Only MILLI, MICRO, and NANO units supported for Arrow TIMESTAMP."; } break; } case ArrowId::TIME32: - return LogicalType::TIME_MILLIS; - case ArrowId::TIME64: - return LogicalType::TIME_MICROS; + return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS); + case ArrowId::TIME64: { + const auto& tm_type = static_cast(type); + switch (tm_type.unit()) { + case TimeUnit::MICRO: + return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS); + case TimeUnit::NANO: + return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS); + default: + DCHECK(false) << "Only MICRO and NANO units supported for Arrow TIME64."; + } + break; + } case ArrowId::DICTIONARY: { const ::arrow::DictionaryType& dict_type = static_cast(type); - return get_logical_type(*dict_type.value_type()); + return get_logical_annotation(*dict_type.value_type()); + } + case ArrowId::DECIMAL: { + const auto& dec_type = static_cast(type); + return LogicalAnnotation::Decimal(dec_type.precision(), dec_type.scale()); } - case ArrowId::DECIMAL: - return LogicalType::DECIMAL; default: break; } - return LogicalType::NONE; + return LogicalAnnotation::None(); } ParquetType::type get_physical_type(const ::DataType& type) { @@ -353,6 +371,49 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual } } +void DoConfiguredRoundtrip( + const std::shared_ptr& table, int64_t row_group_size, + std::shared_ptr
* out, + const std::shared_ptr<::parquet::WriterProperties>& parquet_properties = + ::parquet::default_writer_properties(), + const std::shared_ptr& arrow_properties = + default_arrow_writer_properties()) { + std::shared_ptr buffer; + + auto sink = CreateOutputStream(); + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, + row_group_size, parquet_properties, arrow_properties)); + ASSERT_OK_NO_THROW(sink->Finish(&buffer)); + + std::unique_ptr reader; + ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), + ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + ASSERT_OK_NO_THROW(reader->ReadTable(out)); +} + +void CheckConfiguredRoundtrip( + const std::shared_ptr
& input_table, + const std::shared_ptr
& expected_table = nullptr, + const std::shared_ptr<::parquet::WriterProperties>& parquet_properties = + ::parquet::default_writer_properties(), + const std::shared_ptr& arrow_properties = + default_arrow_writer_properties()) { + std::shared_ptr
actual_table; + ASSERT_NO_FATAL_FAILURE(DoConfiguredRoundtrip(input_table, input_table->num_rows(), + &actual_table, parquet_properties, + arrow_properties)); + if (expected_table) { + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*actual_table->schema(), *expected_table->schema())); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *expected_table)); + } else { + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*actual_table->schema(), *input_table->schema())); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *input_table)); + } +} + void DoSimpleRoundtrip(const std::shared_ptr
& table, bool use_threads, int64_t row_group_size, const std::vector& column_subset, std::shared_ptr
* out, @@ -383,14 +444,14 @@ void CheckSimpleRoundtrip(const std::shared_ptr
& table, int64_t row_group std::shared_ptr
result; DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result, arrow_properties); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*table->schema(), *result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false)); } static std::shared_ptr MakeSimpleSchema(const ::DataType& type, Repetition::type repetition) { int32_t byte_width = -1; - int32_t precision = -1; - int32_t scale = -1; switch (type.id()) { case ::arrow::Type::DICTIONARY: { @@ -404,9 +465,7 @@ static std::shared_ptr MakeSimpleSchema(const ::DataType& type, case ::arrow::Type::DECIMAL: { const auto& decimal_type = static_cast(values_type); - precision = decimal_type.precision(); - scale = decimal_type.scale(); - byte_width = DecimalSize(precision); + byte_width = DecimalSize(decimal_type.precision()); } break; default: break; @@ -417,15 +476,13 @@ static std::shared_ptr MakeSimpleSchema(const ::DataType& type, break; case ::arrow::Type::DECIMAL: { const auto& decimal_type = static_cast(type); - precision = decimal_type.precision(); - scale = decimal_type.scale(); - byte_width = DecimalSize(precision); + byte_width = DecimalSize(decimal_type.precision()); } break; default: break; } - auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type), - get_logical_type(type), byte_width, precision, scale); + auto pnode = PrimitiveNode::Make("column1", repetition, get_logical_annotation(type), + get_physical_type(type), byte_width); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); @@ -1195,7 +1252,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4)); } -void MakeDateTimeTypesTable(std::shared_ptr
* out, bool nanos_as_micros = false) { +void MakeDateTimeTypesTable(std::shared_ptr
* out, bool expected = false) { using ::arrow::ArrayFromVector; std::vector is_valid = {true, true, true, false, true, true}; @@ -1204,12 +1261,14 @@ void MakeDateTimeTypesTable(std::shared_ptr
* out, bool nanos_as_micros = auto f0 = field("f0", ::arrow::date32()); auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI)); auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO)); - auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO; - auto f3 = field("f3", ::arrow::timestamp(f3_unit)); + auto f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO)); + auto f3_x = field("f3", ::arrow::timestamp(TimeUnit::MICRO)); auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI)); auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO)); + auto f6 = field("f6", ::arrow::time64(TimeUnit::NANO)); - std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5})); + std::shared_ptr<::arrow::Schema> schema( + new ::arrow::Schema({f0, f1, f2, (expected ? f3_x : f3), f4, f5, f6})); std::vector t32_values = {1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}; @@ -1220,34 +1279,42 @@ void MakeDateTimeTypesTable(std::shared_ptr
* out, bool nanos_as_micros = std::vector t64_ms_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; - std::shared_ptr a0, a1, a2, a3, a4, a5; + std::shared_ptr a0, a1, a2, a3, a3_x, a4, a5, a6; ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values, &a1); ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values, &a2); - auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values; - ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3); + ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_ns_values, + &a3); + ArrayFromVector<::arrow::TimestampType, int64_t>(f3_x->type(), is_valid, t64_us_values, + &a3_x); ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4); ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5); + ArrayFromVector<::arrow::Time64Type, int64_t>(f6->type(), is_valid, t64_ns_values, &a6); std::vector> columns = { - std::make_shared("f0", a0), std::make_shared("f1", a1), - std::make_shared("f2", a2), std::make_shared("f3", a3), - std::make_shared("f4", a4), std::make_shared("f5", a5)}; + std::make_shared("f0", a0), + std::make_shared("f1", a1), + std::make_shared("f2", a2), + std::make_shared("f3", (expected ? a3_x : a3)), + std::make_shared("f4", a4), + std::make_shared("f5", a5), + std::make_shared("f6", a6)}; *out = Table::Make(schema, columns); } TEST(TestArrowReadWrite, DateTimeTypes) { std::shared_ptr
table, result; - MakeDateTimeTypesTable(&table); - // Cast nanaoseconds to microseconds and use INT64 physical type + MakeDateTimeTypesTable(&table); ASSERT_NO_FATAL_FAILURE( DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result)); - MakeDateTimeTypesTable(&table, true); + MakeDateTimeTypesTable(&table, true); // build expected result + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*table->schema(), *result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result)); } @@ -1300,6 +1367,8 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { input, false /* use_threads */, input->num_rows(), {}, &result, ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build())); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*ex_result->schema(), *result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); // Ensure enable_deprecated_int96_timestamps as precedence over @@ -1311,6 +1380,8 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) { ->coerce_timestamps(TimeUnit::MILLI) ->build())); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*ex_result->schema(), *result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result)); } @@ -1318,7 +1389,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) { using ::arrow::ArrayFromVector; using ::arrow::field; - // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS std::vector is_valid = {true, true, true, false, true, true}; auto t_s = ::arrow::timestamp(TimeUnit::SECOND); @@ -1342,42 +1412,41 @@ TEST(TestArrowReadWrite, CoerceTimestamps) { ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); // Input table, all data as is - auto s1 = std::shared_ptr<::arrow::Schema>( - new ::arrow::Schema({field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), - field("f_ns", t_ns)})); + auto s1 = ::arrow::schema( + {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)}); auto input = Table::Make( s1, {std::make_shared("f_s", a_s), std::make_shared("f_ms", a_ms), std::make_shared("f_us", a_us), std::make_shared("f_ns", a_ns)}); // Result when coercing to milliseconds - auto s2 = std::shared_ptr<::arrow::Schema>( - new ::arrow::Schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms), - field("f_ns", t_ms)})); + auto s2 = ::arrow::schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms), + field("f_ns", t_ms)}); auto ex_milli_result = Table::Make( s2, {std::make_shared("f_s", a_ms), std::make_shared("f_ms", a_ms), std::make_shared("f_us", a_ms), std::make_shared("f_ns", a_ms)}); - std::shared_ptr
milli_result; ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( input, false /* use_threads */, input->num_rows(), {}, &milli_result, ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build())); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*ex_milli_result->schema(), *milli_result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result)); // Result when coercing to microseconds - auto s3 = std::shared_ptr<::arrow::Schema>( - new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us), - field("f_ns", t_us)})); + auto s3 = ::arrow::schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us), + field("f_ns", t_us)}); auto ex_micro_result = Table::Make( s3, {std::make_shared("f_s", a_us), std::make_shared("f_ms", a_us), std::make_shared("f_us", a_us), std::make_shared("f_ns", a_us)}); - std::shared_ptr
micro_result; ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( input, false /* use_threads */, input->num_rows(), {}, µ_result, ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build())); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*ex_micro_result->schema(), *micro_result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result)); } @@ -1408,10 +1477,10 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) { ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); - auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s", t_s)})); - auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms", t_ms)})); - auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us", t_us)})); - auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns", t_ns)})); + auto s1 = ::arrow::schema({field("f_s", t_s)}); + auto s2 = ::arrow::schema({field("f_ms", t_ms)}); + auto s3 = ::arrow::schema({field("f_us", t_us)}); + auto s4 = ::arrow::schema({field("f_ns", t_ns)}); auto c1 = std::make_shared("f_s", a_s); auto c2 = std::make_shared("f_ms", a_ms); @@ -1439,25 +1508,221 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) { ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, default_writer_properties(), coerce_millis)); - // OK to lose precision if we explicitly allow it - auto allow_truncation = (ArrowWriterProperties::Builder() - .coerce_timestamps(TimeUnit::MILLI) - ->allow_truncated_timestamps() - ->build()); + // OK to lose micros/nanos -> millis precision if we explicitly allow it + auto allow_truncation_to_millis = (ArrowWriterProperties::Builder() + .coerce_timestamps(TimeUnit::MILLI) + ->allow_truncated_timestamps() + ->build()); ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10, - default_writer_properties(), allow_truncation)); + default_writer_properties(), allow_truncation_to_millis)); ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, - default_writer_properties(), allow_truncation)); + default_writer_properties(), allow_truncation_to_millis)); - // OK to write micros to micros + // OK to write to micros auto coerce_micros = (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()); + ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10, + default_writer_properties(), coerce_micros)); + ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10, + default_writer_properties(), coerce_micros)); ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10, default_writer_properties(), coerce_micros)); // Loss of precision ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, default_writer_properties(), coerce_micros)); + + // OK to lose nanos -> micros precision if we explicitly allow it + auto allow_truncation_to_micros = (ArrowWriterProperties::Builder() + .coerce_timestamps(TimeUnit::MICRO) + ->allow_truncated_timestamps() + ->build()); + ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10, + default_writer_properties(), allow_truncation_to_micros)); +} + +TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) { + using ::arrow::ArrayFromVector; + using ::arrow::field; + using ::arrow::schema; + + std::vector is_valid = {true, true, true, false, true, true}; + + auto t_s = ::arrow::timestamp(TimeUnit::SECOND); + auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); + + std::vector s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; + std::vector ms_values = {1489269000, 1489270000, 1489271000, + 1489272000, 1489272000, 1489273000}; + + std::shared_ptr a_s, a_ms; + ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); + + auto si = schema({field("timestamp", t_s)}); + auto sx = schema({field("timestamp", t_ms)}); + + auto ci = std::make_shared("timestamp", a_s); + auto cx = std::make_shared("timestamp", a_ms); + + auto ti = Table::Make(si, {ci}); // input + auto tx = Table::Make(sx, {cx}); // expected output + std::shared_ptr
to; // actual output + + // default properties (without explicit coercion instructions) used ... + ASSERT_NO_FATAL_FAILURE( + DoSimpleRoundtrip(ti, false /* use_threads */, ti->num_rows(), {}, &to)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*tx->schema(), *to->schema())); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*tx, *to)); +} + +TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) { + using ::arrow::ArrayFromVector; + using ::arrow::field; + using ::arrow::schema; + + auto t_s = ::arrow::timestamp(TimeUnit::SECOND); + auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); + auto t_us = ::arrow::timestamp(TimeUnit::MICRO); + auto t_ns = ::arrow::timestamp(TimeUnit::NANO); + + const int N = 24; + int64_t instant = INT64_C(1262304000); // 2010-01-01T00:00:00 seconds offset + std::vector d_s, d_ms, d_us, d_ns; + for (int i = 0; i < N; ++i) { + d_s.push_back(instant); + d_ms.push_back(instant * INT64_C(1000)); + d_us.push_back(instant * INT64_C(1000000)); + d_ns.push_back(instant * INT64_C(1000000000)); + instant += 3600; + } + + std::shared_ptr a_s, a_ms, a_us, a_ns; + ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, d_s, &a_s); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, d_ms, &a_ms); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, d_us, &a_us); + ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, d_ns, &a_ns); + + auto c_s = std::make_shared("ts:s", a_s); + auto c_ms = std::make_shared("ts:ms", a_ms); + auto c_us = std::make_shared("ts:us", a_us); + auto c_ns = std::make_shared("ts:ns", a_ns); + + auto input_schema = schema({field("ts:s", t_s), field("ts:ms", t_ms), + field("ts:us", t_us), field("ts:ns", t_ns)}); + auto input_table = Table::Make(input_schema, {c_s, c_ms, c_us, c_ns}); + + auto parquet_version_1_properties = ::parquet::default_writer_properties(); + auto parquet_version_2_properties = ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_2_0) + ->build(); + + { + // Using Parquet version 1.0 defaults, seconds should be coerced to milliseconds + // and nanoseconds should be coerced to microseconds + auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms), + field("ts:us", t_us), field("ts:ns", t_us)}); + auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_us}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_1_properties)); + } + { + // Using Parquet version 2.0 defaults, seconds should be coerced to milliseconds + // and nanoseconds should be retained + auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms), + field("ts:us", t_us), field("ts:ns", t_ns)}); + auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_ns}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_2_properties)); + } + + auto arrow_coerce_to_seconds_properties = + ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::SECOND)->build(); + auto arrow_coerce_to_millis_properties = + ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build(); + auto arrow_coerce_to_micros_properties = + ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build(); + auto arrow_coerce_to_nanos_properties = + ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build(); + { + // Neither Parquet version 1.0 nor 2.0 allow coercing to seconds + auto sink = CreateOutputStream(); + std::shared_ptr
actual_table; + ASSERT_RAISES(NotImplemented, + WriteTable(*input_table, ::arrow::default_memory_pool(), sink, + input_table->num_rows(), parquet_version_1_properties, + arrow_coerce_to_seconds_properties)); + ASSERT_RAISES(NotImplemented, + WriteTable(*input_table, ::arrow::default_memory_pool(), sink, + input_table->num_rows(), parquet_version_2_properties, + arrow_coerce_to_seconds_properties)); + } + { + // Using Parquet version 1.0, coercing to milliseconds or microseconds is allowed + auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms), + field("ts:us", t_ms), field("ts:ns", t_ms)}); + auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_1_properties, + arrow_coerce_to_millis_properties)); + + expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us), + field("ts:us", t_us), field("ts:ns", t_us)}); + expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_1_properties, + arrow_coerce_to_micros_properties)); + } + { + // Using Parquet version 2.0, coercing to milliseconds or microseconds is allowed + auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms), + field("ts:us", t_ms), field("ts:ns", t_ms)}); + auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_2_properties, + arrow_coerce_to_millis_properties)); + + expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us), + field("ts:us", t_us), field("ts:ns", t_us)}); + expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_2_properties, + arrow_coerce_to_micros_properties)); + } + { + // Using Parquet version 1.0, coercing to (int64) nanoseconds is not allowed + auto sink = CreateOutputStream(); + std::shared_ptr
actual_table; + ASSERT_RAISES(NotImplemented, + WriteTable(*input_table, ::arrow::default_memory_pool(), sink, + input_table->num_rows(), parquet_version_1_properties, + arrow_coerce_to_nanos_properties)); + } + { + // Using Parquet version 2.0, coercing to (int64) nanoseconds is allowed + auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns), + field("ts:us", t_ns), field("ts:ns", t_ns)}); + auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_2_properties, + arrow_coerce_to_nanos_properties)); + } + + auto arrow_enable_int96_properties = + ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(); + { + // For either Parquet version, coercing to nanoseconds is allowed if Int96 + // storage is used + auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns), + field("ts:us", t_ns), field("ts:ns", t_ns)}); + auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns}); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_1_properties, + arrow_enable_int96_properties)); + ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table, + parquet_version_2_properties, + arrow_enable_int96_properties)); + } } TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { @@ -1515,6 +1780,8 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { ASSERT_NO_FATAL_FAILURE( DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result)); + ASSERT_NO_FATAL_FAILURE( + ::arrow::AssertSchemaEqual(*ex_table->schema(), *result->schema())); ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result)); } diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc index b806782a09d..cedabdb73aa 100644 --- a/cpp/src/parquet/arrow/arrow-schema-test.cc +++ b/cpp/src/parquet/arrow/arrow-schema-test.cc @@ -33,6 +33,7 @@ using arrow::Field; using arrow::TimeUnit; using ParquetType = parquet::Type; +using parquet::LogicalAnnotation; using parquet::LogicalType; using parquet::Repetition; using parquet::schema::GroupNode; @@ -115,12 +116,14 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); - arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + arrow_fields.push_back(std::make_shared( + "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); - arrow_fields.push_back(std::make_shared("timestamp[us]", TIMESTAMP_US, false)); + arrow_fields.push_back(std::make_shared( + "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false)); parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); @@ -168,6 +171,103 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { + struct FieldConstructionArguments { + std::string name; + std::shared_ptr annotation; + parquet::Type::type physical_type; + int physical_length; + std::shared_ptr<::arrow::DataType> datatype; + }; + + std::vector cases = { + {"string", LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1, + ::arrow::utf8()}, + {"enum", LogicalAnnotation::Enum(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"decimal(8, 2)", LogicalAnnotation::Decimal(8, 2), ParquetType::INT32, -1, + ::arrow::decimal(8, 2)}, + {"decimal(16, 4)", LogicalAnnotation::Decimal(16, 4), ParquetType::INT64, -1, + ::arrow::decimal(16, 4)}, + {"decimal(32, 8)", LogicalAnnotation::Decimal(32, 8), + ParquetType::FIXED_LEN_BYTE_ARRAY, 16, ::arrow::decimal(32, 8)}, + {"date", LogicalAnnotation::Date(), ParquetType::INT32, -1, ::arrow::date32()}, + {"time(ms)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)}, + {"time(us)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)}, + {"time(ns)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::NANOS), + ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)}, + {"time(ms)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)}, + {"time(us)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)}, + {"time(ns)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS), + ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)}, + {"timestamp(true, ms)", + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC")}, + {"timestamp(true, us)", + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")}, + {"timestamp(true, ns)", + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")}, + {"timestamp(false, ms)", + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI)}, + {"timestamp(false, us)", + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO)}, + {"timestamp(false, ns)", + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::NANOS), + ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO)}, + {"int(8, false)", LogicalAnnotation::Int(8, false), ParquetType::INT32, -1, + ::arrow::uint8()}, + {"int(8, true)", LogicalAnnotation::Int(8, true), ParquetType::INT32, -1, + ::arrow::int8()}, + {"int(16, false)", LogicalAnnotation::Int(16, false), ParquetType::INT32, -1, + ::arrow::uint16()}, + {"int(16, true)", LogicalAnnotation::Int(16, true), ParquetType::INT32, -1, + ::arrow::int16()}, + {"int(32, false)", LogicalAnnotation::Int(32, false), ParquetType::INT32, -1, + ::arrow::uint32()}, + {"int(32, true)", LogicalAnnotation::Int(32, true), ParquetType::INT32, -1, + ::arrow::int32()}, + {"int(64, false)", LogicalAnnotation::Int(64, false), ParquetType::INT64, -1, + ::arrow::uint64()}, + {"int(64, true)", LogicalAnnotation::Int(64, true), ParquetType::INT64, -1, + ::arrow::int64()}, + {"json", LogicalAnnotation::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"bson", LogicalAnnotation::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"interval", LogicalAnnotation::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12, + ::arrow::fixed_size_binary(12)}, + {"uuid", LogicalAnnotation::UUID(), ParquetType::FIXED_LEN_BYTE_ARRAY, 16, + ::arrow::fixed_size_binary(16)}, + {"none", LogicalAnnotation::None(), ParquetType::BOOLEAN, -1, ::arrow::boolean()}, + {"none", LogicalAnnotation::None(), ParquetType::INT32, -1, ::arrow::int32()}, + {"none", LogicalAnnotation::None(), ParquetType::INT64, -1, ::arrow::int64()}, + {"none", LogicalAnnotation::None(), ParquetType::FLOAT, -1, ::arrow::float32()}, + {"none", LogicalAnnotation::None(), ParquetType::DOUBLE, -1, ::arrow::float64()}, + {"none", LogicalAnnotation::None(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"none", LogicalAnnotation::None(), ParquetType::FIXED_LEN_BYTE_ARRAY, 64, + ::arrow::fixed_size_binary(64)}, + {"null", LogicalAnnotation::Null(), ParquetType::BYTE_ARRAY, -1, ::arrow::null()}, + }; + + std::vector parquet_fields; + std::vector> arrow_fields; + + for (const FieldConstructionArguments& c : cases) { + parquet_fields.push_back(PrimitiveNode::Make( + c.name, Repetition::OPTIONAL, c.annotation, c.physical_type, c.physical_length)); + arrow_fields.push_back(std::make_shared(c.name, c.datatype)); + } + + ASSERT_OK(ConvertSchema(parquet_fields)); + auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); +} + TEST_F(TestConvertParquetSchema, DuplicateFieldNames) { std::vector parquet_fields; std::vector> arrow_fields; @@ -586,6 +686,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } + TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { std::vector parquet_fields; std::vector> arrow_fields; @@ -686,12 +787,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); - arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + arrow_fields.push_back(std::make_shared( + "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); - arrow_fields.push_back(std::make_shared("timestamp[us]", TIMESTAMP_US, false)); + arrow_fields.push_back(std::make_shared( + "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false)); parquet_fields.push_back( PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT)); @@ -714,6 +817,113 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); } +TEST_F(TestConvertArrowSchema, ArrowFields) { + struct FieldConstructionArguments { + std::string name; + std::shared_ptr<::arrow::DataType> datatype; + std::shared_ptr annotation; + parquet::Type::type physical_type; + int physical_length; + }; + + std::vector cases = { + {"boolean", ::arrow::boolean(), LogicalAnnotation::None(), ParquetType::BOOLEAN, + -1}, + {"binary", ::arrow::binary(), LogicalAnnotation::None(), ParquetType::BYTE_ARRAY, + -1}, + {"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalAnnotation::None(), + ParquetType::FIXED_LEN_BYTE_ARRAY, 64}, + {"uint8", ::arrow::uint8(), LogicalAnnotation::Int(8, false), ParquetType::INT32, + -1}, + {"int8", ::arrow::int8(), LogicalAnnotation::Int(8, true), ParquetType::INT32, -1}, + {"uint16", ::arrow::uint16(), LogicalAnnotation::Int(16, false), ParquetType::INT32, + -1}, + {"int16", ::arrow::int16(), LogicalAnnotation::Int(16, true), ParquetType::INT32, + -1}, + {"uint32", ::arrow::uint32(), LogicalAnnotation::None(), ParquetType::INT64, + -1}, // Parquet 1.0 + {"int32", ::arrow::int32(), LogicalAnnotation::None(), ParquetType::INT32, -1}, + {"uint64", ::arrow::uint64(), LogicalAnnotation::Int(64, false), ParquetType::INT64, + -1}, + {"int64", ::arrow::int64(), LogicalAnnotation::None(), ParquetType::INT64, -1}, + {"float32", ::arrow::float32(), LogicalAnnotation::None(), ParquetType::FLOAT, -1}, + {"float64", ::arrow::float64(), LogicalAnnotation::None(), ParquetType::DOUBLE, -1}, + {"utf8", ::arrow::utf8(), LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1}, + {"decimal(1, 0)", ::arrow::decimal(1, 0), LogicalAnnotation::Decimal(1, 0), + ParquetType::FIXED_LEN_BYTE_ARRAY, 1}, + {"decimal(8, 2)", ::arrow::decimal(8, 2), LogicalAnnotation::Decimal(8, 2), + ParquetType::FIXED_LEN_BYTE_ARRAY, 4}, + {"decimal(16, 4)", ::arrow::decimal(16, 4), LogicalAnnotation::Decimal(16, 4), + ParquetType::FIXED_LEN_BYTE_ARRAY, 7}, + {"decimal(32, 8)", ::arrow::decimal(32, 8), LogicalAnnotation::Decimal(32, 8), + ParquetType::FIXED_LEN_BYTE_ARRAY, 14}, + {"time32", ::arrow::time32(::arrow::TimeUnit::MILLI), + LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT32, -1}, + {"time64(microsecond)", ::arrow::time64(::arrow::TimeUnit::MICRO), + LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO), + LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS), + ParquetType::INT64, -1}, + {"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI), + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT64, -1}, + {"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO), + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO), + LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT64, -1}, + {"timestamp(microsecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"timestamp(nanosecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"timestamp(millisecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS), + ParquetType::INT64, -1}, + {"timestamp(microsecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "CET"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"timestamp(nanosecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "CET"), + LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS), + ParquetType::INT64, -1}, + {"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}}; + + std::vector> arrow_fields; + std::vector parquet_fields; + + for (const FieldConstructionArguments& c : cases) { + arrow_fields.push_back(std::make_shared(c.name, c.datatype, false)); + parquet_fields.push_back(PrimitiveNode::Make( + c.name, Repetition::REQUIRED, c.annotation, c.physical_type, c.physical_length)); + } + + ASSERT_OK(ConvertSchema(arrow_fields)); + ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); +} + +TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) { + struct FieldConstructionArguments { + std::string name; + std::shared_ptr<::arrow::DataType> datatype; + }; + + std::vector cases = { + {"float16", ::arrow::float16()}, + }; + + for (const FieldConstructionArguments& c : cases) { + auto field = std::make_shared(c.name, c.datatype); + ASSERT_RAISES(NotImplemented, ConvertSchema({field})); + } +} + TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) { std::vector parquet_fields; std::vector> arrow_fields; @@ -809,15 +1019,6 @@ TEST_F(TestConvertArrowSchema, ParquetLists) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields)); } -TEST_F(TestConvertArrowSchema, UnsupportedTypes) { - std::vector> unsupported_fields = { - ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))}; - - for (const auto& field : unsupported_fields) { - ASSERT_RAISES(NotImplemented, ConvertSchema({field})); - } -} - TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { std::vector parquet_fields; std::vector> arrow_fields; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index bdff716c193..56656038f81 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1613,7 +1613,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, TRANSFER_DATA(::arrow::TimestampType, Int64Type); } break; case ::arrow::TimeUnit::NANO: { - TRANSFER_DATA(::arrow::TimestampType, Int96Type); + if (descr_->physical_type() == ::parquet::Type::INT96) { + TRANSFER_DATA(::arrow::TimestampType, Int96Type); + } else { + TRANSFER_DATA(::arrow::TimestampType, Int64Type); + } } break; default: return Status::NotImplemented("TimeUnit not supported"); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 45b4b38fd49..22b82973fd1 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -17,6 +17,7 @@ #include "parquet/arrow/schema.h" +#include #include #include #include @@ -25,6 +26,7 @@ #include "arrow/array.h" #include "arrow/status.h" #include "arrow/type.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "parquet/arrow/writer.h" @@ -35,6 +37,7 @@ using arrow::Field; using arrow::Status; +using arrow::internal::checked_cast; using ArrowType = arrow::DataType; using ArrowTypeId = arrow::Type; @@ -46,6 +49,7 @@ using parquet::schema::NodePtr; using parquet::schema::PrimitiveNode; using ParquetType = parquet::Type; +using parquet::LogicalAnnotation; using parquet::LogicalType; namespace parquet { @@ -56,117 +60,201 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); -std::shared_ptr MakeDecimal128Type(const PrimitiveNode& node) { - const auto& metadata = node.decimal_metadata(); - return ::arrow::decimal(metadata.precision, metadata.scale); +static Status MakeArrowDecimal(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + const auto& decimal = checked_cast(annotation); + *out = ::arrow::decimal(decimal.precision(), decimal.scale()); + return Status::OK(); } -static Status FromByteArray(const PrimitiveNode& node, std::shared_ptr* out) { - switch (node.logical_type()) { - case LogicalType::UTF8: - *out = ::arrow::utf8(); +static Status MakeArrowInt(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + const auto& integer = checked_cast(annotation); + switch (integer.bit_width()) { + case 8: + *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8(); break; - case LogicalType::DECIMAL: - *out = MakeDecimal128Type(node); + case 16: + *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16(); break; - default: - // BINARY - *out = ::arrow::binary(); + case 32: + *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32(); break; + default: + return Status::TypeError(annotation.ToString(), + " can not annotate physical type Int32"); } return Status::OK(); } -static Status FromFLBA(const PrimitiveNode& node, std::shared_ptr* out) { - switch (node.logical_type()) { - case LogicalType::NONE: - *out = ::arrow::fixed_size_binary(node.type_length()); +static Status MakeArrowInt64(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + const auto& integer = checked_cast(annotation); + switch (integer.bit_width()) { + case 64: + *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64(); break; - case LogicalType::DECIMAL: - *out = MakeDecimal128Type(node); + default: + return Status::TypeError(annotation.ToString(), + " can not annotate physical type Int64"); + } + return Status::OK(); +} + +static Status MakeArrowTime32(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + const auto& time = checked_cast(annotation); + switch (time.time_unit()) { + case LogicalAnnotation::TimeUnit::MILLIS: + *out = ::arrow::time32(::arrow::TimeUnit::MILLI); break; default: - return Status::NotImplemented("Unhandled logical type ", - LogicalTypeToString(node.logical_type()), - " for fixed-length binary array"); + return Status::TypeError(annotation.ToString(), + " can not annotate physical type Time32"); } + return Status::OK(); +} +static Status MakeArrowTime64(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + const auto& time = checked_cast(annotation); + switch (time.time_unit()) { + case LogicalAnnotation::TimeUnit::MICROS: + *out = ::arrow::time64(::arrow::TimeUnit::MICRO); + break; + case LogicalAnnotation::TimeUnit::NANOS: + *out = ::arrow::time64(::arrow::TimeUnit::NANO); + break; + default: + return Status::TypeError(annotation.ToString(), + " can not annotate physical type Time64"); + } return Status::OK(); } -static Status FromInt32(const PrimitiveNode& node, std::shared_ptr* out) { - switch (node.logical_type()) { - case LogicalType::NONE: - *out = ::arrow::int32(); +static Status MakeArrowTimestamp(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + static const char* utc = "UTC"; + const auto& timestamp = checked_cast(annotation); + switch (timestamp.time_unit()) { + case LogicalAnnotation::TimeUnit::MILLIS: + *out = (timestamp.is_adjusted_to_utc() + ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc) + : ::arrow::timestamp(::arrow::TimeUnit::MILLI)); + break; + case LogicalAnnotation::TimeUnit::MICROS: + *out = (timestamp.is_adjusted_to_utc() + ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc) + : ::arrow::timestamp(::arrow::TimeUnit::MICRO)); + break; + case LogicalAnnotation::TimeUnit::NANOS: + *out = (timestamp.is_adjusted_to_utc() + ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc) + : ::arrow::timestamp(::arrow::TimeUnit::NANO)); break; - case LogicalType::UINT_8: - *out = ::arrow::uint8(); + default: + return Status::TypeError("Unrecognized time unit in timestamp annotation: ", + annotation.ToString()); + } + return Status::OK(); +} + +static Status FromByteArray(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + switch (annotation.type()) { + case LogicalAnnotation::Type::STRING: + *out = ::arrow::utf8(); break; - case LogicalType::INT_8: - *out = ::arrow::int8(); + case LogicalAnnotation::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(annotation, out)); break; - case LogicalType::UINT_16: - *out = ::arrow::uint16(); + case LogicalAnnotation::Type::NONE: + case LogicalAnnotation::Type::ENUM: + case LogicalAnnotation::Type::JSON: + case LogicalAnnotation::Type::BSON: + *out = ::arrow::binary(); break; - case LogicalType::INT_16: - *out = ::arrow::int16(); + default: + return Status::NotImplemented("Unhandled logical annotation ", + annotation.ToString(), " for binary array"); + } + return Status::OK(); +} + +static Status FromFLBA(const LogicalAnnotation& annotation, int32_t physical_length, + std::shared_ptr* out) { + switch (annotation.type()) { + case LogicalAnnotation::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(annotation, out)); break; - case LogicalType::INT_32: - *out = ::arrow::int32(); + case LogicalAnnotation::Type::NONE: + case LogicalAnnotation::Type::INTERVAL: + case LogicalAnnotation::Type::UUID: + *out = ::arrow::fixed_size_binary(physical_length); break; - case LogicalType::UINT_32: - *out = ::arrow::uint32(); + default: + return Status::NotImplemented("Unhandled logical annotation ", + annotation.ToString(), + " for fixed-length binary array"); + } + + return Status::OK(); +} + +static Status FromInt32(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + switch (annotation.type()) { + case LogicalAnnotation::Type::INT: + RETURN_NOT_OK(MakeArrowInt(annotation, out)); break; - case LogicalType::DATE: + case LogicalAnnotation::Type::DATE: *out = ::arrow::date32(); break; - case LogicalType::TIME_MILLIS: - *out = ::arrow::time32(::arrow::TimeUnit::MILLI); + case LogicalAnnotation::Type::TIME: + RETURN_NOT_OK(MakeArrowTime32(annotation, out)); + break; + case LogicalAnnotation::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(annotation, out)); break; - case LogicalType::DECIMAL: - *out = MakeDecimal128Type(node); + case LogicalAnnotation::Type::NONE: + *out = ::arrow::int32(); break; default: - return Status::NotImplemented("Unhandled logical type ", - LogicalTypeToString(node.logical_type()), + return Status::NotImplemented("Unhandled logical type ", annotation.ToString(), " for INT32"); } return Status::OK(); } -static Status FromInt64(const PrimitiveNode& node, std::shared_ptr* out) { - switch (node.logical_type()) { - case LogicalType::NONE: - *out = ::arrow::int64(); - break; - case LogicalType::INT_64: - *out = ::arrow::int64(); - break; - case LogicalType::UINT_64: - *out = ::arrow::uint64(); +static Status FromInt64(const LogicalAnnotation& annotation, + std::shared_ptr* out) { + switch (annotation.type()) { + case LogicalAnnotation::Type::INT: + RETURN_NOT_OK(MakeArrowInt64(annotation, out)); break; - case LogicalType::DECIMAL: - *out = MakeDecimal128Type(node); + case LogicalAnnotation::Type::DECIMAL: + RETURN_NOT_OK(MakeArrowDecimal(annotation, out)); break; - case LogicalType::TIMESTAMP_MILLIS: - *out = TIMESTAMP_MS; + case LogicalAnnotation::Type::TIMESTAMP: + RETURN_NOT_OK(MakeArrowTimestamp(annotation, out)); break; - case LogicalType::TIMESTAMP_MICROS: - *out = TIMESTAMP_US; + case LogicalAnnotation::Type::TIME: + RETURN_NOT_OK(MakeArrowTime64(annotation, out)); break; - case LogicalType::TIME_MICROS: - *out = ::arrow::time64(::arrow::TimeUnit::MICRO); + case LogicalAnnotation::Type::NONE: + *out = ::arrow::int64(); break; default: - return Status::NotImplemented("Unhandled logical type ", - LogicalTypeToString(node.logical_type()), + return Status::NotImplemented("Unhandled logical type ", annotation.ToString(), " for INT64"); } return Status::OK(); } Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr* out) { - if (primitive.logical_type() == LogicalType::NA) { + const std::shared_ptr& annotation = + primitive.logical_annotation(); + if (annotation->is_invalid() || annotation->is_null()) { *out = ::arrow::null(); return Status::OK(); } @@ -176,10 +264,10 @@ Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr* *out = ::arrow::boolean(); break; case ParquetType::INT32: - RETURN_NOT_OK(FromInt32(primitive, out)); + RETURN_NOT_OK(FromInt32(*annotation, out)); break; case ParquetType::INT64: - RETURN_NOT_OK(FromInt64(primitive, out)); + RETURN_NOT_OK(FromInt64(*annotation, out)); break; case ParquetType::INT96: *out = TIMESTAMP_NS; @@ -191,10 +279,10 @@ Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr* *out = ::arrow::float64(); break; case ParquetType::BYTE_ARRAY: - RETURN_NOT_OK(FromByteArray(primitive, out)); + RETURN_NOT_OK(FromByteArray(*annotation, out)); break; case ParquetType::FIXED_LEN_BYTE_ARRAY: - RETURN_NOT_OK(FromFLBA(primitive, out)); + RETURN_NOT_OK(FromFLBA(*annotation, primitive.type_length(), out)); break; default: { // PARQUET-1565: This can occur if the file is corrupt @@ -321,7 +409,7 @@ Status NodeToFieldInternal(const Node& node, } } else if (node.is_group()) { const auto& group = static_cast(node); - if (node.logical_type() == LogicalType::LIST) { + if (node.logical_annotation()->is_list()) { RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type)); } else { RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type)); @@ -411,7 +499,7 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str RETURN_NOT_OK(FieldToNode(type->value_field(), properties, arrow_properties, &element)); NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element}); - *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST); + *out = GroupNode::Make(name, repetition, {list}, LogicalAnnotation::List()); return Status::OK(); } @@ -431,71 +519,96 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type, return Status::OK(); } -static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) { +static std::shared_ptr TimestampAnnotationFromArrowTimestamp( + const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) { + const bool utc = !(timestamp_type.timezone().empty()); switch (time_unit) { case ::arrow::TimeUnit::MILLI: - return LogicalType::TIMESTAMP_MILLIS; + return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::MILLIS); case ::arrow::TimeUnit::MICRO: - return LogicalType::TIMESTAMP_MICROS; - case ::arrow::TimeUnit::SECOND: + return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::MICROS); case ::arrow::TimeUnit::NANO: + return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::NANOS); + case ::arrow::TimeUnit::SECOND: // No equivalent parquet logical type. break; } - - return LogicalType::NONE; + return LogicalAnnotation::None(); } static Status GetTimestampMetadata(const ::arrow::TimestampType& type, - const ArrowWriterProperties& properties, + const WriterProperties& properties, + const ArrowWriterProperties& arrow_properties, ParquetType::type* physical_type, - LogicalType::type* logical_type) { - const bool coerce = properties.coerce_timestamps_enabled(); - const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit(); + std::shared_ptr* annotation) { + const bool coerce = arrow_properties.coerce_timestamps_enabled(); + const auto target_unit = + coerce ? arrow_properties.coerce_timestamps_unit() : type.unit(); // The user is explicitly asking for Impala int96 encoding, there is no // logical type. - if (properties.support_deprecated_int96_timestamps()) { + if (arrow_properties.support_deprecated_int96_timestamps()) { *physical_type = ParquetType::INT96; return Status::OK(); } *physical_type = ParquetType::INT64; - *logical_type = LogicalTypeFromArrowTimeUnit(unit); + *annotation = TimestampAnnotationFromArrowTimestamp(type, target_unit); - // The user is requesting that all timestamp columns are casted to a specific - // type. Only 2 TimeUnit are supported by arrow-parquet. + // The user is explicitly asking for timestamp data to be converted to the + // specified units (target_unit). if (coerce) { - switch (unit) { - case ::arrow::TimeUnit::MILLI: - case ::arrow::TimeUnit::MICRO: - break; - case ::arrow::TimeUnit::NANO: - case ::arrow::TimeUnit::SECOND: - return Status::NotImplemented( - "Can only coerce Arrow timestamps to milliseconds" - " or microseconds"); + if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { + switch (target_unit) { + case ::arrow::TimeUnit::MILLI: + case ::arrow::TimeUnit::MICRO: + break; + case ::arrow::TimeUnit::NANO: + case ::arrow::TimeUnit::SECOND: + return Status::NotImplemented( + "For Parquet version 1.0 files, can only coerce Arrow timestamps to " + "milliseconds or microseconds"); + } + } else { + switch (target_unit) { + case ::arrow::TimeUnit::MILLI: + case ::arrow::TimeUnit::MICRO: + case ::arrow::TimeUnit::NANO: + break; + case ::arrow::TimeUnit::SECOND: + return Status::NotImplemented( + "For Parquet files, can only coerce Arrow timestamps to milliseconds, " + "microseconds, or nanoseconds"); + } } + return Status::OK(); + } + // The user implicitly wants timestamp data to retain its original time units, + // however the ConvertedType field used to indicate logical types for Parquet + // version 1.0 fields does not allow for nanosecond time units and so nanoseconds + // must be coerced to microseconds. + if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0 && + type.unit() == ::arrow::TimeUnit::NANO) { + *annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO); return Status::OK(); } - // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to - // int64 microseconds when deprecated int96 is not requested. - if (type.unit() == ::arrow::TimeUnit::NANO) - *logical_type = LogicalType::TIMESTAMP_MICROS; - else if (type.unit() == ::arrow::TimeUnit::SECOND) - return Status::NotImplemented( - "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with " - "Parquet."); + // The user implicitly wants timestamp data to retain its original time units, + // however the Arrow seconds time unit can not be represented (annotated) in + // any version of Parquet and so must be coerced to milliseconds. + if (type.unit() == ::arrow::TimeUnit::SECOND) { + *annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI); + return Status::OK(); + } return Status::OK(); -} // namespace arrow +} Status FieldToNode(const std::shared_ptr& field, const WriterProperties& properties, const ArrowWriterProperties& arrow_properties, NodePtr* out) { - LogicalType::type logical_type = LogicalType::NONE; + std::shared_ptr annotation = LogicalAnnotation::None(); ParquetType::type type; Repetition::type repetition = field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED; @@ -507,33 +620,33 @@ Status FieldToNode(const std::shared_ptr& field, switch (field->type()->id()) { case ArrowTypeId::NA: type = ParquetType::INT32; - logical_type = LogicalType::NA; + annotation = LogicalAnnotation::Null(); break; case ArrowTypeId::BOOL: type = ParquetType::BOOLEAN; break; case ArrowTypeId::UINT8: type = ParquetType::INT32; - logical_type = LogicalType::UINT_8; + annotation = LogicalAnnotation::Int(8, false); break; case ArrowTypeId::INT8: type = ParquetType::INT32; - logical_type = LogicalType::INT_8; + annotation = LogicalAnnotation::Int(8, true); break; case ArrowTypeId::UINT16: type = ParquetType::INT32; - logical_type = LogicalType::UINT_16; + annotation = LogicalAnnotation::Int(16, false); break; case ArrowTypeId::INT16: type = ParquetType::INT32; - logical_type = LogicalType::INT_16; + annotation = LogicalAnnotation::Int(16, true); break; case ArrowTypeId::UINT32: if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { type = ParquetType::INT64; } else { type = ParquetType::INT32; - logical_type = LogicalType::UINT_32; + annotation = LogicalAnnotation::Int(32, false); } break; case ArrowTypeId::INT32: @@ -541,7 +654,7 @@ Status FieldToNode(const std::shared_ptr& field, break; case ArrowTypeId::UINT64: type = ParquetType::INT64; - logical_type = LogicalType::UINT_64; + annotation = LogicalAnnotation::Int(64, false); break; case ArrowTypeId::INT64: type = ParquetType::INT64; @@ -554,7 +667,7 @@ Status FieldToNode(const std::shared_ptr& field, break; case ArrowTypeId::STRING: type = ParquetType::BYTE_ARRAY; - logical_type = LogicalType::UTF8; + annotation = LogicalAnnotation::String(); break; case ArrowTypeId::BINARY: type = ParquetType::BYTE_ARRAY; @@ -567,37 +680,38 @@ Status FieldToNode(const std::shared_ptr& field, } break; case ArrowTypeId::DECIMAL: { type = ParquetType::FIXED_LEN_BYTE_ARRAY; - logical_type = LogicalType::DECIMAL; const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); length = DecimalSize(precision); + PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Decimal(precision, scale)); } break; case ArrowTypeId::DATE32: type = ParquetType::INT32; - logical_type = LogicalType::DATE; + annotation = LogicalAnnotation::Date(); break; case ArrowTypeId::DATE64: type = ParquetType::INT32; - logical_type = LogicalType::DATE; + annotation = LogicalAnnotation::Date(); break; case ArrowTypeId::TIMESTAMP: RETURN_NOT_OK( GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()), - arrow_properties, &type, &logical_type)); + properties, arrow_properties, &type, &annotation)); break; case ArrowTypeId::TIME32: type = ParquetType::INT32; - logical_type = LogicalType::TIME_MILLIS; + annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS); break; case ArrowTypeId::TIME64: { + type = ParquetType::INT64; auto time_type = static_cast<::arrow::Time64Type*>(field->type().get()); if (time_type->unit() == ::arrow::TimeUnit::NANO) { - return Status::NotImplemented("Nanosecond time not supported in Parquet."); + annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS); + } else { + annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS); } - type = ParquetType::INT64; - logical_type = LogicalType::TIME_MICROS; } break; case ArrowTypeId::STRUCT: { auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type()); @@ -625,9 +739,10 @@ Status FieldToNode(const std::shared_ptr& field, field->type()->ToString()); } } - PARQUET_CATCH_NOT_OK(*out = - PrimitiveNode::Make(field->name(), repetition, type, - logical_type, length, precision, scale)); + + PARQUET_CATCH_NOT_OK(*out = PrimitiveNode::Make(field->name(), repetition, annotation, + type, length)); + return Status::OK(); } @@ -725,7 +840,7 @@ int32_t DecimalSize(int32_t precision) { } DCHECK(false); return -1; -} // namespace arrow +} } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 96db68b3ec0..91811203f92 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -383,9 +383,9 @@ class ArrowColumnWriter { Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels); - Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data, - int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels); + Status WriteTimestampsCoerce(const Array& data, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, + const ArrowWriterProperties& properties); template Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values, @@ -650,50 +650,92 @@ Status ArrowColumnWriter::WriteNonNullableBatch(*values.type()); - - const bool is_nanosecond = type.unit() == TimeUnit::NANO; + const auto& source_type = static_cast(*values.type()); if (ctx_->properties->support_deprecated_int96_timestamps()) { - // The user explicitly required to use Int96 storage. + // User explicitly requested Int96 timestamps return TypedWriteBatch(values, num_levels, def_levels, rep_levels); - } else if (is_nanosecond || - (ctx_->properties->coerce_timestamps_enabled() && - (type.unit() != ctx_->properties->coerce_timestamps_unit()))) { - // Casting is required. This covers several cases - // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved) - // * coerce_timestamps_enabled_, cast all timestamps to requested unit - return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values, - num_levels, def_levels, rep_levels); + } else if (ctx_->properties->coerce_timestamps_enabled()) { + // User explicitly requested coercion to specific unit + if (source_type.unit() == ctx_->properties->coerce_timestamps_unit()) { + // No data conversion necessary + return TypedWriteBatch(values, num_levels, + def_levels, rep_levels); + } else { + return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, + *(ctx_->properties)); + } + } else if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0 && + source_type.unit() == TimeUnit::NANO) { + // Absent superseding user instructions, when writing Parquet version 1.0 files, + // timestamps in nanoseconds are coerced to microseconds + std::shared_ptr properties = + (ArrowWriterProperties::Builder()) + .coerce_timestamps(TimeUnit::MICRO) + ->disallow_truncated_timestamps() + ->build(); + return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties); + } else if (source_type.unit() == TimeUnit::SECOND) { + // Absent superseding user instructions, timestamps in seconds are coerced to + // milliseconds + std::shared_ptr properties = + (ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build(); + return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties); } else { - // No casting of timestamps is required, take the fast path + // No data conversion necessary return TypedWriteBatch(values, num_levels, def_levels, rep_levels); } } -Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed, - const Array& array, int64_t num_levels, +#define COERCE_DIVIDE -1 +#define COERCE_INVALID 0 +#define COERCE_MULTIPLY +1 + +static std::pair kTimestampCoercionFactors[4][4] = { + // from seconds ... + {{COERCE_INVALID, 0}, // ... to seconds + {COERCE_MULTIPLY, 1000}, // ... to millis + {COERCE_MULTIPLY, 1000000}, // ... to micros + {COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos + // from millis ... + {{COERCE_INVALID, 0}, + {COERCE_MULTIPLY, 1}, + {COERCE_MULTIPLY, 1000}, + {COERCE_MULTIPLY, 1000000}}, + // from micros ... + {{COERCE_INVALID, 0}, + {COERCE_DIVIDE, 1000}, + {COERCE_MULTIPLY, 1}, + {COERCE_MULTIPLY, 1000}}, + // from nanos ... + {{COERCE_INVALID, 0}, + {COERCE_DIVIDE, 1000000}, + {COERCE_DIVIDE, 1000}, + {COERCE_MULTIPLY, 1}}}; + +Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels) { + const int16_t* rep_levels, + const ArrowWriterProperties& properties) { int64_t* buffer; RETURN_NOT_OK(ctx_->GetScratchData(num_levels, &buffer)); const auto& data = static_cast(array); - auto values = data.raw_values(); - const auto& type = static_cast(*array.type()); - TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled() - ? ctx_->properties->coerce_timestamps_unit() - : TimeUnit::MICRO; + const auto& source_type = static_cast(*array.type()); + auto source_unit = source_type.unit(); + + TimeUnit::type target_unit = properties.coerce_timestamps_unit(); auto target_type = ::arrow::timestamp(target_unit); + bool truncation_allowed = properties.truncated_timestamps_allowed(); auto DivideBy = [&](const int64_t factor) { for (int64_t i = 0; i < array.length(); i++) { - if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) { - return Status::Invalid("Casting from ", type.ToString(), " to ", + if (!truncation_allowed && !data.IsNull(i) && (values[i] % factor != 0)) { + return Status::Invalid("Casting from ", source_type.ToString(), " to ", target_type->ToString(), " would lose data: ", values[i]); } buffer[i] = values[i] / factor; @@ -708,22 +750,12 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_ return Status::OK(); }; - if (type.unit() == TimeUnit::NANO) { - if (target_unit == TimeUnit::MICRO) { - RETURN_NOT_OK(DivideBy(1000)); - } else { - DCHECK_EQ(TimeUnit::MILLI, target_unit); - RETURN_NOT_OK(DivideBy(1000000)); - } - } else if (type.unit() == TimeUnit::SECOND) { - RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000)); - } else if (type.unit() == TimeUnit::MILLI) { - DCHECK_EQ(TimeUnit::MICRO, target_unit); - RETURN_NOT_OK(MultiplyBy(1000)); - } else { - DCHECK_EQ(TimeUnit::MILLI, target_unit); - RETURN_NOT_OK(DivideBy(1000)); - } + const auto& coercion = kTimestampCoercionFactors[static_cast(source_unit)] + [static_cast(target_unit)]; + // .first -> coercion operation; .second -> scale factor + DCHECK_NE(coercion.first, COERCE_INVALID); + RETURN_NOT_OK(coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second) + : MultiplyBy(coercion.second)); if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) { // no nulls, just dump the data @@ -736,9 +768,14 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_ static_cast(*target_type), array.length(), num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer))); } + return Status::OK(); } +#undef COERCE_DIVIDE +#undef COERCE_INVALID +#undef COERCE_MULTIPLY + // This specialization seems quite similar but it significantly differs in two points: // * offset is added at the most latest time to the pointer as we have sub-byte access // * Arrow data is stored bitwise thus we cannot use std::copy to transform from diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index db48b246880..ee81af323a3 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -498,11 +498,13 @@ std::shared_ptr LogicalAnnotation::Date() { std::shared_ptr LogicalAnnotation::Time( bool is_adjusted_to_utc, LogicalAnnotation::TimeUnit::unit time_unit) { + DCHECK(time_unit != LogicalAnnotation::TimeUnit::UNKNOWN); return TimeAnnotation::Make(is_adjusted_to_utc, time_unit); } std::shared_ptr LogicalAnnotation::Timestamp( bool is_adjusted_to_utc, LogicalAnnotation::TimeUnit::unit time_unit) { + DCHECK(time_unit != LogicalAnnotation::TimeUnit::UNKNOWN); return TimestampAnnotation::Make(is_adjusted_to_utc, time_unit); } @@ -512,6 +514,7 @@ std::shared_ptr LogicalAnnotation::Interval() { std::shared_ptr LogicalAnnotation::Int(int bit_width, bool is_signed) { + DCHECK(bit_width == 64 || bit_width == 32 || bit_width == 16 || bit_width == 8); return IntAnnotation::Make(bit_width, is_signed); } diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 4598bb9ab9e..c7423c935f4 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -179,9 +179,6 @@ def test_chunked_table_write(): # ARROW-232 df = alltypes_sample(size=10) - # The nanosecond->ms conversion is a nuisance, so we just avoid it here - del df['datetime'] - batch = pa.RecordBatch.from_pandas(df) table = pa.Table.from_batches([batch] * 3) _check_roundtrip(table, version='2.0') @@ -195,8 +192,6 @@ def test_chunked_table_write(): @pytest.mark.pandas def test_no_memory_map(tempdir): df = alltypes_sample(size=10) - # The nanosecond->us conversion is a nuisance, so we just avoid it here - del df['datetime'] table = pa.Table.from_pandas(df) _check_roundtrip(table, read_table_kwargs={'memory_map': False}, @@ -223,8 +218,6 @@ def test_special_chars_filename(tempdir): @pytest.mark.pandas def test_empty_table_roundtrip(): df = alltypes_sample(size=10) - # The nanosecond->us conversion is a nuisance, so we just avoid it here - del df['datetime'] # Create a non-empty table to infer the types correctly, then slice to 0 table = pa.Table.from_pandas(df) @@ -896,7 +889,7 @@ def test_column_of_lists(tempdir): @pytest.mark.pandas -def test_date_time_types(): +def test_date_time_types(tempdir): t1 = pa.date32() data1 = np.array([17259, 17260, 17261], dtype='int32') a1 = pa.array(data1, type=t1) @@ -929,12 +922,6 @@ def test_date_time_types(): dtype='int64') a7 = pa.array(data7, type=t7) - t7_us = pa.timestamp('us') - start = pd.Timestamp('2001-01-01').value - data7_us = np.array([start, start + 1000, start + 2000], - dtype='int64') // 1000 - a7_us = pa.array(data7_us, type=t7_us) - table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7], ['date32', 'date64', 'timestamp[us]', 'time32[s]', 'time64[us]', @@ -943,8 +930,7 @@ def test_date_time_types(): # date64 as date32 # time32[s] to time32[ms] - # 'timestamp[ns]' to 'timestamp[us]' - expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7_us], + expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7], ['date32', 'date64', 'timestamp[us]', 'time32[s]', 'time64[us]', 'time32_from64[s]', @@ -952,35 +938,62 @@ def test_date_time_types(): _check_roundtrip(table, expected=expected, version='2.0') - # date64 as date32 - # time32[s] to time32[ms] - # 'timestamp[ms]' is saved as INT96 timestamp - # 'timestamp[ns]' is saved as INT96 timestamp - expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7], - ['date32', 'date64', 'timestamp[us]', - 'time32[s]', 'time64[us]', - 'time32_from64[s]', - 'timestamp[ns]']) - - _check_roundtrip(table, expected=expected, version='2.0', - use_deprecated_int96_timestamps=True) - - # Check that setting flavor to 'spark' uses int96 timestamps - _check_roundtrip(table, expected=expected, version='2.0', - flavor='spark') - - # Unsupported stuff - def _assert_unsupported(array): - table = pa.Table.from_arrays([array], ['unsupported']) - buf = io.BytesIO() + t0 = pa.timestamp('ms') + data0 = np.arange(4, dtype='int64') + a0 = pa.array(data0, type=t0) - with pytest.raises(NotImplementedError): - _write_table(table, buf, version="2.0") + t1 = pa.timestamp('us') + data1 = np.arange(4, dtype='int64') + a1 = pa.array(data1, type=t1) - t7 = pa.time64('ns') - a7 = pa.array(data4.astype('int64'), type=t7) + t2 = pa.timestamp('ns') + data2 = np.arange(4, dtype='int64') + a2 = pa.array(data2, type=t2) - _assert_unsupported(a7) + table = pa.Table.from_arrays([a0, a1, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + expected = pa.Table.from_arrays([a0, a1, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + + # int64 for all timestamps supported by default + filename = tempdir / 'int64_timestamps.parquet' + _write_table(table, filename, version='2.0') + parquet_schema = pq.ParquetFile(filename).schema + for i in range(3): + assert parquet_schema.column(i).physical_type == 'INT64' + read_table = _read_table(filename) + assert read_table.equals(expected) + + t0_ns = pa.timestamp('ns') + data0_ns = np.array(data0 * 1000000, dtype='int64') + a0_ns = pa.array(data0_ns, type=t0_ns) + + t1_ns = pa.timestamp('ns') + data1_ns = np.array(data1 * 1000, dtype='int64') + a1_ns = pa.array(data1_ns, type=t1_ns) + + expected = pa.Table.from_arrays([a0_ns, a1_ns, a2], + ['ts[ms]', 'ts[us]', 'ts[ns]']) + + # int96 nanosecond timestamps produced upon request + filename = tempdir / 'explicit_int96_timestamps.parquet' + _write_table(table, filename, version='2.0', + use_deprecated_int96_timestamps=True) + parquet_schema = pq.ParquetFile(filename).schema + for i in range(3): + assert parquet_schema.column(i).physical_type == 'INT96' + read_table = _read_table(filename) + assert read_table.equals(expected) + + # int96 nanosecond timestamps implied by flavor 'spark' + filename = tempdir / 'spark_int96_timestamps.parquet' + _write_table(table, filename, version='2.0', + flavor='spark') + parquet_schema = pq.ParquetFile(filename).schema + for i in range(3): + assert parquet_schema.column(i).physical_type == 'INT96' + read_table = _read_table(filename) + assert read_table.equals(expected) @pytest.mark.pandas @@ -992,6 +1005,64 @@ def test_list_of_datetime_time_roundtrip(): _roundtrip_pandas_dataframe(df, write_kwargs={}) +@pytest.mark.pandas +def test_parquet_version_timestamp_differences(): + i_s = pd.Timestamp('2010-01-01').value / 1000000000 # := 1262304000 + + d_s = np.arange(i_s, i_s + 10, 1, dtype='int64') + d_ms = d_s * 1000 + d_us = d_ms * 1000 + d_ns = d_us * 1000 + + a_s = pa.array(d_s, type=pa.timestamp('s')) + a_ms = pa.array(d_ms, type=pa.timestamp('ms')) + a_us = pa.array(d_us, type=pa.timestamp('us')) + a_ns = pa.array(d_ns, type=pa.timestamp('ns')) + + names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns'] + table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names) + + # Using Parquet version 1.0, seconds should be coerced to milliseconds + # and nanoseconds should be coerced to microseconds by default + expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names) + _check_roundtrip(table, expected) + + # Using Parquet version 2.0, seconds should be coerced to milliseconds + # and nanoseconds should be retained by default + expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names) + _check_roundtrip(table, expected, version='2.0') + + # Using Parquet version 1.0, coercing to milliseconds or microseconds + # is allowed + expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names) + _check_roundtrip(table, expected, coerce_timestamps='ms') + + # Using Parquet version 2.0, coercing to milliseconds or microseconds + # is allowed + expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names) + _check_roundtrip(table, expected, version='2.0', coerce_timestamps='us') + + # TODO: after pyarrow allows coerce_timestamps='ns', tests like the + # following should pass ... + + # Using Parquet version 1.0, coercing to nanoseconds is not allowed + # expected = None + # with pytest.raises(NotImplementedError): + # _roundtrip_table(table, coerce_timestamps='ns') + + # Using Parquet version 2.0, coercing to nanoseconds is allowed + # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names) + # _check_roundtrip(table, expected, version='2.0', coerce_timestamps='ns') + + # For either Parquet version, coercing to nanoseconds is allowed + # if Int96 storage is used + expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names) + _check_roundtrip(table, expected, + use_deprecated_int96_timestamps=True) + _check_roundtrip(table, expected, version='2.0', + use_deprecated_int96_timestamps=True) + + def test_large_list_records(): # This was fixed in PARQUET-1100 @@ -2035,6 +2106,33 @@ def test_write_error_deletes_incomplete_file(tempdir): assert not filename.exists() +@pytest.mark.pandas +def test_noncoerced_nanoseconds_written_without_exception(tempdir): + # ARROW-1957: the Parquet version 2.0 writer preserves Arrow + # nanosecond timestamps by default + n = 9 + df = pd.DataFrame({'x': range(n)}, + index=pd.DatetimeIndex(start='2017-01-01', + freq='1n', + periods=n)) + tb = pa.Table.from_pandas(df) + + filename = tempdir / 'written.parquet' + try: + pq.write_table(tb, filename, version='2.0') + except Exception: + pass + assert filename.exists() + + recovered_table = pq.read_table(filename) + assert tb.equals(recovered_table) + + # Loss of data thru coercion (without explicit override) still an error + filename = tempdir / 'not_written.parquet' + with pytest.raises(ValueError): + pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0') + + def test_read_non_existent_file(tempdir): path = 'non-existent-file.parquet' try: