Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "cloud/cloud_stream_load_executor.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand All @@ -29,23 +31,62 @@ CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)

CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default;

// Pre-commits the stream load transaction through the cloud meta service.
// On success the context is marked as no longer needing a rollback; on
// failure the error is logged and propagated unchanged to the caller.
Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
    auto& meta_mgr = _exec_env->storage_engine().to_cloud().meta_mgr();
    Status status = meta_mgr.precommit_txn(*ctx);
    if (status.ok()) {
        ctx->need_rollback = false;
        return status;
    }
    LOG(WARNING) << "Failed to precommit txn: " << status << ", " << ctx->brief();
    return status;
}

// Executes the second phase of a two-phase-commit (2PC) stream load against
// the cloud meta service: commits when the requested operation is "commit",
// otherwise aborts the transaction.
Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
    VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation;
    // Idiomatic equality test instead of compare(...) == 0.
    if (ctx->txn_operation == "commit") {
        return _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
    }
    // Any operation other than "commit" is treated as a 2PC abort.
    return _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
}

// Commits the stream load transaction.
//  - Routine load always commits through the base (FE-forwarding) path.
//  - Merge-on-write (MoW) tables are committed through the FE with a bounded
//    retry loop, because DELETE_BITMAP_LOCK_ERROR is a transient conflict.
//  - All other tables commit directly against the cloud meta service.
// NOTE(review): this span previously interleaved the old do/while retry loop
// with the new while loop (duplicated LOG_WARNING block, orphaned `} while`),
// which was not valid C++; this is the reconstructed intended version.
Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
    if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
        return StreamLoadExecutor::commit_txn(ctx);
    }

    // Forward to FE to execute commit transaction for a MoW table.
    if (ctx->is_mow_table()) {
        Status st;
        int retry_times = 0;
        while (retry_times < config::mow_stream_load_commit_retry_times) {
            st = StreamLoadExecutor::commit_txn(ctx);
            // Only DELETE_BITMAP_LOCK_ERROR is retried; any other failure
            // (or success) ends the loop immediately.
            if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
                break;
            }
            LOG_WARNING("Failed to commit txn")
                    .tag("txn_id", ctx->txn_id)
                    .tag("retry_times", retry_times)
                    .error(st);
            retry_times++;
        }
        return st;
    }

    // Non-MoW cloud tables commit directly through the meta service.
    auto st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false);
    if (!st.ok()) {
        LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief();
        return st;
    }
    ctx->need_rollback = false;
    return st;
}

} // namespace doris
// Best-effort rollback: aborts the transaction in the cloud meta service.
// Failures are only logged, since this interface returns no status to callers.
void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
"Failed to rollback txn");
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_stream_load_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@
#include "runtime/stream_load/stream_load_executor.h"

namespace doris {

// Stream load executor for cloud mode. Overrides the transaction lifecycle
// hooks of StreamLoadExecutor so that pre-commit/2PC/commit/rollback are
// driven through the cloud meta service instead of the default FE path
// (commit still goes through the FE for routine load and MoW tables).
class CloudStreamLoadExecutor final : public StreamLoadExecutor {
public:
    // `explicit` prevents accidental implicit conversion from ExecEnv*.
    explicit CloudStreamLoadExecutor(ExecEnv* exec_env);

    ~CloudStreamLoadExecutor() override;

    Status pre_commit_txn(StreamLoadContext* ctx) override;

    Status operate_txn_2pc(StreamLoadContext* ctx) override;

    Status commit_txn(StreamLoadContext* ctx) override;

    void rollback_txn(StreamLoadContext* ctx) override;
};
} // namespace doris

} // namespace doris
7 changes: 7 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,11 @@ std::string StreamLoadContext::brief(bool detail) const {
return ss.str();
}

// Returns true when the plan returned by the FE marks the target table as
// merge-on-write, checking both the non-pipeline and the pipeline plan params.
bool StreamLoadContext::is_mow_table() const {
    if (put_result.__isset.params && put_result.params.__isset.is_mow_table &&
        put_result.params.is_mow_table) {
        return true;
    }
    if (put_result.__isset.pipeline_params && put_result.pipeline_params.__isset.is_mow_table) {
        return put_result.pipeline_params.is_mow_table;
    }
    return false;
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class StreamLoadContext {
// also print the load source info if detail is set to true
std::string brief(bool detail = false) const;

bool is_mow_table() const;

public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/stream_load/stream_load_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ class StreamLoadExecutor {

Status begin_txn(StreamLoadContext* ctx);

Status pre_commit_txn(StreamLoadContext* ctx);
virtual Status pre_commit_txn(StreamLoadContext* ctx);

Status operate_txn_2pc(StreamLoadContext* ctx);
virtual Status operate_txn_2pc(StreamLoadContext* ctx);

virtual Status commit_txn(StreamLoadContext* ctx);

void get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& request);

void rollback_txn(StreamLoadContext* ctx);
virtual void rollback_txn(StreamLoadContext* ctx);

Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);

private:
protected:
// collect the load statistics from context and set them to stat
// return true if stat is set, otherwise, return false
bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,16 @@ public void commitTransaction(long dbId, List<Table> tableList, long transaction
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
}

/**
* Post process of commitTxn
* 1. update some stats
* 2. produce event for further processes like async MV
* @param commitTxnResponse commit txn call response from meta-service
*/
public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
// ========================================
// update some table stats
// ========================================
long dbId = commitTxnResponse.getTxnInfo().getDbId();
long txnId = commitTxnResponse.getTxnInfo().getTxnId();
// 1. update rowCount for AnalysisManager
Expand Down Expand Up @@ -389,6 +398,25 @@ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
if (sb.length() > 0) {
LOG.info("notify partition first load. {}", sb);
}

// ========================================
// produce event
// ========================================
List<Long> tableList = commitTxnResponse.getTxnInfo().getTableIdsList()
.stream().distinct().collect(Collectors.toList());
// Here we only wait for the EventProcessor to finish handling the event;
// whether that processing succeeds or fails does not affect the
// transaction logic itself.
try {
for (Long tableId : tableList) {
Env.getCurrentEnv().getEventProcessor().processEvent(
new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, tableId));
}
} catch (Throwable t) {
// According to normal logic, no exceptions will be thrown,
// but in order to avoid bugs affecting the original logic, all exceptions are caught
LOG.warn("produceEvent failed, db {}, tables {} ", dbId, tableList, t);
}
}

private Set<Long> getBaseTabletsFromTables(List<Table> tableList, List<TabletCommitInfo> tabletCommitInfos)
Expand Down Expand Up @@ -528,23 +556,6 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime());
}
afterCommitTxnResp(commitTxnResponse);
// Here, we only wait for the EventProcessor to finish processing the event,
// but regardless of the success or failure of the result,
// it does not affect the logic of transaction
try {
produceEvent(dbId, tableList);
} catch (Throwable t) {
// According to normal logic, no exceptions will be thrown,
// but in order to avoid bugs affecting the original logic, all exceptions are caught
LOG.warn("produceEvent failed: ", t);
}
}

// Publishes a DataChangeEvent to the global event processor for each table
// in the given database, so downstream listeners (e.g. async MV processing,
// per the surrounding commit-path comments) can react to the data change.
private void produceEvent(long dbId, List<Table> tableList) {
for (Table table : tableList) {
Env.getCurrentEnv().getEventProcessor().processEvent(
new DataChangeEvent(InternalCatalog.INTERNAL_CATALOG_ID, dbId, table.getId()));
}
}

private List<OlapTable> getMowTableList(List<Table> tableList) {
Expand Down