diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 1778a154c6d..eba964f9678 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4243,6 +4243,40 @@ TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) { ASSERT_EQ(stats1->EncodeMin(), "b"); ASSERT_EQ(stats0->EncodeMax(), "b"); ASSERT_EQ(stats1->EncodeMax(), "c"); + + // Check page statistics + const auto expected_page_type = + GetParquetDataPageVersion() == ParquetDataPageVersion::V1 ? PageType::DATA_PAGE + : PageType::DATA_PAGE_V2; + auto rg0_page_reader = parquet_reader->RowGroup(0)->GetColumnPageReader(0); + ASSERT_EQ(PageType::DICTIONARY_PAGE, rg0_page_reader->NextPage()->type()); + const std::vector rg0_min_values = {"a", "a", "a"}; + const std::vector rg0_max_values = {"a", "a", "b"}; + for (int i = 0; i < 3; ++i) { + auto page = rg0_page_reader->NextPage(); + ASSERT_EQ(expected_page_type, page->type()); + auto data_page = std::static_pointer_cast(page); + ASSERT_EQ(3, data_page->num_values()); + const auto& stats = data_page->statistics(); + EXPECT_EQ(1, stats.null_count); + EXPECT_EQ(rg0_min_values[i], stats.min()); + EXPECT_EQ(rg0_max_values[i], stats.max()); + } + ASSERT_EQ(rg0_page_reader->NextPage(), nullptr); + + auto rg1_page_reader = parquet_reader->RowGroup(1)->GetColumnPageReader(0); + ASSERT_EQ(PageType::DICTIONARY_PAGE, rg1_page_reader->NextPage()->type()); + { + auto page = rg1_page_reader->NextPage(); + ASSERT_EQ(expected_page_type, page->type()); + auto data_page = std::static_pointer_cast(page); + ASSERT_EQ(3, data_page->num_values()); + const auto& stats = data_page->statistics(); + EXPECT_EQ(1, stats.null_count); + EXPECT_EQ("b", stats.min()); + EXPECT_EQ("c", stats.max()); + } + ASSERT_EQ(rg1_page_reader->NextPage(), nullptr); } // ---------------------------------------------------------------------- diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 48d5b93d24a..9f4c0b6900f 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -1484,6 +1484,39 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( std::shared_ptr<::arrow::Array> dictionary = data.dictionary(); std::shared_ptr<::arrow::Array> indices = data.indices(); + auto update_stats = [&](int64_t num_chunk_levels, + const std::shared_ptr& chunk_indices) { + // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the + // indices array to a (hopefully smaller) referenced indices array. Second, a copy + // of the values array to a (probably not smaller) referenced values array. + // + // Once the MinMax kernel supports all data types we should use that kernel instead + // as it does not make any copies. + ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool); + exec_ctx.set_use_threads(false); + + std::shared_ptr<::arrow::Array> referenced_dictionary; + PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices, + ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); + + // On first run, we might be able to re-use the existing dictionary + if (referenced_indices.length() == dictionary->length()) { + referenced_dictionary = dictionary; + } else { + PARQUET_ASSIGN_OR_THROW( + ::arrow::Datum referenced_dictionary_datum, + ::arrow::compute::Take(dictionary, referenced_indices, + ::arrow::compute::TakeOptions(/*boundscheck=*/false), + &exec_ctx)); + referenced_dictionary = referenced_dictionary_datum.make_array(); + } + + int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); + page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); + page_statistics_->IncrementNumValues(non_null_count); + page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); + }; + int64_t value_offset = 0; auto WriteIndicesChunk = [&](int64_t offset, int64_t batch_size) { int64_t batch_num_values = 0; @@ -1498,6 +1531,9 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( AddIfNotNull(rep_levels, offset)); std::shared_ptr writeable_indices = indices->Slice(value_offset, batch_num_spaced_values); + if (page_statistics_) { + update_stats(/*num_chunk_levels=*/batch_size, writeable_indices); + } PARQUET_ASSIGN_OR_THROW( writeable_indices, MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool)); @@ -1506,43 +1542,6 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( value_offset += batch_num_spaced_values; }; - auto update_stats = [&]() { - // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the - // indices array to a (hopefully smaller) referenced indices array. Second, a copy - // of the values array to a (probably not smaller) referenced values array. - // - // Once the MinMax kernel supports all data types we should use that kernel instead - // as it does not make any copies. - ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool); - exec_ctx.set_use_threads(false); - - std::shared_ptr<::arrow::Array> referenced_dictionary; - // If dictionary is the same dictionary we already have, just use that - if (preserved_dictionary_ && preserved_dictionary_ == dictionary) { - referenced_dictionary = preserved_dictionary_; - } else { - PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices, - ::arrow::compute::Unique(*indices, &exec_ctx)); - - // On first run, we might be able to re-use the existing dictionary - if (referenced_indices.length() == dictionary->length()) { - referenced_dictionary = dictionary; - } else { - PARQUET_ASSIGN_OR_THROW( - ::arrow::Datum referenced_dictionary_datum, - ::arrow::compute::Take(dictionary, referenced_indices, - ::arrow::compute::TakeOptions(/*boundscheck=*/false), - &exec_ctx)); - referenced_dictionary = referenced_dictionary_datum.make_array(); - } - } - - int64_t non_null_count = indices->length() - indices->null_count(); - page_statistics_->IncrementNullCount(num_levels - non_null_count); - page_statistics_->IncrementNumValues(non_null_count); - page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); - }; - // Handle seeing dictionary for the first time if (!preserved_dictionary_) { // It's a new dictionary. Call PutDictionary and keep track of it @@ -1556,19 +1555,11 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( return WriteDense(); } - if (page_statistics_ != nullptr) { - update_stats(); - } preserved_dictionary_ = dictionary; } else if (!dictionary->Equals(*preserved_dictionary_)) { // Dictionary has changed PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding()); return WriteDense(); - } else { - // Dictionary is same, but we need to update stats - if (page_statistics_ != nullptr) { - update_stats(); - } } PARQUET_CATCH_NOT_OK(