Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace test {
const int SMALL_SIZE = 100;
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 10000;
// Very large size to test dictionary fallback.
const int VERY_LARGE_SIZE = 400000;

template <typename TestType>
class TestPrimitiveWriter : public ::testing::Test {
Expand Down Expand Up @@ -74,10 +76,10 @@ class TestPrimitiveWriter : public ::testing::Test {
repetition_levels_out_.resize(SMALL_SIZE);

SetUpSchemaRequired();
metadata_accessor_ =
ColumnChunkMetaData::Make(reinterpret_cast<uint8_t*>(&thrift_metadata_));
}

Type::type type_num() { return TestType::type_num; }

void BuildReader() {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
Expand Down Expand Up @@ -130,7 +132,23 @@ class TestPrimitiveWriter : public ::testing::Test {
ASSERT_EQ(this->values_, this->values_out_);
}

int64_t metadata_num_values() const { return metadata_accessor_->num_values(); }
int64_t metadata_num_values() {
  // Build the accessor on demand rather than caching it: ColumnChunkMetaData
  // treats the metadata buffer as final, so the accessor must only be created
  // once no further writes to thrift_metadata_ will happen.
  auto accessor =
      ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
  return accessor->num_values();
}

std::vector<Encoding::type> metadata_encodings() {
  // Build the accessor on demand rather than caching it: ColumnChunkMetaData
  // treats the metadata buffer as final, so the accessor must only be created
  // once no further writes to thrift_metadata_ will happen.
  auto accessor =
      ColumnChunkMetaData::Make(reinterpret_cast<const uint8_t*>(&thrift_metadata_));
  return accessor->encodings();
}

protected:
int64_t values_read_;
Expand All @@ -156,7 +174,6 @@ class TestPrimitiveWriter : public ::testing::Test {
NodePtr node_;
format::ColumnChunk thrift_metadata_;
std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
std::unique_ptr<ColumnChunkMetaData> metadata_accessor_;
std::shared_ptr<ColumnDescriptor> schema_;
std::unique_ptr<InMemoryOutputStream> sink_;
std::shared_ptr<WriterProperties> writer_properties_;
Expand Down Expand Up @@ -334,5 +351,31 @@ TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
ASSERT_EQ(this->values_, this->values_out_);
}

// Test case for dictionary fallback encoding: write enough values that the
// dictionary page exceeds its size limit, forcing the writer to fall back to
// PLAIN encoding mid-chunk.
TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
  this->GenerateData(VERY_LARGE_SIZE);

  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  // Just read the first SMALL_SIZE rows to ensure we could read it back in
  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  this->values_.resize(SMALL_SIZE);
  ASSERT_EQ(this->values_, this->values_out_);

  std::vector<Encoding::type> encodings = this->metadata_encodings();
  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
  // Dictionary encoding is not allowed for boolean type
  // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
  ASSERT_EQ(Encoding::RLE, encodings[0]);
  if (this->type_num() != Type::BOOLEAN) {
    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[1]);
    ASSERT_EQ(Encoding::PLAIN, encodings[2]);
  } else {
    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
  }
}

} // namespace test
} // namespace parquet
5 changes: 4 additions & 1 deletion src/parquet/column/page.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ class PageWriter {
public:
virtual ~PageWriter() {}

virtual void Close() = 0;
// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
// page limit
virtual void Close(bool has_dictionary, bool fallback) = 0;

virtual int64_t WriteDataPage(const DataPage& page) = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/parquet/column/properties-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ TEST(TestWriterProperties, Basics) {
std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();

ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
}

Expand Down
41 changes: 27 additions & 14 deletions src/parquet/column/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ ReaderProperties PARQUET_EXPORT default_reader_properties();

static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
Expand All @@ -96,7 +97,8 @@ class PARQUET_EXPORT WriterProperties {
Builder()
: allocator_(default_allocator()),
dictionary_enabled_default_(DEFAULT_IS_DICTIONARY_ENABLED),
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION),
created_by_(DEFAULT_CREATED_BY),
Expand Down Expand Up @@ -137,8 +139,13 @@ class PARQUET_EXPORT WriterProperties {
return this->enable_dictionary(path->ToDotString());
}

Builder* dictionary_pagesize(int64_t dictionary_psize) {
dictionary_pagesize_ = dictionary_psize;
Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
dictionary_pagesize_limit_ = dictionary_psize_limit;
return this;
}

Builder* write_batch_size(int64_t write_batch_size) {
write_batch_size_ = write_batch_size;
return this;
}

Expand Down Expand Up @@ -214,17 +221,18 @@ class PARQUET_EXPORT WriterProperties {
}

std::shared_ptr<WriterProperties> build() {
return std::shared_ptr<WriterProperties>(
new WriterProperties(allocator_, dictionary_enabled_default_,
dictionary_enabled_, dictionary_pagesize_, pagesize_, version_, created_by_,
default_encoding_, encodings_, default_codec_, codecs_));
return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
dictionary_enabled_default_, dictionary_enabled_, dictionary_pagesize_limit_,
write_batch_size_, pagesize_, version_, created_by_, default_encoding_,
encodings_, default_codec_, codecs_));
}

private:
MemoryAllocator* allocator_;
bool dictionary_enabled_default_;
std::unordered_map<std::string, bool> dictionary_enabled_;
int64_t dictionary_pagesize_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t pagesize_;
ParquetVersion::type version_;
std::string created_by_;
Expand All @@ -246,7 +254,9 @@ class PARQUET_EXPORT WriterProperties {
return dictionary_enabled_default_;
}

inline int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }

inline int64_t write_batch_size() const { return write_batch_size_; }

inline int64_t data_pagesize() const { return pagesize_; }

Expand Down Expand Up @@ -286,14 +296,16 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled_default,
std::unordered_map<std::string, bool> dictionary_enabled,
int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version,
const std::string& created_by, Encoding::type default_encoding,
int64_t dictionary_pagesize, int64_t write_batch_size, int64_t pagesize,
ParquetVersion::type version, const std::string& created_by,
Encoding::type default_encoding,
std::unordered_map<std::string, Encoding::type> encodings,
Compression::type default_codec, const ColumnCodecs& codecs)
: allocator_(allocator),
dictionary_enabled_default_(dictionary_enabled_default),
dictionary_enabled_(dictionary_enabled),
dictionary_pagesize_(dictionary_pagesize),
dictionary_pagesize_limit_(dictionary_pagesize),
write_batch_size_(write_batch_size),
pagesize_(pagesize),
parquet_version_(version),
parquet_created_by_(created_by),
Expand All @@ -304,7 +316,8 @@ class PARQUET_EXPORT WriterProperties {
MemoryAllocator* allocator_;
bool dictionary_enabled_default_;
std::unordered_map<std::string, bool> dictionary_enabled_;
int64_t dictionary_pagesize_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
Expand Down
60 changes: 43 additions & 17 deletions src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
num_buffered_encoded_values_(0),
num_rows_(0),
total_bytes_written_(0),
closed_(false) {
closed_(false),
fallback_(false) {
InitSinks();
}

Expand Down Expand Up @@ -118,7 +119,13 @@ void ColumnWriter::AddDataPage() {
DataPage page(
uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);

data_pages_.push_back(std::move(page));
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
data_pages_.push_back(std::move(page));
} else { // Eagerly write pages
WriteDataPage(page);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fast path to serialize data pages.


// Re-initialize the sinks as GetBuffer made them invalid.
InitSinks();
Expand All @@ -134,52 +141,71 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
// Finalizes the column chunk. On the first call: writes the dictionary page
// (unless the writer fell back to plain encoding), flushes any buffered data
// pages, and closes the pager with the dictionary/fallback state so the
// metadata records the encodings actually used. Subsequent calls are no-ops
// for the write path but still validate the row count.
// Returns the total number of bytes written to the sink.
int64_t ColumnWriter::Close() {
  if (!closed_) {
    closed_ = true;
    // After a fallback the dictionary page must not be written: the chunk is
    // no longer fully dictionary-encoded.
    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }

    FlushBufferedDataPages();

    pager_->Close(has_dictionary_, fallback_);
  }

  if (num_rows_ != expected_rows_) {
    throw ParquetException(
        "Less than the number of expected rows written in"
        " the current column chunk");
  }

  return total_bytes_written_;
}

void ColumnWriter::FlushBufferedDataPages() {
// Write all outstanding data to a new page
if (num_buffered_values_ > 0) { AddDataPage(); }
for (size_t i = 0; i < data_pages_.size(); i++) {
WriteDataPage(data_pages_[i]);
}
data_pages_.clear();
}

// ----------------------------------------------------------------------
// TypedColumnWriter

// Constructs a typed column writer. Requesting PLAIN_DICTIONARY or
// RLE_DICTIONARY sets has_dictionary on the base ColumnWriter, which enables
// the dictionary-size check and the fallback-to-PLAIN path.
template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* descr,
    std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
    const WriterProperties* properties)
    : ColumnWriter(descr, std::move(pager), expected_rows,
          (encoding == Encoding::PLAIN_DICTIONARY ||
              encoding == Encoding::RLE_DICTIONARY),
          encoding, properties) {
  switch (encoding) {
    case Encoding::PLAIN:
      current_encoder_.reset(new PlainEncoder<Type>(descr, properties->allocator()));
      break;
    case Encoding::PLAIN_DICTIONARY:
    case Encoding::RLE_DICTIONARY:
      current_encoder_.reset(
          new DictEncoder<Type>(descr, &pool_, properties->allocator()));
      break;
    default:
      ParquetException::NYI("Selected encoding is not supported");
  }
}

// Only one Dictionary Page is written per column chunk.
// Fallback to PLAIN if the dictionary page size limit is reached: the
// dictionary page and all buffered dictionary-encoded pages are flushed
// first, then the encoder is swapped for a PlainEncoder.
template <typename Type>
void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
    WriteDictionaryPage();
    // Serialize the buffered dictionary indices before replacing the encoder
    FlushBufferedDataPages();
    fallback_ = true;
    // Only PLAIN encoding is supported for fallback in V1
    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
  }
}

template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
Expand Down
Loading