Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "exprs/bitmap_function.h"
#include "exprs/cast_functions.h"
#include "exprs/compound_predicate.h"
#include "exprs/decimal_operators.h"
#include "exprs/decimalv2_operators.h"
#include "exprs/encryption_functions.h"
#include "exprs/es_functions.h"
Expand Down Expand Up @@ -250,7 +249,6 @@ void Daemon::init(int argc, char** argv, const std::vector<StorePath>& paths) {
MathFunctions::init();
EncryptionFunctions::init();
TimestampFunctions::init();
DecimalOperators::init();
DecimalV2Operators::init();
TimeOperators::init();
UtilityFunctions::init();
Expand Down
8 changes: 0 additions & 8 deletions be/src/exec/es/es_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ std::string ExtLiteral::value_to_string() {
case TYPE_BOOLEAN:
ss << std::to_string(get_bool());
break;
case TYPE_DECIMAL:
ss << get_decimal_string();
break;
case TYPE_DECIMALV2:
ss << get_decimalv2_string();
break;
Expand Down Expand Up @@ -158,11 +155,6 @@ bool ExtLiteral::get_bool() {
return *(reinterpret_cast<bool*>(_value));
}

std::string ExtLiteral::get_decimal_string() {
DCHECK(_type == TYPE_DECIMAL);
return reinterpret_cast<DecimalValue*>(_value)->to_string();
}

std::string ExtLiteral::get_decimalv2_string() {
DCHECK(_type == TYPE_DECIMALV2);
return reinterpret_cast<DecimalV2Value*>(_value)->to_string();
Expand Down
17 changes: 0 additions & 17 deletions be/src/exec/es_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,14 +635,6 @@ bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLitera
break;
}

case TYPE_DECIMAL: {
node_type = (TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(reinterpret_cast<DecimalValue*>(value)->to_string());
literal->__set_decimal_literal(decimal_literal);
break;
}

case TYPE_DATE:
case TYPE_DATETIME: {
node_type = (TExprNodeType::DATE_LITERAL);
Expand Down Expand Up @@ -861,15 +853,6 @@ Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
break;
}
case TYPE_DECIMAL: {
if (val_idx >= col.binary_vals.size()) {
return Status::InternalError(
strings::Substitute(ERROR_INVALID_COL_DATA, "DECIMAL"));
}
const string& val = col.binary_vals[val_idx];
*reinterpret_cast<DecimalValue*>(slot) = *reinterpret_cast<const DecimalValue*>(&val);
break;
}
default:
DCHECK(false);
}
Expand Down
9 changes: 0 additions & 9 deletions be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,6 @@ uint32_t HashTable::hash_variable_len_row() {
StringValue* str = reinterpret_cast<StringValue*>(loc);
hash = HashUtil::hash(str->ptr, str->len, hash);
}
} else if (_build_expr_ctxs[i]->root()->type().type == TYPE_DECIMAL) {
void* loc = _expr_values_buffer + _expr_values_buffer_offsets[i];
if (_expr_value_null_bits[i]) {
// Hash the null random seed values at 'loc'
hash = HashUtil::hash(loc, sizeof(StringValue), hash);
} else {
DecimalValue* decimal = reinterpret_cast<DecimalValue*>(loc);
hash = decimal->hash(hash);
}
}
}

Expand Down
174 changes: 82 additions & 92 deletions be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,19 @@ void ODBCConnector::_init_profile(doris::RuntimeProfile* profile) {

Status ODBCConnector::init_to_write(doris::RuntimeProfile* profile) {
if (!_is_open) {
return Status::InternalError( "Init before open.");
return Status::InternalError("Init before open.");
}

_init_profile(profile);
// Allocate a statement handle
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt), "alloc statement");
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLAllocHandle(SQL_HANDLE_STMT, _dbc, &_stmt),
"alloc statement");

return Status::OK();
}

Status ODBCConnector::append(const std::string& table_name, RowBatch *batch, uint32_t start_send_row, uint32* num_rows_sent) {
Status ODBCConnector::append(const std::string& table_name, RowBatch* batch,
uint32_t start_send_row, uint32* num_rows_sent) {
_insert_stmt_buffer.clear();
std::u16string insert_stmt;
{
Expand All @@ -218,99 +220,86 @@ Status ODBCConnector::append(const std::string& table_name, RowBatch *batch, uin
if (j != 0) {
fmt::format_to(_insert_stmt_buffer, "{}", ", ");
}
void *item = _output_expr_ctxs[j]->get_value(row);
void* item = _output_expr_ctxs[j]->get_value(row);
if (item == nullptr) {
fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
continue;
}
switch (_output_expr_ctxs[j]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int8_t *>(item));
break;
case TYPE_SMALLINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int16_t *>(item));
break;
case TYPE_INT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int32_t *>(item));
break;
case TYPE_BIGINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int64_t *>(item));
break;
case TYPE_FLOAT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<float *>(item));
break;
case TYPE_DOUBLE:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<double *>(item));
break;
case TYPE_DATE:
case TYPE_DATETIME: {
char buf[64];
const auto *time_val = (const DateTimeValue *) (item);
time_val->to_string(buf);
fmt::format_to(_insert_stmt_buffer, "'{}'", buf);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR: {
const auto *string_val = (const StringValue *) (item);

if (string_val->ptr == NULL) {
if (string_val->len == 0) {
fmt::format_to(_insert_stmt_buffer, "{}", "''");
} else {
fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
}
} else {
fmt::format_to(_insert_stmt_buffer, "'{}'",
fmt::basic_string_view(string_val->ptr, string_val->len));
}
break;
}
case TYPE_DECIMAL: {
const auto *decimal_val = reinterpret_cast<const DecimalValue *>(item);
std::string decimal_str;
int output_scale = _output_expr_ctxs[j]->root()->output_scale();

if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val->to_string(output_scale);
} else {
decimal_str = decimal_val->to_string();
}
fmt::format_to(_insert_stmt_buffer, "{}", decimal_str);
break;
}
case TYPE_DECIMALV2: {
const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128 *>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[j]->root()->output_scale();
case TYPE_BOOLEAN:
case TYPE_TINYINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int8_t*>(item));
break;
case TYPE_SMALLINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int16_t*>(item));
break;
case TYPE_INT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int32_t*>(item));
break;
case TYPE_BIGINT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<int64_t*>(item));
break;
case TYPE_FLOAT:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<float*>(item));
break;
case TYPE_DOUBLE:
fmt::format_to(_insert_stmt_buffer, "{}", *static_cast<double*>(item));
break;
case TYPE_DATE:
case TYPE_DATETIME: {
char buf[64];
const auto* time_val = (const DateTimeValue*)(item);
time_val->to_string(buf);
fmt::format_to(_insert_stmt_buffer, "'{}'", buf);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR: {
const auto* string_val = (const StringValue*)(item);

if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val.to_string(output_scale);
if (string_val->ptr == NULL) {
if (string_val->len == 0) {
fmt::format_to(_insert_stmt_buffer, "{}", "''");
} else {
decimal_str = decimal_val.to_string();
fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
}
fmt::format_to(_insert_stmt_buffer, "{}", decimal_str);
break;
}
case TYPE_LARGEINT: {
char buf[48];
int len = 48;
char *v = LargeIntValue::to_string(reinterpret_cast<const PackedInt128 *>(item)->value,
buf, &len);
fmt::format_to(_insert_stmt_buffer, "{}", std::string(v, len));
break;
} else {
fmt::format_to(_insert_stmt_buffer, "'{}'",
fmt::basic_string_view(string_val->ptr, string_val->len));
}
default: {
fmt::memory_buffer err_out;
fmt::format_to(err_out, "can't convert this type to mysql type. type = {}",
_output_expr_ctxs[j]->root()->type().type);
return Status::InternalError(err_out.data());
break;
}
case TYPE_DECIMALV2: {
const DecimalV2Value decimal_val(
reinterpret_cast<const PackedInt128*>(item)->value);
std::string decimal_str;
int output_scale = _output_expr_ctxs[j]->root()->output_scale();

if (output_scale > 0 && output_scale <= 30) {
decimal_str = decimal_val.to_string(output_scale);
} else {
decimal_str = decimal_val.to_string();
}
fmt::format_to(_insert_stmt_buffer, "{}", decimal_str);
break;
}
case TYPE_LARGEINT: {
char buf[48];
int len = 48;
char* v = LargeIntValue::to_string(
reinterpret_cast<const PackedInt128*>(item)->value, buf, &len);
fmt::format_to(_insert_stmt_buffer, "{}", std::string(v, len));
break;
}
default: {
fmt::memory_buffer err_out;
fmt::format_to(err_out, "can't convert this type to mysql type. type = {}",
_output_expr_ctxs[j]->root()->type().type);
return Status::InternalError(err_out.data());
}
}
}


if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
fmt::format_to(_insert_stmt_buffer, "{}", "),(");
} else {
Expand All @@ -320,13 +309,15 @@ Status ODBCConnector::append(const std::string& table_name, RowBatch *batch, uin
}
}
// Translate utf8 string to utf16 to use unicode encodeing
insert_stmt = utf8_to_wstring(std::string(_insert_stmt_buffer.data(),
_insert_stmt_buffer.data() + _insert_stmt_buffer.size()));
insert_stmt = utf8_to_wstring(
std::string(_insert_stmt_buffer.data(),
_insert_stmt_buffer.data() + _insert_stmt_buffer.size()));
}

{
SCOPED_TIMER(_result_send_timer);
ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT, SQLExecDirectW(_stmt, (SQLWCHAR *) (insert_stmt.c_str()), SQL_NTS),
ODBC_DISPOSE(_stmt, SQL_HANDLE_STMT,
SQLExecDirectW(_stmt, (SQLWCHAR*)(insert_stmt.c_str()), SQL_NTS),
_insert_stmt_buffer.data());
}
COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
Expand All @@ -339,7 +330,8 @@ Status ODBCConnector::begin_trans() {
}

ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
SQLSetConnectAttr(_dbc, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_OFF, SQL_IS_UINTEGER),
SQLSetConnectAttr(_dbc, SQL_ATTR_AUTOCOMMIT, (SQLPOINTER)SQL_AUTOCOMMIT_OFF,
SQL_IS_UINTEGER),
"Begin transcation");
_is_in_transaction = true;

Expand All @@ -351,8 +343,7 @@ Status ODBCConnector::abort_trans() {
return Status::InternalError("Abort transaction before begin trans.");
}

ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_ROLLBACK),
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_ROLLBACK),
"Abort transcation");

return Status::OK();
Expand All @@ -363,8 +354,7 @@ Status ODBCConnector::finish_trans() {
return Status::InternalError("Abort transaction before begin trans.");
}

ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC,
SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_COMMIT),
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_COMMIT),
"commit transcation");
_is_in_transaction = false;

Expand All @@ -385,7 +375,7 @@ Status ODBCConnector::error_status(const std::string& prefix, const std::string&
// hType Type of handle (HANDLE_STMT, HANDLE_ENV, HANDLE_DBC)
// RetCode Return code of failing command
std::string ODBCConnector::handle_diagnostic_record(SQLHANDLE hHandle, SQLSMALLINT hType,
RETCODE RetCode) {
RETCODE RetCode) {
SQLSMALLINT rec = 0;
SQLINTEGER error;
CHAR message[1000];
Expand Down
5 changes: 0 additions & 5 deletions be/src/exec/olap_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ void ColumnValueRange<StringValue>::convert_to_fixed_value() {
return;
}

template <>
void ColumnValueRange<DecimalValue>::convert_to_fixed_value() {
return;
}

template <>
void ColumnValueRange<DecimalV2Value>::convert_to_fixed_value() {
return;
Expand Down
Loading