From d091c21c50c7375eaf9d82c9998dc319ef1501e3 Mon Sep 17 00:00:00 2001 From: Pradeep Gollakota Date: Wed, 10 Aug 2022 13:05:51 -0400 Subject: [PATCH 01/26] [ARROW-17255] Add JSON canonical extension type Arrow now provides a canonical extension type for JSON data. This extension is backed by utf8(). Parquet will recognize this extension and appropriately propagate the LogicalType to the storage format. --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/extension/json.cc | 60 +++++++++++++ cpp/src/arrow/extension/json.h | 53 +++++++++++ cpp/src/arrow/extension_type_test.cc | 11 +++ cpp/src/arrow/testing/extension_type.h | 3 + cpp/src/arrow/testing/gtest_util.cc | 15 ++++ .../parquet/arrow/arrow_reader_writer_test.cc | 52 ++++++++++- cpp/src/parquet/arrow/arrow_schema_test.cc | 90 ++++++++++++++++++- cpp/src/parquet/arrow/schema.cc | 54 +++++++++-- cpp/src/parquet/arrow/schema_internal.cc | 22 +++-- cpp/src/parquet/arrow/schema_internal.h | 8 +- cpp/src/parquet/properties.h | 17 +++- 12 files changed, 359 insertions(+), 28 deletions(-) create mode 100644 cpp/src/arrow/extension/json.cc create mode 100644 cpp/src/arrow/extension/json.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 01ac813f471..1b41d89483b 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -375,6 +375,8 @@ set(ARROW_SRCS device.cc device_allocation_type_set.cc extension_type.cc + extension/json.cc + memory_pool.cc extension/bool8.cc extension/uuid.cc pretty_print.cc diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc new file mode 100644 index 00000000000..a9b00af2703 --- /dev/null +++ b/cpp/src/arrow/extension/json.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/extension/json.h" + +#include +#include +#include + +#include "arrow/extension_type.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type_fwd.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace extension { + +std::once_flag init_flag; + +std::shared_ptr json() { + std::call_once(init_flag, []() { + DCHECK_OK(RegisterExtensionType(std::make_shared())); + }); + return GetExtensionType(JsonExtensionType::type_name()); +} + +bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const { + const auto& other_ext = static_cast(other); + return other_ext.extension_name() == this->extension_name(); +} + +Result> JsonExtensionType::Deserialize( + std::shared_ptr storage_type, const std::string& serialized_data) const { + if (serialized_data != JsonExtensionType::type_name()) { + return Status::Invalid("Type identifier did not match"); + } + return json(); +} + +std::string JsonExtensionType::Serialize() const { + return JsonExtensionType::type_name(); +} + +} // namespace extension +} // namespace arrow diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h new file mode 100644 index 00000000000..05e56b90b77 --- /dev/null +++ b/cpp/src/arrow/extension/json.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/extension_type.h" +#include "arrow/result.h" +#include "arrow/type_fwd.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace extension { + +/// \brief Concrete type class for variable-size JSON data, utf8-encoded. +class ARROW_EXPORT JsonExtensionType : public ExtensionType { + public: + JsonExtensionType() : ExtensionType(::arrow::utf8()) {} + + static constexpr const char* type_name() { return "arrow.extension.json"; } + std::string extension_name() const override { return type_name(); } + + bool ExtensionEquals(const ExtensionType& other) const override; + Result> Deserialize( + std::shared_ptr storage_type, + const std::string& serialized_data) const override; + std::string Serialize() const override; + std::shared_ptr MakeArray(std::shared_ptr data) const override { + return std::make_shared(data); + } +}; + +/// \brief Return a JsonExtensionType instance. 
+std::shared_ptr json(); + +} // namespace extension +} // namespace arrow diff --git a/cpp/src/arrow/extension_type_test.cc b/cpp/src/arrow/extension_type_test.cc index f49ffc5cba5..0a95431d3d3 100644 --- a/cpp/src/arrow/extension_type_test.cc +++ b/cpp/src/arrow/extension_type_test.cc @@ -26,6 +26,7 @@ #include "arrow/array/array_nested.h" #include "arrow/array/util.h" +#include "arrow/extension/json.h" #include "arrow/extension_type.h" #include "arrow/io/memory.h" #include "arrow/ipc/options.h" @@ -230,6 +231,16 @@ TEST_F(TestExtensionType, IpcRoundtrip) { CompareBatch(*batch, *read_batch, false /* compare_metadata */); } +TEST_F(TestExtensionType, JsonRoundtrip) { + auto ext_arr = ExampleJson(); + auto batch = + RecordBatch::Make(schema({field("f0", arrow::extension::json())}), 8, {ext_arr}); + + std::shared_ptr read_batch; + RoundtripBatch(batch, &read_batch); + CompareBatch(*batch, *read_batch, false /* compare_metadata */); +} + TEST_F(TestExtensionType, UnrecognizedExtension) { auto ext_arr = ExampleUuid(); auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr}); diff --git a/cpp/src/arrow/testing/extension_type.h b/cpp/src/arrow/testing/extension_type.h index a4526e31c2b..edd155a29e3 100644 --- a/cpp/src/arrow/testing/extension_type.h +++ b/cpp/src/arrow/testing/extension_type.h @@ -212,6 +212,9 @@ ARROW_TESTING_EXPORT std::shared_ptr MakeComplex128(const std::shared_ptr& real, const std::shared_ptr& imag); +ARROW_TESTING_EXPORT +std::shared_ptr ExampleJson(); + // A RAII class that registers an extension type on construction // and unregisters it on destruction. 
class ARROW_TESTING_EXPORT ExtensionTypeGuard { diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index c4a7f363c71..0a9c952e4bc 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -50,6 +50,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/datum.h" #include "arrow/io/memory.h" +#include "arrow/extension/json.h" #include "arrow/ipc/json_simple.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" @@ -1040,6 +1041,20 @@ std::shared_ptr ExampleComplex128() { return ExtensionType::WrapArray(complex128(), arr); } +std::shared_ptr ExampleJson() { + std::shared_ptr arr = ArrayFromJSON(utf8(), R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"); + return ExtensionType::WrapArray(arrow::extension::json(), arr); +} + ExtensionTypeGuard::ExtensionTypeGuard(const std::shared_ptr& type) : ExtensionTypeGuard(DataTypeVector{type}) {} diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 724e6c44f2e..692f621d882 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -37,6 +37,7 @@ #include "arrow/array/builder_primitive.h" #include "arrow/chunked_array.h" #include "arrow/compute/api.h" +#include "arrow/extension/json.h" #include "arrow/io/api.h" #include "arrow/record_batch.h" #include "arrow/scalar.h" @@ -618,10 +619,15 @@ class ParquetIOTestBase : public ::testing::Test { return ParquetFileWriter::Open(sink_, schema); } - void ReaderFromSink(std::unique_ptr* out) { + void ReaderFromSink( + std::unique_ptr* out, + const ArrowReaderProperties& properties = default_arrow_reader_properties()) { ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish()); - ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), - ::arrow::default_memory_pool(), out)); + 
FileReaderBuilder builder; + ASSERT_OK_NO_THROW(builder.Open(std::make_shared(buffer))); + ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool()) + ->properties(properties) + ->Build(out)); } void ReadSingleColumnFile(std::unique_ptr file_reader, @@ -670,6 +676,7 @@ class ParquetIOTestBase : public ::testing::Test { void RoundTripSingleColumn( const std::shared_ptr& values, const std::shared_ptr& expected, const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_properties, + const ArrowReaderProperties& reader_properties = default_arrow_reader_properties(), bool nullable = true) { std::shared_ptr table = MakeSimpleTable(values, nullable); this->ResetSink(); @@ -679,7 +686,7 @@ class ParquetIOTestBase : public ::testing::Test { std::shared_ptr
out; std::unique_ptr reader; - ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); + ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader, reader_properties)); const bool expect_metadata = arrow_properties->store_schema(); ASSERT_NO_FATAL_FAILURE( this->ReadTableFromFile(std::move(reader), expect_metadata, &out)); @@ -1428,6 +1435,43 @@ TEST_F(TestLargeStringParquetIO, Basics) { this->RoundTripSingleColumn(large_array, large_array, arrow_properties); } +using TestJsonParquetIO = TestParquetIO<::arrow::extension::JsonExtensionType>; + +TEST_F(TestJsonParquetIO, JsonExtension) { + const char* json = R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"; + + const auto json_type = ::arrow::extension::json(); + const auto json_string_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); + const auto json_array = ::arrow::ExtensionType::WrapArray(json_type, json_string_array); + + // When the original Arrow schema isn't stored and Arrow extensions are disabled, + // LogicalType::JSON is read as Binary. + const auto binary_array = ::arrow::ArrayFromJSON(::arrow::binary(), json); + this->RoundTripSingleColumn(json_array, binary_array, + default_arrow_writer_properties()); + + // When the original Arrow schema isn't stored and Arrow extensions are enabled, + // LogicalType::JSON is read as JsonExtensionType. + ::parquet::ArrowReaderProperties reader_properties; + reader_properties.enable_known_arrow_extensions(); + this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), + reader_properties); + + // When the original Arrow schema is stored, the stored Arrow type is always respected. 
+ const auto arrow_properties = + ::parquet::ArrowWriterProperties::Builder().store_schema()->build(); + this->RoundTripSingleColumn(json_array, json_array, arrow_properties); +} + using TestNullParquetIO = TestParquetIO<::arrow::NullType>; TEST_F(TestNullParquetIO, NullColumn) { diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 9f60cd31d35..d7110490c95 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -31,8 +31,11 @@ #include "parquet/thrift_internal.h" #include "arrow/array.h" +#include "arrow/extension/json.h" +#include "arrow/ipc/writer.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" +#include "arrow/util/base64.h" #include "arrow/util/key_value_metadata.h" using arrow::Field; @@ -76,17 +79,17 @@ class TestConvertParquetSchema : public ::testing::Test { auto result_field = result_schema_->field(i); auto expected_field = expected_schema->field(i); EXPECT_TRUE(result_field->Equals(expected_field, check_metadata)) - << "Field " << i << "\n result: " << result_field->ToString() - << "\n expected: " << expected_field->ToString(); + << "Field " << i << "\n result: " << result_field->ToString(check_metadata) + << "\n expected: " << expected_field->ToString(check_metadata); } } ::arrow::Status ConvertSchema( const std::vector& nodes, - const std::shared_ptr& key_value_metadata = nullptr) { + const std::shared_ptr& key_value_metadata = nullptr, + ArrowReaderProperties props = ArrowReaderProperties()) { NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); descr_.Init(schema); - ArrowReaderProperties props; return FromParquetSchema(&descr_, props, key_value_metadata, &result_schema_); } @@ -724,6 +727,85 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { + std::vector parquet_fields; + 
parquet_fields.push_back(PrimitiveNode::Make( + "json_1", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::JSON)); + parquet_fields.push_back(PrimitiveNode::Make( + "json_2", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::JSON)); + + { + // Parquet file does not contain Arrow schema. + // By default both fields should be treated as binary() fields in Arrow. + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", BINARY, true), ::arrow::field("json_2", BINARY, true)}); + std::shared_ptr metadata = ::arrow::key_value_metadata({}, {}); + ASSERT_OK(ConvertSchema(parquet_fields, metadata)); + CheckFlatSchema(arrow_schema); + } + + { + // Parquet file does not contain Arrow schema. + // If Arrow extensions are enabled, both fields should be treated as json() extension + // fields. + ArrowReaderProperties props; + props.enable_known_arrow_extensions(); + auto arrow_schema = + ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true), + ::arrow::field("json_2", ::arrow::extension::json(), true)}); + std::shared_ptr metadata = ::arrow::key_value_metadata({}, {}); + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); + CheckFlatSchema(arrow_schema); + } + + { + // Parquet file contains Arrow schema. + // Arrow schema has precedence. json_1 should be returned as a json() field even + // though extensions are not enabled. 
+ std::shared_ptr field_metadata = ::arrow::key_value_metadata( + {"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = + ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", BINARY, true)}); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr serialized, + ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); + std::string schema_as_string = serialized->ToString(); + std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); + std::shared_ptr metadata = + ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); + + ASSERT_OK(ConvertSchema(parquet_fields, metadata)); + CheckFlatSchema(arrow_schema, true /* check_metadata */); + } + + { + // Parquet file contains Arrow schema. + // A contrived example. Parquet believes both columns are JSON. Arrow believes json_1 + // is a JSON column and json_2 is a binary column. json_2 should be treated as a + // binary column even if known_arrow_extensions_enabled is enabled. 
+ ArrowReaderProperties props; + props.enable_known_arrow_extensions(); + std::shared_ptr field_metadata = ::arrow::key_value_metadata( + {"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = + ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", BINARY, true)}); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr serialized, + ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); + std::string schema_as_string = serialized->ToString(); + std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); + std::shared_ptr metadata = + ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); + + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); + CheckFlatSchema(arrow_schema, true /* check_metadata */); + } +} + class TestConvertArrowSchema : public ::testing::Test { public: virtual void SetUp() {} diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index ec3890a41f4..ba9016a4c57 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -21,6 +21,7 @@ #include #include +#include "arrow/extension/json.h" #include "arrow/extension_type.h" #include "arrow/io/memory.h" #include "arrow/ipc/api.h" @@ -427,6 +428,12 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } case ArrowTypeId::EXTENSION: { auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type()); + // Built-in JSON extension is handled differently. 
+ if (ext_type->extension_name() == ::arrow::extension::json()->extension_name()) { + type = ParquetType::BYTE_ARRAY; + logical_type = LogicalType::JSON(); + break; + } std::shared_ptr<::arrow::Field> storage_field = ::arrow::field( name, ext_type->storage_type(), field->nullable(), field->metadata()); return FieldToNode(name, storage_field, properties, arrow_properties, out); @@ -438,7 +445,7 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } default: { - // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR + // TODO: DENSE_UNION, SPARE_UNION, DECIMAL_TEXT, VARCHAR return Status::NotImplemented( "Unhandled type for Arrow to Parquet schema conversion: ", field->type()->ToString()); @@ -478,7 +485,7 @@ ::arrow::Result> GetTypeForNode( SchemaTreeContext* ctx) { ASSIGN_OR_RAISE( std::shared_ptr storage_type, - GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit())); + GetArrowType(primitive_node, ctx->properties)); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); @@ -987,18 +994,47 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); - auto origin_storage_field = origin_field.WithType(ex_type.storage_type()); + if (inferred_type->id() != ::arrow::Type::EXTENSION && + ex_type.extension_name() == ::arrow::extension::JsonExtensionType::type_name()) { + // Schema mismatch. + // + // Arrow extensions are DISABLED in Parquet. + // origin_type is ::arrow::extension::json() + // inferred_type is ::arrow::binary() + // + // Origin type is restored as Arrow should be considered the source of truth. 
+ DCHECK_EQ(inferred_type->id(), ::arrow::Type::BINARY); + inferred->field = inferred->field->WithType(origin_type); + RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred)); + } else { + auto origin_storage_field = origin_field.WithType(ex_type.storage_type()); - // Apply metadata recursively to storage type - RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred)); + // Apply metadata recursively to storage type + RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred)); - // Restore extension type, if the storage type is the same as inferred - // from the Parquet type - if (ex_type.storage_type()->Equals(*inferred->field->type())) { - inferred->field = inferred->field->WithType(origin_type); + // Restore extension type, if the storage type is the same as inferred + // from the Parquet type + if (ex_type.storage_type()->Equals(*inferred->field->type())) { + inferred->field = inferred->field->WithType(origin_type); + } } modified = true; } else { + if (inferred_type->id() == ::arrow::Type::EXTENSION) { + const auto& ex_type = checked_cast(*inferred_type); + if (ex_type.extension_name() == + ::arrow::extension::JsonExtensionType::type_name()) { + // Schema mismatch. + // + // Arrow extensions are ENABLED in Parquet. + // origin_type is ::arrow::binary() + // inferred_type is ::arrow::extension::json() + // + // Origin type is restored as Arrow should be considered the source of truth. 
+ DCHECK_EQ(origin_type->id(), ::arrow::Type::BINARY); + inferred->field = inferred->field->WithType(origin_type); + } + } ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred)); } diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index a8e2a95b9b9..b6da0c67722 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -18,6 +18,9 @@ #include "parquet/arrow/schema_internal.h" #include "arrow/type.h" +#include "arrow/extension/json.h" + +#include "parquet/properties.h" using ArrowType = ::arrow::DataType; using ArrowTypeId = ::arrow::Type; @@ -107,7 +110,8 @@ Result> MakeArrowTimestamp(const LogicalType& logical } } -Result> FromByteArray(const LogicalType& logical_type) { +Result> FromByteArray( + const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) { switch (logical_type.type()) { case LogicalType::Type::STRING: return ::arrow::utf8(); @@ -115,9 +119,13 @@ Result> FromByteArray(const LogicalType& logical_type return MakeArrowDecimal(logical_type); case LogicalType::Type::NONE: case LogicalType::Type::ENUM: - case LogicalType::Type::JSON: case LogicalType::Type::BSON: return ::arrow::binary(); + case LogicalType::Type::JSON: + if (reader_properties.known_arrow_extensions_enabled()) { + return ::arrow::extension::json(); + } + return ::arrow::binary(); default: return Status::NotImplemented("Unhandled logical logical_type ", logical_type.ToString(), " for binary array"); @@ -180,7 +188,7 @@ Result> FromInt64(const LogicalType& logical_type) { Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { + const ArrowReaderProperties& reader_properties) { if (logical_type.is_invalid() || logical_type.is_null()) { return ::arrow::null(); } @@ -193,13 +201,13 @@ Result> GetArrowType( case ParquetType::INT64: return 
FromInt64(logical_type); case ParquetType::INT96: - return ::arrow::timestamp(int96_arrow_time_unit); + return ::arrow::timestamp(reader_properties.coerce_int96_timestamp_unit()); case ParquetType::FLOAT: return ::arrow::float32(); case ParquetType::DOUBLE: return ::arrow::float64(); case ParquetType::BYTE_ARRAY: - return FromByteArray(logical_type); + return FromByteArray(logical_type, reader_properties); case ParquetType::FIXED_LEN_BYTE_ARRAY: return FromFLBA(logical_type, type_length); default: { @@ -212,9 +220,9 @@ Result> GetArrowType( Result> GetArrowType( const schema::PrimitiveNode& primitive, - const ::arrow::TimeUnit::type int96_arrow_time_unit) { + const ArrowReaderProperties& reader_properties) { return GetArrowType(primitive.physical_type(), *primitive.logical_type(), - primitive.type_length(), int96_arrow_time_unit); + primitive.type_length(), reader_properties); } } // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index f56ba0958ae..ff6443faaa9 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -18,6 +18,7 @@ #pragma once #include "arrow/result.h" +#include "parquet/properties.h" #include "parquet/schema.h" namespace arrow { @@ -28,7 +29,8 @@ namespace parquet::arrow { using ::arrow::Result; -Result> FromByteArray(const LogicalType& logical_type); +Result> FromByteArray(const LogicalType& logical_type, + bool use_known_arrow_extensions); Result> FromFLBA(const LogicalType& logical_type, int32_t physical_length); Result> FromInt32(const LogicalType& logical_type); @@ -36,10 +38,10 @@ Result> FromInt64(const LogicalType& logical_ Result> GetArrowType( Type::type physical_type, const LogicalType& logical_type, int type_length, - ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); + const ArrowReaderProperties& reader_properties); Result> GetArrowType( const schema::PrimitiveNode& primitive, - 
::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO); + const ArrowReaderProperties& reader_properties); } // namespace parquet::arrow diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 4d3acb491e3..04c2be129ee 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -870,7 +870,8 @@ class PARQUET_EXPORT ArrowReaderProperties { batch_size_(kArrowDefaultBatchSize), pre_buffer_(true), cache_options_(::arrow::io::CacheOptions::LazyDefaults()), - coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {} + coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO), + known_arrow_extensions_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// @@ -941,6 +942,19 @@ class PARQUET_EXPORT ArrowReaderProperties { return coerce_int96_timestamp_unit_; } + /// Enable Parquet supported Arrow Extension Types. + /// + /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow schema. + /// Currently only arrow::extension::json() extension type is supported. This will be used + /// for binary columns whose LogicalType is JSON. 
+ void enable_known_arrow_extensions() { + known_arrow_extensions_enabled_ = true; + } + void disable_known_arrow_extensions() { + known_arrow_extensions_enabled_ = false; + } + bool known_arrow_extensions_enabled() const { return known_arrow_extensions_enabled_; } + private: bool use_threads_; std::unordered_set read_dict_indices_; @@ -949,6 +963,7 @@ class PARQUET_EXPORT ArrowReaderProperties { ::arrow::io::IOContext io_context_; ::arrow::io::CacheOptions cache_options_; ::arrow::TimeUnit::type coerce_int96_timestamp_unit_; + bool known_arrow_extensions_enabled_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties From f3944ccb3de39e3f3a56675b351d82ba56b97ddd Mon Sep 17 00:00:00 2001 From: Pradeep Gollakota Date: Tue, 16 Aug 2022 21:09:17 +0000 Subject: [PATCH 02/26] Fix build failures - Code linted properly. - ARROW_EXPORT applied correctly. --- cpp/src/arrow/extension/json.h | 2 +- cpp/src/parquet/arrow/arrow_schema_test.cc | 20 ++++++++++---------- cpp/src/parquet/arrow/schema.cc | 5 ++--- cpp/src/parquet/arrow/schema_internal.cc | 2 +- cpp/src/parquet/properties.h | 14 +++++--------- 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index 05e56b90b77..2b50113dd27 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -47,7 +47,7 @@ class ARROW_EXPORT JsonExtensionType : public ExtensionType { }; /// \brief Return a JsonExtensionType instance. -std::shared_ptr json(); +ARROW_EXPORT std::shared_ptr json(); } // namespace extension } // namespace arrow diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index d7110490c95..c5712292ed4 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -762,11 +762,11 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // Parquet file contains Arrow schema. // Arrow schema has precedence. 
json_1 should be returned as a json() field even // though extensions are not enabled. - std::shared_ptr field_metadata = ::arrow::key_value_metadata( - {"foo", "bar"}, {"biz", "baz"}); - auto arrow_schema = - ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", BINARY, true)}); + std::shared_ptr field_metadata = + ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", BINARY, true)}); ASSERT_OK_AND_ASSIGN( std::shared_ptr serialized, @@ -787,11 +787,11 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // binary column even if known_arrow_extensions_enabled is enabled. ArrowReaderProperties props; props.enable_known_arrow_extensions(); - std::shared_ptr field_metadata = ::arrow::key_value_metadata( - {"foo", "bar"}, {"biz", "baz"}); - auto arrow_schema = - ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", BINARY, true)}); + std::shared_ptr field_metadata = + ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), + ::arrow::field("json_2", BINARY, true)}); ASSERT_OK_AND_ASSIGN( std::shared_ptr serialized, diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index ba9016a4c57..3bae2821f1e 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -483,9 +483,8 @@ bool IsDictionaryReadSupported(const ArrowType& type) { ::arrow::Result> GetTypeForNode( int column_index, const schema::PrimitiveNode& primitive_node, SchemaTreeContext* ctx) { - ASSIGN_OR_RAISE( - std::shared_ptr storage_type, - GetArrowType(primitive_node, ctx->properties)); + ASSIGN_OR_RAISE(std::shared_ptr storage_type, + 
GetArrowType(primitive_node, ctx->properties)); if (ctx->properties.read_dictionary(column_index) && IsDictionaryReadSupported(*storage_type)) { return ::arrow::dictionary(::arrow::int32(), storage_type); diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index b6da0c67722..62ddc566124 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -17,8 +17,8 @@ #include "parquet/arrow/schema_internal.h" -#include "arrow/type.h" #include "arrow/extension/json.h" +#include "arrow/type.h" #include "parquet/properties.h" diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 04c2be129ee..5c8a07c2dfd 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -944,15 +944,11 @@ class PARQUET_EXPORT ArrowReaderProperties { /// Enable Parquet supported Arrow Extension Types. /// - /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow schema. - /// Currently only arrow::extension::json() extension type is supported. This will be used - /// for binary columns whose LogicalType is JSON. - void enable_known_arrow_extensions() { - known_arrow_extensions_enabled_ = true; - } - void disable_known_arrow_extensions() { - known_arrow_extensions_enabled_ = false; - } + /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow + /// schema. Currently only arrow::extension::json() extension type is supported. This + /// will be used for binary columns whose LogicalType is JSON. 
+ void enable_known_arrow_extensions() { known_arrow_extensions_enabled_ = true; } + void disable_known_arrow_extensions() { known_arrow_extensions_enabled_ = false; } bool known_arrow_extensions_enabled() const { return known_arrow_extensions_enabled_; } private: From e5d16046a2d740ce8f9652f3149cf4fa73987eea Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Sun, 14 Apr 2024 23:56:05 +0200 Subject: [PATCH 03/26] - Change how extension types are registered - Move tests - Add docs - Rebase --- cpp/src/arrow/CMakeLists.txt | 4 +- cpp/src/arrow/extension/CMakeLists.txt | 2 +- cpp/src/arrow/extension/json.cc | 33 ++++++++-------- cpp/src/arrow/extension/json.h | 17 ++++++--- cpp/src/arrow/extension/json_test.cc | 52 ++++++++++++++++++++++++++ cpp/src/arrow/extension_type.cc | 3 +- cpp/src/arrow/extension_type_test.cc | 10 ----- cpp/src/arrow/testing/extension_type.h | 3 -- cpp/src/arrow/testing/gtest_util.cc | 28 +++++++------- cpp/src/arrow/testing/gtest_util.h | 3 ++ cpp/src/parquet/arrow/schema.cc | 5 ++- docs/source/status.rst | 2 +- 12 files changed, 109 insertions(+), 53 deletions(-) create mode 100644 cpp/src/arrow/extension/json_test.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 1b41d89483b..0b846fc92dc 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -375,10 +375,10 @@ set(ARROW_SRCS device.cc device_allocation_type_set.cc extension_type.cc - extension/json.cc - memory_pool.cc extension/bool8.cc + extension/json.cc extension/uuid.cc + memory_pool.cc pretty_print.cc record_batch.cc result.cc diff --git a/cpp/src/arrow/extension/CMakeLists.txt b/cpp/src/arrow/extension/CMakeLists.txt index 065ea3f1ddb..4ab6a35b52e 100644 --- a/cpp/src/arrow/extension/CMakeLists.txt +++ b/cpp/src/arrow/extension/CMakeLists.txt @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-set(CANONICAL_EXTENSION_TESTS bool8_test.cc uuid_test.cc) +set(CANONICAL_EXTENSION_TESTS bool8_test.cc json_test.cc uuid_test.cc) if(ARROW_JSON) list(APPEND CANONICAL_EXTENSION_TESTS fixed_shape_tensor_test.cc opaque_test.cc) diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index a9b00af2703..bcde4390481 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -30,31 +30,34 @@ namespace arrow { namespace extension { -std::once_flag init_flag; - -std::shared_ptr json() { - std::call_once(init_flag, []() { - DCHECK_OK(RegisterExtensionType(std::make_shared())); - }); - return GetExtensionType(JsonExtensionType::type_name()); -} - bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const { const auto& other_ext = static_cast(other); return other_ext.extension_name() == this->extension_name(); } Result> JsonExtensionType::Deserialize( - std::shared_ptr storage_type, const std::string& serialized_data) const { - if (serialized_data != JsonExtensionType::type_name()) { - return Status::Invalid("Type identifier did not match"); + std::shared_ptr storage_type, const std::string& serialized) const { + if (!serialized.empty()) { + return Status::Invalid("Unexpected serialized metadata: '", serialized, "'"); } - return json(); + if (!storage_type->Equals(*utf8())) { + return Status::Invalid("Invalid storage type for JsonExtensionType: ", + storage_type->ToString()); + } + return std::make_shared(); } -std::string JsonExtensionType::Serialize() const { - return JsonExtensionType::type_name(); +std::string JsonExtensionType::Serialize() const { return ""; } + +std::shared_ptr JsonExtensionType::MakeArray( + std::shared_ptr data) const { + DCHECK_EQ(data->type->id(), Type::EXTENSION); + DCHECK_EQ("arrow.json", + internal::checked_cast(*data->type).extension_name()); + return std::make_shared(data); } +std::shared_ptr json() { return std::make_shared(); } + } // namespace extension } // namespace arrow 
diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index 2b50113dd27..ee7001103d6 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -23,6 +23,7 @@ #include "arrow/extension_type.h" #include "arrow/result.h" #include "arrow/type_fwd.h" +#include "arrow/util/logging.h" #include "arrow/util/visibility.h" namespace arrow { @@ -33,21 +34,27 @@ class ARROW_EXPORT JsonExtensionType : public ExtensionType { public: JsonExtensionType() : ExtensionType(::arrow::utf8()) {} - static constexpr const char* type_name() { return "arrow.extension.json"; } + static constexpr const char* type_name() { return "arrow.json"; } + std::string extension_name() const override { return type_name(); } + std::string ToString(bool show_metadata = false) const override { + return "extension"; + }; + bool ExtensionEquals(const ExtensionType& other) const override; + Result> Deserialize( std::shared_ptr storage_type, const std::string& serialized_data) const override; + std::string Serialize() const override; - std::shared_ptr MakeArray(std::shared_ptr data) const override { - return std::make_shared(data); - } + + std::shared_ptr MakeArray(std::shared_ptr data) const override; }; /// \brief Return a JsonExtensionType instance. -ARROW_EXPORT std::shared_ptr json(); +ARROW_EXPORT std::shared_ptr json(); } // namespace extension } // namespace arrow diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc new file mode 100644 index 00000000000..53886ac412f --- /dev/null +++ b/cpp/src/arrow/extension/json_test.cc @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/extension/json.h" + +#include "arrow/record_batch.h" +#include "arrow/testing/gtest_util.h" + +namespace arrow { + +using extension::json; + +class TestJsonExtensionType : public ::testing::Test {}; + +std::shared_ptr ExampleJson() { + std::shared_ptr arr = ArrayFromJSON(utf8(), R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"); + return ExtensionType::WrapArray(arrow::extension::json(), arr); +} + +TEST_F(TestJsonExtensionType, JsonRoundtrip) { + auto ext_arr = ExampleJson(); + + auto batch = RecordBatch::Make(schema({field("f0", json())}), 8, {ext_arr}); + std::shared_ptr read_batch; + RoundtripBatch(batch, &read_batch); + CompareBatch(*batch, *read_batch, false /* compare_metadata */); +} + +} // namespace arrow diff --git a/cpp/src/arrow/extension_type.cc b/cpp/src/arrow/extension_type.cc index d0135e905a0..17c2ef2366b 100644 --- a/cpp/src/arrow/extension_type.cc +++ b/cpp/src/arrow/extension_type.cc @@ -32,6 +32,7 @@ # include "arrow/extension/fixed_shape_tensor.h" # include "arrow/extension/opaque.h" #endif +#include "arrow/extension/json.h" #include "arrow/extension/uuid.h" #include "arrow/status.h" #include "arrow/type.h" @@ -148,7 +149,7 @@ static void CreateGlobalRegistry() { // Register canonical extension types g_registry = std::make_shared(); - std::vector> ext_types{extension::bool8(), extension::uuid()}; + std::vector> ext_types{extension::bool8(), extension::json(), extension::uuid()}; #ifdef ARROW_JSON 
ext_types.push_back(extension::fixed_shape_tensor(int64(), {})); diff --git a/cpp/src/arrow/extension_type_test.cc b/cpp/src/arrow/extension_type_test.cc index 0a95431d3d3..2e47ecd5868 100644 --- a/cpp/src/arrow/extension_type_test.cc +++ b/cpp/src/arrow/extension_type_test.cc @@ -231,16 +231,6 @@ TEST_F(TestExtensionType, IpcRoundtrip) { CompareBatch(*batch, *read_batch, false /* compare_metadata */); } -TEST_F(TestExtensionType, JsonRoundtrip) { - auto ext_arr = ExampleJson(); - auto batch = - RecordBatch::Make(schema({field("f0", arrow::extension::json())}), 8, {ext_arr}); - - std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); - CompareBatch(*batch, *read_batch, false /* compare_metadata */); -} - TEST_F(TestExtensionType, UnrecognizedExtension) { auto ext_arr = ExampleUuid(); auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr}); diff --git a/cpp/src/arrow/testing/extension_type.h b/cpp/src/arrow/testing/extension_type.h index edd155a29e3..a4526e31c2b 100644 --- a/cpp/src/arrow/testing/extension_type.h +++ b/cpp/src/arrow/testing/extension_type.h @@ -212,9 +212,6 @@ ARROW_TESTING_EXPORT std::shared_ptr MakeComplex128(const std::shared_ptr& real, const std::shared_ptr& imag); -ARROW_TESTING_EXPORT -std::shared_ptr ExampleJson(); - // A RAII class that registers an extension type on construction // and unregisters it on destruction. 
class ARROW_TESTING_EXPORT ExtensionTypeGuard { diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 0a9c952e4bc..d4fb36c88c9 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -591,6 +591,20 @@ void ApproxCompareBatch(const RecordBatch& left, const RecordBatch& right, }); } +void RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out) { + ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create()); + ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), + out_stream.get())); + + ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish()); + + io::BufferReader reader(complete_ipc_stream); + std::shared_ptr batch_reader; + ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); + ASSERT_OK(batch_reader->ReadNext(out)); +} + std::shared_ptr TweakValidityBit(const std::shared_ptr& array, int64_t index, bool validity) { auto data = array->data()->Copy(); @@ -1041,20 +1055,6 @@ std::shared_ptr ExampleComplex128() { return ExtensionType::WrapArray(complex128(), arr); } -std::shared_ptr ExampleJson() { - std::shared_ptr arr = ArrayFromJSON(utf8(), R"([ - "null", - "1234", - "3.14159", - "true", - "false", - "\"a json string\"", - "[\"a\", \"json\", \"array\"]", - "{\"obj\": \"a simple json object\"}" - ])"); - return ExtensionType::WrapArray(arrow::extension::json(), arr); -} - ExtensionTypeGuard::ExtensionTypeGuard(const std::shared_ptr& type) : ExtensionTypeGuard(DataTypeVector{type}) {} diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 90311464c28..f482f8bfc22 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -309,6 +309,9 @@ ARROW_TESTING_EXPORT void ApproxCompareBatch( const RecordBatch& left, const RecordBatch& right, bool compare_metadata = true, const EqualOptions& options = TestingEqualOptions()); 
+ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out); + // Check if the padding of the buffers of the array is zero. // Also cause valgrind warnings if the padding bytes are uninitialized. ARROW_TESTING_EXPORT void AssertZeroPadded(const Array& array); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 3bae2821f1e..a53efeccc8f 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -429,7 +429,9 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, case ArrowTypeId::EXTENSION: { auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type()); // Built-in JSON extension is handled differently. - if (ext_type->extension_name() == ::arrow::extension::json()->extension_name()) { + if (ext_type->extension_name() == + std::static_pointer_cast<::arrow::ExtensionType>(::arrow::extension::json()) + ->extension_name()) { type = ParquetType::BYTE_ARRAY; logical_type = LogicalType::JSON(); break; @@ -990,6 +992,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer bool modified = false; auto& origin_type = origin_field.type(); + auto inferred_type = inferred->field->type(); if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); diff --git a/docs/source/status.rst b/docs/source/status.rst index b685d4bbf8a..98374164d7a 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -119,7 +119,7 @@ Data Types +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | Variable shape tensor | | | | | | | | | +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ -| JSON | | | ✓ | | | | | | +| JSON | ✓ | | ✓ | | | | | | +-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ | UUID | ✓ | | ✓ | | | | | | 
+-----------------------+-------+-------+-------+------------+-------+-------+-------+-------+ From 9ce63daf610bc63cf4fb595090744eaa901784ab Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 16 Apr 2024 20:28:12 +0200 Subject: [PATCH 04/26] Review feedback --- cpp/src/arrow/extension_type_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/extension_type_test.cc b/cpp/src/arrow/extension_type_test.cc index 2e47ecd5868..f49ffc5cba5 100644 --- a/cpp/src/arrow/extension_type_test.cc +++ b/cpp/src/arrow/extension_type_test.cc @@ -26,7 +26,6 @@ #include "arrow/array/array_nested.h" #include "arrow/array/util.h" -#include "arrow/extension/json.h" #include "arrow/extension_type.h" #include "arrow/io/memory.h" #include "arrow/ipc/options.h" From f3ab3228366b83a5e2e010ef6f64055ac3b47533 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 22 Aug 2024 00:16:59 +0200 Subject: [PATCH 05/26] Add LargeString and StringView --- cpp/src/arrow/extension/json.cc | 16 ++++++++-------- cpp/src/arrow/extension/json.h | 18 ++++++++++++------ cpp/src/arrow/extension/json_test.cc | 21 ++++++++++++--------- cpp/src/parquet/arrow/schema_internal.cc | 2 +- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index bcde4390481..bf874cbf7f2 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -18,7 +18,6 @@ #include "arrow/extension/json.h" #include -#include #include #include "arrow/extension_type.h" @@ -27,8 +26,7 @@ #include "arrow/type_fwd.h" #include "arrow/util/logging.h" -namespace arrow { -namespace extension { +namespace arrow::extension { bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const { const auto& other_ext = static_cast(other); @@ -40,11 +38,12 @@ Result> JsonExtensionType::Deserialize( if (!serialized.empty()) { return Status::Invalid("Unexpected serialized metadata: '", serialized, "'"); } - if 
(!storage_type->Equals(*utf8())) { + if (!(storage_type->Equals(*utf8()) || storage_type->Equals(large_utf8()) || + storage_type->Equals(utf8_view()))) { return Status::Invalid("Invalid storage type for JsonExtensionType: ", storage_type->ToString()); } - return std::make_shared(); + return std::make_shared(storage_type); } std::string JsonExtensionType::Serialize() const { return ""; } @@ -57,7 +56,8 @@ std::shared_ptr JsonExtensionType::MakeArray( return std::make_shared(data); } -std::shared_ptr json() { return std::make_shared(); } +std::shared_ptr json(const std::shared_ptr storage_type) { + return std::make_shared(storage_type); +} -} // namespace extension -} // namespace arrow +} // namespace arrow::extension diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index ee7001103d6..392d686bc67 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -26,13 +26,16 @@ #include "arrow/util/logging.h" #include "arrow/util/visibility.h" -namespace arrow { -namespace extension { +namespace arrow::extension { /// \brief Concrete type class for variable-size JSON data, utf8-encoded. class ARROW_EXPORT JsonExtensionType : public ExtensionType { public: - JsonExtensionType() : ExtensionType(::arrow::utf8()) {} + explicit JsonExtensionType(const std::shared_ptr& storage_type) + : ExtensionType(storage_type), storage_type_(storage_type) { + ARROW_CHECK(storage_type->Equals(utf8()) || storage_type->Equals(large_utf8()) || + storage_type->Equals(utf8_view())); + } static constexpr const char* type_name() { return "arrow.json"; } @@ -51,10 +54,13 @@ class ARROW_EXPORT JsonExtensionType : public ExtensionType { std::string Serialize() const override; std::shared_ptr MakeArray(std::shared_ptr data) const override; + + private: + std::shared_ptr storage_type_; }; /// \brief Return a JsonExtensionType instance. 
-ARROW_EXPORT std::shared_ptr json(); +ARROW_EXPORT std::shared_ptr json( + std::shared_ptr storage_type = utf8()); -} // namespace extension -} // namespace arrow +} // namespace arrow::extension diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 53886ac412f..5120a84bab9 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -26,8 +26,8 @@ using extension::json; class TestJsonExtensionType : public ::testing::Test {}; -std::shared_ptr ExampleJson() { - std::shared_ptr arr = ArrayFromJSON(utf8(), R"([ +std::shared_ptr ExampleJson(const std::shared_ptr& storage_type) { + std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ "null", "1234", "3.14159", @@ -37,16 +37,19 @@ std::shared_ptr ExampleJson() { "[\"a\", \"json\", \"array\"]", "{\"obj\": \"a simple json object\"}" ])"); - return ExtensionType::WrapArray(arrow::extension::json(), arr); + return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); } TEST_F(TestJsonExtensionType, JsonRoundtrip) { - auto ext_arr = ExampleJson(); - - auto batch = RecordBatch::Make(schema({field("f0", json())}), 8, {ext_arr}); - std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); - CompareBatch(*batch, *read_batch, false /* compare_metadata */); + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + auto ext_arr = ExampleJson(storage_type); + + auto batch = + RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); + std::shared_ptr read_batch; + RoundtripBatch(batch, &read_batch); + CompareBatch(*batch, *read_batch, false /* compare_metadata */); + } } } // namespace arrow diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 62ddc566124..81a17106168 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -123,7 +123,7 @@ Result> FromByteArray( return ::arrow::binary(); case 
LogicalType::Type::JSON: if (reader_properties.known_arrow_extensions_enabled()) { - return ::arrow::extension::json(); + return ::arrow::extension::json(::arrow::utf8()); } return ::arrow::binary(); default: From e6cfa912e6661d5b065fec32aa0dc7c2beb7dac0 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 22 Aug 2024 16:29:49 +0200 Subject: [PATCH 06/26] Move RoundtripBatch to ipc/test_common.cc --- cpp/src/arrow/extension/json_test.cc | 2 ++ cpp/src/arrow/extension_type.cc | 3 ++- cpp/src/arrow/testing/gtest_util.cc | 14 -------------- cpp/src/arrow/testing/gtest_util.h | 3 --- 4 files changed, 4 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 5120a84bab9..85ca8517961 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -17,11 +17,13 @@ #include "arrow/extension/json.h" +#include "arrow/ipc/test_common.h" #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" namespace arrow { +using arrow::ipc::test::RoundtripBatch; using extension::json; class TestJsonExtensionType : public ::testing::Test {}; diff --git a/cpp/src/arrow/extension_type.cc b/cpp/src/arrow/extension_type.cc index 17c2ef2366b..7ad39eab23f 100644 --- a/cpp/src/arrow/extension_type.cc +++ b/cpp/src/arrow/extension_type.cc @@ -149,7 +149,8 @@ static void CreateGlobalRegistry() { // Register canonical extension types g_registry = std::make_shared(); - std::vector> ext_types{extension::bool8(), extension::json(), extension::uuid()}; + std::vector> ext_types{extension::bool8(), extension::json(), + extension::uuid()}; #ifdef ARROW_JSON ext_types.push_back(extension::fixed_shape_tensor(int64(), {})); diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index d4fb36c88c9..adb063e4cd5 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -591,20 +591,6 @@ void ApproxCompareBatch(const RecordBatch& 
left, const RecordBatch& right, }); } -void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out) { - ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create()); - ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), - out_stream.get())); - - ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish()); - - io::BufferReader reader(complete_ipc_stream); - std::shared_ptr batch_reader; - ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); - ASSERT_OK(batch_reader->ReadNext(out)); -} - std::shared_ptr TweakValidityBit(const std::shared_ptr& array, int64_t index, bool validity) { auto data = array->data()->Copy(); diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index f482f8bfc22..90311464c28 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -309,9 +309,6 @@ ARROW_TESTING_EXPORT void ApproxCompareBatch( const RecordBatch& left, const RecordBatch& right, bool compare_metadata = true, const EqualOptions& options = TestingEqualOptions()); -ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out); - // Check if the padding of the buffers of the array is zero. // Also cause valgrind warnings if the padding bytes are uninitialized. 
ARROW_TESTING_EXPORT void AssertZeroPadded(const Array& array); From d749d01409ae15d753869cf4b831e573add3ec88 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 26 Aug 2024 16:48:50 +0200 Subject: [PATCH 07/26] Rebase --- cpp/src/arrow/CMakeLists.txt | 1 - cpp/src/arrow/extension/json.cc | 1 - cpp/src/arrow/extension/json.h | 1 - cpp/src/arrow/testing/gtest_util.cc | 2 +- 4 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 0b846fc92dc..e77a02d0c08 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -378,7 +378,6 @@ set(ARROW_SRCS extension/bool8.cc extension/json.cc extension/uuid.cc - memory_pool.cc pretty_print.cc record_batch.cc result.cc diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index bf874cbf7f2..e1dd843820d 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -17,7 +17,6 @@ #include "arrow/extension/json.h" -#include #include #include "arrow/extension_type.h" diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index 392d686bc67..9c7445319d1 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -17,7 +17,6 @@ #pragma once -#include #include #include "arrow/extension_type.h" diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index adb063e4cd5..07d15826f2c 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -49,8 +49,8 @@ #include "arrow/buffer.h" #include "arrow/compute/api_vector.h" #include "arrow/datum.h" -#include "arrow/io/memory.h" #include "arrow/extension/json.h" +#include "arrow/io/memory.h" #include "arrow/ipc/json_simple.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" From c95eda41d6e9b3e4699d373d43f64b75f8421143 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 27 Aug 2024 19:33:24 +0200 Subject: [PATCH 08/26] Update 
cpp/src/arrow/extension/json_test.cc Co-authored-by: Antoine Pitrou --- cpp/src/arrow/extension/json_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 85ca8517961..93790c71a03 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -50,6 +50,7 @@ TEST_F(TestJsonExtensionType, JsonRoundtrip) { RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); std::shared_ptr read_batch; RoundtripBatch(batch, &read_batch); + ASSERT_OK(read_batch->ValidateFull()); CompareBatch(*batch, *read_batch, false /* compare_metadata */); } } From d731a624c811b5a38b995075f8d14c6b917f5d30 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 27 Aug 2024 20:49:58 +0200 Subject: [PATCH 09/26] Update cpp/src/parquet/arrow/schema.cc Co-authored-by: mwish --- cpp/src/parquet/arrow/schema.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index a53efeccc8f..5efd40f1617 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -992,7 +992,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer bool modified = false; auto& origin_type = origin_field.type(); - auto inferred_type = inferred->field->type(); + const auto& inferred_type = inferred->field->type(); if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); From 197ce79f3c7996758cc55178b0e946efe87bcd61 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 27 Aug 2024 21:34:20 +0200 Subject: [PATCH 10/26] Review feedback --- cpp/src/arrow/array/validate.cc | 21 +++++++++++++++---- cpp/src/arrow/extension/json.cc | 4 ++-- cpp/src/arrow/extension/json.h | 13 ++++++------ cpp/src/arrow/extension/json_test.cc | 2 +- .../parquet/arrow/arrow_reader_writer_test.cc | 2 +- cpp/src/parquet/arrow/arrow_schema_test.cc | 6 +++--- 
cpp/src/parquet/arrow/schema.cc | 6 ++---- cpp/src/parquet/arrow/schema_internal.cc | 2 +- cpp/src/parquet/arrow/schema_internal.h | 2 +- cpp/src/parquet/properties.h | 10 ++++----- 10 files changed, 40 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc index 0d940d3bc86..51ea5c2078f 100644 --- a/cpp/src/arrow/array/validate.cc +++ b/cpp/src/arrow/array/validate.cc @@ -985,10 +985,23 @@ Status ValidateArrayFull(const Array& array) { return ValidateArrayFull(*array.d ARROW_EXPORT Status ValidateUTF8(const ArrayData& data) { - DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW || - data.type->id() == Type::LARGE_STRING); - UTF8DataValidator validator{data}; - return VisitTypeInline(*data.type, &validator); + if (data.type->id() == Type::EXTENSION) { + const auto& storage_type = + checked_pointer_cast(data.type)->storage_type(); + DCHECK(storage_type->id() == Type::STRING || + storage_type->id() == Type::STRING_VIEW || + storage_type->id() == Type::LARGE_STRING); + auto ext_array_data = + std::make_shared(storage_type, data.length, data.buffers, + data.child_data, data.null_count, data.offset); + UTF8DataValidator validator{*ext_array_data}; + return VisitTypeInline(*storage_type, &validator); + } else { + UTF8DataValidator validator{data}; + DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW || + data.type->id() == Type::LARGE_STRING); + return VisitTypeInline(*data.type, &validator); + } } ARROW_EXPORT diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index e1dd843820d..9f5e76dfe99 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -37,8 +37,8 @@ Result> JsonExtensionType::Deserialize( if (!serialized.empty()) { return Status::Invalid("Unexpected serialized metadata: '", serialized, "'"); } - if (!(storage_type->Equals(*utf8()) || storage_type->Equals(large_utf8()) || - 
storage_type->Equals(utf8_view()))) { + if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW && + storage_type->id() != Type::LARGE_STRING) { return Status::Invalid("Invalid storage type for JsonExtensionType: ", storage_type->ToString()); } diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index 9c7445319d1..c810f19713a 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -17,12 +17,12 @@ #pragma once +#include #include #include "arrow/extension_type.h" #include "arrow/result.h" #include "arrow/type_fwd.h" -#include "arrow/util/logging.h" #include "arrow/util/visibility.h" namespace arrow::extension { @@ -32,13 +32,14 @@ class ARROW_EXPORT JsonExtensionType : public ExtensionType { public: explicit JsonExtensionType(const std::shared_ptr& storage_type) : ExtensionType(storage_type), storage_type_(storage_type) { - ARROW_CHECK(storage_type->Equals(utf8()) || storage_type->Equals(large_utf8()) || - storage_type->Equals(utf8_view())); + if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW && + storage_type->id() != Type::LARGE_STRING) { + throw std::invalid_argument("Invalid storage type for JsonExtensionType: " + + storage_type->ToString()); + } } - static constexpr const char* type_name() { return "arrow.json"; } - - std::string extension_name() const override { return type_name(); } + std::string extension_name() const override { return "arrow.json"; } std::string ToString(bool show_metadata = false) const override { return "extension"; diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 93790c71a03..744f57ed1be 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -51,7 +51,7 @@ TEST_F(TestJsonExtensionType, JsonRoundtrip) { std::shared_ptr read_batch; RoundtripBatch(batch, &read_batch); ASSERT_OK(read_batch->ValidateFull()); - CompareBatch(*batch, *read_batch, 
false /* compare_metadata */); + CompareBatch(*batch, *read_batch, /*compare_metadata*/ true); } } diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 692f621d882..f583513ebe6 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1462,7 +1462,7 @@ TEST_F(TestJsonParquetIO, JsonExtension) { // When the original Arrow schema isn't stored and Arrow extensions are enabled, // LogicalType::JSON is read as JsonExtensionType. ::parquet::ArrowReaderProperties reader_properties; - reader_properties.enable_known_arrow_extensions(); + reader_properties.set_arrow_extensions_enabled(); this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), reader_properties); diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index c5712292ed4..1ce19c27159 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -749,7 +749,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // If Arrow extensions are enabled, both fields should be treated as json() extension // fields. ArrowReaderProperties props; - props.enable_known_arrow_extensions(); + props.set_arrow_extensions_enabled(); auto arrow_schema = ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true), ::arrow::field("json_2", ::arrow::extension::json(), true)}); @@ -784,9 +784,9 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // Parquet file contains Arrow schema. // A contrived example. Parquet believes both columns are JSON. Arrow believes json_1 // is a JSON column and json_2 is a binary column. json_2 should be treated as a - // binary column even if known_arrow_extensions_enabled is enabled. + // binary column even if get_arrow_extensions_enabled is true. 
ArrowReaderProperties props; - props.enable_known_arrow_extensions(); + props.set_arrow_extensions_enabled(); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 5efd40f1617..790ee80c4b1 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -429,9 +429,7 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, case ArrowTypeId::EXTENSION: { auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type()); // Built-in JSON extension is handled differently. - if (ext_type->extension_name() == - std::static_pointer_cast<::arrow::ExtensionType>(::arrow::extension::json()) - ->extension_name()) { + if (ext_type->extension_name() == std::string("arrow.json")) { type = ParquetType::BYTE_ARRAY; logical_type = LogicalType::JSON(); break; @@ -997,7 +995,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); if (inferred_type->id() != ::arrow::Type::EXTENSION && - ex_type.extension_name() == ::arrow::extension::JsonExtensionType::type_name()) { + ex_type.extension_name() == std::string("arrow.json")) { // Schema mismatch. // // Arrow extensions are DISABLED in Parquet. 
diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 81a17106168..422defe140e 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -122,7 +122,7 @@ Result> FromByteArray( case LogicalType::Type::BSON: return ::arrow::binary(); case LogicalType::Type::JSON: - if (reader_properties.known_arrow_extensions_enabled()) { + if (reader_properties.get_arrow_extensions_enabled()) { return ::arrow::extension::json(::arrow::utf8()); } return ::arrow::binary(); diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h index ff6443faaa9..58828f85ab8 100644 --- a/cpp/src/parquet/arrow/schema_internal.h +++ b/cpp/src/parquet/arrow/schema_internal.h @@ -18,7 +18,7 @@ #pragma once #include "arrow/result.h" -#include "parquet/properties.h" +#include "arrow/type_fwd.h" #include "parquet/schema.h" namespace arrow { diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5c8a07c2dfd..1cc69cd8114 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -871,7 +871,7 @@ class PARQUET_EXPORT ArrowReaderProperties { pre_buffer_(true), cache_options_(::arrow::io::CacheOptions::LazyDefaults()), coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO), - known_arrow_extensions_enabled_(false) {} + arrow_extensions_enabled_(false) {} /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// @@ -947,9 +947,9 @@ class PARQUET_EXPORT ArrowReaderProperties { /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow /// schema. Currently only arrow::extension::json() extension type is supported. This /// will be used for binary columns whose LogicalType is JSON. 
- void enable_known_arrow_extensions() { known_arrow_extensions_enabled_ = true; } - void disable_known_arrow_extensions() { known_arrow_extensions_enabled_ = false; } - bool known_arrow_extensions_enabled() const { return known_arrow_extensions_enabled_; } + void set_arrow_extensions_enabled() { arrow_extensions_enabled_ = true; } + void set_arrow_extensions_disabled() { arrow_extensions_enabled_ = false; } + bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; } private: bool use_threads_; @@ -959,7 +959,7 @@ class PARQUET_EXPORT ArrowReaderProperties { ::arrow::io::IOContext io_context_; ::arrow::io::CacheOptions cache_options_; ::arrow::TimeUnit::type coerce_int96_timestamp_unit_; - bool known_arrow_extensions_enabled_; + bool arrow_extensions_enabled_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties From b7b01d4c86579a4a935412d546460e9f5bcb630b Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 29 Aug 2024 10:42:27 +0200 Subject: [PATCH 11/26] Review feedback --- cpp/src/arrow/extension/json.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index 9f5e76dfe99..0aa577d982d 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -34,9 +34,6 @@ bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) const { Result> JsonExtensionType::Deserialize( std::shared_ptr storage_type, const std::string& serialized) const { - if (!serialized.empty()) { - return Status::Invalid("Unexpected serialized metadata: '", serialized, "'"); - } if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW && storage_type->id() != Type::LARGE_STRING) { return Status::Invalid("Invalid storage type for JsonExtensionType: ", From e1a90ee93ff862c01bc61c0dc795b679b6984ef6 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 29 Aug 2024 12:01:58 +0200 Subject: [PATCH 12/26] Review feedback --- 
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 15 +++++++-------- cpp/src/parquet/arrow/arrow_schema_test.cc | 14 +++++++------- cpp/src/parquet/arrow/schema.cc | 6 +++--- cpp/src/parquet/arrow/schema_internal.cc | 4 +++- cpp/src/parquet/properties.h | 2 +- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index f583513ebe6..516b311ed6d 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1394,8 +1394,8 @@ using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>; TEST_F(TestLargeBinaryParquetIO, Basics) { const char* json = "[\"foo\", \"\", null, \"\xff\"]"; - const auto large_type = ::arrow::large_binary(); - const auto narrow_type = ::arrow::binary(); + const auto large_type = ::arrow::large_utf8(); + const auto narrow_type = ::arrow::utf8(); const auto large_array = ::arrow::ArrayFromJSON(large_type, json); const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json); @@ -1417,8 +1417,8 @@ using TestLargeStringParquetIO = TestParquetIO<::arrow::LargeStringType>; TEST_F(TestLargeStringParquetIO, Basics) { const char* json = R"(["foo", "", null, "bar"])"; - const auto large_type = ::arrow::large_utf8(); - const auto narrow_type = ::arrow::utf8(); + const auto large_type = ::arrow::large_binary(); + const auto narrow_type = ::arrow::binary(); const auto large_array = ::arrow::ArrayFromJSON(large_type, json); const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json); @@ -1454,10 +1454,9 @@ TEST_F(TestJsonParquetIO, JsonExtension) { const auto json_array = ::arrow::ExtensionType::WrapArray(json_type, json_string_array); // When the original Arrow schema isn't stored and Arrow extensions are disabled, - // LogicalType::JSON is read as Binary. 
- const auto binary_array = ::arrow::ArrayFromJSON(::arrow::binary(), json); - this->RoundTripSingleColumn(json_array, binary_array, - default_arrow_writer_properties()); + // LogicalType::JSON is read as utf8. + const auto utf8_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); + this->RoundTripSingleColumn(json_array, utf8_array, default_arrow_writer_properties()); // When the original Arrow schema isn't stored and Arrow extensions are enabled, // LogicalType::JSON is read as JsonExtensionType. diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 1ce19c27159..fa047a571b1 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -233,7 +233,7 @@ TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) { ::arrow::uint64()}, {"int(64, true)", LogicalType::Int(64, true), ParquetType::INT64, -1, ::arrow::int64()}, - {"json", LogicalType::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, + {"json", LogicalType::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::utf8()}, {"bson", LogicalType::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()}, {"interval", LogicalType::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12, ::arrow::fixed_size_binary(12)}, @@ -736,9 +736,9 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file does not contain Arrow schema. - // By default both fields should be treated as binary() fields in Arrow. + // By default, both fields should be treated as utf8() fields in Arrow. 
auto arrow_schema = ::arrow::schema( - {::arrow::field("json_1", BINARY, true), ::arrow::field("json_2", BINARY, true)}); + {::arrow::field("json_1", UTF8, true), ::arrow::field("json_2", UTF8, true)}); std::shared_ptr metadata = ::arrow::key_value_metadata({}, {}); ASSERT_OK(ConvertSchema(parquet_fields, metadata)); CheckFlatSchema(arrow_schema); @@ -766,7 +766,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", BINARY, true)}); + ::arrow::field("json_2", UTF8, true)}); ASSERT_OK_AND_ASSIGN( std::shared_ptr serialized, @@ -783,15 +783,15 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file contains Arrow schema. // A contrived example. Parquet believes both columns are JSON. Arrow believes json_1 - // is a JSON column and json_2 is a binary column. json_2 should be treated as a - // binary column even if get_arrow_extensions_enabled is true. + // is a JSON column and json_2 is a utf8 column. json_2 should be treated as a + // utf8 column even if the get_arrow_extensions_enabled is true. 
ArrowReaderProperties props; props.get_arrow_extensions_enabled(); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", BINARY, true)}); + ::arrow::field("json_2", UTF8, true)}); ASSERT_OK_AND_ASSIGN( std::shared_ptr serialized, diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 790ee80c4b1..2f66a4074d9 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1003,7 +1003,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer // inferred_type is ::arrow::binary() // // Origin type is restored as Arrow should be considered the source of truth. - DCHECK_EQ(inferred_type->id(), ::arrow::Type::BINARY); + DCHECK_EQ(inferred_type->id(), ::arrow::Type::STRING); inferred->field = inferred->field->WithType(origin_type); RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred)); } else { @@ -1027,11 +1027,11 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer // Schema mismatch. // // Arrow extensions are ENABLED in Parquet. - // origin_type is ::arrow::binary() + // origin_type is ::arrow::utf8() // inferred_type is ::arrow::extension::json() // // Origin type is restored as Arrow should be considered the source of truth. 
- DCHECK_EQ(origin_type->id(), ::arrow::Type::BINARY); + DCHECK_EQ(origin_type->id(), ::arrow::Type::STRING); inferred->field = inferred->field->WithType(origin_type); } } diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc index 422defe140e..261a0094065 100644 --- a/cpp/src/parquet/arrow/schema_internal.cc +++ b/cpp/src/parquet/arrow/schema_internal.cc @@ -125,7 +125,9 @@ Result> FromByteArray( if (reader_properties.get_arrow_extensions_enabled()) { return ::arrow::extension::json(::arrow::utf8()); } - return ::arrow::binary(); + // When the original Arrow schema isn't stored and Arrow extensions are disabled, + // LogicalType::JSON is read as utf8(). + return ::arrow::utf8(); default: return Status::NotImplemented("Unhandled logical logical_type ", logical_type.ToString(), " for binary array"); diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 1cc69cd8114..6fc8a4b62ab 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -946,7 +946,7 @@ class PARQUET_EXPORT ArrowReaderProperties { /// /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow /// schema. Currently only arrow::extension::json() extension type is supported. This - /// will be used for binary columns whose LogicalType is JSON. + /// will be used for utf8 columns whose LogicalType is JSON. 
void set_arrow_extensions_enabled() { arrow_extensions_enabled_ = true; } void set_arrow_extensions_disabled() { arrow_extensions_enabled_ = false; } bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; } From 6f8f4679e583121684e5007c0cdb2fd7f3660bfa Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 29 Aug 2024 12:48:02 +0200 Subject: [PATCH 13/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/arrow/array/validate.cc | 2 +- cpp/src/parquet/arrow/schema.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc index 51ea5c2078f..9fe7621a51f 100644 --- a/cpp/src/arrow/array/validate.cc +++ b/cpp/src/arrow/array/validate.cc @@ -987,7 +987,7 @@ ARROW_EXPORT Status ValidateUTF8(const ArrayData& data) { if (data.type->id() == Type::EXTENSION) { const auto& storage_type = - checked_pointer_cast(data.type)->storage_type(); + checked_cast(*data.type).storage_type(); DCHECK(storage_type->id() == Type::STRING || storage_type->id() == Type::STRING_VIEW || storage_type->id() == Type::LARGE_STRING); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 2f66a4074d9..49229447cf2 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1000,7 +1000,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer // // Arrow extensions are DISABLED in Parquet. // origin_type is ::arrow::extension::json() - // inferred_type is ::arrow::binary() + // inferred_type is ::arrow::utf8() // // Origin type is restored as Arrow should be considered the source of truth. 
DCHECK_EQ(inferred_type->id(), ::arrow::Type::STRING); From e8cdb9c8d9c20b39f0b021097ad78855352c90d5 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 29 Aug 2024 22:12:28 +0200 Subject: [PATCH 14/26] Review feedback --- cpp/src/arrow/array/validate.cc | 23 +++++----- cpp/src/arrow/extension/json.cc | 3 ++ cpp/src/arrow/extension/json.h | 8 +--- cpp/src/arrow/extension/json_test.cc | 5 +++ cpp/src/arrow/util/utf8_util_test.cc | 27 ++++++++++++ .../parquet/arrow/arrow_reader_writer_test.cc | 6 +-- cpp/src/parquet/arrow/arrow_schema_test.cc | 43 ++++++++++--------- cpp/src/parquet/arrow/schema.cc | 3 +- cpp/src/parquet/properties.h | 5 ++- 9 files changed, 76 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc index 9fe7621a51f..eb1db3e0a8f 100644 --- a/cpp/src/arrow/array/validate.cc +++ b/cpp/src/arrow/array/validate.cc @@ -985,22 +985,21 @@ Status ValidateArrayFull(const Array& array) { return ValidateArrayFull(*array.d ARROW_EXPORT Status ValidateUTF8(const ArrayData& data) { + const auto& storage_type = + (data.type->id() == Type::EXTENSION) + ? 
checked_cast(*data.type).storage_type() + : data.type; + DCHECK(storage_type->id() == Type::STRING || storage_type->id() == Type::STRING_VIEW || + storage_type->id() == Type::LARGE_STRING); + if (data.type->id() == Type::EXTENSION) { - const auto& storage_type = - checked_cast(*data.type).storage_type(); - DCHECK(storage_type->id() == Type::STRING || - storage_type->id() == Type::STRING_VIEW || - storage_type->id() == Type::LARGE_STRING); - auto ext_array_data = - std::make_shared(storage_type, data.length, data.buffers, - data.child_data, data.null_count, data.offset); - UTF8DataValidator validator{*ext_array_data}; + const auto& ext_data = std::make_shared(data); + ext_data->type = storage_type; + UTF8DataValidator validator{*ext_data}; return VisitTypeInline(*storage_type, &validator); } else { UTF8DataValidator validator{data}; - DCHECK(data.type->id() == Type::STRING || data.type->id() == Type::STRING_VIEW || - data.type->id() == Type::LARGE_STRING); - return VisitTypeInline(*data.type, &validator); + return VisitTypeInline(*storage_type, &validator); } } diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index 0aa577d982d..828559716df 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -53,6 +53,9 @@ std::shared_ptr JsonExtensionType::MakeArray( } std::shared_ptr json(const std::shared_ptr storage_type) { + ARROW_CHECK(storage_type->id() != Type::STRING || + storage_type->id() != Type::STRING_VIEW || + storage_type->id() != Type::LARGE_STRING); return std::make_shared(storage_type); } diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index c810f19713a..11d435b81df 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -31,13 +31,7 @@ namespace arrow::extension { class ARROW_EXPORT JsonExtensionType : public ExtensionType { public: explicit JsonExtensionType(const std::shared_ptr& storage_type) - : ExtensionType(storage_type), 
storage_type_(storage_type) { - if (storage_type->id() != Type::STRING && storage_type->id() != Type::STRING_VIEW && - storage_type->id() != Type::LARGE_STRING) { - throw std::invalid_argument("Invalid storage type for JsonExtensionType: " + - storage_type->ToString()); - } - } + : ExtensionType(storage_type), storage_type_(storage_type) {} std::string extension_name() const override { return "arrow.json"; } diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 744f57ed1be..59491620f9d 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -17,6 +17,7 @@ #include "arrow/extension/json.h" +#include "arrow/array/validate.h" #include "arrow/ipc/test_common.h" #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" @@ -52,6 +53,10 @@ TEST_F(TestJsonExtensionType, JsonRoundtrip) { RoundtripBatch(batch, &read_batch); ASSERT_OK(read_batch->ValidateFull()); CompareBatch(*batch, *read_batch, /*compare_metadata*/ true); + + auto read_ext_arr = read_batch->column(0); + ASSERT_OK(internal::ValidateUTF8(*read_ext_arr)); + ASSERT_OK(read_ext_arr->ValidateFull()); } } diff --git a/cpp/src/arrow/util/utf8_util_test.cc b/cpp/src/arrow/util/utf8_util_test.cc index bada5e59d8a..2114d949f22 100644 --- a/cpp/src/arrow/util/utf8_util_test.cc +++ b/cpp/src/arrow/util/utf8_util_test.cc @@ -22,6 +22,8 @@ #include +#include "arrow/array/validate.h" +#include "arrow/extension/json.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/string.h" #include "arrow/util/utf8_internal.h" @@ -562,5 +564,30 @@ TEST(UTF8Length, Basics) { ASSERT_EQ(length("\xf0\x9f\x99\x8c"), 1); } +class UTF8ExtensionArrayTest : public ::testing::Test { + public: + static std::shared_ptr ExampleJson( + const std::shared_ptr& storage_type) { + std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + 
"{\"obj\": \"a simple json object\"}" + ])"); + return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); + } +}; + +TEST_F(UTF8ExtensionArrayTest, JSONExtensionType) { + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + const auto ext_arr = ExampleJson(storage_type); + ASSERT_OK(arrow::internal::ValidateUTF8(*ext_arr)); + } +} + } // namespace util } // namespace arrow diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 516b311ed6d..17b50486c56 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1461,14 +1461,14 @@ TEST_F(TestJsonParquetIO, JsonExtension) { // When the original Arrow schema isn't stored and Arrow extensions are enabled, // LogicalType::JSON is read as JsonExtensionType. ::parquet::ArrowReaderProperties reader_properties; - reader_properties.set_arrow_extensions_enabled(); + reader_properties.set_arrow_extensions_enabled(true); this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), reader_properties); // When the original Arrow schema is stored, the stored Arrow type is always respected. 
- const auto arrow_properties = + const auto writer_properties = ::parquet::ArrowWriterProperties::Builder().store_schema()->build(); - this->RoundTripSingleColumn(json_array, json_array, arrow_properties); + this->RoundTripSingleColumn(json_array, json_array, writer_properties); } using TestNullParquetIO = TestParquetIO<::arrow::NullType>; diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index fa047a571b1..c1a76529200 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -727,6 +727,17 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) { ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema)); } +Status ArrowSchemaToParquetMetadata(std::shared_ptr<::arrow::Schema>& arrow_schema, + std::shared_ptr& metadata) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr serialized, + ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); + std::string schema_as_string = serialized->ToString(); + std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); + metadata = ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); + return Status::OK(); +} + TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { std::vector parquet_fields; parquet_fields.push_back(PrimitiveNode::Make( @@ -739,7 +750,8 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // By default, both fields should be treated as utf8() fields in Arrow. 
auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", UTF8, true), ::arrow::field("json_2", UTF8, true)}); - std::shared_ptr metadata = ::arrow::key_value_metadata({}, {}); + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata)); CheckFlatSchema(arrow_schema); } @@ -749,11 +761,12 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // If Arrow extensions are enabled, both fields should be treated as json() extension // fields. ArrowReaderProperties props; - props.set_arrow_extensions_enabled(); + props.set_arrow_extensions_enabled(true); auto arrow_schema = ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true), ::arrow::field("json_2", ::arrow::extension::json(), true)}); - std::shared_ptr metadata = ::arrow::key_value_metadata({}, {}); + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); CheckFlatSchema(arrow_schema); } @@ -768,14 +781,8 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), ::arrow::field("json_2", UTF8, true)}); - ASSERT_OK_AND_ASSIGN( - std::shared_ptr serialized, - ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); - std::string schema_as_string = serialized->ToString(); - std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); - std::shared_ptr metadata = - ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); - + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata)); CheckFlatSchema(arrow_schema, true /* check_metadata */); } @@ -784,23 +791,17 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // Parquet file contains Arrow schema. // A contrived example. 
Parquet believes both columns are JSON. Arrow believes json_1 // is a JSON column and json_2 is a utf8 column. json_2 should be treated as a - // utf8 column even if the get_arrow_extensions_enabled is true. + // utf8 column even if arrow extensions are enabled. ArrowReaderProperties props; - props.get_arrow_extensions_enabled(); + props.set_arrow_extensions_enabled(true); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), ::arrow::field("json_2", UTF8, true)}); - ASSERT_OK_AND_ASSIGN( - std::shared_ptr serialized, - ::arrow::ipc::SerializeSchema(*arrow_schema, ::arrow::default_memory_pool())); - std::string schema_as_string = serialized->ToString(); - std::string schema_base64 = ::arrow::util::base64_encode(schema_as_string); - std::shared_ptr metadata = - ::arrow::key_value_metadata({"ARROW:schema"}, {schema_base64}); - + std::shared_ptr metadata; + ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); CheckFlatSchema(arrow_schema, true /* check_metadata */); } diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 49229447cf2..2be02f9c1bb 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -432,7 +432,6 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, if (ext_type->extension_name() == std::string("arrow.json")) { type = ParquetType::BYTE_ARRAY; logical_type = LogicalType::JSON(); - break; } std::shared_ptr<::arrow::Field> storage_field = ::arrow::field( name, ext_type->storage_type(), field->nullable(), field->metadata()); @@ -995,6 +994,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); if (inferred_type->id() != 
::arrow::Type::EXTENSION && + inferred_type->id() == ::arrow::Type::STRING && ex_type.extension_name() == std::string("arrow.json")) { // Schema mismatch. // @@ -1003,7 +1003,6 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer // inferred_type is ::arrow::utf8() // // Origin type is restored as Arrow should be considered the source of truth. - DCHECK_EQ(inferred_type->id(), ::arrow::Type::STRING); inferred->field = inferred->field->WithType(origin_type); RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred)); } else { diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 6fc8a4b62ab..eb1118ce5ec 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -947,8 +947,9 @@ class PARQUET_EXPORT ArrowReaderProperties { /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow /// schema. Currently only arrow::extension::json() extension type is supported. This /// will be used for utf8 columns whose LogicalType is JSON. 
- void set_arrow_extensions_enabled() { arrow_extensions_enabled_ = true; } - void set_arrow_extensions_disabled() { arrow_extensions_enabled_ = false; } + void set_arrow_extensions_enabled(bool extensions_enabled) { + arrow_extensions_enabled_ = extensions_enabled; + } bool get_arrow_extensions_enabled() const { return arrow_extensions_enabled_; } private: From 7d3ec48f0915fe1fd958100764b0d2d672fc5ca6 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 30 Aug 2024 02:20:53 +0200 Subject: [PATCH 15/26] Review feedback --- .../parquet/arrow/arrow_reader_writer_test.cc | 17 +++++++++-------- cpp/src/parquet/arrow/arrow_schema_test.cc | 15 ++++++++------- cpp/src/parquet/arrow/schema.cc | 6 ++++-- cpp/src/parquet/properties.h | 10 ++++++---- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 17b50486c56..5223790677a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1394,8 +1394,8 @@ using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>; TEST_F(TestLargeBinaryParquetIO, Basics) { const char* json = "[\"foo\", \"\", null, \"\xff\"]"; - const auto large_type = ::arrow::large_utf8(); - const auto narrow_type = ::arrow::utf8(); + const auto large_type = ::arrow::large_binary(); + const auto narrow_type = ::arrow::binary(); const auto large_array = ::arrow::ArrayFromJSON(large_type, json); const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json); @@ -1417,8 +1417,8 @@ using TestLargeStringParquetIO = TestParquetIO<::arrow::LargeStringType>; TEST_F(TestLargeStringParquetIO, Basics) { const char* json = R"(["foo", "", null, "bar"])"; - const auto large_type = ::arrow::large_binary(); - const auto narrow_type = ::arrow::binary(); + const auto large_type = ::arrow::large_utf8(); + const auto narrow_type = ::arrow::utf8(); const auto large_array = 
::arrow::ArrayFromJSON(large_type, json); const auto narrow_array = ::arrow::ArrayFromJSON(narrow_type, json); @@ -1455,15 +1455,16 @@ TEST_F(TestJsonParquetIO, JsonExtension) { // When the original Arrow schema isn't stored and Arrow extensions are disabled, // LogicalType::JSON is read as utf8. - const auto utf8_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); - this->RoundTripSingleColumn(json_array, utf8_array, default_arrow_writer_properties()); + this->RoundTripSingleColumn(json_array, json_string_array, + default_arrow_writer_properties()); // When the original Arrow schema isn't stored and Arrow extensions are enabled, // LogicalType::JSON is read as JsonExtensionType. ::parquet::ArrowReaderProperties reader_properties; reader_properties.set_arrow_extensions_enabled(true); - this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), - reader_properties); + // TODO: should be json_array, json_array + this->RoundTripSingleColumn(json_array, json_string_array, + default_arrow_writer_properties(), reader_properties); // When the original Arrow schema is stored, the stored Arrow type is always respected. const auto writer_properties = diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index c1a76529200..efc43c76800 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -750,8 +750,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // By default, both fields should be treated as utf8() fields in Arrow. 
auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", UTF8, true), ::arrow::field("json_2", UTF8, true)}); - std::shared_ptr metadata; - ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); + std::shared_ptr metadata{}; ASSERT_OK(ConvertSchema(parquet_fields, metadata)); CheckFlatSchema(arrow_schema); } @@ -765,8 +764,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { auto arrow_schema = ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true), ::arrow::field("json_2", ::arrow::extension::json(), true)}); - std::shared_ptr metadata; - ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); + std::shared_ptr metadata{}; ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); CheckFlatSchema(arrow_schema); } @@ -774,7 +772,9 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file contains Arrow schema. // Arrow schema has precedence. json_1 should be returned as a json() field even - // though extensions are not enabled. + // though extensions are not enabled and json_2 should be returned as a utf8() field. + ArrowReaderProperties props; + props.set_arrow_extensions_enabled(true); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( @@ -790,7 +790,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file contains Arrow schema. // A contrived example. Parquet believes both columns are JSON. Arrow believes json_1 - // is a JSON column and json_2 is a utf8 column. json_2 should be treated as a + // is a JSON column and json_2 is an utf8 column. json_2 should be treated as an // utf8 column even if arrow extensions are enabled. 
ArrowReaderProperties props; props.set_arrow_extensions_enabled(true); @@ -803,7 +803,8 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); - CheckFlatSchema(arrow_schema, true /* check_metadata */); + // TODO + // CheckFlatSchema(arrow_schema, true /* check_metadata */); } } diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 2be02f9c1bb..4fe4cbcf758 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -994,8 +994,10 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); if (inferred_type->id() != ::arrow::Type::EXTENSION && - inferred_type->id() == ::arrow::Type::STRING && - ex_type.extension_name() == std::string("arrow.json")) { + ex_type.extension_name() == ::arrow::extension::JsonExtensionType::type_name() && + (inferred_type->id() == ::arrow::Type::STRING || + inferred_type->id() == ::arrow::Type::LARGE_STRING || + inferred_type->id() == ::arrow::Type::STRING_VIEW)) { // Schema mismatch. // // Arrow extensions are DISABLED in Parquet. diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index eb1118ce5ec..5532331ab31 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -942,11 +942,13 @@ class PARQUET_EXPORT ArrowReaderProperties { return coerce_int96_timestamp_unit_; } - /// Enable Parquet supported Arrow Extension Types. + /// Enable Parquet supported Arrow ExtensionTypes. /// - /// When enabled, Parquet will use supported Arrow ExtensionTypes in mapping to Arrow - /// schema. Currently only arrow::extension::json() extension type is supported. This - /// will be used for utf8 columns whose LogicalType is JSON. 
+ /// When enabled, Parquet will use supported Arrow ExtensionTypes by mapping correctly + /// mapping them to Arrow types at read time. Currently only arrow::extension::json() + /// extension type is supported. Columns whose LogicalType is JSON will be interpreted + /// as arrow::extension::json() ExtensionType with storage type utf8, large_utf8 or + /// utf8_view at parquet read time. void set_arrow_extensions_enabled(bool extensions_enabled) { arrow_extensions_enabled_ = extensions_enabled; } From 76628c8c12cf42b2310ff799c603a0b88b120572 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 30 Aug 2024 17:06:06 +0200 Subject: [PATCH 16/26] Add invalid utf8 test --- .../extension/fixed_shape_tensor_test.cc | 6 ++-- cpp/src/arrow/extension/json_test.cc | 35 +++++++++++++++++-- cpp/src/arrow/extension/uuid_test.cc | 4 +-- cpp/src/arrow/extension_type_test.cc | 6 ++-- cpp/src/arrow/ipc/test_common.cc | 17 ++++----- cpp/src/arrow/ipc/test_common.h | 4 +-- .../parquet/arrow/arrow_reader_writer_test.cc | 5 ++- cpp/src/parquet/arrow/arrow_schema_test.cc | 5 ++- cpp/src/parquet/arrow/schema.cc | 7 ++-- 9 files changed, 59 insertions(+), 30 deletions(-) diff --git a/cpp/src/arrow/extension/fixed_shape_tensor_test.cc b/cpp/src/arrow/extension/fixed_shape_tensor_test.cc index 842a78e1a4f..51aea4b25fd 100644 --- a/cpp/src/arrow/extension/fixed_shape_tensor_test.cc +++ b/cpp/src/arrow/extension/fixed_shape_tensor_test.cc @@ -205,7 +205,7 @@ TEST_F(TestExtensionType, RoundtripBatch) { std::shared_ptr read_batch; auto ext_field = field(/*name=*/"f0", /*type=*/ext_type_); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); // Pass extension metadata and storage array, expect getting back extension array @@ -216,7 +216,7 @@ TEST_F(TestExtensionType, RoundtripBatch) { ext_field = field(/*name=*/"f0", 
/*type=*/element_type_, /*nullable=*/true, /*metadata=*/ext_metadata); auto batch2 = RecordBatch::Make(schema({ext_field}), fsla_arr->length(), {fsla_arr}); - RoundtripBatch(batch2, &read_batch2); + ASSERT_OK(RoundtripBatch(batch2, &read_batch2)); CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true); } @@ -469,7 +469,7 @@ TEST_F(TestExtensionType, RoundtripBatchFromTensor) { auto ext_field = field("f0", ext_type_, true, ext_metadata); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); } diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 59491620f9d..dfe137390d4 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -21,6 +21,7 @@ #include "arrow/ipc/test_common.h" #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" +#include "parquet/exception.h" namespace arrow { @@ -43,14 +44,28 @@ std::shared_ptr ExampleJson(const std::shared_ptr& storage_type return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); } +static std::shared_ptr ExampleJsonInvalidUTF8( + const std::shared_ptr& storage_type) { + return ArrayFromJSON(storage_type, + "[" + R"( + "Hi", + "olá mundo", + "你好世界", + "", + )" + "\"\xa0\xa1\"" + "]"); +} + TEST_F(TestJsonExtensionType, JsonRoundtrip) { for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { - auto ext_arr = ExampleJson(storage_type); - + std::shared_ptr ext_arr = ExampleJson(storage_type); auto batch = RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); + std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); ASSERT_OK(read_batch->ValidateFull()); CompareBatch(*batch, *read_batch, /*compare_metadata*/ true); @@ 
-60,4 +75,18 @@ TEST_F(TestJsonExtensionType, JsonRoundtrip) { } } +TEST_F(TestJsonExtensionType, InvalidUTF8) { + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + std::shared_ptr ext_arr = ExampleJsonInvalidUTF8(storage_type); + auto batch = + RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); + ; + + std::shared_ptr read_batch; + ASSERT_RAISES_WITH_MESSAGE(IOError, + "IOError: Array length did not match record batch length", + RoundtripBatch(batch, &read_batch)); + } +} + } // namespace arrow diff --git a/cpp/src/arrow/extension/uuid_test.cc b/cpp/src/arrow/extension/uuid_test.cc index 3bbb6eeb4ae..1c1ffb6eb8e 100644 --- a/cpp/src/arrow/extension/uuid_test.cc +++ b/cpp/src/arrow/extension/uuid_test.cc @@ -54,7 +54,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) { std::shared_ptr read_batch; auto ext_field = field(/*name=*/"f0", /*type=*/ext_type); auto batch = RecordBatch::Make(schema({ext_field}), ext_arr->length(), {ext_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, /*compare_metadata=*/true); // Pass extension metadata and storage array, expect getting back extension array @@ -65,7 +65,7 @@ TEST(TestUuuidExtensionType, RoundtripBatch) { ext_field = field(/*name=*/"f0", /*type=*/exact_ext_type->storage_type(), /*nullable=*/true, /*metadata=*/ext_metadata); auto batch2 = RecordBatch::Make(schema({ext_field}), arr->length(), {arr}); - RoundtripBatch(batch2, &read_batch2); + ASSERT_OK(RoundtripBatch(batch2, &read_batch2)); CompareBatch(*batch, *read_batch2, /*compare_metadata=*/true); } diff --git a/cpp/src/arrow/extension_type_test.cc b/cpp/src/arrow/extension_type_test.cc index f49ffc5cba5..029d833b98c 100644 --- a/cpp/src/arrow/extension_type_test.cc +++ b/cpp/src/arrow/extension_type_test.cc @@ -219,14 +219,14 @@ TEST_F(TestExtensionType, IpcRoundtrip) { auto batch = RecordBatch::Make(schema({field("f0", uuid())}), 4, {ext_arr}); 
std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); // Wrap type in a ListArray and ensure it also makes it auto offsets_arr = ArrayFromJSON(int32(), "[0, 0, 2, 4]"); ASSERT_OK_AND_ASSIGN(auto list_arr, ListArray::FromArrays(*offsets_arr, *ext_arr)); batch = RecordBatch::Make(schema({field("f0", list(uuid()))}), 3, {list_arr}); - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); } @@ -289,7 +289,7 @@ TEST_F(TestExtensionType, ParametricTypes) { 4, {p1, p2, p3, p4}); std::shared_ptr read_batch; - RoundtripBatch(batch, &read_batch); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); CompareBatch(*batch, *read_batch, false /* compare_metadata */); } diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc index fb4f6bd8ead..e354e2f89b3 100644 --- a/cpp/src/arrow/ipc/test_common.cc +++ b/cpp/src/arrow/ipc/test_common.cc @@ -1236,18 +1236,19 @@ Status MakeRandomTensor(const std::shared_ptr& type, return Tensor::Make(type, buf, shape, strides).Value(out); } -void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out) { - ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create()); - ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), - out_stream.get())); +Status RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out) { + ARROW_ASSIGN_OR_RAISE(auto out_stream, io::BufferOutputStream::Create()); + RETURN_NOT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), + out_stream.get())); - ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish()); + ARROW_ASSIGN_OR_RAISE(auto complete_ipc_stream, out_stream->Finish()); io::BufferReader reader(complete_ipc_stream); std::shared_ptr batch_reader; - ASSERT_OK_AND_ASSIGN(batch_reader, 
ipc::RecordBatchStreamReader::Open(&reader)); - ASSERT_OK(batch_reader->ReadNext(out)); + ARROW_ASSIGN_OR_RAISE(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); + RETURN_NOT_OK(batch_reader->ReadNext(out)); + return Status::OK(); } } // namespace test diff --git a/cpp/src/arrow/ipc/test_common.h b/cpp/src/arrow/ipc/test_common.h index 9b7e7f13e3a..189de288795 100644 --- a/cpp/src/arrow/ipc/test_common.h +++ b/cpp/src/arrow/ipc/test_common.h @@ -184,8 +184,8 @@ Status MakeRandomTensor(const std::shared_ptr& type, const std::vector& shape, bool row_major_p, std::shared_ptr* out, uint32_t seed = 0); -ARROW_TESTING_EXPORT void RoundtripBatch(const std::shared_ptr& batch, - std::shared_ptr* out); +ARROW_TESTING_EXPORT Status RoundtripBatch(const std::shared_ptr& batch, + std::shared_ptr* out); } // namespace test } // namespace ipc diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 5223790677a..d25164eae64 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1462,9 +1462,8 @@ TEST_F(TestJsonParquetIO, JsonExtension) { // LogicalType::JSON is read as JsonExtensionType. ::parquet::ArrowReaderProperties reader_properties; reader_properties.set_arrow_extensions_enabled(true); - // TODO: should be json_array, json_array - this->RoundTripSingleColumn(json_array, json_string_array, - default_arrow_writer_properties(), reader_properties); + this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), + reader_properties); // When the original Arrow schema is stored, the stored Arrow type is always respected. 
const auto writer_properties = diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index efc43c76800..c5c1b3b0958 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -783,7 +783,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); - ASSERT_OK(ConvertSchema(parquet_fields, metadata)); + ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); CheckFlatSchema(arrow_schema, true /* check_metadata */); } @@ -803,8 +803,7 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); - // TODO - // CheckFlatSchema(arrow_schema, true /* check_metadata */); + CheckFlatSchema(arrow_schema, true /* check_metadata */); } } diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 4fe4cbcf758..67282facbd5 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -430,8 +430,10 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, auto ext_type = std::static_pointer_cast<::arrow::ExtensionType>(field->type()); // Built-in JSON extension is handled differently. if (ext_type->extension_name() == std::string("arrow.json")) { + // Set physical and logical types and instantiate primitive node. 
type = ParquetType::BYTE_ARRAY; logical_type = LogicalType::JSON(); + break; } std::shared_ptr<::arrow::Field> storage_field = ::arrow::field( name, ext_type->storage_type(), field->nullable(), field->metadata()); @@ -994,7 +996,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer if (origin_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*origin_type); if (inferred_type->id() != ::arrow::Type::EXTENSION && - ex_type.extension_name() == ::arrow::extension::JsonExtensionType::type_name() && + ex_type.extension_name() == std::string("arrow.json") && (inferred_type->id() == ::arrow::Type::STRING || inferred_type->id() == ::arrow::Type::LARGE_STRING || inferred_type->id() == ::arrow::Type::STRING_VIEW)) { @@ -1023,8 +1025,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer } else { if (inferred_type->id() == ::arrow::Type::EXTENSION) { const auto& ex_type = checked_cast(*inferred_type); - if (ex_type.extension_name() == - ::arrow::extension::JsonExtensionType::type_name()) { + if (ex_type.extension_name() == std::string("arrow.json")) { // Schema mismatch. // // Arrow extensions are ENABLED in Parquet. 
From 1b66f112edd0769761c0d1cbdc353dd46710ff0f Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Fri, 30 Aug 2024 17:45:34 +0200 Subject: [PATCH 17/26] Update cpp/src/arrow/array/validate.cc Co-authored-by: mwish --- cpp/src/arrow/array/validate.cc | 6 +++--- cpp/src/arrow/extension/json_test.cc | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/array/validate.cc b/cpp/src/arrow/array/validate.cc index eb1db3e0a8f..69f1646054f 100644 --- a/cpp/src/arrow/array/validate.cc +++ b/cpp/src/arrow/array/validate.cc @@ -993,9 +993,9 @@ Status ValidateUTF8(const ArrayData& data) { storage_type->id() == Type::LARGE_STRING); if (data.type->id() == Type::EXTENSION) { - const auto& ext_data = std::make_shared(data); - ext_data->type = storage_type; - UTF8DataValidator validator{*ext_data}; + ArrayData ext_data(data); + ext_data.type = storage_type; + UTF8DataValidator validator{ext_data}; return VisitTypeInline(*storage_type, &validator); } else { UTF8DataValidator validator{data}; diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index dfe137390d4..36c17e690c7 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -80,7 +80,6 @@ TEST_F(TestJsonExtensionType, InvalidUTF8) { std::shared_ptr ext_arr = ExampleJsonInvalidUTF8(storage_type); auto batch = RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); - ; std::shared_ptr read_batch; ASSERT_RAISES_WITH_MESSAGE(IOError, From eab70b67c9ffb2b3f82e11f79f092d5d2cf0b606 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Mon, 2 Sep 2024 21:54:38 +0200 Subject: [PATCH 18/26] Review feedback --- cpp/src/arrow/extension/json.h | 4 -- cpp/src/arrow/extension/json_test.cc | 58 ++++++++++++++++++---------- cpp/src/arrow/util/utf8_util_test.cc | 27 ------------- 3 files changed, 38 insertions(+), 51 deletions(-) diff --git a/cpp/src/arrow/extension/json.h b/cpp/src/arrow/extension/json.h index 
11d435b81df..4793ab2bc9b 100644 --- a/cpp/src/arrow/extension/json.h +++ b/cpp/src/arrow/extension/json.h @@ -35,10 +35,6 @@ class ARROW_EXPORT JsonExtensionType : public ExtensionType { std::string extension_name() const override { return "arrow.json"; } - std::string ToString(bool show_metadata = false) const override { - return "extension"; - }; - bool ExtensionEquals(const ExtensionType& other) const override; Result> Deserialize( diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 36c17e690c7..8e414184b0a 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -44,20 +44,6 @@ std::shared_ptr ExampleJson(const std::shared_ptr& storage_type return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); } -static std::shared_ptr ExampleJsonInvalidUTF8( - const std::shared_ptr& storage_type) { - return ArrayFromJSON(storage_type, - "[" - R"( - "Hi", - "olá mundo", - "你好世界", - "", - )" - "\"\xa0\xa1\"" - "]"); -} - TEST_F(TestJsonExtensionType, JsonRoundtrip) { for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { std::shared_ptr ext_arr = ExampleJson(storage_type); @@ -77,14 +63,46 @@ TEST_F(TestJsonExtensionType, JsonRoundtrip) { TEST_F(TestJsonExtensionType, InvalidUTF8) { for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { - std::shared_ptr ext_arr = ExampleJsonInvalidUTF8(storage_type); - auto batch = - RecordBatch::Make(schema({field("f0", json(storage_type))}), 8, {ext_arr}); + auto json_type = json(storage_type); + auto invalid_input = ArrayFromJSON(storage_type, "[\"Ⱥa\xFFⱭ\", \"Ɽ\xe1\xbdⱤaA\"]"); + auto ext_arr = + ExtensionType::WrapArray(arrow::extension::json(storage_type), invalid_input); + + ASSERT_RAISES_WITH_MESSAGE(Invalid, + "Invalid: Invalid UTF8 sequence at string index 0", + ext_arr->ValidateFull()); + ASSERT_RAISES_WITH_MESSAGE(Invalid, + "Invalid: Invalid UTF8 sequence at string index 0", + 
arrow::internal::ValidateUTF8(*ext_arr)); + auto batch = RecordBatch::Make(schema({field("f0", json_type)}), 2, {ext_arr}); std::shared_ptr read_batch; - ASSERT_RAISES_WITH_MESSAGE(IOError, - "IOError: Array length did not match record batch length", - RoundtripBatch(batch, &read_batch)); + ASSERT_OK(RoundtripBatch(batch, &read_batch)); + } +} + +class UTF8ExtensionArrayTest : public ::testing::Test { + public: + static std::shared_ptr ExampleJson( + const std::shared_ptr& storage_type) { + std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ + "null", + "1234", + "3.14159", + "true", + "false", + "\"a json string\"", + "[\"a\", \"json\", \"array\"]", + "{\"obj\": \"a simple json object\"}" + ])"); + return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); + } +}; + +TEST_F(UTF8ExtensionArrayTest, JSONExtensionType) { + for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { + const auto ext_arr = ExampleJson(storage_type); + ASSERT_OK(arrow::internal::ValidateUTF8(*ext_arr)); } } diff --git a/cpp/src/arrow/util/utf8_util_test.cc b/cpp/src/arrow/util/utf8_util_test.cc index 2114d949f22..bada5e59d8a 100644 --- a/cpp/src/arrow/util/utf8_util_test.cc +++ b/cpp/src/arrow/util/utf8_util_test.cc @@ -22,8 +22,6 @@ #include -#include "arrow/array/validate.h" -#include "arrow/extension/json.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/string.h" #include "arrow/util/utf8_internal.h" @@ -564,30 +562,5 @@ TEST(UTF8Length, Basics) { ASSERT_EQ(length("\xf0\x9f\x99\x8c"), 1); } -class UTF8ExtensionArrayTest : public ::testing::Test { - public: - static std::shared_ptr ExampleJson( - const std::shared_ptr& storage_type) { - std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ - "null", - "1234", - "3.14159", - "true", - "false", - "\"a json string\"", - "[\"a\", \"json\", \"array\"]", - "{\"obj\": \"a simple json object\"}" - ])"); - return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); - } -}; - 
-TEST_F(UTF8ExtensionArrayTest, JSONExtensionType) { - for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { - const auto ext_arr = ExampleJson(storage_type); - ASSERT_OK(arrow::internal::ValidateUTF8(*ext_arr)); - } -} - } // namespace util } // namespace arrow From 51676cb448b845c3992edb4d1342c51a672a2327 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 5 Sep 2024 17:20:51 +0200 Subject: [PATCH 19/26] Update cpp/src/arrow/extension/json_test.cc Co-authored-by: Antoine Pitrou --- cpp/src/arrow/extension/json_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 8e414184b0a..850c903f7be 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -65,8 +65,7 @@ TEST_F(TestJsonExtensionType, InvalidUTF8) { for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { auto json_type = json(storage_type); auto invalid_input = ArrayFromJSON(storage_type, "[\"Ⱥa\xFFⱭ\", \"Ɽ\xe1\xbdⱤaA\"]"); - auto ext_arr = - ExtensionType::WrapArray(arrow::extension::json(storage_type), invalid_input); + auto ext_arr = ExtensionType::WrapArray(json_type, invalid_input); ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Invalid UTF8 sequence at string index 0", From 5551d7b4ca986d00db936d4ecf793e47ed27816b Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 5 Sep 2024 17:27:43 +0200 Subject: [PATCH 20/26] Review feedback --- cpp/src/arrow/extension/json.cc | 3 +-- cpp/src/arrow/extension/json_test.cc | 25 ------------------------- cpp/src/parquet/arrow/schema.cc | 2 +- 3 files changed, 2 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/extension/json.cc b/cpp/src/arrow/extension/json.cc index 828559716df..d793233c2b5 100644 --- a/cpp/src/arrow/extension/json.cc +++ b/cpp/src/arrow/extension/json.cc @@ -28,8 +28,7 @@ namespace arrow::extension { bool JsonExtensionType::ExtensionEquals(const ExtensionType& other) 
const { - const auto& other_ext = static_cast(other); - return other_ext.extension_name() == this->extension_name(); + return other.extension_name() == this->extension_name(); } Result> JsonExtensionType::Deserialize( diff --git a/cpp/src/arrow/extension/json_test.cc b/cpp/src/arrow/extension/json_test.cc index 850c903f7be..143e4f9ceea 100644 --- a/cpp/src/arrow/extension/json_test.cc +++ b/cpp/src/arrow/extension/json_test.cc @@ -80,29 +80,4 @@ TEST_F(TestJsonExtensionType, InvalidUTF8) { } } -class UTF8ExtensionArrayTest : public ::testing::Test { - public: - static std::shared_ptr ExampleJson( - const std::shared_ptr& storage_type) { - std::shared_ptr arr = ArrayFromJSON(storage_type, R"([ - "null", - "1234", - "3.14159", - "true", - "false", - "\"a json string\"", - "[\"a\", \"json\", \"array\"]", - "{\"obj\": \"a simple json object\"}" - ])"); - return ExtensionType::WrapArray(arrow::extension::json(storage_type), arr); - } -}; - -TEST_F(UTF8ExtensionArrayTest, JSONExtensionType) { - for (const auto& storage_type : {utf8(), large_utf8(), utf8_view()}) { - const auto ext_arr = ExampleJson(storage_type); - ASSERT_OK(arrow::internal::ValidateUTF8(*ext_arr)); - } -} - } // namespace arrow diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index 67282facbd5..c6625c0d2bb 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1034,7 +1034,7 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer // // Origin type is restored as Arrow should be considered the source of truth. 
DCHECK_EQ(origin_type->id(), ::arrow::Type::STRING); - inferred->field = inferred->field->WithType(origin_type); + inferred->field = inferred->field->WithType(inferred_type); } } ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred)); From e9b44ad355b250ebc484bfe8a0bc7f6a47078a2c Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Thu, 5 Sep 2024 22:44:07 +0200 Subject: [PATCH 21/26] Review feedback --- cpp/src/parquet/arrow/arrow_schema_test.cc | 10 +++++----- cpp/src/parquet/arrow/schema.cc | 14 -------------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index c5c1b3b0958..7d6083f8c36 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -771,15 +771,15 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file contains Arrow schema. - // Arrow schema has precedence. json_1 should be returned as a json() field even - // though extensions are not enabled and json_2 should be returned as a utf8() field. + // Parquet logical type has precedence. Both json_1 and json_2 should be returned + // as a json() field even though extensions are not enabled. ArrowReaderProperties props; props.set_arrow_extensions_enabled(true); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", UTF8, true)}); + ::arrow::field("json_2", ::arrow::extension::json(), true)}); std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); @@ -791,14 +791,14 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // Parquet file contains Arrow schema. // A contrived example. Parquet believes both columns are JSON. 
Arrow believes json_1 // is a JSON column and json_2 is an utf8 column. json_2 should be treated as an - // utf8 column even if arrow extensions are enabled. + // JSON column even if arrow extensions are enabled. ArrowReaderProperties props; props.set_arrow_extensions_enabled(true); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", UTF8, true)}); + ::arrow::field("json_2", ::arrow::extension::json(), true)}); std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index c6625c0d2bb..1623d80dcb0 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -1023,20 +1023,6 @@ Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* infer } modified = true; } else { - if (inferred_type->id() == ::arrow::Type::EXTENSION) { - const auto& ex_type = checked_cast(*inferred_type); - if (ex_type.extension_name() == std::string("arrow.json")) { - // Schema mismatch. - // - // Arrow extensions are ENABLED in Parquet. - // origin_type is ::arrow::utf8() - // inferred_type is ::arrow::extension::json() - // - // Origin type is restored as Arrow should be considered the source of truth. 
- DCHECK_EQ(origin_type->id(), ::arrow::Type::STRING); - inferred->field = inferred->field->WithType(inferred_type); - } - } ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred)); } From 9c09cbe01e30002fb5c43aae1b82a6f693e86d44 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 10 Sep 2024 19:28:44 +0200 Subject: [PATCH 22/26] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- cpp/src/parquet/properties.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5532331ab31..7f2e371df66 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -942,13 +942,13 @@ class PARQUET_EXPORT ArrowReaderProperties { return coerce_int96_timestamp_unit_; } - /// Enable Parquet supported Arrow ExtensionTypes. + /// Enable Parquet-supported Arrow extension types. /// - /// When enabled, Parquet will use supported Arrow ExtensionTypes by mapping correctly - /// mapping them to Arrow types at read time. Currently only arrow::extension::json() + /// When enabled, Parquet logical types will be mapped to their corresponding Arrow + /// extension types at read time, if such exist. Currently only arrow::extension::json() /// extension type is supported. Columns whose LogicalType is JSON will be interpreted - /// as arrow::extension::json() ExtensionType with storage type utf8, large_utf8 or - /// utf8_view at parquet read time. + /// as arrow::extension::json(), with storage type inferred from the serialized Arrow + /// schema if present, or `utf8` by default. 
void set_arrow_extensions_enabled(bool extensions_enabled) { arrow_extensions_enabled_ = extensions_enabled; } From f518ebfd2acab3916f74d6a2e1088487ab095d82 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Tue, 10 Sep 2024 22:04:25 +0200 Subject: [PATCH 23/26] Review feedback --- .../parquet/arrow/arrow_reader_writer_test.cc | 20 ++++++++++++++----- cpp/src/parquet/arrow/arrow_schema_test.cc | 12 ++++++----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index d25164eae64..5d990a5c6bd 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -1450,25 +1450,35 @@ TEST_F(TestJsonParquetIO, JsonExtension) { ])"; const auto json_type = ::arrow::extension::json(); - const auto json_string_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); - const auto json_array = ::arrow::ExtensionType::WrapArray(json_type, json_string_array); + const auto string_array = ::arrow::ArrayFromJSON(::arrow::utf8(), json); + const auto json_array = ::arrow::ExtensionType::WrapArray(json_type, string_array); + + const auto json_large_type = ::arrow::extension::json(::arrow::large_utf8()); + const auto large_string_array = ::arrow::ArrayFromJSON(::arrow::large_utf8(), json); + const auto json_large_array = + ::arrow::ExtensionType::WrapArray(json_large_type, large_string_array); // When the original Arrow schema isn't stored and Arrow extensions are disabled, // LogicalType::JSON is read as utf8. - this->RoundTripSingleColumn(json_array, json_string_array, + this->RoundTripSingleColumn(json_array, string_array, + default_arrow_writer_properties()); + this->RoundTripSingleColumn(json_large_array, string_array, default_arrow_writer_properties()); // When the original Arrow schema isn't stored and Arrow extensions are enabled, - // LogicalType::JSON is read as JsonExtensionType. 
+ // LogicalType::JSON is read as JsonExtensionType with utf8 storage. ::parquet::ArrowReaderProperties reader_properties; reader_properties.set_arrow_extensions_enabled(true); this->RoundTripSingleColumn(json_array, json_array, default_arrow_writer_properties(), reader_properties); + this->RoundTripSingleColumn(json_large_array, json_array, + default_arrow_writer_properties(), reader_properties); - // When the original Arrow schema is stored, the stored Arrow type is always respected. + // When the original Arrow schema is stored, the stored Arrow type is respected. const auto writer_properties = ::parquet::ArrowWriterProperties::Builder().store_schema()->build(); this->RoundTripSingleColumn(json_array, json_array, writer_properties); + this->RoundTripSingleColumn(json_large_array, json_large_array, writer_properties); } using TestNullParquetIO = TestParquetIO<::arrow::NullType>; diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 7d6083f8c36..03f9af10190 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -761,9 +761,10 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // fields. ArrowReaderProperties props; props.set_arrow_extensions_enabled(true); - auto arrow_schema = - ::arrow::schema({::arrow::field("json_1", ::arrow::extension::json(), true), - ::arrow::field("json_2", ::arrow::extension::json(), true)}); + auto arrow_schema = ::arrow::schema( + {::arrow::field("json_1", ::arrow::extension::json(), true), + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); std::shared_ptr metadata{}; ASSERT_OK(ConvertSchema(parquet_fields, metadata, props)); CheckFlatSchema(arrow_schema); @@ -774,12 +775,13 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { // Parquet logical type has precedence. 
Both json_1 and json_2 should be returned // as a json() field even though extensions are not enabled. ArrowReaderProperties props; - props.set_arrow_extensions_enabled(true); + props.set_arrow_extensions_enabled(false); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", ::arrow::extension::json(), true)}); + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); From e2f82a88f7a695a5e2c5f1d7cfb8478b07e2b899 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 11 Sep 2024 10:14:01 +0200 Subject: [PATCH 24/26] Update cpp/src/parquet/arrow/arrow_schema_test.cc Co-authored-by: Antoine Pitrou --- cpp/src/parquet/arrow/arrow_schema_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 03f9af10190..9a5e44d7237 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -772,8 +772,8 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { { // Parquet file contains Arrow schema. - // Parquet logical type has precedence. Both json_1 and json_2 should be returned - // as a json() field even though extensions are not enabled. + // Both json_1 and json_2 should be returned as a json() field + // even though extensions are not enabled. 
ArrowReaderProperties props; props.set_arrow_extensions_enabled(false); std::shared_ptr field_metadata = From e32805e389abbb959ec24baf60cc218afc473892 Mon Sep 17 00:00:00 2001 From: Rok Mihevc Date: Wed, 11 Sep 2024 10:18:56 +0200 Subject: [PATCH 25/26] Review feedback --- cpp/src/parquet/arrow/arrow_schema_test.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index 9a5e44d7237..cb485f3dd83 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -790,17 +790,15 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { } { - // Parquet file contains Arrow schema. - // A contrived example. Parquet believes both columns are JSON. Arrow believes json_1 - // is a JSON column and json_2 is an utf8 column. json_2 should be treated as an - // JSON column even if arrow extensions are enabled. + // Parquet file contains Arrow schema. Extensions are enabled. 
+ // Both json_1 and json_2 should be returned as a json() field ArrowReaderProperties props; props.set_arrow_extensions_enabled(true); std::shared_ptr field_metadata = ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", ::arrow::extension::json(), true)}); + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), true)}); std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata)); From 1ca8f1b9d4b4abe8cb3527eeac95830d463590e6 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 11 Sep 2024 11:31:29 +0200 Subject: [PATCH 26/26] Fix lint --- cpp/src/parquet/arrow/arrow_schema_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/arrow_schema_test.cc b/cpp/src/parquet/arrow/arrow_schema_test.cc index cb485f3dd83..31ead461aa6 100644 --- a/cpp/src/parquet/arrow/arrow_schema_test.cc +++ b/cpp/src/parquet/arrow/arrow_schema_test.cc @@ -798,7 +798,8 @@ TEST_F(TestConvertParquetSchema, ParquetSchemaArrowExtensions) { ::arrow::key_value_metadata({"foo", "bar"}, {"biz", "baz"}); auto arrow_schema = ::arrow::schema( {::arrow::field("json_1", ::arrow::extension::json(), true, field_metadata), - ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), true)}); + ::arrow::field("json_2", ::arrow::extension::json(::arrow::large_utf8()), + true)}); std::shared_ptr metadata; ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata));