From aeb23bd7999ac7ccb1b074f2dd75f7919ea940c5 Mon Sep 17 00:00:00 2001 From: zhaochun Date: Thu, 1 Aug 2019 16:36:11 +0800 Subject: [PATCH 1/5] Add new format short key index In this patch, we create a new format for short key index. In orgin code index is stored in format like RowCusor which is not effecient to compare. Now we encode multiple column into binary, and we assure that this binary is sorted same with the key columns. --- be/src/gutil/endian.h | 153 ++++++++++++++ be/src/olap/CMakeLists.txt | 2 + be/src/olap/key_coder.cpp | 80 +++++++ be/src/olap/key_coder.h | 221 ++++++++++++++++++++ be/src/olap/short_key_index.cpp | 116 +++++++++++ be/src/olap/short_key_index.h | 197 ++++++++++++++++++ be/src/olap/types.h | 10 + be/src/olap/uint24.h | 13 +- be/src/util/CMakeLists.txt | 1 + be/src/util/slice.cpp | 28 +++ be/src/util/slice.h | 4 + be/test/olap/CMakeLists.txt | 2 + be/test/olap/key_coder_test.cpp | 287 ++++++++++++++++++++++++++ be/test/olap/short_key_index_test.cpp | 97 +++++++++ gensrc/proto/segment_v2.proto | 18 ++ run-ut.sh | 2 + 16 files changed, 1228 insertions(+), 3 deletions(-) create mode 100644 be/src/olap/key_coder.cpp create mode 100644 be/src/olap/key_coder.h create mode 100644 be/src/olap/short_key_index.cpp create mode 100644 be/src/olap/short_key_index.h create mode 100644 be/src/util/slice.cpp create mode 100644 be/test/olap/key_coder_test.cpp create mode 100644 be/test/olap/short_key_index_test.cpp diff --git a/be/src/gutil/endian.h b/be/src/gutil/endian.h index f6b2485b6d8bab..6b8a0bc772d414 100644 --- a/be/src/gutil/endian.h +++ b/be/src/gutil/endian.h @@ -32,6 +32,18 @@ inline uint64 gbswap_64(uint64 host_int) { #endif // bswap_64 } +inline unsigned __int128 gbswap_128(unsigned __int128 host_int) { + return static_cast(bswap_64(static_cast(host_int >> 64))) | + (static_cast(bswap_64(static_cast(host_int))) << 64); +} + +// Swap bytes of a 24-bit value. +inline uint32_t bswap_24(uint32_t x) { + return ((x & 0x0000ffULL) << 16) | + ((x & 0x00ff00ULL)) | + ((x & 0xff0000ULL) >> 16); +} + #ifdef IS_LITTLE_ENDIAN // Definitions for ntohl etc. that don't require us to include @@ -188,4 +200,145 @@ class LittleEndian { #define gntohll(x) ghtonll(x) #define ntohll(x) htonll(x) +// Utilities to convert numbers between the current hosts's native byte +// order and big-endian byte order (same as network byte order) +// +// Load/Store methods are alignment safe +class BigEndian { +public: +#ifdef IS_LITTLE_ENDIAN + + static uint16 FromHost16(uint16 x) { return bswap_16(x); } + static uint16 ToHost16(uint16 x) { return bswap_16(x); } + + static uint32 FromHost24(uint32 x) { return bswap_24(x); } + static uint32 ToHost24(uint32 x) { return bswap_24(x); } + + static uint32 FromHost32(uint32 x) { return bswap_32(x); } + static uint32 ToHost32(uint32 x) { return bswap_32(x); } + + static uint64 FromHost64(uint64 x) { return gbswap_64(x); } + static uint64 ToHost64(uint64 x) { return gbswap_64(x); } + + static unsigned __int128 FromHost128(unsigned __int128 x) { return gbswap_128(x); } + static unsigned __int128 ToHost128(unsigned __int128 x) { return gbswap_128(x); } + + static bool IsLittleEndian() { return true; } + +#elif defined IS_BIG_ENDIAN + + static uint16 FromHost16(uint16 x) { return x; } + static uint16 ToHost16(uint16 x) { return x; } + + static uint32 FromHost24(uint32 x) { return x; } + static uint32 ToHost24(uint32 x) { return x; } + + static uint32 FromHost32(uint32 x) { return x; } + static uint32 ToHost32(uint32 x) { return x; } + + static uint64 FromHost64(uint64 x) { return x; } + static uint64 ToHost64(uint64 x) { return x; } + + static uint128 FromHost128(uint128 x) { return x; } + static uint128 ToHost128(uint128 x) { return x; } + + static bool IsLittleEndian() { return false; } + +#endif /* ENDIAN */ + // Functions to do unaligned loads and stores in little-endian order. + static uint16 Load16(const void *p) { + return ToHost16(UNALIGNED_LOAD16(p)); + } + + static void Store16(void *p, uint16 v) { + UNALIGNED_STORE16(p, FromHost16(v)); + } + + static uint32 Load32(const void *p) { + return ToHost32(UNALIGNED_LOAD32(p)); + } + + static void Store32(void *p, uint32 v) { + UNALIGNED_STORE32(p, FromHost32(v)); + } + + static uint64 Load64(const void *p) { + return ToHost64(UNALIGNED_LOAD64(p)); + } + + // Build a uint64 from 1-8 bytes. + // 8 * len least significant bits are loaded from the memory with + // BigEndian order. The 64 - 8 * len most significant bits are + // set all to 0. + // In latex-friendly words, this function returns: + // $\sum_{i=0}^{len-1} p[i] 256^{i}$, where p[i] is unsigned. + // + // This function is equivalent with: + // uint64 val = 0; + // memcpy(&val, p, len); + // return ToHost64(val); + // TODO(user): write a small benchmark and benchmark the speed + // of a memcpy based approach. + // + // For speed reasons this function does not work for len == 0. + // The caller needs to guarantee that 1 <= len <= 8. + static uint64 Load64VariableLength(const void * const p, int len) { + assert(len >= 1 && len <= 8); + uint64 val = Load64(p); + uint64 mask = 0; + --len; + do { + mask = (mask << 8) | 0xff; + // (--len >= 0) is about 10 % faster than (len--) in some benchmarks. + } while (--len >= 0); + return val & mask; + } + + static void Store64(void *p, uint64 v) { + UNALIGNED_STORE64(p, FromHost64(v)); + } + + static uint128 Load128(const void *p) { + return uint128( + ToHost64(UNALIGNED_LOAD64(p)), + ToHost64(UNALIGNED_LOAD64(reinterpret_cast(p) + 1))); + } + + static void Store128(void *p, const uint128 v) { + UNALIGNED_STORE64(p, FromHost64(Uint128High64(v))); + UNALIGNED_STORE64(reinterpret_cast(p) + 1, + FromHost64(Uint128Low64(v))); + } + + // Build a uint128 from 1-16 bytes. + // 8 * len least significant bits are loaded from the memory with + // BigEndian order. The 128 - 8 * len most significant bits are + // set all to 0. + static uint128 Load128VariableLength(const void *p, int len) { + if (len <= 8) { + return uint128(Load64VariableLength(static_cast(p)+8, + len)); + } else { + return uint128( + Load64VariableLength(p, len-8), + Load64(static_cast(p)+8)); + } + } + + // Load & Store in machine's word size. + static uword_t LoadUnsignedWord(const void *p) { + if (sizeof(uword_t) == 8) + return Load64(p); + else + return Load32(p); + } + + static void StoreUnsignedWord(void *p, uword_t v) { + if (sizeof(uword_t) == 8) + Store64(p, v); + else + Store32(p, v); + } +}; // BigEndian + #endif // UTIL_ENDIAN_ENDIAN_H_ diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index c494c8d0f6c6a6..6f45285f26ddaf 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -41,6 +41,7 @@ add_library(Olap STATIC hll.cpp in_list_predicate.cpp in_stream.cpp + key_coder.cpp lru_cache.cpp memtable.cpp merger.cpp @@ -63,6 +64,7 @@ add_library(Olap STATIC serialize.cpp storage_engine.cpp data_dir.cpp + short_key_index.cpp snapshot_manager.cpp stream_index_common.cpp stream_index_reader.cpp diff --git a/be/src/olap/key_coder.cpp b/be/src/olap/key_coder.cpp new file mode 100644 index 00000000000000..68f2ace8961bcb --- /dev/null +++ b/be/src/olap/key_coder.cpp @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/key_coder.h" + +#include + +namespace doris { + +template +KeyCoder::KeyCoder(TraitsType traits) + : _encode_ascending(traits.encode_ascending), + _decode_ascending(traits.decode_ascending) { +} + +// Helper class used to get KeyCoder +class KeyCoderResolver { +public: + ~KeyCoderResolver() { + for (auto& iter : _coder_map) { + delete iter.second; + } + } + + static KeyCoderResolver* instance() { + static KeyCoderResolver s_instance; + return &s_instance; + } + + KeyCoder* get_coder(FieldType field_type) const { + auto it = _coder_map.find(field_type); + if (it != _coder_map.end()) { + return it->second; + } + return nullptr; + } + +private: + KeyCoderResolver() { + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + + add_mapping(); + add_mapping(); + add_mapping(); + add_mapping(); + } + + template + void add_mapping() { + _coder_map.emplace(field_type, new KeyCoder(KeyCoderTraits())); + } + + std::unordered_map _coder_map; +}; + +const KeyCoder* get_key_coder(FieldType type) { + return KeyCoderResolver::instance()->get_coder(type); +} + +} diff --git a/be/src/olap/key_coder.h b/be/src/olap/key_coder.h new file mode 100644 index 00000000000000..bf7fa52562fabf --- /dev/null +++ b/be/src/olap/key_coder.h @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "gutil/endian.h" +#include "gutil/strings/substitute.h" +#include "olap/types.h" +#include "util/arena.h" + +namespace doris { + +using strings::Substitute; + +using EncodeAscendingFunc = void (*)(const void* value, size_t index_size, std::string* buf); +using DecodeAscendingFunc = Status (*)(Slice* encoded_key, size_t index_size, uint8_t* cell_ptr, Arena* arena); + +class KeyCoder { +public: + template + KeyCoder(TraitsType traits); + + void encode_ascending(const void* value, size_t index_size, std::string* buf) const { + _encode_ascending(value, index_size, buf); + } + Status decode_ascending(Slice* encoded_key, size_t index_size, uint8_t* cell_ptr, Arena* arena) const { + return _decode_ascending(encoded_key, index_size, cell_ptr, arena); + } + +private: + EncodeAscendingFunc _encode_ascending; + DecodeAscendingFunc _decode_ascending; +}; + +extern const KeyCoder* get_key_coder(FieldType type); + +template +class KeyCoderTraits { +}; + +template +class KeyCoderTraits::CppType>::value>::type> { +public: + using CppType = typename CppTypeTraits::CppType; + using UnsignedCppType = typename CppTypeTraits::UnsignedCppType; + +private: + // Swap value's endian from/to big endian + static UnsignedCppType swap_big_endian(UnsignedCppType val) { + switch (sizeof(UnsignedCppType)) { + case 1: return val; + case 2: return BigEndian::FromHost16(val); + case 4: return BigEndian::FromHost32(val); + case 8: return BigEndian::FromHost64(val); + case 16: return BigEndian::FromHost128(val); + default: LOG(FATAL) << "Invalid type to big endian, type=" << field_type + << ", size=" << sizeof(UnsignedCppType); + } + } + +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, value, sizeof(unsigned_val)); + // swap MSB to encode integer + if (std::is_signed::value) { + unsigned_val ^= (static_cast(1) << (sizeof(UnsignedCppType) * CHAR_BIT - 1)); + } + // make it bigendian + unsigned_val = swap_big_endian(unsigned_val); + + buf->append((char*)&unsigned_val, sizeof(unsigned_val)); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < sizeof(UnsignedCppType)) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + sizeof(UnsignedCppType), encoded_key->size)); + } + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, encoded_key->data, sizeof(UnsignedCppType)); + unsigned_val = swap_big_endian(unsigned_val); + if (std::is_signed::value) { + unsigned_val ^= (static_cast(1) << (sizeof(UnsignedCppType) * CHAR_BIT - 1)); + } + memcpy(cell_ptr, &unsigned_val, sizeof(UnsignedCppType)); + encoded_key->remove_prefix(sizeof(UnsignedCppType)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + using CppType = typename CppTypeTraits::CppType; + using UnsignedCppType = typename CppTypeTraits::UnsignedCppType; + +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, value, sizeof(unsigned_val)); + // make it bigendian + unsigned_val = BigEndian::FromHost24(unsigned_val); + buf->append((char*)&unsigned_val, sizeof(unsigned_val)); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < sizeof(UnsignedCppType)) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + sizeof(UnsignedCppType), encoded_key->size)); + } + UnsignedCppType unsigned_val; + memcpy(&unsigned_val, encoded_key->data, sizeof(UnsignedCppType)); + unsigned_val = BigEndian::FromHost24(unsigned_val); + memcpy(cell_ptr, &unsigned_val, sizeof(UnsignedCppType)); + encoded_key->remove_prefix(sizeof(UnsignedCppType)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + decimal12_t decimal_val; + memcpy(&decimal_val, value, sizeof(decimal12_t)); + // encode integer + KeyCoderTraits::encode_ascending( + &decimal_val.integer, sizeof(decimal_val.integer), buf); + // encode integer + KeyCoderTraits::encode_ascending( + &decimal_val.fraction, sizeof(decimal_val.fraction), buf); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + decimal12_t decimal_val; + RETURN_IF_ERROR(KeyCoderTraits::decode_ascending( + encoded_key, sizeof(decimal_val.integer), (uint8_t*)&decimal_val.integer, arena)); + RETURN_IF_ERROR(KeyCoderTraits::decode_ascending( + encoded_key, sizeof(decimal_val.fraction), (uint8_t*)&decimal_val.fraction, arena)); + memcpy(cell_ptr, &decimal_val, sizeof(decimal12_t)); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + const Slice* slice = (const Slice*)value; + CHECK(index_size <= slice->size) << "index size is larger than char size, index=" << index_size << ", char=" << slice->size; + buf->append(slice->data, index_size); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + if (encoded_key->size < index_size) { + return Status::InvalidArgument( + Substitute("Key too short, need=$0 vs real=$1", + index_size, encoded_key->size)); + } + Slice* slice = (Slice*)cell_ptr; + slice->data = arena->Allocate(index_size); + slice->size = index_size; + memcpy(slice->data, encoded_key->data, index_size); + encoded_key->remove_prefix(index_size); + return Status::OK(); + } +}; + +template<> +class KeyCoderTraits { +public: + static void encode_ascending(const void* value, size_t index_size, std::string* buf) { + const Slice* slice = (const Slice*)value; + size_t copy_size = std::min(index_size, slice->size); + buf->append(slice->data, copy_size); + } + + static Status decode_ascending(Slice* encoded_key, size_t index_size, + uint8_t* cell_ptr, Arena* arena) { + CHECK(encoded_key->size <= index_size) + << "encoded_key size is larger than index_size, key_size=" << encoded_key->size + << ", index_size=" << index_size; + auto copy_size = encoded_key->size; + Slice* slice = (Slice*)cell_ptr; + slice->data = arena->Allocate(copy_size); + slice->size = copy_size; + memcpy(slice->data, encoded_key->data, copy_size); + encoded_key->remove_prefix(copy_size); + return Status::OK(); + } +}; + +} diff --git a/be/src/olap/short_key_index.cpp b/be/src/olap/short_key_index.cpp new file mode 100644 index 00000000000000..69e6c8a4700fec --- /dev/null +++ b/be/src/olap/short_key_index.cpp @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/short_key_index.h" + +#include + +#include "util/coding.h" +#include "gutil/strings/substitute.h" + +using strings::Substitute; + +namespace doris { + +Status ShortKeyIndexBuilder::add_item(const Slice& key) { + put_varint32(&_offset_buf, _key_buf.size()); + _footer.set_num_items(_footer.num_items() + 1); + _key_buf.append(key.data, key.size); + return Status::OK(); +} + +Status ShortKeyIndexBuilder::finalize(uint32_t segment_bytes, + uint32_t num_segment_rows, + std::vector* slices) { + _footer.set_num_segment_rows(num_segment_rows); + _footer.set_segment_bytes(segment_bytes); + _footer.set_key_bytes(_key_buf.size()); + _footer.set_offset_bytes(_offset_buf.size()); + + // encode header + if (!_footer.SerializeToString(&_footer_buf)) { + return Status::InternalError("Failed to serialize index footer"); + } + + put_fixed32_le(&_footer_buf, _footer_buf.size()); + uint32_t checksum = 0; + put_fixed32_le(&_footer_buf, checksum); + + slices->emplace_back(_key_buf); + slices->emplace_back(_offset_buf); + slices->emplace_back(_footer_buf); + return Status::OK(); +} + +Status ShortKeyIndexDecoder::parse() { + Slice data = _data; + + // 1. parse footer, get checksum and footer length + if (data.size < 2 * sizeof(uint32_t)) { + return Status::Corruption( + Substitute("Short key is too short, need=$0 vs real=$1", + 2 * sizeof(uint32_t), data.size)); + } + size_t offset = data.size - 2 * sizeof(uint32_t); + uint32_t footer_length = decode_fixed32_le((uint8_t*)data.data + offset); + uint32_t checksum = decode_fixed32_le((uint8_t*)data.data + offset + 4); + // TODO(zc): do checksum + if (checksum != 0) { + return Status::Corruption( + Substitute("Checksum not match, need=$0 vs read=$1", 0, checksum)); + } + // move offset to parse footer + offset -= footer_length; + std::string footer_buf(data.data + offset, footer_length); + if (!_footer.ParseFromString(footer_buf)) { + return Status::Corruption("Fail to parse index footer from string"); + } + + // check if real data size match footer's content + if (offset != _footer.key_bytes() + _footer.offset_bytes()) { + return Status::Corruption( + Substitute("Index size not match, need=$0, real=$1", + _footer.key_bytes() + _footer.offset_bytes(), offset)); + } + + // set index buffer + _key_data = Slice(_data.data, _footer.key_bytes()); + + // parse offset information + Slice offset_slice(_data.data + _footer.key_bytes(), _footer.offset_bytes()); + // +1 for record total length + _offsets.resize(_footer.num_items() + 1); + _offsets[_footer.num_items()] = _footer.key_bytes(); + for (uint32_t i = 0; i < _footer.num_items(); ++i) { + uint32_t offset = 0; + if (!get_varint32(&offset_slice, &offset)) { + return Status::Corruption("Fail to get varint from index offset buffer"); + } + DCHECK(offset <= _footer.key_bytes()) + << "Offset is larger than total bytes, offset=" << offset + << ", key_bytes=" << _footer.key_bytes(); + _offsets[i] = offset; + } + + if (offset_slice.size != 0) { + return Status::Corruption("Still has data after parse all key offset"); + } + + return Status::OK(); +} + +} diff --git a/be/src/olap/short_key_index.h b/be/src/olap/short_key_index.h new file mode 100644 index 00000000000000..e0ebf0ef78a850 --- /dev/null +++ b/be/src/olap/short_key_index.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "gen_cpp/segment_v2.pb.h" +#include "util/faststring.h" +#include "util/slice.h" + +namespace doris { + +// Used to encode a segment short key indices to binary format. This version +// only accepts binary key, client should assure that input key is sorted, +// otherwise error could happens. This builder would arrange data in following +// format. +// index = encoded_keys + encoded_offsets + footer + footer_size + checksum +// encoded_keys = binary_key + [, ...] +// encoded_offsets = encoded_offset + [, ...] +// encoded_offset = variant32 +// footer = ShortKeyFooterPB +// footer_size = fixed32 +// checksum = fixed32 +// Usage: +// ShortKeyIndexBuilder builder(segment_id, num_rows_per_block); +// builder.add_item(key1); +// ... +// builder.add_item(keyN); +// builder.finalize(segment_size, num_rows, &slices); +// TODO(zc): +// 1. If this can leverage binary page to save key and offset data +// 2. Extending this to save in a BTree like struct, which can index full key +// more than short key +class ShortKeyIndexBuilder { +public: + ShortKeyIndexBuilder(uint32_t segment_id, + uint32_t num_rows_per_block) { + _footer.set_segment_id(segment_id); + _footer.set_num_rows_per_block(num_rows_per_block); + } + + Status add_item(const Slice& key); + + Status finalize(uint32_t segment_size, uint32_t num_rows, std::vector* slices); + +private: + segment_v2::ShortKeyFooterPB _footer; + + faststring _key_buf; + faststring _offset_buf; + std::string _footer_buf; + std::vector _offsets; +}; + +class ShortKeyIndexDecoder; + +// An Iterator to iterate one short key index. +// Client can use this class to iterator all items +// item in this index. +class ShortKeyIndexIterator { +public: + using iterator_category = std::random_access_iterator_tag; + using value_type = Slice; + using pointer = Slice*; + using reference = Slice&; + using difference_type = ssize_t; + + ShortKeyIndexIterator(const ShortKeyIndexDecoder* decoder, uint32_t ordinal = 0) + : _decoder(decoder), _ordinal(ordinal) { } + + ShortKeyIndexIterator& operator-=(ssize_t step) { + _ordinal -= step; + return *this; + } + + ShortKeyIndexIterator& operator+=(ssize_t step) { + _ordinal += step; + return *this; + } + + ShortKeyIndexIterator& operator++() { + _ordinal++; + return *this; + } + + bool operator!=(const ShortKeyIndexIterator& other) { + return _ordinal != other._ordinal || _decoder != other._decoder; + } + + bool operator==(const ShortKeyIndexIterator& other) { + return _ordinal == other._ordinal && _decoder == other._decoder; + } + + ssize_t operator-(const ShortKeyIndexIterator& other) const { + return _ordinal - other._ordinal; + } + + inline bool valid() const; + + Slice operator*() const; + + ssize_t ordinal() const { return _ordinal; } + +private: + const ShortKeyIndexDecoder* _decoder; + ssize_t _ordinal; +}; + +// Used to decode short key to header and encoded index data. +// Usage: +// MemIndex index; +// ShortKeyIndexDecoder decoder(slice) +// decoder.parse(); +// auto iter = decoder.lower_bound(key); +class ShortKeyIndexDecoder { +public: + // Client should assure that data is available when this class + // is used. + ShortKeyIndexDecoder(const Slice& data) : _data(data) { } + + Status parse(); + + ShortKeyIndexIterator begin() const { return {this, 0}; } + ShortKeyIndexIterator end() const { return {this, num_items()}; } + + // lower_bound will return a iterator which locates the first item + // equal with or larger than given key. + // NOTE: This function holds that without common prefix key, the one + // who has more length it the bigger one. Two key is the same only + // when their length are equal + ShortKeyIndexIterator lower_bound(const Slice& key) const { + return seek(key); + } + + // Return the iterator which locates the first item larger than the + // input key. + ShortKeyIndexIterator upper_bound(const Slice& key) const { + return seek(key); + } + + uint32_t num_items() const { return _footer.num_items(); } + + Slice key(ssize_t ordinal) const { + DCHECK(ordinal >= 0 && ordinal < num_items()); + return {_key_data.data + _offsets[ordinal], _offsets[ordinal + 1] - _offsets[ordinal]}; + } + +private: + template + ShortKeyIndexIterator seek(const Slice& key) const { + auto comparator = [this] (const Slice& lhs, const Slice& rhs) { + return lhs.compare(rhs) < 0; + }; + if (lower_bound) { + return std::lower_bound(begin(), end(), key, comparator); + } else { + return std::upper_bound(begin(), end(), key, comparator); + } + } + +private: + Slice _data; + + // All following fields are only valid after pares has been executed successfully + segment_v2::ShortKeyFooterPB _footer; + std::vector _offsets; + Slice _key_data; +}; + +inline Slice ShortKeyIndexIterator::operator*() const { + return _decoder->key(_ordinal); +} + +inline bool ShortKeyIndexIterator::valid() const { + return _ordinal >= 0 && _ordinal < _decoder->num_items(); +} + +} diff --git a/be/src/olap/types.h b/be/src/olap/types.h index d7597d36825601..0989d1a86599ce 100644 --- a/be/src/olap/types.h +++ b/be/src/olap/types.h @@ -104,24 +104,31 @@ struct CppTypeTraits { template<> struct CppTypeTraits { using CppType = bool; + using UnsignedCppType = bool; }; template<> struct CppTypeTraits { using CppType = int8_t; + using UnsignedCppType = uint8_t; }; template<> struct CppTypeTraits { using CppType = int16_t; + using UnsignedCppType = uint16_t; }; template<> struct CppTypeTraits { using CppType = int32_t; + using UnsignedCppType = uint32_t; }; template<> struct CppTypeTraits { using CppType = uint32_t; + using UnsignedCppType = uint32_t; }; template<> struct CppTypeTraits { using CppType = int64_t; + using UnsignedCppType = uint64_t; }; template<> struct CppTypeTraits { using CppType = int128_t; + using UnsignedCppType = unsigned int128_t; }; template<> struct CppTypeTraits { using CppType = float; @@ -131,12 +138,15 @@ template<> struct CppTypeTraits { }; template<> struct CppTypeTraits { using CppType = decimal12_t; + using UnsignedCppType = decimal12_t; }; template<> struct CppTypeTraits { using CppType = uint24_t; + using UnsignedCppType = uint24_t; }; template<> struct CppTypeTraits { using CppType = int64_t; + using UnsignedCppType = uint64_t; }; template<> struct CppTypeTraits { using CppType = Slice; diff --git a/be/src/olap/uint24.h b/be/src/olap/uint24.h index 7632b584c93d01..638ffae6e6a1ae 100644 --- a/be/src/olap/uint24.h +++ b/be/src/olap/uint24.h @@ -36,19 +36,26 @@ struct uint24_t { data[2] = value.data[2]; } - uint24_t(const int32_t& value) { + uint24_t(const uint32_t& value) { data[0] = static_cast(value); data[1] = static_cast(value >> 8); data[2] = static_cast(value >> 16); } + uint24_t& operator=(const uint32_t& value) { + data[0] = static_cast(value); + data[1] = static_cast(value >> 8); + data[2] = static_cast(value >> 16); + return *this; + } + uint24_t& operator+=(const uint24_t& value) { *this = static_cast(*this) + static_cast(value); return *this; } - operator int() const { - int value = static_cast(data[0]); + operator uint32_t() const { + uint32_t value = static_cast(data[0]); value += (static_cast(static_cast(data[1]))) << 8; value += (static_cast(static_cast(data[2]))) << 16; return value; diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index d2cc2b0152aec7..05cc4983a63f06 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -78,6 +78,7 @@ set(UTIL_FILES md5.cpp frontend_helper.cpp faststring.cc + slice.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/slice.cpp b/be/src/util/slice.cpp new file mode 100644 index 00000000000000..fc583a2d228bef --- /dev/null +++ b/be/src/util/slice.cpp @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/slice.h" + +#include "util/faststring.h" + +namespace doris { + +// NOTE(zc): we define this function here to make compile work. +Slice::Slice(const faststring& s) : // NOLINT(runtime/explicit) + data((char*)(s.data())), size(s.size()) { } + +} diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 5c31072fe2e1e4..1eebe3a5f29fdd 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -29,6 +29,8 @@ namespace doris { +class faststring; + /// @brief A wrapper around externally allocated data. /// /// Slice is a simple structure containing a pointer into some external @@ -66,6 +68,8 @@ struct Slice { /// Create a slice that refers to the contents of the given string. Slice(const std::string& s) : // NOLINT(runtime/explicit) data(const_cast(s.data())), size(s.size()) { } + + Slice(const faststring& s); /// Create a slice that refers to a C-string s[0,strlen(s)-1]. Slice(const char* s) : // NOLINT(runtime/explicit) diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 0a11d868f7466d..ab105eed67856f 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -61,3 +61,5 @@ ADD_BE_TEST(rowset/alpha_rowset_test) ADD_BE_TEST(olap_snapshot_converter_test) ADD_BE_TEST(txn_manager_test) ADD_BE_TEST(generic_iterators_test) +ADD_BE_TEST(key_coder_test) +ADD_BE_TEST(short_key_index_test) diff --git a/be/test/olap/key_coder_test.cpp b/be/test/olap/key_coder_test.cpp new file mode 100644 index 00000000000000..5d241bb0804806 --- /dev/null +++ b/be/test/olap/key_coder_test.cpp @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/key_coder.h" + +#include +#include +#include + +#include "util/debug_util.h" + +namespace doris { + +class KeyCoderTest : public testing::Test { +public: + KeyCoderTest() { } + virtual ~KeyCoderTest() { + } +}; + +template +void test_integer_encode() { + using CppType = typename CppTypeTraits::CppType; + + auto key_coder = get_key_coder(type); + + { + std::string buf; + CppType val = std::numeric_limits::min(); + key_coder->encode_ascending(&val, 1, &buf); + + std::string result; + for (int i = 0; i < sizeof(CppType); ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + std::string buf; + CppType val = std::numeric_limits::max(); + key_coder->encode_ascending(&val, sizeof(CppType), &buf); + + std::string result; + for (int i = 0; i < sizeof(CppType); ++i) { + result.append("FF"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + CppType val1 = random(); + CppType val2 = random(); + + std::string buf1; + std::string buf2; + + key_coder->encode_ascending(&val1, sizeof(CppType), &buf1); + key_coder->encode_ascending(&val2, sizeof(CppType), &buf2); + + if (val1 < val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } else if (val1 > val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } else { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) == 0); + } + } +} + +TEST(KeyCoderTest, test_int) { + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + test_integer_encode(); + + test_integer_encode(); +} + +TEST(KeyCoderTest, test_date) { + using CppType = uint24_t; + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_DATE); + + { + std::string buf; + CppType val = 0; + key_coder->encode_ascending(&val, 1, &buf); + + std::string result; + for (int i = 0; i < sizeof(uint24_t); ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + std::string buf; + CppType val = 10000; + key_coder->encode_ascending(&val, sizeof(CppType), &buf); + + std::string result("002710"); + + ASSERT_STREQ(result.c_str(), hexdump(buf.data(), buf.size()).c_str()); + { + Slice slice(buf); + CppType check_val; + key_coder->decode_ascending(&slice, sizeof(CppType), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(val, check_val); + } + } + + { + CppType val1 = random(); + CppType val2 = random(); + + std::string buf1; + std::string buf2; + + key_coder->encode_ascending(&val1, sizeof(CppType), &buf1); + key_coder->encode_ascending(&val2, sizeof(CppType), &buf2); + + if (val1 < val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } else if (val1 > val2) { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } else { + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) == 0); + } + } +} + +TEST(KeyCoderTest, test_decimal) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_DECIMAL); + + decimal12_t val1(1, 100000000); + std::string buf1; + + key_coder->encode_ascending(&val1, sizeof(decimal12_t), &buf1); + + decimal12_t check_val; + Slice slice1(buf1); + key_coder->decode_ascending(&slice1, sizeof(decimal12_t), (uint8_t*)&check_val, nullptr); + ASSERT_EQ(check_val, val1); + + { + decimal12_t val2(-1, -100000000); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + } + { + decimal12_t val2(1, 100000001); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) < 0); + } + { + decimal12_t val2(0, 0); + std::string buf2; + key_coder->encode_ascending(&val2, sizeof(decimal12_t), &buf2); + ASSERT_TRUE(memcmp(buf1.c_str(), buf2.c_str(), buf1.size()) > 0); + + std::string result("80"); + for (int i = 0; i < sizeof(int64_t) - 1; ++i) { + result.append("00"); + } + result.append("80"); + for (int i = 0; i < sizeof(int32_t) - 1; ++i) { + result.append("00"); + } + + ASSERT_STREQ(result.c_str(), hexdump(buf2.data(), buf2.size()).c_str()); + } +} + +TEST(KeyCoderTest, test_char) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_CHAR); + + char buf[] = "1234567890"; + Slice slice(buf, 10); + + { + std::string key; + key_coder->encode_ascending(&slice, 10, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 10, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("1234567890", check_slice.data); + } + + { + std::string key; + key_coder->encode_ascending(&slice, 5, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 5, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("12345", check_slice.data); + } +} + +TEST(KeyCoderTest, test_varchar) { + auto key_coder = get_key_coder(OLAP_FIELD_TYPE_VARCHAR); + + char buf[] = "1234567890"; + Slice slice(buf, 10); + + { + std::string key; + key_coder->encode_ascending(&slice, 15, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 15, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("1234567890", check_slice.data); + } + + { + std::string key; + key_coder->encode_ascending(&slice, 5, &key); + Slice encoded_key(key); + + Arena arena; + Slice check_slice; + auto st = key_coder->decode_ascending(&encoded_key, 5, (uint8_t*)&check_slice, &arena); + ASSERT_TRUE(st.ok()); + + ASSERT_STREQ("12345", check_slice.data); + } +} + + +} // namespace doris + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/short_key_index_test.cpp b/be/test/olap/short_key_index_test.cpp new file mode 100644 index 00000000000000..eba2ef84e54468 --- /dev/null +++ b/be/test/olap/short_key_index_test.cpp @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/short_key_index.h" + +#include + +#include "olap/lru_cache.h" +#include "olap/file_helper.h" + +namespace doris { + +class ShortKeyIndexTest : public testing::Test { +public: + ShortKeyIndexTest() { } + virtual ~ShortKeyIndexTest() { + } +}; + +TEST_F(ShortKeyIndexTest, buider) { + ShortKeyIndexBuilder builder(0, 1024); + + for (int i = 1000; i < 10000; i += 2) { + builder.add_item(std::to_string(i)); + } + std::vector slices; + auto st = builder.finalize(10000, 9000 * 1024, &slices); + ASSERT_TRUE(st.ok()); + + std::string buf; + for (auto& slice : slices) { + buf.append(slice.data, slice.size); + } + + ShortKeyIndexDecoder decoder(buf); + st = decoder.parse(); + ASSERT_TRUE(st.ok()); + + // find 1499 + { + auto iter = decoder.lower_bound("1499"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1500", (*iter).to_string().c_str()); + } + // find 1500 lower bound + { + auto iter = decoder.lower_bound("1500"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1500", (*iter).to_string().c_str()); + } + // find 1500 upper bound + { + auto iter = decoder.upper_bound("1500"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("1502", (*iter).to_string().c_str()); + } + // find prefix "87" + { + auto iter = decoder.lower_bound("87"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("8700", (*iter).to_string().c_str()); + } + // find prefix "87" + { + auto iter = decoder.upper_bound("87"); + ASSERT_TRUE(iter.valid()); + ASSERT_STREQ("8700", (*iter).to_string().c_str()); + } + + // find prefix "9999" + { + auto iter = decoder.upper_bound("9999"); + ASSERT_FALSE(iter.valid()); + } +} + +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index c66c2d88e023be..f7d95c6eaaa22f 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -117,3 +117,21 @@ message FileFooterPB { repeated MetadataPairPB file_meta_datas = 8; // meta data of file optional PagePointerPB key_index_page = 9; // short key index page } + +message ShortKeyFooterPB { + // How many index item in this index. + optional uint32 num_items = 1; + // The total bytes occupied by the index key + optional uint32 key_bytes = 2; + // The total bytes occupied by the key offsets + optional uint32 offset_bytes = 3; + // Segment id which this index is belong to + optional uint32 segment_id = 4; + // number rows in each block + optional uint32 num_rows_per_block = 5; + // How many rows in this segment + optional uint32 num_segment_rows = 6; + // Total bytes for this segment + optional uint32 segment_bytes = 7; +} + diff --git a/run-ut.sh b/run-ut.sh index 5c3a4a4bc6b721..59fab254b87421 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -252,6 +252,8 @@ ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test ${DORIS_TEST_BINARY_DIR}/olap/aggregate_func_test +${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test +${DORIS_TEST_BINARY_DIR}/olap/key_coder_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test From d7358d70e482cfd95b57ba6b0012aa4b1af83422 Mon Sep 17 00:00:00 2001 From: zhaochun Date: Fri, 2 Aug 2019 21:09:51 +0800 Subject: [PATCH 2/5] Add Segment Iterator for BetaRowset --- be/src/olap/CMakeLists.txt | 3 + be/src/olap/column_mapping.h | 4 +- be/src/olap/delete_handler.cpp | 1 + be/src/olap/delete_handler.h | 6 +- be/src/olap/delta_writer.cpp | 1 + be/src/olap/delta_writer.h | 3 +- be/src/olap/field.h | 14 + be/src/olap/iterators.h | 17 ++ be/src/olap/rowset/rowset_reader.h | 1 + be/src/olap/rowset/rowset_reader_context.h | 10 +- be/src/olap/rowset/rowset_writer.h | 4 +- be/src/olap/rowset/segment_v2/column_writer.h | 12 + be/src/olap/rowset/segment_v2/segment.cpp | 196 ++++++++++++++ be/src/olap/rowset/segment_v2/segment.h | 122 +++++++++ .../rowset/segment_v2/segment_iterator.cpp | 252 ++++++++++++++++++ .../olap/rowset/segment_v2/segment_iterator.h | 87 ++++++ .../olap/rowset/segment_v2/segment_writer.cpp | 191 +++++++++++++ .../olap/rowset/segment_v2/segment_writer.h | 90 +++++++ be/src/olap/schema_change.cpp | 45 ++++ be/src/olap/schema_change.h | 46 ---- be/src/olap/short_key_index.h | 76 ++++++ be/src/olap/storage_engine.h | 2 +- be/src/olap/tablet.h | 1 - be/test/olap/CMakeLists.txt | 1 + .../olap/rowset/segment_v2/segment_test.cpp | 227 ++++++++++++++++ gensrc/proto/segment_v2.proto | 34 ++- run-ut.sh | 1 + 27 files changed, 1380 insertions(+), 67 deletions(-) create mode 100644 be/src/olap/rowset/segment_v2/segment.cpp create mode 100644 be/src/olap/rowset/segment_v2/segment.h create mode 100644 be/src/olap/rowset/segment_v2/segment_iterator.cpp create mode 100644 be/src/olap/rowset/segment_v2/segment_iterator.h create mode 100644 be/src/olap/rowset/segment_v2/segment_writer.cpp create mode 100644 be/src/olap/rowset/segment_v2/segment_writer.h create mode 100644 be/test/olap/rowset/segment_v2/segment_test.cpp diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 6f45285f26ddaf..1790f9effa7f98 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -86,6 +86,9 @@ add_library(Olap STATIC rowset/segment_v2/encoding_info.cpp rowset/segment_v2/ordinal_page_index.cpp rowset/segment_v2/binary_dict_page.cpp + rowset/segment_v2/segment.cpp + rowset/segment_v2/segment_iterator.cpp + rowset/segment_v2/segment_writer.cpp rowset_factory.cpp task/engine_batch_load_task.cpp task/engine_checksum_task.cpp diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 1cebe4eab3e544..7b19d9c5f111ec 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -18,10 +18,10 @@ #ifndef DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H #define DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H -#include "olap/wrapper_field.h" - namespace doris { +class WrapperField; + struct ColumnMapping { ColumnMapping() : ref_column(-1), default_value(nullptr) {} virtual ~ColumnMapping() {} diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 70e7146247ecff..b6c36d64f6227e 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -29,6 +29,7 @@ #include "gen_cpp/olap_file.pb.h" #include "olap/olap_common.h" #include "olap/utils.h" +#include "olap/olap_cond.h" using apache::thrift::ThriftDebugString; using std::numeric_limits; diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h index 9e0f7689a34736..0c23a005ab91f6 100644 --- a/be/src/olap/delete_handler.h +++ b/be/src/olap/delete_handler.h @@ -23,14 +23,14 @@ #include "gen_cpp/AgentService_types.h" #include "gen_cpp/olap_file.pb.h" -#include "olap/field.h" -#include "olap/olap_cond.h" #include "olap/olap_define.h" -#include "olap/row_cursor.h" +#include "olap/tablet_schema.h" namespace doris { typedef google::protobuf::RepeatedPtrField DelPredicateArray; +class Conditions; +class RowCursor; class DeleteConditionHandler { public: diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index d593aef5921a54..d3f63478c4e967 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -18,6 +18,7 @@ #include "olap/delta_writer.h" #include "olap/schema.h" +#include "olap/memtable.h" #include "olap/data_dir.h" #include "olap/rowset/alpha_rowset_writer.h" #include "olap/rowset/rowset_meta_manager.h" diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 16db33d4aab312..c909147b075100 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -18,7 +18,6 @@ #ifndef DORIS_BE_SRC_DELTA_WRITER_H #define DORIS_BE_SRC_DELTA_WRITER_H -#include "olap/memtable.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/schema_change.h" @@ -30,6 +29,8 @@ namespace doris { class SegmentGroup; +class MemTable; +class Schema; enum WriteType { LOAD = 1, diff --git a/be/src/olap/field.h b/be/src/olap/field.h index d6eef2a7753075..460c7bd936277e 100644 --- a/be/src/olap/field.h +++ b/be/src/olap/field.h @@ -26,6 +26,7 @@ #include "olap/olap_define.h" #include "olap/tablet_schema.h" #include "olap/types.h" +#include "olap/key_coder.h" #include "olap/utils.h" #include "olap/row_cursor_cell.h" #include "runtime/mem_pool.h" @@ -57,12 +58,14 @@ class Field { Field(const TabletColumn& column) : _type_info(get_type_info(column.type())), _agg_info(get_aggregate_info(column.aggregation(), column.type())), + _key_coder(get_key_coder(column.type())), _index_size(column.index_length()), _is_nullable(column.is_nullable()) { } Field(FieldType type) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(OLAP_FIELD_AGGREGATION_NONE, type)), + _key_coder(get_key_coder(type)), _index_size(_type_info->size()), _is_nullable(true) { } @@ -70,6 +73,7 @@ class Field { Field(const FieldAggregationMethod& agg, const FieldType& type, bool is_nullable) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(agg, type)), + _key_coder(get_key_coder(type)), _index_size(-1), _is_nullable(is_nullable) { } @@ -77,6 +81,7 @@ class Field { Field(const FieldAggregationMethod& agg, const FieldType& type, size_t index_size, bool is_nullable) : _type_info(get_type_info(type)), _agg_info(get_aggregate_info(agg, type)), + _key_coder(get_key_coder(type)), _index_size(index_size), _is_nullable(is_nullable) { } @@ -233,10 +238,19 @@ class Field { FieldType type() const { return _type_info->type(); } const TypeInfo* type_info() const { return _type_info; } bool is_nullable() const { return _is_nullable; } + + void encode_ascending(const void* value, std::string* buf) const { + _key_coder->encode_ascending(value, _index_size, buf); + } + + Status decode_ascending(Slice* encoded_key, uint8_t* cell_ptr, Arena* arena) const { + return _key_coder->decode_ascending(encoded_key, _index_size, cell_ptr, arena); + } private: // Field的最大长度,单位为字节,通常等于length, 变长字符串不同 const TypeInfo* _type_info; const AggregateInfo* _agg_info; + const KeyCoder* _key_coder; uint16_t _index_size; bool _is_nullable; }; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index d54cef13ff6840..63fb07eb1299eb 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -21,10 +21,27 @@ namespace doris { +class RowCursor; class RowBlockV2; class Schema; struct StorageReadOptions { + // lower_bound defines the smallest key at which iterator will + // return data. + // If lower_bound is null, won't return + std::shared_ptr lower_bound; + + // If include_lower_bound is true, data equal with lower_bound will + // be read + bool include_lower_bound; + + // upper_bound defines the extend upto which the iterator can return + // data. + std::shared_ptr upper_bound; + + // If include_upper_bound is true, data equal with upper_bound will + // be read + bool include_upper_bound; }; // Used to read data in RowBlockV2 one by one diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 0f60fc8e344a6b..57e3533b52e0f8 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -26,6 +26,7 @@ namespace doris { +class RowBlock; class RowsetReader; using RowsetReaderSharedPtr = std::shared_ptr; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 9d516242ee6ee2..f2c51100f75fde 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -18,17 +18,17 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H -#include "olap/schema.h" #include "olap/column_predicate.h" -#include "olap/row_cursor.h" -#include "olap/row_block.h" #include "olap/lru_cache.h" -#include "olap/olap_cond.h" -#include "olap/delete_handler.h" #include "runtime/runtime_state.h" namespace doris { +class RowCursor; +class Conditions; +class DeleteHandler; +class TabletSchema; + struct RowsetReaderContext { RowsetReaderContext() : reader_type(READER_QUERY), tablet_schema(nullptr), diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 9d3eea05243f94..a637dae353e09c 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -20,14 +20,14 @@ #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" -#include "olap/schema.h" -#include "olap/row_block.h" #include "gen_cpp/types.pb.h" #include "runtime/mem_pool.h" +#include "olap/column_mapping.h" namespace doris { class ContiguousRow; +class RowCursor; class RowsetWriter; using RowsetWriterSharedPtr = std::shared_ptr; diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index d137059e2141b7..2e7d778d8f1541 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -58,6 +58,18 @@ class ColumnWriter { ~ColumnWriter(); Status init(); + + template + Status append(const CellType& cell) { + if (_is_nullable) { + uint8_t nullmap = 0; + BitmapChange(&nullmap, 0, cell.is_null()); + return append_nullable(&nullmap, cell.cell_ptr(), 1); + } else { + return append(cell.cell_ptr(), 1); + } + } + // Now we only support append one by one, we should support append // multi rows in one call Status append(bool is_null, void* data) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp new file mode 100644 index 00000000000000..eab1eb101942e8 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment.h" + +#include "common/logging.h" // LOG +#include "env/env.h" // RandomAccessFile +#include "gutil/strings/substitute.h" +#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader +#include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "util/slice.h" // Slice +#include "olap/tablet_schema.h" + +namespace doris { +namespace segment_v2 { + +using strings::Substitute; + +Segment::Segment( + std::string fname, uint32_t segment_id, + const std::shared_ptr& tablet_schema, + size_t num_rows_per_block) + : _fname(std::move(fname)), + _segment_id(segment_id), + _tablet_schema(tablet_schema), + _num_rows_per_block(num_rows_per_block) { +} + +Segment::~Segment() { + for (auto reader : _column_readers) { + delete reader; + } +} + +Status Segment::open() { + RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file)); + RETURN_IF_ERROR(_input_file->size(&_file_size)); + // 24: 2 * magic + 2 * checksum + 2 * footer/header length + if (_file_size < 24) { + return Status::Corruption( + Substitute("Bad segment, file size is too small, real=$0 vs need=$1", + _file_size, 24)); + } + // parse footer to get meta + RETURN_IF_ERROR(_parse_footer()); + // parse footer to get meta + RETURN_IF_ERROR(_parse_header()); + // parse short key index + RETURN_IF_ERROR(_parse_index()); + // initial all column reader + RETURN_IF_ERROR(_initial_column_readers()); + return Status::OK(); +} + +Status Segment::new_iterator(const Schema& schema, std::unique_ptr* output) { + output->reset(new SegmentIterator(this->shared_from_this(), schema)); + return Status::OK(); +} + +// read data at offset of input file, and parse data into magic and +// header/footer's length. currently return header/footer's length +Status Segment::_parse_magic_and_len(uint64_t offset, uint32_t* length) { + // read magic and length + uint8_t buf[8]; + Slice slice(buf, 8); + RETURN_IF_ERROR(_input_file->read_at(offset, slice)); + + if (memcmp(slice.data, k_segment_magic, k_segment_magic_length) != 0) { + LOG(WARNING) << "Magic don't match, magic=" << std::string((char*)buf, 4); + return Status::Corruption("Bad segment, file magic don't match"); + } + *length = decode_fixed32_le((uint8_t*)slice.data + 4); + return Status::OK(); +} + +Status Segment::_parse_footer() { + uint64_t offset = _file_size - 8; + uint32_t footer_length = 0; + RETURN_IF_ERROR(_parse_magic_and_len(offset, &footer_length)); + + // check file size footer + checksum + if (offset < footer_length + 4) { + LOG(WARNING) << "Segment file is too small, size=" << _file_size << ", footer_size=" << footer_length; + return Status::Corruption("Bad segment, file size is too small"); + } + offset -= footer_length + 4; + + uint8_t checksum_buf[4]; + std::string footer_buf; + footer_buf.resize(footer_length); + + std::vector slices = {{checksum_buf, 4}, {footer_buf.data(), footer_length}}; + + RETURN_IF_ERROR(_input_file->readv_at(offset, &slices[0], 2)); + + // TOOD(zc): check footer's checksum + if (!_footer.ParseFromString(footer_buf)) { + return Status::Corruption("Bad segment, parse footer from PB failed"); + } + + return Status::OK(); +} + +Status Segment::_parse_header() { + uint64_t offset = 0; + uint32_t header_length = 0; + RETURN_IF_ERROR(_parse_magic_and_len(offset, &header_length)); + + offset += 8; + if ((offset + header_length + 4) > _file_size) { + LOG(WARNING) << "Segment file is too small, size=" << _file_size << ", header_size=" << header_length; + return Status::Corruption("Bad segment, file too small"); + } + std::string header_buf; + header_buf.resize(header_length); + uint8_t checksum_buf[4]; + std::vector slices{{header_buf.data(), header_length}, {checksum_buf, 4}}; + RETURN_IF_ERROR(_input_file->readv_at(offset, &slices[0], 2)); + + // TODO(zc): check checksum + if (!_header.ParseFromString(header_buf)) { + return Status::Corruption("Bad segment, parse header from PB failed"); + } + return Status::OK(); +} + +// load and parse short key index +Status Segment::_parse_index() { + // read short key index content + _sk_index_buf.resize(_footer.short_key_index_page().size()); + Slice slice(_sk_index_buf.data(), _sk_index_buf.size()); + RETURN_IF_ERROR(_input_file->read_at(_footer.short_key_index_page().offset(), slice)); + + // Parse short key index + _sk_index_decoder.reset(new ShortKeyIndexDecoder(_sk_index_buf)); + RETURN_IF_ERROR(_sk_index_decoder->parse()); + return Status::OK(); +} + +Status Segment::_initial_column_readers() { + // Map from column unique id to column ordinal in footer's ColumnMetaPB + // If we can't find unique id, it means this segment is created + // with an old schema. So we should create a DefaultValueIterator + // for this column. + std::unordered_map unique_id_to_ordinal; + for (auto& column_pb : _footer.columns()) { + LOG(INFO) << "column_id=" << column_pb.column_id() << ", unique_id=" << column_pb.unique_id(); + unique_id_to_ordinal.emplace(column_pb.column_id(), column_pb.unique_id()); + } + // TODO(zc): Lazy init()? + // There may be too many columns, majority of them would not be used + // in query, so we should not init them here. + _column_readers.resize(_tablet_schema->columns().size(), nullptr); + + for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) { + auto& column = _tablet_schema->columns()[ordinal]; + auto iter = unique_id_to_ordinal.find(column.unique_id()); + if (iter == unique_id_to_ordinal.end()) { + continue; + } + + ColumnReaderOptions opts; + std::unique_ptr reader( + new ColumnReader(opts, _footer.columns(iter->second), _input_file.get())); + RETURN_IF_ERROR(reader->init()); + + _column_readers[ordinal] = reader.release(); + } + return Status::OK(); +} + +Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) { + if (_column_readers[cid] == nullptr) { + // TODO(zc): create a DefaultValueIterator for this column + // create + } + return _column_readers[cid]->new_iterator(iter); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h new file mode 100644 index 00000000000000..d76c48a386a665 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include // for unique_ptr +#include + +#include "common/status.h" // Status +#include "gen_cpp/segment_v2.pb.h" // SegmentHeaderPB +#include "olap/rowset/segment_v2/common.h" // rowid_t +#include "olap/short_key_index.h" +#include "olap/tablet_schema.h" +#include "util/faststring.h" + +namespace doris { + +class RandomAccessFile; +class SegmentGroup; +class FieldInfo; +class TabletSchema; +class ShortKeyIndexDecoder; +class Schema; + +namespace segment_v2 { + +class ColumnReader; +class ColumnIterator; +class SegmentIterator; + +// A Segment is used to represent a segment in memory format. When segment is +// generated, it won't be modified, so this struct aimed to help read operation. +// It will prepare all ColumnReader to create ColumnIterator as needed. +// And user can create a SegmentIterator through new_iterator function. +// +// NOTE: This segment is used to a specified TabletSchema, when TabletSchema +// is changed, this segemnt can not be used any more. For eample, after a schema +// change finished, client should disalbe all cahced Segment for old TabletSchema. +class Segment : public std::enable_shared_from_this { +public: + Segment(std::string fname, uint32_t segment_id, + const std::shared_ptr& tablet_schema, + size_t num_rows_per_block); + ~Segment(); + + Status open(); + + Status new_iterator(const Schema& schema, std::unique_ptr* iter); + + uint64_t id() const { return _segment_id; } + + uint32_t num_rows() const { return _footer.num_rows(); } + +private: + friend class SegmentIterator; + + Status new_column_iterator(uint32_t cid, ColumnIterator** iter); + uint32_t num_rows_per_block() const { return _num_rows_per_block; } + size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); } + + Status _parse_magic_and_len(uint64_t offset, uint32_t* length); + Status _parse_footer(); + Status _parse_header(); + Status _parse_index(); + Status _initial_column_readers(); + + ShortKeyIndexIterator lower_bound(const Slice& key) const { + return _sk_index_decoder->lower_bound(key); + } + ShortKeyIndexIterator upper_bound(const Slice& key) const { + return _sk_index_decoder->upper_bound(key); + } + + // This will return the last row block in this segment. + // NOTE: Before call this function , client should assure that + // this segment is not empty. + uint32_t last_block() const { + DCHECK(num_rows() > 0); + return _sk_index_decoder->num_items() - 1; + } + +private: + std::string _fname; + uint32_t _segment_id; + std::shared_ptr _tablet_schema; + uint32_t _num_rows_per_block; + + SegmentHeaderPB _header; + SegmentFooterPB _footer; + std::unique_ptr _input_file; + uint64_t _file_size = 0; + + // ColumnReader for each column in TabletSchema. If ColumnReader is nullptr, + // This means that this segment has no data for that column, which may be added + // after this segment is generated. + std::vector _column_readers; + + // used to store short key index + faststring _sk_index_buf; + + // short key index decoder + std::unique_ptr _sk_index_decoder; +}; + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp new file mode 100644 index 00000000000000..601e25829cb7d0 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -0,0 +1,252 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment_iterator.h" + +#include + +#include "gutil/strings/substitute.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/row_block2.h" +#include "olap/row_cursor.h" +#include "olap/short_key_index.h" + +using strings::Substitute; + +namespace doris { +namespace segment_v2 { + +SegmentIterator::SegmentIterator(std::shared_ptr segment, + const Schema& schema) + : _segment(std::move(segment)), + _schema(schema), + _column_iterators(_schema.num_columns(), nullptr) { +} + +SegmentIterator::~SegmentIterator() { + for (auto iter : _column_iterators) { + delete iter; + } +} + +Status SegmentIterator::init(const StorageReadOptions& opts) { + _opts = opts; + RETURN_IF_ERROR(_init_short_key_range()); + RETURN_IF_ERROR(_init_column_iterators()); + return Status::OK(); +} + +// This function will use input key bounds to get a row range. +Status SegmentIterator::_init_short_key_range() { + _lower_rowid = 0; + _upper_rowid = num_rows(); + + // fast path for empty segment + if (_upper_rowid == 0) { + return Status::OK(); + } + + if (_opts.lower_bound == nullptr && _opts.upper_bound == nullptr) { + return Status::OK(); + } + + RETURN_IF_ERROR(_prepare_seek()); + + // init row range with short key range + if (_opts.upper_bound != nullptr) { + // If client want to read upper_bound, the include_upper_bound is true. So we + // should get the first ordinal at which key is larger than upper_bound. + // So we call _lookup_ordinal with include_upper_bound's negate + RETURN_IF_ERROR(_lookup_ordinal( + *_opts.upper_bound, !_opts.include_upper_bound, num_rows(), &_upper_rowid)); + } + if (_opts.lower_bound != nullptr) { + RETURN_IF_ERROR(_lookup_ordinal( + *_opts.lower_bound, _opts.include_lower_bound, _upper_rowid, &_lower_rowid)); + } + + return Status::OK(); +} + +// Set up environment for the following seek. +Status SegmentIterator::_prepare_seek() { + std::vector key_fields; + std::set column_set; + if (_opts.lower_bound != nullptr) { + for (auto cid : _opts.lower_bound->schema()->column_ids()) { + column_set.emplace(cid); + key_fields.push_back(*_opts.lower_bound->schema()->column(cid)); + } + } + if (_opts.upper_bound != nullptr) { + for (auto cid : _opts.upper_bound->schema()->column_ids()) { + if (column_set.count(cid) == 0) { + key_fields.push_back(*_opts.upper_bound->schema()->column(cid)); + column_set.emplace(cid); + } + } + } + _seek_schema.reset(new Schema(key_fields, key_fields.size())); + _seek_block.reset(new RowBlockV2(*_seek_schema, 1, &_arena)); + + // create used column iterator + for (auto cid : _seek_schema->column_ids()) { + if (_column_iterators[cid] == nullptr) { + RETURN_IF_ERROR(_create_column_iterator(cid, &_column_iterators[cid])); + } + } + + return Status::OK(); +} + +Status SegmentIterator::_init_column_iterators() { + for (auto cid : _schema.column_ids()) { + if (_column_iterators[cid] == nullptr) { + RETURN_IF_ERROR(_create_column_iterator(cid, &_column_iterators[cid])); + } + + _column_iterators[cid]->seek_to_ordinal(_lower_rowid); + } + _cur_rowid = _lower_rowid; + return Status::OK(); +} + +Status SegmentIterator::_create_column_iterator(uint32_t cid, ColumnIterator** iter) { + _segment->new_column_iterator(cid, iter); + return Status::OK(); +} + +// Schema of lhs and rhs are different. +// callers should assure that rhs' schema has all columns in lhs schema +template +int compare_row_with_lhs_columns(const LhsRowType& lhs, const RhsRowType& rhs) { + for (auto cid : lhs.schema()->column_ids()) { + auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid)); + if (res != 0) { + return res; + } + } + return 0; +} + +// look up one key to get its ordinal at which can get data. +// 'upper_bound' is defined the max ordinal the function will search. +// We use upper_bound to reduce search times. +// If we find a valid ordinal, it will be set in rowid and with Status::OK() +// If we can not find a valid key in this segment, we will set rowid to upper_bound +// Otherwise return error. +// 1. get [start, end) ordinal through short key index +// 2. binary search to find exact ordinal that match the input condition +Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, + rowid_t upper_bound, rowid_t* rowid) { + std::string index_key; + encode_key(&index_key, key, _segment->num_short_keys(), is_include); + + uint32_t start_block_id = 0; + auto start_iter = _segment->lower_bound(index_key); + if (start_iter.valid()) { + // Because previous block may contain this key, so we should set rowid to + // last block's first row. + start_block_id = start_iter.ordinal(); + if (start_block_id > 0) { + start_block_id--; + } + } else { + // When we don't find a valid index item, which means all short key is + // smaller than input key, this means that this key may exist in the last + // row block. so we set the rowid to first row of last row block. + start_block_id = _segment->last_block(); + } + rowid_t start = start_block_id * _segment->num_rows_per_block(); + + rowid_t end = upper_bound; + auto end_iter = _segment->upper_bound(index_key); + if (end_iter.valid()) { + end = end_iter.ordinal() * _segment->num_rows_per_block(); + } + + // binary search to find the exact key + while (start < end) { + rowid_t mid = (start + end) / 2; + RETURN_IF_ERROR(_seek_and_peek(mid)); + int cmp = compare_row_with_lhs_columns(key, _seek_block->row(0)); + if (cmp > 0) { + start = mid + 1; + } else if (cmp == 0) { + if (is_include) { + // lower bound + end = mid; + } else { + // upper bound + start = mid + 1; + } + } else { + end = mid; + } + } + + *rowid = start; + return Status::OK(); +} + +// seek to the row and load that row to _key_cursor +Status SegmentIterator::_seek_and_peek(rowid_t rowid) { + for (auto cid : _seek_schema->column_ids()) { + _column_iterators[cid]->seek_to_ordinal(rowid); + } + size_t num_rows = 1; + _seek_block->resize(num_rows); + RETURN_IF_ERROR(_next_batch(_seek_block.get(), &num_rows)); + return Status::OK(); +} + +// Try to read data as much to block->num_rows(). The number of read rows +// will be set in rows_read when return OK. rows_read will small than +// block->num_rows() when reach the end of this segment +Status SegmentIterator::_next_batch(RowBlockV2* block, size_t* rows_read) { + bool has_read = false; + size_t first_read = 0; + for (int i = 0; i < block->schema()->column_ids().size(); ++i) { + auto cid = block->schema()->column_ids()[i]; + size_t num_rows = has_read ? first_read : block->num_rows(); + auto column_block = block->column_block(i); + RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&num_rows, &column_block)); + if (!has_read) { + has_read = true; + first_read = num_rows; + } else if (num_rows != first_read) { + return Status::InternalError( + Substitute("Read different rows in different columns" + ", column($0) read $1 vs column($2) read $3", + block->schema()->column_ids()[0], first_read, cid, num_rows)); + } + } + *rows_read = first_read; + return Status::OK(); +} + +Status SegmentIterator::next_batch(RowBlockV2* block) { + size_t rows_to_read = std::min((rowid_t)block->capacity(), _upper_rowid - _cur_rowid); + block->resize(rows_to_read); + RETURN_IF_ERROR(_next_batch(block, &rows_to_read)); + _cur_rowid += rows_to_read; + return Status::OK(); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h new file mode 100644 index 00000000000000..87054f67729b95 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "olap/rowset/segment_v2/common.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/iterators.h" +#include "olap/schema.h" +#include "util/arena.h" + +namespace doris { + +class RowCursor; +class RowBlockV2; +class ShortKeyIndexIterator; + +namespace segment_v2 { + +class ColumnIterator; + +class SegmentIterator : public RowwiseIterator { +public: + SegmentIterator(std::shared_ptr segment, const Schema& _schema); + ~SegmentIterator() override; + Status init(const StorageReadOptions& opts) override; + Status next_batch(RowBlockV2* row_block) override; + const Schema& schema() const override { return _schema; } +private: + Status _init_short_key_range(); + Status _prepare_seek(); + Status _init_column_iterators(); + Status _create_column_iterator(uint32_t cid, ColumnIterator** iter); + Status _lookup_ordinal(const RowCursor& key, bool is_include, + rowid_t upper_bound, rowid_t* rowid); + Status _seek_and_peek(rowid_t rowid); + Status _next_batch(RowBlockV2* block, size_t* rows_read); + + uint32_t segment_id() const { return _segment->id(); } + uint32_t num_rows() const { return _segment->num_rows(); } + +private: + std::shared_ptr _segment; + // TODO(zc): rethink if we need copy it + Schema _schema; + + StorageReadOptions _opts; + + // Only used when init is called, help to finish seek_and_peek. + // Data will be saved in this batch + std::unique_ptr _seek_schema; + + // used to read data from columns when do bianry search to find + // oridnal for input bounds + std::unique_ptr _seek_block; + // helper to save row to compare with input bounds + std::unique_ptr _key_cursor; + + std::vector _column_iterators; + + rowid_t _lower_rowid; + rowid_t _upper_rowid; + rowid_t _cur_rowid; + + Arena _arena; +}; + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp new file mode 100644 index 00000000000000..ff289cfc5a6047 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment_writer.h" + +#include "env/env.h" // Env +#include "olap/row_block.h" // RowBlock +#include "olap/row_cursor.h" // RowCursor +#include "olap/rowset/segment_v2/column_writer.h" // ColumnWriter +#include "olap/short_key_index.h" + +namespace doris { +namespace segment_v2 { + +const char* k_segment_magic = "D0R1"; +const uint32_t k_segment_magic_length = 4; + +SegmentWriter::SegmentWriter(std::string fname, uint32_t segment_id, + const std::shared_ptr& tablet_schema, + const SegmentWriterOptions& opts) + : _fname(std::move(fname)), + _segment_id(segment_id), + _tablet_schema(tablet_schema), + _opts(opts) { +} + +SegmentWriter::~SegmentWriter() { + for (auto writer : _column_writers) { + delete writer; + } +} + +Status SegmentWriter::init(uint32_t write_mbytes_per_sec) { + // create for write + RETURN_IF_ERROR(Env::Default()->new_writable_file(_fname, &_output_file)); + + uint32_t column_id = 0; + for (auto& column : _tablet_schema->columns()) { + ColumnMetaPB* column_meta = _footer.add_columns(); + // TODO(zc): Do we need this column_id?? + column_meta->set_column_id(column_id++); + column_meta->set_unique_id(column.unique_id()); + bool is_nullable = column.is_nullable(); + column_meta->set_is_nullable(is_nullable); + + // TODO(zc): we can add type_info into TabletColumn? + const TypeInfo* type_info = get_type_info(column.type()); + DCHECK(type_info != nullptr); + + ColumnWriterOptions opts; + std::unique_ptr writer(new ColumnWriter(opts, type_info, is_nullable, _output_file.get())); + RETURN_IF_ERROR(writer->init()); + _column_writers.push_back(writer.release()); + } + _index_builder.reset(new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block)); + return Status::OK(); +} + +template +Status SegmentWriter::append_row(const RowType& row) { + for (size_t cid = 0; cid < _column_writers.size(); ++cid) { + auto cell = row.cell(cid); + RETURN_IF_ERROR(_column_writers[cid]->append(cell)); + } + + if ((_row_count % _opts.num_rows_per_block) == 0) { + std::string encoded_key; + encode_key(&encoded_key, row, _tablet_schema->num_short_key_columns()); + RETURN_IF_ERROR(_index_builder->add_item(encoded_key)); + _block_count++; + } + _row_count++; + return Status::OK(); +} + +template Status SegmentWriter::append_row(const RowCursor& row); + +uint64_t SegmentWriter::estimate_segment_size() { + return 0; +} + +Status SegmentWriter::finalize(uint32_t* segment_file_size) { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->finish()); + } + RETURN_IF_ERROR(_write_header()); + RETURN_IF_ERROR(_write_data()); + RETURN_IF_ERROR(_write_ordinal_index()); + RETURN_IF_ERROR(_write_short_key_index()); + RETURN_IF_ERROR(_write_footer()); + return Status::OK(); +} + +// write header +Status SegmentWriter::_write_header() { + std::string header_buf; + SegmentHeaderPB header; + if (!header.SerializeToString(&header_buf)) { + return Status::InternalError("failed to serialize segment header"); + } + + // header length + std::string magic_and_len_buf; + magic_and_len_buf.append(k_segment_magic); + put_fixed32_le(&magic_and_len_buf, header_buf.size()); + + // checksum + uint8_t checksum_buf[4]; + // TODO(zc): add checksum + + std::vector slices = {magic_and_len_buf, header_buf, {checksum_buf, 4}}; + RETURN_IF_ERROR(_write_raw_data(slices)); + return Status::OK(); +} + +// write column data to file one by one +Status SegmentWriter::_write_data() { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->write_data()); + } + return Status::OK(); +} + +// write ordinal index after data has been written +Status SegmentWriter::_write_ordinal_index() { + for (auto column_writer : _column_writers) { + RETURN_IF_ERROR(column_writer->write_ordinal_index()); + } + return Status::OK(); +} + +Status SegmentWriter::_write_short_key_index() { + std::vector slices; + // TODO(zc): we should get segment_size + RETURN_IF_ERROR(_index_builder->finalize(_row_count * 100, _row_count, &slices)); + + uint64_t offset = _output_file->size(); + RETURN_IF_ERROR(_write_raw_data(slices)); + uint32_t written_bytes = _output_file->size() - offset; + + _footer.mutable_short_key_index_page()->set_offset(offset); + _footer.mutable_short_key_index_page()->set_size(written_bytes); + return Status::OK(); +} + +Status SegmentWriter::_write_footer() { + _footer.set_num_rows(_row_count); + // collect all + for (int i = 0; i < _column_writers.size(); ++i) { + _column_writers[i]->write_meta(_footer.mutable_columns(i)); + } + + // write footer + std::string footer_buf; + if (!_footer.SerializeToString(&footer_buf)) { + return Status::InternalError("failed to serialize segment footer"); + } + std::string magic_and_len_buf; + magic_and_len_buf.append(k_segment_magic, k_segment_magic_length); + put_fixed32_le(&magic_and_len_buf, footer_buf.size()); + + char checksum_buf[4]; + // TODO(zc): compute checsum + std::vector slices{{checksum_buf, 4}, footer_buf, magic_and_len_buf}; + // write offset and length + RETURN_IF_ERROR(_write_raw_data(slices)); + // magic + return Status::OK(); +} + +Status SegmentWriter::_write_raw_data(const std::vector& slices) { + RETURN_IF_ERROR(_output_file->appendv(&slices[0], slices.size())); + return Status::OK(); +} + +} +} diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h new file mode 100644 index 00000000000000..48403d6d736009 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include // unique_ptr +#include +#include + +#include "common/logging.h" // LOG +#include "common/status.h" // Status +#include "gen_cpp/segment_v2.pb.h" // SegmentHeaderPB +#include "olap/schema.h" + +namespace doris { + +class WritableFile; +class RowBlock; +class RowCursor; +class ShortKeyIndexBuilder; + +namespace segment_v2 { + +class ColumnWriter; + +extern const char* k_segment_magic; +extern const uint32_t k_segment_magic_length; + +struct SegmentWriterOptions { + uint32_t num_rows_per_block = 1024; +}; + +class SegmentWriter { +public: + explicit SegmentWriter(std::string file_name, + uint32_t segment_id, + const std::shared_ptr& tablet_schema, + const SegmentWriterOptions& opts); + + ~SegmentWriter(); + Status init(uint32_t write_mbytes_per_sec); + + template + Status append_row(const RowType& row); + + uint64_t estimate_segment_size(); + + Status finalize(uint32_t* segment_file_size); + +private: + Status _write_header(); + Status _write_data(); + Status _write_ordinal_index(); + Status _write_short_key_index(); + Status _write_footer(); + Status _write_raw_data(const std::vector& slices); + +private: + std::string _fname; + uint32_t _segment_id; + std::shared_ptr _tablet_schema; + size_t _num_short_keys; + SegmentWriterOptions _opts; + + SegmentHeaderPB _header; + SegmentFooterPB _footer; + std::unique_ptr _index_builder; + std::unique_ptr _output_file; + std::vector _column_writers; + uint64_t _row_count = 0; + uint32_t _block_count = 0; +}; + +} +} diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index dcc84e504e7be5..7afeefccdbd262 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -45,6 +45,51 @@ using std::vector; namespace doris { +class RowBlockSorter { +public: + explicit RowBlockSorter(RowBlockAllocator* allocator); + virtual ~RowBlockSorter(); + + bool sort(RowBlock** row_block); + +private: + static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) { + return compare_row(*a, *b) < 0; + } + + RowBlockAllocator* _row_block_allocator; + RowBlock* _swap_row_block; +}; + +class RowBlockMerger { +public: + explicit RowBlockMerger(TabletSharedPtr tablet); + virtual ~RowBlockMerger(); + + bool merge( + const std::vector& row_block_arr, + RowsetWriterSharedPtr rowset_writer, + uint64_t* merged_rows); + +private: + struct MergeElement { + bool operator<(const MergeElement& other) const { + return compare_row(*row_cursor, *other.row_cursor) > 0; + } + + const RowBlock* row_block; + RowCursor* row_cursor; + uint32_t row_block_index; + }; + + bool _make_heap(const std::vector& row_block_arr); + bool _pop_heap(); + + TabletSharedPtr _tablet; + std::priority_queue _heap; +}; + + RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr &base_tablet) { _schema_mapping.resize(tablet_schema.num_columns()); diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 6081e6a6889e1b..7aa24f5d68a4c9 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -28,7 +28,6 @@ #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" #include "olap/column_mapping.h" -#include "olap/row.h" namespace doris { // defined in 'field.h' @@ -74,23 +73,6 @@ class RowBlockChanger { DISALLOW_COPY_AND_ASSIGN(RowBlockChanger); }; -class RowBlockAllocator; -class RowBlockSorter { -public: - explicit RowBlockSorter(RowBlockAllocator* allocator); - virtual ~RowBlockSorter(); - - bool sort(RowBlock** row_block); - -private: - static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) { - return compare_row(*a, *b) < 0; - } - - RowBlockAllocator* _row_block_allocator; - RowBlock* _swap_row_block; -}; - class RowBlockAllocator { public: RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation); @@ -107,34 +89,6 @@ class RowBlockAllocator { size_t _memory_limitation; }; -class RowBlockMerger { -public: - explicit RowBlockMerger(TabletSharedPtr tablet); - virtual ~RowBlockMerger(); - - bool merge( - const std::vector& row_block_arr, - RowsetWriterSharedPtr rowset_writer, - uint64_t* merged_rows); - -private: - struct MergeElement { - bool operator<(const MergeElement& other) const { - return compare_row(*row_cursor, *other.row_cursor) > 0; - } - - const RowBlock* row_block; - RowCursor* row_cursor; - uint32_t row_block_index; - }; - - bool _make_heap(const std::vector& row_block_arr); - bool _pop_heap(); - - TabletSharedPtr _tablet; - std::priority_queue _heap; -}; - class SchemaChange { public: SchemaChange() : _filtered_rows(0), _merged_rows(0) {} diff --git a/be/src/olap/short_key_index.h b/be/src/olap/short_key_index.h index e0ebf0ef78a850..4d1bf40c9df086 100644 --- a/be/src/olap/short_key_index.h +++ b/be/src/olap/short_key_index.h @@ -29,6 +29,82 @@ namespace doris { +// In our system, we have more complicated situation. +// First, our keys can be NULL. +// Second, when key columns are not complete we want to distinguish GT and GE. For examle, +// there are two key columns a and b, we have only one condition a > 1. We can only encode +// a prefix key 1, which is less than 1|2. This will make our read more data than +// we actually need. So we want to add more marker. +// a > 1: will be encoded into 1|\xFF +// a >= 1: will be encoded into 1|\x00 +// a = 1 and b > 1: will be encoded into 1|\x02|1 +// a = 1 and b is null: will be encoded into 1|\x01 + +// Used to represent minimal value for that field +constexpr uint8_t KEY_MINIMAL_MARKER = 0x00; +// Used to represent a null field, which value is seemed as minimal than other values +constexpr uint8_t KEY_NULL_FIRST_MARKER = 0x01; +// Used to represent a normal field, which content is encoded after this marker +constexpr uint8_t KEY_NORMAL_MARKER = 0x02; +// Used to represent +constexpr uint8_t KEY_NULL_LAST_MARKER = 0xFE; +// Used to represent maximal value for that field +constexpr uint8_t KEY_MAXIMAL_MARKER = 0xFF; + +// Encode one row into binary. +// We encoded one cell in the format which consists of a marker ahead and encoded content. +// This function will encode first min(row.columns_ids, num_keys) into key. If function +// can't find a column's value in row, this will finish this function. If all num_keys +// columns have been encoded nothing will fill. +template +void encode_key(std::string* buf, const RowType& row, + size_t num_keys, + bool fill_with_minimal) { + for (auto cid = 0; cid < num_keys; cid++) { + auto field = row.schema()->column(cid); + if (field == nullptr) { + if (fill_with_minimal) { + buf->push_back(KEY_MINIMAL_MARKER); + } else { + buf->push_back(KEY_MAXIMAL_MARKER); + } + break; + } + + auto cell = row.cell(cid); + if (cell.is_null()) { + if (null_first) { + buf->push_back(KEY_NULL_FIRST_MARKER); + } else { + buf->push_back(KEY_NULL_LAST_MARKER); + } + continue; + } + buf->push_back(KEY_NORMAL_MARKER); + field->encode_ascending(cell.cell_ptr(), buf); + } +} + +// Client call this function must assure that row contains the first +// num_keys columns. +template +void encode_key(std::string* buf, const RowType& row, size_t num_keys) { + for (auto cid = 0; cid < num_keys; cid++) { + auto field = row.schema()->column(cid); + auto cell = row.cell(cid); + if (cell.is_null()) { + if (null_first) { + buf->push_back(KEY_NULL_FIRST_MARKER); + } else { + buf->push_back(KEY_NULL_LAST_MARKER); + } + continue; + } + buf->push_back(KEY_NORMAL_MARKER); + field->encode_ascending(cell.cell_ptr(), buf); + } +} + // Used to encode a segment short key indices to binary format. This version // only accepts binary key, client should assure that input key is sorted, // otherwise error could happens. This builder would arrange data in following diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 99cd6aa307c94a..17f640cdd56e11 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -43,7 +43,6 @@ #include "olap/tablet.h" #include "olap/olap_meta.h" #include "olap/options.h" -#include "olap/rowset/segment_group.h" #include "olap/tablet_manager.h" #include "olap/txn_manager.h" #include "olap/task/engine_task.h" @@ -53,6 +52,7 @@ namespace doris { class Tablet; class DataDir; class EngineTask; +class SegmentGroup; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index fb34e1c381a8f1..fe26cca358c9cb 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -29,7 +29,6 @@ #include "gen_cpp/olap_file.pb.h" #include "olap/olap_define.h" #include "olap/tuple.h" -#include "olap/row_cursor.h" #include "olap/rowset_graph.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index ab105eed67856f..a258e0f73e4618 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -53,6 +53,7 @@ ADD_BE_TEST(rowset/segment_v2/encoding_info_test) ADD_BE_TEST(rowset/segment_v2/ordinal_page_index_test) ADD_BE_TEST(rowset/segment_v2/rle_page_test) ADD_BE_TEST(rowset/segment_v2/binary_dict_page_test) +ADD_BE_TEST(rowset/segment_v2/segment_test) ADD_BE_TEST(tablet_meta_manager_test) ADD_BE_TEST(tablet_mgr_test) ADD_BE_TEST(rowset/rowset_meta_manager_test) diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp new file mode 100644 index 00000000000000..0ba8668e143c23 --- /dev/null +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_writer.h" +#include "olap/rowset/segment_v2/segment_iterator.h" + +#include +#include + +#include "common/logging.h" +#include "olap/olap_common.h" +#include "olap/row_cursor.h" +#include "olap/tablet_schema.h" +#include "olap/row_block.h" +#include "olap/row_block2.h" +#include "olap/types.h" +#include "util/file_utils.h" + +namespace doris { +namespace segment_v2 { + +class SegmentReaderWriterTest : public testing::Test { +public: + SegmentReaderWriterTest() { } + virtual ~SegmentReaderWriterTest() { + } +}; + +TEST_F(SegmentReaderWriterTest, normal) { + std::shared_ptr tablet_schema(new TabletSchema()); + tablet_schema->_num_columns = 4; + tablet_schema->_num_key_columns = 3; + tablet_schema->_num_short_key_columns = 2; + tablet_schema->_num_rows_per_row_block = 1024; + for (int i = 0; i < tablet_schema->_num_columns; ++i) { + TabletColumn column; + column._type = OLAP_FIELD_TYPE_INT; + column._length = 4; + column._index_length = 4; + column._is_nullable = true; + column._unique_id = i; + if (i < tablet_schema->_num_key_columns) { + column._is_key = true; + } else { + column._is_key = false; + } + + tablet_schema->_cols.emplace_back(column); + } + + // segment write + std::string dname = "./ut_dir/segment_test"; + FileUtils::create_dir(dname); + + SegmentWriterOptions opts; + opts.num_rows_per_block = 1024; + + std::string fname = dname + "/int_case"; + SegmentWriter writer(fname, 0, tablet_schema, opts); + auto st = writer.init(10); + ASSERT_TRUE(st.ok()); + + RowCursor row; + auto olap_st = row.init(*tablet_schema); + ASSERT_EQ(OLAP_SUCCESS, olap_st); + + for (int i = 0; i < 1024; ++i) { + for (int j = 0; j < 4; ++j) { + auto cell = row.cell(j); + if (j > 2 && ((i + j) % 8) == 0) { + cell.set_null(); + } else { + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = i * 10 + j; + } + } + writer.append_row(row); + } + + uint32_t file_size = 0; + st = writer.finalize(&file_size); + ASSERT_TRUE(st.ok()); + // reader + { + std::shared_ptr segment(new Segment(fname, 0, tablet_schema, 1024)); + st = segment->open(); + LOG(INFO) << "segment open, msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(1024, segment->num_rows()); + Schema schema(*tablet_schema); + // scan all rows + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + st = iter->init(read_opts); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + + int left = 1024; + + int rowid = 0; + while (left > 0) { + int rows_read = left > 100 ? 100 : left; + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(rows_read, block.num_rows()); + left -= rows_read; + + for (int j = 0; j < block.schema()->column_ids().size(); ++j) { + auto cid = block.schema()->column_ids()[j]; + auto column_block = block.column_block(j); + for (int i = 0; i < rows_read; ++i) { + int rid = rowid + i; + if (cid > 2 && ((rid + cid) % 8) == 0) { + ASSERT_TRUE(BitmapTest(column_block.null_bitmap(), i)); + } else { + ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); + ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)); + } + } + } + rowid += rows_read; + } + } + // test seek, key + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + // lower bound + StorageReadOptions read_opts; + read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 1); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 100; + } + read_opts.include_lower_bound = false; + + // upper bound + read_opts.upper_bound.reset(new RowCursor()); + RowCursor* upper_bound = read_opts.upper_bound.get(); + upper_bound->init(*tablet_schema, 1); + { + auto cell = upper_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 200; + } + read_opts.include_upper_bound = true; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(10, block.num_rows()); + auto column_block = block.column_block(0); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(110 + i * 10, *(int*)column_block.cell_ptr(i)); + } + } + // test seek, key + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + + // lower bound + read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 1); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 10250; + } + read_opts.include_lower_bound = false; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(0, block.num_rows()); + } + } +} + +} +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index f7d95c6eaaa22f..f499f7f6bf3ecc 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -72,17 +72,21 @@ message ZoneMapPB { } message ColumnMetaPB { + // column id in table schema + optional uint32 column_id = 1; + // unique column id + optional uint32 unique_id = 2; // this field is FieldType's value - optional int32 type = 1; - optional EncodingTypePB encoding = 2; + optional int32 type = 3; + optional EncodingTypePB encoding = 4; // compress type for column - optional CompressionTypePB compression = 3; + optional CompressionTypePB compression = 5; // if this column can be nullable - optional bool is_nullable = 4; + optional bool is_nullable = 6; // if this column has checksum for each page - optional bool has_checksum = 5; + optional bool has_checksum = 7; // ordinal index page - optional PagePointerPB ordinal_index_page = 6; + optional PagePointerPB ordinal_index_page = 8; // // dictionary page for DICT_ENCODING // optional PagePointerPB dict_page = 2; @@ -135,3 +139,21 @@ message ShortKeyFooterPB { optional uint32 segment_bytes = 7; } +message SegmentHeaderPB { +} + +message SegmentFooterPB { + optional uint32 version = 1 [default = 1]; // file version + repeated ColumnMetaPB columns = 2; // tablet schema + optional uint64 num_rows = 3; // number of values + optional uint64 index_footprint = 4; // total idnex footprint of all columns + optional uint64 data_footprint = 5; // total data footprint of all columns + optional uint64 raw_data_footprint = 6; // raw data footprint + + optional CompressionTypePB compress_type = 7 [default = ZSTB]; // default compression type for file columns + repeated MetadataPairPB file_meta_datas = 8; // meta data of file + + // Short key index's page + optional PagePointerPB short_key_index_page = 9; +} + diff --git a/run-ut.sh b/run-ut.sh index 59fab254b87421..fdf7b32741de64 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -248,6 +248,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_plain_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_reader_writer_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/rle_page_test ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/binary_dict_page_test +${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/segment_test ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test ${DORIS_TEST_BINARY_DIR}/olap/generic_iterators_test From 940831fdace4da65a9aab905680e7550e650542c Mon Sep 17 00:00:00 2001 From: zhaochun Date: Sun, 4 Aug 2019 00:14:20 +0800 Subject: [PATCH 3/5] Change according to comments --- be/src/olap/key_coder.h | 3 + be/src/olap/rowset/segment_v2/segment.cpp | 6 +- .../rowset/segment_v2/segment_iterator.cpp | 15 ++- .../olap/rowset/segment_v2/segment_iterator.h | 1 + be/src/olap/short_key_index.cpp | 1 + be/src/olap/short_key_index.h | 43 +++---- be/test/olap/key_coder_test.cpp | 4 +- .../olap/rowset/segment_v2/segment_test.cpp | 112 +++++++++++------- be/test/olap/short_key_index_test.cpp | 68 ++++++++++- be/test/olap/tablet_schema_helper.h | 54 +++++++++ 10 files changed, 235 insertions(+), 72 deletions(-) create mode 100644 be/test/olap/tablet_schema_helper.h diff --git a/be/src/olap/key_coder.h b/be/src/olap/key_coder.h index bf7fa52562fabf..773c4e9ddb3188 100644 --- a/be/src/olap/key_coder.h +++ b/be/src/olap/key_coder.h @@ -33,6 +33,9 @@ using strings::Substitute; using EncodeAscendingFunc = void (*)(const void* value, size_t index_size, std::string* buf); using DecodeAscendingFunc = Status (*)(Slice* encoded_key, size_t index_size, uint8_t* cell_ptr, Arena* arena); +// Helper class that is used to encode types of value in memory format +// into a sorted binary. For example, this class will encode unsigned +// integer to bit endian format which can compare with memcmp. class KeyCoder { public: template diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index eab1eb101942e8..3a4df5843f6ff0 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -158,9 +158,9 @@ Status Segment::_initial_column_readers() { // with an old schema. So we should create a DefaultValueIterator // for this column. std::unordered_map unique_id_to_ordinal; - for (auto& column_pb : _footer.columns()) { - LOG(INFO) << "column_id=" << column_pb.column_id() << ", unique_id=" << column_pb.unique_id(); - unique_id_to_ordinal.emplace(column_pb.column_id(), column_pb.unique_id()); + for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) { + auto& column_pb = _footer.columns(ordinal); + unique_id_to_ordinal.emplace(column_pb.unique_id(), ordinal); } // TODO(zc): Lazy init()? // There may be too many columns, majority of them would not be used diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 601e25829cb7d0..ea7c3a010186ec 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -75,7 +75,7 @@ Status SegmentIterator::_init_short_key_range() { RETURN_IF_ERROR(_lookup_ordinal( *_opts.upper_bound, !_opts.include_upper_bound, num_rows(), &_upper_rowid)); } - if (_opts.lower_bound != nullptr) { + if (_upper_rowid > 0 && _opts.lower_bound != nullptr) { RETURN_IF_ERROR(_lookup_ordinal( *_opts.lower_bound, _opts.include_lower_bound, _upper_rowid, &_lower_rowid)); } @@ -115,14 +115,17 @@ Status SegmentIterator::_prepare_seek() { } Status SegmentIterator::_init_column_iterators() { + _cur_rowid = _lower_rowid; + if (_cur_rowid >= num_rows()) { + return Status::OK(); + } for (auto cid : _schema.column_ids()) { if (_column_iterators[cid] == nullptr) { RETURN_IF_ERROR(_create_column_iterator(cid, &_column_iterators[cid])); } - _column_iterators[cid]->seek_to_ordinal(_lower_rowid); + _column_iterators[cid]->seek_to_ordinal(_cur_rowid); } - _cur_rowid = _lower_rowid; return Status::OK(); } @@ -152,10 +155,11 @@ int compare_row_with_lhs_columns(const LhsRowType& lhs, const RhsRowType& rhs) { // Otherwise return error. // 1. get [start, end) ordinal through short key index // 2. binary search to find exact ordinal that match the input condition +// Make is_include template to reduce branch Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound, rowid_t* rowid) { std::string index_key; - encode_key(&index_key, key, _segment->num_short_keys(), is_include); + encode_key_with_padding(&index_key, key, _segment->num_short_keys(), is_include); uint32_t start_block_id = 0; auto start_iter = _segment->lower_bound(index_key); @@ -243,6 +247,9 @@ Status SegmentIterator::_next_batch(RowBlockV2* block, size_t* rows_read) { Status SegmentIterator::next_batch(RowBlockV2* block) { size_t rows_to_read = std::min((rowid_t)block->capacity(), _upper_rowid - _cur_rowid); block->resize(rows_to_read); + if (rows_to_read == 0) { + return Status::OK(); + } RETURN_IF_ERROR(_next_batch(block, &rows_to_read)); _cur_rowid += rows_to_read; return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 87054f67729b95..46a1e696b307a9 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -49,6 +49,7 @@ class SegmentIterator : public RowwiseIterator { Status _prepare_seek(); Status _init_column_iterators(); Status _create_column_iterator(uint32_t cid, ColumnIterator** iter); + Status _lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound, rowid_t* rowid); Status _seek_and_peek(rowid_t rowid); diff --git a/be/src/olap/short_key_index.cpp b/be/src/olap/short_key_index.cpp index 69e6c8a4700fec..03d66dffbf2564 100644 --- a/be/src/olap/short_key_index.cpp +++ b/be/src/olap/short_key_index.cpp @@ -47,6 +47,7 @@ Status ShortKeyIndexBuilder::finalize(uint32_t segment_bytes, } put_fixed32_le(&_footer_buf, _footer_buf.size()); + // TODO(zc): checksum uint32_t checksum = 0; put_fixed32_le(&_footer_buf, checksum); diff --git a/be/src/olap/short_key_index.h b/be/src/olap/short_key_index.h index 4d1bf40c9df086..5bc0374898f6ae 100644 --- a/be/src/olap/short_key_index.h +++ b/be/src/olap/short_key_index.h @@ -27,6 +27,8 @@ #include "util/faststring.h" #include "util/slice.h" +#include "util/debug_util.h" + namespace doris { // In our system, we have more complicated situation. @@ -51,19 +53,19 @@ constexpr uint8_t KEY_NULL_LAST_MARKER = 0xFE; // Used to represent maximal value for that field constexpr uint8_t KEY_MAXIMAL_MARKER = 0xFF; -// Encode one row into binary. -// We encoded one cell in the format which consists of a marker ahead and encoded content. -// This function will encode first min(row.columns_ids, num_keys) into key. If function -// can't find a column's value in row, this will finish this function. If all num_keys -// columns have been encoded nothing will fill. +// Encode one row into binary according given num_keys. +// A cell will be encoded in the format of a marker and encoded content. +// When function encoding row, if any cell isn't found in row, this function will +// fill a marker and return. If padding_minimal is true, KEY_MINIMAL_MARKER will +// be added, if padding_minimal is false, KEY_MAXIMAL_MARKER will be added. +// If all num_keys are found in row, no marker will be added. template -void encode_key(std::string* buf, const RowType& row, - size_t num_keys, - bool fill_with_minimal) { +void encode_key_with_padding(std::string* buf, const RowType& row, + size_t num_keys, bool padding_minimal) { for (auto cid = 0; cid < num_keys; cid++) { auto field = row.schema()->column(cid); if (field == nullptr) { - if (fill_with_minimal) { + if (padding_minimal) { buf->push_back(KEY_MINIMAL_MARKER); } else { buf->push_back(KEY_MAXIMAL_MARKER); @@ -85,12 +87,12 @@ void encode_key(std::string* buf, const RowType& row, } } +// Encode one row into binary according given num_keys. // Client call this function must assure that row contains the first // num_keys columns. template void encode_key(std::string* buf, const RowType& row, size_t num_keys) { for (auto cid = 0; cid < num_keys; cid++) { - auto field = row.schema()->column(cid); auto cell = row.cell(cid); if (cell.is_null()) { if (null_first) { @@ -101,7 +103,7 @@ void encode_key(std::string* buf, const RowType& row, size_t num_keys) { continue; } buf->push_back(KEY_NORMAL_MARKER); - field->encode_ascending(cell.cell_ptr(), buf); + row.schema()->column(cid)->encode_ascending(cell.cell_ptr(), buf); } } @@ -122,6 +124,8 @@ void encode_key(std::string* buf, const RowType& row, size_t num_keys) { // ... // builder.add_item(keyN); // builder.finalize(segment_size, num_rows, &slices); +// NOTE: This is used for BetaRowset and is not compatible with AlphaRowset's +// short key index format. // TODO(zc): // 1. If this can leverage binary page to save key and offset data // 2. Extending this to save in a BTree like struct, which can index full key @@ -144,14 +148,12 @@ class ShortKeyIndexBuilder { faststring _key_buf; faststring _offset_buf; std::string _footer_buf; - std::vector _offsets; }; class ShortKeyIndexDecoder; // An Iterator to iterate one short key index. -// Client can use this class to iterator all items -// item in this index. +// Client can use this class to iterator all items in this index. class ShortKeyIndexIterator { public: using iterator_category = std::random_access_iterator_tag; @@ -218,16 +220,15 @@ class ShortKeyIndexDecoder { ShortKeyIndexIterator begin() const { return {this, 0}; } ShortKeyIndexIterator end() const { return {this, num_items()}; } - // lower_bound will return a iterator which locates the first item - // equal with or larger than given key. - // NOTE: This function holds that without common prefix key, the one - // who has more length it the bigger one. Two key is the same only - // when their length are equal + // Return an iterator which locates at the first item who is + // equal with or greater than the given key. + // NOTE: If one key is the prefix of other key, this funciton thinks + // that longer key is greater than the shorter key. ShortKeyIndexIterator lower_bound(const Slice& key) const { return seek(key); } - // Return the iterator which locates the first item larger than the + // Return the iterator which locates the first item greater than the // input key. ShortKeyIndexIterator upper_bound(const Slice& key) const { return seek(key); @@ -256,7 +257,7 @@ class ShortKeyIndexDecoder { private: Slice _data; - // All following fields are only valid after pares has been executed successfully + // All following fields are only valid after parse has been executed successfully segment_v2::ShortKeyFooterPB _footer; std::vector _offsets; Slice _key_data; diff --git a/be/test/olap/key_coder_test.cpp b/be/test/olap/key_coder_test.cpp index 5d241bb0804806..9c6ddd6c34076a 100644 --- a/be/test/olap/key_coder_test.cpp +++ b/be/test/olap/key_coder_test.cpp @@ -77,7 +77,7 @@ void test_integer_encode() { } } - { + for (auto i = 0; i < 100; ++i) { CppType val1 = random(); CppType val2 = random(); @@ -148,7 +148,7 @@ TEST(KeyCoderTest, test_date) { } } - { + for (auto i = 0; i < 100; ++i) { CppType val1 = random(); CppType val2 = random(); diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index 0ba8668e143c23..c5c5e71a2c3d00 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -29,6 +29,7 @@ #include "olap/row_block.h" #include "olap/row_block2.h" #include "olap/types.h" +#include "olap/tablet_schema_helper.h" #include "util/file_utils.h" namespace doris { @@ -42,33 +43,25 @@ class SegmentReaderWriterTest : public testing::Test { }; TEST_F(SegmentReaderWriterTest, normal) { + + size_t num_rows_per_block = 10; + std::shared_ptr tablet_schema(new TabletSchema()); tablet_schema->_num_columns = 4; tablet_schema->_num_key_columns = 3; tablet_schema->_num_short_key_columns = 2; - tablet_schema->_num_rows_per_row_block = 1024; - for (int i = 0; i < tablet_schema->_num_columns; ++i) { - TabletColumn column; - column._type = OLAP_FIELD_TYPE_INT; - column._length = 4; - column._index_length = 4; - column._is_nullable = true; - column._unique_id = i; - if (i < tablet_schema->_num_key_columns) { - column._is_key = true; - } else { - column._is_key = false; - } - - tablet_schema->_cols.emplace_back(column); - } + tablet_schema->_num_rows_per_row_block = num_rows_per_block; + tablet_schema->_cols.push_back(create_int_key(1)); + tablet_schema->_cols.push_back(create_int_key(2)); + tablet_schema->_cols.push_back(create_int_key(3)); + tablet_schema->_cols.push_back(create_int_value(4)); // segment write std::string dname = "./ut_dir/segment_test"; FileUtils::create_dir(dname); SegmentWriterOptions opts; - opts.num_rows_per_block = 1024; + opts.num_rows_per_block = num_rows_per_block; std::string fname = dname + "/int_case"; SegmentWriter writer(fname, 0, tablet_schema, opts); @@ -79,15 +72,14 @@ TEST_F(SegmentReaderWriterTest, normal) { auto olap_st = row.init(*tablet_schema); ASSERT_EQ(OLAP_SUCCESS, olap_st); - for (int i = 0; i < 1024; ++i) { + // 0, 1, 2, 3 + // 10, 11, 12, 13 + // 20, 21, 22, 23 + for (int i = 0; i < 4096; ++i) { for (int j = 0; j < 4; ++j) { auto cell = row.cell(j); - if (j > 2 && ((i + j) % 8) == 0) { - cell.set_null(); - } else { - cell.set_not_null(); - *(int*)cell.mutable_cell_ptr() = i * 10 + j; - } + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = i * 10 + j; } writer.append_row(row); } @@ -97,11 +89,11 @@ TEST_F(SegmentReaderWriterTest, normal) { ASSERT_TRUE(st.ok()); // reader { - std::shared_ptr segment(new Segment(fname, 0, tablet_schema, 1024)); + std::shared_ptr segment(new Segment(fname, 0, tablet_schema, num_rows_per_block)); st = segment->open(); LOG(INFO) << "segment open, msg=" << st.to_string(); ASSERT_TRUE(st.ok()); - ASSERT_EQ(1024, segment->num_rows()); + ASSERT_EQ(4096, segment->num_rows()); Schema schema(*tablet_schema); // scan all rows { @@ -114,13 +106,13 @@ TEST_F(SegmentReaderWriterTest, normal) { ASSERT_TRUE(st.ok()); Arena arena; - RowBlockV2 block(schema, 100, &arena); + RowBlockV2 block(schema, 1024, &arena); - int left = 1024; + int left = 4096; int rowid = 0; while (left > 0) { - int rows_read = left > 100 ? 100 : left; + int rows_read = left > 1024 ? 1024 : left; st = iter->next_batch(&block); ASSERT_TRUE(st.ok()); ASSERT_EQ(rows_read, block.num_rows()); @@ -131,12 +123,8 @@ TEST_F(SegmentReaderWriterTest, normal) { auto column_block = block.column_block(j); for (int i = 0; i < rows_read; ++i) { int rid = rowid + i; - if (cid > 2 && ((rid + cid) % 8) == 0) { - ASSERT_TRUE(BitmapTest(column_block.null_bitmap(), i)); - } else { - ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); - ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)); - } + ASSERT_FALSE(BitmapTest(column_block.null_bitmap(), i)); + ASSERT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i)); } } rowid += rows_read; @@ -152,12 +140,17 @@ TEST_F(SegmentReaderWriterTest, normal) { StorageReadOptions read_opts; read_opts.lower_bound.reset(new RowCursor()); RowCursor* lower_bound = read_opts.lower_bound.get(); - lower_bound->init(*tablet_schema, 1); + lower_bound->init(*tablet_schema, 2); { auto cell = lower_bound->cell(0); cell.set_not_null(); *(int*)cell.mutable_cell_ptr() = 100; } + { + auto cell = lower_bound->cell(1); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = 100; + } read_opts.include_lower_bound = false; // upper bound @@ -179,10 +172,10 @@ TEST_F(SegmentReaderWriterTest, normal) { RowBlockV2 block(schema, 100, &arena); st = iter->next_batch(&block); ASSERT_TRUE(st.ok()); - ASSERT_EQ(10, block.num_rows()); + ASSERT_EQ(11, block.num_rows()); auto column_block = block.column_block(0); - for (int i = 0; i < 10; ++i) { - ASSERT_EQ(110 + i * 10, *(int*)column_block.cell_ptr(i)); + for (int i = 0; i < 11; ++i) { + ASSERT_EQ(100 + i * 10, *(int*)column_block.cell_ptr(i)); } } // test seek, key @@ -200,10 +193,49 @@ TEST_F(SegmentReaderWriterTest, normal) { { auto cell = lower_bound->cell(0); cell.set_not_null(); - *(int*)cell.mutable_cell_ptr() = 10250; + *(int*)cell.mutable_cell_ptr() = 40970; + } + read_opts.include_lower_bound = false; + + st = iter->init(read_opts); + LOG(INFO) << "iterator init msg=" << st.to_string(); + ASSERT_TRUE(st.ok()); + + Arena arena; + RowBlockV2 block(schema, 100, &arena); + st = iter->next_batch(&block); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(0, block.num_rows()); + } + // test seek, key (-2, -1) + { + std::unique_ptr iter; + st = segment->new_iterator(schema, &iter); + ASSERT_TRUE(st.ok()); + + StorageReadOptions read_opts; + + // lower bound + read_opts.lower_bound.reset(new RowCursor()); + RowCursor* lower_bound = read_opts.lower_bound.get(); + lower_bound->init(*tablet_schema, 1); + { + auto cell = lower_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = -2; } read_opts.include_lower_bound = false; + read_opts.upper_bound.reset(new RowCursor()); + RowCursor* upper_bound = read_opts.upper_bound.get(); + upper_bound->init(*tablet_schema, 1); + { + auto cell = upper_bound->cell(0); + cell.set_not_null(); + *(int*)cell.mutable_cell_ptr() = -1; + } + read_opts.include_upper_bound = false; + st = iter->init(read_opts); LOG(INFO) << "iterator init msg=" << st.to_string(); ASSERT_TRUE(st.ok()); diff --git a/be/test/olap/short_key_index_test.cpp b/be/test/olap/short_key_index_test.cpp index eba2ef84e54468..1b825e9320292c 100644 --- a/be/test/olap/short_key_index_test.cpp +++ b/be/test/olap/short_key_index_test.cpp @@ -19,8 +19,9 @@ #include -#include "olap/lru_cache.h" -#include "olap/file_helper.h" +#include "olap/tablet_schema_helper.h" +#include "olap/row_cursor.h" +#include "util/debug_util.h" namespace doris { @@ -88,6 +89,69 @@ TEST_F(ShortKeyIndexTest, buider) { } } + +TEST_F(ShortKeyIndexTest, enocde) { + TabletSchema tablet_schema; + tablet_schema._cols.push_back(create_int_key(0)); + tablet_schema._cols.push_back(create_int_key(1)); + tablet_schema._cols.push_back(create_int_key(2)); + tablet_schema._cols.push_back(create_int_value(3)); + tablet_schema._num_columns = 4; + tablet_schema._num_key_columns = 3; + tablet_schema._num_short_key_columns = 3; + + // test encoding with padding + { + RowCursor row; + row.init(tablet_schema, 2); + + { + // test padding + { + auto cell = row.cell(0); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 12345; + } + { + auto cell = row.cell(1); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 54321; + } + std::string buf; + encode_key_with_padding(&buf, row, 3, true); + // should be \x02\x80\x00\x30\x39\x02\x80\x00\xD4\x31\x00 + ASSERT_STREQ("0280003039028000D43100", hexdump(buf.c_str(), buf.size()).c_str()); + } + // test with null + { + { + auto cell = row.cell(0); + cell.set_is_null(false); + *(int*)cell.mutable_cell_ptr() = 54321; + } + { + auto cell = row.cell(1); + cell.set_is_null(true); + *(int*)cell.mutable_cell_ptr() = 54321; + } + + { + std::string buf; + encode_key_with_padding(&buf, row, 3, false); + // should be \x02\x80\x00\xD4\x31\x01\xff + ASSERT_STREQ("028000D43101FF", hexdump(buf.c_str(), buf.size()).c_str()); + } + // encode key + { + std::string buf; + encode_key(&buf, row, 2); + // should be \x02\x80\x00\xD4\x31\x01 + ASSERT_STREQ("028000D43101", hexdump(buf.c_str(), buf.size()).c_str()); + } + } + } +} + } int main(int argc, char** argv) { diff --git a/be/test/olap/tablet_schema_helper.h b/be/test/olap/tablet_schema_helper.h new file mode 100644 index 00000000000000..01f43bc0eb7a6e --- /dev/null +++ b/be/test/olap/tablet_schema_helper.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include "olap/tablet_schema.h" + +namespace doris { + +TabletColumn create_int_key(int32_t id, bool is_nullable = true) { + TabletColumn column; + column._unique_id = id; + column._col_name = std::to_string(id); + column._type = OLAP_FIELD_TYPE_INT; + column._is_key = true; + column._is_nullable = is_nullable; + column._length = 4; + column._index_length = 4; + return column; +} + + +TabletColumn create_int_value( + int32_t id, + FieldAggregationMethod agg_method = OLAP_FIELD_AGGREGATION_SUM, + bool is_nullable = true) { + TabletColumn column; + column._unique_id = id; + column._col_name = std::to_string(id); + column._type = OLAP_FIELD_TYPE_INT; + column._is_key = false; + column._aggregation = agg_method; + column._is_nullable = is_nullable; + column._length = 4; + column._index_length = 4; + return column; +} + +} From 983ca604c150961cdc0ed2e8aa80de85bf9ee637 Mon Sep 17 00:00:00 2001 From: zhaochun Date: Mon, 5 Aug 2019 15:32:24 +0800 Subject: [PATCH 4/5] Update according to review --- be/src/olap/iterators.h | 2 + be/src/olap/rowset/segment_v2/segment.cpp | 83 ++++++++----------- be/src/olap/rowset/segment_v2/segment.h | 6 +- .../rowset/segment_v2/segment_iterator.cpp | 3 +- .../olap/rowset/segment_v2/segment_writer.cpp | 41 +++------ .../olap/rowset/segment_v2/segment_writer.h | 4 +- gensrc/proto/segment_v2.proto | 5 +- 7 files changed, 53 insertions(+), 91 deletions(-) diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 63fb07eb1299eb..bbb2ce62906b2e 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/status.h" namespace doris { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 3a4df5843f6ff0..28c0b8f372371c 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -50,16 +50,19 @@ Segment::~Segment() { Status Segment::open() { RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file)); RETURN_IF_ERROR(_input_file->size(&_file_size)); - // 24: 2 * magic + 2 * checksum + 2 * footer/header length - if (_file_size < 24) { + + // 24: 1 * magic + 1 * checksum + 1 * footer length + if (_file_size < 12) { return Status::Corruption( Substitute("Bad segment, file size is too small, real=$0 vs need=$1", - _file_size, 24)); + _file_size, 12)); } + + // check header's magic + RETURN_IF_ERROR(_check_magic(0)); + // parse footer to get meta RETURN_IF_ERROR(_parse_footer()); - // parse footer to get meta - RETURN_IF_ERROR(_parse_header()); // parse short key index RETURN_IF_ERROR(_parse_index()); // initial all column reader @@ -72,43 +75,50 @@ Status Segment::new_iterator(const Schema& schema, std::unique_ptrread_at(offset, slice)); if (memcmp(slice.data, k_segment_magic, k_segment_magic_length) != 0) { - LOG(WARNING) << "Magic don't match, magic=" << std::string((char*)buf, 4); - return Status::Corruption("Bad segment, file magic don't match"); + return Status::Corruption( + Substitute("Bad segment, file magic don't match, magic=$0 vs need=$1", + std::string((char*)buf, k_segment_magic_length), k_segment_magic)); } - *length = decode_fixed32_le((uint8_t*)slice.data + 4); return Status::OK(); } Status Segment::_parse_footer() { uint64_t offset = _file_size - 8; - uint32_t footer_length = 0; - RETURN_IF_ERROR(_parse_magic_and_len(offset, &footer_length)); + // read footer's length and checksum + uint8_t buf[8]; + Slice slice(buf, 8); + RETURN_IF_ERROR(_input_file->read_at(offset, slice)); + + uint32_t footer_length = decode_fixed32_le((uint8_t*)slice.data); + uint32_t checksum = decode_fixed32_le((uint8_t*)slice.data + 4); - // check file size footer + checksum - if (offset < footer_length + 4) { - LOG(WARNING) << "Segment file is too small, size=" << _file_size << ", footer_size=" << footer_length; - return Status::Corruption("Bad segment, file size is too small"); + // check file size footer + if (offset < footer_length) { + return Status::Corruption( + Substitute("Bad segment, file size is too small, file_size=$0 vs footer_size=$1", + _file_size, footer_length)); } - offset -= footer_length + 4; + offset -= footer_length; - uint8_t checksum_buf[4]; std::string footer_buf; footer_buf.resize(footer_length); + RETURN_IF_ERROR(_input_file->read_at(offset, footer_buf)); - std::vector slices = {{checksum_buf, 4}, {footer_buf.data(), footer_length}}; - - RETURN_IF_ERROR(_input_file->readv_at(offset, &slices[0], 2)); + // TODO(zc): check footer's checksum + if (checksum != 0) { + return Status::Corruption( + Substitute("Bad segment, segment footer checksum not match, real=$0 vs expect=$1", + 0, checksum)); + } - // TOOD(zc): check footer's checksum if (!_footer.ParseFromString(footer_buf)) { return Status::Corruption("Bad segment, parse footer from PB failed"); } @@ -116,29 +126,6 @@ Status Segment::_parse_footer() { return Status::OK(); } -Status Segment::_parse_header() { - uint64_t offset = 0; - uint32_t header_length = 0; - RETURN_IF_ERROR(_parse_magic_and_len(offset, &header_length)); - - offset += 8; - if ((offset + header_length + 4) > _file_size) { - LOG(WARNING) << "Segment file is too small, size=" << _file_size << ", header_size=" << header_length; - return Status::Corruption("Bad segment, file too small"); - } - std::string header_buf; - header_buf.resize(header_length); - uint8_t checksum_buf[4]; - std::vector slices{{header_buf.data(), header_length}, {checksum_buf, 4}}; - RETURN_IF_ERROR(_input_file->readv_at(offset, &slices[0], 2)); - - // TODO(zc): check checksum - if (!_header.ParseFromString(header_buf)) { - return Status::Corruption("Bad segment, parse header from PB failed"); - } - return Status::OK(); -} - // load and parse short key index Status Segment::_parse_index() { // read short key index content diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index d76c48a386a665..e69a10e7513ad0 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -23,7 +23,7 @@ #include #include "common/status.h" // Status -#include "gen_cpp/segment_v2.pb.h" // SegmentHeaderPB +#include "gen_cpp/segment_v2.pb.h" #include "olap/rowset/segment_v2/common.h" // rowid_t #include "olap/short_key_index.h" #include "olap/tablet_schema.h" @@ -74,9 +74,8 @@ class Segment : public std::enable_shared_from_this { uint32_t num_rows_per_block() const { return _num_rows_per_block; } size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); } - Status _parse_magic_and_len(uint64_t offset, uint32_t* length); + Status _check_magic(uint64_t offset); Status _parse_footer(); - Status _parse_header(); Status _parse_index(); Status _initial_column_readers(); @@ -101,7 +100,6 @@ class Segment : public std::enable_shared_from_this { std::shared_ptr _tablet_schema; uint32_t _num_rows_per_block; - SegmentHeaderPB _header; SegmentFooterPB _footer; std::unique_ptr _input_file; uint64_t _file_size = 0; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index ea7c3a010186ec..8ad70a1d525c34 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -130,8 +130,7 @@ Status SegmentIterator::_init_column_iterators() { } Status SegmentIterator::_create_column_iterator(uint32_t cid, ColumnIterator** iter) { - _segment->new_column_iterator(cid, iter); - return Status::OK(); + return _segment->new_column_iterator(cid, iter); } // Schema of lhs and rhs are different. diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index ff289cfc5a6047..c3975732c30e55 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -97,7 +97,7 @@ Status SegmentWriter::finalize(uint32_t* segment_file_size) { for (auto column_writer : _column_writers) { RETURN_IF_ERROR(column_writer->finish()); } - RETURN_IF_ERROR(_write_header()); + RETURN_IF_ERROR(_write_raw_data({k_segment_magic})); RETURN_IF_ERROR(_write_data()); RETURN_IF_ERROR(_write_ordinal_index()); RETURN_IF_ERROR(_write_short_key_index()); @@ -105,28 +105,6 @@ Status SegmentWriter::finalize(uint32_t* segment_file_size) { return Status::OK(); } -// write header -Status SegmentWriter::_write_header() { - std::string header_buf; - SegmentHeaderPB header; - if (!header.SerializeToString(&header_buf)) { - return Status::InternalError("failed to serialize segment header"); - } - - // header length - std::string magic_and_len_buf; - magic_and_len_buf.append(k_segment_magic); - put_fixed32_le(&magic_and_len_buf, header_buf.size()); - - // checksum - uint8_t checksum_buf[4]; - // TODO(zc): add checksum - - std::vector slices = {magic_and_len_buf, header_buf, {checksum_buf, 4}}; - RETURN_IF_ERROR(_write_raw_data(slices)); - return Status::OK(); -} - // write column data to file one by one Status SegmentWriter::_write_data() { for (auto column_writer : _column_writers) { @@ -169,16 +147,19 @@ Status SegmentWriter::_write_footer() { if (!_footer.SerializeToString(&footer_buf)) { return Status::InternalError("failed to serialize segment footer"); } - std::string magic_and_len_buf; - magic_and_len_buf.append(k_segment_magic, k_segment_magic_length); - put_fixed32_le(&magic_and_len_buf, footer_buf.size()); - char checksum_buf[4]; - // TODO(zc): compute checsum - std::vector slices{{checksum_buf, 4}, footer_buf, magic_and_len_buf}; + std::string footer_info_buf; + // put footer's size + put_fixed32_le(&footer_info_buf, footer_buf.size()); + // TODO(zc): compute checksum for footer + uint32_t checksum = 0; + put_fixed32_le(&footer_info_buf, checksum); + + // I think we don't need to put a tail magic. + + std::vector slices{footer_buf, footer_info_buf}; // write offset and length RETURN_IF_ERROR(_write_raw_data(slices)); - // magic return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 48403d6d736009..a6a1bd8a26b20a 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -24,7 +24,7 @@ #include "common/logging.h" // LOG #include "common/status.h" // Status -#include "gen_cpp/segment_v2.pb.h" // SegmentHeaderPB +#include "gen_cpp/segment_v2.pb.h" #include "olap/schema.h" namespace doris { @@ -63,7 +63,6 @@ class SegmentWriter { Status finalize(uint32_t* segment_file_size); private: - Status _write_header(); Status _write_data(); Status _write_ordinal_index(); Status _write_short_key_index(); @@ -77,7 +76,6 @@ class SegmentWriter { size_t _num_short_keys; SegmentWriterOptions _opts; - SegmentHeaderPB _header; SegmentFooterPB _footer; std::unique_ptr _index_builder; std::unique_ptr _output_file; diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index f499f7f6bf3ecc..edc7b23ff832d5 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -139,9 +139,6 @@ message ShortKeyFooterPB { optional uint32 segment_bytes = 7; } -message SegmentHeaderPB { -} - message SegmentFooterPB { optional uint32 version = 1 [default = 1]; // file version repeated ColumnMetaPB columns = 2; // tablet schema @@ -150,7 +147,7 @@ message SegmentFooterPB { optional uint64 data_footprint = 5; // total data footprint of all columns optional uint64 raw_data_footprint = 6; // raw data footprint - optional CompressionTypePB compress_type = 7 [default = ZSTB]; // default compression type for file columns + optional CompressionTypePB compress_type = 7 [default = LZ4]; // default compression type for file columns repeated MetadataPairPB file_meta_datas = 8; // meta data of file // Short key index's page From 4984b432d28de2db021aee87abcc25fb87c2bf78 Mon Sep 17 00:00:00 2001 From: zhaochun Date: Tue, 6 Aug 2019 09:59:12 +0800 Subject: [PATCH 5/5] Update according to review --- be/src/olap/storage_engine.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 17f640cdd56e11..f3718028834a38 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -52,7 +52,6 @@ namespace doris { class Tablet; class DataDir; class EngineTask; -class SegmentGroup; // StorageEngine singleton to manage all Table pointers. // Providing add/drop/get operations. @@ -299,7 +298,6 @@ class StorageEngine { static StorageEngine* _s_instance; - std::unordered_map> _gc_files; std::unordered_map _unused_rowsets; Mutex _gc_mutex;