Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ class BeExecVersionManager {
* a. function month/day/hour/minute/second's return type is changed to smaller type.
* b. in order to solve the incompatibility of sum/count aggregation during the upgrade process
* c. change the string hash method in runtime filter
* d. elt funciton return type change to nullable(string)
* d. elt function return type change to nullable(string)
* e. add repeat_max_num in repeat function
* 3: start from doris 2.1
* a. aggregation functions do not serialize bitmap to string
*/
inline const int BeExecVersionManager::max_be_exec_version = 2;
inline const int BeExecVersionManager::max_be_exec_version = 3;
inline const int BeExecVersionManager::min_be_exec_version = 0;

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/util/bitmap_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,10 @@ class BitmapValue {
_is_shared = other._is_shared;
_bitmap = std::move(other._bitmap);
_set = std::move(other._set);

other._type = EMPTY;
other._is_shared = false;
other._bitmap = nullptr;
}

BitmapValue& operator=(const BitmapValue& other) {
Expand Down
11 changes: 7 additions & 4 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,11 @@ class IAggregateFunction {

/// Default serialized type of the aggregate state: a DataTypeString.
/// Virtual, so derived functions may report a different type.
virtual DataTypePtr get_serialized_type() const { return std::make_shared<DataTypeString>(); }

/// Stores `version_` in the `version` member of this aggregate function.
virtual void set_version(const int version_) { version = version_; }

protected:
DataTypes argument_types;
int version {};
};

/// Implement method to obtain an address of 'add' function.
Expand Down Expand Up @@ -323,8 +326,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {

void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
MutableColumnPtr& dst, const size_t num_rows) const override {
VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
serialize_vec(places, offset, writter, num_rows);
VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
serialize_vec(places, offset, writer, num_rows);
}

void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
Expand All @@ -341,8 +344,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {

void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
const size_t num_rows, Arena* arena) const override {
VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
streaming_agg_serialize(columns, writter, num_rows, arena);
VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
streaming_agg_serialize(columns, writer, num_rows, arena);
}

void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
Expand Down
148 changes: 142 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,145 @@ struct AggregateFunctionBitmapData {
BitmapValue& get() { return value; }
};

/// CRTP helper shared by the bitmap aggregate functions in this file.
///
/// When the `version` member is >= 3, the intermediate aggregate state is
/// exchanged through a ColumnBitmap: bitmap values are moved into / merged from
/// the column directly. For older versions every method delegates to the
/// IAggregateFunctionHelper implementation, which works on a ColumnString.
///
/// @tparam Data    aggregate state type; must expose a `value` member and a
///                 `merge()` accepting an element of ColumnBitmap's data.
/// @tparam Derived the concrete aggregate function (CRTP).
template <typename Data, typename Derived>
class AggregateFunctionBitmapSerializationHelper
        : public IAggregateFunctionDataHelper<Data, Derived> {
public:
    using BaseHelper = IAggregateFunctionHelper<Derived>;

    AggregateFunctionBitmapSerializationHelper(const DataTypes& argument_types_)
            : IAggregateFunctionDataHelper<Data, Derived>(argument_types_) {}

    /// Aggregates each input row on its own and writes the resulting bitmap to
    /// row `i` of `dst` (resized to `num_rows`).
    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
                                           const size_t num_rows, Arena* arena) const override {
        if (version >= 3) {
            auto& col = assert_cast<ColumnBitmap&>(*dst);
            // Scratch storage for one temporary state. A Data object is accessed
            // in this buffer via this->data(place), so the buffer has to honour
            // Data's alignment and not only its size.
            alignas(Data) char place[sizeof(Data)];
            col.resize(num_rows);
            auto* data = col.get_data().data();
            for (size_t i = 0; i != num_rows; ++i) {
                assert_cast<const Derived*>(this)->create(place);
                // destroy() runs at the end of every iteration, after the value
                // has been moved out below.
                DEFER({ assert_cast<const Derived*>(this)->destroy(place); });
                assert_cast<const Derived*>(this)->add(place, columns, i, arena);
                data[i] = std::move(this->data(place).value);
            }
        } else {
            BaseHelper::streaming_agg_serialize_to_column(columns, dst, num_rows, arena);
        }
    }

    /// Moves the bitmap of each state at `places[i] + offset` into row `i` of
    /// `dst` (resized to `num_rows`). The source states are left moved-from.
    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
                             MutableColumnPtr& dst, const size_t num_rows) const override {
        if (version >= 3) {
            auto& col = assert_cast<ColumnBitmap&>(*dst);
            col.resize(num_rows);
            auto* data = col.get_data().data();
            for (size_t i = 0; i != num_rows; ++i) {
                data[i] = std::move(this->data(places[i] + offset).value);
            }
        } else {
            BaseHelper::serialize_to_column(places, offset, dst, num_rows);
        }
    }

    /// Merges every row of `column` into the single state at `place`.
    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
                                           Arena* arena) const override {
        if (version >= 3) {
            auto& col = assert_cast<const ColumnBitmap&>(column);
            const size_t num_rows = column.size();
            auto* data = col.get_data().data();

            for (size_t i = 0; i != num_rows; ++i) {
                this->data(place).merge(data[i]);
            }
        } else {
            BaseHelper::deserialize_and_merge_from_column(place, column, arena);
        }
    }

    /// Merges rows `begin` through `end` (both inclusive, as written in the
    /// loop below) of `column` into the single state at `place`.
    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
                                                 const IColumn& column, size_t begin, size_t end,
                                                 Arena* arena) const override {
        // NOTE(review): the check permits end == column.size() while the loop
        // reads data[end]; confirm callers always pass end < column.size().
        DCHECK(end <= column.size() && begin <= end)
                << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
        if (version >= 3) {
            auto& col = assert_cast<const ColumnBitmap&>(column);
            auto* data = col.get_data().data();
            for (size_t i = begin; i <= end; ++i) {
                this->data(place).merge(data[i]);
            }
        } else {
            BaseHelper::deserialize_and_merge_from_column_range(place, column, begin, end, arena);
        }
    }

    /// Merges row `i` of `column` into the state at `places[i] + offset` for
    /// all `num_rows` rows.
    ///
    /// The parameter is typed ColumnString* by the overridden interface; when
    /// version >= 3 it is reinterpreted as a ColumnBitmap.
    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
                                   AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
                                   const size_t num_rows) const override {
        if (version >= 3) {
            auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
            auto* data = col.get_data().data();
            for (size_t i = 0; i != num_rows; ++i) {
                // Apply `offset` exactly as serialize_to_column() does; without
                // it the merge would target the wrong location inside the place.
                this->data(places[i] + offset).merge(data[i]);
            }
        } else {
            BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena, num_rows);
        }
    }

    /// Same as deserialize_and_merge_vec(), but rows whose `places[i]` is null
    /// are skipped.
    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
                                            AggregateDataPtr rhs, const ColumnString* column,
                                            Arena* arena, const size_t num_rows) const override {
        if (version >= 3) {
            auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
            auto* data = col.get_data().data();
            for (size_t i = 0; i != num_rows; ++i) {
                // The null check is on the raw place pointer; `offset` is only
                // applied once the row is known to be selected.
                if (places[i]) {
                    this->data(places[i] + offset).merge(data[i]);
                }
            }
        } else {
            BaseHelper::deserialize_and_merge_vec_selected(places, offset, rhs, column, arena,
                                                           num_rows);
        }
    }

    /// Appends the bitmap of the state at `place` as one new row of `to`.
    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
                                         IColumn& to) const override {
        if (version >= 3) {
            auto& col = assert_cast<ColumnBitmap&>(to);
            size_t old_size = col.size();
            col.resize(old_size + 1);
            col.get_data()[old_size] = std::move(this->data(place).value);
        } else {
            BaseHelper::serialize_without_key_to_column(place, to);
        }
    }

    /// Returns an empty column of the kind used to carry serialized states:
    /// ColumnBitmap for version >= 3, ColumnString otherwise.
    [[nodiscard]] MutableColumnPtr create_serialize_column() const override {
        if (version >= 3) {
            return ColumnBitmap::create();
        } else {
            return ColumnString::create();
        }
    }

    /// Returns the data type matching create_serialize_column():
    /// DataTypeBitMap for version >= 3, otherwise the IAggregateFunction default.
    [[nodiscard]] DataTypePtr get_serialized_type() const override {
        if (version >= 3) {
            return std::make_shared<DataTypeBitMap>();
        } else {
            return IAggregateFunction::get_serialized_type();
        }
    }

protected:
    // Makes the dependent-base member visible by its unqualified name.
    using IAggregateFunction::version;
};

template <typename Op>
class AggregateFunctionBitmapOp final
: public IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
AggregateFunctionBitmapOp<Op>> {
: public AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
AggregateFunctionBitmapOp<Op>> {
public:
using ResultDataType = BitmapValue;
using ColVecType = ColumnBitmap;
Expand All @@ -158,8 +293,9 @@ class AggregateFunctionBitmapOp final
String get_name() const override { return Op::name; }

AggregateFunctionBitmapOp(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
AggregateFunctionBitmapOp<Op>>(argument_types_) {}
: AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
AggregateFunctionBitmapOp<Op>>(
argument_types_) {}

DataTypePtr get_return_type() const override { return std::make_shared<DataTypeBitMap>(); }

Expand Down Expand Up @@ -207,7 +343,7 @@ class AggregateFunctionBitmapOp final

template <bool arg_is_nullable, typename ColVecType>
class AggregateFunctionBitmapCount final
: public IAggregateFunctionDataHelper<
: public AggregateFunctionBitmapSerializationHelper<
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>> {
public:
Expand All @@ -216,7 +352,7 @@ class AggregateFunctionBitmapCount final
using AggFunctionData = AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>;

AggregateFunctionBitmapCount(const DataTypes& argument_types_)
: IAggregateFunctionDataHelper<
: AggregateFunctionBitmapSerializationHelper<
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>>(argument_types_) {}

Expand Down
22 changes: 20 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ struct AggregateFunctionBitmapAggData {
void reset() { value.clear(); }

void merge(const AggregateFunctionBitmapAggData& other) { value |= other.value; }

/// Writes `value` to `buf` via DataTypeBitMap::serialize_as_stream.
void write(BufferWritable& buf) const { DataTypeBitMap::serialize_as_stream(value, buf); }

/// Reads `value` from `buf` via DataTypeBitMap::deserialize_as_stream.
void read(BufferReadable& buf) { DataTypeBitMap::deserialize_as_stream(value, buf); }
};

template <bool arg_nullable, typename T>
Expand Down Expand Up @@ -114,12 +118,26 @@ class AggregateFunctionBitmapAgg final
}

void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
__builtin_unreachable();
this->data(place).write(buf);
}

void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
__builtin_unreachable();
this->data(place).read(buf);
}

/// Aggregates each input row on its own and writes the resulting bitmap to
/// row `i` of `dst`, which is cast to a ColumnBitmap and resized to `num_rows`.
void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
                                       const size_t num_rows, Arena* arena) const override {
    auto& col = assert_cast<ColumnBitmap&>(*dst);
    // Scratch storage for one temporary state. A Data object is accessed in this
    // buffer via this->data(place), so it has to honour Data's alignment and not
    // only its size.
    alignas(Data) char place[sizeof(Data)];
    col.resize(num_rows);
    auto* data = col.get_data().data();
    for (size_t i = 0; i != num_rows; ++i) {
        this->create(place);
        // destroy() runs at the end of every iteration, after the value has
        // been moved out below.
        DEFER({ this->destroy(place); });
        this->add(place, columns, i, arena);
        data[i] = std::move(this->data(place).value);
    }
}

void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena,
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) {

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

// move _create_agg_status to open not in during prepare,
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exprs/vectorized_agg_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class AggFnEvaluator {
bool is_merge() const { return _is_merge; }
const VExprContextSPtrs& input_exprs_ctxs() const { return _input_exprs_ctxs; }

/// Forwards `version` to the wrapped aggregate function (`_function->set_version`).
void set_version(const int version) { _function->set_version(version); }

private:
const TFunction _fn;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ public class Config extends ConfigBase {
* Max data version of backends serialize block.
*/
@ConfField(mutable = false)
public static int max_be_exec_version = 2;
public static int max_be_exec_version = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here explaining why the BE exec version needs to change to 3.


/**
* Min data version of backends serialize block.
Expand Down