Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 207 additions & 172 deletions be/src/vec/aggregate_functions/aggregate_function_java_udaf.h

Large diffs are not rendered by default.

62 changes: 21 additions & 41 deletions be/src/vec/functions/function_java_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,6 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
TJavaUdfExecutorCtorParams ctor_params;
ctor_params.__set_fn(fn_);
ctor_params.__set_location(local_location);
ctor_params.__set_input_offsets_ptrs((int64_t)jni_ctx->input_offsets_ptrs.get());
ctor_params.__set_input_buffer_ptrs((int64_t)jni_ctx->input_values_buffer_ptr.get());
ctor_params.__set_input_nulls_ptrs((int64_t)jni_ctx->input_nulls_buffer_ptr.get());
ctor_params.__set_input_array_nulls_buffer_ptr(
(int64_t)jni_ctx->input_array_nulls_buffer_ptr.get());
ctor_params.__set_input_array_string_offsets_ptrs(
(int64_t)jni_ctx->input_array_string_offsets_ptrs.get());
ctor_params.__set_output_buffer_ptr((int64_t)jni_ctx->output_value_buffer.get());
ctor_params.__set_output_null_ptr((int64_t)jni_ctx->output_null_value.get());
ctor_params.__set_output_offsets_ptr((int64_t)jni_ctx->output_offsets_ptr.get());
ctor_params.__set_output_array_null_ptr((int64_t)jni_ctx->output_array_null_ptr.get());
ctor_params.__set_output_array_string_offsets_ptr(
(int64_t)jni_ctx->output_array_string_offsets_ptr.get());
ctor_params.__set_output_intermediate_state_ptr(
(int64_t)jni_ctx->output_intermediate_state_ptr.get());
ctor_params.__set_batch_size_ptr((int64_t)jni_ctx->batch_size_ptr.get());

jbyteArray ctor_params_bytes;

// Pushed frame will be popped when jni_frame goes out-of-scope.
Expand Down Expand Up @@ -255,7 +238,6 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
->get_data()
.data());
int64_t value_nested_data_address = 0, value_nested_offset_address = 0;
// array type need pass address: [nullmap_address], offset_address, nested_nullmap_address, nested_data_address/nested_char_address,nested_offset_address
if (value_data_column->is_column_string()) {
const ColumnString* col = assert_cast<const ColumnString*>(value_data_column.get());
value_nested_data_address = reinterpret_cast<int64_t>(col->get_chars().data());
Expand Down Expand Up @@ -364,6 +346,27 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
auto& key_null_map_data =
assert_cast<ColumnVector<UInt8>*>(key_data_column_null_map.get())->get_data();
auto key_nested_nullmap_address = reinterpret_cast<int64_t>(key_null_map_data.data());
ColumnNullable& map_value_column_nullable =
assert_cast<ColumnNullable&>(map_col->get_values());
auto value_data_column_null_map = map_value_column_nullable.get_null_map_column_ptr();
auto value_data_column = map_value_column_nullable.get_nested_column_ptr();
auto& value_null_map_data =
assert_cast<ColumnVector<UInt8>*>(value_data_column_null_map.get())->get_data();
auto value_nested_nullmap_address = reinterpret_cast<int64_t>(value_null_map_data.data());
jmethodID map_size = env->GetMethodID(hashmap_class, "size", "()I");
int element_size = 0; // get all element size in num_rows of map column
for (int i = 0; i < num_rows; ++i) {
jobject obj = env->GetObjectArrayElement(result_obj, i);
if (obj == nullptr) {
continue;
}
element_size = element_size + env->CallIntMethod(obj, map_size);
env->DeleteLocalRef(obj);
}
map_key_column_nullable.resize(element_size);
memset(key_null_map_data.data(), 0, element_size);
map_value_column_nullable.resize(element_size);
memset(value_null_map_data.data(), 0, element_size);
int64_t key_nested_data_address = 0, key_nested_offset_address = 0;
if (key_data_column->is_column_string()) {
ColumnString* str_col = assert_cast<ColumnString*>(key_data_column.get());
Expand All @@ -376,16 +379,7 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
key_nested_data_address =
reinterpret_cast<int64_t>(key_data_column->get_raw_data().data);
}

ColumnNullable& map_value_column_nullable =
assert_cast<ColumnNullable&>(map_col->get_values());
auto value_data_column_null_map = map_value_column_nullable.get_null_map_column_ptr();
auto value_data_column = map_value_column_nullable.get_nested_column_ptr();
auto& value_null_map_data =
assert_cast<ColumnVector<UInt8>*>(value_data_column_null_map.get())->get_data();
auto value_nested_nullmap_address = reinterpret_cast<int64_t>(value_null_map_data.data());
int64_t value_nested_data_address = 0, value_nested_offset_address = 0;
// array type need pass address: [nullmap_address], offset_address, nested_nullmap_address, nested_data_address/nested_char_address,nested_offset_address
if (value_data_column->is_column_string()) {
ColumnString* str_col = assert_cast<ColumnString*>(value_data_column.get());
ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars());
Expand All @@ -397,20 +391,6 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
value_nested_data_address =
reinterpret_cast<int64_t>(value_data_column->get_raw_data().data);
}
jmethodID map_size = env->GetMethodID(hashmap_class, "size", "()I");
int element_size = 0; // get all element size in num_rows of map column
for (int i = 0; i < num_rows; ++i) {
jobject obj = env->GetObjectArrayElement(result_obj, i);
if (obj == nullptr) {
continue;
}
element_size = element_size + env->CallIntMethod(obj, map_size);
env->DeleteLocalRef(obj);
}
map_key_column_nullable.resize(element_size);
memset(key_null_map_data.data(), 0, element_size);
map_value_column_nullable.resize(element_size);
memset(value_null_map_data.data(), 0, element_size);
env->CallNonvirtualVoidMethod(jni_ctx->executor, jni_env->executor_cl,
jni_env->executor_result_map_batch_id, result_nullable,
num_rows, result_obj, nullmap_address, offset_address,
Expand Down
33 changes: 1 addition & 32 deletions be/src/vec/functions/function_java_udf.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,39 +115,8 @@ class JavaFunctionCall : public IFunctionBase {
jobject executor = nullptr;
bool is_closed = false;

std::unique_ptr<int64_t[]> input_values_buffer_ptr;
std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
std::unique_ptr<int64_t[]> input_offsets_ptrs;
//used for array type nested column null map, because array nested column must be nullable
std::unique_ptr<int64_t[]> input_array_nulls_buffer_ptr;
//used for array type of nested string column offset, not the array column offset
std::unique_ptr<int64_t[]> input_array_string_offsets_ptrs;
std::unique_ptr<int64_t> output_value_buffer;
std::unique_ptr<int64_t> output_null_value;
std::unique_ptr<int64_t> output_offsets_ptr;
//used for array type nested column null map
std::unique_ptr<int64_t> output_array_null_ptr;
//used for array type of nested string column offset
std::unique_ptr<int64_t> output_array_string_offsets_ptr;
std::unique_ptr<int32_t> batch_size_ptr;
// intermediate_state includes two parts: reserved / used buffer size and rows
std::unique_ptr<IntermediateState> output_intermediate_state_ptr;

JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id)
: executor_cl_(executor_cl),
executor_close_id_(executor_close_id),
input_values_buffer_ptr(new int64_t[num_args]),
input_nulls_buffer_ptr(new int64_t[num_args]),
input_offsets_ptrs(new int64_t[num_args]),
input_array_nulls_buffer_ptr(new int64_t[num_args]),
input_array_string_offsets_ptrs(new int64_t[num_args]),
output_value_buffer(new int64_t()),
output_null_value(new int64_t()),
output_offsets_ptr(new int64_t()),
output_array_null_ptr(new int64_t()),
output_array_string_offsets_ptr(new int64_t()),
batch_size_ptr(new int32_t()),
output_intermediate_state_ptr(new IntermediateState()) {}
: executor_cl_(executor_cl), executor_close_id_(executor_close_id) {}

void close() {
if (is_closed) {
Expand Down
Loading