Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions be/src/vec/exprs/table_function/vexplode_json_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <limits>

#include "common/status.h"
#include "util/jsonb_parser.h"
#include "util/jsonb_utils.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
Expand All @@ -50,6 +52,7 @@ Status VExplodeJsonArrayTableFunction<DataImpl>::process_init(Block* block, Runt
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&text_column_idx));
_text_column = block->get_by_position(text_column_idx).column;
_text_datatype = remove_nullable(block->get_by_position(text_column_idx).type);
return Status::OK();
}

Expand All @@ -59,17 +62,28 @@ void VExplodeJsonArrayTableFunction<DataImpl>::process_row(size_t row_idx) {

StringRef text = _text_column->get_data_at(row_idx);
if (text.data != nullptr) {
rapidjson::Document document;
document.Parse(text.data, text.size);
if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(document, document.GetArray().Size());
if (WhichDataType(_text_datatype).is_json()) {
JsonbDocument* doc = JsonbDocument::createDocument(text.data, text.size);
if (doc && doc->getValue() && doc->getValue()->isArray()) {
auto* a = (ArrayVal*)doc->getValue();
if (a->numElem() > 0) {
_cur_size = _parsed_data.set_output(*a, a->numElem());
}
}
} else {
rapidjson::Document document;
document.Parse(text.data, text.size);
if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(document, document.GetArray().Size());
}
}
}
}

template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::process_close() {
_text_column = nullptr;
_text_datatype = nullptr;
_parsed_data.reset();
}

Expand Down Expand Up @@ -141,4 +155,4 @@ template class VExplodeJsonArrayTableFunction<ParsedDataDouble>;
template class VExplodeJsonArrayTableFunction<ParsedDataString>;
template class VExplodeJsonArrayTableFunction<ParsedDataJSON>;

} // namespace doris::vectorized
} // namespace doris::vectorized
151 changes: 151 additions & 0 deletions be/src/vec/exprs/table_function/vexplode_json_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/functions/function_string.h"

namespace doris::vectorized {

Expand All @@ -44,6 +45,7 @@ struct ParsedData {
_values_null_flag.clear();
}
virtual int set_output(rapidjson::Document& document, int value_size) = 0;
virtual int set_output(ArrayVal& array_doc, int value_size) = 0;
virtual void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) = 0;
virtual void insert_many_same_value_from_parsed_data(MutableColumnPtr& column,
Expand Down Expand Up @@ -90,6 +92,36 @@ struct ParsedDataInt : public ParsedData<int64_t> {
}
return value_size;
}
int set_output(ArrayVal& array_doc, int value_size) override {
_values_null_flag.resize(value_size, 0);
_backup_data.resize(value_size);
int i = 0;
for (auto& val : array_doc) {
if (val.isInt8()) {
_backup_data[i] = static_cast<const JsonbInt8Val&>(val).val();
} else if (val.isInt16()) {
_backup_data[i] = static_cast<const JsonbInt16Val&>(val).val();
} else if (val.isInt32()) {
_backup_data[i] = static_cast<const JsonbInt32Val&>(val).val();
} else if (val.isInt64()) {
_backup_data[i] = static_cast<const JsonbInt64Val&>(val).val();
} else if (val.isDouble()) {
auto value = static_cast<const JsonbDoubleVal&>(val).val();
if (value > MAX_VALUE) {
_backup_data[i] = MAX_VALUE;
} else if (value < MIN_VALUE) {
_backup_data[i] = MIN_VALUE;
} else {
_backup_data[i] = long(value);
}
} else {
_values_null_flag[i] = 1;
_backup_data[i] = 0;
}
++i;
}
return value_size;
}

void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) override {
Expand Down Expand Up @@ -121,6 +153,22 @@ struct ParsedDataDouble : public ParsedData<double> {
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_values_null_flag.resize(value_size, 0);
_backup_data.resize(value_size);
int i = 0;
for (auto& val : array_doc) {
if (val.isDouble()) {
_backup_data[i] = static_cast<const JsonbDoubleVal&>(val).val();
} else {
_backup_data[i] = 0;
_values_null_flag[i] = 1;
}
++i;
}
return value_size;
}

void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) override {
assert_cast<ColumnFloat64*>(column.get())
Expand Down Expand Up @@ -220,6 +268,83 @@ struct ParsedDataString : public ParsedDataStringBase {
}
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_data_string_ref.clear();
_backup_data.clear();
_values_null_flag.clear();
int32_t wbytes = 0;
for (auto& val : array_doc) {
switch (val.type()) {
case JsonbType::T_String: {
_backup_data.emplace_back(static_cast<const JsonbStringVal&>(val).getBlob(),
static_cast<const JsonbStringVal&>(val).getBlobLen());
_values_null_flag.emplace_back(false);
break;
// do not set _data_string here.
// Because the address of the string stored in `_backup_data` may
// change each time `emplace_back()` is called.
}
case JsonbType::T_Int8: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt8Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int16: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt16Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int64: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRId64,
static_cast<const JsonbInt64Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Double: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f",
static_cast<const JsonbDoubleVal&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int32: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt32Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_True:
_backup_data.emplace_back(TRUE_VALUE);
_values_null_flag.emplace_back(false);
break;
case JsonbType::T_False:
_backup_data.emplace_back(FALSE_VALUE);
_values_null_flag.emplace_back(false);
break;
case JsonbType::T_Null:
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
break;
default:
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
break;
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_data` to `_data_string`.
for (auto& str : _backup_data) {
_data_string_ref.emplace_back(str.data(), str.length());
}
return value_size;
}
};

struct ParsedDataJSON : public ParsedDataStringBase {
Expand All @@ -246,6 +371,31 @@ struct ParsedDataJSON : public ParsedDataStringBase {
}
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_data_string_ref.clear();
_backup_data.clear();
_values_null_flag.clear();
auto writer = std::make_unique<JsonbWriter>();
for (auto& v : array_doc) {
if (v.isObject()) {
writer->reset();
writer->writeValue(&v);
_backup_data.emplace_back(writer->getOutput()->getBuffer(),
writer->getOutput()->getSize());
_values_null_flag.emplace_back(false);
} else {
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_data` to `_data_string`.
for (auto& str : _backup_data) {
_data_string_ref.emplace_back(str);
}
return value_size;
}
};

template <typename DataImpl>
Expand All @@ -267,6 +417,7 @@ class VExplodeJsonArrayTableFunction final : public TableFunction {
void _insert_values_into_column(MutableColumnPtr& column, int max_step);
DataImpl _parsed_data;
ColumnPtr _text_column;
DataTypePtr _text_datatype;
};

} // namespace doris::vectorized
52 changes: 36 additions & 16 deletions be/src/vec/functions/function_fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,24 @@

namespace doris::vectorized {

template <typename ReturnType, bool AlwaysNullable = false>
template <typename ReturnType, bool AlwaysNullable = false, bool VARIADIC = false>
struct FunctionFakeBaseImpl {
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
if constexpr (AlwaysNullable) {
return make_nullable(std::make_shared<ReturnType>());
}
return std::make_shared<ReturnType>();
}
static DataTypes get_variadic_argument_types() {
if constexpr (VARIADIC) {
if constexpr (AlwaysNullable) {
return {make_nullable(std::make_shared<ReturnType>())};
}
return {std::make_shared<ReturnType>()};
} else {
return {};
}
}
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

Expand All @@ -55,6 +65,7 @@ struct FunctionExplode {
return make_nullable(
check_and_get_data_type<DataTypeArray>(arguments[0].get())->get_nested_type());
}
static DataTypes get_variadic_argument_types() { return {}; }
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

Expand All @@ -67,6 +78,7 @@ struct FunctionExplodeMap {
fieldTypes[1] = check_and_get_data_type<DataTypeMap>(arguments[0].get())->get_value_type();
return make_nullable(std::make_shared<vectorized::DataTypeStruct>(fieldTypes));
}
static DataTypes get_variadic_argument_types() { return {}; }
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

Expand All @@ -80,13 +92,15 @@ struct FunctionExplodeJsonObject {
fieldTypes[1] = make_nullable(std::make_shared<DataTypeJsonb>());
return make_nullable(std::make_shared<vectorized::DataTypeStruct>(fieldTypes));
}
static DataTypes get_variadic_argument_types() { return {}; }
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

struct FunctionEsquery {
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
return FunctionFakeBaseImpl<DataTypeUInt8>::get_return_type_impl(arguments);
}
static DataTypes get_variadic_argument_types() { return {}; }
static std::string get_error_msg() { return "esquery only supported on es table"; }
};

Expand All @@ -102,22 +116,25 @@ void register_table_function_expand(SimpleFunctionFactory& factory, const std::s
factory.register_function<FunctionFake<FunctionImpl>>(name + suffix);
};

template <typename ReturnType>
template <typename ReturnType, bool VARIADIC>
void register_table_function_expand_default(SimpleFunctionFactory& factory, const std::string& name,
const std::string& suffix) {
factory.register_function<FunctionFake<FunctionFakeBaseImpl<ReturnType>>>(name);
factory.register_function<FunctionFake<FunctionFakeBaseImpl<ReturnType, true>>>(name + suffix);
factory.register_function<FunctionFake<FunctionFakeBaseImpl<ReturnType, false, VARIADIC>>>(
name);
factory.register_function<FunctionFake<FunctionFakeBaseImpl<ReturnType, true, VARIADIC>>>(
name + suffix);
};

template <typename FunctionImpl>
void register_table_function_expand_outer(SimpleFunctionFactory& factory, const std::string& name) {
register_table_function_expand<FunctionImpl>(factory, name, COMBINATOR_SUFFIX_OUTER);
};

template <typename ReturnType>
template <typename ReturnType, bool VARIADIC>
void register_table_function_expand_outer_default(SimpleFunctionFactory& factory,
const std::string& name) {
register_table_function_expand_default<ReturnType>(factory, name, COMBINATOR_SUFFIX_OUTER);
register_table_function_expand_default<ReturnType, VARIADIC>(factory, name,
COMBINATOR_SUFFIX_OUTER);
};

void register_function_fake(SimpleFunctionFactory& factory) {
Expand All @@ -127,16 +144,19 @@ void register_function_fake(SimpleFunctionFactory& factory) {
register_table_function_expand_outer<FunctionExplodeMap>(factory, "explode_map");

register_table_function_expand_outer<FunctionExplodeJsonObject>(factory, "explode_json_object");
register_table_function_expand_outer_default<DataTypeString>(factory, "explode_split");
register_table_function_expand_outer_default<DataTypeInt32>(factory, "explode_numbers");
register_table_function_expand_outer_default<DataTypeInt64>(factory, "explode_json_array_int");
register_table_function_expand_outer_default<DataTypeString>(factory,
"explode_json_array_string");
register_table_function_expand_outer_default<DataTypeString>(factory,
"explode_json_array_json");
register_table_function_expand_outer_default<DataTypeFloat64>(factory,
"explode_json_array_double");
register_table_function_expand_outer_default<DataTypeInt64>(factory, "explode_bitmap");
register_table_function_expand_outer_default<DataTypeString, false>(factory, "explode_split");
register_table_function_expand_outer_default<DataTypeInt32, false>(factory, "explode_numbers");
register_table_function_expand_outer_default<DataTypeInt64, false>(factory,
"explode_json_array_int");
register_table_function_expand_outer_default<DataTypeString, false>(
factory, "explode_json_array_string");
register_table_function_expand_outer_default<DataTypeJsonb, true>(factory,
"explode_json_array_json");
register_table_function_expand_outer_default<DataTypeString, true>(factory,
"explode_json_array_json");
register_table_function_expand_outer_default<DataTypeFloat64, false>(
factory, "explode_json_array_double");
register_table_function_expand_outer_default<DataTypeInt64, false>(factory, "explode_bitmap");
}

} // namespace doris::vectorized
5 changes: 5 additions & 0 deletions be/src/vec/functions/function_fake.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Block;
} // namespace doris

namespace doris::vectorized {

// FunctionFake is use for some function call expr only work at prepare/open phase, do not support execute().
template <typename Impl>
class FunctionFake : public IFunction {
Expand All @@ -55,6 +56,10 @@ class FunctionFake : public IFunction {

bool use_default_implementation_for_nulls() const override { return true; }

DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}

bool use_default_implementation_for_constants() const override { return false; }

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
Expand Down
Loading