57 changes: 33 additions & 24 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -39,25 +39,29 @@ Status KafkaDataConsumer::init() {
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

// conf has to be deleted finally
auto conf_deleter = [] (RdKafka::Conf *conf) { delete conf; };
DeferOp delete_conf(std::bind<void>(conf_deleter, conf));
auto conf_deleter = [conf] () { delete conf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));

std::string errstr;
#define SET_KAFKA_CONF(conf_key, conf_val) \
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) { \
std::stringstream ss; \
ss << "failed to set '" << conf_key << "'"; \
LOG(WARNING) << ss.str(); \
return Status(ss.str()); \
}
auto set_conf = [conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set '" << conf_key << "'";
LOG(WARNING) << ss.str();
return Status(ss.str());
}
return Status::OK;
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));

SET_KAFKA_CONF("metadata.broker.list", _ctx->kafka_info->brokers);
SET_KAFKA_CONF("group.id", _ctx->kafka_info->group_id);
SET_KAFKA_CONF("client.id", _ctx->kafka_info->client_id);
SET_KAFKA_CONF("enable.partition.eof", "false");
SET_KAFKA_CONF("enable.auto.offset.store", "false");
RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", _ctx->kafka_info->group_id));
RETURN_IF_ERROR(set_conf("client.id", _ctx->kafka_info->client_id));
RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
SET_KAFKA_CONF("statistics.interval.ms", "0");
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));

// create consumer
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
@@ -75,11 +79,11 @@ Status KafkaDataConsumer::init() {
}

// delete TopicPartition finally
auto tp_deleter = [] (const std::vector<RdKafka::TopicPartition*>& vec) {
std::for_each(vec.begin(), vec.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
auto tp_deleter = [&topic_partitions] () {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
};
DeferOp delete_tp(std::bind<void>(tp_deleter, topic_partitions));
DeferOp delete_tp(std::bind<void>(tp_deleter));

// assign partition
RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
@@ -120,13 +124,13 @@ Status KafkaDataConsumer::start() {
while (true) {
std::unique_lock<std::mutex> l(_lock);
if (_cancelled) {
st = Status::CANCELLED;
break;
_kafka_consumer_pipe->cancel();
return Status::CANCELLED;
}

if (_finished) {
st = Status::OK;
break;
_kafka_consumer_pipe->finish();
return Status::OK;
}

if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) {
@@ -143,7 +147,11 @@
RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
VLOG(3) << "get kafka message, offset: " << msg->offset();
LOG(INFO) << "get kafka message"
<< ", partition: " << msg->partition()
<< ", offset: " << msg->offset()
<< ", len: " << msg->len();

st = _kafka_consumer_pipe->append_with_line_delimiter(
static_cast<const char *>(msg->payload()),
static_cast<size_t>(msg->len()));
@@ -162,6 +170,7 @@
LOG(WARNING) << "kafka consume timeout";
break;
default:
LOG(WARNING) << "kafka consume failed: " << msg->errstr();
st = Status(msg->errstr());
break;
}
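Note on the change above: the SET_KAFKA_CONF macro is replaced by a set_conf lambda whose failures propagate through RETURN_IF_ERROR, and the cleanup lambdas now capture their targets instead of taking bound arguments. Below is a minimal, self-contained sketch of that capture-based defer pattern; DeferGuard and FakeConf are hypothetical stand-ins for Doris' DeferOp and RdKafka::Conf, simplified for illustration only.

#include <functional>
#include <iostream>
#include <string>

// Hypothetical stand-in for Doris' DeferOp: runs a callback when it leaves scope.
class DeferGuard {
public:
    explicit DeferGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~DeferGuard() { _fn(); }
private:
    std::function<void()> _fn;
};

// Hypothetical stand-in for RdKafka::Conf: accepts every key except an empty one.
struct FakeConf {
    bool set(const std::string& key, const std::string& val, std::string& err) {
        if (key.empty()) { err = "empty key"; return false; }
        return true;
    }
};

int main() {
    FakeConf* conf = new FakeConf();
    // The lambda captures `conf` directly, so no std::bind argument is needed.
    DeferGuard delete_conf([conf]() { delete conf; });

    std::string errstr;
    // Per-key helper mirroring set_conf above: logs the failing key and returns an error.
    auto set_conf = [conf, &errstr](const std::string& key, const std::string& val) -> bool {
        if (!conf->set(key, val, errstr)) {
            std::cerr << "failed to set '" << key << "': " << errstr << std::endl;
            return false;
        }
        return true;
    };

    if (!set_conf("metadata.broker.list", "localhost:9092")) return 1;
    if (!set_conf("group.id", "example_group")) return 1;
    return 0;  // conf is deleted by DeferGuard on scope exit
}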
2 changes: 2 additions & 0 deletions be/src/runtime/routine_load/kafka_consumer_pipe.h
@@ -27,10 +27,12 @@

#include "exec/file_reader.h"
#include "runtime/message_body_sink.h"
#include "runtime/stream_load/stream_load_pipe.h"

namespace doris {

class KafkaConsumerPipe : public StreamLoadPipe {

public:
KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024,
size_t min_chunk_size = 64 * 1024)
56 changes: 54 additions & 2 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -25,6 +25,8 @@
#include "runtime/stream_load/stream_load_executor.h"
#include "util/uid_util.h"

#include <thread>

#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/Types_types.h"
@@ -51,6 +53,15 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
ctx->label = task.label;
ctx->auth.auth_code = task.auth_code;

// set execute plan params
TStreamLoadPutResult put_result;
TStatus tstatus;
tstatus.status_code = TStatusCode::OK;
put_result.status = tstatus;
put_result.params = std::move(task.params);
put_result.__isset.params = true;
ctx->put_result = std::move(put_result);

// the routine load task's txn has already begun in FE,
// so it needs to be rolled back if we encounter an error.
ctx->need_rollback = true;
@@ -77,7 +88,8 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
[this] (StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
_task_map.erase(ctx->id);
LOG(INFO) << "finished routine load task " << ctx->brief();
LOG(INFO) << "finished routine load task " << ctx->brief()
<< ", current tasks num: " << _task_map.size();
if (ctx->unref()) {
delete ctx;
}
@@ -134,10 +146,15 @@ void RoutineLoadTaskExecutor::exec_task(
// must put pipe before executing plan fragment
HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe");

#ifndef BE_TEST
// execute plan fragment, async
HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx),
"failed to execute plan fragment");

#else
// only for test
HANDLE_ERROR(_execute_plan_for_test(ctx), "test failed");
#endif

// start to consume, this may block a while
HANDLE_ERROR(consumer->start(), "consuming failed");

@@ -170,5 +187,40 @@ void RoutineLoadTaskExecutor::err_handler(
return;
}

Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) {
auto mock_consumer = [this, ctx]() {
std::shared_ptr<StreamLoadPipe> pipe = _exec_env->load_stream_mgr()->get(ctx->id);
bool eof = false;
std::stringstream ss;
while (true) {
char one;
size_t len = 1;
Status st = pipe->read((uint8_t*) &one, &len, &eof);
if (!st.ok()) {
LOG(WARNING) << "read failed";
ctx->promise.set_value(st);
break;
}

if (eof) {
ctx->promise.set_value(Status::OK);
break;
}

if (one == '\n') {
LOG(INFO) << "get line: " << ss.str();
ss.str("");
ctx->number_loaded_rows++;
} else {
ss << one;
}
}
};

std::thread t1(mock_consumer);
t1.detach();
return Status::OK;
}

} // end namespace
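For context on _execute_plan_for_test above: under BE_TEST it replaces real plan-fragment execution with a detached thread that drains the StreamLoadPipe one byte at a time and counts newline-terminated rows. A rough, self-contained model of that line-splitting loop is sketched below, using a std::istringstream in place of the real pipe; the names here are illustrative only.

#include <iostream>
#include <sstream>
#include <string>

int main() {
    // Two "Kafka messages" joined by the line delimiter the pipe appends.
    std::istringstream pipe("i have a dream\nThis is from kafka\n");
    std::stringstream line;
    int loaded_rows = 0;

    char one;
    // Read one byte at a time, like pipe->read((uint8_t*)&one, &len, &eof) above.
    while (pipe.get(one)) {
        if (one == '\n') {
            std::cout << "get line: " << line.str() << std::endl;
            line.str("");  // reset the buffer for the next row
            ++loaded_rows;
        } else {
            line << one;
        }
    }
    std::cout << "rows: " << loaded_rows << std::endl;  // expect 2
    return 0;
}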

2 changes: 2 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.h
@@ -63,6 +63,8 @@ class RoutineLoadTaskExecutor {
const Status& st,
const std::string& err_msg);

Status _execute_plan_for_test(StreamLoadContext* ctx);

private:
ExecEnv* _exec_env;
ThreadPool* _thread_pool;
2 changes: 2 additions & 0 deletions be/test/runtime/CMakeLists.txt
@@ -55,3 +55,5 @@ ADD_BE_TEST(tablet_writer_mgr_test)
#ADD_BE_TEST(export_task_mgr_test)
ADD_BE_TEST(snapshot_loader_test)
ADD_BE_TEST(user_function_cache_test)
ADD_BE_TEST(kafka_consumer_pipe_test)
ADD_BE_TEST(routine_load_task_executor_test)
78 changes: 78 additions & 0 deletions be/test/runtime/kafka_consumer_pipe_test.cpp
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/routine_load/kafka_consumer_pipe.h"

#include <gtest/gtest.h>

#include "util/cpu_info.h"  // needed for doris::CpuInfo::init() in main()

namespace doris {

class KafkaConsumerPipeTest : public testing::Test {
public:
KafkaConsumerPipeTest() { }
virtual ~KafkaConsumerPipeTest() { }

void SetUp() override {


}

void TearDown() override {

}

private:

};

TEST_F(KafkaConsumerPipeTest, append_read) {
KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024);

std::string msg1 = "i have a dream";
std::string msg2 = "This is from kafka";

Status st;
st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
ASSERT_TRUE(st.ok());
st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
ASSERT_TRUE(st.ok());
st = k_pipe.finish();
ASSERT_TRUE(st.ok());

char buf[1024];
size_t data_size = 1024;
bool eof = false;
st = k_pipe.read((uint8_t*) buf, &data_size, &eof);
ASSERT_TRUE(st.ok());
ASSERT_EQ(data_size, msg1.length() + msg2.length() + 2);
ASSERT_EQ(eof, false);

data_size = 1024;
st = k_pipe.read((uint8_t*) buf, &data_size, &eof);
ASSERT_TRUE(st.ok());
ASSERT_EQ(data_size, 0);
ASSERT_EQ(eof, true);
}

}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
return RUN_ALL_TESTS();
}
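A note on the length assertion in the test above: assuming append_with_line_delimiter appends exactly one '\n' after each message, the first read should return msg1.length() + msg2.length() + 2 bytes. A tiny sketch of that accounting:

#include <cassert>
#include <string>

int main() {
    std::string msg1 = "i have a dream";
    std::string msg2 = "This is from kafka";

    // Model of the pipe's buffered contents after the two appends and finish().
    std::string buffered = msg1 + "\n" + msg2 + "\n";

    assert(buffered.size() == msg1.length() + msg2.length() + 2);  // 14 + 18 + 2 = 34
    return 0;
}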
