Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions be/src/vec/exprs/table_function/vexplode_json_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <limits>

#include "common/status.h"
#include "util/jsonb_parser.h"
#include "util/jsonb_utils.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
Expand All @@ -50,6 +52,7 @@ Status VExplodeJsonArrayTableFunction<DataImpl>::process_init(Block* block, Runt
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&text_column_idx));
_text_column = block->get_by_position(text_column_idx).column;
_text_datatype = block->get_by_position(text_column_idx).type;
return Status::OK();
}

Expand All @@ -59,17 +62,28 @@ void VExplodeJsonArrayTableFunction<DataImpl>::process_row(size_t row_idx) {

StringRef text = _text_column->get_data_at(row_idx);
if (text.data != nullptr) {
rapidjson::Document document;
document.Parse(text.data, text.size);
if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(document, document.GetArray().Size());
if (WhichDataType(_text_datatype).is_json()) {
JsonbDocument* doc = JsonbDocument::createDocument(text.data, text.size);
if (doc && doc->getValue() && doc->getValue()->isArray()) {
auto* a = (ArrayVal*)doc->getValue();
if (a->numElem() > 0) {
_cur_size = _parsed_data.set_output(*a, a->numElem());
}
}
} else {
rapidjson::Document document;
document.Parse(text.data, text.size);
if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(document, document.GetArray().Size());
}
}
}
}

template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::process_close() {
_text_column = nullptr;
_text_datatype = nullptr;
_parsed_data.reset();
}

Expand Down
151 changes: 151 additions & 0 deletions be/src/vec/exprs/table_function/vexplode_json_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/functions/function_string.h"

namespace doris::vectorized {

Expand All @@ -44,6 +45,7 @@ struct ParsedData {
_values_null_flag.clear();
}
virtual int set_output(rapidjson::Document& document, int value_size) = 0;
virtual int set_output(ArrayVal& array_doc, int value_size) = 0;
virtual void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) = 0;
virtual void insert_many_same_value_from_parsed_data(MutableColumnPtr& column,
Expand Down Expand Up @@ -90,6 +92,36 @@ struct ParsedDataInt : public ParsedData<int64_t> {
}
return value_size;
}
int set_output(ArrayVal& array_doc, int value_size) override {
_values_null_flag.resize(value_size, 0);
_backup_data.resize(value_size);
int i = 0;
for (auto& val : array_doc) {
if (val.isInt8()) {
_backup_data[i] = static_cast<const JsonbInt8Val&>(val).val();
} else if (val.isInt16()) {
_backup_data[i] = static_cast<const JsonbInt16Val&>(val).val();
} else if (val.isInt32()) {
_backup_data[i] = static_cast<const JsonbInt32Val&>(val).val();
} else if (val.isInt64()) {
_backup_data[i] = static_cast<const JsonbInt64Val&>(val).val();
} else if (val.isDouble()) {
auto value = static_cast<const JsonbDoubleVal&>(val).val();
if (value > MAX_VALUE) {
_backup_data[i] = MAX_VALUE;
} else if (value < MIN_VALUE) {
_backup_data[i] = MIN_VALUE;
} else {
_backup_data[i] = long(value);
}
} else {
_values_null_flag[i] = 1;
_backup_data[i] = 0;
}
++i;
}
return value_size;
}

void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) override {
Expand Down Expand Up @@ -121,6 +153,22 @@ struct ParsedDataDouble : public ParsedData<double> {
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_values_null_flag.resize(value_size, 0);
_backup_data.resize(value_size);
int i = 0;
for (auto& val : array_doc) {
if (val.isDouble()) {
_backup_data[i] = static_cast<const JsonbDoubleVal&>(val).val();
} else {
_backup_data[i] = 0;
_values_null_flag[i] = 1;
}
++i;
}
return value_size;
}

void insert_result_from_parsed_data(MutableColumnPtr& column, int64_t cur_offset,
int max_step) override {
assert_cast<ColumnFloat64*>(column.get())
Expand Down Expand Up @@ -220,6 +268,83 @@ struct ParsedDataString : public ParsedDataStringBase {
}
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_data_string_ref.clear();
_backup_data.clear();
_values_null_flag.clear();
int32_t wbytes = 0;
for (auto& val : array_doc) {
switch (val.type()) {
case JsonbType::T_String: {
_backup_data.emplace_back(static_cast<const JsonbStringVal&>(val).getBlob(),
static_cast<const JsonbStringVal&>(val).getBlobLen());
_values_null_flag.emplace_back(false);
break;
// do not set _data_string here.
// Because the address of the string stored in `_backup_data` may
// change each time `emplace_back()` is called.
}
case JsonbType::T_Int8: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt8Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int16: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt16Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int64: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRId64,
static_cast<const JsonbInt64Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Double: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f",
static_cast<const JsonbDoubleVal&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_Int32: {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d",
static_cast<const JsonbInt32Val&>(val).val());
_backup_data.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
break;
}
case JsonbType::T_True:
_backup_data.emplace_back(TRUE_VALUE);
_values_null_flag.emplace_back(false);
break;
case JsonbType::T_False:
_backup_data.emplace_back(FALSE_VALUE);
_values_null_flag.emplace_back(false);
break;
case JsonbType::T_Null:
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
break;
default:
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
break;
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_data` to `_data_string`.
for (auto& str : _backup_data) {
_data_string_ref.emplace_back(str.data(), str.length());
}
return value_size;
}
};

struct ParsedDataJSON : public ParsedDataStringBase {
Expand All @@ -246,6 +371,31 @@ struct ParsedDataJSON : public ParsedDataStringBase {
}
return value_size;
}

int set_output(ArrayVal& array_doc, int value_size) override {
_data_string_ref.clear();
_backup_data.clear();
_values_null_flag.clear();
auto writer = std::make_unique<JsonbWriter>();
for (auto& v : array_doc) {
if (v.isObject()) {
writer->reset();
writer->writeValue(&v);
_backup_data.emplace_back(writer->getOutput()->getBuffer(),
writer->getOutput()->getSize());
_values_null_flag.emplace_back(false);
} else {
_backup_data.emplace_back();
_values_null_flag.emplace_back(true);
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_data` to `_data_string`.
for (auto& str : _backup_data) {
_data_string_ref.emplace_back(str);
}
return value_size;
}
};

template <typename DataImpl>
Expand All @@ -267,6 +417,7 @@ class VExplodeJsonArrayTableFunction final : public TableFunction {
void _insert_values_into_column(MutableColumnPtr& column, int max_step);
DataImpl _parsed_data;
ColumnPtr _text_column;
DataTypePtr _text_datatype;
};

} // namespace doris::vectorized
3 changes: 1 addition & 2 deletions be/src/vec/functions/function_fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ void register_function_fake(SimpleFunctionFactory& factory) {
register_table_function_expand_outer_default<DataTypeInt64>(factory, "explode_json_array_int");
register_table_function_expand_outer_default<DataTypeString>(factory,
"explode_json_array_string");
register_table_function_expand_outer_default<DataTypeString>(factory,
"explode_json_array_json");
register_table_function_expand_outer_default<DataTypeJsonb>(factory, "explode_json_array_json");
register_table_function_expand_outer_default<DataTypeFloat64>(factory,
"explode_json_array_double");
register_table_function_expand_outer_default<DataTypeInt64>(factory, "explode_bitmap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DoubleType;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -36,6 +37,7 @@
public class ExplodeJsonArrayDouble extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(DoubleType.INSTANCE).args(JsonType.INSTANCE),
FunctionSignature.ret(DoubleType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DoubleType;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -36,6 +37,7 @@
public class ExplodeJsonArrayDoubleOuter extends TableGeneratingFunction implements UnaryExpression, AlwaysNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(DoubleType.INSTANCE).args(JsonType.INSTANCE),
FunctionSignature.ret(DoubleType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -36,6 +37,7 @@
public class ExplodeJsonArrayInt extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(BigIntType.INSTANCE).args(JsonType.INSTANCE),
FunctionSignature.ret(BigIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -36,6 +37,7 @@
public class ExplodeJsonArrayIntOuter extends TableGeneratingFunction implements UnaryExpression, AlwaysNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(BigIntType.INSTANCE).args(JsonType.INSTANCE),
FunctionSignature.ret(BigIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.JsonType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -35,7 +35,7 @@
*/
public class ExplodeJsonArrayJson extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT)
FunctionSignature.ret(JsonType.INSTANCE).args(JsonType.INSTANCE)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.JsonType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -35,7 +35,7 @@
*/
public class ExplodeJsonArrayJsonOuter extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT)
FunctionSignature.ret(JsonType.INSTANCE).args(JsonType.INSTANCE)
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -35,6 +36,7 @@
public class ExplodeJsonArrayString extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(JsonType.INSTANCE),
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.nereids.trees.expressions.functions.AlwaysNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.JsonType;
import org.apache.doris.nereids.types.VarcharType;

import com.google.common.base.Preconditions;
Expand All @@ -35,6 +36,7 @@
public class ExplodeJsonArrayStringOuter extends TableGeneratingFunction implements UnaryExpression, AlwaysNullable {

public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(JsonType.INSTANCE),
FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT)
);

Expand Down
Loading