-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-44010: [C++] Add arrow::RecordBatch::MakeStatisticsArray()
#44252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0787eeb
79b9915
7d87603
397a7af
b173bd1
bda0f34
e93d0f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -80,6 +80,24 @@ struct ArrowArray { | |
| void* private_data; | ||
| }; | ||
|
|
||
| # define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT "ARROW:average_byte_width:exact" | ||
|
||
| # define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE \ | ||
| "ARROW:average_byte_width:approximate" | ||
| # define ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT "ARROW:distinct_count:exact" | ||
| # define ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE \ | ||
| "ARROW:distinct_count:approximate" | ||
| # define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT "ARROW:max_byte_width:exact" | ||
| # define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE \ | ||
| "ARROW:max_byte_width:approximate" | ||
| # define ARROW_STATISTICS_KEY_MAX_VALUE_EXACT "ARROW:max_value:exact" | ||
| # define ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE "ARROW:max_value:approximate" | ||
| # define ARROW_STATISTICS_KEY_MIN_VALUE_EXACT "ARROW:min_value:exact" | ||
| # define ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE "ARROW:min_value:approximate" | ||
| # define ARROW_STATISTICS_KEY_NULL_COUNT_EXACT "ARROW:null_count:exact" | ||
| # define ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE "ARROW:null_count:approximate" | ||
| # define ARROW_STATISTICS_KEY_ROW_COUNT_EXACT "ARROW:row_count:exact" | ||
| # define ARROW_STATISTICS_KEY_ROW_COUNT_APPROXIMATE "ARROW:row_count:approximate" | ||
|
|
||
| #endif // ARROW_C_DATA_INTERFACE | ||
|
|
||
| #ifndef ARROW_C_DEVICE_DATA_INTERFACE | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,8 +26,13 @@ | |
| #include <utility> | ||
|
|
||
| #include "arrow/array.h" | ||
| #include "arrow/array/builder_binary.h" | ||
| #include "arrow/array/builder_dict.h" | ||
| #include "arrow/array/builder_nested.h" | ||
| #include "arrow/array/builder_union.h" | ||
| #include "arrow/array/concatenate.h" | ||
| #include "arrow/array/validate.h" | ||
| #include "arrow/c/abi.h" | ||
| #include "arrow/pretty_print.h" | ||
| #include "arrow/status.h" | ||
| #include "arrow/table.h" | ||
|
|
@@ -465,6 +470,220 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo( | |
| return Make(schema_, num_rows(), std::move(copied_columns)); | ||
| } | ||
|
|
||
| namespace { | ||
| struct EnumeratedStatistics { | ||
| int nth_statistics = 0; | ||
| bool start_new_column = false; | ||
| std::optional<int32_t> nth_column = std::nullopt; | ||
| const char* key = nullptr; | ||
| std::shared_ptr<DataType> type = nullptr; | ||
| ArrayStatistics::ValueType value = false; | ||
| }; | ||
| using OnStatistics = | ||
| std::function<Status(const EnumeratedStatistics& enumerated_statistics)>; | ||
| Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) { | ||
|
||
| EnumeratedStatistics statistics; | ||
| statistics.nth_statistics = 0; | ||
| statistics.start_new_column = true; | ||
| statistics.nth_column = std::nullopt; | ||
|
|
||
| statistics.key = ARROW_STATISTICS_KEY_ROW_COUNT_EXACT; | ||
|
||
| statistics.type = int64(); | ||
| statistics.value = record_batch.num_rows(); | ||
| RETURN_NOT_OK(on_statistics(statistics)); | ||
mapleFU marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| statistics.start_new_column = false; | ||
|
|
||
| const auto num_fields = record_batch.schema()->num_fields(); | ||
| for (int nth_column = 0; nth_column < num_fields; ++nth_column) { | ||
| auto column_statistics = record_batch.column(nth_column)->statistics(); | ||
| if (!column_statistics) { | ||
| continue; | ||
| } | ||
|
|
||
| statistics.start_new_column = true; | ||
| statistics.nth_column = nth_column; | ||
| if (column_statistics->null_count.has_value()) { | ||
| statistics.nth_statistics++; | ||
| statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_EXACT; | ||
| statistics.type = int64(); | ||
| statistics.value = column_statistics->null_count.value(); | ||
| RETURN_NOT_OK(on_statistics(statistics)); | ||
| statistics.start_new_column = false; | ||
| } | ||
|
|
||
| if (column_statistics->distinct_count.has_value()) { | ||
| statistics.nth_statistics++; | ||
| statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT; | ||
| statistics.type = int64(); | ||
| statistics.value = column_statistics->distinct_count.value(); | ||
| RETURN_NOT_OK(on_statistics(statistics)); | ||
| statistics.start_new_column = false; | ||
| } | ||
|
|
||
| if (column_statistics->min.has_value()) { | ||
| statistics.nth_statistics++; | ||
| if (column_statistics->is_min_exact) { | ||
| statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_EXACT; | ||
| } else { | ||
| statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE; | ||
| } | ||
| statistics.type = column_statistics->MinArrowType(); | ||
| statistics.value = column_statistics->min.value(); | ||
| RETURN_NOT_OK(on_statistics(statistics)); | ||
| statistics.start_new_column = false; | ||
| } | ||
|
|
||
| if (column_statistics->max.has_value()) { | ||
| statistics.nth_statistics++; | ||
| if (column_statistics->is_max_exact) { | ||
| statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_EXACT; | ||
| } else { | ||
| statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE; | ||
| } | ||
| statistics.type = column_statistics->MaxArrowType(); | ||
| statistics.value = column_statistics->max.value(); | ||
| RETURN_NOT_OK(on_statistics(statistics)); | ||
| statistics.start_new_column = false; | ||
| } | ||
| } | ||
| return Status::OK(); | ||
| } | ||
| } // namespace | ||
|
|
||
| Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray( | ||
| MemoryPool* memory_pool) const { | ||
| // Statistics schema: | ||
| // struct< | ||
| // column: int32, | ||
| // statistics: map< | ||
| // key: dictionary< | ||
| // indices: int32, | ||
| // dictionary: utf8, | ||
| // >, | ||
| // items: dense_union<...all needed types...>, | ||
| // > | ||
| // > | ||
|
|
||
| // Statistics schema doesn't define static dense union type for | ||
| // values. Each statistics schema have a dense union type that has | ||
| // needled value types. The following block collects these types. | ||
|
||
| std::vector<std::shared_ptr<Field>> values_types; | ||
| std::vector<int8_t> values_type_indexes; | ||
| RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) { | ||
| int8_t i = 0; | ||
| for (const auto& field : values_types) { | ||
| if (field->type()->id() == statistics.type->id()) { | ||
| break; | ||
| } | ||
| i++; | ||
| } | ||
| if (i == static_cast<int8_t>(values_types.size())) { | ||
| values_types.push_back(field(statistics.type->name(), statistics.type)); | ||
| } | ||
| values_type_indexes.push_back(i); | ||
| return Status::OK(); | ||
| })); | ||
|
|
||
| // statistics.key: dictionary<indices: int32, dictionary: utf8> | ||
| auto keys_type = dictionary(int32(), utf8(), false); | ||
| // statistics.items: dense_union<...all needed types...> | ||
| auto values_type = dense_union(values_types); | ||
| // struct< | ||
| // column: int32, | ||
| // statistics: map< | ||
| // key: dictionary< | ||
| // indices: int32, | ||
| // dictionary: utf8, | ||
| // >, | ||
| // items: dense_union<...all needed types...>, | ||
| // > | ||
| // > | ||
| auto statistics_type = | ||
| struct_({field("column", int32()), | ||
| field("statistics", map(keys_type, values_type, false))}); | ||
|
|
||
| std::vector<std::shared_ptr<ArrayBuilder>> field_builders; | ||
| // columns: int32 | ||
| auto columns_builder = std::make_shared<Int32Builder>(memory_pool); | ||
| field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(columns_builder)); | ||
| // statistics.key: dictionary<indices: int32, dictionary: utf8> | ||
| auto keys_builder = std::make_shared<StringDictionary32Builder>(); | ||
| // statistics.items: dense_union<...all needed types...> | ||
| std::vector<std::shared_ptr<ArrayBuilder>> values_builders; | ||
| for (const auto& values_type : values_types) { | ||
| std::unique_ptr<ArrayBuilder> values_builder; | ||
| RETURN_NOT_OK(MakeBuilder(memory_pool, values_type->type(), &values_builder)); | ||
| values_builders.push_back(std::shared_ptr<ArrayBuilder>(std::move(values_builder))); | ||
| } | ||
| auto items_builder = std::make_shared<DenseUnionBuilder>( | ||
| memory_pool, std::move(values_builders), values_type); | ||
| // statistics: | ||
| // map< | ||
| // key: dictionary< | ||
| // indices: int32, | ||
| // dictionary: utf8, | ||
| // >, | ||
| // items: dense_union<...all needed types...>, | ||
| // > | ||
| auto values_builder = std::make_shared<MapBuilder>( | ||
| memory_pool, std::static_pointer_cast<ArrayBuilder>(keys_builder), | ||
| std::static_pointer_cast<ArrayBuilder>(items_builder)); | ||
| field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(values_builder)); | ||
| // struct< | ||
| // column: int32, | ||
| // statistics: map< | ||
| // key: dictionary< | ||
| // indices: int32, | ||
| // dictionary: utf8, | ||
| // >, | ||
| // items: dense_union<...all needed types...>, | ||
| // > | ||
| // > | ||
| StructBuilder builder(statistics_type, memory_pool, std::move(field_builders)); | ||
|
|
||
| // Append statistics. | ||
| RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) { | ||
| if (statistics.start_new_column) { | ||
| RETURN_NOT_OK(builder.Append()); | ||
| if (statistics.nth_column.has_value()) { | ||
| RETURN_NOT_OK(columns_builder->Append(statistics.nth_column.value())); | ||
| } else { | ||
| RETURN_NOT_OK(columns_builder->AppendNull()); | ||
| } | ||
| RETURN_NOT_OK(values_builder->Append()); | ||
| } | ||
| RETURN_NOT_OK(keys_builder->Append(statistics.key, | ||
| static_cast<int32_t>(strlen(statistics.key)))); | ||
| const auto values_type_index = values_type_indexes[statistics.nth_statistics]; | ||
| RETURN_NOT_OK(items_builder->Append(values_type_index)); | ||
| struct Visitor { | ||
| ArrayBuilder* builder; | ||
|
|
||
| Status operator()(const bool& value) { | ||
| return static_cast<BooleanBuilder*>(builder)->Append(value); | ||
| } | ||
| Status operator()(const int64_t& value) { | ||
| return static_cast<Int64Builder*>(builder)->Append(value); | ||
| } | ||
| Status operator()(const uint64_t& value) { | ||
| return static_cast<UInt64Builder*>(builder)->Append(value); | ||
| } | ||
| Status operator()(const double& value) { | ||
| return static_cast<DoubleBuilder*>(builder)->Append(value); | ||
| } | ||
| Status operator()(const std::string& value) { | ||
| return static_cast<StringBuilder*>(builder)->Append( | ||
| value.data(), static_cast<int32_t>(value.size())); | ||
| } | ||
| } visitor; | ||
| visitor.builder = values_builders[values_type_index].get(); | ||
| RETURN_NOT_OK(std::visit(visitor, statistics.value)); | ||
| return Status::OK(); | ||
| })); | ||
|
|
||
| return builder.Finish(); | ||
| } | ||
|
|
||
| Status RecordBatch::Validate() const { | ||
| return ValidateBatch(*this, /*full_validation=*/false); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may forgot a bit but we don't distinct "bytes" and "utf8" in stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, we didn't discuss it...
Let's discuss it in #44579.
We can assume "utf8" here for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a
// TODO(GH-44579)here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I should have added it...
I've added it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I forgot to push the commit... I pushed now.