Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions be/src/olap/wal/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ Status WalTable::_relay_wal_one_by_one() {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
msg.find("LabelAlreadyUsedException") != msg.npos) {
(msg.find("LabelAlreadyUsedException") != msg.npos &&
(msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != msg.npos))) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
// delete wal
Expand Down Expand Up @@ -163,8 +164,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
request.__set_label(label);
std::string reason = "relay wal with label " + label;
request.__set_reason(reason);
request.__set_reason("relay wal with label " + label);
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
Expand Down Expand Up @@ -192,6 +192,10 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
[[maybe_unused]] auto st = _try_abort_txn(_db_id, label);
}
#endif
DBUG_EXECUTE_IF("WalTable.replay_wals.stop", {
// LOG(INFO) << "WalTable.replay_wals.stop";
return Status::InternalError("WalTable.replay_wals.stop");
});
return _replay_one_wal_with_streamload(wal_id, wal, label);
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
Status result_status;
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{ status = Status::InternalError(""); });
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error",
{ status = Status::InternalError("load_error"); });
if (status.ok()) {
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error",
{ status = Status::InternalError(""); });
Expand Down Expand Up @@ -476,6 +478,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
.error(result_status);
retry_times++;
}
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
{ result_status = Status::InternalError("commit_success_and_rpc_error"); });
} else {
// abort txn
TLoadTxnRollbackRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public LabelAlreadyUsedException(String msg, boolean isLabel) {
}

public LabelAlreadyUsedException(TransactionState txn) {
super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + "]");
super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId()
+ "], status [" + txn.getTransactionStatus() + "].");
switch (txn.getTransactionStatus()) {
case UNKNOWN:
case PREPARE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,9 @@ public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) t
return result;
}
try {
if (DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) {
throw new UserException("FrontendServiceImpl.loadTxnRollback.error");
}
loadTxnRollbackImpl(request);
} catch (MetaNotFoundException e) {
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,1
2,2
3,3
4,4
5,5
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// These cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

import org.apache.doris.regression.suite.ClusterOptions

suite("replay_wal_restart_fe") {
    // Polls the most recently created column-alter job of tbl_2 once per second,
    // for at most 30 attempts, and returns as soon as its State equals expectedState.
    // Running out of attempts is not reported as a failure by this helper.
    def check_schema_change = { expectedState ->
        int attemptsLeft = 30
        while (attemptsLeft-- > 0) {
            def alterJobs = sql_return_maparray("SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_2' order by CreateTime desc;")
            assertTrue(alterJobs.size() >= 1)
            def latestJob = alterJobs[0]
            logger.info("alter job: ${latestJob}")
            if (latestJob.State == expectedState) {
                return
            }
            sleep(1000)
        }
    }

    // Single-FE / single-BE docker cluster with debug points and verbose logging enabled.
    def clusterOptions = new ClusterOptions()
    clusterOptions.setFeNum(1)
    clusterOptions.setBeNum(1)
    clusterOptions.enableDebugPoints()
    clusterOptions.feConfigs.add('sys_log_verbose_modules=org.apache.doris')
    clusterOptions.beConfigs.addAll([
        'sys_log_verbose_modules=*',
        'enable_java_support=false'
    ])

    docker(clusterOptions) {
        sql 'SELECT DATABASE()'

        // Make the group commit load fail and keep the BE from replaying the WAL.
        [
            "LoadBlockQueue._finish_group_commit_load.load_error",
            "WalTable.replay_wals.stop"
        ].each { debugPoint ->
            GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
        }

        // One async group-commit insert, leaving one WAL that needs to be replayed.
        sql 'CREATE TABLE tbl_2 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "group_commit_interval_ms"="1000")'
        sql 'SET GROUP_COMMIT = ASYNC_MODE'
        sql 'INSERT INTO tbl_2 VALUES (1, 2)'

        // Start a schema change while the WAL is still pending.
        sql 'ALTER TABLE tbl_2 ORDER BY (k2, k1)'
        check_schema_change('RUNNING')

        // Stop the BE, restart the FE, reconnect, and only then bring the BE back.
        cluster.stopBackends()
        cluster.restartFrontends()
        sleep(30000)
        context.reconnectFe()
        check_schema_change('RUNNING')
        cluster.startBackends()

        // Wait for the schema change to finish and for the inserted row to show up.
        check_schema_change('FINISHED')
        for (attempt in 1..30) {
            def countResult = sql "select count(*) from tbl_2"
            logger.info("rowCount: ${countResult}")
            if (countResult[0][0] >= 1) {
                break
            }
            sleep(1000)
        }
        order_qt_select_1 'SELECT * FROM tbl_2'
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.awaitility.Awaitility
import static java.util.concurrent.TimeUnit.SECONDS

suite("test_group_commit_replay_wal", "nonConcurrent") {
    def tableName = "test_group_commit_replay_wal"

    // Waits until `select count(*)` on the test table equals expectedRowCount, polling
    // once per second. Awaitility raises a timeout exception if the count has not been
    // reached within 30 seconds, which fails the suite.
    def getRowCount = { expectedRowCount ->
        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until(
            {
                def result = sql "select count(*) from ${tableName}"
                logger.info("table: ${tableName}, rowCount: ${result}")
                return result[0][0] == expectedRowCount
            }
        )
    }

    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `k` int ,
            `v` int ,
        ) engine=olap
        DISTRIBUTED BY HASH(`k`)
        BUCKETS 5
        properties("replication_num" = "1", "group_commit_interval_ms"="2000")
    """

    // Case 1:
    //   1. the load succeeds but the commit rpc reports an error
    //   2. replay should be skipped because the FE throws LabelAlreadyUsedException
    //      and the txn status is VISIBLE
    GetDebugPoint().clearDebugPointsForAllBEs()
    GetDebugPoint().clearDebugPointsForAllFEs()
    // No catch block on purpose: any failure (including an Awaitility timeout) propagates
    // with its original exception and stack trace instead of being collapsed into a bare
    // assertTrue(false); the finally block still clears the debug points.
    try {
        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error")
        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            set 'group_commit', 'async_mode'
            unset 'label'
            file 'group_commit_wal_msg.csv'
            time 10000
        }
        getRowCount(5)
        // The WAL count is expected to be 0 at this point; it is not verified here,
        // the sleep only gives the BE time to handle the WAL.
        sleep(5000)
    } finally {
        GetDebugPoint().clearDebugPointsForAllBEs()
    }

    // Case 2: the load fails and the abort fails as well, so the WAL should not be
    // deleted and should be retried.
    // As above, failures propagate unchanged and cleanup happens in finally.
    try {
        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
        GetDebugPoint().enableDebugPointForAllFEs("FrontendServiceImpl.loadTxnRollback.error")
        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            set 'group_commit', 'async_mode'
            unset 'label'
            file 'group_commit_wal_msg.csv'
            time 10000
        }
        getRowCount(5)
        sleep(10000) // wal replay but all failed
        getRowCount(5)
        // The WAL count is expected to be 1 here (not verified).

        // Once the FE rollback debug point is cleared, the replay can succeed and
        // the 5 rows of the second load become visible.
        GetDebugPoint().clearDebugPointsForAllFEs()
        getRowCount(10)
        // The WAL count is expected to be 0 here (not verified).
    } finally {
        GetDebugPoint().clearDebugPointsForAllFEs()
        GetDebugPoint().clearDebugPointsForAllBEs()
    }
}