Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 115 additions & 8 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,13 +1041,117 @@ Status MapFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t
dst->resize(count);
return Status::OK();
}
if (count == 0) {
return Status::OK();
}
// resolve ColumnMap and nullable wrapper
const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>(
dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()
: *dst);
auto offsets_ptr = column_map->get_offsets_column().assume_mutable();
auto& offsets = static_cast<vectorized::ColumnArray::ColumnOffsets&>(*offsets_ptr);
size_t base = offsets.get_data().empty() ? 0 : offsets.get_data().back();

// 1. bulk read null-map if nullable
std::vector<uint8_t> null_mask; // 0: not null, 1: null
if (_map_reader->is_nullable()) {
// For nullable map columns, the destination column must also be nullable.
if (UNLIKELY(!dst->is_nullable())) {
return Status::InternalError(
"unexpected non-nullable destination column for nullable map reader");
}
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
size_t null_before = null_map_ptr->size();
RETURN_IF_ERROR(_null_iterator->read_by_rowids(rowids, count, null_map_ptr));
// extract a light-weight view to decide element reads
auto& null_map_col = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
null_mask.reserve(count);
for (size_t i = 0; i < count; ++i) {
null_mask.push_back(null_map_col.get_element(null_before + i));
}
} else if (dst->is_nullable()) {
// in not-null to null linked-schemachange mode,
// actually we do not change dat data include meta in footer,
// so may dst from changed meta which is nullable but old data is not nullable,
// if so, we should set null_map to all null by default
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
null_map.insert_many_vals(0, count);
}

// 2. bulk read start ordinals for requested rows
vectorized::MutableColumnPtr starts_col = vectorized::ColumnOffset64::create();
starts_col->reserve(count);
RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(rowids, count, starts_col));

// 3. bulk read next-start ordinals for rowid+1 (within bounds)
std::vector<rowid_t> next_rowids(count);
for (size_t i = 0; i < count; ++i) {
RETURN_IF_ERROR(seek_to_ordinal(rowids[i]));
size_t num_read = 1;
RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr));
DCHECK(num_read == 1);
uint64_t nr = rowids[i] + 1;
next_rowids[i] = nr < _map_reader->num_rows() ? static_cast<rowid_t>(nr)
: static_cast<rowid_t>(0); // placeholder
}
vectorized::MutableColumnPtr next_starts_col = vectorized::ColumnOffset64::create();
next_starts_col->reserve(count);
// read for all; we'll fix out-of-bound cases below
RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(next_rowids.data(), count, next_starts_col));

// 4. fix next_start for rows whose next_rowid is out-of-bound (rowid == num_rows-1)
for (size_t i = 0; i < count; ++i) {
if (rowids[i] + 1 >= _map_reader->num_rows()) {
// seek to the last row and consume one to move decoder to end-of-page,
// then peek page-tail sentinel next_array_item_ordinal as next_start
RETURN_IF_ERROR(_offsets_iterator->seek_to_ordinal(rowids[i]));
size_t one = 1;
bool has_null_unused = false;
vectorized::MutableColumnPtr tmp = vectorized::ColumnOffset64::create();
RETURN_IF_ERROR(_offsets_iterator->next_batch(&one, tmp, &has_null_unused));
ordinal_t ns = 0;
RETURN_IF_ERROR(_offsets_iterator->_peek_one_offset(&ns));
// overwrite with sentinel
assert_cast<vectorized::ColumnOffset64&>(*next_starts_col).get_data()[i] = ns;
}
}

// 5. compute sizes and append offsets prefix-sum
auto& starts_data = assert_cast<vectorized::ColumnOffset64&>(*starts_col).get_data();
auto& next_starts_data = assert_cast<vectorized::ColumnOffset64&>(*next_starts_col).get_data();
std::vector<size_t> sizes(count, 0);
size_t acc = base;
offsets.get_data().reserve(offsets.get_data().size() + count);
for (size_t i = 0; i < count; ++i) {
size_t sz = static_cast<size_t>(next_starts_data[i] - starts_data[i]);
if (_map_reader->is_nullable() && !null_mask.empty() && null_mask[i]) {
sz = 0; // null rows do not consume elements
}
sizes[i] = sz;
acc += sz;
offsets.get_data().push_back(acc);
}

// 6. read key/value elements for non-empty sizes
auto keys_ptr = column_map->get_keys().assume_mutable();
auto vals_ptr = column_map->get_values().assume_mutable();

for (size_t i = 0; i < count; ++i) {
size_t sz = sizes[i];
if (sz == 0) {
continue;
}
ordinal_t start = static_cast<ordinal_t>(starts_data[i]);
RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(start));
RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(start));
size_t n = sz;
bool dummy_has_null = false;
RETURN_IF_ERROR(_key_iterator->next_batch(&n, keys_ptr, &dummy_has_null));
DCHECK(n == sz);
n = sz;
RETURN_IF_ERROR(_val_iterator->next_batch(&n, vals_ptr, &dummy_has_null));
DCHECK(n == sz);
}

return Status::OK();
}

Expand Down Expand Up @@ -1319,6 +1423,8 @@ Status StructFileColumnIterator::set_access_paths(
////////////////////////////////////////////////////////////////////////////////
// Initializes the wrapped offset iterator and prepares the reusable scratch
// column used by _peek_one_offset().
Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
    RETURN_IF_ERROR(_offset_iterator->init(opts));
    // allocate peek tmp column once -- reused on every peek so repeated
    // _peek_one_offset() calls do not allocate a fresh ColumnOffset64
    _peek_tmp_col = vectorized::ColumnOffset64::create();
    return Status::OK();
}

Expand All @@ -1331,11 +1437,12 @@ Status OffsetFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumn
// Peeks the next offset ordinal WITHOUT consuming it from the page decoder.
// When the current page has remaining values, the value is peeked through the
// reusable _peek_tmp_col scratch column (allocated once in init()); otherwise
// the page-footer sentinel next_array_item_ordinal is returned.
Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
    if (_offset_iterator->get_current_page()->has_remaining()) {
        PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder.get();
        size_t n = 1;
        // clear keeps the capacity, so the peek below does not reallocate
        _peek_tmp_col->clear();
        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, _peek_tmp_col)); // not null
        DCHECK(_peek_tmp_col->size() == 1);
        *offset =
                assert_cast<const vectorized::ColumnOffset64*>(_peek_tmp_col.get())->get_element(0);
    } else {
        // decoder exhausted: fall back to the sentinel recorded in the footer
        *offset = _offset_iterator->get_current_page()->next_array_item_ordinal;
    }
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,15 @@ class OffsetFileColumnIterator final : public ColumnIterator {
Status _calculate_offsets(ssize_t start,
vectorized::ColumnArray::ColumnOffsets& column_offsets);

    // Bulk-fetches the offset values for the given row ids by delegating
    // directly to the wrapped file iterator; results are appended to dst.
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
                          vectorized::MutableColumnPtr& dst) override {
        return _offset_iterator->read_by_rowids(rowids, count, dst);
    }

private:
std::unique_ptr<FileColumnIterator> _offset_iterator;
// reuse a tiny column for peek to avoid frequent allocations
vectorized::MutableColumnPtr _peek_tmp_col;
};

// This iterator is used to read map value column
Expand Down
86 changes: 86 additions & 0 deletions be/test/olap/rowset/segment_v2/column_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,90 @@ TEST_F(ColumnReaderTest, MultiAccessPaths) {
ASSERT_EQ(map_iter->_key_iterator->_reading_flag, ColumnIterator::ReadingFlag::NEED_TO_READ);
ASSERT_EQ(map_iter->_val_iterator->_reading_flag, ColumnIterator::ReadingFlag::SKIP_READING);
}

TEST_F(ColumnReaderTest, OffsetPeekUsesPageSentinelWhenNoRemaining) {
    // Build a bare FileColumnIterator around a dummy ColumnReader so the
    // current page can be manipulated directly.
    auto dummy_reader = std::make_shared<ColumnReader>();
    auto raw_iter = std::make_unique<FileColumnIterator>(dummy_reader);
    auto* cur_page = raw_iter->get_current_page();

    // Arrange a page whose decoder has nothing left to read but whose footer
    // carries a valid sentinel next_array_item_ordinal.
    cur_page->next_array_item_ordinal = 12345;
    cur_page->num_rows = 0;
    cur_page->offset_in_page = 0;

    OffsetFileColumnIterator offset_iter(std::move(raw_iter));
    ordinal_t peeked = 0;
    auto status = offset_iter._peek_one_offset(&peeked);

    // The peek must succeed and surface the footer sentinel.
    ASSERT_TRUE(status.ok()) << "peek one offset failed: " << status.to_string();
    ASSERT_EQ(static_cast<ordinal_t>(12345), peeked);
}

TEST_F(ColumnReaderTest, OffsetCalculateOffsetsUsesPageSentinelForLastOffset) {
    // Offset iterator backed by a page whose sentinel offset lives in the footer.
    auto dummy_reader = std::make_shared<ColumnReader>();
    auto raw_iter = std::make_unique<FileColumnIterator>(dummy_reader);
    auto* cur_page = raw_iter->get_current_page();

    // No remaining values in the decoder, but a valid next_array_item_ordinal.
    cur_page->next_array_item_ordinal = 15;
    cur_page->num_rows = 0;
    cur_page->offset_in_page = 0;

    OffsetFileColumnIterator offset_iter(std::move(raw_iter));

    // In-memory column offsets layout:
    //   index 0: first_column_offset          = 100
    //   index 1: first_storage_offset         = 10
    //   index 2: placeholder storage offset for the middle element = 12
    // The real next_storage_offset (15) must be fetched from the page sentinel.
    vectorized::ColumnArray::ColumnOffsets column_offsets;
    auto& offs = column_offsets.get_data();
    offs.push_back(100);
    offs.push_back(10);
    offs.push_back(12);

    auto status = offset_iter._calculate_offsets(1, column_offsets);
    ASSERT_TRUE(status.ok()) << "calculate offsets failed: " << status.to_string();

    // Expected rewrite:
    //   offs[1] = 100 + (12 - 10) = 102
    //   offs[2] = 100 + (15 - 10) = 105   (sentinel used as next_storage_offset)
    ASSERT_EQ(static_cast<ordinal_t>(100), offs[0]);
    ASSERT_EQ(static_cast<ordinal_t>(102), offs[1]);
    ASSERT_EQ(static_cast<ordinal_t>(105), offs[2]);
}

TEST_F(ColumnReaderTest, MapReadByRowidsSkipReadingResizesDestination) {
    // Assemble a map iterator from dummy readers/iterators; none of them will
    // actually be read because the iterator is flagged SKIP_READING.
    auto map_reader = std::make_shared<ColumnReader>();
    auto null_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());
    auto offsets_iter = std::make_unique<OffsetFileColumnIterator>(
            std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>()));
    auto key_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());
    auto val_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());

    MapFileColumnIterator map_iter(map_reader, std::move(null_iter), std::move(offsets_iter),
                                   std::move(key_iter), std::move(val_iter));
    map_iter.set_column_name("map_col");
    map_iter.set_reading_flag(ColumnIterator::ReadingFlag::SKIP_READING);

    // Destination: an empty ColumnMap with int32 keys/values and a single
    // leading zero offset.
    auto keys = vectorized::ColumnInt32::create();
    auto values = vectorized::ColumnInt32::create();
    auto offsets = vectorized::ColumnArray::ColumnOffsets::create();
    offsets->get_data().push_back(0);
    auto column_map =
            vectorized::ColumnMap::create(std::move(keys), std::move(values), std::move(offsets));
    vectorized::MutableColumnPtr dst = std::move(column_map);

    const rowid_t rowids[] = {1, 5, 7};
    size_t count = sizeof(rowids) / sizeof(rowids[0]);
    auto status = map_iter.read_by_rowids(rowids, count, dst);

    // A skip-reading iterator must still succeed and resize the destination.
    ASSERT_TRUE(status.ok()) << "read_by_rowids failed: " << status.to_string();
    ASSERT_EQ(count, dst->size());
}
} // namespace doris::segment_v2
Loading