Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,17 +1000,22 @@ Status MapColumnWriter::finish() {
return Status::OK();
}

// TODO: route keys/values writing through dedicated sub-writers.
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                        size_t num_rows) {
    // Persist the row-level null bitmap first, then delegate the actual
    // key/value payload to append_data().
    // NOTE(review): append_data() also calls write_null_column() when
    // is_nullable() — confirm this path does not record nulls twice.
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
    }
    return append_data(ptr, num_rows);
}

Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
    // *ptr addresses two consecutive uint64_t slots holding the raw data
    // pointers for the key column and the value column, in that order.
    const auto* kv_slots = reinterpret_cast<const uint64_t*>(*ptr);
    for (size_t idx = 0; idx < 2; ++idx) {
        const auto* column_data = reinterpret_cast<const uint8_t*>(kv_slots[idx]);
        RETURN_IF_ERROR(_kv_writers[idx]->append_data(&column_data, num_rows));
    }
    if (is_nullable()) {
        // Record "not null" marks for rows appended through this direct path.
        return write_null_column(num_rows, false);
    }
    return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
Status init() override;

Status append_data(const uint8_t** ptr, size_t num_rows) override;

Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override;
uint64_t estimate_buffer_size() override;

Status finish() override;
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/data_types/data_type_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo
break;
case TYPE_MAP:
DCHECK(col_desc.children.size() == 2);
// todo. (Amory) Support Map contains_nulls in FE MapType.java Later PR
nested = std::make_shared<vectorized::DataTypeMap>(
create_data_type(col_desc.children[0], col_desc.contains_nulls[0]),
create_data_type(col_desc.children[1], col_desc.contains_nulls[1]));
create_data_type(col_desc.children[0], true),
create_data_type(col_desc.children[1], true));
break;
case TYPE_STRUCT: {
DCHECK(col_desc.children.size() >= 1);
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/data_types/data_type_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class DataTypeFactory {
return entity.second;
}
}
if (type_ptr->get_type_id() == TypeIndex::Struct) {
if (type_ptr->get_type_id() == TypeIndex::Struct ||
type_ptr->get_type_id() == TypeIndex::Map) {
DataTypeFactory::instance().register_data_type(type_ptr->get_name(), type_ptr);
for (const auto& entity : _invert_data_type_map) {
if (entity.first->equals(*type_ptr)) {
Expand Down
171 changes: 126 additions & 45 deletions be/src/vec/data_types/data_type_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
#include "gen_cpp/data.pb.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"

namespace doris::vectorized {

DataTypeMap::DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_) {
key_type = keys_;
value_type = values_;
key_type = make_nullable(keys_);
value_type = make_nullable(values_);

keys = std::make_shared<DataTypeArray>(key_type);
values = std::make_shared<DataTypeArray>(value_type);
Expand All @@ -53,15 +55,15 @@ std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const
ss << ", ";
}
if (nested_keys_column.is_null_at(i)) {
ss << "NULL";
ss << "null";
} else if (WhichDataType(remove_nullable(key_type)).is_string_or_fixed_string()) {
ss << "'" << key_type->to_string(nested_keys_column, i) << "'";
} else {
ss << key_type->to_string(nested_keys_column, i);
}
ss << ":";
if (nested_values_column.is_null_at(i)) {
ss << "NULL";
ss << "null";
} else if (WhichDataType(remove_nullable(value_type)).is_string_or_fixed_string()) {
ss << "'" << value_type->to_string(nested_values_column, i) << "'";
} else {
Expand All @@ -78,6 +80,85 @@ void DataTypeMap::to_string(const class doris::vectorized::IColumn& column, size
ostr.write(ss.c_str(), strlen(ss.c_str()));
}

// Scans the next map slot (a key or a value) out of `rb` and returns it in
// `output`. A slot is terminated by ':' or ',' or by a '}' that is the last
// character of the buffer; the terminator itself is consumed so `rb` ends up
// at the start of the next slot. Leading/trailing whitespace is trimmed and
// one pair of surrounding quotes ('"' or '\'') is stripped, in which case
// `has_quota` is set to true. Returns false on malformed input: a blank
// buffer, an unterminated quoted string, non-space characters after a
// closing quote, or a missing terminator.
bool next_slot_from_string(ReadBuffer& rb, StringRef& output, bool& has_quota) {
    StringRef element(rb.position(), 0);
    has_quota = false;
    if (rb.eof()) {
        return false;
    }

    // ltrim (cast to unsigned char: isspace() is undefined for negative values)
    while (!rb.eof() && isspace(static_cast<unsigned char>(*rb.position()))) {
        ++rb.position();
        element.data = rb.position();
    }
    // buffer contained only whitespace — dereferencing below would read past end
    if (rb.eof()) {
        return false;
    }

    // parse quoted string
    if (*rb.position() == '"' || *rb.position() == '\'') {
        const char str_sep = *rb.position();
        size_t str_len = 1;
        // search for the matching closing quote
        while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) {
            ++str_len;
        }
        // unterminated string
        if (str_len >= rb.count()) {
            rb.position() = rb.end();
            return false;
        }
        has_quota = true;
        rb.position() += str_len + 1;
        element.size += str_len + 1;
    }

    // advance to the slot terminator ':' or ',' or a trailing '}'
    while (!rb.eof() && (*rb.position() != ':') && (*rb.position() != ',') &&
           (rb.count() != 1 || *rb.position() != '}')) {
        // after a quoted string, only whitespace may precede the terminator
        if (has_quota && !isspace(static_cast<unsigned char>(*rb.position()))) {
            return false;
        }
        ++rb.position();
        ++element.size;
    }
    // terminator never found
    if (rb.eof()) {
        return false;
    }
    // consume the terminator so rb points at the first char of the next slot
    ++rb.position();

    // rtrim
    while (element.size > 0 &&
           isspace(static_cast<unsigned char>(element.data[element.size - 1]))) {
        --element.size;
    }

    // strip one pair of matching surrounding quotes
    if (element.size >= 2 && (element.data[0] == '"' || element.data[0] == '\'') &&
        element.data[0] == element.data[element.size - 1]) {
        ++element.data;
        element.size -= 2;
    }
    output = element;
    return true;
}

// Returns true when `element` denotes an empty or null slot and, if so,
// appends the corresponding entry to `nested_column` (which must be a
// ColumnNullable): an empty slot inserts a non-null default value, while an
// unquoted literal "null" inserts a real null.
bool is_empty_null_element(StringRef element, IColumn* nested_column, bool has_quota) {
    auto& nullable_col = reinterpret_cast<ColumnNullable&>(*nested_column);

    if (element.size == 0) {
        // empty slot -> default value, marked not-null
        nullable_col.get_nested_column().insert_default();
        nullable_col.get_null_map_data().push_back(0);
        return true;
    }

    const bool is_null_literal =
            !has_quota && element.size == 4 && strncmp(element.data, "null", 4) == 0;
    if (is_null_literal) {
        // bare (unquoted) "null" -> real null entry
        nullable_col.get_nested_column().insert_default();
        nullable_col.get_null_map_data().push_back(1);
        return true;
    }
    return false;
}
Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
DCHECK(!rb.eof());
auto* map_column = assert_cast<ColumnMap*>(column);
Expand All @@ -91,57 +172,57 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
*(rb.end() - 1));
}

std::stringstream keyCharset;
std::stringstream valCharset;

if (rb.count() == 2) {
// empty map {} , need to make empty array to add offset
keyCharset << "[]";
valCharset << "[]";
map_column->insert_default();
} else {
// {"aaa": 1, "bbb": 20}, need to handle key and value to make key column arr and value arr
// {"aaa": 1, "bbb": 20}, need to handle key slot and value slot to make key column arr and value arr
// skip "{"
++rb.position();
keyCharset << "[";
valCharset << "[";
auto& keys_arr = reinterpret_cast<ColumnArray&>(map_column->get_keys());
ColumnArray::Offsets64& key_off = keys_arr.get_offsets();
auto& values_arr = reinterpret_cast<ColumnArray&>(map_column->get_values());
ColumnArray::Offsets64& val_off = values_arr.get_offsets();

IColumn& nested_key_column = keys_arr.get_data();
DCHECK(nested_key_column.is_nullable());
IColumn& nested_val_column = values_arr.get_data();
DCHECK(nested_val_column.is_nullable());

size_t element_num = 0;
while (!rb.eof()) {
size_t kv_len = 0;
auto start = rb.position();
while (!rb.eof() && *start != ',' && *start != '}') {
kv_len++;
start++;
StringRef key_element(rb.position(), rb.count());
bool has_quota = false;
if (!next_slot_from_string(rb, key_element, has_quota)) {
return Status::InvalidArgument("Cannot read map key from text '{}'",
key_element.to_string());
}
if (kv_len >= rb.count()) {
return Status::InvalidArgument("Invalid Length");
if (!is_empty_null_element(key_element, &nested_key_column, has_quota)) {
ReadBuffer krb(const_cast<char*>(key_element.data), key_element.size);
if (auto st = key_type->from_string(krb, &nested_key_column); !st.ok()) {
map_column->pop_back(element_num);
return st;
}
}

size_t k_len = 0;
auto k_rb = rb.position();
while (kv_len > 0 && *k_rb != ':') {
k_len++;
k_rb++;
has_quota = false;
StringRef value_element(rb.position(), rb.count());
if (!next_slot_from_string(rb, value_element, has_quota)) {
return Status::InvalidArgument("Cannot read map value from text '{}'",
value_element.to_string());
}
ReadBuffer key_rb(rb.position(), k_len);
ReadBuffer val_rb(k_rb + 1, kv_len - k_len - 1);

// handle key
keyCharset << key_rb.to_string();
keyCharset << ",";

// handle value
valCharset << val_rb.to_string();
valCharset << ",";

rb.position() += kv_len + 1;
if (!is_empty_null_element(value_element, &nested_val_column, has_quota)) {
ReadBuffer vrb(const_cast<char*>(value_element.data), value_element.size);
if (auto st = value_type->from_string(vrb, &nested_val_column); !st.ok()) {
map_column->pop_back(element_num);
return st;
}
}
++element_num;
}
keyCharset << ']';
valCharset << ']';
key_off.push_back(key_off.back() + element_num);
val_off.push_back(val_off.back() + element_num);
}

ReadBuffer kb(keyCharset.str().data(), keyCharset.str().length());
ReadBuffer vb(valCharset.str().data(), valCharset.str().length());
keys->from_string(kb, &map_column->get_keys());
values->from_string(vb, &map_column->get_values());
return Status::OK();
}

Expand Down Expand Up @@ -199,4 +280,4 @@ const char* DataTypeMap::deserialize(const char* buf, IColumn* column, int data_
data_version);
}

} // namespace doris::vectorized
} // namespace doris::vectorized
9 changes: 9 additions & 0 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ FunctionContext::TypeDesc VExpr::column_type_to_type_desc(const TypeDescriptor&
out.children.push_back(VExpr::column_type_to_type_desc(t));
}
break;
case TYPE_MAP:
CHECK(type.children.size() == 2);
// only support map key is scalar
CHECK(!type.children[0].is_complex_type());
out.type = FunctionContext::TYPE_MAP;
for (const auto& t : type.children) {
out.children.push_back(VExpr::column_type_to_type_desc(t));
}
break;
case TYPE_STRING:
out.type = FunctionContext::TYPE_STRING;
out.len = type.len;
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/functions/function_cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,16 @@ class FunctionCast final : public IFunctionBase {
return &ConvertImplGenericToJsonb::execute;
}
}

// Builds the conversion wrapper for casts targeting MAP. Only String is a
// supported source (parsed generically via from_string); any other source
// type yields an "unsupported cast" wrapper.
WrapperType create_map_wrapper(const DataTypePtr& from_type, const DataTypeMap& to_type) const {
    if (from_type->get_type_id() == TypeIndex::String) {
        return &ConvertImplGenericFromString<ColumnString>::execute;
    }
    return create_unsupport_wrapper(from_type->get_name(), to_type.get_name());
}

// check struct value type and get to_type value
// TODO: need handle another type to cast struct
WrapperType create_struct_wrapper(const DataTypePtr& from_type,
Expand Down Expand Up @@ -1727,6 +1737,8 @@ class FunctionCast final : public IFunctionBase {
static_cast<const DataTypeArray&>(*to_type));
case TypeIndex::Struct:
return create_struct_wrapper(from_type, static_cast<const DataTypeStruct&>(*to_type));
case TypeIndex::Map:
return create_map_wrapper(from_type, static_cast<const DataTypeMap&>(*to_type));
default:
break;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/olap/olap_data_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ class OlapBlockDataConvertor {

Status convert_to_olap() override;
const void* get_data() const override { return _results.data(); };

const void* get_data_at(size_t offset) const override {
LOG(FATAL) << "now not support get_data_at for OlapColumnDataConvertorMap";
};
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/runtime/vfile_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ Status VFileResultWriter::_write_csv_file(const Block& block) {
_plain_text_outstream << col.type->to_string(*col.column, i);
break;
}
case TYPE_MAP: {
_plain_text_outstream << col.type->to_string(*col.column, i);
break;
}
default: {
// not supported type, like BITMAP, just export null
_plain_text_outstream << NULL_IN_CSV;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ Status VMysqlResultWriter<is_binary_format>::_add_one_column(
return Status::InternalError("pack mysql buffer failed.");
}

if constexpr (is_nullable) {
if (column_ptr->is_null_at(i)) {
buf_ret = rows_buffer[i].push_null();
continue;
}
}
rows_buffer[i].open_dynamic_mode();
std::string cell_str = map_type.to_string(*column, i);
buf_ret = rows_buffer[i].push_string(cell_str.c_str(), strlen(cell_str.c_str()));
Expand Down
8 changes: 8 additions & 0 deletions regression-test/data/export/test_map_export.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 \N
2 {}
3 {' 33,amory ':2, ' bet ':20, ' cler ':26}
4 {'k3':23, null:20, 'k4':null}
5 {null:null}

8 changes: 8 additions & 0 deletions regression-test/data/load/insert/test_map_dml.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 {' amory ':6, 'happy':38}
6 {'amory':6, 'is':38, 'cl':0}

-- !select --
100 {1:'1', 2:'2', 3:'3'} {32767:'32767', 32768:'32768', 32769:'32769'} [65534, 65535, 65536] {2022-07-13:1} {2022-07-13 12:30:00:'2022-07-13 12:30:00'} {0.33:33, 0.67:67}

15 changes: 15 additions & 0 deletions regression-test/data/load_p0/stream_load/test_map.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
1 \N
2 {" 11amory ":23, "beat":20, " clever ": 66}
3 {"k1": 31, "k2": 300}
4 {}
5 \N
6 {"k1":41, "k2": 400}
7 {" 33,amory ":2, " bet ":20, " cler ": 26}
8 {}
9 {' 1,amy ':2, " k2 ":90, " k7 ": 33}
10 {}
11 {"k1': 4, "k2": 400}
12 {"k3":23, null: 20, "k4": null}
13 {"null":1}
15 {:2, "k2":}
16 {null:null}
Loading