diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 5f6c789f769fc7..7242fbe6027c90 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -116,7 +116,6 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { MonotonicStopWatch watch; watch.start(); - Status st; bool eos = false; while (true) { if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) { @@ -140,12 +139,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { // waiting all threads finished _thread_pool.shutdown(); _thread_pool.join(); - if (!result_st.ok()) { - // some of consumers encounter errors, cancel this task + kafka_pipe->cancel(result_st.get_error_msg()); return result_st; } - kafka_pipe->finish(); ctx->kafka_info->cmt_offset = std::move(cmt_offset); ctx->receive_bytes = ctx->max_batch_size - left_bytes; @@ -159,9 +156,8 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { << ", partition: " << msg->partition() << ", offset: " << msg->offset() << ", len: " << msg->len(); - (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()), + Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len())); - if (st.ok()) { left_rows--; left_bytes -= msg->len(); @@ -172,6 +168,12 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { // failed to append this msg, we must stop LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; eos = true; + { + std::unique_lock<std::mutex> lock(_mutex); + if (result_st.ok()) { + result_st = st; + } + } } delete msg; } else {