Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ set(VEC_FILES
functions/array/function_array_apply.cpp
functions/array/function_array_concat.cpp
functions/array/function_array_pushfront.cpp
functions/function_map.cpp
exprs/table_function/vexplode_json_array.cpp
functions/math.cpp
functions/function_bitmap.cpp
Expand Down
17 changes: 14 additions & 3 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,26 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
const IColumn& get_keys() const { return *keys_column; }
IColumn& get_keys() { return *keys_column; }

const ColumnPtr get_keys_array_ptr() const {
return ColumnArray::create(keys_column, offsets_column);
}
ColumnPtr get_keys_array_ptr() { return ColumnArray::create(keys_column, offsets_column); }

const ColumnPtr& get_values_ptr() const { return values_column; }
ColumnPtr& get_values_ptr() { return values_column; }

const IColumn& get_values() const { return *values_column; }
IColumn& get_values() { return *values_column; }

const ColumnPtr get_values_array_ptr() const {
return ColumnArray::create(values_column, offsets_column);
}
ColumnPtr get_values_array_ptr() { return ColumnArray::create(values_column, offsets_column); }

size_t ALWAYS_INLINE size_at(ssize_t i) const {
return get_offsets()[i] - get_offsets()[i - 1];
}

private:
friend class COWHelper<IColumn, ColumnMap>;

Expand All @@ -160,9 +174,6 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
WrappedPtr offsets_column; // offset

size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 1]; }
size_t ALWAYS_INLINE size_at(ssize_t i) const {
return get_offsets()[i] - get_offsets()[i - 1];
}

ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, MutableColumnPtr&& offsets);

Expand Down
330 changes: 330 additions & 0 deletions be/src/vec/functions/function_map.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/columns/column_array.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_map.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/functions/array/function_array_index.h"
#include "vec/functions/function.h"
#include "vec/functions/function_helpers.h"
#include "vec/functions/simple_function_factory.h"

namespace doris::vectorized {

// construct a map
// map(key1, value2, key2, value2) -> {key1: value2, key2: value2}
class FunctionMap : public IFunction {
public:
static constexpr auto name = "map";
static FunctionPtr create() { return std::make_shared<FunctionMap>(); }

/// Get function name.
String get_name() const override { return name; }

bool is_variadic() const override { return true; }

bool use_default_implementation_for_nulls() const override { return false; }

size_t get_number_of_arguments() const override { return 0; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DCHECK(arguments.size() % 2 == 0)
<< "function: " << get_name() << ", arguments should not be even number";

DataTypes key_types;
DataTypes val_types;
for (size_t i = 0; i < arguments.size(); i += 2) {
key_types.push_back(arguments[i]);
val_types.push_back(arguments[i + 1]);
}

DataTypePtr key_type;
DataTypePtr val_type;
get_least_supertype(key_types, &key_type);
get_least_supertype(val_types, &val_type);

return std::make_shared<DataTypeMap>(make_nullable(key_type), make_nullable(val_type));
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
DCHECK(arguments.size() % 2 == 0)
<< "function: " << get_name() << ", arguments should not be even number";

size_t num_element = arguments.size();

auto result_col = block.get_by_position(result).type->create_column();
auto map_column = typeid_cast<ColumnMap*>(result_col.get());
if (!map_column) {
return Status::RuntimeError("unsupported types for function {} return {}", get_name(),
block.get_by_position(result).type->get_name());
}

// map keys column
auto& result_col_map_keys_data = map_column->get_keys();
result_col_map_keys_data.reserve(input_rows_count * num_element / 2);
// map values column
auto& result_col_map_vals_data = map_column->get_values();
result_col_map_vals_data.reserve(input_rows_count * num_element / 2);
// map offsets column
auto& result_col_map_offsets = map_column->get_offsets();
result_col_map_offsets.resize(input_rows_count);

// convert to nullable column
for (size_t i = 0; i < num_element; ++i) {
auto& col = block.get_by_position(arguments[i]).column;
col = col->convert_to_full_column_if_const();
bool is_nullable = i % 2 == 0 ? result_col_map_keys_data.is_nullable()
: result_col_map_vals_data.is_nullable();
if (is_nullable && !col->is_nullable()) {
col = ColumnNullable::create(col, ColumnUInt8::create(col->size(), 0));
}
}

// insert value into array
ColumnArray::Offset64 offset = 0;
for (size_t row = 0; row < input_rows_count; ++row) {
for (size_t i = 0; i < num_element; i += 2) {
result_col_map_keys_data.insert_from(*block.get_by_position(arguments[i]).column,
row);
result_col_map_vals_data.insert_from(
*block.get_by_position(arguments[i + 1]).column, row);
}
offset += num_element / 2;
result_col_map_offsets[row] = offset;
}
block.replace_by_position(result, std::move(result_col));
return Status::OK();
}
};

class FunctionMapSize : public IFunction {
public:
static constexpr auto name = "map_size";
static FunctionPtr create() { return std::make_shared<FunctionMapSize>(); }

/// Get function name.
String get_name() const override { return name; }

bool is_variadic() const override { return false; }

size_t get_number_of_arguments() const override { return 1; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DataTypePtr datatype = arguments[0];
if (datatype->is_nullable()) {
datatype = assert_cast<const DataTypeNullable*>(datatype.get())->get_nested_type();
}
DCHECK(is_map(datatype)) << "first argument for function: " << name
<< " should be DataTypeMap";
return std::make_shared<DataTypeInt64>();
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
auto left_column =
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
const ColumnMap* map_column = nullptr;
// const UInt8* map_null_map = nullptr;
if (left_column->is_nullable()) {
auto nullable_column = reinterpret_cast<const ColumnNullable*>(left_column.get());
map_column = check_and_get_column<ColumnMap>(nullable_column->get_nested_column());
// map_null_map = nullable_column->get_null_map_column().get_data().data();
} else {
map_column = check_and_get_column<ColumnMap>(*left_column.get());
}
if (!map_column) {
return Status::RuntimeError("unsupported types for function {}({})", get_name(),
block.get_by_position(arguments[0]).type->get_name());
}

auto dst_column = ColumnInt64::create(input_rows_count);
auto& dst_data = dst_column->get_data();

for (size_t i = 0; i < map_column->size(); i++) {
dst_data[i] = map_column->size_at(i);
}

block.replace_by_position(result, std::move(dst_column));
return Status::OK();
}
};

template <bool is_key>
class FunctionMapContains : public IFunction {
public:
static constexpr auto name = is_key ? "map_contains_key" : "map_contains_value";
static FunctionPtr create() { return std::make_shared<FunctionMapContains>(); }

/// Get function name.
String get_name() const override { return name; }

bool is_variadic() const override { return false; }

size_t get_number_of_arguments() const override { return 2; }

bool use_default_implementation_for_nulls() const override {
return array_contains.use_default_implementation_for_nulls();
}

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DataTypePtr datatype = arguments[0];
if (datatype->is_nullable()) {
datatype = assert_cast<const DataTypeNullable*>(datatype.get())->get_nested_type();
}
DCHECK(is_map(datatype)) << "first argument for function: " << name
<< " should be DataTypeMap";
return make_nullable(std::make_shared<DataTypeNumber<UInt8>>());
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
// backup original argument 0
auto orig_arg0 = block.get_by_position(arguments[0]);
auto left_column =
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
const ColumnMap* map_column = nullptr;
ColumnPtr nullmap_column = nullptr;
if (left_column->is_nullable()) {
auto nullable_column = reinterpret_cast<const ColumnNullable*>(left_column.get());
map_column = check_and_get_column<ColumnMap>(nullable_column->get_nested_column());
nullmap_column = nullable_column->get_null_map_column_ptr();
} else {
map_column = check_and_get_column<ColumnMap>(*left_column.get());
}
if (!map_column) {
return Status::RuntimeError("unsupported types for function {}({})", get_name(),
block.get_by_position(arguments[0]).type->get_name());
}

DataTypePtr datatype = block.get_by_position(arguments[0]).type;
if (datatype->is_nullable()) {
datatype = assert_cast<const DataTypeNullable*>(datatype.get())->get_nested_type();
}
const auto datatype_map = static_cast<const DataTypeMap*>(datatype.get());
if constexpr (is_key) {
const auto& array_column = map_column->get_keys_array_ptr();
const auto datatype_array =
std::make_shared<DataTypeArray>(datatype_map->get_key_type());
if (nullmap_column) {
block.get_by_position(arguments[0]) = {
ColumnNullable::create(array_column, nullmap_column),
make_nullable(datatype_array),
block.get_by_position(arguments[0]).name + ".keys"};
} else {
block.get_by_position(arguments[0]) = {
array_column, datatype_array,
block.get_by_position(arguments[0]).name + ".keys"};
}
} else {
const auto& array_column = map_column->get_values_array_ptr();
const auto datatype_array =
std::make_shared<DataTypeArray>(datatype_map->get_value_type());
if (nullmap_column) {
block.get_by_position(arguments[0]) = {
ColumnNullable::create(array_column, nullmap_column),
make_nullable(datatype_array),
block.get_by_position(arguments[0]).name + ".values"};
} else {
block.get_by_position(arguments[0]) = {
array_column, datatype_array,
block.get_by_position(arguments[0]).name + ".values"};
}
}

RETURN_IF_ERROR(
array_contains.execute_impl(context, block, arguments, result, input_rows_count));

// restore original argument 0
block.get_by_position(arguments[0]) = orig_arg0;
return Status::OK();
}

private:
FunctionArrayIndex<ArrayContainsAction, FunctionMapContains<is_key>> array_contains;
};

template <bool is_key>
class FunctionMapEntries : public IFunction {
public:
static constexpr auto name = is_key ? "map_keys" : "map_values";
static FunctionPtr create() { return std::make_shared<FunctionMapEntries>(); }

/// Get function name.
String get_name() const override { return name; }

bool is_variadic() const override { return false; }

size_t get_number_of_arguments() const override { return 1; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DataTypePtr datatype = arguments[0];
if (datatype->is_nullable()) {
datatype = assert_cast<const DataTypeNullable*>(datatype.get())->get_nested_type();
}
DCHECK(is_map(datatype)) << "first argument for function: " << name
<< " should be DataTypeMap";
const auto datatype_map = static_cast<const DataTypeMap*>(datatype.get());
if (is_key) {
return std::make_shared<DataTypeArray>(datatype_map->get_key_type());
} else {
return std::make_shared<DataTypeArray>(datatype_map->get_value_type());
}
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
auto left_column =
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
const ColumnMap* map_column = nullptr;
if (left_column->is_nullable()) {
auto nullable_column = reinterpret_cast<const ColumnNullable*>(left_column.get());
map_column = check_and_get_column<ColumnMap>(nullable_column->get_nested_column());
} else {
map_column = check_and_get_column<ColumnMap>(*left_column.get());
}
if (!map_column) {
return Status::RuntimeError("unsupported types for function {}({})", get_name(),
block.get_by_position(arguments[0]).type->get_name());
}

if constexpr (is_key) {
block.replace_by_position(result, map_column->get_keys_array_ptr());
} else {
block.replace_by_position(result, map_column->get_values_array_ptr());
}

return Status::OK();
}
};

void register_function_map(SimpleFunctionFactory& factory) {
factory.register_function<FunctionMap>();
factory.register_function<FunctionMapSize>();
factory.register_function<FunctionMapContains<true>>();
factory.register_function<FunctionMapContains<false>>();
factory.register_function<FunctionMapEntries<true>>();
factory.register_function<FunctionMapEntries<false>>();
}

} // namespace doris::vectorized
2 changes: 2 additions & 0 deletions be/src/vec/functions/simple_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void register_function_convert_tz(SimpleFunctionFactory& factory);
void register_function_least_greast(SimpleFunctionFactory& factory);
void register_function_fake(SimpleFunctionFactory& factory);
void register_function_array(SimpleFunctionFactory& factory);
void register_function_map(SimpleFunctionFactory& factory);
void register_function_geo(SimpleFunctionFactory& factory);
void register_function_multi_string_position(SimpleFunctionFactory& factory);
void register_function_multi_string_search(SimpleFunctionFactory& factory);
Expand Down Expand Up @@ -230,6 +231,7 @@ class SimpleFunctionFactory {
register_function_regexp_extract(instance);
register_function_hex_variadic(instance);
register_function_array(instance);
register_function_map(instance);
register_function_geo(instance);
register_function_url(instance);
register_function_multi_string_position(instance);
Expand Down
Loading