Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 95 additions & 58 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
// ----------------------------------------------------------------------
// TypedColumnWriter

template <typename Action>
inline void DoInBatches(int64_t total, int64_t batch_size, Action&& action) {
int64_t num_batches = static_cast<int>(total / batch_size);
for (int round = 0; round < num_batches; round++) {
action(round * batch_size, batch_size, /*check_page_size=*/true);
}
// Write the remaining values
if (total % batch_size > 0) {
action(num_batches * batch_size, total % batch_size, /*check_page_size=*/true);
}
}
// DoInBatches for non-repeated columns.
//
// Splits `num_levels` levels into chunks of at most `batch_size` levels and
// invokes `action(offset, length, check_page_limit)` for each chunk. For a
// non-repeated column every level is exactly one record (row), so each chunk
// is additionally capped so that the current page never buffers more than
// `max_rows_per_page` rows, and the page limit can safely be checked after
// every chunk (a chunk always ends on a record boundary).
//
// `curr_page_buffered_rows()` returns the number of rows already buffered in
// the current page before the next chunk is written.
template <typename Action, typename GetBufferedRows>
inline void DoInBatchesNonRepeated(int64_t num_levels, int64_t batch_size,
                                   int64_t max_rows_per_page, Action&& action,
                                   GetBufferedRows&& curr_page_buffered_rows) {
  int64_t offset = 0;
  while (offset < num_levels) {
    int64_t page_buffered_rows = curr_page_buffered_rows();
    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);

    // Every record contains only one level.
    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
    max_batch_size = std::min(max_batch_size, max_rows_per_page - page_buffered_rows);
    int64_t end_offset = offset + max_batch_size;

    ARROW_DCHECK_LE(offset, end_offset);
    ARROW_DCHECK_LE(end_offset, num_levels);

    // Always check page limit for non-repeated columns.
    action(offset, end_offset - offset, /*check_page_limit=*/true);

    offset = end_offset;
  }
}

// DoInBatches for repeated columns.
//
// Splits `num_levels` levels into chunks and invokes
// `action(offset, length, check_page_limit)` for each chunk. For a repeated
// column a record may span multiple levels; a record starts exactly where
// rep_levels[i] == 0. A chunk that ends at a known record boundary checks the
// page limit; the trailing chunk may end mid-record, so it only checks the
// limit when `pages_change_on_record_boundaries` is false.
//
// NOTE(review): `def_levels` is not consulted by this implementation; it
// appears to be kept for signature symmetry with the dispatcher — confirm.
// `curr_page_buffered_rows()` returns the number of rows already buffered in
// the current page.
template <typename Action, typename GetBufferedRows>
inline void DoInBatchesRepeated(const int16_t* def_levels, const int16_t* rep_levels,
                                int64_t num_levels, int64_t batch_size,
                                int64_t max_rows_per_page,
                                bool pages_change_on_record_boundaries, Action&& action,
                                GetBufferedRows&& curr_page_buffered_rows) {
  int64_t offset = 0;
  while (offset < num_levels) {
    int64_t max_batch_size = std::min(batch_size, num_levels - offset);
    int64_t end_offset = num_levels;           // end offset of the current batch
    int64_t check_page_limit_end_offset = -1;  // offset to check page limit (if not -1)

    int64_t page_buffered_rows = curr_page_buffered_rows();
    ARROW_DCHECK_LE(page_buffered_rows, max_rows_per_page);

    // Iterate rep_levels to find the shortest sequence that ends before a record
    // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size.
    for (int64_t i = offset; i < num_levels; ++i) {
      if (rep_levels[i] == 0) {
        // Use the beginning of the last record to check the page limit.
        check_page_limit_end_offset = i;
        if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
          end_offset = i;
          break;
        }
        page_buffered_rows += 1;
      }
    }

    ARROW_DCHECK_LE(offset, end_offset);
    ARROW_DCHECK_LE(check_page_limit_end_offset, end_offset);

    if (check_page_limit_end_offset >= 0) {
      // At least one record boundary is included in this batch.
      // It is a good chance to check the page limit.
      action(offset, check_page_limit_end_offset - offset, /*check_page_limit=*/true);
      offset = check_page_limit_end_offset;
    }
    if (end_offset > offset) {
      // This is the last chunk of the batch, and we do not know whether
      // end_offset is a record boundary, so we cannot check the page limit if
      // pages cannot change in the middle of a record.
      ARROW_DCHECK_EQ(end_offset, num_levels);
      action(offset, end_offset - offset,
             /*check_page_limit=*/!pages_change_on_record_boundaries);
    }

    offset = end_offset;
  }
}

// Dispatch level-batched writing to the repeated or non-repeated
// implementation, depending on whether repetition levels are present.
template <typename Action, typename GetBufferedRows>
inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
                        int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
                        bool pages_change_on_record_boundaries, Action&& action,
                        GetBufferedRows&& curr_page_buffered_rows) {
  if (rep_levels != nullptr) {
    // Repeated column: a record may span multiple levels.
    DoInBatchesRepeated(def_levels, rep_levels, num_levels, batch_size,
                        max_rows_per_page, pages_change_on_record_boundaries,
                        std::forward<Action>(action),
                        std::forward<GetBufferedRows>(curr_page_buffered_rows));
    return;
  }
  // Non-repeated column: every level is exactly one record.
  DoInBatchesNonRepeated(num_levels, batch_size, max_rows_per_page,
                         std::forward<Action>(action),
                         std::forward<GetBufferedRows>(curr_page_buffered_rows));
}

namespace {

bool DictionaryDirectWriteSupported(const ::arrow::Array& array) {
Expand Down Expand Up @@ -1318,7 +1350,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
WriteChunk, pages_change_on_record_boundaries());
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; });
return value_offset;
}

Expand Down Expand Up @@ -1368,7 +1401,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
CheckDictionarySizeLimit();
};
DoInBatches(def_levels, rep_levels, num_values, properties_->write_batch_size(),
WriteChunk, pages_change_on_record_boundaries());
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; });
}

Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
Expand Down Expand Up @@ -1769,13 +1803,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
}

void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
int64_t num_nulls, bool check_page_size) {
int64_t num_nulls, bool check_page_limit) {
num_buffered_values_ += num_levels;
num_buffered_encoded_values_ += num_values;
num_buffered_nulls_ += num_nulls;

if (check_page_size &&
current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
if (check_page_limit &&
(current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize() ||
num_buffered_rows_ >= properties_->max_rows_per_page())) {
AddDataPage();
}
}
Expand Down Expand Up @@ -1996,9 +2031,10 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
return WriteDense();
}

PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
properties_->write_batch_size(), WriteIndicesChunk,
pages_change_on_record_boundaries()));
PARQUET_CATCH_NOT_OK(
DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteIndicesChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}

Expand Down Expand Up @@ -2441,9 +2477,10 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
value_offset += batch_num_spaced_values;
};

PARQUET_CATCH_NOT_OK(DoInBatches(def_levels, rep_levels, num_levels,
properties_->write_batch_size(), WriteChunk,
pages_change_on_record_boundaries()));
PARQUET_CATCH_NOT_OK(
DoInBatches(def_levels, rep_levels, num_levels, properties_->write_batch_size(),
properties_->max_rows_per_page(), pages_change_on_record_boundaries(),
WriteChunk, [this]() { return num_buffered_rows_; }));
return Status::OK();
}

Expand Down
Loading
Loading