From b9edbbd734c18dcabe924d271a4e32a0b57910b2 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Mar 2019 18:40:38 +0800 Subject: [PATCH 1/2] Modify interface 1. Add batch submit interface 2. Add Kafka Event callback to catch kafka events --- be/src/runtime/routine_load/data_consumer.cpp | 10 +++++- be/src/runtime/routine_load/data_consumer.h | 31 ++++++++++++++++++ .../runtime/stream_load/stream_load_context.h | 32 +++++++++++++------ be/src/service/backend_service.cpp | 16 ++++++++-- be/src/service/backend_service.h | 2 +- .../routineload/RoutineLoadTaskScheduler.java | 11 ++++--- .../apache/doris/common/GenericPoolTest.java | 2 +- gensrc/thrift/BackendService.thrift | 14 ++++---- 8 files changed, 90 insertions(+), 28 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 1f1cbd84375c29..a64fb20ce983e9 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -63,6 +63,14 @@ Status KafkaDataConsumer::init() { // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0")); + KafkaEventCb event_cb; + if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) { + std::stringstream ss; + ss << "failed to set 'event_cb'"; + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + // create consumer _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr); if (!_k_consumer) { @@ -110,7 +118,7 @@ Status KafkaDataConsumer::start() { int64_t left_time = _ctx->kafka_info->max_interval_s; int64_t left_rows = _ctx->kafka_info->max_batch_rows; - int64_t left_bytes = _ctx->kafka_info->max_batch_bytes; + int64_t left_bytes = _ctx->kafka_info->max_batch_size; LOG(INFO) << "start consumer" << ". 
interval(s): " << left_time diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index 8830c9ea995dd1..b78bfba78dde94 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -95,4 +95,35 @@ class KafkaDataConsumer : public DataConsumer { RdKafka::KafkaConsumer* _k_consumer = nullptr; }; +class KafkaEventCb : public RdKafka::EventCb { +public: + void event_cb(RdKafka::Event &event) { + switch (event.type()) { + case RdKafka::Event::EVENT_ERROR: + LOG(INFO) << "kafka error: " << RdKafka::err2str(event.err()) + << ", event: " << event.str(); + break; + case RdKafka::Event::EVENT_STATS: + LOG(INFO) << "kafka stats: " << event.str(); + break; + + case RdKafka::Event::EVENT_LOG: + LOG(INFO) << "kafka log-" << event.severity() << "-" << event.fac().c_str() + << ", event: " << event.str(); + break; + + case RdKafka::Event::EVENT_THROTTLE: + LOG(INFO) << "kafka throttled: " << event.throttle_time() << "ms by " + << event.broker_name() << " id " << (int) event.broker_id(); + break; + + default: + LOG(INFO) << "kafka event: " << event.type() + << ", err: " << RdKafka::err2str(event.err()) + << ", event: " << event.str(); + break; + } + } +}; + } // end namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 5939dd83273f04..929d14e592a05a 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -30,6 +30,7 @@ #include "runtime/exec_env.h" #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" +#include "service/backend_options.h" #include "util/time.h" #include "util/uid_util.h" @@ -40,26 +41,39 @@ class KafkaLoadInfo { public: KafkaLoadInfo(const TKafkaLoadInfo& t_info): brokers(t_info.brokers), - group_id(t_info.group_id), - client_id(t_info.client_id), topic(t_info.topic), - 
max_interval_s(t_info.max_interval_s), - max_batch_rows(t_info.max_batch_rows), - max_batch_bytes(t_info.max_batch_size), begin_offset(t_info.partition_begin_offset) { + + if (t_info.__isset.max_interval_s) { + max_interval_s = t_info.max_interval_s; + } + + if (t_info.__isset.max_batch_rows) { + max_batch_rows = t_info.max_batch_rows; + } + + if (t_info.__isset.max_batch_size) { + max_batch_size = t_info.max_batch_size; + } + + std::stringstream ss; + ss << BackendOptions::get_localhost() << "_"; + + client_id = ss.str() + UniqueId().to_string(); + group_id = ss.str() + UniqueId().to_string(); } public: std::string brokers; + std::string topic; std::string group_id; std::string client_id; - std::string topic; // the following members control the max progress of a consuming // process. if any of them reach, the consuming will finish. - int64_t max_interval_s; - int64_t max_batch_rows; - int64_t max_batch_bytes; + int64_t max_interval_s = 5; + int64_t max_batch_rows = 1024; + int64_t max_batch_size = 100 * 1024 * 1024; // 100MB // partition -> begin offset, inclusive. std::map<int32_t, int64_t> begin_offset; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index a9353fd606c749..193d4a79dc746d 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -230,9 +230,19 @@ void BackendService::get_tablet_stat(TTabletStatResult& result) { } void BackendService::submit_routine_load_task( - TStatus& t_status, const TRoutineLoadTask& task) { - Status status = _exec_env->routine_load_task_executor()->submit_task(task); - status.to_thrift(&t_status); + TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) { + + for (auto& task : tasks) { + Status st = _exec_env->routine_load_task_executor()->submit_task(task); + if (!st.ok()) { + LOG(WARNING) << "failed to submit routine load task. job id: " << task.job_id + << " task id: " << task.id; + } + } + + // we do not care about each task's submit result. just return OK. 
+ // FE will handle the failure. + return Status::OK.to_thrift(&t_status); } } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index ecb8629a3db562..51f9505a4a6c53 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -147,7 +147,7 @@ class BackendService : public BackendServiceIf { virtual void get_tablet_stat(TTabletStatResult& result) override; - virtual void submit_routine_load_task(TStatus& t_status, const TRoutineLoadTask& task) override; + virtual void submit_routine_load_task(TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) override; private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index c78e67e1891091..3c380043895e47 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -17,8 +17,6 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ClientPool; import org.apache.doris.common.LoadException; @@ -29,6 +27,10 @@ import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TRoutineLoadTask; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -130,9 +132,8 @@ private void submitBatchTask(Map<Long, List<TRoutineLoadTask>> beIdToRoutineLoad boolean ok = false; try { client = ClientPool.backendPool.borrowObject(address); - for (TRoutineLoadTask tRoutineLoadTask : entry.getValue()) { - client.submit_routine_load_task(tRoutineLoadTask); - } + 
client.submit_routine_load_task(entry.getValue()); + ok = true; } catch (Exception e) { LOG.warn("task exec error. backend[{}]", backend.getId(), e); diff --git a/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java index caee523999fc26..a7d04e3c0be6e6 100644 --- a/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -220,7 +220,7 @@ public TTabletStatResult get_tablet_stat() throws TException { } @Override - public TStatus submit_routine_load_task(TRoutineLoadTask task) throws TException { + public TStatus submit_routine_load_task(List<TRoutineLoadTask> tasks) throws TException { // TODO Auto-generated method stub return null; } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 2da85eb2aea6f9..96f688512331c3 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -65,13 +65,11 @@ struct TTabletStatResult { struct TKafkaLoadInfo { 1: required string brokers; - 2: required string group_id; - 3: required string client_id; - 4: required string topic; - 5: optional i64 max_interval_s; - 6: optional i64 max_batch_rows; - 7: optional i64 max_batch_size; - 8: optional map<i32, i64> partition_begin_offset; + 2: required string topic; + 3: required map<i32, i64> partition_begin_offset; + 4: optional i64 max_interval_s; + 5: optional i64 max_batch_rows; + 6: optional i64 max_batch_size; } struct TRoutineLoadTask { @@ -144,5 +142,5 @@ service BackendService { TTabletStatResult get_tablet_stat(); - Status.TStatus submit_routine_load_task(1:TRoutineLoadTask task); + Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks); } From 4b92dfe060dfc1493c29b273720e3f921897cb4f Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 4 Mar 2019 19:43:54 +0800 Subject: [PATCH 2/2] fix code style --- be/src/runtime/stream_load/stream_load_context.h | 15 +++------------- 1 file changed, 3 insertions(+), 
12 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 929d14e592a05a..dce825702220d9 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -44,21 +44,12 @@ class KafkaLoadInfo { topic(t_info.topic), begin_offset(t_info.partition_begin_offset) { - if (t_info.__isset.max_interval_s) { - max_interval_s = t_info.max_interval_s; - } - - if (t_info.__isset.max_batch_rows) { - max_batch_rows = t_info.max_batch_rows; - } - - if (t_info.__isset.max_batch_size) { - max_batch_size = t_info.max_batch_size; - } + if (t_info.__isset.max_interval_s) { max_interval_s = t_info.max_interval_s; } + if (t_info.__isset.max_batch_rows) { max_batch_rows = t_info.max_batch_rows; } + if (t_info.__isset.max_batch_size) { max_batch_size = t_info.max_batch_size; } std::stringstream ss; ss << BackendOptions::get_localhost() << "_"; - client_id = ss.str() + UniqueId().to_string(); group_id = ss.str() + UniqueId().to_string(); }