9 changes: 1 addition & 8 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -3431,7 +3431,6 @@ TEST(TestArrowWriteDictionaries, AutoReadAsDictionary) {
 }
 
 TEST(TestArrowWriteDictionaries, NestedSubfield) {
-  // FIXME (ARROW-9943): Automatic decoding of dictionary subfields
   auto offsets = ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 0, 2, 3]");
   auto indices = ::arrow::ArrayFromJSON(::arrow::int32(), "[0, 0, 0]");
   auto dict = ::arrow::ArrayFromJSON(::arrow::utf8(), "[\"foo\"]");
@@ -3442,20 +3441,14 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) {
   ASSERT_OK_AND_ASSIGN(auto values,
                        ::arrow::ListArray::FromArrays(*offsets, *dict_values));
 
-  auto dense_ty = ::arrow::list(::arrow::utf8());
-  auto dense_values =
-      ::arrow::ArrayFromJSON(dense_ty, "[[], [\"foo\", \"foo\"], [\"foo\"]]");
-
   auto table = MakeSimpleTable(values, /*nullable=*/true);
-  auto expected_table = MakeSimpleTable(dense_values, /*nullable=*/true);
 
   auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
   std::shared_ptr<Table> actual;
   DoRoundtrip(table, values->length(), &actual, default_writer_properties(),
               props_store_schema);
 
-  // The nested subfield is not automatically decoded to dictionary
-  ::arrow::AssertTablesEqual(*expected_table, *actual);
+  ::arrow::AssertTablesEqual(*table, *actual);
 }
 
 } // namespace arrow
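
What the updated test locks in: with store_schema() enabled, a dictionary-encoded field nested inside a list now round-trips as dictionary instead of being read back densified. A minimal standalone sketch of such a round-trip, assuming only the public parquet::arrow API — the helper name and buffer plumbing here are illustrative, not code from this PR:

// Sketch: write a table with the Arrow schema stored in the Parquet
// key-value metadata, then read it back; with this change the nested
// dictionary type is restored on read.
#include "arrow/api.h"
#include "arrow/io/api.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"

::arrow::Status RoundtripWithStoredSchema(
    const std::shared_ptr<::arrow::Table>& table,
    std::shared_ptr<::arrow::Table>* out) {
  // store_schema() is what triggers type restoration on the read path.
  auto arrow_props =
      parquet::ArrowWriterProperties::Builder().store_schema()->build();
  ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table, ::arrow::default_memory_pool(), sink,
      /*chunk_size=*/1024, parquet::default_writer_properties(), arrow_props));
  ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(
      std::make_shared<::arrow::io::BufferReader>(buffer),
      ::arrow::default_memory_pool(), &reader));
  // Previously a list<dictionary<...>> column came back as a dense
  // list<utf8>; now it compares equal to the input table.
  return reader->ReadTable(out);
}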
34 changes: 18 additions & 16 deletions cpp/src/parquet/arrow/reader.cc
@@ -744,12 +744,20 @@ Status StructReader::BuildArray(int64_t length_upper_bound,
 // ----------------------------------------------------------------------
 // File reader implementation
 
-Status GetStorageReader(const SchemaField& field,
-                        const std::shared_ptr<ReaderContext>& ctx,
-                        std::unique_ptr<ColumnReaderImpl>* out) {
+Status GetReader(const SchemaField& field, const std::shared_ptr<Field>& arrow_field,
+                 const std::shared_ptr<ReaderContext>& ctx,
+                 std::unique_ptr<ColumnReaderImpl>* out) {
   BEGIN_PARQUET_CATCH_EXCEPTIONS
 
-  auto type_id = field.storage_field->type()->id();
+  auto type_id = arrow_field->type()->id();
 
+  if (type_id == ::arrow::Type::EXTENSION) {
+    auto storage_field = arrow_field->WithType(
+        checked_cast<const ExtensionType&>(*arrow_field->type()).storage_type());
+    RETURN_NOT_OK(GetReader(field, storage_field, ctx, out));
+    out->reset(new ExtensionReader(arrow_field, std::move(*out)));
+    return Status::OK();
+  }
+
   if (field.children.size() == 0) {
     if (!field.is_leaf()) {
@@ -761,18 +769,16 @@ Status GetStorageReader(const SchemaField& field,
     }
     std::unique_ptr<FileColumnIterator> input(
         ctx->iterator_factory(field.column_index, ctx->reader));
-    out->reset(
-        new LeafReader(ctx, field.storage_field, std::move(input), field.level_info));
+    out->reset(new LeafReader(ctx, arrow_field, std::move(input), field.level_info));
   } else if (type_id == ::arrow::Type::LIST) {
-    auto list_field = field.storage_field;
     auto child = &field.children[0];
     std::unique_ptr<ColumnReaderImpl> child_reader;
     RETURN_NOT_OK(GetReader(*child, ctx, &child_reader));
     if (child_reader == nullptr) {
       *out = nullptr;
       return Status::OK();
     }
-    out->reset(new ListReader<int32_t>(ctx, list_field, field.level_info,
+    out->reset(new ListReader<int32_t>(ctx, arrow_field, field.level_info,
                                        std::move(child_reader)));
   } else if (type_id == ::arrow::Type::STRUCT) {
     std::vector<std::shared_ptr<Field>> child_fields;
@@ -792,12 +798,12 @@ Status GetStorageReader(const SchemaField& field,
       return Status::OK();
     }
     auto filtered_field =
-        ::arrow::field(field.storage_field->name(), ::arrow::struct_(child_fields),
-                       field.storage_field->nullable(), field.storage_field->metadata());
+        ::arrow::field(arrow_field->name(), ::arrow::struct_(child_fields),
+                       arrow_field->nullable(), arrow_field->metadata());
     out->reset(new StructReader(ctx, filtered_field, field.level_info,
                                 std::move(child_readers)));
   } else {
-    return Status::Invalid("Unsupported nested type: ", field.storage_field->ToString());
+    return Status::Invalid("Unsupported nested type: ", arrow_field->ToString());
   }
   return Status::OK();
 
@@ -806,11 +812,7 @@ Status GetStorageReader(const SchemaField& field,

 Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>& ctx,
                  std::unique_ptr<ColumnReaderImpl>* out) {
-  RETURN_NOT_OK(GetStorageReader(field, ctx, out));
-  if (field.field->type()->id() == ::arrow::Type::EXTENSION) {
-    out->reset(new ExtensionReader(field.field, std::move(*out)));
-  }
-  return Status::OK();
+  return GetReader(field, field.field, ctx, out);
 }
 
 } // namespace
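
In short, GetReader() now receives the Arrow field it is expected to produce; when that field carries an extension type, it recurses on an equivalent storage-typed field and wraps the resulting reader in an ExtensionReader, so extension types are handled at any nesting depth. A self-contained sketch of just that unwrap step — the free function here is made up for illustration, not PR code:

// Sketch: reduce an extension field to its storage field before recursing,
// mirroring the logic added to GetReader above.
#include "arrow/extension_type.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"

std::shared_ptr<arrow::Field> UnwrapExtension(
    const std::shared_ptr<arrow::Field>& field) {
  if (field->type()->id() != arrow::Type::EXTENSION) {
    return field;  // nothing to unwrap
  }
  const auto& ext_type =
      arrow::internal::checked_cast<const arrow::ExtensionType&>(*field->type());
  // Keep name/nullability/metadata, swap in the storage type; the reader then
  // wraps the column it reads back into the extension type.
  return UnwrapExtension(field->WithType(ext_type.storage_type()));
}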
93 changes: 69 additions & 24 deletions cpp/src/parquet/arrow/schema.cc
@@ -17,6 +17,7 @@

#include "parquet/arrow/schema.h"

#include <functional>
#include <string>
#include <vector>

@@ -35,6 +36,7 @@
#include "parquet/types.h"

using arrow::Field;
using arrow::FieldVector;
using arrow::KeyValueMetadata;
using arrow::Status;
using arrow::internal::checked_cast;
@@ -425,7 +427,6 @@ Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
                     LevelInfo current_levels, SchemaTreeContext* ctx,
                     const SchemaField* parent, SchemaField* out) {
   out->field = field;
-  out->storage_field = field;
   out->column_index = column_index;
   out->level_info = current_levels;
   ctx->RecordLeaf(out);
@@ -462,7 +463,6 @@ Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
   auto struct_type = ::arrow::struct_(arrow_fields);
   out->field = ::arrow::field(node.name(), struct_type, node.is_optional(),
                               FieldIdMetadata(node.field_id()));
-  out->storage_field = out->field;
   out->level_info = current_levels;
   return Status::OK();
 }
@@ -543,7 +543,6 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
   }
   out->field = ::arrow::field(group.name(), ::arrow::list(child_field->field),
                               group.is_optional(), FieldIdMetadata(group.field_id()));
-  out->storage_field = out->field;
   out->level_info = current_levels;
   // At this point current levels contains the def level for this list,
   // we need to reset to the prior parent.
@@ -571,7 +570,6 @@ Status GroupToSchemaField(const GroupNode& node, LevelInfo current_levels,
     RETURN_NOT_OK(GroupToStruct(node, current_levels, ctx, out, &out->children[0]));
     out->field = ::arrow::field(node.name(), ::arrow::list(out->children[0].field),
                                 /*nullable=*/false, FieldIdMetadata(node.field_id()));
-    out->storage_field = out->field;
 
     ctx->LinkParent(&out->children[0], out);
     out->level_info = current_levels;
@@ -623,7 +621,6 @@ Status NodeToSchemaField(const Node& node, LevelInfo current_levels,

       out->field = ::arrow::field(node.name(), ::arrow::list(child_field),
                                   /*nullable=*/false, FieldIdMetadata(node.field_id()));
-      out->storage_field = out->field;
       out->level_info = current_levels;
       // At this point current_levels has consider this list the ancestor so restore
       // the actual ancenstor.
@@ -689,10 +686,63 @@ Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,
 // but that is not necessarily present in the field reconstitued from Parquet data
 // (for example, Parquet timestamp types doesn't carry timezone information).
 
-Status ApplyOriginalStorageMetadata(const Field& origin_field, SchemaField* inferred) {
+Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred);
+
+std::function<std::shared_ptr<::arrow::DataType>(FieldVector)> GetNestedFactory(
+    const ArrowType& origin_type, const ArrowType& inferred_type) {
+  switch (inferred_type.id()) {
+    case ::arrow::Type::STRUCT:
+      if (origin_type.id() == ::arrow::Type::STRUCT) {
+        return ::arrow::struct_;
+      }
+      break;
+    case ::arrow::Type::LIST:
+      // TODO also allow LARGE_LIST and FIXED_SIZE_LIST

[Review comment — Member (author)]
@emkornfield I think you'll be able to add your changes here.

+      if (origin_type.id() == ::arrow::Type::LIST) {
+        return [](FieldVector fields) {
+          DCHECK_EQ(fields.size(), 1);

[Review comment — Contributor]
Can a schema be deserialized that has more than 1 field? i.e. should this return a user-space error instead?

[Reply — Member (author)]
Normally no, because we already checked that origin_type->num_fields() == inferred_type->num_fields().

+          return ::arrow::list(std::move(fields[0]));
+        };
+      }
+      break;
+    default:
+      break;
+  }
+  return {};
+}
+
+Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field,
+                                          SchemaField* inferred) {
+  bool modified = false;
+
   auto origin_type = origin_field.type();
   auto inferred_type = inferred->field->type();
 
+  const int num_children = inferred_type->num_fields();
+
+  if (num_children > 0 && origin_type->num_fields() == num_children) {
+    DCHECK_EQ(static_cast<int>(inferred->children.size()), num_children);
+    const auto factory = GetNestedFactory(*origin_type, *inferred_type);
+    if (factory) {
+      // Apply original metadata recursively to children
+      for (int i = 0; i < inferred_type->num_fields(); ++i) {
+        ARROW_ASSIGN_OR_RAISE(
+            const bool child_modified,
+            ApplyOriginalMetadata(*origin_type->field(i), &inferred->children[i]));
+        modified |= child_modified;
+      }
+      if (modified) {
+        // Recreate this field using the modified child fields
+        ::arrow::FieldVector modified_children(inferred_type->num_fields());
+        for (int i = 0; i < inferred_type->num_fields(); ++i) {
+          modified_children[i] = inferred->children[i].field;
+        }
+        inferred->field =
+            inferred->field->WithType(factory(std::move(modified_children)));
+      }
+    }
+  }
+
   if (origin_type->id() == ::arrow::Type::TIMESTAMP &&
       inferred_type->id() == ::arrow::Type::TIMESTAMP) {
     // Restore time zone, if any
@@ -706,15 +756,19 @@ Status ApplyOriginalStorageMetadata(const Field& origin_field, SchemaField* inferred) {
         ts_origin_type.timezone() != "") {
       inferred->field = inferred->field->WithType(origin_type);
     }
+    modified = true;
   }
 
   if (origin_type->id() == ::arrow::Type::DICTIONARY &&
       inferred_type->id() != ::arrow::Type::DICTIONARY &&
       IsDictionaryReadSupported(*inferred_type)) {
+    // Direct dictionary reads are only suppored for a couple primitive types,
+    // so no need to recurse on value types.
     const auto& dict_origin_type =
         checked_cast<const ::arrow::DictionaryType&>(*origin_type);
     inferred->field = inferred->field->WithType(
         ::arrow::dictionary(::arrow::int32(), inferred_type, dict_origin_type.ordered()));
+    modified = true;
   }
 
   // Restore field metadata
@@ -725,23 +779,15 @@ Status ApplyOriginalStorageMetadata(const Field& origin_field, SchemaField* inferred) {
       field_metadata = field_metadata->Merge(*inferred->field->metadata());
     }
     inferred->field = inferred->field->WithMetadata(field_metadata);
+    modified = true;
   }
 
-  if (origin_type->id() == ::arrow::Type::EXTENSION) {
-    // Restore extension type, if the storage type is as read from Parquet
-    const auto& ex_type = checked_cast<const ::arrow::ExtensionType&>(*origin_type);
-    if (ex_type.storage_type()->Equals(*inferred_type)) {
-      inferred->field = inferred->field->WithType(origin_type);
-    }
-  }
-
-  // TODO Should apply metadata recursively to children, but for that we need
-  // to move metadata application inside NodeToSchemaField (ARROW-9943)
-
-  return Status::OK();
+  return modified;
 }
 
-Status ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred) {
+Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred) {
+  bool modified = false;
+
   auto origin_type = origin_field.type();
   auto inferred_type = inferred->field->type();
 
@@ -751,19 +797,18 @@ Status ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred) {

     // Apply metadata recursively to storage type
     RETURN_NOT_OK(ApplyOriginalStorageMetadata(*origin_storage_field, inferred));
-    inferred->storage_field = inferred->field;
 
     // Restore extension type, if the storage type is the same as inferred
     // from the Parquet type
-    if (ex_type.storage_type()->Equals(*inferred_type)) {
+    if (ex_type.storage_type()->Equals(*inferred->field->type())) {
       inferred->field = inferred->field->WithType(origin_type);
     }
+    modified = true;
   } else {
-    RETURN_NOT_OK(ApplyOriginalStorageMetadata(origin_field, inferred));
-    inferred->storage_field = inferred->field;
+    ARROW_ASSIGN_OR_RAISE(modified, ApplyOriginalStorageMetadata(origin_field, inferred));
   }
 
-  return Status::OK();
+  return modified;
 }
 
 } // namespace
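
The load-bearing new piece here is GetNestedFactory(): it hands back a callable that rebuilds a nested type from its (possibly modified) child fields, which is what lets ApplyOriginalMetadata() recurse into struct and list children and then reassemble the parent field. A minimal illustration of that rebuild step using only public Arrow type APIs — the function and variable names are invented for the example, not part of this PR:

// Sketch: rebuild a list field after its child field was modified, the same
// "factory over children" pattern used by ApplyOriginalStorageMetadata.
#include "arrow/api.h"

std::shared_ptr<arrow::Field> RebuildListField(
    const std::shared_ptr<arrow::Field>& inferred_list_field,
    const std::shared_ptr<arrow::Field>& modified_child) {
  // GetNestedFactory(origin=LIST, inferred=LIST) returns a lambda doing
  // exactly this; for STRUCT it returns arrow::struct_.
  auto factory = [](arrow::FieldVector fields) {
    return arrow::list(std::move(fields[0]));
  };
  // Name, nullability, and metadata of the parent are preserved; only the
  // type is swapped for one built from the updated children.
  return inferred_list_field->WithType(factory({modified_child}));
}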
3 changes: 0 additions & 3 deletions cpp/src/parquet/arrow/schema.h
@@ -89,9 +89,6 @@ ::arrow::Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
 /// \brief Bridge between an arrow::Field and parquet column indices.
 struct PARQUET_EXPORT SchemaField {
   std::shared_ptr<::arrow::Field> field;
-  // If field has an extension type, an equivalent field with the storage type,
-  // otherwise the field itself.
-  std::shared_ptr<::arrow::Field> storage_field;
   std::vector<SchemaField> children;
 
   // Only set for leaf nodes