Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
std::string label;
int64_t txn_id;
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
_put_block_dependency));
_put_block_dependency, label, txn_id));
_state->set_import_label(label);
_state->set_wal_id(txn_id); // wal_id is txn_id
return Status::OK();
} else {
return Status::InternalError("be is stopping");
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
}
if (!req.runtime_state->export_output_files().empty()) {
Expand Down
8 changes: 5 additions & 3 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ Status GroupCommitTable::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
Expand All @@ -266,6 +266,8 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;
label = inner_block_queue->label;
txn_id = inner_block_queue->txn_id;
return Status::OK();
}
} else {
Expand Down Expand Up @@ -561,7 +563,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -574,7 +576,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
create_plan_dep, put_block_dep));
create_plan_dep, put_block_dep, label, txn_id));
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class GroupCommitTable {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep);
Expand Down Expand Up @@ -205,7 +206,8 @@ class GroupCommitMgr {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}

if (this.logicalQuery instanceof UnboundTableSink) {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf,
(UnboundTableSink<?>) this.logicalQuery);
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
Expand All @@ -167,17 +170,16 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor

if (physicalSink instanceof PhysicalOlapTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
if (GroupCommitInsertExecutor.canGroupCommit(ctx, sink, physicalSink, planner)) {
insertExecutor = new GroupCommitInsertExecutor(ctx, targetTableIf, label, planner, insertCtx,
emptyInsert);
targetTableIf.readUnlock();
return insertExecutor;
}
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = ctx.isTxnModel()
? new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
: new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
if (ctx.isTxnModel()) {
insertExecutor = new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
} else if (ctx.isGroupCommit()) {
insertExecutor = new OlapGroupCommitInsertExecutor(ctx, olapTable, label, planner, insertCtx,
emptyInsert);
} else {
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
}

boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
Expand Down
Loading