diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 8e3962236fef63..6e8a4e1916e0c0 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -1041,13 +1041,117 @@ Status MapFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t
         dst->resize(count);
         return Status::OK();
     }
+    if (count == 0) {
+        return Status::OK();
+    }
+    // resolve ColumnMap and nullable wrapper
+    const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>(
+            dst->is_nullable() ? static_cast<const vectorized::ColumnNullable&>(*dst).get_nested_column()
+                               : *dst);
+    auto offsets_ptr = column_map->get_offsets_column().assume_mutable();
+    auto& offsets = static_cast<vectorized::ColumnArray::ColumnOffsets&>(*offsets_ptr);
+    size_t base = offsets.get_data().empty() ? 0 : offsets.get_data().back();
+    // 1. bulk read null-map if nullable
+    std::vector<uint8_t> null_mask; // 0: not null, 1: null
+    if (_map_reader->is_nullable()) {
+        // For nullable map columns, the destination column must also be nullable.
+        if (UNLIKELY(!dst->is_nullable())) {
+            return Status::InternalError(
+                    "unexpected non-nullable destination column for nullable map reader");
+        }
+        auto null_map_ptr =
+                static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
+        size_t null_before = null_map_ptr->size();
+        RETURN_IF_ERROR(_null_iterator->read_by_rowids(rowids, count, null_map_ptr));
+        // extract a lightweight view to decide element reads
+        auto& null_map_col = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
+        null_mask.reserve(count);
+        for (size_t i = 0; i < count; ++i) {
+            null_mask.push_back(null_map_col.get_element(null_before + i));
+        }
+    } else if (dst->is_nullable()) {
+        // In not-null to nullable linked schema change mode the stored data (including the
+        // meta in the footer) is not rewritten, so dst may be built from the changed meta
+        // (nullable) while the old data is not nullable. In that case fill the null map
+        // with zeros (not null) by default.
+        auto null_map_ptr =
+                static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
+        auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
+        null_map.insert_many_vals(0, count);
+    }
+
+    // 2. bulk read start ordinals for requested rows
+    vectorized::MutableColumnPtr starts_col = vectorized::ColumnOffset64::create();
+    starts_col->reserve(count);
+    RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(rowids, count, starts_col));
+
+    // 3. bulk read next-start ordinals for rowid+1 (within bounds)
+    std::vector<rowid_t> next_rowids(count);
     for (size_t i = 0; i < count; ++i) {
-        RETURN_IF_ERROR(seek_to_ordinal(rowids[i]));
-        size_t num_read = 1;
-        RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr));
-        DCHECK(num_read == 1);
+        uint64_t nr = rowids[i] + 1;
+        next_rowids[i] = nr < _map_reader->num_rows() ? static_cast<rowid_t>(nr)
+                                                      : static_cast<rowid_t>(0); // placeholder
+    }
+    vectorized::MutableColumnPtr next_starts_col = vectorized::ColumnOffset64::create();
+    next_starts_col->reserve(count);
+    // read for all; we'll fix out-of-bound cases below
+    RETURN_IF_ERROR(_offsets_iterator->read_by_rowids(next_rowids.data(), count, next_starts_col));
+
+    // 4. fix next_start for rows whose next_rowid is out-of-bound (rowid == num_rows - 1)
+    for (size_t i = 0; i < count; ++i) {
+        if (rowids[i] + 1 >= _map_reader->num_rows()) {
+            // seek to the last row and consume one value to move the decoder to end-of-page,
+            // then peek the page-tail sentinel next_array_item_ordinal as next_start
+            RETURN_IF_ERROR(_offsets_iterator->seek_to_ordinal(rowids[i]));
+            size_t one = 1;
+            bool has_null_unused = false;
+            vectorized::MutableColumnPtr tmp = vectorized::ColumnOffset64::create();
+            RETURN_IF_ERROR(_offsets_iterator->next_batch(&one, tmp, &has_null_unused));
+            ordinal_t ns = 0;
+            RETURN_IF_ERROR(_offsets_iterator->_peek_one_offset(&ns));
+            // overwrite the placeholder with the sentinel
+            assert_cast<vectorized::ColumnOffset64&>(*next_starts_col).get_data()[i] = ns;
+        }
+    }
+
+    // 5. compute sizes and append offsets prefix-sum
+    auto& starts_data = assert_cast<vectorized::ColumnOffset64&>(*starts_col).get_data();
+    auto& next_starts_data = assert_cast<vectorized::ColumnOffset64&>(*next_starts_col).get_data();
+    std::vector<size_t> sizes(count, 0);
+    size_t acc = base;
+    offsets.get_data().reserve(offsets.get_data().size() + count);
+    for (size_t i = 0; i < count; ++i) {
+        size_t sz = static_cast<size_t>(next_starts_data[i] - starts_data[i]);
+        if (_map_reader->is_nullable() && !null_mask.empty() && null_mask[i]) {
+            sz = 0; // null rows do not consume elements
+        }
+        sizes[i] = sz;
+        acc += sz;
+        offsets.get_data().push_back(acc);
     }
+
+    // 6. read key/value elements for non-empty sizes
+    auto keys_ptr = column_map->get_keys().assume_mutable();
+    auto vals_ptr = column_map->get_values().assume_mutable();
+
+    for (size_t i = 0; i < count; ++i) {
+        size_t sz = sizes[i];
+        if (sz == 0) {
+            continue;
+        }
+        ordinal_t start = static_cast<ordinal_t>(starts_data[i]);
+        RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(start));
+        RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(start));
+        size_t n = sz;
+        bool dummy_has_null = false;
+        RETURN_IF_ERROR(_key_iterator->next_batch(&n, keys_ptr, &dummy_has_null));
+        DCHECK(n == sz);
+        n = sz;
+        RETURN_IF_ERROR(_val_iterator->next_batch(&n, vals_ptr, &dummy_has_null));
+        DCHECK(n == sz);
+    }
+
     return Status::OK();
 }
 
@@ -1319,6 +1423,8 @@ Status StructFileColumnIterator::set_access_paths(
 ////////////////////////////////////////////////////////////////////////////////
 Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
     RETURN_IF_ERROR(_offset_iterator->init(opts));
+    // allocate peek tmp column once
+    _peek_tmp_col = vectorized::ColumnOffset64::create();
     return Status::OK();
 }
 
@@ -1331,11 +1437,12 @@ Status OffsetFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumn
 Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
     if (_offset_iterator->get_current_page()->has_remaining()) {
         PageDecoder* offset_page_decoder = _offset_iterator->get_current_page()->data_decoder.get();
-        vectorized::MutableColumnPtr offset_col = vectorized::ColumnOffset64::create();
         size_t n = 1;
-        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, offset_col)); // not null
-        DCHECK(offset_col->size() == 1);
-        *offset = assert_cast<vectorized::ColumnOffset64*>(offset_col.get())->get_element(0);
+        _peek_tmp_col->clear();
+        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, _peek_tmp_col)); // not null
+        DCHECK(_peek_tmp_col->size() == 1);
+        *offset =
+                assert_cast<vectorized::ColumnOffset64*>(_peek_tmp_col.get())->get_element(0);
     } else {
         *offset = _offset_iterator->get_current_page()->next_array_item_ordinal;
     }
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index ce5ae546a7f1af..03944a8ef3339c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -524,8 +524,15 @@ class OffsetFileColumnIterator final : public ColumnIterator {
     Status _calculate_offsets(ssize_t start,
                               vectorized::ColumnArray::ColumnOffsets& column_offsets);
 
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override {
+        return _offset_iterator->read_by_rowids(rowids, count, dst);
+    }
+
 private:
     std::unique_ptr<FileColumnIterator> _offset_iterator;
+    // reuse a tiny column for peek to avoid frequent allocations
+    vectorized::MutableColumnPtr _peek_tmp_col;
 };
 
 // This iterator is used to read map value column
diff --git a/be/test/olap/rowset/segment_v2/column_reader_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_test.cpp
index 98478df7016196..04a8f8317a5d92 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_test.cpp
@@ -217,4 +217,90 @@ TEST_F(ColumnReaderTest, MultiAccessPaths) {
     ASSERT_EQ(map_iter->_key_iterator->_reading_flag, ColumnIterator::ReadingFlag::NEED_TO_READ);
     ASSERT_EQ(map_iter->_val_iterator->_reading_flag, ColumnIterator::ReadingFlag::SKIP_READING);
 }
+
+TEST_F(ColumnReaderTest, OffsetPeekUsesPageSentinelWhenNoRemaining) {
+    // create a bare FileColumnIterator with a dummy ColumnReader
+    auto reader = std::make_shared<ColumnReader>();
+    auto file_iter = std::make_unique<FileColumnIterator>(reader);
+    auto* page = file_iter->get_current_page();
+
+    // simulate a page that has no remaining offsets in the decoder but has a valid
+    // next_array_item_ordinal recorded in the footer
+    page->num_rows = 0;
+    page->offset_in_page = 0;
+    page->next_array_item_ordinal = 12345;
+
+    OffsetFileColumnIterator offset_iter(std::move(file_iter));
+    ordinal_t offset = 0;
+    auto st = offset_iter._peek_one_offset(&offset);
+
+    ASSERT_TRUE(st.ok()) << "peek one offset failed: " << st.to_string();
+    ASSERT_EQ(static_cast<ordinal_t>(12345), offset);
+}
+
+TEST_F(ColumnReaderTest, OffsetCalculateOffsetsUsesPageSentinelForLastOffset) {
+    // create an offset iterator with a page whose sentinel offset is set in the footer
+    auto reader = std::make_shared<ColumnReader>();
+    auto file_iter = std::make_unique<FileColumnIterator>(reader);
+    auto* page = file_iter->get_current_page();
+
+    // simulate a page with no remaining values, but a valid next_array_item_ordinal
+    page->num_rows = 0;
+    page->offset_in_page = 0;
+    page->next_array_item_ordinal = 15;
+
+    OffsetFileColumnIterator offset_iter(std::move(file_iter));
+
+    // prepare in-memory column offsets:
+    // offsets_data = [first_column_offset, first_storage_offset, next_storage_offset_placeholder]
+    // first_column_offset = 100
+    // first_storage_offset = 10
+    // the real next_storage_offset will be fetched from the page sentinel (15)
+    auto column_offsets_col = vectorized::ColumnArray::ColumnOffsets::create();
+    auto& column_offsets = *column_offsets_col;
+    auto& data = column_offsets.get_data();
+    data.push_back(100); // index 0: first_column_offset
+    data.push_back(10);  // index 1: first_storage_offset
+    data.push_back(12);  // index 2: placeholder storage offset for the middle element
+
+    auto st = offset_iter._calculate_offsets(1, column_offsets);
+    ASSERT_TRUE(st.ok()) << "calculate offsets failed: " << st.to_string();
+
+    // after calculation:
+    // data[1] = 100 + (12 - 10) = 102
+    // data[2] = 100 + (15 - 10) = 105 (using the page sentinel as next_storage_offset)
+    ASSERT_EQ(static_cast<uint64_t>(100), data[0]);
+    ASSERT_EQ(static_cast<uint64_t>(102), data[1]);
+    ASSERT_EQ(static_cast<uint64_t>(105), data[2]);
+}
+
+TEST_F(ColumnReaderTest, MapReadByRowidsSkipReadingResizesDestination) {
+    // create a basic map iterator with dummy readers/iterators
+    auto map_reader = std::make_shared<ColumnReader>();
+    auto null_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());
+    auto offsets_iter = std::make_unique<OffsetFileColumnIterator>(
+            std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>()));
+    auto key_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());
+    auto val_iter = std::make_unique<FileColumnIterator>(std::make_shared<ColumnReader>());
+
+    MapFileColumnIterator map_iter(map_reader, std::move(null_iter), std::move(offsets_iter),
+                                   std::move(key_iter), std::move(val_iter));
+    map_iter.set_column_name("map_col");
+    map_iter.set_reading_flag(ColumnIterator::ReadingFlag::SKIP_READING);
+
+    // prepare an empty ColumnMap as destination
+    auto keys = vectorized::ColumnInt32::create();
+    auto values = vectorized::ColumnInt32::create();
+    auto offsets = vectorized::ColumnArray::ColumnOffsets::create();
+    offsets->get_data().push_back(0);
+    auto column_map =
+            vectorized::ColumnMap::create(std::move(keys), std::move(values), std::move(offsets));
+    vectorized::MutableColumnPtr dst = std::move(column_map);
+
+    const rowid_t rowids[] = {1, 5, 7};
+    size_t count = sizeof(rowids) / sizeof(rowids[0]);
+    auto st = map_iter.read_by_rowids(rowids, count, dst);
+
+    ASSERT_TRUE(st.ok()) << "read_by_rowids failed: " << st.to_string();
+    ASSERT_EQ(count, dst->size());
+}
 } // namespace doris::segment_v2
\ No newline at end of file
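
Reviewer note (not part of the patch): the following is a minimal standalone sketch of the size/offset arithmetic performed by steps 3-5 of the new MapFileColumnIterator::read_by_rowids. Plain std::vectors stand in for Doris' ColumnOffset64 / ColumnArray::ColumnOffsets, and the names RowExtent / compute_map_row_extents are illustrative only, assumed here for explanation.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // For each requested row, derive its element range from the bulk-read start ordinals,
    // then extend the destination offsets column as a running prefix sum.
    // starts[i] is the element ordinal where row rowids[i] begins; next_starts[i] is the
    // start of rowids[i] + 1 (for the segment's last row the caller substitutes the
    // page-tail sentinel next_array_item_ordinal, see step 4 of the patch).
    struct RowExtent {
        uint64_t start; // first key/value element ordinal of the row
        uint64_t size;  // number of key/value pairs in the row (0 for null rows)
    };

    static std::vector<RowExtent> compute_map_row_extents(const std::vector<uint64_t>& starts,
                                                          const std::vector<uint64_t>& next_starts,
                                                          const std::vector<uint8_t>& null_mask,
                                                          std::vector<uint64_t>& dst_offsets) {
        std::vector<RowExtent> extents(starts.size());
        uint64_t acc = dst_offsets.empty() ? 0 : dst_offsets.back();
        for (size_t i = 0; i < starts.size(); ++i) {
            uint64_t size = next_starts[i] - starts[i];
            if (!null_mask.empty() && null_mask[i]) {
                size = 0; // null rows contribute no elements
            }
            extents[i] = {starts[i], size};
            acc += size;
            dst_offsets.push_back(acc); // prefix sum, as consumed by the map's offsets column
        }
        return extents;
    }

Step 6 of the patch then only has to seek the key/value iterators to extents[i].start and read extents[i].size elements for each non-empty row, instead of seeking and decoding the whole map row-by-row through next_batch.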
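
Likewise, a hedged sketch of the fallback that _peek_one_offset (and the step-4 fix-up plus the new unit tests) relies on: if the offsets page decoder still has undecoded values, the next row's start is simply the next offset; otherwise the page footer's next_array_item_ordinal acts as a sentinel meaning "one past the last element covered by this page". OffsetsPageView and the helper names below are hypothetical stand-ins for the real ParsedPage/PageDecoder pair, not the patch's API.

    #include <cstdint>
    #include <vector>

    // Simplified view of one offsets page: the start ordinals the decoder has not yet
    // consumed, plus the footer sentinel pointing one past the page's last element.
    struct OffsetsPageView {
        std::vector<uint64_t> remaining_starts;  // start ordinals still available to decode
        uint64_t next_array_item_ordinal = 0;    // footer sentinel: start of the row after this page
    };

    // Mirrors _peek_one_offset: prefer the next undecoded start, otherwise fall back to the
    // footer sentinel so the caller can size the last row of the page (or segment).
    inline uint64_t peek_next_start(const OffsetsPageView& page) {
        if (!page.remaining_starts.empty()) {
            return page.remaining_starts.front();
        }
        return page.next_array_item_ordinal;
    }

    // Size of the row that begins at row_start, given the peeked next start.
    inline uint64_t row_size(uint64_t row_start, const OffsetsPageView& page) {
        return peek_next_start(page) - row_start;
    }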