diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index b0c46a3e12e687..46c94f3e46487a 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -304,6 +304,16 @@ Status KafkaDataConsumer::reset() {
     return Status::OK();
 }
 
+Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
+    RdKafka::ErrorCode err = _k_consumer->commitSync(offset);
+    if (err != RdKafka::ERR_NO_ERROR) {
+        std::stringstream ss;
+        ss << "failed to commit kafka offset: " << RdKafka::err2str(err);
+        return Status::InternalError(ss.str());
+    }
+    return Status::OK();
+}
+
 // if the kafka brokers and topic are same,
 // we considered this consumer as matched, thus can be reused.
 bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 95b10e6b155b75..3cbf515c4577db 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -133,6 +133,8 @@ class KafkaDataConsumer : public DataConsumer {
     // reassign partition topics
     virtual Status reset() override;
     virtual bool match(StreamLoadContext* ctx) override;
+    // commit kafka offset
+    Status commit(std::vector<RdKafka::TopicPartition*>& offset);
 
     Status assign_topic_partitions(
             const std::map<int32_t, int64_t>& begin_partition_offset,
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index cd31aeee791ebf..d79299c21a7a9d 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -23,6 +23,7 @@
 #include "runtime/routine_load/kafka_consumer_pipe.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "util/defer_op.h"
 #include "util/uid_util.h"
 
 #include <thread>
@@ -227,6 +228,41 @@ void RoutineLoadTaskExecutor::exec_task(
     // commit txn
     HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed");
 
+    // commit kafka offset
+    switch (ctx->load_src_type) {
+    case TLoadSourceType::KAFKA: {
+        std::shared_ptr<DataConsumer> consumer;
+        Status st = _data_consumer_pool.get_consumer(ctx, &consumer);
+        if (!st.ok()) {
+            // The Kafka offset commit is idempotent, so a failure should not
+            // block the normal process; just log a warning.
+            LOG(WARNING) << st.get_error_msg();
+            break;
+        }
+
+        std::vector<RdKafka::TopicPartition*> topic_partitions;
+        for (auto& kv : ctx->kafka_info->cmt_offset) {
+            RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
+                    ctx->kafka_info->topic, kv.first, kv.second);
+            topic_partitions.push_back(tp1);
+        }
+
+        st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->commit(topic_partitions);
+        if (!st.ok()) {
+            // The Kafka offset commit is idempotent, so a failure should not
+            // block the normal process; just log a warning.
+            LOG(WARNING) << st.get_error_msg();
+        }
+        _data_consumer_pool.return_consumer(consumer);
+
+        // delete the TopicPartition objects when this scope exits
+        auto tp_deleter = [&topic_partitions]() {
+            std::for_each(topic_partitions.begin(), topic_partitions.end(),
+                          [](RdKafka::TopicPartition* tp1) { delete tp1; });
+        };
+        DeferOp delete_tp(std::bind(tp_deleter));
+    }
+    }
+
     cb(ctx);
 }
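A note on the commit path above: `RdKafka::TopicPartition::create()` heap-allocates each object and `commitSync()` does not take ownership, which is why the patch deletes them afterwards via `DeferOp`. Below is a minimal standalone sketch of the same flow against the librdkafka C++ API; the function name `commit_offsets` and its parameters are illustrative, not part of this patch.

```cpp
#include <librdkafka/rdkafkacpp.h>

#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Sketch of the offset-commit flow used in exec_task(), assuming an already
// configured RdKafka::KafkaConsumer (not provided by this patch).
bool commit_offsets(RdKafka::KafkaConsumer* consumer, const std::string& topic,
                    const std::map<int32_t, int64_t>& offsets) {
    // commitSync() reads the raw pointers but does not free them, so the
    // caller owns every TopicPartition created here.
    std::vector<RdKafka::TopicPartition*> tps;
    for (const auto& kv : offsets) {
        tps.push_back(RdKafka::TopicPartition::create(topic, kv.first, kv.second));
    }

    RdKafka::ErrorCode err = consumer->commitSync(tps);

    // Mirror the patch's cleanup: delete each TopicPartition whether or not
    // the commit succeeded.
    for (RdKafka::TopicPartition* tp : tps) {
        delete tp;
    }
    return err == RdKafka::ERR_NO_ERROR;
}
```

One design note: the `DeferOp` guard in the patch is constructed only after `commit()` and `return_consumer()` have run. Nothing leaks, because there is no early exit after `topic_partitions` is populated, but constructing the guard immediately after the vector is filled would also cover any future early return.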
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 2ad2fbbf7d93d4..ba243d7461a0e7 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -277,7 +277,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
         rl_attach.loadSourceType = TLoadSourceType::KAFKA;
 
         TKafkaRLTaskProgress kafka_progress;
-        kafka_progress.partitionCmtOffset = std::move(ctx->kafka_info->cmt_offset);
+        kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;
 
         rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
         rl_attach.__isset.kafkaRLTaskProgress = true;
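Dropping the `std::move` here is the correctness half of this change: `exec_task()` now reads `ctx->kafka_info->cmt_offset` again after the transaction commit to build the `TopicPartition` list, so the map must still hold its entries when `collect_load_stat()` returns. A small self-contained illustration of the pitfall (names here are illustrative):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

// Moving a std::map leaves the source in a valid but unspecified state
// (empty with common implementations), so a later reader silently sees
// no entries.
int main() {
    std::map<int32_t, int64_t> cmt_offset = {{0, 100}, {1, 200}};

    // With std::move, "progress" steals cmt_offset's contents...
    std::map<int32_t, int64_t> progress = std::move(cmt_offset);

    // ...so a later loop that commits these offsets back to Kafka
    // would iterate over nothing.
    std::cout << "progress entries:   " << progress.size() << "\n";   // 2
    std::cout << "cmt_offset entries: " << cmt_offset.size() << "\n"; // typically 0
    return 0;
}
```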