diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index 0ecc8686513f08..25de399df3e045 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -56,10 +56,12 @@ class BeExecVersionManager { * a. function month/day/hour/minute/second's return type is changed to smaller type. * b. in order to solve agg of sum/count is not compatibility during the upgrade process * c. change the string hash method in runtime filter - * d. elt funciton return type change to nullable(string) + * d. elt function return type change to nullable(string) * e. add repeat_max_num in repeat function + * 3: start from doris 2.1 + * a. aggregation function do not serialize bitmap to string */ -inline const int BeExecVersionManager::max_be_exec_version = 2; +inline const int BeExecVersionManager::max_be_exec_version = 3; inline const int BeExecVersionManager::min_be_exec_version = 0; } // namespace doris diff --git a/be/src/util/bitmap_value.h b/be/src/util/bitmap_value.h index ec75c4141c5adf..96510bdde3f6fe 100644 --- a/be/src/util/bitmap_value.h +++ b/be/src/util/bitmap_value.h @@ -1191,6 +1191,10 @@ class BitmapValue { _is_shared = other._is_shared; _bitmap = std::move(other._bitmap); _set = std::move(other._set); + + other._type = EMPTY; + other._is_shared = false; + other._bitmap = nullptr; } BitmapValue& operator=(const BitmapValue& other) { diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 4b2118fc511b5e..cc1b7d88f58787 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -220,8 +220,11 @@ class IAggregateFunction { virtual DataTypePtr get_serialized_type() const { return std::make_shared(); } + virtual void set_version(const int version_) { version = version_; } + protected: DataTypes argument_types; + int version {}; }; /// Implement method to obtain an address of 'add' function. @@ -323,8 +326,8 @@ class IAggregateFunctionHelper : public IAggregateFunction { void serialize_to_column(const std::vector& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { - VectorBufferWriter writter(assert_cast(*dst)); - serialize_vec(places, offset, writter, num_rows); + VectorBufferWriter writer(assert_cast(*dst)); + serialize_vec(places, offset, writer, num_rows); } void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf, @@ -341,8 +344,8 @@ class IAggregateFunctionHelper : public IAggregateFunction { void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, const size_t num_rows, Arena* arena) const override { - VectorBufferWriter writter(assert_cast(*dst)); - streaming_agg_serialize(columns, writter, num_rows, arena); + VectorBufferWriter writer(assert_cast(*dst)); + streaming_agg_serialize(columns, writer, num_rows, arena); } void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h index 00d3517fa031bd..7d2634a8dcc50d 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h @@ -146,10 +146,145 @@ struct AggregateFunctionBitmapData { BitmapValue& get() { return value; } }; +template +class AggregateFunctionBitmapSerializationHelper + : public IAggregateFunctionDataHelper { +public: + using BaseHelper = IAggregateFunctionHelper; + + AggregateFunctionBitmapSerializationHelper(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper(argument_types_) {} + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + if (version >= 3) { + auto& col = assert_cast(*dst); + char place[sizeof(Data)]; + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + assert_cast(this)->create(place); + DEFER({ assert_cast(this)->destroy(place); }); + assert_cast(this)->add(place, columns, i, arena); + data[i] = std::move(this->data(place).value); + } + } else { + BaseHelper::streaming_agg_serialize_to_column(columns, dst, num_rows, arena); + } + } + + void serialize_to_column(const std::vector& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast(*dst); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = std::move(this->data(places[i] + offset).value); + } + } else { + BaseHelper::serialize_to_column(places, offset, dst, num_rows); + } + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + if (version >= 3) { + auto& col = assert_cast(column); + const size_t num_rows = column.size(); + auto* data = col.get_data().data(); + + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_from_column(place, column, arena); + } + } + + void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place, + const IColumn& column, size_t begin, size_t end, + Arena* arena) const override { + DCHECK(end <= column.size() && begin <= end) + << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size(); + if (version >= 3) { + auto& col = assert_cast(column); + auto* data = col.get_data().data(); + for (size_t i = begin; i <= end; ++i) { + this->data(place).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_from_column_range(place, column, begin, end, arena); + } + } + + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast(*assert_cast(column)); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(places[i]).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena, num_rows); + } + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast(*assert_cast(column)); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + if (places[i]) { + this->data(places[i]).merge(data[i]); + } + } + } else { + BaseHelper::deserialize_and_merge_vec_selected(places, offset, rhs, column, arena, + num_rows); + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + IColumn& to) const override { + if (version >= 3) { + auto& col = assert_cast(to); + size_t old_size = col.size(); + col.resize(old_size + 1); + col.get_data()[old_size] = std::move(this->data(place).value); + } else { + BaseHelper::serialize_without_key_to_column(place, to); + } + } + + [[nodiscard]] MutableColumnPtr create_serialize_column() const override { + if (version >= 3) { + return ColumnBitmap::create(); + } else { + return ColumnString::create(); + } + } + + [[nodiscard]] DataTypePtr get_serialized_type() const override { + if (version >= 3) { + return std::make_shared(); + } else { + return IAggregateFunction::get_serialized_type(); + } + } + +protected: + using IAggregateFunction::version; +}; + template class AggregateFunctionBitmapOp final - : public IAggregateFunctionDataHelper, - AggregateFunctionBitmapOp> { + : public AggregateFunctionBitmapSerializationHelper, + AggregateFunctionBitmapOp> { public: using ResultDataType = BitmapValue; using ColVecType = ColumnBitmap; @@ -158,8 +293,9 @@ class AggregateFunctionBitmapOp final String get_name() const override { return Op::name; } AggregateFunctionBitmapOp(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper, - AggregateFunctionBitmapOp>(argument_types_) {} + : AggregateFunctionBitmapSerializationHelper, + AggregateFunctionBitmapOp>( + argument_types_) {} DataTypePtr get_return_type() const override { return std::make_shared(); } @@ -207,7 +343,7 @@ class AggregateFunctionBitmapOp final template class AggregateFunctionBitmapCount final - : public IAggregateFunctionDataHelper< + : public AggregateFunctionBitmapSerializationHelper< AggregateFunctionBitmapData, AggregateFunctionBitmapCount> { public: @@ -216,7 +352,7 @@ class AggregateFunctionBitmapCount final using AggFunctionData = AggregateFunctionBitmapData; AggregateFunctionBitmapCount(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper< + : AggregateFunctionBitmapSerializationHelper< AggregateFunctionBitmapData, AggregateFunctionBitmapCount>(argument_types_) {} diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h index 02e3b8f28e6d08..d7b1fe72b9e1bd 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h @@ -49,6 +49,10 @@ struct AggregateFunctionBitmapAggData { void reset() { value.clear(); } void merge(const AggregateFunctionBitmapAggData& other) { value |= other.value; } + + void write(BufferWritable& buf) const { DataTypeBitMap::serialize_as_stream(value, buf); } + + void read(BufferReadable& buf) { DataTypeBitMap::deserialize_as_stream(value, buf); } }; template @@ -114,12 +118,26 @@ class AggregateFunctionBitmapAgg final } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - __builtin_unreachable(); + this->data(place).write(buf); } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena*) const override { - __builtin_unreachable(); + this->data(place).read(buf); + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto& col = assert_cast(*dst); + char place[sizeof(Data)]; + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + this->create(place); + DEFER({ this->destroy(place); }); + this->add(place, columns, i, arena); + data[i] = std::move(this->data(place).value); + } } void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index c127c754f167c0..cb83d6fcab61f0 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -465,6 +465,7 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) { for (int i = 0; i < _aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state)); + _aggregate_evaluators[i]->set_version(state->be_exec_version()); } // move _create_agg_status to open not in during prepare, diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 97d13b1658de0a..2688fae26088b4 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -101,6 +101,8 @@ class AggFnEvaluator { bool is_merge() const { return _is_merge; } const VExprContextSPtrs& input_exprs_ctxs() const { return _input_exprs_ctxs; } + void set_version(const int version) { _function->set_version(version); } + private: const TFunction _fn; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c2dc6258293397..aa415da2121ff9 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1619,7 +1619,7 @@ public class Config extends ConfigBase { * Max data version of backends serialize block. */ @ConfField(mutable = false) - public static int max_be_exec_version = 2; + public static int max_be_exec_version = 3; /** * Min data version of backends serialize block.