Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 185 additions & 74 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "jni_connector.h"

#include <glog/logging.h>
#include <stdint.h>

#include <sstream>
#include <variant>
Expand All @@ -27,13 +26,17 @@
#include "runtime/decimalv2_value.h"
#include "runtime/runtime_state.h"
#include "util/jni-util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"

namespace doris {
class RuntimeProfile;
Expand Down Expand Up @@ -301,24 +304,101 @@ Status JniConnector::_fill_column(ColumnPtr& doris_column, DataTypePtr& data_typ
[[fallthrough]];
case TypeIndex::FixedString:
return _fill_string_column(data_column, num_rows);
case TypeIndex::Array:
return _fill_array_column(data_column, data_type, num_rows);
case TypeIndex::Map:
return _fill_map_column(data_column, data_type, num_rows);
case TypeIndex::Struct:
return _fill_struct_column(data_column, data_type, num_rows);
default:
return Status::InvalidArgument("Unsupported type {} in jni scanner",
getTypeName(logical_type));
}
return Status::OK();
}

// Appends `num_rows` array rows to `doris_column` (treated as a ColumnArray).
//
// The next meta entry is read as an int64 offsets buffer with one entry per row.
// Each incoming offset is shifted by the last offset already stored in the column
// before being appended. The nested element column is then filled recursively
// with as many elements as the appended offsets span.
//
// @param doris_column  destination column, cast to ColumnArray
// @param data_type     array type (nullable wrapper is stripped), used to find the element type
// @param num_rows      number of array rows to append
// @return the status of filling the nested element column
Status JniConnector::_fill_array_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                        size_t num_rows) {
    auto& array_column = static_cast<ColumnArray&>(*doris_column);
    ColumnPtr& element_column = array_column.get_data_ptr();
    DataTypePtr& element_type = const_cast<DataTypePtr&>(
            (reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
                    ->get_nested_type());
    ColumnArray::Offsets64& offsets_data = array_column.get_offsets();

    int64* offsets = reinterpret_cast<int64*>(_next_meta_as_ptr());
    size_t origin_size = offsets_data.size();
    offsets_data.resize(origin_size + num_rows);
    // NOTE(review): when the column is empty this reads index -1; presumably this
    // relies on the offsets container being left-padded with a zero -- confirm.
    size_t start_offset = offsets_data[origin_size - 1];
    for (size_t i = 0; i < num_rows; ++i) {
        offsets_data[origin_size + i] = offsets[i] + start_offset;
    }

    // The element count equals offsets[num_rows - 1], but that index is invalid when
    // num_rows is 0, so derive it from the destination offsets instead.
    return _fill_column(element_column, element_type,
                        offsets_data[origin_size + num_rows - 1] - start_offset);
}

// Appends `num_rows` map rows to `doris_column` (treated as a ColumnMap).
//
// The next meta entry is read as an int64 offsets buffer with one entry per row.
// Every incoming offset is rebased onto the last offset already present in the
// column. The key column and then the value column are filled recursively with
// the number of entries covered by the newly appended offsets.
//
// @param doris_column  destination column, cast to ColumnMap
// @param data_type     map type (nullable wrapper is stripped), source of key/value types
// @param num_rows      number of map rows to append
// @return the first error from filling keys, otherwise the status of filling values
Status JniConnector::_fill_map_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                      size_t num_rows) {
    auto& map_column = static_cast<ColumnMap&>(*doris_column);
    const auto* map_type =
            reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get());
    DataTypePtr& key_type = const_cast<DataTypePtr&>(map_type->get_key_type());
    DataTypePtr& value_type = const_cast<DataTypePtr&>(map_type->get_value_type());
    ColumnPtr& keys = map_column.get_keys_ptr();
    ColumnPtr& values = map_column.get_values_ptr();
    ColumnArray::Offsets64& dst_offsets = map_column.get_offsets();

    int64* src_offsets = reinterpret_cast<int64*>(_next_meta_as_ptr());
    const size_t old_rows = dst_offsets.size();
    dst_offsets.resize(old_rows + num_rows);
    // NOTE(review): with an empty column this reads index -1; presumably the offsets
    // container is left-padded with a zero -- confirm.
    const size_t base = dst_offsets[old_rows - 1];
    for (size_t row = 0; row < num_rows; ++row) {
        dst_offsets[old_rows + row] = src_offsets[row] + base;
    }

    // Keys and values share the same offsets, so both receive the same entry count.
    const size_t num_entries = dst_offsets[old_rows + num_rows - 1] - base;
    RETURN_IF_ERROR(_fill_column(keys, key_type, num_entries));
    return _fill_column(values, value_type, num_entries);
}

// Fills every field of `doris_column` (treated as a ColumnStruct) with `num_rows` rows.
//
// Fields are visited in tuple order; each one is filled through _fill_column using
// the element type at the same index in the struct data type. Stops at the first
// field that reports an error.
//
// @param doris_column  destination column, cast to ColumnStruct
// @param data_type     struct type (nullable wrapper is stripped), source of field types
// @param num_rows      number of rows to append to each field
// @return the first failing status, or OK when all fields were filled
Status JniConnector::_fill_struct_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                         size_t num_rows) {
    const auto* struct_type =
            reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
    auto& struct_column = static_cast<ColumnStruct&>(*doris_column);
    for (int field_idx = 0; field_idx < struct_column.tuple_size(); ++field_idx) {
        ColumnPtr& field_column = struct_column.get_column_ptr(field_idx);
        DataTypePtr& field_type =
                const_cast<DataTypePtr&>(struct_type->get_element(field_idx));
        RETURN_IF_ERROR(_fill_column(field_column, field_type, num_rows));
    }
    return Status::OK();
}

// Appends `num_rows` strings to `doris_column` (treated as a ColumnString).
//
// Exactly two meta entries are consumed: an int offsets buffer (one end offset per
// row) followed by the character buffer. The characters are copied in one block,
// and each incoming offset is rebased onto the last offset already stored in the
// column before being appended.
//
// @param doris_column  destination column, cast to ColumnString
// @param num_rows      number of strings to append; 0 is a no-op that consumes no meta entries
// @return always Status::OK()
Status JniConnector::_fill_string_column(MutableColumnPtr& doris_column, size_t num_rows) {
    if (num_rows == 0) {
        return Status::OK();
    }
    auto& string_col = static_cast<ColumnString&>(*doris_column);
    ColumnString::Chars& string_chars = string_col.get_chars();
    ColumnString::Offsets& string_offsets = string_col.get_offsets();
    int* offsets = reinterpret_cast<int*>(_next_meta_as_ptr());
    char* chars = reinterpret_cast<char*>(_next_meta_as_ptr());

    // The last incoming offset is the total number of bytes for this batch.
    const size_t batch_bytes = offsets[num_rows - 1];
    size_t origin_chars_size = string_chars.size();
    string_chars.resize(origin_chars_size + batch_bytes);
    memcpy(string_chars.data() + origin_chars_size, chars, batch_bytes);

    size_t origin_offsets_size = string_offsets.size();
    // NOTE(review): with an empty column this reads index -1; presumably the offsets
    // container is left-padded with a zero -- confirm.
    size_t start_offset = string_offsets[origin_offsets_size - 1];
    string_offsets.resize(origin_offsets_size + num_rows);
    for (size_t i = 0; i < num_rows; ++i) {
        string_offsets[origin_offsets_size + i] = offsets[i] + start_offset;
    }
    return Status::OK();
}

Expand Down Expand Up @@ -418,77 +498,108 @@ std::string JniConnector::get_hive_type(const TypeDescriptor& desc) {
}
}

Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) {
std::vector<long> meta_data;
// insert number of rows
meta_data.emplace_back(block->rows());
for (int i = 0; i < block->columns(); ++i) {
auto& column_with_type_and_name = block->get_by_position(i);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
TypeIndex logical_type = remove_nullable(column_type)->get_type_id();

// insert null map address
MutableColumnPtr data_column;
if (column_ptr->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
column_ptr->assume_mutable().get());
data_column = nullable_column->get_nested_column_ptr();
NullMap& null_map = nullable_column->get_null_map_data();
meta_data.emplace_back((long)null_map.data());
} else {
meta_data.emplace_back(0);
data_column = column_ptr->assume_mutable();
}

switch (logical_type) {
// Appends the memory addresses describing `doris_column` to `meta_data`.
//
// For every column the null-map address is pushed first (0 when the column is not
// nullable), followed by type-specific addresses:
//   - numeric / decimal / date types: one data address
//   - String / FixedString: offsets address, then chars address
//   - Array: offsets address, then the meta of the element column (recursive)
//   - Struct: the meta of each field column in tuple order (recursive)
//   - Map: offsets address, then the meta of the key and value columns (recursive)
// Any other type contributes only the null-map entry.
//
// @param doris_column  column to describe
// @param data_type     type of `doris_column`; the nullable wrapper is stripped for dispatch
// @param meta_data     output vector the addresses are appended to
void JniConnector::_fill_column_meta(ColumnPtr& doris_column, DataTypePtr& data_type,
                                     std::vector<long>& meta_data) {
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    // insert null map address
    MutableColumnPtr data_column;
    if (doris_column->is_nullable()) {
        auto* nullable_column =
                reinterpret_cast<vectorized::ColumnNullable*>(doris_column->assume_mutable().get());
        data_column = nullable_column->get_nested_column_ptr();
        NullMap& null_map = nullable_column->get_null_map_data();
        meta_data.emplace_back(reinterpret_cast<long>(null_map.data()));
    } else {
        meta_data.emplace_back(0);
        data_column = doris_column->assume_mutable();
    }
    switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE)                                          \
    case NUMERIC_TYPE: {                                                                  \
        meta_data.emplace_back(_get_numeric_data_address<CPP_NUMERIC_TYPE>(data_column)); \
        break;                                                                            \
    }
        FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    case TypeIndex::Decimal128:
        [[fallthrough]];
    case TypeIndex::Decimal128I: {
        meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column));
        break;
    }
    case TypeIndex::Decimal32: {
        meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column));
        break;
    }
    case TypeIndex::Decimal64: {
        meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column));
        break;
    }
    case TypeIndex::DateV2: {
        meta_data.emplace_back(_get_time_data_address<UInt32>(data_column));
        break;
    }
    case TypeIndex::DateTimeV2: {
        meta_data.emplace_back(_get_time_data_address<UInt64>(data_column));
        break;
    }
    case TypeIndex::String:
        [[fallthrough]];
    case TypeIndex::FixedString: {
        auto& string_column = static_cast<ColumnString&>(*data_column);
        // insert offsets, then chars
        meta_data.emplace_back(reinterpret_cast<long>(string_column.get_offsets().data()));
        meta_data.emplace_back(reinterpret_cast<long>(string_column.get_chars().data()));
        break;
    }
    case TypeIndex::Array: {
        auto& array_column = static_cast<ColumnArray&>(*data_column);
        ColumnPtr& element_column = array_column.get_data_ptr();
        meta_data.emplace_back(reinterpret_cast<long>(array_column.get_offsets().data()));
        DataTypePtr& element_type = const_cast<DataTypePtr&>(
                (reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
                        ->get_nested_type());
        _fill_column_meta(element_column, element_type, meta_data);
        break;
    }
    case TypeIndex::Struct: {
        auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
        const DataTypeStruct* doris_struct_type =
                reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
        for (int i = 0; i < doris_struct.tuple_size(); ++i) {
            ColumnPtr& struct_field = doris_struct.get_column_ptr(i);
            DataTypePtr& field_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
            _fill_column_meta(struct_field, field_type, meta_data);
        }
        break;
    }
    case TypeIndex::Map: {
        auto& map = static_cast<ColumnMap&>(*data_column);
        DataTypePtr& key_type = const_cast<DataTypePtr&>(
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_key_type());
        DataTypePtr& value_type = const_cast<DataTypePtr&>(
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_value_type());
        ColumnPtr& key_column = map.get_keys_ptr();
        ColumnPtr& value_column = map.get_values_ptr();
        meta_data.emplace_back(reinterpret_cast<long>(map.get_offsets().data()));
        _fill_column_meta(key_column, key_type, meta_data);
        _fill_column_meta(value_column, value_type, meta_data);
        break;
    }
    default:
        // Other types add nothing beyond the null-map entry pushed above.
        return;
    }
}

Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) {
std::vector<long> meta_data;
// insert number of rows
meta_data.emplace_back(block->rows());
for (int i = 0; i < block->columns(); ++i) {
auto& column_with_type_and_name = block->get_by_position(i);
_fill_column_meta(column_with_type_and_name.column, column_with_type_and_name.type,
meta_data);
}

meta.reset(new long[meta_data.size()]);
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ class JniConnector {

Status _fill_column(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_rows);

Status _fill_map_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

Status _fill_array_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

Status _fill_struct_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

static void _fill_column_meta(ColumnPtr& doris_column, DataTypePtr& data_type,
std::vector<long>& meta_data);

template <typename CppType>
Status _fill_numeric_column(MutableColumnPtr& doris_column, CppType* ptr, size_t num_rows) {
auto& column_data = static_cast<ColumnVector<CppType>&>(*doris_column).get_data();
Expand Down
Loading