Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,19 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
"CreateGroupCommitPlanDependency", true);
_put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitPutBlockDependency", true);
WARN_IF_ERROR(_initialize_load_queue(), "");
[[maybe_unused]] auto st = _initialize_load_queue();
return Status::OK();
}

Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
std::string label;
int64_t txn_id;
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
_put_block_dependency, label, txn_id));
_state->set_import_label(label);
_state->set_wal_id(txn_id); // wal_id is txn_id
_put_block_dependency));
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
return Status::OK();
} else {
return Status::InternalError("be is stopping");
Expand Down Expand Up @@ -339,13 +337,25 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
local_state._vpartition->find_partition(block.get(), index,
local_state._partitions[index]);
}
bool stop_processing = false;
for (int row_index = 0; row_index < rows; row_index++) {
if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
local_state._filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
<< block->dump_data(row_index, 1);
RETURN_IF_ERROR(state->append_error_msg_to_file(
[]() -> std::string { return ""; },
[&]() -> std::string {
fmt::memory_buffer buf;
fmt::format_to(buf, "no partition for this tuple. tuple=\n{}",
block->dump_data(row_index, 1));
return fmt::to_string(buf);
},
&stop_processing));
local_state._has_filtered_rows = true;
state->update_num_rows_load_filtered(1);
state->update_num_rows_load_total(-1);
}
local_state._has_filtered_rows = true;
}
}

Expand Down
24 changes: 12 additions & 12 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
if (last_print_duration >= 5000) {
if (last_print_duration >= 10000) {
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
Expand Down Expand Up @@ -257,15 +257,13 @@ Status GroupCommitTable::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
std::shared_ptr<pipeline::Dependency> put_block_dep) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->contain_load_id(load_id)) {
load_block_queue = inner_block_queue;
label = inner_block_queue->label;
txn_id = inner_block_queue->txn_id;
return Status::OK();
}
}
Expand All @@ -274,8 +272,6 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;
label = inner_block_queue->label;
txn_id = inner_block_queue->txn_id;
return Status::OK();
}
} else {
Expand Down Expand Up @@ -318,6 +314,10 @@ Status GroupCommitTable::get_first_block_load_queue(

void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(_lock);
if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
_create_plan_deps.erase(load_id);
return;
}
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->remove_load_id(load_id).ok()) {
return;
Expand Down Expand Up @@ -390,13 +390,13 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
result.group_commit_data_bytes);
std::unique_lock l(_lock);
RETURN_IF_ERROR(load_block_queue->create_wal(
_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
_load_block_queues.emplace(instance_id, load_block_queue);

std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
std::vector<UniqueId> success_load_ids;
for (const auto& [id, load_info] : _create_plan_deps) {
auto create_dep = std::get<0>(load_info);
Expand All @@ -408,8 +408,8 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
}
}
}
for (const auto& id2 : success_load_ids) {
_create_plan_deps.erase(id2);
for (const auto& id : success_load_ids) {
_create_plan_deps.erase(id);
}
}
}
Expand Down Expand Up @@ -596,7 +596,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -609,7 +609,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
create_plan_dep, put_block_dep, label, txn_id));
create_plan_dep, put_block_dep));
return Status::OK();
}

Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ class GroupCommitTable {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
std::shared_ptr<pipeline::Dependency> put_block_dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep);
Expand Down Expand Up @@ -211,8 +210,7 @@ class GroupCommitMgr {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
std::shared_ptr<pipeline::Dependency> put_block_dep);
void remove_load_id(int64_t table_id, const UniqueId& load_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
Expand Down Expand Up @@ -431,8 +432,9 @@ public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends P
// This statement is only used in the group_commit mode
if (context.getConnectContext().isGroupCommit()) {
sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), olapTuple,
olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
context.getSessionVariable().getGroupCommit(), 0);
olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(),
context.getSessionVariable().getGroupCommit(),
ConnectContext.get().getSessionVariable().getEnableInsertStrict() ? 0 : 1);
} else {
sink = new OlapTableSink(
olapTableSink.getTargetTable(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}
if (this.logicalQuery instanceof UnboundTableSink) {
boolean isOverwrite = insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
&& ((OlapInsertCommandContext) insertCtx.get()).isOverwrite();
if (this.logicalQuery instanceof UnboundTableSink && !isOverwrite) {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf,
(UnboundTableSink<?>) this.logicalQuery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
Expand Down Expand Up @@ -60,6 +62,7 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Unbo
&& ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
&& !((OlapTable) table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
&& tableSink.getPartitions().isEmpty()
&& (!(table instanceof MTMV) || MTMVUtil.allowModifyMTMVData(ctx))
&& (tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ suite("insert_group_commit_into") {
// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4); """, 1
sql "set enable_insert_strict=false"
group_commit_insert """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """, 2
sql "set enable_insert_strict=true"
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/insert_p0/txn_insert.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ suite("txn_insert") {
assertFalse(true, "should not reach here")
} catch (Exception e) {
logger.info("exception: " + e)
assertTrue(e.getMessage().contains("The transaction is already timeout"))
assertTrue(e.getMessage().contains("The transaction is already timeout") || e.getMessage().contains("Execute timeout"))
} finally {
try {
sql "rollback"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 6, 3, 2, 1)
checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}

Expand Down