Skip to content

Commit 84be3bb

Browse files
branch-3.0: [Fix](Compaction) Fix full clone failure when rowset missing #53984 (#54162)
Cherry-picked from #53984.
Co-authored-by: abmdocrt <lianyukang@selectdb.com>
1 parent c577d2e commit 84be3bb

6 files changed

+272
-7
lines changed

be/src/olap/cumulative_compaction.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,19 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
197197
<< ", first missed version next rowset version=" << missing_versions[1]
198198
<< ", tablet=" << _tablet->tablet_id();
199199
if (config::enable_auto_clone_on_compaction_missing_version) {
200+
int64_t max_version = tablet()->max_version_unlocked();
200201
LOG_INFO("cumulative compaction submit missing rowset clone task.")
201202
.tag("tablet_id", _tablet->tablet_id())
202-
.tag("version", missing_versions.back().first)
203+
.tag("max_version", max_version)
203204
.tag("replica_id", tablet()->replica_id())
204205
.tag("partition_id", _tablet->partition_id())
205206
.tag("table_id", _tablet->table_id());
206-
Status st = _engine.submit_clone_task(tablet(), missing_versions.back().first);
207+
Status st = _engine.submit_clone_task(tablet(), max_version);
207208
if (!st) {
208209
LOG_WARNING("cumulative compaction failed to submit missing rowset clone task.")
209210
.tag("st", st.msg())
210211
.tag("tablet_id", _tablet->tablet_id())
211-
.tag("version", missing_versions.back().first)
212+
.tag("max_version", max_version)
212213
.tag("replica_id", tablet()->replica_id())
213214
.tag("partition_id", _tablet->partition_id())
214215
.tag("table_id", _tablet->table_id());

be/src/olap/task/engine_publish_version_task.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,17 +220,17 @@ Status EnginePublishVersionTask::execute() {
220220
if (config::enable_auto_clone_on_mow_publish_missing_version) {
221221
LOG_INFO("mow publish submit missing rowset clone task.")
222222
.tag("tablet_id", tablet->tablet_id())
223-
.tag("version", version.first - 1)
223+
.tag("version", version.second)
224224
.tag("replica_id", tablet->replica_id())
225225
.tag("partition_id", tablet->partition_id())
226226
.tag("table_id", tablet->table_id());
227-
Status st = _engine.submit_clone_task(tablet.get(), version.first - 1);
227+
Status st = _engine.submit_clone_task(tablet.get(), version.second);
228228
if (!st) {
229229
LOG_WARNING(
230230
"mow publish failed to submit missing rowset clone task.")
231231
.tag("st", st.msg())
232232
.tag("tablet_id", tablet->tablet_id())
233-
.tag("version", version.first - 1)
233+
.tag("version", version.second)
234234
.tag("replica_id", tablet->replica_id())
235235
.tag("partition_id", tablet->partition_id())
236236
.tag("table_id", tablet->table_id());

regression-test/suites/fault_injection_p0/test_compaction_clone_missing_rowset_fault_injection.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ suite('test_compaction_clone_missing_rowset_fault_injection', 'docker') {
3939
assertNotNull(normalBe)
4040

4141
try {
42-
def tableName = "test_compaction_clone_missing_rowset"
42+
def tableName = "test_compaction_clone_missing_rowset_fault_injection"
4343
sql """ DROP TABLE IF EXISTS ${tableName} force"""
4444
sql """
4545
CREATE TABLE IF NOT EXISTS ${tableName} (
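[New file; its path was not captured in this view — presumably regression-test/suites/fault_injection_p0/test_compaction_full_clone_missing_rowset_fault_injection.groovy, matching the suite name below.]

Lines changed: 125 additions & 0 deletions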
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import org.apache.http.NoHttpResponseException
20+
import org.apache.doris.regression.util.DebugPoint
21+
import org.apache.doris.regression.util.NodeType
22+
23+
// Regression suite for auto-clone on cumulative compaction with missing versions.
//
// Flow, as written below:
//   1. Start a 3-BE, non-cloud docker cluster with debug points enabled.
//   2. Create a single-bucket, 3-replica DUPLICATE KEY table with auto compaction disabled.
//   3. Load one row, enable a debug point on the first BE (injectBe) for the next three
//      loads, disable it, then load five more rows.
//   4. Run cumulative compaction on the two other BEs.
//   5. Check injectBe's tablet status, run cumulative compaction on injectBe, wait,
//      and check that its tablet status now reports [0-1] and [2-10].
//
// NOTE(review): the debug point name suggests it makes publish-version fail on injectBe so
// that versions 3-5 are missing there — confirm against the BE debug point implementation.
suite('test_compaction_full_clone_missing_rowset_fault_injection', 'docker') {
24+
def options = new ClusterOptions()
25+
options.cloudMode = false
26+
options.enableDebugPoints()
27+
// NOTE(review): presumably disables FE-driven replica repair so that only the BE-side
// auto clone can fix the broken replica — confirm.
options.feConfigs += [ "disable_tablet_scheduler=true" ]
28+
// Turns on the BE feature under test for the whole cluster at startup.
options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ]
29+
// NOTE(review): the three stale-sweep settings below are all set to zero/size-based;
// presumably this makes the compacted BEs drop their pre-compaction rowsets right away so
// the clone cannot be served incrementally and has to be a full clone — confirm.
options.beConfigs += [ "tablet_rowset_stale_sweep_time_sec=0" ]
30+
options.beConfigs += [ "tablet_rowset_stale_sweep_by_size=true" ]
31+
options.beConfigs += [ "tablet_rowset_stale_sweep_threshold_size=0" ]
32+
options.beNum = 3
33+
docker(options) {
34+
35+
def injectBe = null
36+
def normalBe1 = null
37+
def normalBe2 = null
38+
def backends = sql_return_maparray('show backends')
39+
40+
// The first listed backend is the fault-injection target; the other two stay healthy.
injectBe = backends[0]
41+
assertNotNull(injectBe)
42+
normalBe1 = backends[1]
43+
assertNotNull(normalBe1)
44+
normalBe2 = backends[2]
45+
assertNotNull(normalBe2)
46+
47+
try {
48+
def tableName = "test_compaction_full_clone_missing_rowset_fault_injection"
49+
sql """ DROP TABLE IF EXISTS ${tableName} force"""
50+
sql """
51+
CREATE TABLE IF NOT EXISTS ${tableName} (
52+
`k` int ,
53+
`v` int ,
54+
) engine=olap
55+
DUPLICATE KEY(k)
56+
DISTRIBUTED BY HASH(k)
57+
BUCKETS 1
58+
properties(
59+
"replication_num" = "3",
60+
"disable_auto_compaction" = "true")
61+
"""
62+
// Nine single-row loads in total. The assertions further down expect them to show up
// as versions 2 through 10, with the three loads issued while the debug point is
// active corresponding to the range [3-5].
sql """ INSERT INTO ${tableName} VALUES (1,0)"""
63+
// Debug point is active on injectBe only, with percent 1.0, for the next three loads.
DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"])
64+
sql """ INSERT INTO ${tableName} VALUES (2,0)"""
65+
sql """ INSERT INTO ${tableName} VALUES (3,0)"""
66+
sql """ INSERT INTO ${tableName} VALUES (4,0)"""
67+
DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
68+
sql """ INSERT INTO ${tableName} VALUES (5,0)"""
69+
sql """ INSERT INTO ${tableName} VALUES (6,0)"""
70+
sql """ INSERT INTO ${tableName} VALUES (7,0)"""
71+
sql """ INSERT INTO ${tableName} VALUES (8,0)"""
72+
sql """ INSERT INTO ${tableName} VALUES (9,0)"""
73+
74+
// BUCKETS 1 above means the table has a single tablet; take its id.
def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
75+
def tabletId = array[0].TabletId
76+
77+
// Compact the tablet on the two healthy BEs first. The results are only logged,
// not asserted.
logger.info("normal BE run cumu compaction:" + tabletId)
78+
def (code, out, err) = be_run_cumulative_compaction(normalBe1.Host, normalBe1.HttpPort, tabletId)
79+
logger.info("normal BE1 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
80+
(code, out, err) = be_run_cumulative_compaction(normalBe2.Host, normalBe2.HttpPort, tabletId)
81+
logger.info("normal BE2 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
82+
83+
// Log the tablet status of the healthy BEs for debugging; nothing is asserted here.
logger.info("normal BE show:" + tabletId)
84+
(code, out, err) = be_show_tablet_status(normalBe1.Host, normalBe1.HttpPort, tabletId)
85+
logger.info("normal BE1 show: code=" + code + ", out=" + out + ", err=" + err)
86+
(code, out, err) = be_show_tablet_status(normalBe2.Host, normalBe2.HttpPort, tabletId)
87+
logger.info("normal BE2 show: code=" + code + ", out=" + out + ", err=" + err)
88+
89+
// Fixed 10 s wait. NOTE(review): presumably gives the healthy BEs time to finish
// compaction and sweep stale rowsets — confirm; a polling wait would be less flaky.
sleep(10000)
90+
91+
// First check on injectBe, before it compacts: its tablet status must mention
// [0-1], [2-2], the range [3-5], and the singleton versions 6 through 10.
logger.info("1st show:" + tabletId)
92+
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
93+
logger.info("1st show: code=" + code + ", out=" + out + ", err=" + err)
94+
assertTrue(out.contains("[0-1]"))
95+
assertTrue(out.contains("[2-2]"))
96+
// [3-5] is the version range injectBe is missing. NOTE(review): presumably this
// string comes from a missing-versions field of the status output rather than from
// an actual rowset — confirm against the tablet status format.
assertTrue(out.contains("[3-5]"))
97+
assertTrue(out.contains("[6-6]"))
98+
assertTrue(out.contains("[7-7]"))
99+
assertTrue(out.contains("[8-8]"))
100+
assertTrue(out.contains("[9-9]"))
101+
assertTrue(out.contains("[10-10]"))
102+
103+
// Trigger cumulative compaction on the broken replica. The result is only logged;
// the expected effect is verified through the second status check below.
logger.info("1st run cumu compaction:" + tabletId)
104+
(code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
105+
logger.info("1st Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
106+
107+
// Fixed 30 s wait. NOTE(review): presumably long enough for the clone task submitted
// by the compaction attempt to finish — confirm.
sleep(30000)
108+
109+
// Second check on injectBe: the status must now report [0-1] and [2-10].
logger.info("2nd show:" + tabletId)
110+
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
111+
logger.info("2nd show: code=" + code + ", out=" + out + ", err=" + err)
112+
assertTrue(out.contains("[0-1]"))
113+
assertTrue(out.contains("[2-10]"))
114+
115+
} finally {
116+
// Disable the debug point again on every exit path, including a failure between
// the enable and disable calls above.
if (injectBe != null) {
117+
DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
118+
}
119+
}
120+
}
121+
}

regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset.groovy renamed to regression-test/suites/fault_injection_p0/test_mow_publish_clone_missing_rowset_fault_injection.groovy

File renamed without changes.
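[New file; its path was not captured in this view — presumably regression-test/suites/fault_injection_p0/test_mow_publish_full_clone_missing_rowset_fault_injection.groovy, matching the suite name below.]

Lines changed: 139 additions & 0 deletions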
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import org.apache.http.NoHttpResponseException
20+
import org.apache.doris.regression.util.DebugPoint
21+
import org.apache.doris.regression.util.NodeType
22+
23+
// Regression suite for auto-clone when a publish hits a missing version on a UNIQUE KEY table.
//
// Flow, as written below:
//   1. Start a 3-BE, non-cloud docker cluster with debug points enabled.
//   2. Create a single-bucket, 3-replica UNIQUE KEY table with auto compaction disabled.
//   3. Load one row, enable a debug point on the first BE (injectBe) for exactly one load,
//      disable it, then load five more rows.
//   4. Run cumulative compaction on the two other BEs.
//   5. Check that injectBe's tablet status does not report versions 3 through 7.
//   6. Enable enable_auto_clone_on_mow_publish_missing_version on injectBe at runtime,
//      load one more row, wait, and check that injectBe now reports [0-1], [2-8], [9-9].
//   7. Run cumulative compaction on injectBe and check [0-1] and [2-8] are still reported.
//
// NOTE(review): the suite name says MOW (merge-on-write), but the table properties do not
// set merge-on-write explicitly, so this presumably relies on the cluster default — confirm.
suite('test_mow_publish_full_clone_missing_rowset_fault_injection', 'docker') {
24+
25+
// Updates one BE config at runtime by POSTing to that BE's /api/update_config endpoint,
// and asserts that the response body contains OK.
//   paramName / paramValue: config key and the value to set
//   beIp / bePort: address of the BE HTTP endpoint to call
def set_be_param = { paramName, paramValue, beIp, bePort ->
26+
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
27+
assertTrue(out.contains("OK"))
28+
}
29+
30+
def options = new ClusterOptions()
31+
options.cloudMode = false
32+
options.enableDebugPoints()
33+
// NOTE(review): presumably disables FE-driven replica repair so that only the BE-side
// auto clone can fix the broken replica — confirm.
options.feConfigs += [ "disable_tablet_scheduler=true" ]
34+
// Only the compaction-side auto-clone switch is set at startup; the publish-side switch
// (enable_auto_clone_on_mow_publish_missing_version) is turned on later via set_be_param.
options.beConfigs += [ "enable_auto_clone_on_compaction_missing_version=true" ]
35+
// NOTE(review): the three stale-sweep settings below are all set to zero/size-based;
// presumably this makes the compacted BEs drop their pre-compaction rowsets right away so
// the clone has to be a full clone — confirm.
options.beConfigs += [ "tablet_rowset_stale_sweep_time_sec=0" ]
36+
options.beConfigs += [ "tablet_rowset_stale_sweep_by_size=true" ]
37+
options.beConfigs += [ "tablet_rowset_stale_sweep_threshold_size=0" ]
38+
options.beNum = 3
39+
docker(options) {
40+
41+
def injectBe = null
42+
def normalBe1 = null
43+
def normalBe2 = null
44+
def backends = sql_return_maparray('show backends')
45+
46+
// The first listed backend is the fault-injection target; the other two stay healthy.
injectBe = backends[0]
47+
assertNotNull(injectBe)
48+
normalBe1 = backends[1]
49+
assertNotNull(normalBe1)
50+
normalBe2 = backends[2]
51+
assertNotNull(normalBe2)
52+
53+
try {
54+
def tableName = "test_mow_publish_full_clone_missing_rowset_fault_injection"
55+
sql """ DROP TABLE IF EXISTS ${tableName} force"""
56+
sql """
57+
CREATE TABLE IF NOT EXISTS ${tableName} (
58+
`k` int ,
59+
`v` int ,
60+
) engine=olap
61+
UNIQUE KEY(k)
62+
DISTRIBUTED BY HASH(k)
63+
BUCKETS 1
64+
properties(
65+
"replication_num" = "3",
66+
"disable_auto_compaction" = "true")
67+
"""
68+
// Seven single-row loads before the first check. Only the second one runs while the
// debug point is active on injectBe.
sql """ INSERT INTO ${tableName} VALUES (1,0)"""
69+
DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random", [percent:"1.0"])
70+
sql """ INSERT INTO ${tableName} VALUES (2,0)"""
71+
DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
72+
sql """ INSERT INTO ${tableName} VALUES (3,0)"""
73+
sql """ INSERT INTO ${tableName} VALUES (4,0)"""
74+
sql """ INSERT INTO ${tableName} VALUES (5,0)"""
75+
sql """ INSERT INTO ${tableName} VALUES (6,0)"""
76+
sql """ INSERT INTO ${tableName} VALUES (7,0)"""
77+
78+
// BUCKETS 1 above means the table has a single tablet; take its id.
def array = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
79+
def tabletId = array[0].TabletId
80+
81+
// Compact the tablet on the two healthy BEs first. The results are only logged,
// not asserted.
logger.info("normal BE run cumu compaction:" + tabletId)
82+
def (code, out, err) = be_run_cumulative_compaction(normalBe1.Host, normalBe1.HttpPort, tabletId)
83+
logger.info("normal BE1 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
84+
(code, out, err) = be_run_cumulative_compaction(normalBe2.Host, normalBe2.HttpPort, tabletId)
85+
logger.info("normal BE2 Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
86+
87+
// Log the tablet status of the healthy BEs for debugging; nothing is asserted here.
logger.info("normal BE show:" + tabletId)
88+
(code, out, err) = be_show_tablet_status(normalBe1.Host, normalBe1.HttpPort, tabletId)
89+
logger.info("normal BE1 show: code=" + code + ", out=" + out + ", err=" + err)
90+
(code, out, err) = be_show_tablet_status(normalBe2.Host, normalBe2.HttpPort, tabletId)
91+
logger.info("normal BE2 show: code=" + code + ", out=" + out + ", err=" + err)
92+
93+
// Fixed 10 s wait. NOTE(review): presumably gives the healthy BEs time to finish
// compaction and sweep stale rowsets — confirm; a polling wait would be less flaky.
sleep(10000)
94+
95+
// First check on injectBe: [0-1] and [2-2] are reported, while singleton versions
// 3 through 7 are not. NOTE(review): presumably the failed publish of version 3 also
// keeps the later versions from becoming visible on this replica — confirm. The
// load of (7,0) would be version 8 by the same numbering, and [8-8] is not checked.
logger.info("1st inject be show:" + tabletId)
98+
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
99+
logger.info("1st inject be show: code=" + code + ", out=" + out + ", err=" + err)
100+
assertTrue(out.contains("[0-1]"))
101+
assertTrue(out.contains("[2-2]"))
102+
assertFalse(out.contains("[3-3]"))
103+
assertFalse(out.contains("[4-4]"))
104+
assertFalse(out.contains("[5-5]"))
105+
assertFalse(out.contains("[6-6]"))
106+
assertFalse(out.contains("[7-7]"))
107+
108+
// Turn on the publish-side auto-clone switch on injectBe only, then wait 10 s before
// issuing the next load.
set_be_param("enable_auto_clone_on_mow_publish_missing_version", "true", injectBe.Host, injectBe.HttpPort);
109+
Thread.sleep(10000)
110+
// One more load. NOTE(review): presumably its publish on injectBe detects the missing
// version and submits the clone task — confirm.
sql """ INSERT INTO ${tableName} VALUES (8,0)"""
112+
113+
// Fixed 30 s wait. NOTE(review): presumably long enough for the clone to finish — confirm.
sleep(30000)
114+
115+
// Second check on injectBe: the status must now report [0-1], [2-8] and [9-9].
logger.info("2nd inject be show:" + tabletId)
117+
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
118+
logger.info("2nd inject be show: code=" + code + ", out=" + out + ", err=" + err)
119+
assertTrue(out.contains("[0-1]"))
120+
assertTrue(out.contains("[2-8]"))
121+
assertTrue(out.contains("[9-9]"))
122+
123+
// Run cumulative compaction on the repaired replica. The result is only logged.
logger.info("run cumu compaction:" + tabletId)
125+
(code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId)
126+
logger.info("Run cumu compaction: code=" + code + ", out=" + out + ", err=" + err)
127+
128+
// Third check, taken immediately after the compaction call with no wait: [0-1] and
// [2-8] must still be reported.
logger.info("3rd inject be show:" + tabletId)
129+
(code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, tabletId)
130+
logger.info("3rd inject be show: code=" + code + ", out=" + out + ", err=" + err)
131+
assertTrue(out.contains("[0-1]"))
132+
assertTrue(out.contains("[2-8]"))
133+
} finally {
134+
// Disable the debug point again on every exit path, including a failure between
// the enable and disable calls above.
if (injectBe != null) {
135+
DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, "EnginePublishVersionTask.finish.random")
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)