diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0b5de1f47fbad5..4bbfb35eb8e027 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -97,6 +97,7 @@ add_library(Runtime STATIC
     stream_load/stream_load_context.cpp
     stream_load/stream_load_executor.cpp
     routine_load/data_consumer.cpp
+    routine_load/data_consumer_pool.cpp
     routine_load/routine_load_task_executor.cpp
 )
diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index a64fb20ce983e9..dcb30023f83d65 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -23,13 +23,18 @@
 #include

 #include "common/status.h"
+#include "runtime/stream_load/stream_load_pipe.h"
 #include "runtime/routine_load/kafka_consumer_pipe.h"
+#include "service/backend_options.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
+#include "util/uid_util.h"

 namespace doris {

-Status KafkaDataConsumer::init() {
+// initializing the kafka consumer only sets common configs
+// such as brokers and group id
+Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (_init) {
         // this consumer has already been initialized.
@@ -42,29 +47,32 @@ Status KafkaDataConsumer::init() {
     auto conf_deleter = [conf] () { delete conf; };
     DeferOp delete_conf(std::bind(conf_deleter));

+    std::stringstream ss;
+    ss << BackendOptions::get_localhost() << "_";
+    std::string group_id = ss.str() + UniqueId().to_string();
+    LOG(INFO) << "init kafka consumer with group id: " << group_id;
+
     std::string errstr;
-    auto set_conf = [conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
+    auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
         if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
             std::stringstream ss;
             ss << "failed to set '" << conf_key << "'";
             LOG(WARNING) << ss.str();
             return Status(ss.str());
         }
+        VLOG(3) << "set " << conf_key << ": " << conf_val;
         return Status::OK;
     };

-    RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
-
-    RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
-    RETURN_IF_ERROR(set_conf("group.id", _ctx->kafka_info->group_id));
-    RETURN_IF_ERROR(set_conf("client.id", _ctx->kafka_info->client_id));
+    RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
+    RETURN_IF_ERROR(set_conf("group.id", group_id));
     RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
     RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
     RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
+    RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));

-    KafkaEventCb event_cb;
-    if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
+    if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
         std::stringstream ss;
         ss << "failed to set 'event_cb'";
         LOG(WARNING) << ss.str();
@@ -78,14 +86,27 @@ Status KafkaDataConsumer::init() {
         return Status("failed to create kafka consumer");
     }

+    VLOG(3) << "finished to init kafka consumer. " << ctx->brief();
+
+    _init = true;
+    return Status::OK;
+}
+
+Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) {
+    DCHECK(_k_consumer);
     // create TopicPartitions
+    std::stringstream ss;
     std::vector<RdKafka::TopicPartition*> topic_partitions;
-    for (auto& entry : _ctx->kafka_info->begin_offset) {
+    for (auto& entry : ctx->kafka_info->begin_offset) {
         RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
-                _ctx->kafka_info->topic, entry.first, entry.second);
+                ctx->kafka_info->topic, entry.first, entry.second);
         topic_partitions.push_back(tp1);
+        ss << "partition[" << entry.first << "-" << entry.second << "] ";
     }
+    VLOG(1) << "assign topic partitions: " << ctx->kafka_info->topic
+            << ", " << ss.str();
+
     // delete TopicPartition finally
     auto tp_deleter = [&topic_partitions] () {
             std::for_each(topic_partitions.begin(), topic_partitions.end(),
@@ -96,19 +117,15 @@ Status KafkaDataConsumer::init() {
     // assign partition
     RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
     if (err) {
-        LOG(WARNING) << "failed to assign topic partitions: " << _ctx->brief(true)
+        LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
                 << ", err: " << RdKafka::err2str(err);
-        return Status("failed to assgin topic partitions");
+        return Status("failed to assign topic partitions");
     }

-    VLOG(3) << "finished to init kafka consumer. "
-            << _ctx->brief(true);
-
-    _init = true;
     return Status::OK;
 }

-Status KafkaDataConsumer::start() {
+Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
     {
         std::unique_lock<std::mutex> l(_lock);
         if (!_init) {
@@ -116,15 +133,17 @@ Status KafkaDataConsumer::start() {
         }
     }

-    int64_t left_time = _ctx->kafka_info->max_interval_s;
-    int64_t left_rows = _ctx->kafka_info->max_batch_rows;
-    int64_t left_bytes = _ctx->kafka_info->max_batch_size;
+    int64_t left_time = ctx->kafka_info->max_interval_s;
+    int64_t left_rows = ctx->kafka_info->max_batch_rows;
+    int64_t left_bytes = ctx->kafka_info->max_batch_size;
+
+    std::shared_ptr<KafkaConsumerPipe> kafka_pipe = std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);

     LOG(INFO) << "start consumer"
-            << ". interval(s): " << left_time
+            << ". max time(s): " << left_time
             << ", batch rows: " << left_rows
             << ", batch size: " << left_bytes
-            << ". " << _ctx->brief();
+            << ". " << ctx->brief();

     MonotonicStopWatch watch;
     watch.start();
@@ -132,23 +151,33 @@
     while (true) {
         std::unique_lock<std::mutex> l(_lock);
         if (_cancelled) {
-            _kafka_consumer_pipe->cancel();
+            kafka_pipe->cancel();
             return Status::CANCELLED;
         }

         if (_finished) {
-            _kafka_consumer_pipe->finish();
+            kafka_pipe->finish();
             return Status::OK;
         }

         if (left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
-            VLOG(3) << "kafka consume batch finished"
+            VLOG(3) << "kafka consume batch done"
                     << ". left time=" << left_time
                     << ", left rows=" << left_rows
                     << ", left bytes=" << left_bytes;
-            _kafka_consumer_pipe->finish();
-            _finished = true;
-            return Status::OK;
+
+            if (left_bytes == ctx->kafka_info->max_batch_size) {
+                // nothing to be consumed, cancel it
+                kafka_pipe->cancel();
+                _cancelled = true;
+                return Status::CANCELLED;
+            } else {
+                DCHECK(left_bytes < ctx->kafka_info->max_batch_size);
+                DCHECK(left_rows < ctx->kafka_info->max_batch_rows);
+                kafka_pipe->finish();
+                _finished = true;
+                return Status::OK;
+            }
         }

         // consume 1 message at a time
@@ -160,15 +189,15 @@
                         << ", offset: " << msg->offset()
                         << ", len: " << msg->len();

-                st = _kafka_consumer_pipe->append_with_line_delimiter(
+                st = kafka_pipe->append_with_line_delimiter(
                         static_cast<const char*>(msg->payload()),
                         static_cast<size_t>(msg->len()));
                 if (st.ok()) {
                     left_rows--;
                     left_bytes -= msg->len();
-                    _ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
-                    VLOG(3) << "consume partition[ " << msg->partition()
-                            << " - " << msg->offset();
+                    ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
+                    VLOG(3) << "consume partition[" << msg->partition()
+                            << " - " << msg->offset() << "]";
                 }

                 break;
@@ -185,17 +214,17 @@
         delete msg;

         if (!st.ok()) {
-            _kafka_consumer_pipe->cancel();
+            kafka_pipe->cancel();
             return st;
         }

-        left_time = _ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
+        left_time = ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
     }

     return Status::OK;
 }

-Status KafkaDataConsumer::cancel() {
+Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (!_init) {
         return Status("consumer is not initialized");
@@ -209,4 +238,23 @@ Status KafkaDataConsumer::cancel() {
     return Status::OK;
 }

+Status KafkaDataConsumer::reset() {
+    std::unique_lock<std::mutex> l(_lock);
+    _finished = false;
+    _cancelled = false;
+    return Status::OK;
+}
+
+// if the kafka brokers and topic are the same,
+// we consider this consumer matched, and it can be reused.
+bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
+    if (ctx->load_src_type != TLoadSourceType::KAFKA) {
+        return false;
+    }
+    if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
+        return false;
+    }
+    return true;
+}
+
 } // end namespace doris
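Note on the init() path above: the group.id is now generated on the BE (host name plus a fresh UniqueId) instead of coming from the task, so a pooled consumer never collides with another task's consumer group. The setup itself is the standard librdkafka C++ sequence: create a global Conf, set the common options, register the event callback, create the KafkaConsumer. A minimal standalone sketch of that flow, with the Status plumbing and DeferOp cleanup elided (the helper name and signature are illustrative only, not part of the patch):

    #include <string>
    #include "librdkafka/rdkafkacpp.h"

    // Sketch: mirrors the conf keys set in KafkaDataConsumer::init().
    RdKafka::KafkaConsumer* create_consumer(const std::string& brokers,
                                            const std::string& group_id,
                                            RdKafka::EventCb* event_cb,
                                            std::string* errstr) {
        RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        if (conf->set("metadata.broker.list", brokers, *errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("group.id", group_id, *errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("enable.partition.eof", "false", *errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("enable.auto.offset.store", "false", *errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("auto.offset.reset", "error", *errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("event_cb", event_cb, *errstr) != RdKafka::Conf::CONF_OK) {
            delete conf;
            return nullptr;
        }
        // librdkafka copies the configuration, so the Conf can be freed afterwards.
        RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, *errstr);
        delete conf;
        return consumer;
    }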
" << ctx->brief(); + + _init = true; + return Status::OK; +} + +Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) { + DCHECK(_k_consumer); // create TopicPartitions + std::stringstream ss; std::vector topic_partitions; - for (auto& entry : _ctx->kafka_info->begin_offset) { + for (auto& entry : ctx->kafka_info->begin_offset) { RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create( - _ctx->kafka_info->topic, entry.first, entry.second); + ctx->kafka_info->topic, entry.first, entry.second); topic_partitions.push_back(tp1); + ss << "partition[" << entry.first << "-" << entry.second << "] "; } + VLOG(1) << "assign topic partitions: " << ctx->kafka_info->topic + << ", " << ss.str(); + // delete TopicPartition finally auto tp_deleter = [&topic_partitions] () { std::for_each(topic_partitions.begin(), topic_partitions.end(), @@ -96,19 +117,15 @@ Status KafkaDataConsumer::init() { // assign partition RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); if (err) { - LOG(WARNING) << "failed to assign topic partitions: " << _ctx->brief(true) + LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true) << ", err: " << RdKafka::err2str(err); - return Status("failed to assgin topic partitions"); + return Status("failed to assign topic partitions"); } - VLOG(3) << "finished to init kafka consumer. " - << _ctx->brief(true); - - _init = true; return Status::OK; } -Status KafkaDataConsumer::start() { +Status KafkaDataConsumer::start(StreamLoadContext* ctx) { { std::unique_lock l(_lock); if (!_init) { @@ -116,15 +133,17 @@ Status KafkaDataConsumer::start() { } } - int64_t left_time = _ctx->kafka_info->max_interval_s; - int64_t left_rows = _ctx->kafka_info->max_batch_rows; - int64_t left_bytes = _ctx->kafka_info->max_batch_size; + int64_t left_time = ctx->kafka_info->max_interval_s; + int64_t left_rows = ctx->kafka_info->max_batch_rows; + int64_t left_bytes = ctx->kafka_info->max_batch_size; + + std::shared_ptr kakfa_pipe = std::static_pointer_cast(ctx->body_sink); LOG(INFO) << "start consumer" - << ". interval(s): " << left_time + << ". max time(s): " << left_time << ", bath rows: " << left_rows << ", batch size: " << left_bytes - << ". " << _ctx->brief(); + << ". " << ctx->brief(); MonotonicStopWatch watch; watch.start(); @@ -132,23 +151,33 @@ Status KafkaDataConsumer::start() { while (true) { std::unique_lock l(_lock); if (_cancelled) { - _kafka_consumer_pipe->cancel(); + kakfa_pipe ->cancel(); return Status::CANCELLED; } if (_finished) { - _kafka_consumer_pipe->finish(); + kakfa_pipe ->finish(); return Status::OK; } if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) { - VLOG(3) << "kafka consume batch finished" + VLOG(3) << "kafka consume batch done" << ". 
left time=" << left_time << ", left rows=" << left_rows << ", left bytes=" << left_bytes; - _kafka_consumer_pipe->finish(); - _finished = true; - return Status::OK; + + if (left_bytes == ctx->kafka_info->max_batch_size) { + // nothing to be consumed, cancel it + kakfa_pipe->cancel(); + _cancelled = true; + return Status::CANCELLED; + } else { + DCHECK(left_bytes < ctx->kafka_info->max_batch_size); + DCHECK(left_rows < ctx->kafka_info->max_batch_rows); + kakfa_pipe->finish(); + _finished = true; + return Status::OK; + } } // consume 1 message at a time @@ -160,15 +189,15 @@ Status KafkaDataConsumer::start() { << ", offset: " << msg->offset() << ", len: " << msg->len(); - st = _kafka_consumer_pipe->append_with_line_delimiter( + st = kakfa_pipe ->append_with_line_delimiter( static_cast(msg->payload()), static_cast(msg->len())); if (st.ok()) { left_rows--; left_bytes -= msg->len(); - _ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset(); - VLOG(3) << "consume partition[ " << msg->partition() - << " - " << msg->offset(); + ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset(); + VLOG(3) << "consume partition[" << msg->partition() + << " - " << msg->offset() << "]"; } break; @@ -185,17 +214,17 @@ Status KafkaDataConsumer::start() { delete msg; if (!st.ok()) { - _kafka_consumer_pipe->cancel(); + kakfa_pipe ->cancel(); return st; } - left_time = _ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; + left_time = ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; } return Status::OK; } -Status KafkaDataConsumer::cancel() { +Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { std::unique_lock l(_lock); if (!_init) { return Status("consumer is not initialized"); @@ -209,4 +238,23 @@ Status KafkaDataConsumer::cancel() { return Status::OK; } +Status KafkaDataConsumer::reset() { + std::unique_lock l(_lock); + _finished = false; + _cancelled = false; + return Status::OK; +} + +// if the kafka brokers and topic are same, +// we considered this consumer as matched, thus can be reused. +bool KafkaDataConsumer::match(StreamLoadContext* ctx) { + if (ctx->load_src_type != TLoadSourceType::KAFKA) { + return false; + } + if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) { + return false; + } + return true; +} + } // end namespace doris diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index b78bfba78dde94..d71aa0d24038d7 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -22,43 +22,43 @@ #include "librdkafka/rdkafkacpp.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/uid_util.h" namespace doris { class KafkaConsumerPipe; class Status; +class StreamLoadPipe; class DataConsumer { public: DataConsumer(StreamLoadContext* ctx): - _ctx(ctx), _init(false), _finished(false), _cancelled(false) { - - _ctx->ref(); } virtual ~DataConsumer() { - if (_ctx->unref()) { - delete _ctx; - } } // init the consumer with the given parameters - virtual Status init() = 0; - + virtual Status init(StreamLoadContext* ctx) = 0; // start consuming - virtual Status start() = 0; - + virtual Status start(StreamLoadContext* ctx) = 0; // cancel the consuming process. 
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 259098fd0c8655..3f78e2a26996a6 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -21,6 +21,7 @@
 #include
 #include

+#include "runtime/routine_load/data_consumer_pool.h"
 #include "util/thread_pool.hpp"
 #include "util/uid_util.h"

@@ -41,14 +42,13 @@ class RoutineLoadTaskExecutor {
     typedef std::function<void (StreamLoadContext*)> ExecFinishCallback;

     RoutineLoadTaskExecutor(ExecEnv* exec_env):
-        _exec_env(exec_env) {
-        _thread_pool = new ThreadPool(10, 1000);
+        _exec_env(exec_env),
+        _thread_pool(10, 1000),
+        _data_consumer_pool(10) {
     }

     ~RoutineLoadTaskExecutor() {
-        if (_thread_pool) {
-            delete _thread_pool;
-        }
     }

     // submit a routine load task
@@ -56,18 +56,20 @@ class RoutineLoadTaskExecutor {

 private:
     // execute the task
-    void exec_task(StreamLoadContext* ctx, ExecFinishCallback cb);
+    void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb);

     void err_handler(
             StreamLoadContext* ctx,
             const Status& st,
             const std::string& err_msg);

+    // for test only
     Status _execute_plan_for_test(StreamLoadContext* ctx);

 private:
     ExecEnv* _exec_env;
-    ThreadPool* _thread_pool;
+    ThreadPool _thread_pool;
+    DataConsumerPool _data_consumer_pool;

     std::mutex _lock;
     // task id -> load context
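data_consumer_pool.{h,cpp} are added to the build but not shown in this diff. From the call sites (a capacity of 10 in the constructor here, get_consumer() in exec_task(), return_consumer() after commit), its interface is presumably along these lines; a sketch of the assumed shape, not the actual header:

    #include <list>
    #include <memory>
    #include <mutex>

    class DataConsumerPool {
    public:
        DataConsumerPool(int64_t max_pool_size) : _max_pool_size(max_pool_size) {}

        // Return an idle consumer whose match(ctx) is true, or create a new one.
        Status get_consumer(StreamLoadContext* ctx, std::shared_ptr<DataConsumer>* ret);

        // Put the consumer back for reuse; drop it if the pool is already full.
        void return_consumer(std::shared_ptr<DataConsumer> consumer);

    private:
        std::mutex _lock;
        std::list<std::shared_ptr<DataConsumer>> _pool;
        int64_t _max_pool_size;
    };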
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index 45c06d122cdace..c4b2a706604d6f 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -74,15 +74,13 @@ std::string StreamLoadContext::to_json() const {
 std::string StreamLoadContext::brief(bool detail) const {
     std::stringstream ss;
-    ss << " id=" << id << ", txn id=" << txn_id << ", label=" << label;
+    ss << "id=" << id << ", txn id=" << txn_id << ", label=" << label;
     if (detail) {
         switch(load_src_type) {
         case TLoadSourceType::KAFKA:
             if (kafka_info != nullptr) {
                 ss << ", kafka"
                     << ", brokers: " << kafka_info->brokers
-                    << ", group_id: " << kafka_info->group_id
-                    << ", client_id: " << kafka_info->client_id
                     << ", topic: " << kafka_info->topic
                     << ", partition: ";
                 for (auto& entry : kafka_info->begin_offset) {
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index dce825702220d9..11ff1f3355f362 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -31,6 +31,7 @@
 #include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
 #include "service/backend_options.h"
+#include "util/string_util.h"
 #include "util/time.h"
 #include "util/uid_util.h"

@@ -47,18 +48,11 @@ class KafkaLoadInfo {
         if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; }
         if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; }
         if (t_info.__isset.max_batch_size) { max_batch_size = t_info.max_batch_size; }
-
-        std::stringstream ss;
-        ss << BackendOptions::get_localhost() << "_";
-        client_id = ss.str() + UniqueId().to_string();
-        group_id = ss.str() + UniqueId().to_string();
     }

 public:
     std::string brokers;
     std::string topic;
-    std::string group_id;
-    std::string client_id;

     // the following members control the max progress of a consuming
     // process. if any of them reach, the consuming will finish.
@@ -95,8 +89,6 @@ class StreamLoadContext {
         }
     }

-    void rollback();
-
     std::string to_json() const;

     // return the brief info of this context.
@@ -124,8 +116,6 @@ class StreamLoadContext {
     std::string table;
     std::string label;

-    std::string user_ip;
-
     AuthInfo auth;

     // only used to check if we receive whole body
@@ -160,6 +150,9 @@ class StreamLoadContext {

     KafkaLoadInfo* kafka_info = nullptr;

+    // consumer_id is used as the data consumer cache key,
+    // to identify a specific data consumer.
+    int64_t consumer_id = 0;
 private:
     ExecEnv* _exec_env;
     std::atomic<int> _refs;
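consumer_id gives the FE a handle to pin a task to a particular cached consumer; nothing in this diff populates it yet. A cache key built around it would presumably combine the stable parts of the consumer identity, along these lines (a hypothetical helper for illustration, not part of the patch):

    #include <sstream>
    #include <string>

    // Hypothetical: build a lookup key for the data consumer cache.
    std::string consumer_cache_key(const StreamLoadContext* ctx) {
        std::stringstream ss;
        ss << ctx->kafka_info->brokers << "|" << ctx->kafka_info->topic
           << "|" << ctx->consumer_id;
        return ss.str();
    }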
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp
index 477ba5cdeb06dd..3c078cb4d9cbbc 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -20,6 +20,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "util/logging.h"

 #include

@@ -29,6 +30,8 @@
 namespace doris {

+using namespace RdKafka;
+
 extern TLoadTxnBeginResult k_stream_load_begin_result;
 extern TLoadTxnCommitResult k_stream_load_commit_result;
 extern TLoadTxnRollbackResult k_stream_load_rollback_result;
@@ -76,15 +79,13 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     TKafkaLoadInfo k_info;
     k_info.brokers = "127.0.0.1:9092";
-    k_info.group_id = "6";
-    k_info.client_id = "7";
     k_info.topic = "test";
-    k_info.__set_max_interval_s(10);
+    k_info.__set_max_interval_s(5);
     k_info.__set_max_batch_rows(10);
     k_info.__set_max_batch_size(2048);

     std::map<int32_t, int64_t> part_off;
-    part_off[0] = 0L;
+    part_off[0] = 13L;
     k_info.__set_partition_begin_offset(part_off);

     task.__set_kafka_load_info(k_info);
@@ -96,20 +97,38 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
     st = executor.submit_task(task);
     ASSERT_TRUE(st.ok());

-    // st = executor.submit_task(task);
-    // ASSERT_TRUE(st.ok());
-}
+    sleep(10);
+    k_info.brokers = "127.0.0.2:9092";
+    task.__set_kafka_load_info(k_info);
+    st = executor.submit_task(task);
+    ASSERT_TRUE(st.ok());

+    sleep(10);
+    k_info.brokers = "192.0.0.2:9092";
+    task.__set_kafka_load_info(k_info);
+    st = executor.submit_task(task);
+    ASSERT_TRUE(st.ok());
+
+    sleep(10);
+    k_info.brokers = "192.0.0.2:9092";
+    task.__set_kafka_load_info(k_info);
+    st = executor.submit_task(task);
+    ASSERT_TRUE(st.ok());
+
+    sleep(10);
+}
+
+} // end namespace
+
 int main(int argc, char* argv[]) {
     std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
     if (!doris::config::init(conffile.c_str(), false)) {
         fprintf(stderr, "error read config file. \n");
         return -1;
     }
+    doris::init_glog("be-test");
+
     ::testing::InitGoogleTest(&argc, argv);
-    doris::CpuInfo::init();
     return RUN_ALL_TESTS();
 }
\n"); return -1; } + doris::init_glog("be-test"); + ::testing::InitGoogleTest(&argc, argv); - doris::CpuInfo::init(); return RUN_ALL_TESTS(); } diff --git a/conf/be.conf b/conf/be.conf index 31e593ddc04593..8807bca542e946 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -29,6 +29,6 @@ storage_root_path = /home/disk1/palo;/home/disk2/palo # sys_log_dir = ${DORIS_HOME}/log # sys_log_roll_mode = SIZE-MB-1024 # sys_log_roll_num = 10 -# sys_log_verbose_modules = +# sys_log_verbose_modules = * # log_buffer_level = -1 # palo_cgroups diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 41c726469fc72a..2ef34910f9775f 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -627,8 +627,12 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce cluster = SystemInfoService.DEFAULT_CLUSTER; } - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); + if (request.isSetAuth_code()) { + // TODO(cmy): find a way to check + } else { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); + } // get database Catalog catalog = Catalog.getInstance(); @@ -676,8 +680,12 @@ private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserExc cluster = SystemInfoService.DEFAULT_CLUSTER; } - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); + if (request.isSetAuth_code()) { + // TODO(cmy): find a way to check + } else { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); + } Catalog.getCurrentGlobalTransactionMgr().abortTransaction(request.getTxnId(), request.isSetReason() ? request.getReason() : "system cancel"); @@ -755,3 +763,4 @@ private TNetworkAddress getClientAddr() { } } +