From a0c04caaecc9a2991bf583c5c67280e3ca321f1b Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 15 Jul 2024 18:47:17 +0800 Subject: [PATCH] fix --- .../transaction/PublishVersionDaemon.java | 11 +++++- .../data/insert_p0/txn_insert_inject_case.out | 14 ++++++++ .../insert_p0/txn_insert_inject_case.groovy | 36 ++++++++++++++++++- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 82e954b6a1b3d3..3d852e8efbbfde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -109,7 +109,12 @@ private void traverseReadyTxnAndDispatchPublishVersionTask(List allBackends, TransactionState transaction publishBackends = Sets.newHashSet(); publishBackends.addAll(allBackends); } + if (transactionState.getTransactionId() == DebugPointUtil.getDebugParamOrDefault( + "PublishVersionDaemon.genPublishTask.failed", "txnId", -1L)) { + throw new NullPointerException("genPublishTask failed for txnId: " + transactionState.getTransactionId()); + } if (transactionState.getSubTxnIds() != null) { for (Entry entry : transactionState.getSubTxnIdToTableCommitInfo().entrySet()) { diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out b/regression-test/data/insert_p0/txn_insert_inject_case.out index 799229be54a6c0..b5b736b0b919ba 100644 --- a/regression-test/data/insert_p0/txn_insert_inject_case.out +++ b/regression-test/data/insert_p0/txn_insert_inject_case.out @@ -7,3 +7,17 @@ 2 3.3 xyz [1] [1, 0] 2 3.3 xyz [1] [1, 0] +-- !select2 -- + +-- !select3 -- +\N \N \N [null] [null, 0] +1 2.2 abc [] [] +101 2.2 abc [] [] +2 3.3 xyz [1] [1, 0] + +-- !select4 -- +\N \N \N [null] [null, 0] +102 2.2 abc [] [] +3 2.2 abc [] [] +4 3.3 xyz [1] [1, 0] + diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy index 083f01b4a8a2fa..50a7729656a500 100644 --- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy +++ b/regression-test/suites/insert_p0/txn_insert_inject_case.groovy @@ -151,6 +151,7 @@ suite("txn_insert_inject_case", "nonConcurrent") { } // 2. commit failed + sql """ truncate table ${table}_0 """ def dbName = "regression_test_insert_p0" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") @@ -174,7 +175,7 @@ suite("txn_insert_inject_case", "nonConcurrent") { statement.execute("commit") assertTrue(false, "commit should fail") } catch (Exception e) { - logger.error("commit failed", e); + logger.info("commit failed " + e.getMessage()) } } finally { GetDebugPoint().disableDebugPointForAllFEs('DatabaseTransactionMgr.commitTransaction.failed') @@ -185,4 +186,37 @@ suite("txn_insert_inject_case", "nonConcurrent") { assertEquals(1, txn_info.size()) assertEquals("ABORTED", txn_info[0].get("TransactionStatus")) assertTrue(txn_info[0].get("Reason").contains("DebugPoint: DatabaseTransactionMgr.commitTransaction.failed")) + + // 3. one txn publish failed + sql """ truncate table ${table}_0 """ + txn_id = 0 + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + statement.execute("begin") + statement.execute("insert into ${table}_0 select * from ${table}_1;") + txn_id = get_txn_id_from_server_info((((StatementImpl) statement).results).getServerInfo()) + GetDebugPoint().enableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed', [txnId:txn_id]) + statement.execute("insert into ${table}_0 select * from ${table}_2;") + statement.execute("commit") + + sql """insert into ${table}_0 values(100, 2.2, "abc", [], [])""" + sql """insert into ${table}_1 values(101, 2.2, "abc", [], [])""" + sql """insert into ${table}_2 values(102, 2.2, "abc", [], [])""" + order_qt_select2 """select * from ${table}_0""" + order_qt_select3 """select * from ${table}_1""" + order_qt_select4 """select * from ${table}_2""" + } finally { + GetDebugPoint().disableDebugPointForAllFEs('PublishVersionDaemon.genPublishTask.failed') + def rowCount = 0 + for (int i = 0; i < 20; i++) { + def result = sql "select count(*) from ${table}_0" + logger.info("rowCount: " + result + ", retry: " + i) + rowCount = result[0][0] + if (rowCount >= 7) { + break + } + sleep(1000) + } + assertEquals(7, rowCount) + } }