diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index f180669fe0e473..1f1cbd84375c29 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -39,25 +39,28 @@ Status KafkaDataConsumer::init() {
     RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // conf has to be deleted finally
-    auto conf_deleter = [] (RdKafka::Conf *conf) { delete conf; };
-    DeferOp delete_conf(std::bind(conf_deleter, conf));
+    auto conf_deleter = [conf] () { delete conf; };
+    DeferOp delete_conf(std::bind(conf_deleter));
 
     std::string errstr;
-#define SET_KAFKA_CONF(conf_key, conf_val) \
-    if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) { \
-        std::stringstream ss; \
-        ss << "failed to set '" << conf_key << "'"; \
-        LOG(WARNING) << ss.str(); \
-        return Status(ss.str()); \
-    }
+    auto set_conf = [conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
+        if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
+            std::stringstream ss;
+            ss << "failed to set '" << conf_key << "'";
+            LOG(WARNING) << ss.str();
+            return Status(ss.str());
+        }
+        return Status::OK;
+    };
+
-    SET_KAFKA_CONF("metadata.broker.list", _ctx->kafka_info->brokers);
-    SET_KAFKA_CONF("group.id", _ctx->kafka_info->group_id);
-    SET_KAFKA_CONF("client.id", _ctx->kafka_info->client_id);
-    SET_KAFKA_CONF("enable.partition.eof", "false");
-    SET_KAFKA_CONF("enable.auto.offset.store", "false");
+    RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
+    RETURN_IF_ERROR(set_conf("group.id", _ctx->kafka_info->group_id));
+    RETURN_IF_ERROR(set_conf("client.id", _ctx->kafka_info->client_id));
+    RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
+    RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
-    SET_KAFKA_CONF("statistics.interval.ms", "0");
+    RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
 
     // create consumer
     _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
@@ -75,11 +78,11 @@ Status KafkaDataConsumer::init() {
     }
 
     // delete TopicPartition finally
-    auto tp_deleter = [] (const std::vector<RdKafka::TopicPartition*>& vec) {
-        std::for_each(vec.begin(), vec.end(),
-                [](RdKafka::TopicPartition* tp1) { delete tp1; });
+    auto tp_deleter = [&topic_partitions] () {
+        std::for_each(topic_partitions.begin(), topic_partitions.end(),
+                [](RdKafka::TopicPartition* tp1) { delete tp1; });
     };
-    DeferOp delete_tp(std::bind(tp_deleter, topic_partitions));
+    DeferOp delete_tp(std::bind(tp_deleter));
 
     // assign partition
     RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
@@ -120,13 +123,13 @@ Status KafkaDataConsumer::start() {
     while (true) {
         std::unique_lock<std::mutex> l(_lock);
         if (_cancelled) {
-            st = Status::CANCELLED;
-            break;
+            _kafka_consumer_pipe->cancel();
+            return Status::CANCELLED;
        }

        if (_finished) {
-            st = Status::OK;
-            break;
+            _kafka_consumer_pipe->finish();
+            return Status::OK;
        }

        if (left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
@@ -143,7 +146,11 @@ Status KafkaDataConsumer::start() {
        RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
-                VLOG(3) << "get kafka message, offset: " << msg->offset();
+                LOG(INFO) << "get kafka message"
+                          << ", partition: " << msg->partition()
+                          << ", offset: " << msg->offset()
+                          << ", len: " << msg->len();
+
                st = _kafka_consumer_pipe->append_with_line_delimiter(
                        static_cast<const char*>(msg->payload()),
                        static_cast<size_t>(msg->len()));
@@ -162,6 +169,7 @@ Status KafkaDataConsumer::start() {
                LOG(WARNING) << "kafka consume timeout";
                break;
            default:
+                LOG(WARNING) << "kafka consume failed: " << msg->errstr();
                st = Status(msg->errstr());
                break;
        }
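The first hunk above trades the SET_KAFKA_CONF macro for a set_conf lambda, so the early return on a bad config key happens at each call site through RETURN_IF_ERROR instead of being hidden inside a macro. A minimal, self-contained sketch of that pattern; the Status type and RETURN_IF_ERROR macro below are simplified stand-ins, not the Doris definitions:

    #include <iostream>
    #include <sstream>
    #include <string>

    // Simplified stand-ins for Doris's Status and RETURN_IF_ERROR.
    struct Status {
        std::string msg;
        bool ok() const { return msg.empty(); }
        static const Status OK;
    };
    const Status Status::OK = {};

    #define RETURN_IF_ERROR(stmt)            \
        do {                                 \
            Status _s = (stmt);              \
            if (!_s.ok()) { return _s; }     \
        } while (false)

    // The error path is a normal return value checked at each call site,
    // unlike the old macro which expanded a return statement in place.
    Status init_conf() {
        auto set_conf = [](const std::string& key, const std::string& val) {
            if (val.empty()) {  // stand-in for RdKafka::Conf::set() failing
                std::stringstream ss;
                ss << "failed to set '" << key << "'";
                return Status{ss.str()};
            }
            return Status::OK;
        };
        RETURN_IF_ERROR(set_conf("group.id", "g1"));
        RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
        return Status::OK;
    }

    int main() {
        std::cout << (init_conf().ok() ? "conf ok" : "conf failed") << std::endl;
        return 0;
    }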
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h
index 7827d9a68a4f94..798a7a3eef5470 100644
--- a/be/src/runtime/routine_load/kafka_consumer_pipe.h
+++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h
@@ -27,10 +27,12 @@
 
 #include "exec/file_reader.h"
 #include "runtime/message_body_sink.h"
+#include "runtime/stream_load/stream_load_pipe.h"
 
 namespace doris {
 
 class KafkaConsumerPipe : public StreamLoadPipe {
+
 public:
     KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024,
                       size_t min_chunk_size = 64 * 1024)
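For context on the newly included StreamLoadPipe base: append_with_line_delimiter is assumed to forward each Kafka message into the buffered pipe followed by a single '\n', which is the behavior the unit test further down relies on (two messages read back as len1 + len2 + 2 bytes). A toy model of that assumed contract, not the Doris implementation:

    #include <cstddef>
    #include <iostream>
    #include <string>

    // Toy model: message bytes, then exactly one delimiter per message.
    class LinePipe {
    public:
        void append(const char* data, std::size_t size) { _buf.append(data, size); }
        void append_with_line_delimiter(const char* data, std::size_t size) {
            append(data, size);
            append("\n", 1);  // one delimiter per Kafka message
        }
        std::size_t buffered() const { return _buf.size(); }
    private:
        std::string _buf;
    };

    int main() {
        LinePipe pipe;
        std::string msg1 = "i have a dream";      // 14 bytes
        std::string msg2 = "This is from kafka";  // 18 bytes
        pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
        pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
        std::cout << pipe.buffered() << std::endl;  // 34 = 14 + 18 + 2
        return 0;
    }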
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 5814727f4a9cd9..c08712cf259add 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -25,6 +25,8 @@
 #include "runtime/stream_load/stream_load_executor.h"
 #include "util/uid_util.h"
 
+#include <thread>
+
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/BackendService_types.h"
 #include "gen_cpp/Types_types.h"
@@ -51,6 +53,15 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     ctx->label = task.label;
     ctx->auth.auth_code = task.auth_code;
 
+    // set execute plan params
+    TStreamLoadPutResult put_result;
+    TStatus tstatus;
+    tstatus.status_code = TStatusCode::OK;
+    put_result.status = tstatus;
+    put_result.params = std::move(task.params);
+    put_result.__isset.params = true;
+    ctx->put_result = std::move(put_result);
+
     // the routine load task's txn has already begun in FE,
     // so it needs to be rolled back if an error is encountered.
     ctx->need_rollback = true;
@@ -77,7 +88,8 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
             [this] (StreamLoadContext* ctx) {
                 std::unique_lock<std::mutex> l(_lock);
                 _task_map.erase(ctx->id);
-                LOG(INFO) << "finished routine load task " << ctx->brief();
+                LOG(INFO) << "finished routine load task " << ctx->brief()
+                          << ", current tasks num: " << _task_map.size();
                 if (ctx->unref()) {
                     delete ctx;
                 }
@@ -134,10 +146,15 @@ void RoutineLoadTaskExecutor::exec_task(
     // must put pipe before executing plan fragment
     HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe");
 
+#ifndef BE_TEST
     // execute plan fragment, async
     HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx),
             "failed to execute plan fragment");
-
+#else
+    // only for test
+    HANDLE_ERROR(_execute_plan_for_test(ctx), "test failed");
+#endif
+
     // start to consume, this may block a while
     HANDLE_ERROR(consumer->start(), "consuming failed");
@@ -170,5 +187,40 @@ void RoutineLoadTaskExecutor::err_handler(
     return;
 }
 
+Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
+    auto mock_consumer = [this, ctx]() {
+        std::shared_ptr<StreamLoadPipe> pipe = _exec_env->load_stream_mgr()->get(ctx->id);
+        bool eof = false;
+        std::stringstream ss;
+        while (true) {
+            char one;
+            size_t len = 1;
+            Status st = pipe->read((uint8_t*) &one, &len, &eof);
+            if (!st.ok()) {
+                LOG(WARNING) << "read failed";
+                ctx->promise.set_value(st);
+                break;
+            }
+
+            if (eof) {
+                ctx->promise.set_value(Status::OK);
+                break;
+            }
+
+            if (one == '\n') {
+                LOG(INFO) << "get line: " << ss.str();
+                ss.str("");
+                ctx->number_loaded_rows++;
+            } else {
+                ss << one;
+            }
+        }
+    };
+
+    std::thread t1(mock_consumer);
+    t1.detach();
+    return Status::OK;
+}
+
 } // end namespace
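The new _execute_plan_for_test helper replaces real plan-fragment execution in BE_TEST builds: a detached thread drains the pipe line by line and fulfills ctx->promise on EOF or error, so the task path can wait on the matching future exactly as it would for a real fragment. A minimal sketch of that promise/future handshake, with illustrative names and bool standing in for Status:

    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
        std::promise<bool> promise;                     // stands in for ctx->promise
        std::future<bool> done = promise.get_future();

        // Detached mock consumer, like the one spawned by _execute_plan_for_test.
        std::thread([p = std::move(promise)]() mutable {
            // ... drain the pipe until eof, counting '\n'-terminated lines ...
            p.set_value(true);                          // like set_value(Status::OK)
        }).detach();

        // The submitting side blocks on the future, as it would for a real fragment.
        std::cout << "mock consumer finished ok: " << done.get() << std::endl;
        return 0;
    }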
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 4f16dbac269d91..259098fd0c8655 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -63,6 +63,8 @@ class RoutineLoadTaskExecutor {
             const Status& st,
             const std::string& err_msg);
 
+    Status _execute_plan_for_test(StreamLoadContext* ctx);
+
 private:
     ExecEnv* _exec_env;
     ThreadPool* _thread_pool;
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index a42f23cbd08a32..7f1175c57110c0 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -55,3 +55,5 @@ ADD_BE_TEST(tablet_writer_mgr_test)
 #ADD_BE_TEST(export_task_mgr_test)
 ADD_BE_TEST(snapshot_loader_test)
 ADD_BE_TEST(user_function_cache_test)
+ADD_BE_TEST(kafka_consumer_pipe_test)
+ADD_BE_TEST(routine_load_task_executor_test)
diff --git a/be/test/runtime/kafka_consumer_pipe_test.cpp b/be/test/runtime/kafka_consumer_pipe_test.cpp
new file mode 100644
index 00000000000000..5cea24f863a904
--- /dev/null
+++ b/be/test/runtime/kafka_consumer_pipe_test.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/routine_load/kafka_consumer_pipe.h"
+
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class KafkaConsumerPipeTest : public testing::Test {
+public:
+    KafkaConsumerPipeTest() { }
+    virtual ~KafkaConsumerPipeTest() { }
+
+    void SetUp() override {
+    }
+
+    void TearDown() override {
+    }
+};
+
+TEST_F(KafkaConsumerPipeTest, append_read) {
+    KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024);
+
+    std::string msg1 = "i have a dream";
+    std::string msg2 = "This is from kafka";
+
+    Status st;
+    st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
+    ASSERT_TRUE(st.ok());
+    st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
+    ASSERT_TRUE(st.ok());
+    st = k_pipe.finish();
+    ASSERT_TRUE(st.ok());
+
+    char buf[1024];
+    size_t data_size = 1024;
+    bool eof = false;
+    st = k_pipe.read((uint8_t*) buf, &data_size, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(data_size, msg1.length() + msg2.length() + 2);
+    ASSERT_EQ(eof, false);
+
+    data_size = 1024;
+    st = k_pipe.read((uint8_t*) buf, &data_size, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(data_size, 0);
+    ASSERT_EQ(eof, true);
+}
+
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+    doris::CpuInfo::init();
+    return RUN_ALL_TESTS();
+}
+ +#include "runtime/routine_load/routine_load_task_executor.h" + +#include "runtime/exec_env.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_executor.h" + +#include + +#include "gen_cpp/BackendService_types.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" + +namespace doris { + +extern TLoadTxnBeginResult k_stream_load_begin_result; +extern TLoadTxnCommitResult k_stream_load_commit_result; +extern TLoadTxnRollbackResult k_stream_load_rollback_result; +extern TStreamLoadPutResult k_stream_load_put_result; + +class RoutineLoadTaskExecutorTest : public testing::Test { +public: + RoutineLoadTaskExecutorTest() { } + virtual ~RoutineLoadTaskExecutorTest() { } + + void SetUp() override { + k_stream_load_begin_result = TLoadTxnBeginResult(); + k_stream_load_commit_result = TLoadTxnCommitResult(); + k_stream_load_rollback_result = TLoadTxnRollbackResult(); + k_stream_load_put_result = TStreamLoadPutResult(); + + _env._master_info = new TMasterInfo(); + _env._load_stream_mgr = new LoadStreamMgr(); + _env._stream_load_executor = new StreamLoadExecutor(&_env); + } + + void TearDown() override { + delete _env._master_info; + _env._master_info = nullptr; + delete _env._load_stream_mgr; + _env._load_stream_mgr = nullptr; + delete _env._stream_load_executor; + _env._stream_load_executor = nullptr; + } + +private: + ExecEnv _env; +}; + +TEST_F(RoutineLoadTaskExecutorTest, exec_task) { + TRoutineLoadTask task; + task.type = TLoadSourceType::KAFKA; + task.job_id = 1L; + task.id = TUniqueId(); + task.txn_id = 4; + task.auth_code = 5; + task.__set_db("db1"); + task.__set_tbl("tbl1"); + task.__set_label("l1"); + + TKafkaLoadInfo k_info; + k_info.brokers = "127.0.0.1:9092"; + k_info.group_id = "6"; + k_info.client_id = "7"; + k_info.topic = "test"; + k_info.__set_max_interval_s(10); + k_info.__set_max_batch_rows(10); + k_info.__set_max_batch_size(2048); + + std::map part_off; + part_off[0] = 0L; + k_info.__set_partition_begin_offset(part_off); + + task.__set_kafka_load_info(k_info); + + RoutineLoadTaskExecutor executor(&_env); + + // submit task + Status st; + st = executor.submit_task(task); + ASSERT_TRUE(st.ok()); + + // st = executor.submit_task(task); + // ASSERT_TRUE(st.ok()); +} + +} + +int main(int argc, char* argv[]) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. 
\n"); + return -1; + } + ::testing::InitGoogleTest(&argc, argv); + doris::CpuInfo::init(); + return RUN_ALL_TESTS(); +} + diff --git a/conf/be.conf b/conf/be.conf index 3234a479f88c91..31e593ddc04593 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -29,6 +29,6 @@ storage_root_path = /home/disk1/palo;/home/disk2/palo # sys_log_dir = ${DORIS_HOME}/log # sys_log_roll_mode = SIZE-MB-1024 # sys_log_roll_num = 10 -# sys_log_verbose_modules = +# sys_log_verbose_modules = # log_buffer_level = -1 # palo_cgroups diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 79e332eabec55a..2da85eb2aea6f9 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -84,6 +84,7 @@ struct TRoutineLoadTask { 7: optional string tbl 8: optional string label 9: optional TKafkaLoadInfo kafka_load_info + 10: optional PaloInternalService.TExecPlanFragmentParams params } service BackendService { diff --git a/run-ut.sh b/run-ut.sh index 96ed3f544e43c4..2963013b595029 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -210,6 +210,10 @@ ${DORIS_TEST_BINARY_DIR}/olap/olap_header_manager_test ${DORIS_TEST_BINARY_DIR}/olap/olap_meta_test ${DORIS_TEST_BINARY_DIR}/olap/delta_writer_test +# Running routine load test +${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test +${DORIS_TEST_BINARY_DIR}/runtime/routine_load_task_executor_test + ## Running agent unittest # Prepare agent testdata if [ -d ${DORIS_TEST_BINARY_DIR}/agent/test_data ]; then