Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/variant_internal.h"
#include "parquet/file_reader.h"
#include "parquet/schema.h"
#include "parquet/schema_internal.h"
Expand Down Expand Up @@ -941,46 +942,61 @@ TEST_F(TestConvertParquetSchema, ParquetVariant) {
PrimitiveNode::Make("metadata", Repetition::REQUIRED, ParquetType::BYTE_ARRAY);
auto value =
PrimitiveNode::Make("value", Repetition::REQUIRED, ParquetType::BYTE_ARRAY);

auto variant =
GroupNode::Make("variant_unshredded", Repetition::OPTIONAL, {metadata, value});
auto variant = GroupNode::Make("variant_unshredded", Repetition::OPTIONAL,
{metadata, value}, LogicalType::Variant());
parquet_fields.push_back(variant);

{
// Test converting from parquet schema to arrow schema.
std::vector<std::shared_ptr<Field>> arrow_fields;
auto arrow_metadata =
::arrow::field("metadata", ::arrow::binary(), /*nullable=*/false);
auto arrow_value = ::arrow::field("value", ::arrow::binary(), /*nullable=*/false);
auto arrow_variant = ::arrow::struct_({arrow_metadata, arrow_value});
arrow_fields.push_back(
::arrow::field("variant_unshredded", arrow_variant, /*nullable=*/true));
auto arrow_schema = ::arrow::schema(arrow_fields);
// Arrow schema for unshredded variant struct.
auto arrow_metadata = ::arrow::field("metadata", ::arrow::binary(), /*nullable=*/false);
auto arrow_value = ::arrow::field("value", ::arrow::binary(), /*nullable=*/false);
auto arrow_variant = ::arrow::struct_({arrow_metadata, arrow_value});
auto variant_extension = std::make_shared<VariantExtensionType>(arrow_variant);

{
// Parquet file does not contain Arrow schema.
// By default, field should be treated as a normal struct in Arrow.
auto arrow_schema =
::arrow::schema({::arrow::field("variant_unshredded", arrow_variant)});
ASSERT_OK(ConvertSchema(parquet_fields));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema, /*check_metadata=*/true));
}

{
// Test converting from parquet schema to arrow schema even though
// extensions are not enabled.
for (bool register_extension : {true, false}) {
::arrow::ExtensionTypeGuard guard(register_extension
? ::arrow::DataTypeVector{variant_extension}
: ::arrow::DataTypeVector{});

// Parquet file does not contain Arrow schema.
// If Arrow extensions are enabled, field should be interpreted as Parquet Variant
// extension type if registered.
ArrowReaderProperties props;
props.set_arrow_extensions_enabled(true);

auto arrow_schema = ::arrow::schema({::arrow::field(
"variant_unshredded", register_extension ? variant_extension : arrow_variant)});

ASSERT_OK(ConvertSchema(parquet_fields, /*metadata=*/nullptr, props));
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema, /*check_metadata=*/true));
}

for (bool register_extension : {true, false}) {
::arrow::ExtensionTypeGuard guard(register_extension
? ::arrow::DataTypeVector{variant_extension}
: ::arrow::DataTypeVector{});

// Parquet file does contain Arrow schema.
// Field should be interpreted as Parquet Variant extension, if registered,
// even though extensions are not enabled.
ArrowReaderProperties props;
props.set_arrow_extensions_enabled(false);

// Test converting from parquet schema to arrow schema.
std::vector<std::shared_ptr<Field>> arrow_fields;
auto arrow_metadata =
::arrow::field("metadata", ::arrow::binary(), /*nullable=*/false);
auto arrow_value = ::arrow::field("value", ::arrow::binary(), /*nullable=*/false);
auto arrow_variant = ::arrow::struct_({arrow_metadata, arrow_value});
arrow_fields.push_back(
::arrow::field("variant_unshredded", arrow_variant, /*nullable=*/true));
auto arrow_schema = ::arrow::schema(arrow_fields);
auto arrow_schema = ::arrow::schema({::arrow::field(
"variant_unshredded", register_extension ? variant_extension : arrow_variant)});

std::shared_ptr<KeyValueMetadata> metadata;
ASSERT_OK(ArrowSchemaToParquetMetadata(arrow_schema, metadata));
ASSERT_OK(ConvertSchema(parquet_fields, metadata, props));
CheckFlatSchema(arrow_schema, true /* check_metadata */);
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema, /*check_metadata=*/true));
}
}

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,15 @@ Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
arrow_fields.push_back(out->children[i].field);
}
auto struct_type = ::arrow::struct_(arrow_fields);
if (ctx->properties.get_arrow_extensions_enabled() &&
node.logical_type()->is_variant()) {
auto extension_type = ::arrow::GetExtensionType("parquet.variant");
if (extension_type) {
ARROW_ASSIGN_OR_RAISE(
struct_type,
extension_type->Deserialize(std::move(struct_type), /*serialized_data=*/""));
}
}
out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
FieldIdMetadata(node.field_id()));
out->level_info = current_levels;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ TEST(TestLogicalTypeOperation, LogicalTypeProperties) {
{BSONLogicalType::Make(), false, true, true},
{UUIDLogicalType::Make(), false, true, true},
{Float16LogicalType::Make(), false, true, true},
{VariantLogicalType::Make(), false, true, true},
{VariantLogicalType::Make(), true, true, true},
{NoLogicalType::Make(), false, false, true},
};

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,9 @@ bool LogicalType::is_valid() const {
}
bool LogicalType::is_invalid() const { return !is_valid(); }
bool LogicalType::is_nested() const {
return (impl_->type() == LogicalType::Type::LIST) ||
(impl_->type() == LogicalType::Type::MAP);
return impl_->type() == LogicalType::Type::LIST ||
impl_->type() == LogicalType::Type::MAP ||
impl_->type() == LogicalType::Type::VARIANT;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to add this, otherwise the check will fail at

if (logical_type_->is_nested()) {
// For backward compatibility, assign equivalent legacy converted type (if possible)
converted_type_ = logical_type_->ToConvertedType(nullptr);
} else {
std::stringstream error;
error << "Logical type ";
error << logical_type_->ToString();
error << " cannot be applied to group node";
throw ParquetException(error.str());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// For backward compatibility, assign equivalent legacy converted type (if possible)

I don't think it's legacy... Should we split a new branch?

if (logical_type_->is_variant()) {

} else if (...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the word legacy is for converted type because it also includes (if possible).

}
bool LogicalType::is_nonnested() const { return !is_nested(); }
bool LogicalType::is_serialized() const { return impl_->is_serialized(); }
Expand Down
Loading