Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
}
}
}
if (_data_bytes >= _group_commit_data_bytes) {
VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
_need_commit = true;
data_size_condition = true;
if (!_need_commit) {
if (_data_bytes >= _group_commit_data_bytes) {
VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
_need_commit = true;
data_size_condition = true;
}
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
_start_time)
.count() >= _group_commit_interval_ms) {
VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes;
_need_commit = true;
}
}
_get_cond.notify_all();
return Status::OK();
Expand All @@ -90,11 +99,9 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
*eos = false;
std::unique_lock l(mutex);
if (!_need_commit) {
auto left_milliseconds =
_group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
_start_time)
.count() >= _group_commit_interval_ms) {
_need_commit = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ private boolean removeAlterJobV2FromTableNotFinalStateJobMap(AlterJobV2 alterJob
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
Expand Down Expand Up @@ -222,10 +225,6 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause

olapTable.setState(OlapTableState.ROLLUP);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());

Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
Expand All @@ -248,6 +247,11 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());

Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
Expand Down Expand Up @@ -309,10 +313,6 @@ public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses,
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());

// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
Expand Down
32 changes: 16 additions & 16 deletions regression-test/data/insert_p0/insert_group_commit_into.out
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
-- !select1 --
1 a 10
2 b -1
3 c -1
4 \N -1
5 q 50
6 \N -1

-- !sql --
-- !select2 --
1 a 10
1 a 10
2 b -1
Expand All @@ -20,7 +20,7 @@
6 \N -1
6 \N -1

-- !sql --
-- !select3 --
1 a \N 10
1 a \N 10
1 a \N 10
Expand All @@ -39,11 +39,11 @@
6 \N \N -1
6 \N \N -1

-- !sql --
-- !select4 --
2 b \N -1
6 \N \N -1

-- !sql --
-- !select5 --
1 a 10 5
2 b -1 \N
2 b -1 \N
Expand All @@ -53,7 +53,7 @@
6 \N -1 \N
6 \N -1 \N

-- !sql --
-- !select6 --
1 a 10
1 a 10
2 b -1
Expand All @@ -69,7 +69,7 @@
6 \N -1
6 \N -1

-- !sql --
-- !select7 --
\N -1
\N -1
\N -1
Expand Down Expand Up @@ -103,19 +103,19 @@ q 50
3 3 3
4 4 4

-- !sql --
-- !select8 --
1 test
2 or

-- !sql --
-- !select1 --
1 a 10
2 b -1
3 c -1
4 \N -1
5 q 50
6 \N -1

-- !sql --
-- !select2 --
1 a 10
1 a 10
2 b -1
Expand All @@ -128,7 +128,7 @@ q 50
6 \N -1
6 \N -1

-- !sql --
-- !select3 --
1 a \N 10
1 a \N 10
1 a \N 10
Expand All @@ -147,11 +147,11 @@ q 50
6 \N \N -1
6 \N \N -1

-- !sql --
-- !select4 --
2 b \N -1
6 \N \N -1

-- !sql --
-- !select5 --
1 a 10 5
2 b -1 \N
2 b -1 \N
Expand All @@ -161,7 +161,7 @@ q 50
6 \N -1 \N
6 \N -1 \N

-- !sql --
-- !select6 --
1 a 10
1 a 10
2 b -1
Expand All @@ -177,7 +177,7 @@ q 50
6 \N -1
6 \N -1

-- !sql --
-- !select7 --
\N -1
\N -1
\N -1
Expand Down Expand Up @@ -211,7 +211,7 @@ q 50
3 3 3
4 4 4

-- !sql --
-- !select8 --
1 test
2 or

66 changes: 42 additions & 24 deletions regression-test/suites/insert_p0/insert_group_commit_into.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ suite("insert_group_commit_into") {
sql "use ${dbName};"
while (true) {
sleep(2000)
def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc "
def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1"
logger.info("alter table state: ${state}")
if (state.size() > 0 && state[0][9] == "FINISHED") {
return true
Expand Down Expand Up @@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
return serverInfo
}

def group_commit_insert_with_retry = { sql, expected_row_count ->
def retry = 0
while (true){
try {
return group_commit_insert(sql, expected_row_count)
} catch (Exception e) {
logger.warn("group_commit_insert failed, retry: " + retry + ", error: " + e.getMessage())
retry++
if (e.getMessage().contains("is blocked on schema change") && retry < 20) {
sleep(1500)
continue
} else {
throw e
}
}
}
}

def none_group_commit_insert = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
Expand Down Expand Up @@ -120,7 +138,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6; """, 1

getRowCount(6)
qt_sql """ select * from ${table} order by id, name, score asc; """
order_qt_select1 """ select * from ${table} order by id, name, score asc; """

// 2. insert into and delete
sql """ delete from ${table} where id = 4; """
Expand All @@ -134,19 +152,19 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6; """, 1

getRowCount(11)
qt_sql """ select * from ${table} order by id, name, score asc; """
order_qt_select2 """ select * from ${table} order by id, name, score asc; """

// 3. insert into and light schema change: add column
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """, 2
sql """ alter table ${table} ADD column age int after name; """
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert """ insert into ${table}(id) select 6; """, 1
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1

assertTrue(getAlterTableState(), "add column should success")
getRowCount(17)
qt_sql """ select * from ${table} order by id, name,score asc; """
order_qt_select3 """ select * from ${table} order by id, name,score asc; """

// 4. insert into and truncate table
/*sql """ insert into ${table}(name, id) values('c', 3); """
Expand All @@ -157,43 +175,43 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6; """, 1

getRowCount(2)
qt_sql """ select * from ${table} order by id, name, score asc; """
order_qt_select4 """ select * from ${table} order by id, name, score asc; """

// 5. insert into and schema change: modify column order
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
// sql """ alter table ${table} order by (id, name, score, age); """
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert """ insert into ${table}(id) select 6; """, 1
group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
sql """ alter table ${table} order by (id, name, score, age); """
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1

// assertTrue(getAlterTableState(), "modify column order should success")
assertTrue(getAlterTableState(), "modify column order should success")
getRowCount(8)
qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """
order_qt_select5 """ select id, name, score, age from ${table} order by id, name, score asc; """

// 6. insert into and light schema change: drop column
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
group_commit_insert """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
group_commit_insert """ insert into ${table}(id, name, age, score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
sql """ alter table ${table} DROP column age; """
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert """ insert into ${table}(id) select 6; """, 1
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1

assertTrue(getAlterTableState(), "drop column should success")
getRowCount(14)
qt_sql """ select * from ${table} order by id, name, score asc; """
order_qt_select6 """ select * from ${table} order by id, name, score asc; """

// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2
// sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert """ insert into ${table}(id) select 6; """, 1
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1

getRowCount(20)
qt_sql """ select name, score from ${table} order by name asc; """

order_qt_select7 """ select name, score from ${table} order by name asc; """
assertTrue(getAlterTableState(), "add rollup should success")

/*if (item == "nereids") {
group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1
Expand All @@ -208,7 +226,7 @@ suite("insert_group_commit_into") {

def rowCount = sql "select count(*) from ${table}"
logger.info("row count: " + rowCount)
assertEquals(rowCount[0][0], 23)
assertEquals(23, rowCount[0][0])
}
} finally {
// try_sql("DROP TABLE ${table}")
Expand Down Expand Up @@ -476,7 +494,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table} values(1, 'test'); """, 1
group_commit_insert """ insert into ${table}(k1,`or`) values (2,"or"); """, 1
getRowCount(2)
qt_sql """ select * from ${table}; """
order_qt_select8 """ select * from ${table}; """
}
} finally {
}
Expand Down