From 0f419c3485290e06195056c9faec13abfedf6987 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Tue, 24 Oct 2023 13:08:18 +0800 Subject: [PATCH 1/4] add routine load rows check --- be/src/runtime/routine_load/data_consumer_group.cpp | 3 ++- be/src/runtime/routine_load/data_consumer_group.h | 6 ++++++ .../runtime/routine_load/routine_load_task_executor.cpp | 9 +++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 60e7c57a6c12fa..c54bbbd99dd99a 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -125,9 +125,10 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx, bool eos = false; while (true) { if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) { + _rows = ctx->max_batch_rows - left_rows; LOG(INFO) << "consumer group done: " << _grp_id << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time - << ", received rows=" << ctx->max_batch_rows - left_rows + << ", received rows=" << _rows << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos << ", left_time: " << left_time << ", left_rows: " << left_rows << ", left_bytes: " << left_bytes diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index e15ad7115f6e9a..aedcd4f0697ef4 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -60,6 +60,10 @@ class DataConsumerGroup { ++_counter; } + int64_t get_consumer_rows() { return _rows; } + + void set_consumer_rows(int64_t rows) { _rows = rows; } + // start all consumers virtual Status start_all(std::shared_ptr ctx, std::shared_ptr kafka_pipe) { @@ -77,6 +81,8 @@ class DataConsumerGroup { // when the counter becomes zero, shutdown the queue to finish std::mutex _mutex; int _counter; + // received total rows + int64_t _rows; }; // for kafka diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 445e78e06f382a..c9769b43b44030 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -355,6 +355,15 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); + // check received and load rows + LOG(INFO) << "routine load task received rows: " << consumer_grp.get()->get_consumer_rows() + << " load total rows: " << ctx.get()->number_total_rows + << " loaded rows: " << ctx.get()->number_loaded_rows + << " filtered rows: " << ctx.get()->number_filtered_rows + << " unselected rows: " << ctx.get()->number_unselected_rows; + DCHECK(consumer_grp.get()->get_consumer_rows() == ctx.get()->number_total_rows); + consumer_grp.get()->set_consumer_rows(0); + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; // return the consumer back to pool From b13b4e89024b75797ece1ae5be0b5686a9f03e7e Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Tue, 24 Oct 2023 16:52:55 +0800 Subject: [PATCH 2/4] update --- be/src/runtime/routine_load/data_consumer_group.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index aedcd4f0697ef4..f6e7d6ec5dd3a3 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -60,7 +60,7 @@ class DataConsumerGroup { ++_counter; } - int64_t get_consumer_rows() { return _rows; } + int64_t get_consumer_rows() const { return _rows; } void set_consumer_rows(int64_t rows) { _rows = rows; } From f5d677df90ce9a59020e8e17b658305b03918e2e Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Tue, 24 Oct 2023 20:36:29 +0800 Subject: [PATCH 3/4] update --- be/src/runtime/routine_load/data_consumer_group.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index f6e7d6ec5dd3a3..1fa1298a6a0902 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -46,7 +46,10 @@ class DataConsumerGroup { typedef std::function ConsumeFinishCallback; DataConsumerGroup() - : _grp_id(UniqueId::gen_uid()), _thread_pool(3, 10, "data_consumer"), _counter(0) {} + : _grp_id(UniqueId::gen_uid()), + _thread_pool(3, 10, "data_consumer"), + _counter(0), + _rows(0) {} virtual ~DataConsumerGroup() { _consumers.clear(); } From bbae629e74520e662f0f6c82eecf3cd08bbc3e5e Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Tue, 24 Oct 2023 21:42:04 +0800 Subject: [PATCH 4/4] update --- be/src/runtime/routine_load/data_consumer_group.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index 1fa1298a6a0902..0cda80a9ec4dc3 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -46,10 +46,7 @@ class DataConsumerGroup { typedef std::function ConsumeFinishCallback; DataConsumerGroup() - : _grp_id(UniqueId::gen_uid()), - _thread_pool(3, 10, "data_consumer"), - _counter(0), - _rows(0) {} + : _grp_id(UniqueId::gen_uid()), _thread_pool(3, 10, "data_consumer"), _counter(0) {} virtual ~DataConsumerGroup() { _consumers.clear(); } @@ -85,7 +82,7 @@ class DataConsumerGroup { std::mutex _mutex; int _counter; // received total rows - int64_t _rows; + int64_t _rows {0}; }; // for kafka