Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/exprs/anyval_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ FunctionContext::TypeDesc AnyValUtil::column_type_to_type_desc(const TypeDescrip
out.children.push_back(column_type_to_type_desc(t));
}
break;
case TYPE_MAP:
out.type = FunctionContext::TYPE_MAP;
for (const auto& t : type.children) {
out.children.push_back(column_type_to_type_desc(t));
}
break;
case TYPE_STRING:
out.type = FunctionContext::TYPE_STRING;
out.len = type.len;
Expand Down
33 changes: 30 additions & 3 deletions be/src/olap/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/types.h"
#include "olap/utils.h"
#include "runtime/collection_value.h"
#include "runtime/map_value.h"
#include "runtime/mem_pool.h"
#include "util/hash_util.hpp"
#include "util/mem_util.hpp"
Expand Down Expand Up @@ -455,10 +456,9 @@ uint32_t Field::hash_code(const CellType& cell, uint32_t seed) const {
return _type_info->hash_code(cell.cell_ptr(), seed);
}

class StructField : public Field {
class MapField : public Field {
public:
explicit StructField(const TabletColumn& column) : Field(column) {}

explicit MapField(const TabletColumn& column) : Field(column) {}
void consume(RowCursorCell* dst, const char* src, bool src_null, MemPool* mem_pool,
ObjectPool* agg_pool) const override {
dst->set_is_null(src_null);
Expand All @@ -467,7 +467,17 @@ class StructField : public Field {
}
_type_info->deep_copy(dst->mutable_cell_ptr(), src, mem_pool);
}
// make variable_ptr memory allocate to cell_ptr as MapValue
char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
return variable_ptr + _length;
}

size_t get_variable_len() const override { return _length; }
};

class StructField : public Field {
public:
explicit StructField(const TabletColumn& column) : Field(column) {}
char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
auto struct_v = (StructValue*)cell_ptr;
struct_v->set_values(reinterpret_cast<void**>(variable_ptr));
Expand Down Expand Up @@ -798,6 +808,14 @@ class FieldFactory {
local->add_sub_field(std::move(item_field));
return local;
}
case OLAP_FIELD_TYPE_MAP: {
std::unique_ptr<Field> key_field(FieldFactory::create(column.get_sub_column(0)));
std::unique_ptr<Field> val_field(FieldFactory::create(column.get_sub_column(1)));
auto* local = new MapField(column);
local->add_sub_field(std::move(key_field));
local->add_sub_field(std::move(val_field));
return local;
}
case OLAP_FIELD_TYPE_DECIMAL:
[[fallthrough]];
case OLAP_FIELD_TYPE_DECIMAL32:
Expand Down Expand Up @@ -847,6 +865,15 @@ class FieldFactory {
local->add_sub_field(std::move(item_field));
return local;
}
case OLAP_FIELD_TYPE_MAP: {
DCHECK(column.get_subtype_count() == 2);
auto* local = new MapField(column);
std::unique_ptr<Field> key_field(FieldFactory::create(column.get_sub_column(0)));
std::unique_ptr<Field> value_field(FieldFactory::create(column.get_sub_column(1)));
local->add_sub_field(std::move(key_field));
local->add_sub_field(std::move(value_field));
return local;
}
case OLAP_FIELD_TYPE_DECIMAL:
[[fallthrough]];
case OLAP_FIELD_TYPE_DECIMAL32:
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle,
void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle,
segment_v2::PageTypePB page_type, bool in_memory) {
auto deleter = [](const doris::CacheKey& key, void* value) { delete[] (uint8_t*)value; };

CachePriority priority = CachePriority::NORMAL;
if (in_memory) {
priority = CachePriority::DURABLE;
Expand Down
105 changes: 105 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "util/rle_encoding.h" // for RleDecoder
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_struct.h"
#include "vec/core/types.h"
#include "vec/runtime/vdatetime_value.h" //for VecDateTime
Expand Down Expand Up @@ -101,6 +102,32 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
*reader = std::move(array_reader);
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
// map reader now has 3 sub readers for key(arr), value(arr), null(scala)
std::unique_ptr<ColumnReader> map_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
std::unique_ptr<ColumnReader> key_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0), num_rows,
file_reader, &key_reader));
std::unique_ptr<ColumnReader> val_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1), num_rows,
file_reader, &val_reader));
std::unique_ptr<ColumnReader> null_reader;
if (meta.is_nullable()) {
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
meta.children_columns(2).num_rows(),
file_reader, &null_reader));
}
map_reader->_sub_readers.resize(meta.children_columns_size());

map_reader->_sub_readers[0] = std::move(key_reader);
map_reader->_sub_readers[1] = std::move(val_reader);
if (meta.is_nullable()) {
map_reader->_sub_readers[2] = std::move(null_reader);
}
*reader = std::move(map_reader);
return Status::OK();
}
default:
return Status::NotSupported("unsupported type for ColumnReader: {}",
std::to_string(type));
Expand Down Expand Up @@ -485,13 +512,91 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
null_iterator);
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
ColumnIterator* key_iterator = nullptr;
RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&key_iterator));
ColumnIterator* val_iterator = nullptr;
RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&val_iterator));
ColumnIterator* null_iterator = nullptr;
if (is_nullable()) {
RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
}
*iterator = new MapFileColumnIterator(this, null_iterator, key_iterator, val_iterator);
return Status::OK();
}
default:
return Status::NotSupported("unsupported type to create iterator: {}",
std::to_string(type));
}
}
}

///====================== MapFileColumnIterator ============================////
MapFileColumnIterator::MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
ColumnIterator* key_iterator,
ColumnIterator* val_iterator)
: _map_reader(reader) {
_key_iterator.reset(key_iterator);
_val_iterator.reset(val_iterator);
if (_map_reader->is_nullable()) {
_null_iterator.reset(null_iterator);
}
}

Status MapFileColumnIterator::init(const ColumnIteratorOptions& opts) {
RETURN_IF_ERROR(_key_iterator->init(opts));
RETURN_IF_ERROR(_val_iterator->init(opts));
if (_map_reader->is_nullable()) {
RETURN_IF_ERROR(_null_iterator->init(opts));
}
return Status::OK();
}

Status MapFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) {
return Status::NotSupported("Not support next_batch");
}

Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) {
RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(ord));
RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(ord));
if (_map_reader->is_nullable()) {
RETURN_IF_ERROR(_null_iterator->seek_to_ordinal(ord));
}
return Status::OK();
}

Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>(
dst->is_nullable() ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()
: *dst);
size_t num_read = *n;
auto column_key_ptr = column_map->get_keys().assume_mutable();
auto column_val_ptr = column_map->get_values().assume_mutable();
RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, column_key_ptr, has_null));
RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, column_val_ptr, has_null));

if (dst->is_nullable()) {
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
bool null_signs_has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
DCHECK(num_read == *n);
}
return Status::OK();
}

Status MapFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) {
for (size_t i = 0; i < count; ++i) {
RETURN_IF_ERROR(seek_to_ordinal(rowids[i]));
size_t num_read = 1;
RETURN_IF_ERROR(next_batch(&num_read, dst, nullptr));
DCHECK(num_read == 1);
}
return Status::OK();
}

////////////////////////////////////////////////////////////////////////////////

StructFileColumnIterator::StructFileColumnIterator(
Expand Down
35 changes: 35 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,41 @@ class EmptyFileColumnIterator final : public ColumnIterator {
ordinal_t get_current_ordinal() const override { return 0; }
};

// This iterator is used to read map value column
class MapFileColumnIterator final : public ColumnIterator {
public:
explicit MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
ColumnIterator* key_iterator, ColumnIterator* val_iterator);

~MapFileColumnIterator() override = default;

Status init(const ColumnIteratorOptions& opts) override;

Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) override;

Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;

Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;

Status seek_to_first() override {
RETURN_IF_ERROR(_key_iterator->seek_to_first());
RETURN_IF_ERROR(_val_iterator->seek_to_first());
RETURN_IF_ERROR(_null_iterator->seek_to_first());
return Status::OK();
}

Status seek_to_ordinal(ordinal_t ord) override;

ordinal_t get_current_ordinal() const override { return _key_iterator->get_current_ordinal(); }

private:
ColumnReader* _map_reader;
std::unique_ptr<ColumnIterator> _null_iterator;
std::unique_ptr<ColumnIterator> _key_iterator; // ArrayFileColumnIterator
std::unique_ptr<ColumnIterator> _val_iterator; // ArrayFileColumnIterator
};

class StructFileColumnIterator final : public ColumnIterator {
public:
explicit StructFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
Expand Down
Loading