From ffbfb12cb7cd4ba8da37076e9804e5d1791136b0 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 10 Jul 2024 15:27:59 +0800 Subject: [PATCH 1/2] fix --- .../exec/group_commit_block_sink_operator.cpp | 24 +++++++++++++------ be/src/runtime/group_commit_mgr.cpp | 24 +++++++++---------- be/src/runtime/group_commit_mgr.h | 6 ++--- .../translator/PhysicalPlanTranslator.java | 6 +++-- .../insert/InsertIntoTableCommand.java | 4 +++- .../insert/OlapGroupCommitInsertExecutor.java | 3 +++ .../insert_p0/insert_group_commit_into.groovy | 2 ++ .../suites/insert_p0/txn_insert.groovy | 2 +- 8 files changed, 44 insertions(+), 27 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 424ede07be5cf1..9bac6d4ed29780 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -61,21 +61,19 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { "CreateGroupCommitPlanDependency", true); _put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "GroupCommitPutBlockDependency", true); - WARN_IF_ERROR(_initialize_load_queue(), ""); + [[maybe_unused]] auto st = _initialize_load_queue(); return Status::OK(); } Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { auto& p = _parent->cast(); if (_state->exec_env()->wal_mgr()->is_running()) { - std::string label; - int64_t txn_id; RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue, _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency, - _put_block_dependency, label, txn_id)); - _state->set_import_label(label); - _state->set_wal_id(txn_id); // wal_id is txn_id + _put_block_dependency)); + _state->set_import_label(_load_block_queue->label); + _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id return Status::OK(); } else { return Status::InternalError("be is stopping"); @@ -339,13 +337,25 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc local_state._vpartition->find_partition(block.get(), index, local_state._partitions[index]); } + bool stop_processing = false; for (int row_index = 0; row_index < rows; row_index++) { if (local_state._partitions[row_index] == nullptr) [[unlikely]] { local_state._filter_bitmap.Set(row_index, true); LOG(WARNING) << "no partition for this tuple. tuple=" << block->dump_data(row_index, 1); + RETURN_IF_ERROR(state->append_error_msg_to_file( + []() -> std::string { return ""; }, + [&]() -> std::string { + fmt::memory_buffer buf; + fmt::format_to(buf, "no partition for this tuple. tuple=\n{}", + block->dump_data(row_index, 1)); + return fmt::to_string(buf); + }, + &stop_processing)); + local_state._has_filtered_rows = true; + state->update_num_rows_load_filtered(1); + state->update_num_rows_load_total(-1); } - local_state._has_filtered_rows = true; } } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 54f25a708a42c6..401239b0a48a94 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -137,7 +137,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* auto last_print_duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - _last_print_time) .count(); - if (last_print_duration >= 5000) { + if (last_print_duration >= 10000) { _last_print_time = std::chrono::steady_clock::now(); LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id << ", label=" << label << ", instance_id=" << load_instance_id @@ -257,15 +257,13 @@ Status GroupCommitTable::get_first_block_load_queue( std::shared_ptr& load_block_queue, int be_exe_version, std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, - std::shared_ptr put_block_dep, std::string& label, int64_t& txn_id) { + std::shared_ptr put_block_dep) { DCHECK(table_id == _table_id); std::unique_lock l(_lock); auto try_to_get_matched_queue = [&]() -> Status { for (const auto& [_, inner_block_queue] : _load_block_queues) { if (inner_block_queue->contain_load_id(load_id)) { load_block_queue = inner_block_queue; - label = inner_block_queue->label; - txn_id = inner_block_queue->txn_id; return Status::OK(); } } @@ -274,8 +272,6 @@ Status GroupCommitTable::get_first_block_load_queue( if (base_schema_version == inner_block_queue->schema_version) { if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { load_block_queue = inner_block_queue; - label = inner_block_queue->label; - txn_id = inner_block_queue->txn_id; return Status::OK(); } } else { @@ -318,6 +314,10 @@ Status GroupCommitTable::get_first_block_load_queue( void GroupCommitTable::remove_load_id(const UniqueId& load_id) { std::unique_lock l(_lock); + if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) { + _create_plan_deps.erase(load_id); + return; + } for (const auto& [_, inner_block_queue] : _load_block_queues) { if (inner_block_queue->remove_load_id(load_id).ok()) { return; @@ -390,13 +390,13 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, instance_id, label, txn_id, schema_version, _all_block_queues_bytes, result.wait_internal_group_commit_finish, result.group_commit_interval_ms, result.group_commit_data_bytes); - std::unique_lock l(_lock); RETURN_IF_ERROR(load_block_queue->create_wal( _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, be_exe_version)); - _load_block_queues.emplace(instance_id, load_block_queue); + std::unique_lock l(_lock); + _load_block_queues.emplace(instance_id, load_block_queue); std::vector success_load_ids; for (const auto& [id, load_info] : _create_plan_deps) { auto create_dep = std::get<0>(load_info); @@ -408,8 +408,8 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, } } } - for (const auto& id2 : success_load_ids) { - _create_plan_deps.erase(id2); + for (const auto& id : success_load_ids) { + _create_plan_deps.erase(id); } } } @@ -596,7 +596,7 @@ Status GroupCommitMgr::get_first_block_load_queue( std::shared_ptr& load_block_queue, int be_exe_version, std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, - std::shared_ptr put_block_dep, std::string& label, int64_t& txn_id) { + std::shared_ptr put_block_dep) { std::shared_ptr group_commit_table; { std::lock_guard wlock(_lock); @@ -609,7 +609,7 @@ Status GroupCommitMgr::get_first_block_load_queue( } RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker, - create_plan_dep, put_block_dep, label, txn_id)); + create_plan_dep, put_block_dep)); return Status::OK(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 16c7e0c24d37aa..c6cb34a022a516 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -160,8 +160,7 @@ class GroupCommitTable { int be_exe_version, std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, - std::shared_ptr put_block_dep, - std::string& label, int64_t& txn_id); + std::shared_ptr put_block_dep); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue, std::shared_ptr get_block_dep); @@ -211,8 +210,7 @@ class GroupCommitMgr { int be_exe_version, std::shared_ptr mem_tracker, std::shared_ptr create_plan_dep, - std::shared_ptr put_block_dep, - std::string& label, int64_t& txn_id); + std::shared_ptr put_block_dep); void remove_load_id(int64_t table_id, const UniqueId& load_id); std::promise debug_promise; std::future debug_future = debug_promise.get_future(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5fbe354ffa0af9..a6f38ab632a158 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -193,6 +193,7 @@ import org.apache.doris.planner.SortNode; import org.apache.doris.planner.TableFunctionNode; import org.apache.doris.planner.UnionNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TFetchOption; @@ -431,8 +432,9 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink) this.logicalQuery); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index 5091bae17d10ae..984e8b0c8caa65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -17,12 +17,14 @@ package org.apache.doris.nereids.trees.plans.commands.insert; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; @@ -60,6 +62,7 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Unbo && ((OlapTable) table).getTableProperty().getUseSchemaLightChange() && !((OlapTable) table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME) && tableSink.getPartitions().isEmpty() + && (!(table instanceof MTMV) || MTMVUtil.allowModifyMTMVData(ctx)) && (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion)); } diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 9105a8dc8db2ff..e1cb943a9ab6f8 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -219,7 +219,9 @@ suite("insert_group_commit_into") { // 7. insert into and add rollup group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 group_commit_insert """ insert into ${table}(id) values(4); """, 1 + sql "set enable_insert_strict=false" group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2 + sql "set enable_insert_strict=true" sql """ alter table ${table} ADD ROLLUP r1(name, score); """ group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1 group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1 diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/txn_insert.groovy index 088e1134900446..4c42c3b12c495f 100644 --- a/regression-test/suites/insert_p0/txn_insert.groovy +++ b/regression-test/suites/insert_p0/txn_insert.groovy @@ -495,7 +495,7 @@ suite("txn_insert") { assertFalse(true, "should not reach here") } catch (Exception e) { logger.info("exception: " + e) - assertTrue(e.getMessage().contains("The transaction is already timeout")) + assertTrue(e.getMessage().contains("The transaction is already timeout") || e.getMessage().contains("Execute timeout")) } finally { try { sql "rollback" From 31ada78775263e8699df7d0af861506cb50c5fc8 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 10 Jul 2024 18:34:09 +0800 Subject: [PATCH 2/2] fix --- .../load_p0/stream_load/test_group_commit_stream_load.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index e12a1f2f01b873..0f823f1d4ef56f 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 6, 3, 2, 1) + checkStreamLoadResult(exception, result, 6, 2, 3, 1) } }