diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 6dc8358f502..8ba9200f1b6 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -906,6 +906,7 @@ endif() if(ARROW_JSON) arrow_add_object_library(ARROW_JSON extension/fixed_shape_tensor.cc + extension/opaque.cc json/options.cc json/chunked_builder.cc json/chunker.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc index 3df86e7d693..724e01daeba 100644 --- a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc +++ b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc @@ -22,6 +22,7 @@ #include "arrow/compute/kernels/scalar_cast_internal.h" #include "arrow/compute/kernels/util_internal.h" #include "arrow/scalar.h" +#include "arrow/type_fwd.h" #include "arrow/util/bit_block_counter.h" #include "arrow/util/float16.h" #include "arrow/util/int_util.h" @@ -865,6 +866,25 @@ std::shared_ptr GetCastToHalfFloat() { return func; } +struct NullExtensionTypeMatcher : public TypeMatcher { + ~NullExtensionTypeMatcher() override = default; + + bool Matches(const DataType& type) const override { + return type.id() == Type::EXTENSION && + static_cast(type).storage_id() == Type::NA; + } + + std::string ToString() const override { return "extension"; } + + bool Equals(const TypeMatcher& other) const override { + if (this == &other) { + return true; + } + auto casted = dynamic_cast(&other); + return casted != nullptr; + } +}; + } // namespace std::vector> GetNumericCasts() { @@ -875,6 +895,10 @@ std::vector> GetNumericCasts() { auto cast_null = std::make_shared("cast_null", Type::NA); DCHECK_OK(cast_null->AddKernel(Type::DICTIONARY, {InputType(Type::DICTIONARY)}, null(), OutputAllNull)); + // Explicitly allow casting extension type with null backing array to null + DCHECK_OK(cast_null->AddKernel( + Type::EXTENSION, {InputType(std::make_shared())}, null(), + OutputAllNull)); functions.push_back(cast_null); 
functions.push_back(GetCastToInteger("cast_int8")); diff --git a/cpp/src/arrow/extension/CMakeLists.txt b/cpp/src/arrow/extension/CMakeLists.txt index c15c42874d4..6741ab602f5 100644 --- a/cpp/src/arrow/extension/CMakeLists.txt +++ b/cpp/src/arrow/extension/CMakeLists.txt @@ -21,4 +21,10 @@ add_arrow_test(test PREFIX "arrow-fixed-shape-tensor") +add_arrow_test(test + SOURCES + opaque_test.cc + PREFIX + "arrow-extension-opaque") + arrow_install_all_headers("arrow/extension") diff --git a/cpp/src/arrow/extension/opaque.cc b/cpp/src/arrow/extension/opaque.cc new file mode 100644 index 00000000000..3d9a46b4c8c --- /dev/null +++ b/cpp/src/arrow/extension/opaque.cc @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/extension/opaque.h" + +#include + +#include "arrow/json/rapidjson_defs.h" // IWYU pragma: keep +#include "arrow/util/logging.h" + +#include +#include +#include + +namespace arrow::extension { + +std::string OpaqueType::ToString(bool show_metadata) const { + std::stringstream ss; + ss << "extension<" << this->extension_name() + << "[storage_type=" << storage_type_->ToString() << ", type_name=" << type_name_ + << ", vendor_name=" << vendor_name_ << "]>"; + return ss.str(); +} + +bool OpaqueType::ExtensionEquals(const ExtensionType& other) const { + if (extension_name() != other.extension_name()) { + return false; + } + const auto& opaque = internal::checked_cast(other); + return storage_type()->Equals(*opaque.storage_type()) && + type_name() == opaque.type_name() && vendor_name() == opaque.vendor_name(); +} + +std::string OpaqueType::Serialize() const { + rapidjson::Document document; + document.SetObject(); + rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); + + rapidjson::Value type_name(rapidjson::StringRef(type_name_)); + document.AddMember(rapidjson::Value("type_name", allocator), type_name, allocator); + rapidjson::Value vendor_name(rapidjson::StringRef(vendor_name_)); + document.AddMember(rapidjson::Value("vendor_name", allocator), vendor_name, allocator); + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + document.Accept(writer); + return buffer.GetString(); +} + +Result> OpaqueType::Deserialize( + std::shared_ptr storage_type, const std::string& serialized_data) const { + rapidjson::Document document; + const auto& parsed = document.Parse(serialized_data.data(), serialized_data.length()); + if (parsed.HasParseError()) { + return Status::Invalid("Invalid serialized JSON data for OpaqueType: ", + rapidjson::GetParseError_En(parsed.GetParseError()), ": ", + serialized_data); + } else if (!document.IsObject()) { + return Status::Invalid("Invalid serialized JSON data for OpaqueType: not an object"); 
+ } + if (!document.HasMember("type_name")) { + return Status::Invalid( + "Invalid serialized JSON data for OpaqueType: missing type_name"); + } else if (!document.HasMember("vendor_name")) { + return Status::Invalid( + "Invalid serialized JSON data for OpaqueType: missing vendor_name"); + } + + const auto& type_name = document["type_name"]; + const auto& vendor_name = document["vendor_name"]; + if (!type_name.IsString()) { + return Status::Invalid( + "Invalid serialized JSON data for OpaqueType: type_name is not a string"); + } else if (!vendor_name.IsString()) { + return Status::Invalid( + "Invalid serialized JSON data for OpaqueType: vendor_name is not a string"); + } + + return opaque(std::move(storage_type), type_name.GetString(), vendor_name.GetString()); +} + +std::shared_ptr OpaqueType::MakeArray(std::shared_ptr data) const { + DCHECK_EQ(data->type->id(), Type::EXTENSION); + DCHECK_EQ("arrow.opaque", + internal::checked_cast(*data->type).extension_name()); + return std::make_shared(data); +} + +std::shared_ptr opaque(std::shared_ptr storage_type, + std::string type_name, std::string vendor_name) { + return std::make_shared(std::move(storage_type), std::move(type_name), + std::move(vendor_name)); +} + +} // namespace arrow::extension diff --git a/cpp/src/arrow/extension/opaque.h b/cpp/src/arrow/extension/opaque.h new file mode 100644 index 00000000000..9814b391cba --- /dev/null +++ b/cpp/src/arrow/extension/opaque.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/extension_type.h" +#include "arrow/type.h" + +namespace arrow::extension { + +/// \brief Opaque is a placeholder for a type from an external (usually +/// non-Arrow) system that could not be interpreted. +class ARROW_EXPORT OpaqueType : public ExtensionType { + public: + /// \brief Construct an OpaqueType. + /// + /// \param[in] storage_type The underlying storage type. Should be + /// arrow::null if there is no data. + /// \param[in] type_name The name of the type in the external system. + /// \param[in] vendor_name The name of the external system. + explicit OpaqueType(std::shared_ptr storage_type, std::string type_name, + std::string vendor_name) + : ExtensionType(std::move(storage_type)), + type_name_(std::move(type_name)), + vendor_name_(std::move(vendor_name)) {} + + std::string extension_name() const override { return "arrow.opaque"; } + std::string ToString(bool show_metadata) const override; + bool ExtensionEquals(const ExtensionType& other) const override; + std::string Serialize() const override; + Result> Deserialize( + std::shared_ptr storage_type, + const std::string& serialized_data) const override; + /// Create an OpaqueArray from ArrayData + std::shared_ptr MakeArray(std::shared_ptr data) const override; + + std::string_view type_name() const { return type_name_; } + std::string_view vendor_name() const { return vendor_name_; } + + private: + std::string type_name_; + std::string vendor_name_; +}; + +/// \brief Opaque is a wrapper for (usually binary) data from an external +/// (often non-Arrow) system that could 
not be interpreted. +class ARROW_EXPORT OpaqueArray : public ExtensionArray { + public: + using ExtensionArray::ExtensionArray; +}; + +/// \brief Return an OpaqueType instance. +ARROW_EXPORT std::shared_ptr opaque(std::shared_ptr storage_type, + std::string type_name, + std::string vendor_name); + +} // namespace arrow::extension diff --git a/cpp/src/arrow/extension/opaque_test.cc b/cpp/src/arrow/extension/opaque_test.cc new file mode 100644 index 00000000000..719ff888034 --- /dev/null +++ b/cpp/src/arrow/extension/opaque_test.cc @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include "arrow/extension/opaque.h" +#include "arrow/extension_type.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" +#include "arrow/testing/extension_type.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type_fwd.h" +#include "arrow/util/checked_cast.h" + +namespace arrow { + +TEST(OpaqueType, Basics) { + auto type = internal::checked_pointer_cast( + extension::opaque(null(), "type", "vendor")); + auto type2 = internal::checked_pointer_cast( + extension::opaque(null(), "type2", "vendor")); + ASSERT_EQ("arrow.opaque", type->extension_name()); + ASSERT_EQ(*type, *type); + ASSERT_NE(*arrow::null(), *type); + ASSERT_NE(*type, *type2); + ASSERT_EQ(*arrow::null(), *type->storage_type()); + ASSERT_THAT(type->Serialize(), ::testing::Not(::testing::IsEmpty())); + ASSERT_EQ(R"({"type_name":"type","vendor_name":"vendor"})", type->Serialize()); + ASSERT_EQ("type", type->type_name()); + ASSERT_EQ("vendor", type->vendor_name()); + ASSERT_EQ( + "extension", + type->ToString(false)); +} + +TEST(OpaqueType, Equals) { + auto type = internal::checked_pointer_cast( + extension::opaque(null(), "type", "vendor")); + auto type2 = internal::checked_pointer_cast( + extension::opaque(null(), "type2", "vendor")); + auto type3 = internal::checked_pointer_cast( + extension::opaque(null(), "type", "vendor2")); + auto type4 = internal::checked_pointer_cast( + extension::opaque(int64(), "type", "vendor")); + auto type5 = internal::checked_pointer_cast( + extension::opaque(null(), "type", "vendor")); + + ASSERT_EQ(*type, *type); + ASSERT_EQ(*type2, *type2); + ASSERT_EQ(*type3, *type3); + ASSERT_EQ(*type4, *type4); + ASSERT_EQ(*type5, *type5); + + ASSERT_EQ(*type, *type5); + + ASSERT_NE(*type, *type2); + ASSERT_NE(*type, *type3); + ASSERT_NE(*type, *type4); + + ASSERT_NE(*type2, *type); + ASSERT_NE(*type2, *type3); + ASSERT_NE(*type2, *type4); + + ASSERT_NE(*type3, *type); + 
ASSERT_NE(*type3, *type2); + ASSERT_NE(*type3, *type4); + + ASSERT_NE(*type4, *type); + ASSERT_NE(*type4, *type2); + ASSERT_NE(*type4, *type3); +} + +TEST(OpaqueType, CreateFromArray) { + auto type = internal::checked_pointer_cast( + extension::opaque(binary(), "geometry", "adbc.postgresql")); + auto storage = ArrayFromJSON(binary(), R"(["foobar", null])"); + auto array = ExtensionType::WrapArray(type, storage); + ASSERT_EQ(2, array->length()); + ASSERT_EQ(1, array->null_count()); +} + +void CheckDeserialize(const std::string& serialized, + const std::shared_ptr& expected) { + auto type = internal::checked_pointer_cast(expected); + ASSERT_OK_AND_ASSIGN(auto deserialized, + type->Deserialize(type->storage_type(), serialized)); + ASSERT_EQ(*expected, *deserialized); +} + +TEST(OpaqueType, Deserialize) { + ASSERT_NO_FATAL_FAILURE( + CheckDeserialize(R"({"type_name": "type", "vendor_name": "vendor"})", + extension::opaque(null(), "type", "vendor"))); + ASSERT_NO_FATAL_FAILURE( + CheckDeserialize(R"({"type_name": "long name", "vendor_name": "long name"})", + extension::opaque(null(), "long name", "long name"))); + ASSERT_NO_FATAL_FAILURE( + CheckDeserialize(R"({"type_name": "名前", "vendor_name": "名字"})", + extension::opaque(null(), "名前", "名字"))); + ASSERT_NO_FATAL_FAILURE(CheckDeserialize( + R"({"type_name": "type", "vendor_name": "vendor", "extra_field": 2})", + extension::opaque(null(), "type", "vendor"))); + + auto type = internal::checked_pointer_cast( + extension::opaque(null(), "type", "vendor")); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("The document is empty"), + type->Deserialize(null(), R"()")); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, + testing::HasSubstr("Missing a name for object member"), + type->Deserialize(null(), R"({)")); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("not an object"), + type->Deserialize(null(), R"([])")); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("missing type_name"), + 
type->Deserialize(null(), R"({})")); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("type_name is not a string"), + type->Deserialize(null(), R"({"type_name": 2, "vendor_name": ""})")); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("type_name is not a string"), + type->Deserialize(null(), R"({"type_name": null, "vendor_name": ""})")); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("vendor_name is not a string"), + type->Deserialize(null(), R"({"vendor_name": 2, "type_name": ""})")); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("vendor_name is not a string"), + type->Deserialize(null(), R"({"vendor_name": null, "type_name": ""})")); +} + +TEST(OpaqueType, MetadataRoundTrip) { + for (const auto& type : { + extension::opaque(null(), "foo", "bar"), + extension::opaque(binary(), "geometry", "postgis"), + extension::opaque(fixed_size_list(int64(), 4), "foo", "bar"), + extension::opaque(utf8(), "foo", "bar"), + }) { + auto opaque = internal::checked_pointer_cast(type); + std::string serialized = opaque->Serialize(); + ASSERT_OK_AND_ASSIGN(auto deserialized, + opaque->Deserialize(opaque->storage_type(), serialized)); + ASSERT_EQ(*type, *deserialized); + } +} + +TEST(OpaqueType, BatchRoundTrip) { + auto type = internal::checked_pointer_cast( + extension::opaque(binary(), "geometry", "adbc.postgresql")); + ExtensionTypeGuard guard(type); + + auto storage = ArrayFromJSON(binary(), R"(["foobar", null])"); + auto array = ExtensionType::WrapArray(type, storage); + auto batch = + RecordBatch::Make(schema({field("field", type)}), array->length(), {array}); + + std::shared_ptr written; + { + ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create()); + ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(), + out_stream.get())); + + ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish()); + + io::BufferReader reader(complete_ipc_stream); + std::shared_ptr 
batch_reader; + ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader)); + ASSERT_OK(batch_reader->ReadNext(&written)); + } + + ASSERT_EQ(*batch->schema(), *written->schema()); + ASSERT_BATCHES_EQUAL(*batch, *written); +} + +} // namespace arrow diff --git a/docs/source/format/CanonicalExtensions.rst b/docs/source/format/CanonicalExtensions.rst index c258f889dc6..1d86fcf23c4 100644 --- a/docs/source/format/CanonicalExtensions.rst +++ b/docs/source/format/CanonicalExtensions.rst @@ -283,6 +283,116 @@ UUID A specific UUID version is not required or guaranteed. This extension represents UUIDs as FixedSizeBinary(16) with big-endian notation and does not interpret the bytes in any way. +Opaque +====== + +Opaque represents a type that an Arrow-based system received from an external +(often non-Arrow) system, but that it cannot interpret. In this case, it can +pass on Opaque to its clients to at least show that a field exists and +preserve metadata about the type from the other system. + +Extension parameters: + +* Extension name: ``arrow.opaque``. + +* The storage type of this extension is any type. If there is no underlying + data, the storage type should be Null. + +* Extension type parameters: + + * **type_name** = the name of the unknown type in the external system. + * **vendor_name** = the name of the external system. + +* Description of the serialization: + + A valid JSON object containing the parameters as fields. In the future, + additional fields may be added, but all fields current and future are never + required to interpret the array. + + Developers **should not** attempt to enable public semantic interoperability + of Opaque by canonicalizing specific values of these parameters. + +Rationale +--------- + +Interfacing with non-Arrow systems requires a way to handle data that doesn't +have an equivalent Arrow type. In this case, use the Opaque type, which +explicitly represents an unsupported field. 
Other solutions are inadequate: + +* Raising an error means even one unsupported field makes all operations + impossible, even if (for instance) the user is just trying to view a schema. +* Dropping unsupported columns misleads the user as to the actual schema. +* An extension type may not exist for the unsupported type. +* Generating an extension type on the fly would falsely imply support. + +Applications **should not** make conventions around vendor_name and type_name. +These parameters are meant for human end users to understand what type wasn't +supported. Applications may try to interpret these fields, but must be +prepared for breakage (e.g., when the type becomes supported with a custom +extension type later on). Similarly, **Opaque is not a generic container for +file formats**. Considerations such as MIME types are irrelevant. In both of +these cases, create a custom extension type instead. + +Examples: + +* A Flight SQL service that supports connecting external databases may + encounter columns with unsupported types in external tables. In this case, + it can use the Opaque[Null] type to at least report that a column exists + with a particular name and type name. This lets clients know that a column + exists, but is not supported. Null is used as the storage type here because + only schemas are involved. + + An example of the extension metadata would be:: + + {"type_name": "varray", "vendor_name": "Oracle"} + +* The ADBC PostgreSQL driver gets results as a series of length-prefixed byte + fields. But the driver will not always know how to parse the bytes, as + there may be extensions (e.g. PostGIS). It can use Opaque[Binary] to still + return those bytes to the application, which may be able to parse the data + itself. Opaque differentiates the column from an actual binary column and + makes it clear that the value is directly from PostgreSQL. 
(A custom + extension type is preferred, but there will always be extensions that the + driver does not know about.) + + An example of the extension metadata would be:: + + {"type_name": "geometry", "vendor_name": "PostGIS"} + +* The ADBC PostgreSQL driver may also know how to parse the bytes, but not + know the intended semantics. For example, `composite types + <https://www.postgresql.org/docs/current/rowtypes.html>`_ can add new + semantics to existing types, somewhat like Arrow extension types. The + driver would be able to parse the underlying bytes in this case, but would + still use the Opaque type. + + Consider the example in the PostgreSQL documentation of a ``complex`` type. + Mapping the type to a plain Arrow ``struct`` type would lose meaning, just + like how an Arrow system deciding to treat all extension types by dropping + the extension metadata would be undesirable. Instead, the driver can use + Opaque[Struct] to pass on the composite type info. (It would be wrong to + try to map this to an Arrow-defined complex type: it does not know the + proper semantics of a user-defined type, which cannot and should not be + hardcoded into the driver in the first place.) + + An example of the extension metadata would be:: + + {"type_name": "database_name.schema_name.complex", "vendor_name": "PostgreSQL"} + +* The JDBC adapter in the Arrow Java libraries converts JDBC result sets into + Arrow arrays, and can get Arrow schemas from result sets. JDBC, however, + allows drivers to return `arbitrary Java objects + <https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html#OTHER>`_. + + The driver can use Opaque[Null] as a placeholder during schema conversion, + only erroring if the application tries to fetch the actual data. That way, + clients can at least introspect result schemas to decide whether they can + proceed to fetch the data, or only query certain columns.
+ + An example of the extension metadata would be:: + + {"type_name": "OTHER", "vendor_name": "JDBC driver name"} + ========================= Community Extension Types ========================= diff --git a/go/arrow/extensions/doc.go b/go/arrow/extensions/doc.go new file mode 100644 index 00000000000..65b086e2eca --- /dev/null +++ b/go/arrow/extensions/doc.go @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package extensions provides implementations of Arrow canonical extension +// types as defined in the Arrow specification. +// https://arrow.apache.org/docs/format/CanonicalExtensions.html +package extensions diff --git a/go/arrow/extensions/opaque.go b/go/arrow/extensions/opaque.go new file mode 100644 index 00000000000..9533c4a20bc --- /dev/null +++ b/go/arrow/extensions/opaque.go @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package extensions + +import ( + "encoding/json" + "fmt" + "reflect" + "unsafe" + + "github.com/apache/arrow/go/v18/arrow" + "github.com/apache/arrow/go/v18/arrow/array" +) + +// OpaqueType is a placeholder for a type from an external (usually +// non-arrow) system that could not be interpreted. +type OpaqueType struct { + arrow.ExtensionBase `json:"-"` + + TypeName string `json:"type_name"` + VendorName string `json:"vendor_name"` +} + +func NewOpaqueType(storageType arrow.DataType, name, vendorName string) *OpaqueType { + return &OpaqueType{ExtensionBase: arrow.ExtensionBase{Storage: storageType}, + TypeName: name, VendorName: vendorName} +} + +func (*OpaqueType) ArrayType() reflect.Type { + return reflect.TypeOf(OpaqueArray{}) +} + +func (*OpaqueType) ExtensionName() string { + return "arrow.opaque" +} + +func (o *OpaqueType) String() string { + return fmt.Sprintf("extension<%s[storage_type=%s, type_name=%s, vendor_name=%s]>", + o.ExtensionName(), o.Storage, o.TypeName, o.VendorName) +} + +func (o *OpaqueType) Serialize() string { + data, _ := json.Marshal(o) + return string(data) +} + +func (*OpaqueType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { + var out OpaqueType + err := json.Unmarshal(unsafe.Slice(unsafe.StringData(data), len(data)), &out) + if err != nil { + return nil, err + } + + switch { + case out.TypeName == "": + return nil, fmt.Errorf("%w: serialized JSON data for OpaqueType missing type_name", + arrow.ErrInvalid) + case out.VendorName == "": + return nil, fmt.Errorf("%w: serialized JSON data
for OpaqueType missing vendor_name", + arrow.ErrInvalid) + } + + out.ExtensionBase = arrow.ExtensionBase{Storage: storageType} + return &out, nil +} + +func (o *OpaqueType) ExtensionEquals(other arrow.ExtensionType) bool { + if o.ExtensionName() != other.ExtensionName() { + return false + } + + rhs, ok := other.(*OpaqueType) + if !ok { + return false + } + + return arrow.TypeEqual(o.Storage, rhs.Storage) && + o.TypeName == rhs.TypeName && + o.VendorName == rhs.VendorName +} + +type OpaqueArray struct { + array.ExtensionArrayBase +} + +var ( + _ arrow.ExtensionType = (*OpaqueType)(nil) + _ array.ExtensionArray = (*OpaqueArray)(nil) +) diff --git a/go/arrow/extensions/opaque_test.go b/go/arrow/extensions/opaque_test.go new file mode 100644 index 00000000000..b6686e97bc0 --- /dev/null +++ b/go/arrow/extensions/opaque_test.go @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package extensions_test + +import ( + "bytes" + "strings" + "testing" + + "github.com/apache/arrow/go/v18/arrow" + "github.com/apache/arrow/go/v18/arrow/array" + "github.com/apache/arrow/go/v18/arrow/extensions" + "github.com/apache/arrow/go/v18/arrow/ipc" + "github.com/apache/arrow/go/v18/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOpaqueTypeBasics(t *testing.T) { + typ := extensions.NewOpaqueType(arrow.Null, "type", "vendor") + typ2 := extensions.NewOpaqueType(arrow.Null, "type2", "vendor") + + assert.Equal(t, "arrow.opaque", typ.ExtensionName()) + assert.True(t, typ.ExtensionEquals(typ)) + assert.False(t, arrow.TypeEqual(arrow.Null, typ)) + assert.False(t, arrow.TypeEqual(typ, typ2)) + assert.True(t, arrow.TypeEqual(arrow.Null, typ.StorageType())) + assert.JSONEq(t, `{"type_name": "type", "vendor_name": "vendor"}`, typ.Serialize()) + assert.Equal(t, "type", typ.TypeName) + assert.Equal(t, "vendor", typ.VendorName) + assert.Equal(t, "extension", + typ.String()) +} + +func TestOpaqueTypeEquals(t *testing.T) { + typ := extensions.NewOpaqueType(arrow.Null, "type", "vendor") + typ2 := extensions.NewOpaqueType(arrow.Null, "type2", "vendor") + typ3 := extensions.NewOpaqueType(arrow.Null, "type", "vendor2") + typ4 := extensions.NewOpaqueType(arrow.PrimitiveTypes.Int64, "type", "vendor") + typ5 := extensions.NewOpaqueType(arrow.Null, "type", "vendor") + + tests := []struct { + lhs, rhs arrow.ExtensionType + expected bool + }{ + {typ, typ, true}, + {typ2, typ2, true}, + {typ3, typ3, true}, + {typ4, typ4, true}, + {typ5, typ5, true}, + {typ, typ5, true}, + {typ, typ2, false}, + {typ, typ3, false}, + {typ, typ4, false}, + {typ2, typ, false}, + {typ2, typ3, false}, + {typ2, typ4, false}, + {typ3, typ, false}, + {typ3, typ2, false}, + {typ3, typ4, false}, + {typ4, typ, false}, + {typ4, typ2, false}, + {typ4, typ3, false}, + } + + for _, tt := range tests { + assert.Equalf(t, tt.expected, 
arrow.TypeEqual(tt.lhs, tt.rhs), + "%s == %s", tt.lhs, tt.rhs) + } +} + +func TestOpaqueTypeCreateFromArray(t *testing.T) { + typ := extensions.NewOpaqueType(arrow.BinaryTypes.String, "geometry", "adbc.postgresql") + storage, _, err := array.FromJSON(memory.DefaultAllocator, arrow.BinaryTypes.String, + strings.NewReader(`["foobar", null]`)) + require.NoError(t, err) + defer storage.Release() + + arr := array.NewExtensionArrayWithStorage(typ, storage) + defer arr.Release() + + assert.Equal(t, 2, arr.Len()) + assert.Equal(t, 1, arr.NullN()) +} + +func TestOpaqueTypeDeserialize(t *testing.T) { + tests := []struct { + serialized string + expected *extensions.OpaqueType + }{ + {`{"type_name": "type", "vendor_name": "vendor"}`, + extensions.NewOpaqueType(arrow.Null, "type", "vendor")}, + {`{"type_name": "long name", "vendor_name": "long name"}`, + extensions.NewOpaqueType(arrow.Null, "long name", "long name")}, + {`{"type_name": "名前", "vendor_name": "名字"}`, + extensions.NewOpaqueType(arrow.Null, "名前", "名字")}, + {`{"type_name": "type", "vendor_name": "vendor", "extra_field": 2}`, + extensions.NewOpaqueType(arrow.Null, "type", "vendor")}, + } + + for _, tt := range tests { + deserialized, err := tt.expected.Deserialize(tt.expected.Storage, tt.serialized) + require.NoError(t, err) + assert.Truef(t, arrow.TypeEqual(tt.expected, deserialized), "%s != %s", + tt.expected, deserialized) + } + + typ := extensions.NewOpaqueType(arrow.Null, "type", "vendor") + _, err := typ.Deserialize(arrow.Null, "") + assert.ErrorContains(t, err, "unexpected end of JSON input") + + _, err = typ.Deserialize(arrow.Null, "[]") + assert.ErrorContains(t, err, "cannot unmarshal array") + + _, err = typ.Deserialize(arrow.Null, "{}") + assert.ErrorIs(t, err, arrow.ErrInvalid) + assert.ErrorContains(t, err, "serialized JSON data for OpaqueType missing type_name") + + _, err = typ.Deserialize(arrow.Null, `{"type_name": ""}`) + assert.ErrorIs(t, err, arrow.ErrInvalid) + assert.ErrorContains(t, err, 
"serialized JSON data for OpaqueType missing type_name") + + _, err = typ.Deserialize(arrow.Null, `{"type_name": "type"}`) + assert.ErrorIs(t, err, arrow.ErrInvalid) + assert.ErrorContains(t, err, "serialized JSON data for OpaqueType missing vendor_name") + + _, err = typ.Deserialize(arrow.Null, `{"type_name": "type", "vendor_name": ""}`) + assert.ErrorIs(t, err, arrow.ErrInvalid) + assert.ErrorContains(t, err, "serialized JSON data for OpaqueType missing vendor_name") +} + +func TestOpaqueTypeMetadataRoundTrip(t *testing.T) { + tests := []*extensions.OpaqueType{ + extensions.NewOpaqueType(arrow.Null, "foo", "bar"), + extensions.NewOpaqueType(arrow.BinaryTypes.Binary, "geometry", "postgis"), + extensions.NewOpaqueType(arrow.FixedSizeListOf(4, arrow.PrimitiveTypes.Int64), "foo", "bar"), + extensions.NewOpaqueType(arrow.BinaryTypes.String, "foo", "bar"), + } + + for _, tt := range tests { + serialized := tt.Serialize() + deserialized, err := tt.Deserialize(tt.Storage, serialized) + require.NoError(t, err) + assert.Truef(t, arrow.TypeEqual(tt, deserialized), "%s != %s", tt, deserialized) + } +} + +func TestOpaqueTypeBatchRoundTrip(t *testing.T) { + typ := extensions.NewOpaqueType(arrow.BinaryTypes.String, "geometry", "adbc.postgresql") + arrow.RegisterExtensionType(typ) + defer arrow.UnregisterExtensionType(typ.ExtensionName()) + + storage, _, err := array.FromJSON(memory.DefaultAllocator, arrow.BinaryTypes.String, + strings.NewReader(`["foobar", null]`)) + require.NoError(t, err) + defer storage.Release() + + arr := array.NewExtensionArrayWithStorage(typ, storage) + defer arr.Release() + + batch := array.NewRecord(arrow.NewSchema([]arrow.Field{{Name: "field", Type: typ, Nullable: true}}, nil), + []arrow.Array{arr}, -1) + defer batch.Release() + + var written arrow.Record + { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(batch.Schema())) + require.NoError(t, wr.Write(batch)) + require.NoError(t, wr.Close()) + + rdr, err := ipc.NewReader(&buf) + 
require.NoError(t, err) + written, err = rdr.Read() + require.NoError(t, err) + written.Retain() + defer written.Release() + rdr.Release() + } + + assert.Truef(t, batch.Schema().Equal(written.Schema()), "expected: %s, got: %s", + batch.Schema(), written.Schema()) + + assert.Truef(t, array.RecordEqual(batch, written), "expected: %s, got: %s", + batch, written) +} diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java index 783a373c6d0..ea9ffe55d33 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java @@ -211,6 +211,8 @@ public JdbcToArrowConfigBuilder setTargetBatchSize(int targetBatchSize) { * *

Defaults to wrapping {@link JdbcToArrowUtils#getArrowTypeFromJdbcType(JdbcFieldInfo, * Calendar)}. + * + * @see JdbcToArrowUtils#reportUnsupportedTypesAsOpaque(Function, String) */ public JdbcToArrowConfigBuilder setJdbcToArrowTypeConverter( Function jdbcToArrowTypeConverter) { diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java index 8397d4c9e0d..aecb734a8bb 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java @@ -18,6 +18,7 @@ import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; +import static org.apache.arrow.vector.types.Types.MinorType; import java.io.IOException; import java.math.RoundingMode; @@ -37,6 +38,7 @@ import java.util.Locale; import java.util.Map; import java.util.TimeZone; +import java.util.function.Function; import org.apache.arrow.adapter.jdbc.consumer.ArrayConsumer; import org.apache.arrow.adapter.jdbc.consumer.BigIntConsumer; import org.apache.arrow.adapter.jdbc.consumer.BinaryConsumer; @@ -80,6 +82,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.extension.OpaqueType; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; @@ -216,11 +219,28 @@ public static ArrowType getArrowTypeFromJdbcType( case Types.STRUCT: return new ArrowType.Struct(); default: - // no-op, shouldn't get here throw new UnsupportedOperationException("Unmapped JDBC type: " + fieldInfo.getJdbcType()); } } + /** + * Wrap a JDBC to Arrow type converter such that {@link UnsupportedOperationException} 
becomes + * {@link OpaqueType}. + * + * @param typeConverter The type converter to wrap. + * @param vendorName The database name to report as the Opaque type's vendor name. + */ + public static Function reportUnsupportedTypesAsOpaque( + Function typeConverter, String vendorName) { + return (final JdbcFieldInfo fieldInfo) -> { + try { + return typeConverter.apply(fieldInfo); + } catch (UnsupportedOperationException e) { + return new OpaqueType(MinorType.NULL.getType(), fieldInfo.getTypeName(), vendorName); + } + }; + } + /** * Create Arrow {@link Schema} object for the given JDBC {@link java.sql.ResultSetMetaData}. * diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowDataTypesTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowDataTypesTest.java index 5537e1acba2..c246bb2bec4 100644 --- a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowDataTypesTest.java +++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowDataTypesTest.java @@ -32,19 +32,27 @@ import static org.apache.arrow.adapter.jdbc.JdbcToArrowTestHelper.assertTinyIntVectorValues; import static org.apache.arrow.adapter.jdbc.JdbcToArrowTestHelper.assertVarBinaryVectorValues; import static org.apache.arrow.adapter.jdbc.JdbcToArrowTestHelper.assertVarcharVectorValues; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.IOException; +import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; import java.util.Arrays; import java.util.Calendar; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.arrow.adapter.jdbc.AbstractJdbcToArrowTest; +import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; import 
org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; import org.apache.arrow.adapter.jdbc.JdbcToArrowTestHelper; import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; import org.apache.arrow.adapter.jdbc.Table; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -62,7 +70,12 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.extension.OpaqueType; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -189,6 +202,44 @@ public void testJdbcSchemaMetadata(Table table) throws SQLException, ClassNotFou JdbcToArrowTestHelper.assertFieldMetadataMatchesResultSetMetadata(rsmd, schema); } + @Test + void testOpaqueType() throws SQLException, ClassNotFoundException { + try (BufferAllocator allocator = new RootAllocator()) { + String url = "jdbc:h2:mem:JdbcToArrowTest"; + String driver = "org.h2.Driver"; + Class.forName(driver); + conn = DriverManager.getConnection(url); + try (Statement stmt = conn.createStatement()) { + stmt.executeUpdate("CREATE TABLE unknowntype (a GEOMETRY, b INT)"); + } + + String query = "SELECT * FROM unknowntype"; + Calendar calendar = Calendar.getInstance(); + Function typeConverter = + (field) -> JdbcToArrowUtils.getArrowTypeFromJdbcType(field, calendar); + JdbcToArrowConfig config = + new JdbcToArrowConfigBuilder() + .setAllocator(allocator) + .setJdbcToArrowTypeConverter( + JdbcToArrowUtils.reportUnsupportedTypesAsOpaque(typeConverter, "H2")) + 
.build(); + Schema schema; + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query)) { + schema = + assertDoesNotThrow(() -> JdbcToArrowUtils.jdbcToArrowSchema(rs.getMetaData(), config)); + } + + Schema expected = + new Schema( + Arrays.asList( + Field.nullable( + "A", new OpaqueType(Types.MinorType.NULL.getType(), "GEOMETRY", "H2")), + Field.nullable("B", Types.MinorType.INT.getType()))); + assertEquals(expected, schema); + } + } + /** * This method calls the assert methods for various DataSets. * diff --git a/java/vector/src/main/java/module-info.java b/java/vector/src/main/java/module-info.java index 73af2d1b67e..1e4789fd914 100644 --- a/java/vector/src/main/java/module-info.java +++ b/java/vector/src/main/java/module-info.java @@ -25,6 +25,7 @@ exports org.apache.arrow.vector.complex.writer; exports org.apache.arrow.vector.compression; exports org.apache.arrow.vector.dictionary; + exports org.apache.arrow.vector.extension; exports org.apache.arrow.vector.holders; exports org.apache.arrow.vector.ipc; exports org.apache.arrow.vector.ipc.message; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/extension/InvalidExtensionMetadataException.java b/java/vector/src/main/java/org/apache/arrow/vector/extension/InvalidExtensionMetadataException.java new file mode 100644 index 00000000000..2349a7d4bc2 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/extension/InvalidExtensionMetadataException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.extension; + +/** The extension metadata was malformed. */ +public class InvalidExtensionMetadataException extends RuntimeException { + public InvalidExtensionMetadataException(String message) { + super(message); + } + + public InvalidExtensionMetadataException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java new file mode 100644 index 00000000000..a0e898a543f --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueType.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector.extension; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.DurationVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float2Vector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.IntervalDayVector; +import org.apache.arrow.vector.IntervalMonthDayNanoVector; +import org.apache.arrow.vector.IntervalYearVector; +import org.apache.arrow.vector.LargeVarBinaryVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.NullVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecTZVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.VarBinaryVector; +import 
org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.ViewVarBinaryVector; +import org.apache.arrow.vector.ViewVarCharVector; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ExtensionTypeRegistry; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; + +/** + * Opaque is a placeholder for a type from an external (usually non-Arrow) system that could not be + * interpreted. + */ +public class OpaqueType extends ArrowType.ExtensionType { + private static final AtomicBoolean registered = new AtomicBoolean(false); + public static final String EXTENSION_NAME = "arrow.opaque"; + private final ArrowType storageType; + private final String typeName; + private final String vendorName; + + /** Register the extension type so it can be used globally. */ + public static void ensureRegistered() { + if (!registered.getAndSet(true)) { + // The values don't matter, we just need an instance + ExtensionTypeRegistry.register(new OpaqueType(Types.MinorType.NULL.getType(), "", "")); + } + } + + /** + * Create a new type instance. + * + * @param storageType The underlying Arrow type. + * @param typeName The name of the unknown type. + * @param vendorName The name of the originating system of the unknown type. 
+ */ + public OpaqueType(ArrowType storageType, String typeName, String vendorName) { + this.storageType = Objects.requireNonNull(storageType, "storageType"); + this.typeName = Objects.requireNonNull(typeName, "typeName"); + this.vendorName = Objects.requireNonNull(vendorName, "vendorName"); + } + + @Override + public ArrowType storageType() { + return storageType; + } + + public String typeName() { + return typeName; + } + + public String vendorName() { + return vendorName; + } + + @Override + public String extensionName() { + return EXTENSION_NAME; + } + + @Override + public boolean extensionEquals(ExtensionType other) { + return other != null + && EXTENSION_NAME.equals(other.extensionName()) + && other instanceof OpaqueType + && storageType.equals(other.storageType()) + && typeName.equals(((OpaqueType) other).typeName()) + && vendorName.equals(((OpaqueType) other).vendorName()); + } + + @Override + public String serialize() { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode object = mapper.createObjectNode(); + object.put("type_name", typeName); + object.put("vendor_name", vendorName); + try { + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not serialize " + this, e); + } + } + + @Override + public ArrowType deserialize(ArrowType storageType, String serializedData) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode object; + try { + object = mapper.readTree(serializedData); + } catch (JsonProcessingException e) { + throw new InvalidExtensionMetadataException("Extension metadata is invalid", e); + } + JsonNode typeName = object.get("type_name"); + JsonNode vendorName = object.get("vendor_name"); + if (typeName == null) { + throw new InvalidExtensionMetadataException("typeName is missing"); + } + if (vendorName == null) { + throw new InvalidExtensionMetadataException("vendorName is missing"); + } + if (!typeName.isTextual()) { + throw new 
InvalidExtensionMetadataException("typeName should be string, was " + typeName); + } + if (!vendorName.isTextual()) { + throw new InvalidExtensionMetadataException("vendorName should be string, was " + vendorName); + } + return new OpaqueType(storageType, typeName.asText(), vendorName.asText()); + } + + @Override + public FieldVector getNewVector(String name, FieldType fieldType, BufferAllocator allocator) { + // XXX: fieldType is supposed to be the extension type + final Field field = new Field(name, fieldType, Collections.emptyList()); + final FieldVector underlyingVector = + storageType.accept(new UnderlyingVectorTypeVisitor(name, allocator)); + return new OpaqueVector(field, allocator, underlyingVector); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), storageType, typeName, vendorName); + } + + @Override + public String toString() { + return "OpaqueType(" + + storageType + + ", typeName='" + + typeName + + '\'' + + ", vendorName='" + + vendorName + + '\'' + + ')'; + } + + private static class UnderlyingVectorTypeVisitor implements ArrowTypeVisitor { + private final String name; + private final BufferAllocator allocator; + + UnderlyingVectorTypeVisitor(String name, BufferAllocator allocator) { + this.name = name; + this.allocator = allocator; + } + + @Override + public FieldVector visit(Null type) { + return new NullVector(name); + } + + private RuntimeException unsupported(ArrowType type) { + throw new UnsupportedOperationException( + "OpaqueType#getUnderlyingVector is not supported for storage type: " + type); + } + + @Override + public FieldVector visit(Struct type) { + throw unsupported(type); + } + + @Override + public FieldVector visit(List type) { + throw unsupported(type); + } + + @Override + public FieldVector visit(LargeList type) { + throw unsupported(type); + } + + @Override + public FieldVector visit(FixedSizeList type) { + throw unsupported(type); + } + + @Override + public FieldVector visit(Union type) { + throw 
unsupported(type); + } + + @Override + public FieldVector visit(Map type) { + throw unsupported(type); + } + + @Override + public FieldVector visit(Int type) { + return new IntVector(name, allocator); + } + + @Override + public FieldVector visit(FloatingPoint type) { + switch (type.getPrecision()) { + case HALF: + return new Float2Vector(name, allocator); + case SINGLE: + return new Float4Vector(name, allocator); + case DOUBLE: + return new Float8Vector(name, allocator); + default: + throw unsupported(type); + } + } + + @Override + public FieldVector visit(Utf8 type) { + return new VarCharVector(name, allocator); + } + + @Override + public FieldVector visit(Utf8View type) { + return new ViewVarCharVector(name, allocator); + } + + @Override + public FieldVector visit(LargeUtf8 type) { + return new LargeVarCharVector(name, allocator); + } + + @Override + public FieldVector visit(Binary type) { + return new VarBinaryVector(name, allocator); + } + + @Override + public FieldVector visit(BinaryView type) { + return new ViewVarBinaryVector(name, allocator); + } + + @Override + public FieldVector visit(LargeBinary type) { + return new LargeVarBinaryVector(name, allocator); + } + + @Override + public FieldVector visit(FixedSizeBinary type) { + return new FixedSizeBinaryVector(Field.nullable(name, type), allocator); + } + + @Override + public FieldVector visit(Bool type) { + return new BitVector(name, allocator); + } + + @Override + public FieldVector visit(Decimal type) { + if (type.getBitWidth() == 128) { + return new DecimalVector(Field.nullable(name, type), allocator); + } else if (type.getBitWidth() == 256) { + return new Decimal256Vector(Field.nullable(name, type), allocator); + } + throw unsupported(type); + } + + @Override + public FieldVector visit(Date type) { + switch (type.getUnit()) { + case DAY: + return new DateDayVector(name, allocator); + case MILLISECOND: + return new DateMilliVector(name, allocator); + default: + throw unsupported(type); + } + } + + 
@Override + public FieldVector visit(Time type) { + switch (type.getUnit()) { + case SECOND: + return new TimeSecVector(name, allocator); + case MILLISECOND: + return new TimeMilliVector(name, allocator); + case MICROSECOND: + return new TimeMicroVector(name, allocator); + case NANOSECOND: + return new TimeNanoVector(name, allocator); + default: + throw unsupported(type); + } + } + + @Override + public FieldVector visit(Timestamp type) { + if (type.getTimezone() == null || type.getTimezone().isEmpty()) { + switch (type.getUnit()) { + case SECOND: + return new TimeStampSecVector(Field.nullable(name, type), allocator); + case MILLISECOND: + return new TimeStampMilliVector(Field.nullable(name, type), allocator); + case MICROSECOND: + return new TimeStampMicroVector(Field.nullable(name, type), allocator); + case NANOSECOND: + return new TimeStampNanoVector(Field.nullable(name, type), allocator); + default: + throw unsupported(type); + } + } + switch (type.getUnit()) { + case SECOND: + return new TimeStampSecTZVector(Field.nullable(name, type), allocator); + case MILLISECOND: + return new TimeStampMilliTZVector(Field.nullable(name, type), allocator); + case MICROSECOND: + return new TimeStampMicroTZVector(Field.nullable(name, type), allocator); + case NANOSECOND: + return new TimeStampNanoTZVector(Field.nullable(name, type), allocator); + default: + throw unsupported(type); + } + } + + @Override + public FieldVector visit(Interval type) { + switch (type.getUnit()) { + case YEAR_MONTH: + return new IntervalYearVector(name, allocator); + case DAY_TIME: + return new IntervalDayVector(name, allocator); + case MONTH_DAY_NANO: + return new IntervalMonthDayNanoVector(name, allocator); + default: + throw unsupported(type); + } + } + + @Override + public FieldVector visit(Duration type) { + return new DurationVector(Field.nullable(name, type), allocator); + } + + @Override + public FieldVector visit(ListView type) { + throw unsupported(type); + } + } +} diff --git 
a/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueVector.java new file mode 100644 index 00000000000..00eb9a984e6 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/extension/OpaqueVector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.extension; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.hash.ArrowBufHasher; +import org.apache.arrow.vector.ExtensionTypeVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueIterableVector; +import org.apache.arrow.vector.types.pojo.Field; + +/** + * Opaque is a wrapper for (usually binary) data from an external (often non-Arrow) system that + * could not be interpreted. 
+ */ +public class OpaqueVector extends ExtensionTypeVector + implements ValueIterableVector { + private final Field field; + + public OpaqueVector(Field field, BufferAllocator allocator, FieldVector underlyingVector) { + super(field, allocator, underlyingVector); + this.field = field; + } + + @Override + public Field getField() { + return field; + } + + @Override + public Object getObject(int index) { + return getUnderlyingVector().getObject(index); + } + + @Override + public int hashCode(int index) { + return hashCode(index, null); + } + + @Override + public int hashCode(int index, ArrowBufHasher hasher) { + return getUnderlyingVector().hashCode(index, hasher); + } +} diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestOpaqueExtensionType.java b/java/vector/src/test/java/org/apache/arrow/vector/TestOpaqueExtensionType.java new file mode 100644 index 00000000000..9fd9b580b36 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestOpaqueExtensionType.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.arrow.vector; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.stream.Stream; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.extension.InvalidExtensionMetadataException; +import org.apache.arrow.vector.extension.OpaqueType; +import org.apache.arrow.vector.extension.OpaqueVector; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +class TestOpaqueExtensionType { + BufferAllocator allocator; + + @BeforeEach + void beforeEach() { + allocator = new RootAllocator(); + } + + @AfterEach + void afterEach() { + 
allocator.close(); + } + + @ParameterizedTest + @ValueSource( + strings = { + "{\"type_name\": \"\", \"vendor_name\": \"\"}", + "{\"type_name\": \"\", \"vendor_name\": \"\", \"extra_field\": 42}", + "{\"type_name\": \"array\", \"vendor_name\": \"postgresql\"}", + "{\"type_name\": \"foo.bar\", \"vendor_name\": \"postgresql\"}", + }) + void testDeserializeValid(String serialized) { + ArrowType storageType = Types.MinorType.NULL.getType(); + OpaqueType type = new OpaqueType(storageType, "", ""); + + assertDoesNotThrow(() -> type.deserialize(storageType, serialized)); + } + + @ParameterizedTest + @ValueSource( + strings = { + "", + "{\"type_name\": \"\"}", + "{\"vendor_name\": \"\"}", + "{\"type_name\": null, \"vendor_name\": \"\"}", + "{\"type_name\": \"\", \"vendor_name\": null}", + "{\"type_name\": 42, \"vendor_name\": \"\"}", + "{\"type_name\": \"\", \"vendor_name\": 42}", + "{\"type_name\": \"\", \"vendor_name\": \"\"", + }) + void testDeserializeInvalid(String serialized) { + ArrowType storageType = Types.MinorType.NULL.getType(); + OpaqueType type = new OpaqueType(storageType, "", ""); + + assertThrows( + InvalidExtensionMetadataException.class, () -> type.deserialize(storageType, serialized)); + } + + @ParameterizedTest + @MethodSource("storageType") + void testRoundTrip(ArrowType storageType) { + OpaqueType type = new OpaqueType(storageType, "foo", "bar"); + assertEquals(storageType, type.storageType()); + assertEquals("foo", type.typeName()); + if (storageType.isComplex()) { + assertThrows( + UnsupportedOperationException.class, + () -> type.getNewVector("name", FieldType.nullable(type), allocator)); + } else { + assertDoesNotThrow(() -> type.getNewVector("name", FieldType.nullable(type), allocator)) + .close(); + } + + String serialized = assertDoesNotThrow(type::serialize); + OpaqueType holder = new OpaqueType(Types.MinorType.NULL.getType(), "", ""); + OpaqueType deserialized = (OpaqueType) holder.deserialize(storageType, serialized); + assertEquals(type, 
deserialized); + assertNotEquals(holder, deserialized); + } + + @ParameterizedTest + @MethodSource("storageType") + void testIpcRoundTrip(ArrowType storageType) { + OpaqueType.ensureRegistered(); + + OpaqueType type = new OpaqueType(storageType, "foo", "bar"); + Schema schema = new Schema(Collections.singletonList(Field.nullable("unknown", type))); + byte[] serialized = schema.serializeAsMessage(); + Schema deseralized = Schema.deserializeMessage(ByteBuffer.wrap(serialized)); + assertEquals(schema, deseralized); + } + + @Test + void testVectorType() throws IOException { + OpaqueType.ensureRegistered(); + + ArrowType storageType = Types.MinorType.VARBINARY.getType(); + OpaqueType type = new OpaqueType(storageType, "foo", "bar"); + try (FieldVector vector = type.getNewVector("field", FieldType.nullable(type), allocator)) { + OpaqueVector opaque = assertInstanceOf(OpaqueVector.class, vector); + assertEquals("field", opaque.getField().getName()); + assertEquals(type, opaque.getField().getType()); + + VarBinaryVector binary = + assertInstanceOf(VarBinaryVector.class, opaque.getUnderlyingVector()); + binary.setSafe(0, new byte[] {0, 1, 2, 3}); + binary.setNull(1); + opaque.setValueCount(2); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(opaque)); + ArrowStreamWriter writer = + new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), baos)) { + writer.start(); + writer.writeBatch(); + } + + try (ArrowStreamReader reader = + new ArrowStreamReader(new ByteArrayInputStream(baos.toByteArray()), allocator)) { + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + assertEquals(2, root.getRowCount()); + assertEquals(new Schema(Collections.singletonList(opaque.getField())), root.getSchema()); + + OpaqueVector actual = assertInstanceOf(OpaqueVector.class, root.getVector("field")); + assertFalse(actual.isNull(0)); + 
assertTrue(actual.isNull(1)); + assertArrayEquals(new byte[] {0, 1, 2, 3}, (byte[]) actual.getObject(0)); + assertNull(actual.getObject(1)); + } + } + } + + static Stream storageType() { + return Stream.of( + Types.MinorType.NULL.getType(), + Types.MinorType.BIGINT.getType(), + Types.MinorType.BIT.getType(), + Types.MinorType.VARBINARY.getType(), + Types.MinorType.VARCHAR.getType(), + Types.MinorType.LIST.getType(), + new ArrowType.Decimal(12, 4, 128)); + } +} diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index e52e0d242be..7a63907f742 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -173,6 +173,7 @@ def print_entry(label, value): dictionary, run_end_encoded, fixed_shape_tensor, + opaque, field, type_for_alias, DataType, DictionaryType, StructType, @@ -182,7 +183,7 @@ def print_entry(label, value): TimestampType, Time32Type, Time64Type, DurationType, FixedSizeBinaryType, Decimal128Type, Decimal256Type, BaseExtensionType, ExtensionType, - RunEndEncodedType, FixedShapeTensorType, + RunEndEncodedType, FixedShapeTensorType, OpaqueType, PyExtensionType, UnknownExtensionType, register_extension_type, unregister_extension_type, DictionaryMemo, @@ -216,7 +217,7 @@ def print_entry(label, value): Time32Array, Time64Array, DurationArray, MonthDayNanoIntervalArray, Decimal128Array, Decimal256Array, StructArray, ExtensionArray, - RunEndEncodedArray, FixedShapeTensorArray, + RunEndEncodedArray, FixedShapeTensorArray, OpaqueArray, scalar, NA, _NULL as NULL, Scalar, NullScalar, BooleanScalar, Int8Scalar, Int16Scalar, Int32Scalar, Int64Scalar, diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index b1f90cd1653..b6b3754f261 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -4438,6 +4438,34 @@ cdef class FixedShapeTensorArray(ExtensionArray): ) +cdef class OpaqueArray(ExtensionArray): + """ + Concrete class for opaque extension arrays. 
+ + Examples + -------- + Define the extension type for an opaque array + + >>> import pyarrow as pa + >>> opaque_type = pa.opaque( + ... pa.binary(), + ... type_name="geometry", + ... vendor_name="postgis", + ... ) + + Create an extension array + + >>> arr = [None, b"data"] + >>> storage = pa.array(arr, pa.binary()) + >>> pa.ExtensionArray.from_storage(opaque_type, storage) + + [ + null, + 64617461 + ] + """ + + cdef dict _array_classes = { _Type_NA: NullArray, _Type_BOOL: BooleanArray, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 0d871f411b1..9b008d150f1 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2882,6 +2882,19 @@ cdef extern from "arrow/extension/fixed_shape_tensor.h" namespace "arrow::extens " arrow::extension::FixedShapeTensorArray"(CExtensionArray): const CResult[shared_ptr[CTensor]] ToTensor() const + +cdef extern from "arrow/extension/opaque.h" namespace "arrow::extension" nogil: + cdef cppclass COpaqueType \ + " arrow::extension::OpaqueType"(CExtensionType): + + c_string type_name() + c_string vendor_name() + + cdef cppclass COpaqueArray \ + " arrow::extension::OpaqueArray"(CExtensionArray): + pass + + cdef extern from "arrow/util/compression.h" namespace "arrow" nogil: cdef enum CCompressionType" arrow::Compression::type": CCompressionType_UNCOMPRESSED" arrow::Compression::UNCOMPRESSED" diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 082d8470cdb..2cb302d20a8 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -215,6 +215,11 @@ cdef class FixedShapeTensorType(BaseExtensionType): const CFixedShapeTensorType* tensor_ext_type +cdef class OpaqueType(BaseExtensionType): + cdef: + const COpaqueType* opaque_ext_type + + cdef class PyExtensionType(ExtensionType): pass diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi index 966273b4bea..2f9fc1c5542 100644 --- a/python/pyarrow/public-api.pxi +++ 
b/python/pyarrow/public-api.pxi @@ -124,6 +124,8 @@ cdef api object pyarrow_wrap_data_type( return cpy_ext_type.GetInstance() elif ext_type.extension_name() == b"arrow.fixed_shape_tensor": out = FixedShapeTensorType.__new__(FixedShapeTensorType) + elif ext_type.extension_name() == b"arrow.opaque": + out = OpaqueType.__new__(OpaqueType) else: out = BaseExtensionType.__new__(BaseExtensionType) else: diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi index 41bfde39adb..12a99c2aece 100644 --- a/python/pyarrow/scalar.pxi +++ b/python/pyarrow/scalar.pxi @@ -1085,6 +1085,12 @@ cdef class FixedShapeTensorScalar(ExtensionScalar): return pyarrow_wrap_tensor(ctensor) +cdef class OpaqueScalar(ExtensionScalar): + """ + Concrete class for opaque extension scalar. + """ + + cdef dict _scalar_classes = { _Type_BOOL: BooleanScalar, _Type_UINT8: UInt8Scalar, diff --git a/python/pyarrow/tests/test_extension_type.py b/python/pyarrow/tests/test_extension_type.py index 1c4d0175a2d..09b950bf39b 100644 --- a/python/pyarrow/tests/test_extension_type.py +++ b/python/pyarrow/tests/test_extension_type.py @@ -1661,3 +1661,49 @@ def test_legacy_int_type(): batch = ipc_read_batch(buf) assert isinstance(batch.column(0).type, LegacyIntType) assert batch.column(0) == ext_arr + + +@pytest.mark.parametrize("storage_type,storage", [ + (pa.null(), [None] * 4), + (pa.int64(), [1, 2, None, 4]), + (pa.binary(), [None, b"foobar"]), + (pa.list_(pa.int64()), [[], [1, 2], None, [3, None]]), +]) +def test_opaque_type(pickle_module, storage_type, storage): + opaque_type = pa.opaque(storage_type, "type", "vendor") + assert opaque_type.extension_name == "arrow.opaque" + assert opaque_type.storage_type == storage_type + assert opaque_type.type_name == "type" + assert opaque_type.vendor_name == "vendor" + assert "arrow.opaque" in str(opaque_type) + + assert opaque_type == opaque_type + assert opaque_type != storage_type + assert opaque_type != pa.opaque(storage_type, "type2", "vendor") + assert 
opaque_type != pa.opaque(storage_type, "type", "vendor2") + assert opaque_type != pa.opaque(pa.decimal128(12, 3), "type", "vendor") + + # Pickle roundtrip + result = pickle_module.loads(pickle_module.dumps(opaque_type)) + assert result == opaque_type + + # IPC roundtrip + tensor_arr_class = opaque_type.__arrow_ext_class__() + storage = pa.array(storage, storage_type) + arr = pa.ExtensionArray.from_storage(opaque_type, storage) + assert isinstance(arr, tensor_arr_class) + + with registered_extension_type(opaque_type): + buf = ipc_write_batch(pa.RecordBatch.from_arrays([arr], ["ext"])) + batch = ipc_read_batch(buf) + + assert batch.column(0).type.extension_name == "arrow.opaque" + assert isinstance(batch.column(0), tensor_arr_class) + + # cast storage -> extension type + result = storage.cast(opaque_type) + assert result == arr + + # cast extension type -> storage type + inner = arr.cast(storage_type) + assert inner == storage diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 4343d7ea300..5b5f28e91b3 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1810,6 +1810,50 @@ cdef class FixedShapeTensorType(BaseExtensionType): return FixedShapeTensorScalar +cdef class OpaqueType(BaseExtensionType): + """ + Concrete class for opaque extension type. + + Opaque is a placeholder for a type from an external (often non-Arrow) + system that could not be interpreted. + + Examples + -------- + Create an instance of opaque extension type: + + >>> import pyarrow as pa + >>> pa.opaque(pa.int32(), "geometry", "postgis") + OpaqueType(extension) + """ + + cdef void init(self, const shared_ptr[CDataType]& type) except *: + BaseExtensionType.init(self, type) + self.opaque_ext_type = type.get() + + @property + def type_name(self): + """ + The name of the type in the external system. + """ + return frombytes(c_string(self.opaque_ext_type.type_name())) + + @property + def vendor_name(self): + """ + The name of the external system. 
+ """ + return frombytes(c_string(self.opaque_ext_type.vendor_name())) + + def __arrow_ext_class__(self): + return OpaqueArray + + def __reduce__(self): + return opaque, (self.storage_type, self.type_name, self.vendor_name) + + def __arrow_ext_scalar_class__(self): + return OpaqueScalar + + _py_extension_type_auto_load = False @@ -5207,6 +5251,63 @@ def fixed_shape_tensor(DataType value_type, shape, dim_names=None, permutation=N return out +def opaque(DataType storage_type, str type_name not None, str vendor_name not None): + """ + Create instance of opaque extension type. + + Parameters + ---------- + storage_type : DataType + The underlying data type. + type_name : str + The name of the type in the external system. + vendor_name : str + The name of the external system. + + Examples + -------- + Create an instance of an opaque extension type: + + >>> import pyarrow as pa + >>> type = pa.opaque(pa.binary(), "other", "jdbc") + >>> type + OpaqueType(extension) + + Inspect the data type: + + >>> type.storage_type + DataType(binary) + >>> type.type_name + 'other' + >>> type.vendor_name + 'jdbc' + + Create a table with an opaque array: + + >>> arr = [None, b"foobar"] + >>> storage = pa.array(arr, pa.binary()) + >>> other = pa.ExtensionArray.from_storage(type, storage) + >>> pa.table([other], names=["unknown_col"]) + pyarrow.Table + unknown_col: extension + ---- + unknown_col: [[null,666F6F626172]] + + Returns + ------- + type : OpaqueType + """ + + cdef: + c_string c_type_name = tobytes(type_name) + c_string c_vendor_name = tobytes(vendor_name) + shared_ptr[CDataType] c_type = make_shared[COpaqueType]( + storage_type.sp_type, c_type_name, c_vendor_name) + OpaqueType out = OpaqueType.__new__(OpaqueType) + out.init(c_type) + return out + + cdef dict _type_aliases = { 'null': null, 'bool': bool_,