Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ Status ColumnReader::init() {
strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));

for (int i = 0; i < _meta.indexes_size(); i++) {
auto& index_meta = _meta.indexes(i);
Expand Down Expand Up @@ -144,12 +143,13 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
}

Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer) {
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec) {
iter_opts.sanity_check();
PageReadOptions opts;
opts.rblock = iter_opts.rblock;
opts.page_pointer = pp;
opts.codec = _compress_codec.get();
opts.codec = codec;
opts.stats = iter_opts.stats;
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
Expand Down Expand Up @@ -465,6 +465,12 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool

// Builds an iterator that reads through `reader`. Only the pointer is stored
// here; the iterator's compression codec is created later, in init().
FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) {}

// Prepares the iterator for reading.
//
// Stores the caller-supplied options, then creates the compression codec that
// belongs to this iterator, selected by the compression type reported by the
// underlying ColumnReader.
//
// Returns the status of the codec lookup: OK on success, otherwise the error
// produced by get_block_compression_codec().
Status FileColumnIterator::init(const ColumnIteratorOptions& opts) {
    _opts = opts;
    // The lookup result is exactly what the caller needs to see, so hand it back as-is.
    return get_block_compression_codec(_reader->get_compression(), _compress_codec);
}

// Defaulted destructor: the codec is held in a std::unique_ptr and is released
// automatically; the raw `_reader` pointer is not deleted here.
FileColumnIterator::~FileColumnIterator() = default;

Status FileColumnIterator::seek_to_first() {
Expand Down Expand Up @@ -653,7 +659,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
Slice page_body;
PageFooterPB footer;
_opts.type = DATA_PAGE;
RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer));
RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer,
_compress_codec.get()));
// parse data page
RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(),
_reader->encoding_info(), iter.page(), iter.page_index(),
Expand All @@ -673,7 +680,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
PageFooterPB dict_footer;
_opts.type = INDEX_PAGE;
RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(),
&_dict_page_handle, &dict_data, &dict_footer));
&_dict_page_handle, &dict_data, &dict_footer,
_compress_codec.get()));
// ignore dict_footer.dict_page_footer().encoding() because only
// PLAIN_ENCODING is supported for dict pages right now
_dict_decoder = std::make_unique<BinaryPlainPageDecoder>(dict_data);
Expand Down
11 changes: 9 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class ColumnReader {

// read a page from file into a page handle
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer);
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec);

// Whether the column metadata marks this column as nullable.
bool is_nullable() const { return _meta.is_nullable(); }

Expand Down Expand Up @@ -131,6 +132,8 @@ class ColumnReader {

// True when this reader covers zero rows (_num_rows == 0).
bool is_empty() const { return _num_rows == 0; }

// Compression type recorded in the column metadata. Iterators use it to
// create their own BlockCompressionCodec instead of sharing one.
CompressionTypePB get_compression() const { return _meta.compression(); }

private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
FilePathDesc path_desc);
Expand Down Expand Up @@ -175,7 +178,6 @@ class ColumnReader {
TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses.
const EncodingInfo* _encoding_info =
nullptr; // initialized in init(), used for create PageDecoder
std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()

// meta for various column indexes (null if the index is absent)
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
Expand Down Expand Up @@ -253,6 +255,8 @@ class FileColumnIterator final : public ColumnIterator {
explicit FileColumnIterator(ColumnReader* reader);
~FileColumnIterator() override;

Status init(const ColumnIteratorOptions& opts) override;

Status seek_to_first() override;

Status seek_to_ordinal(ordinal_t ord) override;
Expand Down Expand Up @@ -285,6 +289,9 @@ class FileColumnIterator final : public ColumnIterator {
private:
ColumnReader* _reader;

// Compression codec owned by this iterator; it must NOT be shared across threads. Initialized in init().
std::unique_ptr<BlockCompressionCodec> _compress_codec;

// 1. _page represents the current page.
// 2. We define an operation as one seek and the following read.
// If a new seek is issued, _page will be reset.
Expand Down
17 changes: 12 additions & 5 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
_value_key_coder = get_key_coder(_type_info->type());

std::unique_ptr<fs::ReadableBlock> rblock;
Expand Down Expand Up @@ -72,18 +71,21 @@ Status IndexedColumnReader::load_index_page(fs::ReadableBlock* rblock, const Pag
PageHandle* handle, IndexPageReader* reader) {
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE));
std::unique_ptr<BlockCompressionCodec> local_compress_codec;
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec));
RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
local_compress_codec.get()));
RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
return Status::OK();
}

Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePointer& pp,
PageHandle* handle, Slice* body, PageFooterPB* footer,
PageTypePB type) const {
PageTypePB type, BlockCompressionCodec* codec) const {
PageReadOptions opts;
opts.rblock = rblock;
opts.page_pointer = pp;
opts.codec = _compress_codec.get();
opts.codec = codec;
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
opts.use_page_cache = _use_page_cache;
Expand All @@ -96,10 +98,15 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
///////////////////////////////////////////////////////////////////////////////

Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
// IndexedColumnIterator has no init(), so create the codec lazily here on first use
if (!_compress_codec.get())
RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec));

PageHandle handle;
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE));
RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE,
_compress_codec.get()));
// parse data page
// note that page_index is not used in IndexedColumnIterator, so we pass 0
return ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ class IndexedColumnReader {

// read the page specified by `pp' from `rblock' into `handle'
Status read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle,
Slice* body, PageFooterPB* footer, PageTypePB type) const;
Slice* body, PageFooterPB* footer, PageTypePB type,
BlockCompressionCodec* codec) const;

// Returns the stored value count (_num_values).
int64_t num_values() const { return _num_values; }
// Returns the encoding info pointer; it is nullptr until load() has resolved it.
const EncodingInfo* encoding_info() const { return _encoding_info; }
// Returns the type info pointer for the indexed values (nullptr by default).
const TypeInfo* type_info() const { return _type_info; }
// True when the column metadata contains an ordinal index.
bool support_ordinal_seek() const { return _meta.has_ordinal_index_meta(); }
// True when the column metadata contains a value index.
bool support_value_seek() const { return _meta.has_value_index_meta(); }

// Compression type recorded in the indexed column metadata. Callers use it
// to create their own BlockCompressionCodec instead of sharing one.
CompressionTypePB get_compression() const { return _meta.compression(); }

private:
Status load_index_page(fs::ReadableBlock* rblock, const PagePointerPB& pp, PageHandle* handle,
IndexPageReader* reader);
Expand All @@ -84,7 +87,6 @@ class IndexedColumnReader {

const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
std::unique_ptr<BlockCompressionCodec> _compress_codec;
const KeyCoder* _value_key_coder = nullptr;
};

Expand Down Expand Up @@ -145,6 +147,8 @@ class IndexedColumnIterator {
ordinal_t _current_ordinal = 0;
// open file handle
std::unique_ptr<fs::ReadableBlock> _rblock;
// Compression codec owned by this iterator; it must NOT be shared across threads. Created lazily before first use.
std::unique_ptr<BlockCompressionCodec> _compress_codec;
};

} // namespace segment_v2
Expand Down
7 changes: 6 additions & 1 deletion be/src/util/block_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ namespace doris {
// This class is only used to compress or decompress a whole block of data,
// which means all of the data must be supplied when compress or decompress
// is called. This class does not handle stream compression.
//
// NOTICE!! BlockCompressionCodec is NOT thread safe; it must NOT be shared across threads.
//
class BlockCompressionCodec {
public:
virtual ~BlockCompressionCodec() {}
Expand Down Expand Up @@ -59,7 +62,9 @@ class BlockCompressionCodec {
// Get a BlockCompressionCodec through type.
// Return Status::OK if a valid codec is found. If codec is null, it means it is
// NO_COMPRESSION. If codec is not null, user can use it to compress/decompress
// data. And client doesn't have to release the codec.
// data.
//
// NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads
//
// Returns a non-OK status if an error happens.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
Expand Down