Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
dp->param<std::string>("error_msg"));
})
if (conditions.empty()) {
return Status::Error<DELETE_INVALID_PARAMETERS>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid parameters for store_cond. condition_size={}", conditions.size());
}

Expand Down Expand Up @@ -127,7 +127,7 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
if (TCondition tmp; !DeleteHandler::parse_condition(condition_str, &tmp)) {
LOG(WARNING) << "failed to parse condition_str, condtion="
<< ThriftDebugString(condition);
return Status::Error<DELETE_INVALID_CONDITION>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condtion={}", ThriftDebugString(condition));
}
VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " << condition_str;
Expand Down Expand Up @@ -235,8 +235,8 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
// Check whether the column exists
int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
return Status::Error<DELETE_INVALID_CONDITION>("field is not existent. [field_index={}]",
field_index);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("field is not existent. [field_index={}]",
field_index);
}

// Delete condition should only applied on key columns or duplicate key table, and
Expand All @@ -245,21 +245,21 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC

if (column.type() == FieldType::OLAP_FIELD_TYPE_DOUBLE ||
column.type() == FieldType::OLAP_FIELD_TYPE_FLOAT) {
return Status::Error<DELETE_INVALID_CONDITION>("data type is float or double.");
return Status::Error<ErrorCode::INVALID_ARGUMENT>("data type is float or double.");
}

// Check operator and operands size are matched.
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
cond.condition_values.size() != 1) {
return Status::Error<DELETE_INVALID_CONDITION>("invalid condition value size. [size={}]",
cond.condition_values.size());
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value size. [size={}]",
cond.condition_values.size());
}

// Check each operand is valid
for (const auto& condition_value : cond.condition_values) {
if (!is_condition_value_valid(column, cond.condition_op, condition_value)) {
return Status::Error<DELETE_INVALID_CONDITION>("invalid condition value. [value={}]",
condition_value);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value. [value={}]",
condition_value);
}
}

Expand All @@ -273,23 +273,24 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
const auto& err_msg =
fmt::format("column id does not exists in table={}, schema version={},",
schema.table_id(), schema.schema_version());
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}
if (!iequal(schema.column_by_uid(cond.column_unique_id).name(), cond.column_name)) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which column name={}, "
"colum name={} does not belongs to column uid={}, which "
"column name={}, "
"delete_cond.column_name ={}",
cond.column_name, cond.column_unique_id,
schema.column_by_uid(cond.column_unique_id).name(), cond.column_name);
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}

return Status::OK();
}

Status DeleteHandler::parse_condition(const DeleteSubPredicatePB& sub_cond, TCondition* condition) {
if (!sub_cond.has_column_name() || !sub_cond.has_op() || !sub_cond.has_cond_value()) {
return Status::Error<DELETE_INVALID_PARAMETERS>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"fail to parse condition. condition={} {} {}", sub_cond.column_name(),
sub_cond.op(), sub_cond.cond_value());
}
Expand Down Expand Up @@ -335,8 +336,8 @@ Status DeleteHandler::parse_condition(const std::string& condition_str, TConditi
<< "]";
}
if (!matched) {
return Status::Error<DELETE_INVALID_PARAMETERS>("fail to sub condition. condition={}",
condition_str);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("fail to sub condition. condition={}",
condition_str);
}

condition->column_name = what[1].str();
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}

std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
})
if (!base_migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS,
// we don't need to retry if meet them.
// note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) {
// we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
// or DELETE_INVALID_PARAMETERS
// note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
false -9999782574499444.2 -25
true 99.9 234

-- !sql --
true 99.9 234

Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,33 @@ suite("test_delete_from_timeout","nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllBEs()

try {
sql "insert into ${tableName} values(1, 99.9, 234);"
sql "insert into ${tableName} values(1, 99.9, 234), (false, -9999782574499444.2, -25);"
qt_sql "select * from ${tableName} order by col1, col2, col3;"

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: "data type is float or double."])
[error_code: 33 /* INVALID_ARGUMENT */, error_msg: "invalid parameters for store_cond. condition_size=1"])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "data type is float or double."
exception "invalid parameters for store_cond. condition_size=1"
}

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: "invalid parameters for store_cond. condition_size=1"])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "invalid parameters for store_cond. condition_size=1"
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")

t1 = Thread.start {
sleep(15000)
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
}

sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
t1.join()
qt_sql "select * from ${tableName} order by col1, col2, col3;"

} catch (Exception e) {
logger.info(e.getMessage())
AssertTrue(false)
assertTrue(false)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure")
GetDebugPoint().clearDebugPointsForAllBEs()
}
}