Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 87 additions & 23 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ PColumnType to_proto(PrimitiveType type) {
return PColumnType::COLUMN_TYPE_VARCHAR;
case TYPE_STRING:
return PColumnType::COLUMN_TYPE_STRING;
case TYPE_IPV4:
return PColumnType::COLUMN_TYPE_IPV4;
case TYPE_IPV6:
return PColumnType::COLUMN_TYPE_IPV6;
default:
DCHECK(false) << "Invalid type.";
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(type));
}
DCHECK(false);
return PColumnType::COLUMN_TYPE_INT;
}

// PColumnType->PrimitiveType
Expand Down Expand Up @@ -161,10 +164,14 @@ PrimitiveType to_primitive_type(PColumnType type) {
return TYPE_CHAR;
case PColumnType::COLUMN_TYPE_STRING:
return TYPE_STRING;
case PColumnType::COLUMN_TYPE_IPV4:
return TYPE_IPV4;
case PColumnType::COLUMN_TYPE_IPV6:
return TYPE_IPV6;
default:
DCHECK(false);
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PColumnType type {}", int(type));
}
return TYPE_INT;
}

// PFilterType -> RuntimeFilterType
Expand Down Expand Up @@ -557,14 +564,13 @@ class RuntimePredicateWrapper {
}

Status assign(const PInFilter* in_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(in_filter->column_type());
_context->hybrid_set.reset(create_set(type));
_context->hybrid_set.reset(create_set(_column_return_type));
if (contain_null) {
_context->hybrid_set->set_null_aware(true);
_context->hybrid_set->insert((const void*)nullptr);
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column,
ObjectPool* pool) {
Expand Down Expand Up @@ -721,9 +727,29 @@ class RuntimePredicateWrapper {
});
break;
}
case TYPE_IPV4: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column,
ObjectPool* pool) {
int32_t tmp = column.intval();
set->insert(&tmp);
});
break;
}
case TYPE_IPV6: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column,
ObjectPool* pool) {
auto string_val = column.stringval();
StringParser::ParseResult result;
auto int128_val = StringParser::string_to_int<uint128_t>(
string_val.c_str(), string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
set->insert(&int128_val);
});
break;
}
default: {
return Status::InternalError("not support assign to in filter, type: " +
type_to_string(type));
type_to_string(_column_return_type));
}
}
return Status::OK();
Expand All @@ -746,15 +772,14 @@ class RuntimePredicateWrapper {
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
_context->minmax_func.reset(create_minmax_filter(type));
_context->minmax_func.reset(create_minmax_filter(_column_return_type));

if (contain_null) {
_context->minmax_func->set_null_aware(true);
_context->minmax_func->set_contain_null();
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
bool min_val = minmax_filter->min_val().boolval();
bool max_val = minmax_filter->max_val().boolval();
Expand Down Expand Up @@ -874,6 +899,23 @@ class RuntimePredicateWrapper {
StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length());
return _context->minmax_func->assign(&min_val, &max_val);
}
case TYPE_IPV4: {
int tmp_min = minmax_filter->min_val().intval();
int tmp_max = minmax_filter->max_val().intval();
return _context->minmax_func->assign(&tmp_min, &tmp_max);
}
case TYPE_IPV6: {
auto min_string_val = minmax_filter->min_val().stringval();
auto max_string_val = minmax_filter->max_val().stringval();
StringParser::ParseResult result;
auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
min_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
return _context->minmax_func->assign(&min_val, &max_val);
}
default:
break;
}
Expand Down Expand Up @@ -1157,7 +1199,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo
merge_filter_request->set_opt_remote_rf(opt_remote_rf);
merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());

if (get_ignored()) {
Expand Down Expand Up @@ -1518,13 +1560,10 @@ template <class T>
Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
if (param->request->has_column_type()) {
column_type = to_primitive_type(param->request->column_type());
if (!param->request->has_column_type()) {
return Status::InternalError("unknown filter column type");
}
PrimitiveType column_type = to_primitive_type(param->request->column_type());
*wrapper = std::make_unique<RuntimePredicateWrapper>(pool, column_type, get_type(filter_type),
param->request->filter_id());

Expand Down Expand Up @@ -1742,9 +1781,21 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
});
return;
}
case TYPE_IPV4: {
batch_copy<IPv4>(filter, it, [](PColumnValue* column, const IPv4* value) {
column->set_intval(*reinterpret_cast<const int32_t*>(value));
});
return;
}
case TYPE_IPV6: {
batch_copy<IPv6>(filter, it, [](PColumnValue* column, const IPv6* value) {
column->set_stringval(LargeIntValue::to_string(*value));
});
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(column_type));
}
}
}
Expand Down Expand Up @@ -1860,9 +1911,22 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
std::string(max_string_value->data, max_string_value->size));
break;
}
case TYPE_IPV4: {
filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
return;
}
case TYPE_IPV6: {
filter->mutable_min_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(min_data)));
filter->mutable_max_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(max_data)));
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}",
int(_wrapper->column_type()));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/predicate_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV4, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv4 value;
IPv4 value;
bool res = IPv4Value::from_string(value, condition);
DCHECK(res);
return value;
Expand All @@ -251,7 +251,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV6, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv6 value;
IPv6 value;
bool res = IPv6Value::from_string(value, condition);
DCHECK(res);
return value;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/large_int_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <iostream>
#include <string>

#include "olap/olap_common.h"

namespace doris {

inline const __int128 MAX_INT128 = ~((__int128)0x01 << 127);
Expand All @@ -36,6 +38,9 @@ class LargeIntValue {
}

static std::string to_string(__int128 value) { return fmt::format(FMT_COMPILE("{}"), value); }
static std::string to_string(__uint128_t value) {
return fmt::format(FMT_COMPILE("{}"), value);
}
};

std::ostream& operator<<(std::ostream& os, __int128 const& value);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/primitive_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ struct PrimitiveTypeTraits<TYPE_LARGEINT> {
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV4> {
using CppType = vectorized::IPv4;
using CppType = IPv4;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv4;
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV6> {
using CppType = vectorized::IPv6;
using CppType = IPv6;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv6;
};
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/core/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct decimal12_t;
struct uint24_t;
struct StringRef;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

namespace vectorized {

/// Data types for representing elementary values from a database in RAM.
Expand Down Expand Up @@ -296,9 +299,6 @@ struct TypeId<String> {
/// Not a data type in database, defined just for convenience.
using Strings = std::vector<String>;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

template <>
inline constexpr bool IsNumber<IPv6> = true;
template <>
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ std::string DataTypeIPv6::to_string(const IColumn& column, size_t row_num) const
return value.to_string();
}

std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) const {
std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) {
auto value = IPv6Value(ipv6_val);
return value.to_string();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DataTypeIPv6 final : public DataTypeNumberBase<IPv6> {
bool equals(const IDataType& rhs) const override;
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
std::string to_string(const IPv6& value) const;
static std::string to_string(const IPv6& value);
Status from_string(ReadBuffer& rb, IColumn* column) const override;

Field get_field(const TExprNode& node) const override {
Expand Down
13 changes: 11 additions & 2 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type_array.h"
Expand Down Expand Up @@ -147,9 +148,17 @@ TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, in
THROW_IF_ERROR(create_texpr_literal_node<TYPE_STRING>(data, &node));
break;
}
case TYPE_IPV4: {
THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV4>(data, &node));
break;
}
case TYPE_IPV6: {
THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV6>(data, &node));
break;
}
default:
DCHECK(false);
throw std::invalid_argument("Invalid type!");
throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter meet invalid type {}",
int(type));
}
return node;
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/core/wide_integer.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/functions/function.h"

Expand Down Expand Up @@ -454,6 +456,20 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio
string_literal.__set_value(origin_value->to_string());
(*node).__set_string_literal(string_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING));
} else if constexpr (T == TYPE_IPV4) {
const auto* origin_value = reinterpret_cast<const IPv4*>(data);
(*node).__set_node_type(TExprNodeType::IPV4_LITERAL);
TIPv4Literal literal;
literal.__set_value(*origin_value);
(*node).__set_ipv4_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV4));
} else if constexpr (T == TYPE_IPV6) {
const auto* origin_value = reinterpret_cast<const IPv6*>(data);
(*node).__set_node_type(TExprNodeType::IPV6_LITERAL);
TIPv6Literal literal;
literal.__set_value(vectorized::DataTypeIPv6::to_string(*origin_value));
(*node).__set_ipv6_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV6));
} else {
return Status::InvalidArgument("Invalid argument type!");
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int128>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv4>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv4>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv6>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv6>>();
}
case FieldType::OLAP_FIELD_TYPE_FLOAT: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float32>>();
Expand Down
Loading