Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,23 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
}
} else {
if (duration >= 10 * _group_commit_interval_ms) {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids) {
ss << id.to_string() << ", ";
auto last_print_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
if (last_print_duration >= 10000) {
_last_print_time = std::chrono::steady_clock::now();
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << ss.str()
<< ", runtime_state=" << runtime_state;
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << ss.str()
<< ", runtime_state=" << runtime_state;
}
}
_get_cond.wait_for(l, std::chrono::milliseconds(
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class LoadBlockQueue {
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
_last_print_time(_start_time),
_group_commit_data_bytes(group_commit_data_bytes),
_all_block_queues_bytes(all_block_queues_bytes) {};

Expand Down Expand Up @@ -112,6 +113,7 @@ class LoadBlockQueue {
// commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::chrono::steady_clock::time_point _start_time;
std::chrono::steady_clock::time_point _last_print_time;
// commit by data size
int64_t _group_commit_data_bytes;
int64_t _data_bytes = 0;
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,10 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
response->set_loaded_rows(state->num_rows_load_success());
response->set_filtered_rows(state->num_rows_load_filtered());
status->to_protobuf(response->mutable_status());
if (!state->get_error_log_file_path().empty()) {
response->set_error_url(
to_load_error_http_path(state->get_error_log_file_path()));
}
_exec_env->new_load_stream_mgr()->remove(load_id);
});
} catch (const Exception& e) {
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,25 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
for (int index = 0; index < rows; index++) {
_vpartition->find_partition(block.get(), index, _partitions[index]);
}
bool stop_processing = false;
for (int row_index = 0; row_index < rows; row_index++) {
if (_partitions[row_index] == nullptr) [[unlikely]] {
_filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
<< block->dump_data(row_index, 1);
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple=\n{}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
},
&stop_processing));
_has_filtered_rows = true;
state->update_num_rows_load_filtered(1);
state->update_num_rows_load_total(-1);
}
_has_filtered_rows = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ private boolean removeAlterJobV2FromTableNotFinalStateJobMap(AlterJobV2 alterJob
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
Expand Down Expand Up @@ -217,11 +220,6 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
addAlterJobV2(rollupJobV2);

olapTable.setState(OlapTableState.ROLLUP);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());

Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
Expand All @@ -244,6 +242,9 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
Expand Down Expand Up @@ -305,10 +306,6 @@ public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses,
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());

// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2202,11 +2202,17 @@ private void handleInsertStmt() throws Exception {
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId
+ ", query_id: " + DebugUtil.printId(context.queryId()) + ", backend_id: "
+ groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus();
if (response.hasErrorUrl()) {
errMsg += ", error url: " + response.getErrorUrl();
}
}
} else if (code != TStatusCode.OK) {
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId + ", query_id: "
+ DebugUtil.printId(context.queryId()) + ", backend_id: " + groupCommitPlanner.getBackend()
.getId() + ", status: " + response.getStatus();
if (response.hasErrorUrl()) {
errMsg += ", error url: " + response.getErrorUrl();
}
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
label = response.getLabel();
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ message PGroupCommitInsertResponse {
optional int64 txn_id = 3;
optional int64 loaded_rows = 4;
optional int64 filtered_rows = 5;
optional string error_url = 6;
}

message POpenLoadStreamRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
return serverInfo
}

def group_commit_insert_with_retry = { sql, expected_row_count ->
def retry = 0
while (true){
try {
return group_commit_insert(sql, expected_row_count)
} catch (Exception e) {
logger.warn("group_commit_insert failed, retry: " + retry + ", error: " + e.getMessage())
retry++
if (e.getMessage().contains("is blocked on schema change") && retry < 20) {
sleep(1500)
continue
} else {
throw e
}
}
}
}

def none_group_commit_insert = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
Expand Down Expand Up @@ -186,10 +204,19 @@ suite("insert_group_commit_into") {
// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
sql "set enable_insert_strict=false"
group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2
// sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert """ insert into ${table}(id) select 6; """, 1
sql "set enable_insert_strict=true"
try {
sql """ insert into ${table} values (102, 'a', 100); """
assertTrue(false, "insert should fail")
} catch (Exception e) {
logger.info("error: " + e.getMessage())
assertTrue(e.getMessage().contains("url:"))
}
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) select 6; """, 1

getRowCount(20)
qt_sql """ select name, score from ${table} order by name asc; """
Expand Down Expand Up @@ -237,7 +264,7 @@ suite("insert_group_commit_into") {

// 1. insert into
def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
assertTrue(server_info.contains('query_id'))
/*assertTrue(server_info.contains('query_id'))
// get query_id, such as 43f87963586a482a-b0496bcf9e2b5555
def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length()
def query_id = server_info.substring(query_id_index, query_id_index + 33)
Expand All @@ -255,7 +282,7 @@ suite("insert_group_commit_into") {
logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def json = parseJson(out)
assertEquals("success", json.msg.toLowerCase())
assertEquals("success", json.msg.toLowerCase())*/
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ suite("insert_group_commit_into_max_filter_ratio") {

sql """ set group_commit = async_mode; """
sql """ set enable_insert_strict = false; """
group_commit_insert """ insert into ${dbTableName} values (9, 'a', 'a'); """, 0
group_commit_insert """ insert into ${dbTableName} values (9, 'a', 'a'); """, 1
}
if (item == "nereids") {
get_row_count_with_retry(6)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 6, 3, 2, 1)
checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}

Expand Down