10 changes: 9 additions & 1 deletion be/src/runtime/routine_load/data_consumer.cpp

@@ -63,6 +63,14 @@ Status KafkaDataConsumer::init() {
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
     RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
 
+    KafkaEventCb event_cb;
+    if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
+        std::stringstream ss;
+        ss << "failed to set 'event_cb'";
+        LOG(WARNING) << ss.str();
+        return Status(ss.str());
+    }
+
     // create consumer
     _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
     if (!_k_consumer) {

@@ -110,7 +118,7 @@ Status KafkaDataConsumer::start() {
 
     int64_t left_time = _ctx->kafka_info->max_interval_s;
     int64_t left_rows = _ctx->kafka_info->max_batch_rows;
-    int64_t left_bytes = _ctx->kafka_info->max_batch_bytes;
+    int64_t left_bytes = _ctx->kafka_info->max_batch_size;
 
     LOG(INFO) << "start consumer"
               << ". interval(s): " << left_time
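
A note on the callback registered above: librdkafka stores the raw pointer passed to conf->set("event_cb", ...) and may invoke it for as long as the consumer lives, so the callback object must outlive the consumer. A minimal sketch of the registration pattern (illustrative names; ConsumerHolder and LoggingEventCb are not part of this patch):

    #include <librdkafka/rdkafkacpp.h>

    #include <iostream>
    #include <memory>
    #include <string>

    // Minimal stand-in for KafkaEventCb from data_consumer.h.
    class LoggingEventCb : public RdKafka::EventCb {
    public:
        void event_cb(RdKafka::Event& event) override {
            std::cerr << "kafka event type=" << event.type()
                      << ", err=" << RdKafka::err2str(event.err())
                      << ", msg=" << event.str() << std::endl;
        }
    };

    class ConsumerHolder {
    public:
        bool init(const std::string& brokers, std::string* err) {
            std::unique_ptr<RdKafka::Conf> conf(
                    RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
            std::string errstr;
            if (conf->set("metadata.broker.list", brokers, errstr) != RdKafka::Conf::CONF_OK ||
                conf->set("group.id", "example_group", errstr) != RdKafka::Conf::CONF_OK ||
                // librdkafka keeps this pointer; _event_cb is a member (not a
                // stack local) so it stays valid for the consumer's lifetime.
                conf->set("event_cb", &_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
                *err = errstr;
                return false;
            }
            _consumer.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
            if (!_consumer) {
                *err = errstr;
                return false;
            }
            return true;
        }

    private:
        LoggingEventCb _event_cb;  // lives as long as the consumer
        std::unique_ptr<RdKafka::KafkaConsumer> _consumer;
    };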
31 changes: 31 additions & 0 deletions be/src/runtime/routine_load/data_consumer.h

@@ -95,4 +95,35 @@ class KafkaDataConsumer : public DataConsumer {
     RdKafka::KafkaConsumer* _k_consumer = nullptr;
 };
 
+class KafkaEventCb : public RdKafka::EventCb {
+public:
+    void event_cb(RdKafka::Event &event) {
+        switch (event.type()) {
+        case RdKafka::Event::EVENT_ERROR:
+            LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err())
+                      << ", event: " << event.str();
+            break;
+        case RdKafka::Event::EVENT_STATS:
+            LOG(INFO) << "kafka stats: " << event.str();
+            break;
+
+        case RdKafka::Event::EVENT_LOG:
+            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str()
+                      << ", event: " << event.str();
+            break;
+
+        case RdKafka::Event::EVENT_THROTTLE:
+            LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by "
+                      << event.broker_name() << " id " << (int) event.broker_id();
+            break;
+
+        default:
+            LOG(INFO) << "kafka event: " << event.type()
+                      << ", err: " << RdKafka::err2str(event.err())
+                      << ", event: " << event.str();
+            break;
+        }
+    }
+};
+
 } // end namespace doris
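
KafkaEventCb logs every event at INFO, including broker errors. Since EVENT_LOG severities follow syslog conventions (RdKafka::Event::Severity runs from 0 = EMERG to 7 = DEBUG), a variant could route them to matching log levels instead; a sketch, assuming the glog-style LOG macros the BE already uses:

    #include <librdkafka/rdkafkacpp.h>
    #include <glog/logging.h>

    // Hypothetical helper: route an EVENT_LOG entry to a log level matching
    // librdkafka's syslog-style severity (0 = EMERG ... 7 = DEBUG).
    inline void log_kafka_log_event(RdKafka::Event& event) {
        if (event.severity() <= RdKafka::Event::EVENT_SEVERITY_ERROR) {
            LOG(ERROR) << "kafka log-" << event.severity() << "-" << event.fac()
                       << ", event: " << event.str();
        } else if (event.severity() == RdKafka::Event::EVENT_SEVERITY_WARNING) {
            LOG(WARNING) << "kafka log-" << event.severity() << "-" << event.fac()
                         << ", event: " << event.str();
        } else {
            LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac()
                      << ", event: " << event.str();
        }
    }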
23 changes: 14 additions & 9 deletions be/src/runtime/stream_load/stream_load_context.h

@@ -30,6 +30,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "service/backend_options.h"
 #include "util/time.h"
 #include "util/uid_util.h"
 
@@ -40,26 +41,30 @@ class KafkaLoadInfo {
 public:
     KafkaLoadInfo(const TKafkaLoadInfo& t_info):
             brokers(t_info.brokers),
-            group_id(t_info.group_id),
-            client_id(t_info.client_id),
             topic(t_info.topic),
-            max_interval_s(t_info.max_interval_s),
-            max_batch_rows(t_info.max_batch_rows),
-            max_batch_bytes(t_info.max_batch_size),
             begin_offset(t_info.partition_begin_offset) {
+
+        if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; }
+        if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; }
+        if (t_info.__isset.max_batch_size) { max_batch_size = t_info.max_batch_size; }
+
+        std::stringstream ss;
+        ss << BackendOptions::get_localhost() << "_";
+        client_id = ss.str() + UniqueId().to_string();
+        group_id = ss.str() + UniqueId().to_string();
     }
 
 public:
     std::string brokers;
+    std::string topic;
     std::string group_id;
     std::string client_id;
-    std::string topic;
 
     // the following members control the max progress of a consuming
     // process. if any of them reach, the consuming will finish.
-    int64_t max_interval_s;
-    int64_t max_batch_rows;
-    int64_t max_batch_bytes;
+    int64_t max_interval_s = 5;
+    int64_t max_batch_rows = 1024;
+    int64_t max_batch_size = 100 * 1024 * 1024; // 100MB
 
     // partition -> begin offset, inclusive.
     std::map<int32_t, int64_t> begin_offset;
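
The constructor above relies on the Thrift convention that every optional field comes with an __isset flag: the in-class member defaults stand unless the FE actually sent a value. A stripped-down illustration of the pattern (TExampleInfo and ExampleLoadInfo are hypothetical; the real TKafkaLoadInfo is generated from BackendService.thrift):

    #include <cstdint>

    // Hand-written stand-in for a Thrift-generated struct with one
    // optional field.
    struct TExampleInfo {
        int64_t max_batch_rows = 0;
        struct { bool max_batch_rows = false; } __isset;  // set by the deserializer
    };

    struct ExampleLoadInfo {
        explicit ExampleLoadInfo(const TExampleInfo& t) {
            // Override the default only when the optional field was sent.
            if (t.__isset.max_batch_rows) {
                max_batch_rows = t.max_batch_rows;
            }
        }

        int64_t max_batch_rows = 1024;  // default when the field is absent
    };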
16 changes: 13 additions & 3 deletions be/src/service/backend_service.cpp

@@ -230,9 +230,19 @@ void BackendService::get_tablet_stat(TTabletStatResult& result) {
 }
 
 void BackendService::submit_routine_load_task(
-        TStatus& t_status, const TRoutineLoadTask& task) {
-    Status status = _exec_env->routine_load_task_executor()->submit_task(task);
-    status.to_thrift(&t_status);
+        TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) {
+
+    for (auto& task : tasks) {
+        Status st = _exec_env->routine_load_task_executor()->submit_task(task);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to submit routine load task. job id: " << task.job_id
+                         << " task id: " << task.id;
+        }
+    }
+
+    // we do not care about each task's submit result. just return OK.
+    // FE will handle the failure.
+    return Status::OK.to_thrift(&t_status);
 }
 
 } // namespace doris

Review thread on the `for (auto& task : tasks)` loop:

Contributor: Some tasks may be submitted successfully while others fail. Is this OK?

Contributor (author): First, a submission is very unlikely to fail, and this keeps it simple: let the FE handle the error for now (it may time out). This can be changed later if it turns out to be a problem.
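
For reference, a sketch of the stricter variant the review thread discusses, reporting the first failed submission back to the FE instead of unconditionally returning OK (a hypothetical alternative, not what this patch does):

    // Hypothetical variant: propagate the first per-task failure to the caller.
    void BackendService::submit_routine_load_task(
            TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) {
        Status result = Status::OK;
        for (auto& task : tasks) {
            Status st = _exec_env->routine_load_task_executor()->submit_task(task);
            if (!st.ok()) {
                LOG(WARNING) << "failed to submit routine load task. job id: " << task.job_id
                             << " task id: " << task.id;
                if (result.ok()) {
                    result = st;  // remember the first failure only
                }
            }
        }
        result.to_thrift(&t_status);
    }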
2 changes: 1 addition & 1 deletion be/src/service/backend_service.h

@@ -147,7 +147,7 @@ class BackendService : public BackendServiceIf {
 
     virtual void get_tablet_stat(TTabletStatResult& result) override;
 
-    virtual void submit_routine_load_task(TStatus& t_status, const TRoutineLoadTask& task) override;
+    virtual void submit_routine_load_task(TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) override;
 
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.LoadException;
@@ -29,6 +27,10 @@
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TRoutineLoadTask;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -130,9 +132,8 @@ private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoad
             boolean ok = false;
             try {
                 client = ClientPool.backendPool.borrowObject(address);
-                for (TRoutineLoadTask tRoutineLoadTask : entry.getValue()) {
-                    client.submit_routine_load_task(tRoutineLoadTask);
-                }
+                client.submit_routine_load_task(entry.getValue());
+
                 ok = true;
             } catch (Exception e) {
                 LOG.warn("task exec error. backend[{}]", backend.getId(), e);
@@ -220,7 +220,7 @@ public TTabletStatResult get_tablet_stat() throws TException {
 }
 
 @Override
-public TStatus submit_routine_load_task(TRoutineLoadTask task) throws TException {
+public TStatus submit_routine_load_task(List<TRoutineLoadTask> tasks) throws TException {
     // TODO Auto-generated method stub
     return null;
 }
14 changes: 6 additions & 8 deletions gensrc/thrift/BackendService.thrift

@@ -65,13 +65,11 @@ struct TTabletStatResult {
 
 struct TKafkaLoadInfo {
     1: required string brokers;
-    2: required string group_id;
-    3: required string client_id;
-    4: required string topic;
-    5: optional i64 max_interval_s;
-    6: optional i64 max_batch_rows;
-    7: optional i64 max_batch_size;
-    8: optional map<i32, i64> partition_begin_offset;
+    2: required string topic;
+    3: required map<i32, i64> partition_begin_offset;
+    4: optional i64 max_interval_s;
+    5: optional i64 max_batch_rows;
+    6: optional i64 max_batch_size;
 }
 
 struct TRoutineLoadTask {

@@ -144,5 +142,5 @@ service BackendService {
 
     TTabletStatResult get_tablet_stat();
 
-    Status.TStatus submit_routine_load_task(1:TRoutineLoadTask task);
+    Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks);
 }
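
Two things worth noting about the IDL change: the field IDs in TKafkaLoadInfo are renumbered, which is only safe because FE and BE are regenerated and deployed from the same IDL rather than speaking across versions; and group_id/client_id leave the wire entirely because the BE now derives them locally. A sketch of filling the reworked struct from the generated C++ (the gen_cpp include path and the bare type name are assumptions):

    #include "gen_cpp/BackendService_types.h"  // assumed generated header path

    TKafkaLoadInfo make_example_info() {
        TKafkaLoadInfo info;
        info.brokers = "broker1:9092,broker2:9092";
        info.topic = "example_topic";
        info.partition_begin_offset = {{0, 0}, {1, 100}};  // partition -> offset
        // Optional fields use the generated __set_* helpers, which also flip
        // the __isset flag so the value is serialized.
        info.__set_max_batch_rows(200000);
        return info;
    }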