1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -97,6 +97,7 @@ add_library(Runtime STATIC
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
)

120 changes: 84 additions & 36 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -23,13 +23,18 @@
#include <vector>

#include "common/status.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/routine_load/kafka_consumer_pipe.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"

namespace doris {

Status KafkaDataConsumer::init() {
// initializing the kafka consumer only sets common configs such as
// brokers and group id
Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (_init) {
// this consumer has already been initialized.
@@ -42,29 +47,32 @@ Status KafkaDataConsumer::init() {
auto conf_deleter = [conf] () { delete conf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));

std::stringstream ss;
ss << BackendOptions::get_localhost() << "_";
std::string group_id = ss.str() + UniqueId().to_string();
LOG(INFO) << "init kafka consumer with group id: " << group_id;

std::string errstr;
auto set_conf = [conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set '" << conf_key << "'";
LOG(WARNING) << ss.str();
return Status(ss.str());
}
VLOG(3) << "set " << conf_key << ": " << conf_val;
return Status::OK;
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", _ctx->kafka_info->group_id));
RETURN_IF_ERROR(set_conf("client.id", _ctx->kafka_info->client_id));
RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", group_id));
RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));

KafkaEventCb event_cb;
if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set 'event_cb'";
LOG(WARNING) << ss.str();
@@ -78,14 +86,27 @@ Status KafkaDataConsumer::init() {
return Status("failed to create kafka consumer");
}

VLOG(3) << "finished to init kafka consumer. " << ctx->brief();

_init = true;
return Status::OK;
}

Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) {
DCHECK(_k_consumer);
// create TopicPartitions
std::stringstream ss;
std::vector<RdKafka::TopicPartition*> topic_partitions;
for (auto& entry : _ctx->kafka_info->begin_offset) {
for (auto& entry : ctx->kafka_info->begin_offset) {
RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
_ctx->kafka_info->topic, entry.first, entry.second);
ctx->kafka_info->topic, entry.first, entry.second);
topic_partitions.push_back(tp1);
ss << "partition[" << entry.first << "-" << entry.second << "] ";
}

VLOG(1) << "assign topic partitions: " << ctx->kafka_info->topic
<< ", " << ss.str();

// finally, delete the TopicPartitions
auto tp_deleter = [&topic_partitions] () {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
@@ -96,59 +117,67 @@ Status KafkaDataConsumer::init() {
// assign partition
RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
if (err) {
LOG(WARNING) << "failed to assign topic partitions: " << _ctx->brief(true)
LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
<< ", err: " << RdKafka::err2str(err);
return Status("failed to assgin topic partitions");
return Status("failed to assign topic partitions");
}

VLOG(3) << "finished to init kafka consumer. "
<< _ctx->brief(true);

_init = true;
return Status::OK;
}

Status KafkaDataConsumer::start() {
Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
{
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
return Status("consumer is not initialized");
}
}

int64_t left_time = _ctx->kafka_info->max_interval_s;
int64_t left_rows = _ctx->kafka_info->max_batch_rows;
int64_t left_bytes = _ctx->kafka_info->max_batch_size;
int64_t left_time = ctx->kafka_info->max_interval_s;
int64_t left_rows = ctx->kafka_info->max_batch_rows;
int64_t left_bytes = ctx->kafka_info->max_batch_size;

std::shared_ptr<KafkaConsumerPipe> kafka_pipe = std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);

LOG(INFO) << "start consumer"
<< ". interval(s): " << left_time
<< ". max time(s): " << left_time
<< ", batch rows: " << left_rows
<< ", batch size: " << left_bytes
<< ". " << _ctx->brief();
<< ". " << ctx->brief();

MonotonicStopWatch watch;
watch.start();
Status st;
while (true) {
std::unique_lock<std::mutex> l(_lock);
if (_cancelled) {
_kafka_consumer_pipe->cancel();
kafka_pipe->cancel();
return Status::CANCELLED;
}

if (_finished) {
_kafka_consumer_pipe->finish();
kafka_pipe->finish();
return Status::OK;
}

if (left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
VLOG(3) << "kafka consume batch finished"
VLOG(3) << "kafka consume batch done"
<< ". left time=" << left_time
<< ", left rows=" << left_rows
<< ", left bytes=" << left_bytes;
_kafka_consumer_pipe->finish();
_finished = true;
return Status::OK;

if (left_bytes == ctx->kafka_info->max_batch_size) {
// nothing to be consumed, cancel it
kafka_pipe->cancel();
_cancelled = true;
return Status::CANCELLED;
} else {
DCHECK(left_bytes < ctx->kafka_info->max_batch_size);
DCHECK(left_rows < ctx->kafka_info->max_batch_rows);
kafka_pipe->finish();
_finished = true;
return Status::OK;
}
}

// consume 1 message at a time
@@ -160,15 +189,15 @@ Status KafkaDataConsumer::start() {
<< ", offset: " << msg->offset()
<< ", len: " << msg->len();

st = _kafka_consumer_pipe->append_with_line_delimiter(
st = kafka_pipe->append_with_line_delimiter(
static_cast<const char *>(msg->payload()),
static_cast<size_t>(msg->len()));
if (st.ok()) {
left_rows--;
left_bytes -= msg->len();
_ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
VLOG(3) << "consume partition[ " << msg->partition()
<< " - " << msg->offset();
ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
VLOG(3) << "consume partition[" << msg->partition()
<< " - " << msg->offset() << "]";
}

break;
@@ -185,17 +214,17 @@ Status KafkaDataConsumer::start() {
delete msg;

if (!st.ok()) {
_kafka_consumer_pipe->cancel();
kafka_pipe->cancel();
return st;
}

left_time = _ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
left_time = ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
}

return Status::OK;
}

Status KafkaDataConsumer::cancel() {
Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
return Status("consumer is not initialized");
@@ -209,4 +238,23 @@ Status KafkaDataConsumer::cancel() {
return Status::OK;
}

Status KafkaDataConsumer::reset() {
std::unique_lock<std::mutex> l(_lock);
_finished = false;
_cancelled = false;
return Status::OK;
}

// If the kafka brokers and topic are the same, we consider this
// consumer a match, and it can therefore be reused.
bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return false;
}
if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
return false;
}
return true;
}

} // end namespace doris
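
The CMake change above also adds routine_load/data_consumer_pool.cpp, which is not shown in this diff. As a rough illustration of how reset() and match() could support consumer reuse, here is a minimal, hypothetical pool sketch: the class name DataConsumerPool matches the new file, but the method names (get_consumer, return_consumer) and the list-based bookkeeping are assumptions for illustration, not the PR's actual implementation.

// Hypothetical sketch: how a pool might reuse KafkaDataConsumer instances via
// match()/reset(). Not the actual data_consumer_pool.cpp added by this PR.
#include <list>
#include <memory>
#include <mutex>

#include "runtime/routine_load/data_consumer.h"

namespace doris {

class DataConsumerPool {
public:
    // Hand out an idle consumer whose brokers/topic match the context, or create a new one.
    Status get_consumer(StreamLoadContext* ctx, std::shared_ptr<DataConsumer>* ret) {
        std::unique_lock<std::mutex> l(_lock);
        for (auto it = _pool.begin(); it != _pool.end(); ++it) {
            if ((*it)->match(ctx)) {
                *ret = *it;
                _pool.erase(it);
                return Status::OK; // reuse: brokers and topic are already configured
            }
        }
        // no reusable consumer in the pool; create and initialize a new one
        std::shared_ptr<DataConsumer> consumer(new KafkaDataConsumer(ctx));
        RETURN_IF_ERROR(consumer->init(ctx));
        *ret = consumer;
        return Status::OK;
    }

    // Put a finished consumer back after clearing its _finished/_cancelled flags.
    void return_consumer(std::shared_ptr<DataConsumer> consumer) {
        std::unique_lock<std::mutex> l(_lock);
        consumer->reset();
        _pool.push_back(consumer);
    }

private:
    std::mutex _lock;
    std::list<std::shared_ptr<DataConsumer>> _pool;
};

} // end namespace doris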
87 changes: 47 additions & 40 deletions be/src/runtime/routine_load/data_consumer.h
@@ -22,43 +22,43 @@
#include "librdkafka/rdkafkacpp.h"

#include "runtime/stream_load/stream_load_context.h"
#include "util/uid_util.h"

namespace doris {

class KafkaConsumerPipe;
class Status;
class StreamLoadPipe;

class DataConsumer {
public:
DataConsumer(StreamLoadContext* ctx):
_ctx(ctx),
_init(false),
_finished(false),
_cancelled(false) {

_ctx->ref();
}

virtual ~DataConsumer() {
if (_ctx->unref()) {
delete _ctx;
}
}

// init the consumer with the given parameters
virtual Status init() = 0;

virtual Status init(StreamLoadContext* ctx) = 0;
// start consuming
virtual Status start() = 0;

virtual Status start(StreamLoadContext* ctx) = 0;
// cancel the consuming process.
// if the consumer is not initialized, or the consuming
// process has already finished, calling cancel() will
// return an error.
virtual Status cancel() = 0;
virtual Status cancel(StreamLoadContext* ctx) = 0;
// reset the data consumer before it is reused
virtual Status reset() = 0;
// return true if the consumer matches the need
virtual bool match(StreamLoadContext* ctx) = 0;

const UniqueId& id() { return _id; }

protected:
StreamLoadContext* _ctx;
UniqueId _id;

// lock to protect the following bools
std::mutex _lock;
@@ -67,34 +67,6 @@ class DataConsumer {
bool _cancelled;
};

class KafkaDataConsumer : public DataConsumer {
public:
KafkaDataConsumer(
StreamLoadContext* ctx,
std::shared_ptr<KafkaConsumerPipe> kafka_consumer_pipe
):
DataConsumer(ctx),
_kafka_consumer_pipe(kafka_consumer_pipe) {
}

virtual Status init() override;

virtual Status start() override;

virtual Status cancel() override;

virtual ~KafkaDataConsumer() {
if (_k_consumer) {
_k_consumer->close();
delete _k_consumer;
}
}

private:
std::shared_ptr<KafkaConsumerPipe> _kafka_consumer_pipe;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
};

class KafkaEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
@@ -126,4 +98,39 @@ class KafkaEventCb : public RdKafka::EventCb {
}
};

class KafkaDataConsumer : public DataConsumer {
public:
KafkaDataConsumer(StreamLoadContext* ctx):
DataConsumer(ctx),
_brokers(ctx->kafka_info->brokers),
_topic(ctx->kafka_info->topic) {
}

virtual ~KafkaDataConsumer() {
VLOG(3) << "deconstruct consumer";
if (_k_consumer) {
_k_consumer->close();
delete _k_consumer;
_k_consumer = nullptr;
}
}

virtual Status init(StreamLoadContext* ctx) override;
virtual Status start(StreamLoadContext* ctx) override;
virtual Status cancel(StreamLoadContext* ctx) override;
// reassign partition topics
virtual Status reset() override;
virtual bool match(StreamLoadContext* ctx) override;

Status assign_topic_partitions(StreamLoadContext* ctx);

private:
std::string _brokers;
std::string _topic;

KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
std::shared_ptr<KafkaConsumerPipe> _k_consumer_pipe;
};

} // end namespace doris
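
For orientation, a hypothetical per-task call sequence using the refactored interface is sketched below. The wrapper name run_one_task is an assumption (the real driver lives in routine_load_task_executor.cpp, not shown here); init(), assign_topic_partitions(), and start() are the methods declared above, and reset() would be invoked when the consumer is returned to a pool. The point of the split is that a pooled consumer keeps its broker/group configuration across tasks and is only re-pointed at each task's partitions.

// Hypothetical usage sketch, not taken from this PR's executor code.
Status run_one_task(StreamLoadContext* ctx, std::shared_ptr<KafkaDataConsumer> consumer) {
    RETURN_IF_ERROR(consumer->init(ctx));                    // no-op if already initialized
    RETURN_IF_ERROR(consumer->assign_topic_partitions(ctx)); // set this task's partitions/offsets
    return consumer->start(ctx);                             // consume until time/row/byte limits are hit
}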