Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 74 additions & 39 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
return manifest;
}

bool IsNan(const Scalar& value) {
if (value.is_valid) {
if (value.type->id() == Type::FLOAT) {
const FloatScalar& float_scalar = checked_cast<const FloatScalar&>(value);
return std::isnan(float_scalar.value);
} else if (value.type->id() == Type::DOUBLE) {
const DoubleScalar& double_scalar = checked_cast<const DoubleScalar&>(value);
return std::isnan(double_scalar.value);
}
}
return false;
}

std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
// For the remaining of this function, failure to extract/parse statistics
Expand All @@ -112,50 +125,13 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(

auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
auto statistics = column_metadata->statistics();
if (statistics == nullptr) {
return std::nullopt;
}

const auto& field = schema_field.field;
auto field_expr = compute::field_ref(field->name());

// Optimize for corner case where all values are nulls
if (statistics->num_values() == 0 && statistics->null_count() > 0) {
return is_null(std::move(field_expr));
}

std::shared_ptr<Scalar> min, max;
if (!StatisticsAsScalars(*statistics, &min, &max).ok()) {
if (statistics == nullptr) {
return std::nullopt;
}

auto maybe_min = min->CastTo(field->type());
auto maybe_max = max->CastTo(field->type());
if (maybe_min.ok() && maybe_max.ok()) {
min = maybe_min.MoveValueUnsafe();
max = maybe_max.MoveValueUnsafe();

if (min->Equals(max)) {
auto single_value = compute::equal(field_expr, compute::literal(std::move(min)));

if (statistics->null_count() == 0) {
return single_value;
}
return compute::or_(std::move(single_value), is_null(std::move(field_expr)));
}

auto lower_bound =
compute::greater_equal(field_expr, compute::literal(std::move(min)));
auto upper_bound = compute::less_equal(field_expr, compute::literal(std::move(max)));

auto in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));
if (statistics->null_count() != 0) {
return compute::or_(std::move(in_range), compute::is_null(field_expr));
}
return in_range;
}

return std::nullopt;
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
}

void AddColumnIndices(const SchemaField& schema_field,
Expand Down Expand Up @@ -306,6 +282,65 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,

} // namespace

std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field.name());

// Optimize for corner case where all values are nulls
if (statistics.num_values() == 0 && statistics.null_count() > 0) {
return is_null(std::move(field_expr));
}

std::shared_ptr<Scalar> min, max;
if (!StatisticsAsScalars(statistics, &min, &max).ok()) {
return std::nullopt;
}

auto maybe_min = min->CastTo(field.type());
auto maybe_max = max->CastTo(field.type());

if (maybe_min.ok() && maybe_max.ok()) {
min = maybe_min.MoveValueUnsafe();
max = maybe_max.MoveValueUnsafe();

if (min->Equals(max)) {
auto single_value = compute::equal(field_expr, compute::literal(std::move(min)));

if (statistics.null_count() == 0) {
return single_value;
}
return compute::or_(std::move(single_value), is_null(std::move(field_expr)));
}

auto lower_bound = compute::greater_equal(field_expr, compute::literal(min));
auto upper_bound = compute::less_equal(field_expr, compute::literal(max));
compute::Expression in_range;

// Since the minimum & maximum values are NaN, useful statistics
// cannot be extracted for checking the presence of a value within
// range
if (IsNan(*min) && IsNan(*max)) {
return std::nullopt;
}

// If either minimum or maximum is NaN, it should be ignored for the
// range computation
if (IsNan(*min)) {
in_range = std::move(upper_bound);
} else if (IsNan(*max)) {
in_range = std::move(lower_bound);
} else {
in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));
}

if (statistics.null_count() != 0) {
return compute::or_(std::move(in_range), compute::is_null(field_expr));
}
return in_range;
}
return std::nullopt;
}

ParquetFileFormat::ParquetFileFormat()
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);

static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics);

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
#include "arrow/testing/util.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/io_util.h"
#include "arrow/util/range.h"

#include "parquet/arrow/writer.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/statistics.h"
#include "parquet/types.h"

namespace arrow {

Expand Down Expand Up @@ -632,5 +636,17 @@ INSTANTIATE_TEST_SUITE_P(TestScan, TestParquetFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);

TEST(TestParquetStatistics, NullMax) {
auto field = ::arrow::field("x", float32());
ASSERT_OK_AND_ASSIGN(std::string dir_string,
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));
auto reader =
parquet::ParquetFileReader::OpenFile(dir_string + "/nan_in_stats.parquet");
auto statistics = reader->RowGroup(0)->metadata()->ColumnChunk(0)->statistics();
auto stat_expression =
ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

} // namespace dataset
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/submodules/parquet-testing
2 changes: 1 addition & 1 deletion testing