Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ void ExecNode::reached_limit(vectorized::Block* block, bool* eos) {
}

_num_rows_returned += block->rows();
if (*eos) COUNTER_SET(_rows_returned_counter, _num_rows_returned);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}

/*
Expand Down
10 changes: 7 additions & 3 deletions be/src/runtime/fold_constant_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Status FoldConstantExecutor::fold_constant_expr(
expr_result.set_success(false);
} else {
expr_result.set_success(true);
result = _get_result(src, ctx->root()->type().type);
result = _get_result(src, 0, ctx->root()->type().type);
}

expr_result.set_content(std::move(result));
Expand Down Expand Up @@ -143,7 +143,8 @@ Status FoldConstantExecutor::fold_constant_vexpr(
expr_result.set_success(false);
} else {
expr_result.set_success(true);
result = _get_result<true>((void *) column_ptr->get_data_at(0).data, ctx->root()->type().type);
auto string_ref = column_ptr->get_data_at(0);
result = _get_result<true>((void*)string_ref.data, string_ref.size, ctx->root()->type().type);
}

expr_result.set_content(std::move(result));
Expand Down Expand Up @@ -198,7 +199,7 @@ Status FoldConstantExecutor::_prepare_and_open(Context* ctx) {
}

template <bool is_vec>
string FoldConstantExecutor::_get_result(void* src, PrimitiveType slot_type){
string FoldConstantExecutor::_get_result(void* src, size_t size, PrimitiveType slot_type){
switch (slot_type) {
case TYPE_BOOLEAN: {
bool val = *reinterpret_cast<const bool*>(src);
Expand Down Expand Up @@ -237,6 +238,9 @@ string FoldConstantExecutor::_get_result(void* src, PrimitiveType slot_type){
case TYPE_STRING:
case TYPE_HLL:
case TYPE_OBJECT: {
if constexpr (is_vec) {
return std::string((char*)src, size);
}
return (reinterpret_cast<StringValue*>(src))->to_string();
}
case TYPE_DATE:
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fold_constant_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class FoldConstantExecutor {
Status _prepare_and_open(Context* ctx);

template <bool is_vec = false>
std::string _get_result(void* src, PrimitiveType slot_type);
std::string _get_result(void* src, size_t size, PrimitiveType slot_type);

std::unique_ptr<RuntimeState> _runtime_state;
std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct ProcessRuntimeFilterBuild {
ProcessRuntimeFilterBuild(HashJoinNode* join_node) : _join_node(join_node) {}

Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) {
if (_join_node->_runtime_filter_descs.empty() || _join_node->_inserted_rows.empty()) {
if (_join_node->_runtime_filter_descs.empty()) {
return Status::OK();
}
VRuntimeFilterSlots* runtime_filter_slots =
Expand Down Expand Up @@ -162,7 +162,6 @@ struct ProcessHashTableProbe {
_probe_block(join_node->_probe_block),
_probe_index(join_node->_probe_index),
_probe_raw_ptrs(join_node->_probe_columns),
_arena(join_node->_arena),
_rows_returned_counter(join_node->_rows_returned_counter) {}

// Only process the join with no other join conjunt, because of no other join conjunt
Expand Down Expand Up @@ -198,7 +197,7 @@ struct ProcessHashTableProbe {
_arena)) {nullptr, false}
: key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);

if (_probe_index + 4 < _probe_rows)
if (_probe_index + 2 < _probe_rows)
key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2, _arena);

if (find_result.is_found()) {
Expand All @@ -218,7 +217,7 @@ struct ProcessHashTableProbe {
if (!_join_node->_is_right_semi_anti) {
++current_offset;
for (size_t j = 0; j < right_col_len; ++j) {
auto &column = *mapped.block->get_by_position(j).column;
auto& column = *mapped.block->get_by_position(j).column;
mcol[j + right_col_idx]->insert_from(column, mapped.row_num);
}
}
Expand Down Expand Up @@ -515,7 +514,7 @@ struct ProcessHashTableProbe {
const Block& _probe_block;
int& _probe_index;
ColumnRawPtrs& _probe_raw_ptrs;
Arena& _arena;
Arena _arena;

ProfileCounter* _rows_returned_counter;
};
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/vcross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) {
}
}
dst_columns.clear();

RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));

reached_limit(block, eos);
Expand Down
20 changes: 4 additions & 16 deletions be/src/vec/functions/function_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,26 +326,20 @@ class FunctionStringConcat : public IFunction {
bool is_variadic() const override { return true; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeString>());
return std::make_shared<DataTypeString>();
}
bool use_default_implementation_for_nulls() const override { return false; }
bool use_default_implementation_for_nulls() const override { return true; }
bool use_default_implementation_for_constants() const override { return true; }

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) override {
DCHECK_GE(arguments.size(), 1);

if (arguments.size() == 1) {
if (block.get_by_position(arguments[0]).column->is_nullable()) {
block.get_by_position(result).column = block.get_by_position(arguments[0]).column;
} else {
block.get_by_position(result).column =
make_nullable(block.get_by_position(arguments[0]).column);
}
block.get_by_position(result).column = block.get_by_position(arguments[0]).column;
return Status::OK();
}

auto null_map = ColumnUInt8::create(input_rows_count, 0);
int argument_size = arguments.size();
ColumnPtr argument_columns[argument_size];

Expand All @@ -355,11 +349,6 @@ class FunctionStringConcat : public IFunction {
for (int i = 0; i < argument_size; ++i) {
argument_columns[i] =
block.get_by_position(arguments[i]).column->convert_to_full_column_if_const();
if (auto* nullable = check_and_get_column<const ColumnNullable>(*argument_columns[i])) {
argument_columns[i] = nullable->get_nested_column_ptr();
VectorizedUtils::update_null_map(null_map->get_data(),
nullable->get_null_map_data());
}
auto col_str = assert_cast<const ColumnString*>(argument_columns[i].get());
offsets_list[i] = &col_str->get_offsets();
chars_list[i] = &col_str->get_chars();
Expand Down Expand Up @@ -401,8 +390,7 @@ class FunctionStringConcat : public IFunction {
res_offset[i] = res_offset[i - 1] + current_length;
}

block.get_by_position(result).column =
ColumnNullable::create(std::move(res), std::move(null_map));
block.get_by_position(result).column = std::move(res);
return Status::OK();
}
};
Expand Down