From 84f360d6a39999e083bcfb9d167e25b4e1d93fc8 Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Sun, 11 Sep 2016 21:05:38 -0400
Subject: [PATCH 1/8] added dictionary fallback support with tests

---
 src/parquet/column/column-writer-test.cc | 35 ++++++++++++++--
 src/parquet/column/page.h                |  2 +-
 src/parquet/column/writer.cc             | 53 ++++++++++++++++++------
 src/parquet/column/writer.h              | 39 ++++++++++++-----
 src/parquet/file/metadata.cc             |  5 ++-
 src/parquet/file/writer-internal.cc      |  6 +--
 src/parquet/file/writer-internal.h       |  2 +-
 7 files changed, 107 insertions(+), 35 deletions(-)

diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index b3ca0806..46bf2099 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -39,6 +39,8 @@ namespace test {
 const int SMALL_SIZE = 100;
 // Larger size to test some corner cases, only used in some specific cases.
 const int LARGE_SIZE = 10000;
+// Very large size to test dictionary fallback.
+const int VERY_LARGE_SIZE = 1000000;
 
 template <typename TestType>
 class TestPrimitiveWriter : public ::testing::Test {
@@ -74,8 +76,6 @@ class TestPrimitiveWriter : public ::testing::Test {
     repetition_levels_out_.resize(SMALL_SIZE);
 
     SetUpSchemaRequired();
-    metadata_accessor_ =
-        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
   }
 
   void BuildReader() {
@@ -130,7 +130,17 @@ class TestPrimitiveWriter : public ::testing::Test {
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
-  int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
+  int64_t metadata_num_values() { 
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->num_values();
+  }
+
+  int64_t metadata_num_encodings() {
+    auto metadata_accessor =
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->encodings().size();
+  }
 
  protected:
   int64_t values_read_;
@@ -156,7 +166,6 @@ class TestPrimitiveWriter : public ::testing::Test {
   NodePtr node_;
   format::ColumnChunk thrift_metadata_;
   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
-  std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
   std::shared_ptr<ColumnDescriptor> schema_;
   std::unique_ptr<InMemoryOutputStream> sink_;
   std::shared_ptr<WriterProperties> writer_properties_;
@@ -205,6 +214,8 @@ void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) {
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
     BooleanType, ByteArrayType, FLBAType> TestTypes;
 
+typedef TestPrimitiveWriter<Int32Type> TestInt32PrimitiveWriter;
+
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
@@ -334,5 +345,21 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
+// Test case for dictionary fallback encoding
+TEST_F(TestInt32PrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we could read it back in
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+  // There are 3 encodings in a fallback case
+  ASSERT_EQ(3, this->metadata_num_encodings());
+}
 }  // namespace test
 }  // namespace parquet
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 13dec2c1..7b9992dc 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -171,7 +171,7 @@ class PageWriter {
  public:
   virtual ~PageWriter() {}
 
-  virtual void Close() = 0;
+  virtual void Close(bool fallback) = 0;
 
   virtual int64_t WriteDataPage(const DataPage& page) = 0;
 
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index da4b17c6..736577a9 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -33,7 +33,7 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
 }
 
 ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary, 
     Encoding::type encoding, const WriterProperties* properties)
     : descr_(descr),
       pager_(std::move(pager)),
@@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
       num_buffered_encoded_values_(0),
       num_rows_(0),
       total_bytes_written_(0),
-      closed_(false) {
+      closed_(false),
+      fallback_(false) {
   InitSinks();
 }
 
@@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
   DataPage page(
       uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
 
-  data_pages_.push_back(std::move(page));
+  // Write the page to OutputStream eagerly if there is no dictionary or
+  // if dictionary encoding has fallen back to PLAIN
+  if (has_dictionary_ && !fallback_) {// Saves pages until end of dictionary encoding
+    data_pages_.push_back(std::move(page));
+  } else {// Eagerly write pages
+    WriteDataPage(page); 
+  }
 
   // Re-initialize the sinks as GetBuffer made them invalid.
   InitSinks();
@@ -134,23 +141,23 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
 
 int64_t ColumnWriter::Close() {
   if (!closed_) {
     closed_ = true;
-    if (has_dictionary_) { WriteDictionaryPage(); }
+    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
 
     // Write all outstanding data to a new page
     if (num_buffered_values_ > 0) { AddDataPage(); }
 
     for (size_t i = 0; i < data_pages_.size(); i++) {
       WriteDataPage(data_pages_[i]);
     }
+
+    pager_->Close(fallback_);
   }
 
   if (num_rows_ != expected_rows_) {
-    throw ParquetException(
-        "Less then the number of expected rows written in"
-        " the current column chunk");
+      throw ParquetException(
+          "Less than the number of expected rows written in"
+          " the current column chunk");
   }
 
-  pager_->Close();
-
   return total_bytes_written_;
 }
 
@@ -158,28 +165,48 @@ int64_t ColumnWriter::Close() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
 template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
+TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
     const WriterProperties* properties)
-    : ColumnWriter(schema, std::move(pager), expected_rows,
+    : ColumnWriter(descr, std::move(pager), expected_rows,
           (encoding == Encoding::PLAIN_DICTIONARY ||
               encoding == Encoding::RLE_DICTIONARY),
           encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
       current_encoder_ = std::unique_ptr<EncoderType>(
-          new PlainEncoder<Type>(schema, properties->allocator()));
+          new PlainEncoder<Type>(descr, properties->allocator()));
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
       current_encoder_ = std::unique_ptr<EncoderType>(
-          new DictEncoder<Type>(schema, &pool_, properties->allocator()));
+          new DictEncoder<Type>(descr, &pool_, properties->allocator()));
       break;
     default:
      ParquetException::NYI("Selected encoding is not supported");
  }
 }
 
+// Only one Dictionary Page is written.
+// Fallback to PLAIN if dictionary page limit is reached.
+template <typename Type>
+void TypedColumnWriter<Type>::VerifyDictionaryFallback() {
+  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize()) {
+      WriteDictionaryPage();
+      // Write the buffered Dictionary Indices
+      AddDataPage();
+      for (size_t i = 0; i < data_pages_.size(); i++) {
+        WriteDataPage(data_pages_[i]);
+      }
+      data_pages_.clear();
+      fallback_ = true;
+      // Only PLAIN encoding is supported for fallback in V1
+      current_encoder_ = std::unique_ptr<EncoderType>(
+          new PlainEncoder<Type>(descr_, properties_->allocator()));
+  }
+}
+
 template <typename Type>
 void TypedColumnWriter<Type>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index c88ead16..1ae00436 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -33,6 +33,7 @@
 
 namespace parquet {
 
+static constexpr int WRITE_BURST = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
@@ -57,6 +58,7 @@ class PARQUET_EXPORT ColumnWriter {
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
   virtual void WriteDictionaryPage() = 0;
+  virtual void VerifyDictionaryFallback() = 0;
 
   void AddDataPage();
   void WriteDataPage(const DataPage& page);
@@ -103,13 +105,15 @@ class PARQUET_EXPORT ColumnWriter {
   int total_bytes_written_;
   bool closed_;
 
+  bool fallback_;
+
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
 
+  std::vector<DataPage> data_pages_;
+
  private:
   void InitSinks();
-
-  std::vector<DataPage> data_pages_;
 };
 
 // API to write values to a single column. This is the main client facing API.
@@ -131,26 +135,23 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
     return current_encoder_->FlushValues();
   }
   void WriteDictionaryPage() override;
+  void VerifyDictionaryFallback() override;
 
  private:
+  void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const T* values);
+
   typedef Encoder<Type> EncoderType;
 
   // Write values to a temporary buffer before they are encoded into pages
   void WriteValues(int64_t num_values, const T* values);
-
-  // Map of encoding type to the respective encoder object. For example, a
-  // column chunk's data pages may include both dictionary-encoded and
-  // plain-encoded data.
-  std::unordered_map<int, std::shared_ptr<EncoderType>> encoders_;
-
   std::unique_ptr<EncoderType> current_encoder_;
 };
 
 template <typename Type>
-inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
+inline void TypedColumnWriter<Type>::WriteMiniBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
   int64_t values_to_write = 0;
-
   // If the field is required and non-repeated, there are no definition levels
   if (descr_->max_definition_level() > 0) {
     for (int64_t i = 0; i < num_values; ++i) {
@@ -178,7 +179,7 @@ inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
   }
 
   if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk then expected");
+    throw ParquetException("More rows were written in the column chunk than expected");
   }
 
   WriteValues(values_to_write, values);
@@ -189,8 +190,24 @@ inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
   if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
     AddDataPage();
   }
+  if (has_dictionary_ && !fallback_) { VerifyDictionaryFallback(); }
 }
 
+template <typename Type>
+inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+
+// WriteMiniBatch(num_values, def_levels, rep_levels, values);
+  int num_batches = num_values / WRITE_BURST;
+  int64_t num_remaining = num_values % WRITE_BURST;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * WRITE_BURST;
+    WriteMiniBatch(WRITE_BURST, &def_levels[offset], &rep_levels[offset], &values[offset]);
+  }
+  int64_t offset = num_batches * WRITE_BURST;
+  WriteMiniBatch(num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
+}
+
 template <typename Type>
 void TypedColumnWriter<Type>::WriteValues(int64_t num_values, const T* values) {
   current_encoder_->Put(values, num_values);
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index bc0f7b9a..35cb771f 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -375,9 +375,12 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
         thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
       }
     }
-    if (!properties_->dictionary_enabled(column_->path()) || dictionary_fallback) {
+    else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
+    if (dictionary_fallback) {// Only PLAIN encoding is supported for fallback in V1
+      thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
+    }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index fb05f13f..93489dc3 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -46,11 +46,9 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
   compressor_ = Codec::Create(codec);
 }
 
-void SerializedPageWriter::Close() {
-  // index_page_offset = 0 since they are not supported
-  // TODO: Remove default fallback = 'false' when implemented
+void SerializedPageWriter::Close(bool fallback) {
   metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-      total_compressed_size_, total_uncompressed_size_, false);
+      total_compressed_size_, total_uncompressed_size_, fallback);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 645d4bfc..6c3941e0 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -44,7 +44,7 @@ class SerializedPageWriter : public PageWriter {
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
-  void Close() override;
+  void Close(bool fallback) override;
 
  private:
   OutputStream* sink_;

From 54af38adc0d4cb34f99904e1811a17eb13c4bb3d Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Mon, 12 Sep 2016 11:42:10 -0400
Subject: [PATCH 2/8] clang format

---
 src/parquet/column/column-writer-test.cc |  2 +-
 src/parquet/column/writer.cc             | 36 ++++++++++++------------
 src/parquet/column/writer.h              | 11 ++++----
 src/parquet/file/metadata.cc             |  5 ++--
 4 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 46bf2099..8d7084fc 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -130,7 +130,7 @@ class TestPrimitiveWriter : public ::testing::Test {
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
-  int64_t metadata_num_values() { 
+  int64_t metadata_num_values() {
     auto metadata_accessor =
         ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
     return metadata_accessor->num_values();
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 736577a9..5b222202 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -33,7 +33,7 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
 }
 
 ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary, 
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
     Encoding::type encoding, const WriterProperties* properties)
     : descr_(descr),
       pager_(std::move(pager)),
@@ -121,10 +121,10 @@ void ColumnWriter::AddDataPage() {
 
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
-  if (has_dictionary_ && !fallback_) {// Saves pages until end of dictionary encoding
+  if (has_dictionary_ && !fallback_) {  // Saves pages until end of dictionary encoding
     data_pages_.push_back(std::move(page));
-  } else {// Eagerly write pages
-    WriteDataPage(page); 
+  } else {  // Eagerly write pages
+    WriteDataPage(page);
   }
 
   // Re-initialize the sinks as GetBuffer made them invalid.
@@ -153,9 +153,9 @@ int64_t ColumnWriter::Close() {
   }
 
   if (num_rows_ != expected_rows_) {
-      throw ParquetException(
-          "Less than the number of expected rows written in"
-          " the current column chunk");
+    throw ParquetException(
+        "Less than the number of expected rows written in"
+        " the current column chunk");
   }
 
   return total_bytes_written_;
@@ -193,18 +193,18 @@ template <typename Type>
 void TypedColumnWriter<Type>::VerifyDictionaryFallback() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
   if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize()) {
-      WriteDictionaryPage();
-      // Write the buffered Dictionary Indices
-      AddDataPage();
-      for (size_t i = 0; i < data_pages_.size(); i++) {
-        WriteDataPage(data_pages_[i]);
-      }
-      data_pages_.clear();
-      fallback_ = true;
-      // Only PLAIN encoding is supported for fallback in V1
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new PlainEncoder<Type>(descr_, properties_->allocator()));
+    WriteDictionaryPage();
+    // Write the buffered Dictionary Indices
+    AddDataPage();
+    for (size_t i = 0; i < data_pages_.size(); i++) {
+      WriteDataPage(data_pages_[i]);
+    }
+    data_pages_.clear();
+    fallback_ = true;
+    // Only PLAIN encoding is supported for fallback in V1
+    current_encoder_ = std::unique_ptr<EncoderType>(
+        new PlainEncoder<Type>(descr_, properties_->allocator()));
   }
 }
 
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 1ae00436..f30c9920 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -196,18 +196,19 @@ inline void TypedColumnWriter<Type>::WriteMiniBatch(int64_t num_values,
 template <typename Type>
 inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
-
-// WriteMiniBatch(num_values, def_levels, rep_levels, values);
+  // WriteMiniBatch(num_values, def_levels, rep_levels, values);
   int num_batches = num_values / WRITE_BURST;
   int64_t num_remaining = num_values % WRITE_BURST;
   for (int round = 0; round < num_batches; round++) {
     int64_t offset = round * WRITE_BURST;
-    WriteMiniBatch(WRITE_BURST, &def_levels[offset], &rep_levels[offset], &values[offset]);
+    WriteMiniBatch(
+        WRITE_BURST, &def_levels[offset], &rep_levels[offset], &values[offset]);
   }
   int64_t offset = num_batches * WRITE_BURST;
-  WriteMiniBatch(num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
+  WriteMiniBatch(
+      num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
 }
- 
+
 template <typename Type>
 void TypedColumnWriter<Type>::WriteValues(int64_t num_values, const T* values) {
   current_encoder_->Put(values, num_values);
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 35cb771f..158b9a03 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -374,11 +374,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
       if (properties_->version() == ParquetVersion::PARQUET_2_0) {
         thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
       }
-    }
-    else {  // Dictionary not enabled
+    } else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
-    if (dictionary_fallback) {// Only PLAIN encoding is supported for fallback in V1
+    if (dictionary_fallback) {  // Only PLAIN encoding is supported for fallback in V1
       thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
     }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);

From dd0cc7e404f947c562fa9f1d6adf6ccce92ac52d Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Mon, 12 Sep 2016 12:28:07 -0400
Subject: [PATCH 3/8] Add all types to the test

---
 src/parquet/column/column-writer-test.cc | 21 ++++++++++++++-------
 src/parquet/column/writer.cc             |  2 +-
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 8d7084fc..a46a5676 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -40,7 +40,7 @@ const int SMALL_SIZE = 100;
 // Larger size to test some corner cases, only used in some specific cases.
 const int LARGE_SIZE = 10000;
 // Very large size to test dictionary fallback.
-const int VERY_LARGE_SIZE = 1000000;
+const int VERY_LARGE_SIZE = 400000;
 
 template <typename TestType>
 class TestPrimitiveWriter : public ::testing::Test {
@@ -78,6 +78,8 @@ class TestPrimitiveWriter : public ::testing::Test {
     SetUpSchemaRequired();
   }
 
+  Type::type type_num() { return TestType::type_num; }
+
   void BuildReader() {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
@@ -214,8 +216,6 @@ void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) {
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
     BooleanType, ByteArrayType, FLBAType> TestTypes;
 
-typedef TestPrimitiveWriter<Int32Type> TestInt32PrimitiveWriter;
-
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
@@ -345,8 +345,8 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
-// Test case for dictionary fallback encoding
-TEST_F(TestInt32PrimitiveWriter, RequiredVeryLargeChunk) {
+// Test case for dictionary fallback encoding skip BooleanType
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
   this->GenerateData(VERY_LARGE_SIZE);
@@ -358,8 +358,15 @@ TEST_F(TestInt32PrimitiveWriter, RequiredVeryLargeChunk) {
   ASSERT_EQ(SMALL_SIZE, this->values_read_);
   this->values_.resize(SMALL_SIZE);
   ASSERT_EQ(this->values_, this->values_out_);
-  // There are 3 encodings in a fallback case
-  ASSERT_EQ(3, this->metadata_num_encodings());
+  if (this->type_num() != Type::BOOLEAN) {
+    // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
+    ASSERT_EQ(3, this->metadata_num_encodings());
+  } else {
+    // Dictionary encoding is not allowed for boolean type
+    // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
+    ASSERT_EQ(2, this->metadata_num_encodings());
+  }
 }
+
 }  // namespace test
 }  // namespace parquet
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 5b222202..41d8e4e7 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -121,7 +121,7 @@ void ColumnWriter::AddDataPage() {
 
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
-  if (has_dictionary_ && !fallback_) {  // Saves pages until end of dictionary encoding
+  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
     data_pages_.push_back(std::move(page));
   } else {  // Eagerly write pages
     WriteDataPage(page);

From 312bad896815d91f5d3b3a98d21de434f65f3e8e Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Mon, 12 Sep 2016 12:48:39 -0400
Subject: [PATCH 4/8] minor changes

---
 src/parquet/column/writer.h | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index f30c9920..1b325e2e 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -33,7 +33,7 @@
 
 namespace parquet {
 
-static constexpr int WRITE_BURST = 1000;
+static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
@@ -196,15 +196,14 @@ inline void TypedColumnWriter<Type>::WriteMiniBatch(int64_t num_values,
 template <typename Type>
 inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
-  // WriteMiniBatch(num_values, def_levels, rep_levels, values);
-  int num_batches = num_values / WRITE_BURST;
-  int64_t num_remaining = num_values % WRITE_BURST;
+  int num_batches = num_values / WRITE_BATCH_SIZE;
+  int64_t num_remaining = num_values % WRITE_BATCH_SIZE;
   for (int round = 0; round < num_batches; round++) {
-    int64_t offset = round * WRITE_BURST;
+    int64_t offset = round * WRITE_BATCH_SIZE;
     WriteMiniBatch(
-        WRITE_BURST, &def_levels[offset], &rep_levels[offset], &values[offset]);
+        WRITE_BATCH_SIZE, &def_levels[offset], &rep_levels[offset], &values[offset]);
   }
-  int64_t offset = num_batches * WRITE_BURST;
+  int64_t offset = num_batches * WRITE_BATCH_SIZE;
   WriteMiniBatch(
       num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
 }

From da460337add0e1180b4f86a05f947cd39be544e6 Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Tue, 13 Sep 2016 11:50:48 -0400
Subject: [PATCH 5/8] added comments and fixed review suggestions

---
 src/parquet/column/column-writer-test.cc | 27 +++++++++-----
 src/parquet/column/page.h                |  4 +-
 src/parquet/column/properties-test.cc    |  2 +-
 src/parquet/column/properties.h          | 33 ++++++++++++-----
 src/parquet/column/writer.cc             | 38 +++++++++----------
 src/parquet/column/writer.h              | 47 ++++++++++++++++++++----
 src/parquet/file/file-metadata-test.cc   |  8 ++--
 src/parquet/file/metadata.cc             | 12 +++---
 src/parquet/file/metadata.h              |  2 +-
 src/parquet/file/writer-internal.cc      |  4 +-
 src/parquet/file/writer-internal.h       |  2 +-
 11 files changed, 117 insertions(+), 62 deletions(-)

diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index a46a5676..353b461b 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -133,15 +133,21 @@ class TestPrimitiveWriter : public ::testing::Test {
   }
 
   int64_t metadata_num_values() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
     auto metadata_accessor =
-        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_)); 
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
     return metadata_accessor->num_values();
   }
 
-  int64_t metadata_num_encodings() {
+  std::vector<Encoding::type> metadata_encodings() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
     auto metadata_accessor =
-        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
-    return metadata_accessor->encodings().size();
+        ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
+    return metadata_accessor->encodings();
   }
 
 protected:
@@ -358,13 +364,16 @@ TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
   ASSERT_EQ(SMALL_SIZE, this->values_read_);
   this->values_.resize(SMALL_SIZE);
   ASSERT_EQ(this->values_, this->values_out_);
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
+  // Dictionary encoding is not allowed for boolean type
+  // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
+  ASSERT_EQ(Encoding::RLE, encodings[0]);
   if (this->type_num() != Type::BOOLEAN) {
-    // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
-    ASSERT_EQ(3, this->metadata_num_encodings());
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
   } else {
-    // Dictionary encoding is not allowed for boolean type
-    // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
-    ASSERT_EQ(2, this->metadata_num_encodings());
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
   }
 }
 
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 7b9992dc..22dd4129 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -171,7 +171,9 @@ class PageWriter {
  public:
   virtual ~PageWriter() {}
 
-  virtual void Close(bool fallback) = 0;
+  // The Column Writer decides if dictionary encoding is used if set and
+  // if the dictionary encoding has fallen back to default encoding on reaching dictionary page limit
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
 
   virtual int64_t WriteDataPage(const DataPage& page) = 0;
 
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
index f1eeaf32..0d7314ba 100644
--- a/src/parquet/column/properties-test.cc
+++ b/src/parquet/column/properties-test.cc
@@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
   std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
 
   ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
-  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
+  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
   ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
 }
 
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index c8f103b2..6f1392cd 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();
 
 static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
 static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
-static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
@@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
     Builder()
         : allocator_(default_allocator()),
          dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
-          dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           pagesize_(DEFAULT_PAGE_SIZE),
           version_(DEFAULT_WRITER_VERSION),
           created_by_(DEFAULT_CREATED_BY),
@@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
       return this->enable_dictionary(path->ToDotString());
     }
 
-    Builder* dictionary_pagesize(int64_t dictionary_psize) {
-      dictionary_pagesize_ = dictionary_psize;
+    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+      dictionary_pagesize_limit_ = dictionary_psize_limit;
+      return this;
+    }
+
+    Builder* write_batch_size(int64_t write_batch_size) {
+      write_batch_size_ = write_batch_size;
       return this;
     }
 
@@ -216,7 +223,7 @@ class PARQUET_EXPORT WriterProperties {
     std::shared_ptr<WriterProperties> build() {
       return std::shared_ptr<WriterProperties>(
           new WriterProperties(allocator_, dictionary_enabled_default_,
-              dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
+              dictionary_enabled_, dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
               default_encoding_, encodings_, default_codec_, codecs_));
     }
 
@@ -224,7 +231,8 @@ class PARQUET_EXPORT WriterProperties {
     MemoryAllocator* allocator_;
     bool dictionary_enabled_default_;
     std::unordered_map<std::string, bool> dictionary_enabled_;
-    int64_t dictionary_pagesize_;
+    int64_t dictionary_pagesize_limit_;
+    int64_t write_batch_size_;
     int64_t pagesize_;
     ParquetVersion::type version_;
     std::string created_by_;
@@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
     return dictionary_enabled_default_;
   }
 
-  inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
+  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+  inline int64_t write_batch_size() const { return write_batch_size_; }
 
   inline int64_t data_pagesize() const { return pagesize_; }
 
@@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
      std::unordered_map<std::string, bool> dictionary_enabled,
-      int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
+      int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
+      ParquetVersion::type version,
       const std::string& created_by, Encoding::type default_encoding,
       std::unordered_map<std::string, Encoding::type> encodings,
       Compression::type default_codec, const ColumnCodecs& codecs)
       : allocator_(allocator),
         dictionary_enabled_default_(dictionary_enabled_default),
         dictionary_enabled_(dictionary_enabled),
-        dictionary_pagesize_(dictionary_pagesize),
+        dictionary_pagesize_limit_(dictionary_pagesize),
+        write_batch_size_(write_batch_size),
         pagesize_(pagesize),
         parquet_version_(version),
         parquet_created_by_(created_by),
@@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
   MemoryAllocator* allocator_;
   bool dictionary_enabled_default_;
   std::unordered_map<std::string, bool> dictionary_enabled_;
-  int64_t dictionary_pagesize_;
+  int64_t dictionary_pagesize_limit_;
+  int64_t write_batch_size_;
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 41d8e4e7..ab45dcf1 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -142,14 +142,10 @@ int64_t ColumnWriter::Close() {
   if (!closed_) {
     closed_ = true;
     if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
-    // Write all outstanding data to a new page
-    if (num_buffered_values_ > 0) { AddDataPage(); }
 
-    for (size_t i = 0; i < data_pages_.size(); i++) {
-      WriteDataPage(data_pages_[i]);
-    }
+    FlushBufferedDataPages();
 
-    pager_->Close(fallback_);
+    pager_->Close(has_dictionary_, fallback_);
   }
 
   if (num_rows_ != expected_rows_) {
@@ -161,6 +157,15 @@ int64_t ColumnWriter::Close() {
   return total_bytes_written_;
 }
 
+void ColumnWriter::FlushBufferedDataPages() {
+    // Write all outstanding data to a new page
+    if (num_buffered_values_ > 0) { AddDataPage(); }
+    for (size_t i = 0; i < data_pages_.size(); i++) {
+      WriteDataPage(data_pages_[i]);
+    }
+    data_pages_.clear();
+}
+
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
@@ -174,13 +179,11 @@ TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
           encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new PlainEncoder<Type>(descr, properties->allocator()));
+      current_encoder_.reset(new PlainEncoder<Type>(descr, properties->allocator()));
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
-      current_encoder_ = std::unique_ptr<EncoderType>(
-          new DictEncoder<Type>(descr, &pool_, properties->allocator()));
+      current_encoder_.reset(new DictEncoder<Type>(descr, &pool_, properties->allocator()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
@@ -190,20 +193,15 @@ TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
 // Only one Dictionary Page is written.
 // Fallback to PLAIN if dictionary page limit is reached.
 template <typename Type>
-void TypedColumnWriter<Type>::VerifyDictionaryFallback() {
+void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
-  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize()) {
+  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
     WriteDictionaryPage();
-    // Write the buffered Dictionary Indices
-    AddDataPage();
-    for (size_t i = 0; i < data_pages_.size(); i++) {
-      WriteDataPage(data_pages_[i]);
-    }
-    data_pages_.clear();
+    // Serialize the buffered Dictionary Indices
+    FlushBufferedDataPages();
     fallback_ = true;
     // Only PLAIN encoding is supported for fallback in V1
-    current_encoder_ = std::unique_ptr<EncoderType>(
-        new PlainEncoder<Type>(descr_, properties_->allocator()));
+    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
   }
 }
 
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 1b325e2e..e7602fe1 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -57,10 +57,26 @@ class PARQUET_EXPORT ColumnWriter {
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
 
+  /**
+   * Serializes Dictionary Page if enabled
+   */
   virtual void WriteDictionaryPage() = 0;
-  virtual void VerifyDictionaryFallback() = 0;
 
+  /**
+   * Checks if the Dictionary Page size limit is reached
+   * If the limit is reached, the Dictionary and Data Pages are serialized
+   * The encoding is switched to PLAIN
+   */
+
+  virtual void CheckDictionarySizeLimit() = 0;
+
+  /**
+   * Adds Data Pages to an in memory buffer in dictionary encoding mode
+   * Serializes the Data Pages in other encoding modes
+   */
   void AddDataPage();
+
+  // Serializes Data Pages
   void WriteDataPage(const DataPage& page);
 
   // Write multiple definition levels
@@ -72,6 +88,9 @@ class PARQUET_EXPORT ColumnWriter {
   std::shared_ptr<Buffer> RleEncodeLevels(
       const std::shared_ptr<Buffer>& buffer, int16_t max_level);
 
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
   const ColumnDescriptor* descr_;
 
   std::unique_ptr<PageWriter> pager_;
@@ -102,9 +121,13 @@ class PARQUET_EXPORT ColumnWriter {
   // Total number of rows written with this ColumnWriter
   int num_rows_;
 
+  // Records the total number of bytes written by the serializer
   int total_bytes_written_;
+ 
+  // Flag to check if the Writer has been closed
   bool closed_;
 
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
   bool fallback_;
 
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
@@ -135,7 +158,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
     return current_encoder_->FlushValues();
   }
   void WriteDictionaryPage() override;
-  void VerifyDictionaryFallback() override;
+  void CheckDictionarySizeLimit() override;
 
  private:
   void WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
@@ -190,20 +213,28 @@ inline void TypedColumnWriter<Type>::WriteMiniBatch(int64_t num_values,
   if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
     AddDataPage();
   }
-  if (has_dictionary_ && !fallback_) { VerifyDictionaryFallback(); }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
 }
 
 template <typename Type>
 inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
-  int num_batches = num_values / WRITE_BATCH_SIZE;
-  int64_t num_remaining = num_values % WRITE_BATCH_SIZE;
+  /* We check for DataPage limits only after we have inserted the values. If a user
+   * writes a large number of values, the DataPage size can be much above the limit.
+   * The purpose of this chunking is to bound this. Even if a user writes large number
+   * of values, the chunking will ensure the AddDataPage() is called at a reasonable
+   * pagesize limit
+   */
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = num_values / write_batch_size;
+  int64_t num_remaining = num_values % write_batch_size;
   for (int round = 0; round < num_batches; round++) {
-    int64_t offset = round * WRITE_BATCH_SIZE;
+    int64_t offset = round * write_batch_size;
     WriteMiniBatch(
-        WRITE_BATCH_SIZE, &def_levels[offset], &rep_levels[offset], &values[offset]);
+        write_batch_size, &def_levels[offset], &rep_levels[offset], &values[offset]);
   }
-  int64_t offset = num_batches * WRITE_BATCH_SIZE;
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
   WriteMiniBatch(
       num_remaining, &def_levels[offset], &rep_levels[offset], &values[offset]);
 }
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 852072cd..a7f83c57 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -64,8 +64,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
   rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
@@ -74,8 +74,8 @@ TEST(Metadata, TestBuildAccess) {
   // column metadata
   col1_builder->SetStatistics(stats_int);
   col2_builder->SetStatistics(stats_float);
-  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, false);
-  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, false);
+  col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
+  col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
   rg2_builder->Finish(1024);
 
   // Read the metadata
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 158b9a03..a2287eea 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -353,7 +353,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback = false) {
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
     if (dictionary_page_offset > 0) {
       column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
     } else {
@@ -368,7 +368,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
     std::vector<format::Encoding::type> thrift_encodings;
     thrift_encodings.push_back(ToThrift(Encoding::RLE));
-    if (properties_->dictionary_enabled(column_->path())) {
+    if (has_dictionary) {
       thrift_encodings.push_back(ToThrift(properties_->dictionary_page_encoding()));
       // add the encoding only if it is unique
       if (properties_->version() == ParquetVersion::PARQUET_2_0) {
@@ -377,7 +377,9 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     } else {  // Dictionary not enabled
       thrift_encodings.push_back(ToThrift(properties_->encoding(column_->path())));
     }
-    if (dictionary_fallback) {  // Only PLAIN encoding is supported for fallback in V1
+    // Only PLAIN encoding is supported for fallback in V1
+    // TODO(majetideepak): Use user specified encoding for V2
+    if (dictionary_fallback) {
       thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
     }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
@@ -412,9 +414,9 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
 
 void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
     int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset,
-    int64_t compressed_size, int64_t uncompressed_size, bool dictionary_fallback) {
+    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
   impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
-      compressed_size, uncompressed_size, dictionary_fallback);
+      compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
 }
 
 const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 1d966213..0ef6fa03 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -149,7 +149,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
   // commit the metadata
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
-      int64_t uncompressed_size, bool dictionary_fallback);
+      int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);
 
  private:
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 93489dc3..2d396b7f 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -46,9 +46,9 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
   compressor_ = Codec::Create(codec);
 }
 
-void SerializedPageWriter::Close(bool fallback) {
+void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
   metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-      total_compressed_size_, total_uncompressed_size_, fallback);
+      total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 6c3941e0..e6364e9d 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -44,7 +44,7 @@ class SerializedPageWriter : public PageWriter {
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
-  void Close(bool fallback) override;
+  void Close(bool has_dictionary, bool fallback) override;
 
  private:
   OutputStream* sink_;

From eac91145cf11ac9df05d218b053576719158c443 Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Tue, 13 Sep 2016 11:58:08 -0400
Subject: [PATCH 6/8] clang format

---
 src/parquet/column/page.h       |  3 ++-
 src/parquet/column/properties.h | 12 ++++++------
 src/parquet/column/writer.cc    | 15 ++++++++-------
 src/parquet/column/writer.h     |  2 +-
 src/parquet/file/metadata.cc    |  7 +++----
 5 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 22dd4129..c06d3de9 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -172,7 +172,8 @@ class PageWriter {
   virtual ~PageWriter() {}
 
   // The Column Writer decides if dictionary encoding is used if set and
-  // if the dictionary encoding has fallen back to default encoding on reaching dictionary page limit
+  // if the dictionary encoding has fallen back to default encoding on reaching dictionary
+  // page limit
   virtual void Close(bool has_dictionary, bool fallback) = 0;
 
   virtual int64_t WriteDataPage(const DataPage& page) = 0;
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index 6f1392cd..91a4672d 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -221,10 +221,10 @@ class PARQUET_EXPORT WriterProperties {
     }
 
     std::shared_ptr<WriterProperties> build() {
-      return std::shared_ptr<WriterProperties>(
-          new WriterProperties(allocator_, dictionary_enabled_default_,
-              dictionary_enabled_, dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
-              default_encoding_, encodings_, default_codec_, codecs_));
+      return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+          dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
+          write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
+          encodings_, default_codec_, codecs_));
     }
 
@@ -297,8 +297,8 @@ class PARQUET_EXPORT WriterProperties {
   explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
       std::unordered_map<std::string, bool> dictionary_enabled,
       int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
-      ParquetVersion::type version,
-      const std::string& created_by, Encoding::type default_encoding,
+      ParquetVersion::type version, const std::string& created_by,
+      Encoding::type default_encoding,
       std::unordered_map<std::string, Encoding::type> encodings,
       Compression::type default_codec, const ColumnCodecs& codecs)
       : allocator_(allocator),
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index ab45dcf1..1fbea62d 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -158,12 +158,12 @@ int64_t ColumnWriter::Close() {
 
 void ColumnWriter::FlushBufferedDataPages() {
-    // Write all outstanding data to a new page
-    if (num_buffered_values_ > 0) { AddDataPage(); }
-    for (size_t i = 0; i < data_pages_.size(); i++) {
-      WriteDataPage(data_pages_[i]);
-    }
-    data_pages_.clear();
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+  for (size_t i = 0; i < data_pages_.size(); i++) {
+    WriteDataPage(data_pages_[i]);
+  }
+  data_pages_.clear();
 }
 
 // ----------------------------------------------------------------------
@@ -183,7 +183,8 @@ TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
-      current_encoder_.reset(new DictEncoder<Type>(descr, &pool_, properties->allocator()));
+      current_encoder_.reset(
+          new DictEncoder<Type>(descr, &pool_, properties->allocator()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index e7602fe1..e4aed836 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -123,7 +123,7 @@ class PARQUET_EXPORT ColumnWriter {
 
   // Records the total number of bytes written by the serializer
   int total_bytes_written_;
- 
+
   // Flag to check if the Writer has been closed
   bool closed_;
 
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index a2287eea..00ce990b 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -379,9 +379,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     }
     // Only PLAIN encoding is supported for fallback in V1
     // TODO(majetideepak): Use user specified encoding for V2
-    if (dictionary_fallback) {
-      thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
-    }
+    if (dictionary_fallback) { thrift_encodings.push_back(ToThrift(Encoding::PLAIN)); }
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
@@ -414,7 +412,8 @@ void ColumnChunkMetaDataBuilder::set_file_path(const std::string& path) {
 
 void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
     int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset,
-    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
+    int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
+    bool dictionary_fallback) {
   impl_->Finish(num_values, dictionary_page_offset, index_page_offset, data_page_offset,
       compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
 }

From c498aebb40f684218619a43b6144463d28822b8f Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Wed, 14 Sep 2016 10:15:27 -0400
Subject: [PATCH 7/8] modify comment style

---
 src/parquet/column/writer.h | 30 ++++++++++++------------------
 1 file changed, 12 insertions(+), 18 deletions(-)

diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index e4aed836..6a6ee5fc 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -57,23 +57,18 @@ class PARQUET_EXPORT ColumnWriter {
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
 
-  /**
-   * Serializes Dictionary Page if enabled
-   */
+  // Serializes Dictionary Page if enabled
   virtual void WriteDictionaryPage() = 0;
 
-  /**
-   * Checks if the Dictionary Page size limit is reached
-   * If the limit is reached, the Dictionary and Data Pages are serialized
-   * The encoding is switched to PLAIN
-   */
-
+  // Checks if the Dictionary Page size limit is reached
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // The encoding is switched to PLAIN
   virtual void CheckDictionarySizeLimit() = 0;
 
-  /**
-   * Adds Data Pages to an in memory buffer in dictionary encoding mode
-   * Serializes the Data Pages in other encoding modes
-   */
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
   void AddDataPage();
 
   // Serializes Data Pages
   void WriteDataPage(const DataPage& page);
@@ -219,12 +214,11 @@ inline void TypedColumnWriter<Type>::WriteBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
-  /* We check for DataPage limits only after we have inserted the values. If a user
-   * writes a large number of values, the DataPage size can be much above the limit.
-   * The purpose of this chunking is to bound this. Even if a user writes large number
-   * of values, the chunking will ensure the AddDataPage() is called at a reasonable
-   * pagesize limit
-   */
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes large number
+  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
+  // pagesize limit
   int64_t write_batch_size = properties_->write_batch_size();
   int num_batches = num_values / write_batch_size;

From 6f51df65943786f52a4654c6309a010f5ed40b2b Mon Sep 17 00:00:00 2001
From: Deepak Majeti
Date: Wed, 14 Sep 2016 10:22:25 -0400
Subject: [PATCH 8/8] minor comment fix

---
 src/parquet/column/column-writer-test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 353b461b..a87dc485 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -351,7 +351,7 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
-// Test case for dictionary fallback encoding skip BooleanType
+// Test case for dictionary fallback encoding
 TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
   this->GenerateData(VERY_LARGE_SIZE);
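
Usage note (a minimal sketch, not part of the patches): both knobs this series introduces are set through parquet::WriterProperties::Builder. Dictionary encoding is already on by default (DEFAULT_IS_DICTIONARY_ENABLED), so only the new setters are shown; the 512 KiB and 2048 figures are illustrative values, not defaults from the series:

    #include <memory>
    #include "parquet/column/properties.h"

    // Builder setters return Builder*, so the chain switches from '.' to '->'
    // after the first call.
    std::shared_ptr<parquet::WriterProperties> props =
        parquet::WriterProperties::Builder()
            .dictionary_pagesize_limit(512 * 1024)  // fall back to PLAIN once the
                                                    // dictionary outgrows 512 KiB
            ->write_batch_size(2048)  // WriteBatch() is split into mini batches
                                      // of this many values
            ->build();

With these properties, a column whose dictionary crosses the limit triggers CheckDictionarySizeLimit(): the single dictionary page is written, the buffered data pages are flushed, and current_encoder_ is reset to a PlainEncoder, so the column chunk metadata ends up listing RLE, PLAIN_DICTIONARY, and PLAIN.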