From e2b927e3c9480844ef5f0a9e698be3b2b5e311ec Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 29 Aug 2022 18:43:18 +0000 Subject: [PATCH 1/9] Refactor variable names to make it clear that Skip works on values and not on rows. --- cpp/src/parquet/column_reader.h | 12 +++++++----- cpp/src/parquet/column_reader_test.cc | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 1d35e3988ca..765abdda15a 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -104,12 +104,12 @@ class PARQUET_EXPORT PageReader { virtual ~PageReader() = default; static std::unique_ptr Open( - std::shared_ptr stream, int64_t total_num_rows, + std::shared_ptr stream, int64_t total_num_values, Compression::type codec, bool always_compressed = false, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), const CryptoContext* ctx = NULLPTR); static std::unique_ptr Open(std::shared_ptr stream, - int64_t total_num_rows, Compression::type codec, + int64_t total_num_values, Compression::type codec, const ReaderProperties& properties, bool always_compressed = false, const CryptoContext* ctx = NULLPTR); @@ -218,9 +218,11 @@ class TypedColumnReader : public ColumnReader { int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count) = 0; - // Skip reading levels - // Returns the number of levels skipped - virtual int64_t Skip(int64_t num_rows_to_skip) = 0; + // Skip reading values. + // Returns the number of values skipped. + // This function will NOT skip rows, and repeated fields may have multiple values + // corresponding to the same row. + virtual int64_t Skip(int64_t num_values_to_skip) = 0; // Read a batch of repetition levels, definition levels, and indices from the // column. 
And read the dictionary if a dictionary page is encountered during diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index eddfdfb04e8..ec76e78e886 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -229,7 +229,7 @@ class TestPrimitiveReader : public ::testing::Test { }; TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { - int levels_per_page = 100; + int levels_per_page = 10; int num_pages = 50; max_def_level_ = 0; max_rep_level_ = 0; From ca4eeecfe02014837ea8343399e75b244d9880ae Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 29 Aug 2022 18:43:18 +0000 Subject: [PATCH 2/9] Refactor variable names to make it clear that Skip works on values and not on rows. --- cpp/src/parquet/column_reader.cc | 26 +++++++++++++------------- cpp/src/parquet/column_reader.h | 12 +++++++----- cpp/src/parquet/column_reader_test.cc | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 523030fd783..3958393fe2b 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -222,15 +222,15 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) { // and the page metadata. 
class SerializedPageReader : public PageReader { public: - SerializedPageReader(std::shared_ptr stream, int64_t total_num_rows, + SerializedPageReader(std::shared_ptr stream, int64_t total_num_values, Compression::type codec, const ReaderProperties& properties, const CryptoContext* crypto_ctx, bool always_compressed) : properties_(properties), stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)), page_ordinal_(0), - seen_num_rows_(0), - total_num_rows_(total_num_rows), + seen_num_values_(0), + total_num_values_(total_num_values), decryption_buffer_(AllocateBuffer(properties_.memory_pool(), 0)) { if (crypto_ctx != nullptr) { crypto_ctx_ = *crypto_ctx; @@ -284,10 +284,10 @@ class SerializedPageReader : public PageReader { uint32_t max_page_header_size_; // Number of rows read in data pages so far - int64_t seen_num_rows_; + int64_t seen_num_values_; // Number of rows in all the data pages - int64_t total_num_rows_; + int64_t total_num_values_; // data_page_aad_ and data_page_header_aad_ contain the AAD for data page and data page // header in a single column respectively. @@ -335,7 +335,7 @@ std::shared_ptr SerializedPageReader::NextPage() { // Loop here because there may be unhandled page types that we skip until // finding a page that we do know what to do with - while (seen_num_rows_ < total_num_rows_) { + while (seen_num_values_ < total_num_values_) { uint32_t header_size = 0; uint32_t allowed_page_size = kDefaultPageHeaderSize; @@ -430,7 +430,7 @@ std::shared_ptr SerializedPageReader::NextPage() { throw ParquetException("Invalid page header (negative number of values)"); } EncodedStatistics page_statistics = ExtractStatsFromHeader(header); - seen_num_rows_ += header.num_values; + seen_num_values_ += header.num_values; // Uncompress if needed page_buffer = @@ -457,7 +457,7 @@ std::shared_ptr SerializedPageReader::NextPage() { (header.__isset.is_compressed ? 
header.is_compressed : false) || always_compressed_; EncodedStatistics page_statistics = ExtractStatsFromHeader(header); - seen_num_rows_ += header.num_values; + seen_num_values_ += header.num_values; // Uncompress if needed int levels_byte_len; @@ -519,23 +519,23 @@ std::shared_ptr SerializedPageReader::DecompressIfNeeded( } // namespace std::unique_ptr PageReader::Open(std::shared_ptr stream, - int64_t total_num_rows, + int64_t total_num_values, Compression::type codec, const ReaderProperties& properties, bool always_compressed, const CryptoContext* ctx) { return std::unique_ptr(new SerializedPageReader( - std::move(stream), total_num_rows, codec, properties, ctx, always_compressed)); + std::move(stream), total_num_values, codec, properties, ctx, always_compressed)); } std::unique_ptr PageReader::Open(std::shared_ptr stream, - int64_t total_num_rows, + int64_t total_num_values, Compression::type codec, bool always_compressed, ::arrow::MemoryPool* pool, const CryptoContext* ctx) { return std::unique_ptr( - new SerializedPageReader(std::move(stream), total_num_rows, codec, + new SerializedPageReader(std::move(stream), total_num_values, codec, ReaderProperties(pool), ctx, always_compressed)); } @@ -901,7 +901,7 @@ class TypedColumnReaderImpl : public TypedColumnReader, int64_t* levels_read, int64_t* values_read, int64_t* null_count) override; - int64_t Skip(int64_t num_rows_to_skip) override; + int64_t Skip(int64_t num_values_to_skip) override; Type::type type() const override { return this->descr_->physical_type(); } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 1d35e3988ca..765abdda15a 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -104,12 +104,12 @@ class PARQUET_EXPORT PageReader { virtual ~PageReader() = default; static std::unique_ptr Open( - std::shared_ptr stream, int64_t total_num_rows, + std::shared_ptr stream, int64_t total_num_values, Compression::type codec, bool 
always_compressed = false, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), const CryptoContext* ctx = NULLPTR); static std::unique_ptr Open(std::shared_ptr stream, - int64_t total_num_rows, Compression::type codec, + int64_t total_num_values, Compression::type codec, const ReaderProperties& properties, bool always_compressed = false, const CryptoContext* ctx = NULLPTR); @@ -218,9 +218,11 @@ class TypedColumnReader : public ColumnReader { int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count) = 0; - // Skip reading levels - // Returns the number of levels skipped - virtual int64_t Skip(int64_t num_rows_to_skip) = 0; + // Skip reading values. + // Returns the number of values skipped. + // This function will NOT skip rows, and repeated fields may have multiple values + // corresponding to the same row. + virtual int64_t Skip(int64_t num_values_to_skip) = 0; // Read a batch of repetition levels, definition levels, and indices from the // column. 
And read the dictionary if a dictionary page is encountered during diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index eddfdfb04e8..ec76e78e886 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -229,7 +229,7 @@ class TestPrimitiveReader : public ::testing::Test { }; TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { - int levels_per_page = 100; + int levels_per_page = 10; int num_pages = 50; max_def_level_ = 0; max_rep_level_ = 0; From 03ee3c0a32e31d5755ea844089ed01c1df3a31e1 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 29 Aug 2022 23:47:57 +0000 Subject: [PATCH 3/9] Fix lint issue --- cpp/src/parquet/column_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index f85c0081792..68a7286423a 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1133,8 +1133,8 @@ template int64_t TypedColumnReaderImpl::Skip(int64_t num_values_to_skip) { int64_t values_to_skip = num_values_to_skip; while (HasNext() && values_to_skip > 0) { - // If the number of values to skip is more than the number of undecoded values, skip the - // Page. + // If the number of values to skip is more than the number of undecoded values, skip + // the Page. if (values_to_skip > (this->num_buffered_values_ - this->num_decoded_values_)) { values_to_skip -= this->num_buffered_values_ - this->num_decoded_values_; this->num_decoded_values_ = this->num_buffered_values_; From b500271ba3a43a04606cd89341fe6d7cc205bc60 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Tue, 30 Aug 2022 16:21:24 +0000 Subject: [PATCH 4/9] Clarify the comment for the Skip method. 
--- cpp/src/parquet/column_reader.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 765abdda15a..9907742524e 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -218,10 +218,13 @@ class TypedColumnReader : public ColumnReader { int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count) = 0; - // Skip reading values. + // Skip reading values. This method will work for both repeated and + // non-repeated fields. Note that this method is skipping values and not + // records. This distinction is important for repeated fields, meaning that + // we are not skipping over the values to the next record. We are skipping + // through them. So after the skip the iterator could be in the middle of a + // repeated field. // Returns the number of values skipped. - // This function will NOT skip rows, and repeated fields may have multiple values - // corresponding to the same row. virtual int64_t Skip(int64_t num_values_to_skip) = 0; // Read a batch of repetition levels, definition levels, and indices from the From e0ff7faa8399369e8fa6f914f89f3dc63203b555 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Tue, 30 Aug 2022 16:24:51 +0000 Subject: [PATCH 5/9] Fix another lint issue. 
--- cpp/src/parquet/column_reader.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 9907742524e..36ef47c68e8 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -109,7 +109,8 @@ class PARQUET_EXPORT PageReader { ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), const CryptoContext* ctx = NULLPTR); static std::unique_ptr Open(std::shared_ptr stream, - int64_t total_num_values, Compression::type codec, + int64_t total_num_values, + Compression::type codec, const ReaderProperties& properties, bool always_compressed = false, const CryptoContext* ctx = NULLPTR); From ad749a77772a0aa0e00f626a15e944ffede83a01 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Tue, 30 Aug 2022 17:23:25 +0000 Subject: [PATCH 6/9] Add a Skip test for repeated field making it clear that it is skipping values and not records. Add some comments to the existing test to make it more clear. --- cpp/src/parquet/column_reader_test.cc | 62 ++++++++++++++++++++++++--- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index eddfdfb04e8..3c12bf5e776 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -261,7 +261,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { ASSERT_NO_FATAL_FAILURE(ExecuteDict(num_pages, levels_per_page, &descr)); } -TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { +// Tests skipping around page boundaries. 
+TEST_F(TestPrimitiveReader, TestSkipAroundPageBoundries) { int levels_per_page = 100; int num_pages = 5; max_def_level_ = 0; @@ -290,10 +291,10 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { values_.begin() + static_cast(2.5 * static_cast(levels_per_page))); ASSERT_TRUE(vector_equal(sub_values, vresult)); - // 2) skip_size == page_size (skip across two pages) + // 2) skip_size == page_size (skip across two pages from page 2.5 to 3.5) levels_skipped = reader->Skip(levels_per_page); ASSERT_EQ(levels_per_page, levels_skipped); - // Read half a page + // Read half a page (page 3.5 to 4) reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read); sub_values.clear(); @@ -304,10 +305,10 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { ASSERT_TRUE(vector_equal(sub_values, vresult)); // 3) skip_size < page_size (skip limited to a single page) - // Skip half a page + // Skip half a page (page 4 to 4.5) levels_skipped = reader->Skip(levels_per_page / 2); ASSERT_EQ(0.5 * levels_per_page, levels_skipped); - // Read half a page + // Read half a page (page 4.5 to 5) reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read); sub_values.clear(); @@ -317,6 +318,15 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { values_.end()); ASSERT_TRUE(vector_equal(sub_values, vresult)); + // 4) skip_size = 0 + levels_skipped = reader->Skip(0); + ASSERT_EQ(0, levels_skipped); + + // 5) Skip past the end page. There are 5 pages and we have either skipped + // or read all of them, so there is nothing left to skip. + levels_skipped = reader->Skip(10); + ASSERT_EQ(0, levels_skipped); + values_.clear(); def_levels_.clear(); rep_levels_.clear(); @@ -324,6 +334,48 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { reader_.reset(); } +// Skip with repeated field. This test makes it clear that we are skipping +// values and not records. 
+TEST_F(TestPrimitiveReader, TestSkipRepeatedField) { + // Example schema: message M { repeated int32 b = 1 } + max_def_level_ = 1; + max_rep_level_ = 1; + NodePtr type = schema::Int32("b", Repetition::REPEATED); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + // Example rows: {}, {[10, 10]}, {[20, 20, 20]} + std::vector values = {10, 10, 20, 20, 20}; + std::vector def_levels{0, 1, 1, 1, 1, 1}; + std::vector rep_levels = {0, 0, 1, 0, 1, 1}; + std::shared_ptr page = + MakeDataPage(&descr, values, /*num_values=*/def_levels.size(), + Encoding::PLAIN, /*indices=*/{}, + /*indices_size=*/0, def_levels, max_def_level_, + rep_levels, max_rep_level_); + + pages_.push_back(std::move(page)); + + InitReader(&descr); + Int32Reader* reader = static_cast(reader_.get()); + + std::vector vresult(4, -1); + std::vector dresult(4, -1); + std::vector rresult(4, -1); + + // Skip two levels. + int64_t levels_skipped = reader->Skip(2); + ASSERT_EQ(2, levels_skipped); + + int64_t values_read = 0; + // Read the next set of values + reader->ReadBatch(10, dresult.data(), rresult.data(), vresult.data(), + &values_read); + ASSERT_EQ(values_read, 4); + // Note that we end up in the record with {[10, 10]} + ASSERT_TRUE(vector_equal({10, 20, 20, 20}, vresult)); + ASSERT_TRUE(vector_equal({1, 1, 1, 1}, dresult)); + ASSERT_TRUE(vector_equal({1, 0, 1, 1}, rresult)); +} + // Page claims to have two values but only 1 is present. 
TEST_F(TestPrimitiveReader, TestReadValuesMissing) { max_def_level_ = 1; From 174a8b076b65cb8321c71a460d7694a49015698a Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Fri, 2 Sep 2022 18:23:06 +0000 Subject: [PATCH 7/9] Address comments --- cpp/src/parquet/column_reader_test.cc | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index 3c12bf5e776..7743a008887 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -344,7 +344,7 @@ TEST_F(TestPrimitiveReader, TestSkipRepeatedField) { const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); // Example rows: {}, {[10, 10]}, {[20, 20, 20]} std::vector values = {10, 10, 20, 20, 20}; - std::vector def_levels{0, 1, 1, 1, 1, 1}; + std::vector def_levels = {0, 1, 1, 1, 1, 1}; std::vector rep_levels = {0, 0, 1, 0, 1, 1}; std::shared_ptr page = MakeDataPage(&descr, values, /*num_values=*/def_levels.size(), @@ -357,23 +357,24 @@ TEST_F(TestPrimitiveReader, TestSkipRepeatedField) { InitReader(&descr); Int32Reader* reader = static_cast(reader_.get()); - std::vector vresult(4, -1); - std::vector dresult(4, -1); - std::vector rresult(4, -1); + // Vectors to hold read values, definition levels, and repetition levels. + std::vector read_vals(4, -1); + std::vector read_defs(4, -1); + std::vector read_reps(4, -1); // Skip two levels.
int64_t levels_skipped = reader->Skip(2); ASSERT_EQ(2, levels_skipped); - int64_t values_read = 0; + int64_t num_read_values = 0; // Read the next set of values - reader->ReadBatch(10, dresult.data(), rresult.data(), vresult.data(), - &values_read); - ASSERT_EQ(values_read, 4); + reader->ReadBatch(10, read_defs.data(), read_reps.data(), read_vals.data(), + &num_read_values); + ASSERT_EQ(num_read_values, 4); // Note that we end up in the record with {[10, 10]} - ASSERT_TRUE(vector_equal({10, 20, 20, 20}, vresult)); - ASSERT_TRUE(vector_equal({1, 1, 1, 1}, dresult)); - ASSERT_TRUE(vector_equal({1, 0, 1, 1}, rresult)); + ASSERT_TRUE(vector_equal({10, 20, 20, 20}, read_vals)); + ASSERT_TRUE(vector_equal({1, 1, 1, 1}, read_defs)); + ASSERT_TRUE(vector_equal({1, 0, 1, 1}, read_reps)); } // Page claims to have two values but only 1 is present. From ff4ad3d4a2438a7f35aaa0d695c046422b7f46f6 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 17 Oct 2022 19:48:16 +0000 Subject: [PATCH 8/9] Resolve conflict --- cpp/src/parquet/column_reader.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 36ef47c68e8..4bebdba7e95 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -222,9 +222,10 @@ class TypedColumnReader : public ColumnReader { // Skip reading values. This method will work for both repeated and // non-repeated fields. Note that this method is skipping values and not // records. This distinction is important for repeated fields, meaning that - // we are not skipping over the values to the next record. We are skipping - // through them. So after the skip the iterator could be in the middle of a - // repeated field. + // we are not skipping over the values to the next record. For example, + // consider the following two consecutive records containing one repeated field: + // {[1, 2, 3]}, {[4, 5]}. 
If we Skip(2), our next read value will be 3, which + // is inside the first record. // Returns the number of values skipped. virtual int64_t Skip(int64_t num_values_to_skip) = 0; From 99692d2fb63a41f20771bb37ffb73fc6acab1476 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 18 Oct 2022 16:49:35 +0200 Subject: [PATCH 9/9] Lint / fix downcast / improve test a bit --- cpp/src/parquet/column_reader_test.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index 510c86584a9..b2f947eea46 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -345,11 +345,10 @@ TEST_F(TestPrimitiveReader, TestSkipRepeatedField) { std::vector values = {10, 10, 20, 20, 20}; std::vector def_levels = {0, 1, 1, 1, 1, 1}; std::vector rep_levels = {0, 0, 1, 0, 1, 1}; - std::shared_ptr page = - MakeDataPage(&descr, values, /*num_values=*/def_levels.size(), - Encoding::PLAIN, /*indices=*/{}, - /*indices_size=*/0, def_levels, max_def_level_, - rep_levels, max_rep_level_); + num_values_ = static_cast(def_levels.size()); + std::shared_ptr page = MakeDataPage( + &descr, values, num_values_, Encoding::PLAIN, /*indices=*/{}, + /*indices_size=*/0, def_levels, max_def_level_, rep_levels, max_rep_level_); pages_.push_back(std::move(page)); @@ -374,6 +373,13 @@ TEST_F(TestPrimitiveReader, TestSkipRepeatedField) { ASSERT_TRUE(vector_equal({10, 20, 20, 20}, read_vals)); ASSERT_TRUE(vector_equal({1, 1, 1, 1}, read_defs)); ASSERT_TRUE(vector_equal({1, 0, 1, 1}, read_reps)); + + // No values remain in data page + levels_skipped = reader->Skip(2); + ASSERT_EQ(0, levels_skipped); + reader->ReadBatch(10, read_defs.data(), read_reps.data(), read_vals.data(), + &num_read_values); + ASSERT_EQ(num_read_values, 0); } // Page claims to have two values but only 1 is present.