Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/codegen/doris_ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct __float128;
#include "codegen/codegen_anyval_ir.cpp"
#include "exec/aggregation_node_ir.cpp"
#include "exec/hash_join_node_ir.cpp"
#include "exprs/aggregate_functions.cpp"
#include "exprs/cast_functions.cpp"
#include "exprs/conditional_functions_ir.cpp"
#include "exprs/decimal_operators.cpp"
Expand Down
20 changes: 11 additions & 9 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "exprs/json_functions.h"
#include "exprs/hll_hash_function.h"
#include "exprs/timezone_db.h"
#include "exprs/bitmap_function.h"
#include "geo/geo_functions.h"
#include "olap/options.h"
#include "util/time.h"
Expand Down Expand Up @@ -84,7 +85,7 @@ void* tcmalloc_gc_thread(void* dummy) {

return NULL;
}

void* memory_maintenance_thread(void* dummy) {
while (true) {
sleep(config::memory_maintenance_sleep_time_s);
Expand All @@ -94,20 +95,20 @@ void* memory_maintenance_thread(void* dummy) {
if (env != nullptr) {
BufferPool* buffer_pool = env->buffer_pool();
if (buffer_pool != nullptr) buffer_pool->Maintenance();

// The process limit as measured by our trackers may get out of sync with the
// process usage if memory is allocated or freed without updating a MemTracker.
// The metric is refreshed whenever memory is consumed or released via a MemTracker,
// so on a system with queries executing it will be refreshed frequently. However
// if the system is idle, we need to refresh the tracker occasionally since
// untracked memory may be allocated or freed, e.g. by background threads.
if (env->process_mem_tracker() != nullptr &&
if (env->process_mem_tracker() != nullptr &&
!env->process_mem_tracker()->is_consumption_metric_null()) {
env->process_mem_tracker()->RefreshConsumptionFromMetric();
}
}
}
}
}
}

return NULL;
}

Expand Down Expand Up @@ -174,8 +175,8 @@ void* calculate_metrics(void* dummy) {
}

sleep(15); // 15 seconds
}
}

return NULL;
}

Expand Down Expand Up @@ -270,6 +271,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
ESFunctions::init();
GeoFunctions::init();
TimezoneDatabase::init();
BitmapFunctions::init();

pthread_t tc_malloc_pid;
pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL);
Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ Status OlapScanner::get_batch(
state->batch_size() * _tuple_desc->byte_size());
bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size());
Tuple *tuple = reinterpret_cast<Tuple*>(tuple_buf);
std::unique_ptr<Arena> arena(new Arena());

int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
{
Expand All @@ -263,7 +264,7 @@ Status OlapScanner::get_batch(
break;
}
// Read one row from reader
auto res = _reader->next_row_with_aggregation(&_read_row_cursor, eof);
auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof);
if (res != OLAP_SUCCESS) {
return Status::InternalError("Internal Error: read storage fail.");
}
Expand Down Expand Up @@ -328,6 +329,11 @@ Status OlapScanner::get_batch(
slot->ptr = reinterpret_cast<char*>(v);
}
}

// the memory allocate by arena has been copied,
// so we should release these memory immediately
arena.reset(new Arena());

if (VLOG_ROW_IS_ON) {
VLOG_ROW << "OlapScanner output row: " << Tuple::to_string(tuple, *_tuple_desc);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_library(Exprs
hll_hash_function.cpp
agg_fn.cc
new_agg_fn_evaluator.cc
bitmap_function.cpp
)
#ADD_BE_TEST(json_function_test)
#ADD_BE_TEST(binary_predicate_test)
Expand Down
90 changes: 90 additions & 0 deletions be/src/exprs/bitmap_function.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/bitmap_function.h"

#include "exprs/anyval_util.h"
#include "util/bitmap.h"

namespace doris {
void BitmapFunctions::init() {
}

void BitmapFunctions::bitmap_init(FunctionContext* ctx, StringVal* dst) {
dst->is_null = false;
dst->len = sizeof(RoaringBitmap);
dst->ptr = (uint8_t*)new RoaringBitmap();
}

template <typename T>
void BitmapFunctions::bitmap_update_int(FunctionContext* ctx, const T& src, StringVal* dst) {
if (src.is_null) {
return;
}

auto* dst_bitmap = reinterpret_cast<RoaringBitmap*>(dst->ptr);
dst_bitmap->update(src.val);
}

BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal& src) {
auto* src_bitmap = reinterpret_cast<RoaringBitmap*>(src.ptr);
BigIntVal result(src_bitmap->cardinality());
delete src_bitmap;
return result;
}

void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr);
auto* dst_bitmap = reinterpret_cast<RoaringBitmap*>(dst->ptr);
dst_bitmap->merge(src_bitmap);
}

BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) {
RoaringBitmap bitmap ((char*)src.ptr);
BigIntVal result(bitmap.cardinality());
return result;
}

// we assume the input src is a valid integer string
StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) {
std::unique_ptr<RoaringBitmap> bitmap {new RoaringBitmap()};
if (!src.is_null) {
std::string tmp_str = std::string(reinterpret_cast<char*>(src.ptr), src.len) ;
bitmap->update(std::stoi(tmp_str));
}
std::string buf;
buf.resize(bitmap->size());
bitmap->serialize((char*)buf.c_str());
return AnyValUtil::from_string_temp(ctx, buf);
}

StringVal BitmapFunctions::bitmap_serialize(FunctionContext* ctx, const StringVal& src) {
auto* src_bitmap = reinterpret_cast<RoaringBitmap*>(src.ptr);
StringVal result(ctx, src_bitmap->size());
src_bitmap->serialize((char*)result.ptr);
delete src_bitmap;
return result;
}

template void BitmapFunctions::bitmap_update_int<TinyIntVal>(
FunctionContext* ctx, const TinyIntVal& src, StringVal* dst);
template void BitmapFunctions::bitmap_update_int<SmallIntVal>(
FunctionContext* ctx, const SmallIntVal& src, StringVal* dst);
template void BitmapFunctions::bitmap_update_int<IntVal>(
FunctionContext* ctx, const IntVal& src, StringVal* dst);

}
42 changes: 42 additions & 0 deletions be/src/exprs/bitmap_function.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H
#define DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H

#include "udf/udf.h"

namespace doris {

class BitmapFunctions {
public:
static void init();
static void bitmap_init(FunctionContext* ctx, StringVal* slot);
template <typename T>
static void bitmap_update_int(FunctionContext* ctx, const T& src, StringVal* dst);
// the input src's ptr need to point a RoaringBitmap, this function will release the
// RoaringBitmap memory
static BigIntVal bitmap_finalize(FunctionContext* ctx, const StringVal& src);

static void bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst);
static BigIntVal bitmap_count(FunctionContext* ctx, const StringVal& src);

static StringVal bitmap_serialize(FunctionContext* ctx, const StringVal& src);
static StringVal to_bitmap(FunctionContext* ctx, const StringVal& src);
};
}
#endif //DORIS_BE_SRC_QUERY_EXPRS_BITMAP_FUNCTION_H
8 changes: 0 additions & 8 deletions be/src/gutil/port.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,6 @@ inline void *aligned_malloc(size_t size, int minimum_alignment) {
#endif
}

inline void aligned_free(void *aligned_memory) {
free(aligned_memory);
}

#else // not GCC

#define PRINTF_ATTRIBUTE(string_index, first_to_check)
Expand Down Expand Up @@ -861,10 +857,6 @@ inline void *aligned_malloc(size_t size, int minimum_alignment) {
return _aligned_malloc(size, minimum_alignment);
}

inline void aligned_free(void *aligned_memory) {
_aligned_free(aligned_memory);
}

// ----- BEGIN VC++ STUBS & FAKE DEFINITIONS ---------------------------------

// See http://en.wikipedia.org/wiki/IEEE_754 for details of
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/aggregate_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ template<typename Traits>
AggregateInfo::AggregateInfo(const Traits& traits)
: _init_fn(traits.init),
_update_fn(traits.update),
_merge_fn(traits.merge),
_finalize_fn(traits.finalize),
_agg_method(traits.agg_method) {
if (_merge_fn == nullptr) {
_merge_fn = _update_fn;
}
}

struct AggregateFuncMapHash {
Expand Down Expand Up @@ -128,6 +124,9 @@ AggregateFuncResolver::AggregateFuncResolver() {

// Hyperloglog Aggregate Function
add_aggregate_mapping<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL>();

// Bitmap Aggregate Function
add_aggregate_mapping<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_VARCHAR>();
}

AggregateFuncResolver::~AggregateFuncResolver() {
Expand Down
Loading