Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ Status KafkaDataConsumer::reset() {
return Status::OK();
}

Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset) {
    // Synchronously commit the given partition offsets back to the kafka brokers.
    // commitSync blocks until the brokers acknowledge (or reject) the commit.
    const RdKafka::ErrorCode res = _k_consumer->commitSync(offset);
    if (res == RdKafka::ERR_NO_ERROR) {
        return Status::OK();
    }
    // Translate the librdkafka error code into a human-readable status message.
    return Status::InternalError("failed to commit kafka offset : " + RdKafka::err2str(res));
}

// If the kafka brokers and the topic are the same,
// we consider this consumer a match, and thus it can be reused.
bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/routine_load/data_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class KafkaDataConsumer : public DataConsumer {
// reassign partition topics
virtual Status reset() override;
virtual bool match(StreamLoadContext* ctx) override;
// commit kafka offset
Status commit(std::vector<RdKafka::TopicPartition*>& offset);

Status assign_topic_partitions(
const std::map<int32_t, int64_t>& begin_partition_offset,
Expand Down
36 changes: 36 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "runtime/routine_load/kafka_consumer_pipe.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "util/defer_op.h"
#include "util/uid_util.h"

#include <thread>
Expand Down Expand Up @@ -227,6 +228,41 @@ void RoutineLoadTaskExecutor::exec_task(
// commit txn
HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed");

// commit kafka offset
switch (ctx->load_src_type) {
case TLoadSourceType::KAFKA: {
std::shared_ptr<DataConsumer> consumer;
Status st = _data_consumer_pool.get_consumer(ctx, &consumer);
if (!st.ok()) {
// Kafka Offset Commit is idempotent, Failure should not block the normal process
// So just print a warning
LOG(WARNING) << st.get_error_msg();
break;
}

std::vector<RdKafka::TopicPartition*> topic_partitions;
for (auto& kv : ctx->kafka_info->cmt_offset) {
RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
ctx->kafka_info->topic, kv.first, kv.second);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to delete this tp1 after commit?

topic_partitions.push_back(tp1);
}

st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->commit(topic_partitions);
if (!st.ok()) {
// Kafka Offset Commit is idempotent, Failure should not block the normal process
// So just print a warning
LOG(WARNING) << st.get_error_msg();
}
_data_consumer_pool.return_consumer(consumer);

// delete TopicPartition finally
auto tp_deleter = [&topic_partitions] () {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
};
DeferOp delete_tp(std::bind<void>(tp_deleter));
}
}
cb(ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
rl_attach.loadSourceType = TLoadSourceType::KAFKA;

TKafkaRLTaskProgress kafka_progress;
kafka_progress.partitionCmtOffset = std::move(ctx->kafka_info->cmt_offset);
kafka_progress.partitionCmtOffset = ctx->kafka_info->cmt_offset;

rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
rl_attach.__isset.kafkaRLTaskProgress = true;
Expand Down