Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "exprs/es_functions.h"
#include "exprs/timestamp_functions.h"
#include "exprs/decimal_operators.h"
#include "exprs/decimalv2_operators.h"
#include "exprs/utility_functions.h"
#include "exprs/json_functions.h"
#include "exprs/hll_hash_function.h"
Expand Down Expand Up @@ -182,6 +183,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
EncryptionFunctions::init();
TimestampFunctions::init();
DecimalOperators::init();
DecimalV2Operators::init();
UtilityFunctions::init();
CompoundPredicate::init();
JsonFunctions::init();
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ uint32_t HashTable::hash_variable_len_row() {
StringValue* str = reinterpret_cast<StringValue*>(loc);
hash = HashUtil::hash(str->ptr, str->len, hash);
}
} else if (_build_expr_ctxs[i]->root()->type().is_decimal_type()) {
} else if (_build_expr_ctxs[i]->root()->type().type == TYPE_DECIMAL) {
void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
if (_expr_value_null_bits[i]) {
// Hash the null random seed values at 'loc'
Expand All @@ -169,7 +169,7 @@ uint32_t HashTable::hash_variable_len_row() {
hash = decimal->hash(hash);
}
}

}

return hash;
Expand Down Expand Up @@ -410,7 +410,7 @@ Function* HashTable::codegen_eval_tuple_row(RuntimeState* state, bool build) {
for (int i = 0; i < ctxs.size(); ++i) {
PrimitiveType type = ctxs[i]->root()->type().type;
if (type == TYPE_DATE || type == TYPE_DATETIME
|| type == TYPE_DECIMAL || type == TYPE_CHAR) {
|| type == TYPE_DECIMAL || type == TYPE_CHAR || type == TYPE_DECIMALV2) {
return NULL;
}
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/olap_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ void ColumnValueRange<DecimalValue>::convert_to_fixed_value() {
return;
}

template<>
void ColumnValueRange<DecimalV2Value>::convert_to_fixed_value() {
return;
}

template<>
void ColumnValueRange<__int128>::convert_to_fixed_value() {
return;
Expand Down Expand Up @@ -147,6 +152,7 @@ Status DorisScanRange::init() {
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_DATE:
case TYPE_DATETIME:
break;
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ typedef boost::variant <
ColumnValueRange<__int128>,
ColumnValueRange<StringValue>,
ColumnValueRange<DateTimeValue>,
ColumnValueRange<DecimalValue> > ColumnValueRangeType;
ColumnValueRange<DecimalValue>,
ColumnValueRange<DecimalV2Value> > ColumnValueRangeType;

class DorisScanRange {
public:
Expand Down Expand Up @@ -388,6 +389,9 @@ void ColumnValueRange<StringValue>::convert_to_fixed_value();
template<>
void ColumnValueRange<DecimalValue>::convert_to_fixed_value();

template<>
void ColumnValueRange<DecimalV2Value>::convert_to_fixed_value();

template<>
void ColumnValueRange<__int128>::convert_to_fixed_value();

Expand Down
22 changes: 22 additions & 0 deletions be/src/exec/olap_rewrite_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ Status OlapRewriteNode::prepare(RuntimeState* state) {
new RowBatch(child(0)->row_desc(), state->batch_size(), state->fragment_mem_tracker()));

_max_decimal_val.resize(_column_types.size());
_max_decimalv2_val.resize(_column_types.size());
for (int i = 0; i < _column_types.size(); ++i) {
if (_column_types[i].type == TPrimitiveType::DECIMAL) {
_max_decimal_val[i].to_max_decimal(
_column_types[i].precision, _column_types[i].scale);
} else if (_column_types[i].type == TPrimitiveType::DECIMALV2) {
_max_decimalv2_val[i].to_max_decimal(
_column_types[i].precision, _column_types[i].scale);
}
}
return Status::OK;
Expand Down Expand Up @@ -179,6 +183,24 @@ bool OlapRewriteNode::copy_one_row(TupleRow* src_row, Tuple* tuple,
}
break;
}
case TPrimitiveType::DECIMALV2: {
DecimalV2Value* dec_val = (DecimalV2Value*)src_value;
DecimalV2Value* dst_val = (DecimalV2Value*)tuple->get_slot(slot_desc->tuple_offset());
if (dec_val->greater_than_scale(column_type.scale)) {
int code = dec_val->round(dst_val, column_type.scale, HALF_UP);
if (code != E_DEC_OK) {
(*ss) << "round one decimal failed.value=" << dec_val->to_string();
return false;
}
} else {
*reinterpret_cast<PackedInt128*>(dst_val) =
*reinterpret_cast<const PackedInt128*>(dec_val);
}
if (*dst_val > _max_decimalv2_val[i]) {
dst_val->to_max_decimal(column_type.precision, column_type.scale);
}
break;
}
default: {
void* dst_val = (void*)tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(src_value, dst_val, slot_desc->type(), pool);
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/olap_rewrite_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class OlapRewriteNode : public ExecNode {
TupleDescriptor* _output_tuple_desc;

std::vector<DecimalValue> _max_decimal_val;
std::vector<DecimalV2Value> _max_decimalv2_val;
};

}
Expand Down
14 changes: 14 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,17 @@ Status OlapScanNode::normalize_conjuncts() {
break;
}

case TYPE_DECIMALV2: {
DecimalV2Value min = DecimalV2Value::get_min_decimal();
DecimalV2Value max = DecimalV2Value::get_max_decimal();
ColumnValueRange<DecimalV2Value> range(slots[slot_idx]->col_name(),
slots[slot_idx]->type().type,
min,
max);
normalize_predicate(range, slots[slot_idx]);
break;
}

default: {
VLOG(2) << "Unsupport Normalize Slot [ColName="
<< slots[slot_idx]->col_name() << "]";
Expand Down Expand Up @@ -739,6 +750,7 @@ Status OlapScanNode::normalize_in_predicate(SlotDescriptor* slot, ColumnValueRan
break;
}
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_LARGEINT:
case TYPE_CHAR:
case TYPE_VARCHAR:
Expand Down Expand Up @@ -807,6 +819,7 @@ Status OlapScanNode::normalize_in_predicate(SlotDescriptor* slot, ColumnValueRan
break;
}
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
Expand Down Expand Up @@ -919,6 +932,7 @@ Status OlapScanNode::normalize_binary_predicate(SlotDescriptor* slot, ColumnValu
break;
}
case TYPE_DECIMAL:
case TYPE_DECIMALV2:
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_HLL:
Expand Down
10 changes: 10 additions & 0 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ void OlapScanner::_convert_row_to_tuple(Tuple* tuple) {
*slot = DecimalValue(int_value, frac_value);
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value *slot = tuple->get_decimalv2_slot(slot_desc->tuple_offset());

int64_t int_value = *(int64_t*)(ptr);
int32_t frac_value = *(int32_t*)(ptr + sizeof(int64_t));
if (!slot->from_olap_decimal(int_value, frac_value)) {
tuple->set_null(slot_desc->null_indicator_offset());
}
break;
}
case TYPE_DATETIME: {
DateTimeValue *slot = tuple->get_datetime_slot(slot_desc->tuple_offset());
uint64_t value = *reinterpret_cast<uint64_t*>(ptr);
Expand Down
46 changes: 46 additions & 0 deletions be/src/exec/olap_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ Status OlapTableSink::prepare(RuntimeState* state) {

_max_decimal_val.resize(_output_tuple_desc->slots().size());
_min_decimal_val.resize(_output_tuple_desc->slots().size());

_max_decimalv2_val.resize(_output_tuple_desc->slots().size());
_min_decimalv2_val.resize(_output_tuple_desc->slots().size());
// check if need validate batch
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
auto slot = _output_tuple_desc->slots()[i];
Expand All @@ -470,6 +473,11 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_min_decimal_val[i].to_min_decimal(slot->type().precision, slot->type().scale);
_need_validate_data = true;
break;
case TYPE_DECIMALV2:
_max_decimalv2_val[i].to_max_decimal(slot->type().precision, slot->type().scale);
_min_decimalv2_val[i].to_min_decimal(slot->type().precision, slot->type().scale);
_need_validate_data = true;
break;
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_DATE:
Expand Down Expand Up @@ -716,6 +724,44 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
LOG(INFO) << ss.str();
#else
state->append_error_msg_to_file("", ss.str());
#endif
filtered_rows++;
row_valid = false;
filter_bitmap->Set(row_no, true);
continue;
}
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value dec_val(reinterpret_cast<const PackedInt128*>(slot)->value);
if (dec_val.greater_than_scale(desc->type().scale)) {
int code = dec_val.round(&dec_val, desc->type().scale, HALF_UP);
reinterpret_cast<PackedInt128*>(slot)->value = dec_val.value();
if (code != E_DEC_OK) {
std::stringstream ss;
ss << "round one decimal failed.value=" << dec_val.to_string();
#if BE_TEST
LOG(INFO) << ss.str();
#else
state->append_error_msg_to_file("", ss.str());
#endif

filtered_rows++;
row_valid = false;
filter_bitmap->Set(row_no, true);
continue;
}
}
if (dec_val > _max_decimalv2_val[i] || dec_val < _min_decimalv2_val[i]) {
std::stringstream ss;
ss << "decimal value is not valid for defination, column=" << desc->col_name()
<< ", value=" << dec_val.to_string()
<< ", precision=" << desc->type().precision
<< ", scale=" << desc->type().scale;
#if BE_TEST
LOG(INFO) << ss.str();
#else
state->append_error_msg_to_file("", ss.str());
#endif
filtered_rows++;
row_valid = false;
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/olap_table_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ class OlapTableSink : public DataSink {
std::vector<DecimalValue> _max_decimal_val;
std::vector<DecimalValue> _min_decimal_val;

std::vector<DecimalV2Value> _max_decimalv2_val;
std::vector<DecimalV2Value> _min_decimalv2_val;

// Stats for this
int64_t _convert_batch_ns = 0;
int64_t _validate_data_ns = 0;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ inline CompareLargeFunc get_compare_func(PrimitiveType type) {
case TYPE_DECIMAL:
return compare_large<DecimalValue>;

case TYPE_DECIMALV2:
return compare_large<DecimalV2Value>;

case TYPE_CHAR:
case TYPE_VARCHAR:
return compare_large<StringValue>;
Expand Down Expand Up @@ -182,6 +185,7 @@ inline int get_olap_size(PrimitiveType type) {
return 8;
}

case TYPE_DECIMALV2:
case TYPE_DECIMAL: {
return 12;
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/partitioned_aggregation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ llvm::Function* PartitionedAggregationNode::codegen_update_slot(
break;
}
case AggFnEvaluator::SUM:
if (slot_desc->type().type != TYPE_DECIMAL) {
if (slot_desc->type().type != TYPE_DECIMAL && slot_desc->type().type != TYPE_DECIMALV2) {
if (slot_desc->type().type == TYPE_FLOAT ||
slot_desc->type().type == TYPE_DOUBLE) {
result = builder.CreateFAdd(dst_value, src.GetVal());
Expand All @@ -1298,7 +1298,7 @@ llvm::Function* PartitionedAggregationNode::codegen_update_slot(
}
break;
}
DCHECK_EQ(slot_desc->type().type, TYPE_DECIMAL);
DCHECK(slot_desc->type().type == TYPE_DECIMAL || slot_desc->type().type == TYPE_DECIMALV2);
// Fall through to xcompiled case
case AggFnEvaluator::AVG:
case AggFnEvaluator::NDV: {
Expand Down Expand Up @@ -1422,6 +1422,11 @@ Function* PartitionedAggregationNode::codegen_update_tuple() {
op == AggFnEvaluator::NDV)) {
supported = false;
}
if (type == TYPE_DECIMALV2 &&
!(op == AggFnEvaluator::SUM || op == AggFnEvaluator::AVG ||
op == AggFnEvaluator::NDV)) {
supported = false;
}
if (!supported) {
VLOG_QUERY << "Could not codegen update_tuple because intermediate type "
<< slot_desc->type()
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/pre_aggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,10 @@ Status PreAggregationNode::update_agg_row(TupleRow* agg_row, TupleRow* probe_row
UpdateMinSlot<DecimalValue>(slot, value);
break;

case TYPE_DECIMALV2:
UpdateMinSlot<DecimalV2Value>(slot, value);
break;

default:
LOG(WARNING) << "invalid type: " << type_to_string(agg_expr->type());
return Status("unknown type");
Expand Down Expand Up @@ -593,6 +597,10 @@ Status PreAggregationNode::update_agg_row(TupleRow* agg_row, TupleRow* probe_row
UpdateMaxSlot<DecimalValue>(slot, value);
break;

case TYPE_DECIMALV2:
UpdateMaxSlot<DecimalV2Value>(slot, value);
break;

default:
LOG(WARNING) << "invalid type: " << type_to_string(agg_expr->type());
return Status("unknown type");
Expand All @@ -614,6 +622,10 @@ Status PreAggregationNode::update_agg_row(TupleRow* agg_row, TupleRow* probe_row
UpdateSumSlot<DecimalValue>(slot, value);
break;

case TYPE_DECIMALV2:
UpdateSumSlot<DecimalV2Value>(slot, value);
break;

default:
LOG(WARNING) << "invalid type: " << type_to_string(agg_expr->type());
return Status("Aggsum not valid.");
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ std::string SchemaColumnsScanner::type_to_string(TColumnDesc &desc) {
return "date";
case TPrimitiveType::DATETIME:
return "datetime";
case TPrimitiveType::DECIMALV2:
case TPrimitiveType::DECIMAL: {
std::stringstream stream;
stream << "decimal(";
Expand Down
15 changes: 15 additions & 0 deletions be/src/exec/text_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
#include <boost/algorithm/string.hpp>

#include "runtime/decimal_value.h"
#include "runtime/decimalv2_value.h"
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/datetime_value.h"
#include "runtime/tuple.h"
#include "util/string_parser.hpp"
#include "util/types.h"
#include "olap/utils.h"

namespace doris {
Expand Down Expand Up @@ -162,6 +164,19 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc,
break;
}

case TYPE_DECIMALV2: {
DecimalV2Value decimal_slot;

if (decimal_slot.parse_from_str(data, len)) {
parse_result = StringParser::PARSE_FAILURE;
}

*reinterpret_cast<PackedInt128*>(slot) =
*reinterpret_cast<const PackedInt128*>(&decimal_slot);

break;
}

default:
DCHECK(false) << "bad slot type: " << slot_desc->type();
break;
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ add_library(Exprs
conditional_functions.cpp
conditional_functions_ir.cpp
decimal_operators.cpp
decimalv2_operators.cpp
es_functions.cpp
literal.cpp
expr.cpp
Expand Down
Loading