From 755c571507c77a598e1328038a279c96ba83ef02 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 14 Feb 2019 11:25:36 +0800 Subject: [PATCH 1/5] first commit --- be/src/common/status.cpp | 3 + be/src/common/status.h | 1 + be/src/exec/plain_text_line_reader.cpp | 2 +- .../runtime/routine_load/kafka_consumer.cpp | 178 ++++++++++++++++++ be/src/runtime/routine_load/kafka_consumer.h | 90 +++++++++ .../kafka_consumer_pipe.cpp | 0 .../{ => routine_load}/kafka_consumer_pipe.h | 29 +-- .../routine_load_task_executor.cpp | 58 ++++++ .../routine_load/routine_load_task_executor.h | 49 +++++ 9 files changed, 397 insertions(+), 13 deletions(-) create mode 100644 be/src/runtime/routine_load/kafka_consumer.cpp create mode 100644 be/src/runtime/routine_load/kafka_consumer.h rename be/src/runtime/{ => routine_load}/kafka_consumer_pipe.cpp (100%) rename be/src/runtime/{ => routine_load}/kafka_consumer_pipe.h (65%) create mode 100644 be/src/runtime/routine_load/routine_load_task_executor.cpp create mode 100644 be/src/runtime/routine_load/routine_load_task_executor.h diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp index c66a108cfed537..64a9b865c098ee 100644 --- a/be/src/common/status.cpp +++ b/be/src/common/status.cpp @@ -34,6 +34,9 @@ const Status Status::MEM_LIMIT_EXCEEDED( const Status Status::THRIFT_RPC_ERROR( TStatusCode::THRIFT_RPC_ERROR, "Thrift RPC failed", true); +const Status Status::TIMEOUT( + TStatusCode::TIMEOUT, "timeout", true); + Status::ErrorDetail::ErrorDetail(const TStatus& status) : error_code(status.status_code), error_msgs(status.error_msgs) { diff --git a/be/src/common/status.h b/be/src/common/status.h index 040baf8c5ef207..958e30f5e286b3 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -54,6 +54,7 @@ class Status { static const Status CANCELLED; static const Status MEM_LIMIT_EXCEEDED; static const Status THRIFT_RPC_ERROR; + static const Status TIMEOUT; // copy c'tor makes copy of error detail so Status can be returned by value Status(const Status& status) : _error_detail( diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp index 7ad004706bdd2b..08db11bc630280 100644 --- a/be/src/exec/plain_text_line_reader.cpp +++ b/be/src/exec/plain_text_line_reader.cpp @@ -311,7 +311,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e // (cmy), for now, return failed to avoid potential endless loop std::stringstream ss; - ss << "decompress made no progess." + ss << "decompress made no progress." << " input_read_bytes: " << input_read_bytes << " decompressed_len: " << decompressed_len; LOG(WARNING) << ss.str(); diff --git a/be/src/runtime/routine_load/kafka_consumer.cpp b/be/src/runtime/routine_load/kafka_consumer.cpp new file mode 100644 index 00000000000000..c720e3f42ab431 --- /dev/null +++ b/be/src/runtime/routine_load/kafka_consumer.cpp @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/routine_load/kafka_consumer.h" + +#include "runtime/routine_load/" +#include "librdkafka/rdkafkacpp.h" + +namespace doris { + +Status KafkaConsumer::init() { + std::unique_lock l(_lock); + if (_init) { + // this consumer has already been initialized. + return Status::OK; + } + + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + + // conf has to be deleted finally + auto conf_deleter = [] (RdKafka::Conf *conf) { delete conf; }; + DeferOp close_dir(std::bind(&conf_deleter, conf)); + + std::string errstr; + +#define SET_KAFKA_CONF(conf_key, conf_val) \ + if (conf->set(#conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) { \ + std::stringstream ss; \ + ss << "failed to set '" << #conf_key << "'"; \ + LOG(WARNING) << ss.str(); \ + return Status(ss.str()); \ + } + + SET_KAFKA_CONF("metadata.broker.list", _k_brokers); + SET_KAFKA_CONF("group.id", _k_group_id); + SET_KAFKA_CONF("client.id", _k_client_id); + SET_KAFKA_CONF("enable.partition.eof", "false"); + SET_KAFKA_CONF("enable.auto.offset.store", "false"); + // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb() + SET_KAFKA_CONF("statistics.interval.ms", "0"); + + // create consumer + _k_consumer = RdKafka::KafkaConsumer::create(conf, errstr); + if (!_k_consumer) { + LOG(WARNING) << "failed to create kafka consumer"; + return Status("failed to create kafka consumer"); + } + + // create TopicPartitions + std::vector topic_partitions; + std::stringstream ss; + for (auto& entry : _k_partition_offset) { + TopicPartition* tp1 = TopicPartition::create(_k_topic, entry.first); + tp1->set_offset(entry.second); + topic_partitions.push_back(tp1); + ss << "partition[" << entry.first << ": " << entry.second << "];"; + } + + RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); + if (err) { + LOG(WARNING) << "failed to assign topic partitions: " << ss.str() + << ", err: " << RdKafka::err2str(err); + return Status("failed to assgin topic partitions"); + } + + // delete TopicPartition finally + std::for_each(topic_partitions.begin(), topic_partitions.end(), + [](TopicPartition* tp1) { delete tp1};) + + VLOG(3) << "finished to init kafka consumer" + << ". brokers=" << _k_brokers + << ", group_id=" << _k_group_id + << ", client_id=" << _k_client_id + << ", topic=" << _k_topic + << ", partition= " << ss.str(); + + _init = true; + return Status::OK; +} + +Status KafkaConsumer::start() { + + int64_t left_time = _max_interval_ms; + int64_t left_rows = _max_batch_rows; + int64_t left_size = _max_batch_size; + + MonotonicStopWatch watch; + watch.start(); + Status st; + while (true) { + std::unique_lock l(_lock); + if (_cancelled) { + st = Status::CANCELLED; + break; + } + + if (_finished) { + st = Status::OK; + break; + } + + if (left_time <= 0 || left_rows <= 0 || left_size <=0) { + VLOG(3) << "kafka consume batch finished" + << ". 
left time=" << left_time + << ", left rows=" << left_rows + << ", left size=" << left_size; + _kafka_consumer_pipe->finish(); + _finished = true; + return Status::OK; + } + + // consume 1 message + RdKafka::Message *msg = consumer->consume(1000 /* timeout, ms */); + switch (msg->err()) { + case RdKafka::ERR_NO_ERROR: + VLOG(3) << "get kafka message, offset: " << msg->offset(); + st = _kafka_consumer_pipe->append_with_line_delimiter( + static_cast(msg->payload()), + static_cast(msg->len())); + if (st.ok()) { + left_rows--; + left_size -= msg->len(); + } + + break; + case RdKafka::ERR__TIMED_OUT: + // leave the status as OK, because this may happend + // if there is no data in kafka. + LOG(WARNING) << "kafka consume timeout"; + break; + default: + st = Status(msg->errstr()); + break; + } + delete msg; + + if (!st.ok()) { + _kafka_consumer_pipe->cancel(); + return st; + } + + left_time = _max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; + } + + return Status::OK; +} + +Status KafkaConsumer::cancel() { + std::unique_lock l(_lock); + if (!_init) { + return Status("consumer not being initialized"); + } + + if (_finished) { + return Status("consumer is already finished"); + } + + _cancelled = true; + return Status::OK; +} + +}; + +} // end namespace doris diff --git a/be/src/runtime/routine_load/kafka_consumer.h b/be/src/runtime/routine_load/kafka_consumer.h new file mode 100644 index 00000000000000..4dcb28d1b53fab --- /dev/null +++ b/be/src/runtime/routine_load/kafka_consumer.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include + +namespace doris { + +class KafkaConsumerPipe; +class RdKafka::KafkaConsumer; + +class KafkaConsumer { +public: + KafkaConsumer( + const std::string& brokers, + const std::string& group_id, + const std::string& client_id, + const std::string& topic, + const std::map& partition_offset, + std::shared_ptr kafka_consumer_pipe, + ): + _k_brokers(broker), + _k_group_id(group_id), + _k_client_id(client_id), + _k_topic(topic), + _k_partition_offset(partition_offset), + _kafka_consumer_pipe(kafka_consumer_pipe), + _init(false), + _finished(false), + _cancelled(false) { + + } + + // init the KafkaConsumer with the given parameters + Status init(); + + // start consuming + Status start(); + + // cancel the consuming process. 
+ // if the consumer is not initialized, or the consuming + // process is already finished, call cancel() will + // return ERROR + Status cancel(); + + ~KafkaConsumer() { + if (_consumer) { + _consumer->close(); + delete _consumer; + } + } + +private: + + std::string _k_brokers; + std::string _k_group_id; + std::string _k_client_id; + std::string _k_topic; + // partition id -> offset + std::map _k_partition_offset; + std::shared_ptr _kafka_consumer_pipe; + RdKafka::KafkaConsumer* _consumer; + + // lock to protect the following bools + std::mutex _lock + bool _init; + bool _finished; + bool _cancelled; +}; + +} // end namespace doris diff --git a/be/src/runtime/kafka_consumer_pipe.cpp b/be/src/runtime/routine_load/kafka_consumer_pipe.cpp similarity index 100% rename from be/src/runtime/kafka_consumer_pipe.cpp rename to be/src/runtime/routine_load/kafka_consumer_pipe.cpp diff --git a/be/src/runtime/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h similarity index 65% rename from be/src/runtime/kafka_consumer_pipe.h rename to be/src/runtime/routine_load/kafka_consumer_pipe.h index 4d647273e3422c..65b65646f5a22a 100644 --- a/be/src/runtime/kafka_consumer_pipe.h +++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_RUNTIME_KAFKA_COMSUMER_PIPE_H -#define DORIS_BE_SRC_RUNTIME_KAFKA_COMSUMER_PIPE_H +#pragma once #include @@ -31,20 +30,26 @@ namespace doris { -class KafkaConsumerPipe : public MessageBodySink, public FileReader { +class KafkaConsumerPipe : public StreamLoadPipe { public: - KafkaConsumerPipe(); - ~KafkaConsumerPipe(); + KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, + size_t min_chunk_size = 64 * 1024) + : StreamLoadPipe(max_buffered_bytes, min_chunk_size) { + } -private: - // this is only for testing librdkafka.a - void test_kafka_lib() { - //rd_kafka_conf_t *conf = rd_kafka_conf_new(); - //rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); + virtual ~KafkaConsumerPipe(); + + Status append_with_line_delimiter(const char* data, size_t size) { + Status st = append(data, size); + if (!st.ok()) { + return st; + } + + // append the line delimiter + st = append("\n", 1); + return st; } }; } // end namespace doris - -#endif // DORIS_BE_SRC_RUNTIME_KAFKA_COMSUMER_PIPE_H diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp new file mode 100644 index 00000000000000..884c0d17b9b170 --- /dev/null +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "agent/agent_server.h" + +using apache::thrift::transport::TProcessor; +using std::deque; +using std::list; +using std::map; +using std::nothrow; +using std::set; +using std::string; +using std::to_string; +using std::vector; + +namespace doris { + +Status RoutineLoadTaskExecutor::submit_task(RoutineLoadTask task) { + + std::unique_lock l(_lock); + if (_task_ids.find(task.id) != _task_ids.end()) { + // already submitted + return Status::OK; + } + + _task_ids.insert(task.id); + + // 1. create a stream load context with ConsumerPipe + StreamLoadContext* ctx = new StreamLoadContext(this); + auto st = _exec_env->fragment_mgr()->exec_plan_fragment( + plan_params, + + + ); + + + + // 2. execute plan + + // 3. activate the consumer to read data + +} + +} // end namespace diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h new file mode 100644 index 00000000000000..736aa94b76f2e7 --- /dev/null +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "agent/agent_server.h" + +using apache::thrift::transport::TProcessor; +using std::deque; +using std::list; +using std::map; +using std::nothrow; +using std::set; +using std::string; +using std::to_string; +using std::vector; + +namespace doris { + +class RoutineLoadTaskExecutor { + +public: + // submit a routine load task + Status submit_task(RoutineLoadTask task); + +private: + + ThreadPool _thread_pool; + std::mutex _lock; + + // submitted tasks + std::set task_ids; +} + +} // end namespace From c3b1ded54f6c176377bc1b498f40f647792eeed4 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 26 Feb 2019 19:49:36 +0800 Subject: [PATCH 2/5] compile BE ok --- be/CMakeLists.txt | 12 +- .../utils.h} | 29 +- be/src/exec/CMakeLists.txt | 2 +- be/src/exec/broker_scanner.cpp | 4 +- be/src/exec/schema_scan_node.cpp | 2 +- .../schema_scanner/schema_columns_scanner.cpp | 10 +- ...{frontend_helper.cpp => schema_helper.cpp} | 76 +---- .../{frontend_helper.h => schema_helper.h} | 36 +-- .../schema_schemata_scanner.cpp | 6 +- .../schema_scanner/schema_tables_scanner.cpp | 8 +- .../schema_variables_scanner.cpp | 4 +- be/src/http/CMakeLists.txt | 1 - be/src/http/action/stream_load.cpp | 292 ++---------------- be/src/http/action/stream_load.h | 11 +- be/src/http/http_common.h | 7 - be/src/http/utils.cpp | 3 +- be/src/http/utils.h | 14 +- be/src/runtime/CMakeLists.txt | 6 +- be/src/runtime/exec_env.h | 9 + be/src/runtime/exec_env_init.cpp | 9 +- .../{http => runtime}/message_body_sink.cpp | 5 +- be/src/{http => runtime}/message_body_sink.h | 5 +- .../{kafka_consumer.cpp => data_consumer.cpp} | 101 +++--- .../{kafka_consumer.h => data_consumer.h} | 94 +++--- .../routine_load/kafka_consumer_pipe.h | 4 +- .../routine_load_task_executor.cpp | 161 ++++++++-- .../routine_load/routine_load_task_executor.h | 62 ++-- be/src/runtime/snapshot_loader.cpp | 2 +- .../{ => stream_load}/load_stream_mgr.h | 11 +- .../stream_load/stream_load_context.cpp | 100 ++++++ .../runtime/stream_load/stream_load_context.h | 159 ++++++++++ .../stream_load/stream_load_executor.cpp | 242 +++++++++++++++ .../stream_load/stream_load_executor.h | 51 +++ .../{ => stream_load}/stream_load_pipe.h | 2 +- be/src/service/backend_service.cpp | 7 + be/src/service/backend_service.h | 2 + be/src/service/doris_main.cpp | 2 +- be/src/util/CMakeLists.txt | 5 +- be/src/util/frontend_helper.cpp | 91 ++++++ be/src/util/frontend_helper.h | 54 ++++ gensrc/thrift/AgentService.thrift | 2 +- gensrc/thrift/BackendService.thrift | 24 ++ gensrc/thrift/FrontendService.thrift | 33 +- gensrc/thrift/Types.thrift | 9 + thirdparty/build-thirdparty.sh | 2 +- 45 files changed, 1186 insertions(+), 585 deletions(-) rename be/src/{runtime/routine_load/kafka_consumer_pipe.cpp => common/utils.h} (56%) rename be/src/exec/schema_scanner/{frontend_helper.cpp => schema_helper.cpp} (57%) rename be/src/exec/schema_scanner/{frontend_helper.h => schema_helper.h} (69%) rename be/src/{http => runtime}/message_body_sink.cpp (95%) rename be/src/{http => runtime}/message_body_sink.h (94%) rename be/src/runtime/routine_load/{kafka_consumer.cpp => data_consumer.cpp} (60%) rename be/src/runtime/routine_load/{kafka_consumer.h => data_consumer.h} (57%) rename be/src/runtime/{ => stream_load}/load_stream_mgr.h (86%) create mode 100644 be/src/runtime/stream_load/stream_load_context.cpp create mode 100644 be/src/runtime/stream_load/stream_load_context.h create mode 100644 be/src/runtime/stream_load/stream_load_executor.cpp create mode 100644 
be/src/runtime/stream_load/stream_load_executor.h rename be/src/runtime/{ => stream_load}/stream_load_pipe.h (99%) create mode 100644 be/src/util/frontend_helper.cpp create mode 100644 be/src/util/frontend_helper.h diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index fe4337ad8c7f45..152651a8119948 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -178,12 +178,12 @@ set_target_properties(brpc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/ add_library(rocksdb STATIC IMPORTED) set_target_properties(rocksdb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librocksdb.a) -add_library(librdkafka STATIC IMPORTED) -set_target_properties(librdkafka PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka.a) - add_library(librdkafka_cpp STATIC IMPORTED) set_target_properties(librdkafka_cpp PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka++.a) +add_library(librdkafka STATIC IMPORTED) +set_target_properties(librdkafka PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka.a) + find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin) # llvm-config @@ -440,9 +440,10 @@ set(DORIS_LINK_LIBS # Set thirdparty libraries set(DORIS_DEPENDENCIES + ${WL_START_GROUP} rocksdb - librdkafka librdkafka_cpp + librdkafka lzo snappy ${Boost_LIBRARIES} @@ -456,7 +457,6 @@ set(DORIS_DEPENDENCIES libevent mysql curl - ${WL_START_GROUP} ${LIBZ} ${LIBBZ2} gflags @@ -464,8 +464,8 @@ set(DORIS_DEPENDENCIES protobuf openssl crypto - ${WL_START_GROUP} leveldb + ${WL_END_GROUP} ) # Add all external dependencies. They should come after the palo libs. diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.cpp b/be/src/common/utils.h similarity index 56% rename from be/src/runtime/routine_load/kafka_consumer_pipe.cpp rename to be/src/common/utils.h index 10b7fd83edb9d7..a4d2d2aa6329ef 100644 --- a/be/src/runtime/routine_load/kafka_consumer_pipe.cpp +++ b/be/src/common/utils.h @@ -15,9 +15,34 @@ // specific language governing permissions and limitations // under the License. 
-#include "runtime/kafka_consumer_pipe.h" +#pragma once + +#include namespace doris { +struct AuthInfo { + std::string user; + std::string passwd; + std::string cluster; + std::string user_ip; + // -1 as unset + int64_t auth_code = -1; +}; + +template +void set_request_auth(T* req, const AuthInfo& auth) { + if (auth.auth_code != -1) { + // if auth_code is set, no need to set other info + req->auth_code = auth.auth_code; + } else { + req->user = auth.user; + req->passwd = auth.passwd; + if (!auth.cluster.empty()) { + req->__set_cluster(auth.cluster); + } + req->__set_user_ip(auth.user_ip); + } +} -} // end namespace doris +} diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index ac101769ef104c..f9ff83b1bb3312 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -75,7 +75,7 @@ set(EXEC_FILES schema_scanner/schema_columns_scanner.cpp schema_scanner/schema_charsets_scanner.cpp schema_scanner/schema_collations_scanner.cpp - schema_scanner/frontend_helper.cpp + schema_scanner/schema_helper.cpp partitioned_hash_table.cc partitioned_hash_table_ir.cc partitioned_aggregation_node.cc diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index c60e35665ac3ea..017a03ef558b46 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -24,8 +24,8 @@ #include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/raw_value.h" -#include "runtime/load_stream_mgr.h" -#include "runtime/stream_load_pipe.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_pipe.h" #include "runtime/tuple.h" #include "exprs/expr.h" #include "exec/text_converter.h" diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index 9977ba3a2d48ad..c99b19c9adb6f4 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -21,7 +21,7 @@ #include #include "exec/text_converter.hpp" -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" #include "gen_cpp/PlanNodes_types.h" #include "runtime/runtime_state.h" #include "runtime/row_batch.h" diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.cpp b/be/src/exec/schema_scanner/schema_columns_scanner.cpp index c0ffcfa9b0abc7..26226f1ea44c12 100644 --- a/be/src/exec/schema_scanner/schema_columns_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_columns_scanner.cpp @@ -21,7 +21,7 @@ #include "runtime/primitive_type.h" #include "runtime/string_value.h" #include "runtime/datetime_value.h" -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" namespace doris { @@ -74,7 +74,7 @@ Status SchemaColumnsScanner::start(RuntimeState *state) { } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::get_db_names(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); } else { return Status("IP or port dosn't exists"); @@ -151,7 +151,7 @@ Status SchemaColumnsScanner::fill_one_row(Tuple *tuple, MemPool *pool) { { void *slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); - std::string db_name = FrontendHelper::extract_db_name(_db_result.dbs[_db_index - 1]); + std::string db_name = SchemaHelper::extract_db_name(_db_result.dbs[_db_index - 1]); str_slot->ptr = (char *)pool->allocate(db_name.size()); str_slot->len = db_name.size(); memcpy(str_slot->ptr, db_name.c_str(), 
str_slot->len); @@ -327,7 +327,7 @@ Status SchemaColumnsScanner::get_new_desc() { } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::describe_table(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::describe_table(*(_param->ip), _param->port, desc_params, &_desc_result)); } else { return Status("IP or port dosn't exists"); @@ -351,7 +351,7 @@ Status SchemaColumnsScanner::get_new_table() { } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::get_table_names(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->ip), _param->port, table_params, &_table_result)); } else { return Status("IP or port dosn't exists"); diff --git a/be/src/exec/schema_scanner/frontend_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp similarity index 57% rename from be/src/exec/schema_scanner/frontend_helper.cpp rename to be/src/exec/schema_scanner/schema_helper.cpp index e7384451e67703..89a2f4245abf08 100644 --- a/be/src/exec/schema_scanner/frontend_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" #include @@ -35,6 +35,7 @@ #include "runtime/tuple_row.h" #include "runtime/client_cache.h" #include "util/debug_util.h" +#include "util/frontend_helper.h" #include "util/network_util.h" #include "util/thrift_util.h" #include "util/runtime_profile.h" @@ -42,113 +43,62 @@ namespace doris { -ExecEnv* FrontendHelper::_s_exec_env; - -using apache::thrift::protocol::TProtocol; -using apache::thrift::protocol::TBinaryProtocol; -using apache::thrift::transport::TSocket; -using apache::thrift::transport::TTransport; -using apache::thrift::transport::TBufferedTransport; - -void FrontendHelper::setup(ExecEnv* exec_env) { - _s_exec_env = exec_env; -} - -Status FrontendHelper::get_db_names( +Status SchemaHelper::get_db_names( const std::string& ip, const int32_t port, const TGetDbsParams &request, TGetDbsResult *result) { - return rpc(ip, port, + return FrontendHelper::rpc(ip, port, [&request, &result] (FrontendServiceConnection& client) { client->getDbNames(*result, request); }); } -Status FrontendHelper::get_table_names( +Status SchemaHelper::get_table_names( const std::string& ip, const int32_t port, const TGetTablesParams &request, TGetTablesResult *result) { - return rpc(ip, port, + return FrontendHelper::rpc(ip, port, [&request, &result] (FrontendServiceConnection& client) { client->getTableNames(*result, request); }); } -Status FrontendHelper::list_table_status( +Status SchemaHelper::list_table_status( const std::string& ip, const int32_t port, const TGetTablesParams &request, TListTableStatusResult *result) { - return rpc(ip, port, + return FrontendHelper::rpc(ip, port, [&request, &result] (FrontendServiceConnection& client) { client->listTableStatus(*result, request); }); } -Status FrontendHelper::describe_table( +Status SchemaHelper::describe_table( const std::string& ip, const int32_t port, const TDescribeTableParams &request, TDescribeTableResult *result) { - return rpc(ip, port, + return FrontendHelper::rpc(ip, port, [&request, &result] (FrontendServiceConnection& client) { client->describeTable(*result, request); }); } -Status FrontendHelper::show_varialbes( +Status SchemaHelper::show_varialbes( const std::string& ip, const int32_t port, const TShowVariableRequest &request, TShowVariableResult *result) { - return rpc(ip, port, + 
return FrontendHelper::rpc(ip, port, [&request, &result] (FrontendServiceConnection& client) { client->showVariables(*result, request); }); } -Status FrontendHelper::rpc( - const std::string& ip, - const int32_t port, - std::function callback, - int timeout_ms) { - TNetworkAddress address = make_network_address(ip, port); - Status status; - FrontendServiceConnection client( - _s_exec_env->frontend_client_cache(), address, timeout_ms, &status); - if (!status.ok()) { - LOG(WARNING) << "Connect frontent failed, address=" << address - << ", status=" << status.get_error_msg(); - return status; - } - try { - try { - callback(client); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "retrying call frontend service, address=" - << address << ", reason=" << e.what(); - status = client.reopen(timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "client repoen failed. address=" << address - << ", status=" << status.get_error_msg(); - return status; - } - callback(client); - } - } catch (apache::thrift::TException& e) { - // just reopen to disable this connection - client.reopen(timeout_ms); - LOG(WARNING) << "call frontend service failed, address=" << address - << ", reason=" << e.what(); - return Status(TStatusCode::THRIFT_RPC_ERROR, - "failed to call frontend service", false); - } - return Status::OK; -} - -std::string FrontendHelper::extract_db_name(const std::string& full_name) { +std::string SchemaHelper::extract_db_name(const std::string& full_name) { auto found = full_name.find(':'); if (found == std::string::npos) { return full_name; diff --git a/be/src/exec/schema_scanner/frontend_helper.h b/be/src/exec/schema_scanner/schema_helper.h similarity index 69% rename from be/src/exec/schema_scanner/frontend_helper.h rename to be/src/exec/schema_scanner/schema_helper.h index b13e92f26b4054..5b261f174718a5 100644 --- a/be/src/exec/schema_scanner/frontend_helper.h +++ b/be/src/exec/schema_scanner/schema_helper.h @@ -15,42 +15,40 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_QUERY_EXEC_SCHEMA_SCANNER_FRONTEND_HELPER_H -#define DORIS_BE_SRC_QUERY_EXEC_SCHEMA_SCANNER_FRONTEND_HELPER_H +#pragma once #include "common/status.h" #include "gen_cpp/FrontendService_types.h" namespace doris { -class ExecEnv; -class FrontendServiceClient; -template class ClientConnection; - -// this class is a helper for jni call. 
easy for unit test -class FrontendHelper { +// this class is a helper for getting schema info from FE +class SchemaHelper { public: - static void setup(ExecEnv* exec_env); static Status get_db_names( const std::string& ip, const int32_t port, const TGetDbsParams &db_params, TGetDbsResult *db_result); + static Status get_table_names( const std::string& ip, const int32_t port, const TGetTablesParams &table_params, TGetTablesResult *table_result); + static Status list_table_status( const std::string& ip, const int32_t port, const TGetTablesParams &table_params, TListTableStatusResult *table_result); + static Status describe_table( const std::string& ip, const int32_t port, const TDescribeTableParams &desc_params, TDescribeTableResult *desc_result); + static Status show_varialbes( const std::string& ip, const int32_t port, @@ -58,27 +56,7 @@ class FrontendHelper { TShowVariableResult *var_result); static std::string extract_db_name(const std::string& full_name); - - // for default timeout - static Status rpc( - const std::string& ip, - const int32_t port, - std::function&)> callback) { - - return rpc(ip, port, callback, config::thrift_rpc_timeout_ms); - } - - static Status rpc( - const std::string& ip, - const int32_t port, - std::function&)> callback, - int timeout_ms); - -private: - static ExecEnv* _s_exec_env; }; } -#endif - diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp index ec30fe0120cbce..f1794e50587fd8 100644 --- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp @@ -18,7 +18,7 @@ #include "exec/schema_scanner/schema_schemata_scanner.h" #include "runtime/primitive_type.h" #include "runtime/string_value.h" -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" namespace doris { @@ -54,7 +54,7 @@ Status SchemaSchemataScanner::start(RuntimeState *state) { db_params.__set_user_ip(*(_param->user_ip)); } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::get_db_names(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); } else { return Status("IP or port dosn't exists"); @@ -75,7 +75,7 @@ Status SchemaSchemataScanner::fill_one_row(Tuple *tuple, MemPool *pool) { { void *slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); - std::string db_name = FrontendHelper::extract_db_name(_db_result.dbs[_db_index]); + std::string db_name = SchemaHelper::extract_db_name(_db_result.dbs[_db_index]); str_slot->ptr = (char *)pool->allocate(db_name.size()); str_slot->len = db_name.size(); memcpy(str_slot->ptr, db_name.c_str(), str_slot->len); diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp b/be/src/exec/schema_scanner/schema_tables_scanner.cpp index 0f773817e6ba58..9afbf11cb7a9b4 100644 --- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
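The SchemaHelper methods above all delegate to FrontendHelper::rpc(), which wraps a Thrift client call in a retry-once loop: run the callback, and on a transport error reopen the cached connection and replay it. Below is a minimal, compilable sketch of that pattern; the Connection type and TransportError exception are hypothetical stand-ins for the Thrift client plumbing, not Doris APIs.

    #include <functional>
    #include <stdexcept>
    #include <string>

    struct TransportError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    struct Connection {
        bool reopen() { return true; }  // stand-in for client.reopen(timeout_ms)
    };

    // Returns an error message on failure, an empty string on success.
    std::string rpc(Connection& conn, const std::function<void(Connection&)>& callback) {
        try {
            try {
                callback(conn);
            } catch (const TransportError&) {
                // transient transport failure: reopen the connection, retry once
                if (!conn.reopen()) return "reopen failed";
                callback(conn);
            }
        } catch (const std::exception& e) {
            conn.reopen();  // reset the cached connection before giving up
            return std::string("rpc failed: ") + e.what();
        }
        return "";
    }

    int main() {
        Connection conn;
        int calls = 0;
        std::string err = rpc(conn, [&](Connection&) {
            if (++calls == 1) throw TransportError("broken pipe");  // first attempt fails
        });
        return (err.empty() && calls == 2) ? 0 : 1;
    }

The same shape covers every helper here: the lambda captures the request and result, and the wrapper owns connection recovery.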
-#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" #include "exec/schema_scanner/schema_tables_scanner.h" #include "runtime/primitive_type.h" #include "runtime/string_value.h" @@ -75,7 +75,7 @@ Status SchemaTablesScanner::start(RuntimeState *state) { } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::get_db_names(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::get_db_names(*(_param->ip), _param->port, db_params, &_db_result)); } else { return Status("IP or port dosn't exists"); @@ -95,7 +95,7 @@ Status SchemaTablesScanner::fill_one_row(Tuple *tuple, MemPool *pool) { { void *slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); - std::string db_name = FrontendHelper::extract_db_name(_db_result.dbs[_db_index - 1]); + std::string db_name = SchemaHelper::extract_db_name(_db_result.dbs[_db_index - 1]); str_slot->ptr = (char *)pool->allocate(db_name.size()); str_slot->len = db_name.size(); memcpy(str_slot->ptr, db_name.c_str(), str_slot->len); @@ -232,7 +232,7 @@ Status SchemaTablesScanner::get_new_table() { } if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::list_table_status(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::list_table_status(*(_param->ip), _param->port, table_params, &_table_result)); } else { return Status("IP or port dosn't exists"); diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.cpp b/be/src/exec/schema_scanner/schema_variables_scanner.cpp index 53b0170c09eb6a..8604b712a7d65b 100644 --- a/be/src/exec/schema_scanner/schema_variables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_variables_scanner.cpp @@ -19,7 +19,7 @@ #include "runtime/primitive_type.h" #include "runtime/string_value.h" #include "runtime/runtime_state.h" -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" namespace doris { @@ -53,7 +53,7 @@ Status SchemaVariablesScanner::start(RuntimeState *state) { var_params.__set_threadId(_param->thread_id); if (NULL != _param->ip && 0 != _param->port) { - RETURN_IF_ERROR(FrontendHelper::show_varialbes(*(_param->ip), + RETURN_IF_ERROR(SchemaHelper::show_varialbes(*(_param->ip), _param->port, var_params, &_var_result)); } else { return Status("IP or port dosn't exists"); diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 38e63458994dc9..792e63406c9fce 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -29,7 +29,6 @@ add_library(Webserver STATIC http_channel.cpp http_status.cpp http_parser.cpp - message_body_sink.cpp web_page_handler.cpp monitor_action.cpp default_path_handlers.cpp diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d43d085ad9c5ec..bbc0e3dea57224 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -29,7 +29,8 @@ #include #include "common/logging.h" -#include "exec/schema_scanner/frontend_helper.h" +#include "common/utils.h" +#include "util/frontend_helper.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" @@ -44,8 +45,10 @@ #include "runtime/fragment_mgr.h" #include "runtime/load_path_mgr.h" #include "runtime/plan_fragment_executor.h" -#include "runtime/stream_load_pipe.h" -#include "runtime/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_pipe.h" +#include 
"runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/load_stream_mgr.h" #include "util/byte_buffer.h" #include "util/debug_util.h" #include "util/json_util.h" @@ -62,11 +65,7 @@ IntCounter k_streaming_load_duration_ms; static IntGauge k_streaming_load_current_processing; #ifdef BE_TEST -TLoadTxnBeginResult k_stream_load_begin_result; -TLoadTxnCommitResult k_stream_load_commit_result; -TLoadTxnRollbackResult k_stream_load_rollback_result; TStreamLoadPutResult k_stream_load_put_result; -Status k_stream_load_plan_status; #endif static TFileFormatType::type parse_format(const std::string& format_str) { @@ -85,134 +84,6 @@ static bool is_format_support_streaming(TFileFormatType::type format) { } } -// stream load context -struct StreamLoadContext { - StreamLoadContext(StreamLoadAction* action_) : action(action_), _refs(0) { - start_nanos = MonotonicNanos(); - } - - ~StreamLoadContext(); - - StreamLoadAction* action; - // id for each load - UniqueId id; - - std::string db; - std::string table; - // load label, used to identify - std::string label; - - std::string user_ip; - - HttpAuthInfo auth; - - // only used to check if we receive whole body - size_t body_bytes = 0; - size_t receive_bytes = 0; - - int64_t txn_id = -1; - - bool need_rollback = false; - // when use_streaming is true, we use stream_pipe to send source data, - // otherwise we save source data to file first, then process it. - bool use_streaming = false; - TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN; - - std::shared_ptr body_sink; - - TStreamLoadPutResult put_result; - double max_filter_ratio = 0.0; - std::vector commit_infos; - - std::promise promise; - std::future future = promise.get_future(); - - Status status; - - int64_t number_loaded_rows = 0; - int64_t number_filtered_rows = 0; - int64_t start_nanos = 0; - int64_t load_cost_nanos = 0; - std::string error_url; - - std::string to_json() const; - - std::string brief() const; - - void ref() { _refs.fetch_add(1); } - // If unref() returns true, this object should be delete - bool unref() { return _refs.fetch_sub(1) == 1; } - -private: - std::atomic _refs; -}; - -StreamLoadContext::~StreamLoadContext() { - if (need_rollback) { - action->rollback(this); - need_rollback = false; - } -} - -std::string StreamLoadContext::to_json() const { - rapidjson::StringBuffer s; - rapidjson::PrettyWriter writer(s); - - writer.StartObject(); - // txn id - writer.Key("TxnId"); - writer.Int64(txn_id); - - // label - writer.Key("Label"); - writer.String(label.c_str()); - - // status - writer.Key("Status"); - switch (status.code()) { - case TStatusCode::OK: - writer.String("Success"); - break; - case TStatusCode::PUBLISH_TIMEOUT: - writer.String("Publish Timeout"); - break; - case TStatusCode::LABEL_ALREADY_EXISTS: - writer.String("Label Already Exists"); - break; - default: - writer.String("Fail"); - break; - } - // msg - writer.Key("Message"); - if (status.ok()) { - writer.String("OK"); - } else { - writer.String(status.get_error_msg().c_str()); - } - // number_load_rows - writer.Key("NumberLoadedRows"); - writer.Int64(number_loaded_rows); - writer.Key("NumberFilteredRows"); - writer.Int64(number_filtered_rows); - writer.Key("LoadBytes"); - writer.Int64(receive_bytes); - writer.Key("LoadTimeMs"); - writer.Int64(load_cost_nanos / 1000000); - if (!error_url.empty()) { - writer.Key("ErrorURL"); - writer.String(error_url.c_str()); - } - writer.EndObject(); - return s.GetString(); -} - -std::string StreamLoadContext::brief() const { - std::stringstream ss; - 
ss << " id=" << id << ", txn id=" << txn_id << ", label=" << label; - return ss.str(); -} - StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { DorisMetrics::metrics()->register_metric("streaming_load_requests_total", &k_streaming_load_requests_total); @@ -245,7 +116,7 @@ void StreamLoadAction::handle(HttpRequest* req) { if (!ctx->status.ok()) { if (ctx->need_rollback) { - rollback(ctx); + _exec_env->stream_load_executor()->rollback_txn(ctx); ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -272,10 +143,10 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { } if (!ctx->use_streaming) { // if we use non-streaming, we need to close file first, - // then _execute_plan_fragment here + // then execute_plan_fragment here // this will close file ctx->body_sink.reset(); - RETURN_IF_ERROR(_execute_plan_fragment(ctx)); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx)); } else { RETURN_IF_ERROR(ctx->body_sink->finish()); } @@ -284,36 +155,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { RETURN_IF_ERROR(ctx->future.get()); // If put file succeess we need commit this load - TLoadTxnCommitRequest request; - set_http_auth(&request, ctx->auth); - request.db = ctx->db; - request.tbl = ctx->table; - request.txnId = ctx->txn_id; - request.sync = true; - request.commitInfos = std::move(ctx->commit_infos); - request.__isset.commitInfos = true; - - TNetworkAddress master_addr = _exec_env->master_info()->network_address; - TLoadTxnCommitResult result; -#ifndef BE_TEST - RETURN_IF_ERROR(FrontendHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { - client->loadTxnCommit(result, request); - }, config::txn_commit_rpc_timeout_ms)); -#else - result = k_stream_load_commit_result; -#endif - // Return if this transaction is committed successful; otherwise, we need try to - // rollback this transaction - Status status(result.status); - if (!status.ok()) { - LOG(WARNING) << "commit transaction failed, id=" << ctx->id - << ", errmsg=" << status.get_error_msg(); - return status; - } - // commit success, set need_rollback to false - ctx->need_rollback = false; + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); return Status::OK; } @@ -321,9 +163,12 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { int StreamLoadAction::on_header(HttpRequest* req) { k_streaming_load_current_processing.increment(1); - StreamLoadContext* ctx = new StreamLoadContext(this); + StreamLoadContext* ctx = new StreamLoadContext(_exec_env); ctx->ref(); req->set_handler_ctx(ctx); + + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = TLoadSourceType::RAW; ctx->db = req->param(HTTP_DB_KEY); ctx->table = req->param(HTTP_TABLE_KEY); @@ -339,7 +184,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { if (!st.ok()) { ctx->status = st; if (ctx->need_rollback) { - rollback(ctx); + _exec_env->stream_load_executor()->rollback_txn(ctx); ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -394,34 +239,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct TNetworkAddress master_addr = _exec_env->master_info()->network_address; // begin transaction - { - TLoadTxnBeginRequest request; - set_http_auth(&request, ctx->auth); - request.db = ctx->db; - request.tbl = ctx->table; - request.label = ctx->label; - // set timestamp - request.__set_timestamp(GetCurrentTimeMicros()); - - TLoadTxnBeginResult result; 
-#ifndef BE_TEST - RETURN_IF_ERROR(FrontendHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { - client->loadTxnBegin(result, request); - })); -#else - result = k_stream_load_begin_result; -#endif - Status status(result.status); - if (!status.ok()) { - LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg() - << ctx->brief(); - return status; - } - ctx->txn_id = result.txnId; - ctx->need_rollback = true; - } + RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx)); // process put file return _process_put(http_req, ctx); @@ -453,7 +271,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { } void StreamLoadAction::free_handler_ctx(void* param) { - StreamLoadContext* ctx = (StreamLoadContext*)param; + StreamLoadContext* ctx = (StreamLoadContext*) param; if (ctx == nullptr) { return; } @@ -472,7 +290,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* // put request TStreamLoadPutRequest request; - set_http_auth(&request, ctx->auth); + set_request_auth(&request, ctx->auth); request.db = ctx->db; request.tbl = ctx->table; request.txnId = ctx->txn_id; @@ -531,79 +349,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* if (!ctx->use_streaming) { return Status::OK; } - return _execute_plan_fragment(ctx); -} - -Status StreamLoadAction::_execute_plan_fragment(StreamLoadContext* ctx) { - // submit this params -#ifndef BE_TEST - ctx->ref(); - auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.params, - [ctx] (PlanFragmentExecutor* executor) { - ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); - Status status = executor->status(); - if (status.ok()) { - ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success(); - ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered(); - int64_t num_total_rows = - ctx->number_loaded_rows + ctx->number_filtered_rows; - if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) { - status = Status("too many filtered rows"); - } - if (ctx->number_filtered_rows > 0 && - !executor->runtime_state()->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path( - executor->runtime_state()->get_error_log_file_path()); - } - } else { - LOG(WARNING) << "fragment execute failed" - << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id) - << ", errmsg=" << status.get_error_msg() - << ctx->brief(); - // cancel body_sink, make sender known it - if (ctx->body_sink != nullptr) { - ctx->body_sink->cancel(); - } - } - ctx->promise.set_value(status); - if (ctx->unref()) { - delete ctx; - } - }); - if (!st.ok()) { - // no need to check unref's return value - ctx->unref(); - return st; - } -#else - ctx->promise.set_value(k_stream_load_plan_status); -#endif - return Status::OK; -} - -void StreamLoadAction::rollback(StreamLoadContext* ctx) { - TNetworkAddress master_addr = _exec_env->master_info()->network_address; - TLoadTxnRollbackRequest request; - set_http_auth(&request, ctx->auth); - request.db = ctx->db; - request.tbl = ctx->table; - request.txnId = ctx->txn_id; - request.__set_reason(ctx->status.get_error_msg()); - TLoadTxnRollbackResult result; -#ifndef BE_TEST - auto rpc_st = FrontendHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result] (FrontendServiceConnection& client) { - client->loadTxnRollback(result, request); - }); - if (!rpc_st.ok()) 
{ - LOG(WARNING) << "transaction rollback failed. errmsg=" << rpc_st.get_error_msg() - << ctx->brief(); - } -#else - result = k_stream_load_rollback_result; -#endif + return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) { diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index b82a4c2d16594b..34ee2b6498a027 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -19,19 +19,15 @@ #include -#include "common/status.h" #include "gen_cpp/PlanNodes_types.h" #include "http/http_handler.h" -#include "http/message_body_sink.h" #include "runtime/client_cache.h" +#include "runtime/message_body_sink.h" namespace doris { class ExecEnv; -class TStreamLoadPutRequest; -class TStreamLoadPutResult; -class HttpAuthInfo; -class TTabletCommitInfo; +class Status; class StreamLoadContext; class StreamLoadAction : public HttpHandler { @@ -48,9 +44,6 @@ class StreamLoadAction : public HttpHandler { void on_chunk_data(HttpRequest* req) override; void free_handler_ctx(void* ctx) override; - // called by deconstructor - void rollback(StreamLoadContext* ctx); - private: Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx); Status _handle(StreamLoadContext* ctx); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 05bc5d878d93d1..c3d34681f60156 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -21,13 +21,6 @@ namespace doris { -struct HttpAuthInfo { - std::string user; - std::string passwd; - std::string cluster; - std::string user_ip; -}; - static const std::string HTTP_DB_KEY = "db"; static const std::string HTTP_TABLE_KEY = "table"; static const std::string HTTP_LABEL_KEY = "label"; diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp index 2e8be225cf9ad4..7953a7cda10c92 100644 --- a/be/src/http/utils.cpp +++ b/be/src/http/utils.cpp @@ -18,6 +18,7 @@ #include #include "common/logging.h" +#include "common/utils.h" #include "http/http_common.h" #include "http/http_headers.h" #include "http/http_request.h" @@ -54,7 +55,7 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa return true; } -bool parse_basic_auth(const HttpRequest& req, HttpAuthInfo* auth) { +bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) { std::string full_user; if (!parse_basic_auth(req, &full_user, &auth->passwd)) { return false; diff --git a/be/src/http/utils.h b/be/src/http/utils.h index b388e6af1e5f5a..8e82d7bed58aa9 100644 --- a/be/src/http/utils.h +++ b/be/src/http/utils.h @@ -19,12 +19,12 @@ #include +#include "common/utils.h" #include "http/http_common.h" namespace doris { class HttpRequest; -class HttpAuthInfo; std::string encode_basic_auth(const std::string& user, const std::string& passwd); // parse Basic authorization @@ -32,16 +32,6 @@ std::string encode_basic_auth(const std::string& user, const std::string& passwd // Otherwise return fasle bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* passwd); -bool parse_basic_auth(const HttpRequest& req, HttpAuthInfo* auth); - -template -void set_http_auth(T* req, const HttpAuthInfo& auth) { - req->user = auth.user; - req->passwd = auth.passwd; - if (!auth.cluster.empty()) { - req->__set_cluster(auth.cluster); - } - req->__set_user_ip(auth.user_ip); -} +bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth); } diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 
13464da33b01a4..0b5de1f47fbad5 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -92,8 +92,12 @@ add_library(Runtime STATIC bufferpool/system_allocator.cc initial_reservations.cc snapshot_loader.cpp - kafka_consumer_pipe.cpp query_statistics.cpp + message_body_sink.cpp + stream_load/stream_load_context.cpp + stream_load/stream_load_executor.cpp + routine_load/data_consumer.cpp + routine_load/routine_load_task_executor.cpp ) # This test runs forever so should not be part of 'make test' diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index fb07a7c792eef5..5d7b09f75d497a 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -50,6 +50,8 @@ class ThreadPool; class ThreadResourceMgr; class TmpFileMgr; class WebPageHandler; +class StreamLoadExecutor; +class RoutineLoadTaskExecutor; class BackendServiceClient; class FrontendServiceClient; @@ -110,11 +112,15 @@ class ExecEnv { BufferPool* buffer_pool() { return _buffer_pool; } TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; } LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; } + const std::vector& store_paths() const { return _store_paths; } void set_store_paths(const std::vector& paths) { _store_paths = paths; } OLAPEngine* olap_engine() { return _olap_engine; } void set_olap_engine(OLAPEngine* olap_engine) { _olap_engine = olap_engine; } + StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; } + RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } + private: Status _init(const std::vector& store_paths); void _destory(); @@ -158,6 +164,9 @@ class ExecEnv { BufferPool* _buffer_pool = nullptr; OLAPEngine* _olap_engine = nullptr; + + StreamLoadExecutor* _stream_load_executor = nullptr; + RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; }; } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 930286e51993e0..56db283f365e02 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -43,8 +43,10 @@ #include "util/bfd_parser.h" #include "runtime/etl_job_mgr.h" #include "runtime/load_path_mgr.h" -#include "runtime/load_stream_mgr.h" #include "runtime/pull_load_task_mgr.h" +#include "runtime/routine_load/routine_load_task_executor.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_executor.h" #include "util/pretty_printer.h" #include "util/doris_metrics.h" #include "util/brpc_stub_cache.h" @@ -95,6 +97,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { _tablet_writer_mgr = new TabletWriterMgr(this); _load_stream_mgr = new LoadStreamMgr(); _brpc_stub_cache = new BrpcStubCache(); + _stream_load_executor = new StreamLoadExecutor(this); + _routine_load_task_executor = new RoutineLoadTaskExecutor(this); _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); @@ -206,6 +210,9 @@ void ExecEnv::_destory() { delete _client_cache; delete _result_mgr; delete _stream_mgr; + delete _stream_load_executor; + delete _routine_load_task_executor; + _metrics = nullptr; } diff --git a/be/src/http/message_body_sink.cpp b/be/src/runtime/message_body_sink.cpp similarity index 95% rename from be/src/http/message_body_sink.cpp rename to be/src/runtime/message_body_sink.cpp index bb831b819b5e99..93fa788bca04f1 100644 --- a/be/src/http/message_body_sink.cpp +++ b/be/src/runtime/message_body_sink.cpp @@ -15,16 
+15,13 @@ // specific language governing permissions and limitations // under the License. -#include "http/message_body_sink.h" +#include "runtime/message_body_sink.h" #include #include #include #include -#include "http/http_channel.h" -#include "http/http_parser.h" - #include "util/runtime_profile.h" namespace doris { diff --git a/be/src/http/message_body_sink.h b/be/src/runtime/message_body_sink.h similarity index 94% rename from be/src/http/message_body_sink.h rename to be/src/runtime/message_body_sink.h index 25d082c1822d12..b8ebe7f01f3bce 100644 --- a/be/src/http/message_body_sink.h +++ b/be/src/runtime/message_body_sink.h @@ -23,9 +23,6 @@ namespace doris { -class HttpChannel; -class BodySink; - class MessageBodySink { public: virtual ~MessageBodySink() { } @@ -41,7 +38,7 @@ class MessageBodySink { virtual void cancel() { } }; -// write HTTP request's message-body to a local file +// write message to a local file class MessageBodyFileSink : public MessageBodySink { public: MessageBodyFileSink(const std::string& path) : _path(path) { } diff --git a/be/src/runtime/routine_load/kafka_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp similarity index 60% rename from be/src/runtime/routine_load/kafka_consumer.cpp rename to be/src/runtime/routine_load/data_consumer.cpp index c720e3f42ab431..f180669fe0e473 100644 --- a/be/src/runtime/routine_load/kafka_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -15,14 +15,21 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/routine_load/kafka_consumer.h" +#include "runtime/routine_load/data_consumer.h" -#include "runtime/routine_load/" -#include "librdkafka/rdkafkacpp.h" +#include +#include +#include +#include + +#include "common/status.h" +#include "runtime/routine_load/kafka_consumer_pipe.h" +#include "util/defer_op.h" +#include "util/stopwatch.hpp" namespace doris { -Status KafkaConsumer::init() { +Status KafkaDataConsumer::init() { std::unique_lock l(_lock); if (_init) { // this consumer has already been initialized. 
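The guard at the top of init() above makes repeated initialization a no-op. A minimal sketch of the lifecycle contract shared by init()/start()/cancel() — three flags behind one mutex — using plain error strings in place of Doris's Status:

    #include <mutex>

    class Consumer {
    public:
        const char* init() {
            std::unique_lock<std::mutex> l(_lock);
            if (_init) return nullptr;     // already initialized: succeed silently
            // ... create and configure the kafka handle here ...
            _init = true;
            return nullptr;
        }
        const char* cancel() {
            std::unique_lock<std::mutex> l(_lock);
            if (!_init)    return "consumer is not initialized";
            if (_finished) return "consumer is already finished";
            _cancelled = true;             // the consume loop sees this and stops
            return nullptr;
        }
    private:
        std::mutex _lock;
        bool _init = false;
        bool _finished = false;
        bool _cancelled = false;
    };

    int main() {
        Consumer c;
        const char* err = c.cancel();      // rejected: not initialized yet
        c.init();
        c.init();                          // idempotent second call
        return (err != nullptr && c.cancel() == nullptr) ? 0 : 1;
    }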
@@ -33,21 +40,20 @@ Status KafkaConsumer::init() {
 
     // conf has to be deleted finally
     auto conf_deleter = [] (RdKafka::Conf *conf) { delete conf; };
-    DeferOp close_dir(std::bind(&conf_deleter, conf));
+    DeferOp delete_conf(std::bind(conf_deleter, conf));
 
     std::string errstr;
-
 #define SET_KAFKA_CONF(conf_key, conf_val) \
-    if (conf->set(#conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) { \
+    if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) { \
         std::stringstream ss; \
-        ss << "failed to set '" << #conf_key << "'"; \
+        ss << "failed to set '" << conf_key << "'"; \
         LOG(WARNING) << ss.str(); \
         return Status(ss.str()); \
     }
 
-    SET_KAFKA_CONF("metadata.broker.list", _k_brokers);
-    SET_KAFKA_CONF("group.id", _k_group_id);
-    SET_KAFKA_CONF("client.id", _k_client_id);
+    SET_KAFKA_CONF("metadata.broker.list", _ctx->kafka_info->brokers);
+    SET_KAFKA_CONF("group.id", _ctx->kafka_info->group_id);
+    SET_KAFKA_CONF("client.id", _ctx->kafka_info->client_id);
     SET_KAFKA_CONF("enable.partition.eof", "false");
     SET_KAFKA_CONF("enable.auto.offset.store", "false");
     // TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
@@ -61,42 +67,52 @@ Status KafkaConsumer::init() {
     }
 
     // create TopicPartitions
-    std::vector topic_partitions;
-    std::stringstream ss;
-    for (auto& entry : _k_partition_offset) {
-        TopicPartition* tp1 = TopicPartition::create(_k_topic, entry.first);
-        tp1->set_offset(entry.second);
+    std::vector topic_partitions;
+    for (auto& entry : _ctx->kafka_info->begin_offset) {
+        RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
+                _ctx->kafka_info->topic, entry.first, entry.second);
         topic_partitions.push_back(tp1);
-        ss << "partition[" << entry.first << ": " << entry.second << "];";
     }
 
+    // delete TopicPartition finally
+    auto tp_deleter = [] (const std::vector& vec) {
+        std::for_each(vec.begin(), vec.end(),
+                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
+    };
+    DeferOp delete_tp(std::bind(tp_deleter, topic_partitions));
+
+    // assign partition
     RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
     if (err) {
-        LOG(WARNING) << "failed to assign topic partitions: " << ss.str()
+        LOG(WARNING) << "failed to assign topic partitions: " << _ctx->brief(true)
                      << ", err: " << RdKafka::err2str(err);
         return Status("failed to assgin topic partitions");
     }
 
-    // delete TopicPartition finally
-    std::for_each(topic_partitions.begin(), topic_partitions.end(),
-                  [](TopicPartition* tp1) { delete tp1};)
-
-    VLOG(3) << "finished to init kafka consumer"
-            << ". brokers=" << _k_brokers
-            << ", group_id=" << _k_group_id
-            << ", client_id=" << _k_client_id
-            << ", topic=" << _k_topic
-            << ", partition= " << ss.str();
+    VLOG(3) << "finished to init kafka consumer. "
+            << _ctx->brief(true);
 
     _init = true;
     return Status::OK;
 }
 
-Status KafkaConsumer::start() {
+Status KafkaDataConsumer::start() {
+    {
+        std::unique_lock l(_lock);
+        if (!_init) {
+            return Status("consumer is not initialized");
+        }
+    }
+
+    int64_t left_time = _ctx->kafka_info->max_interval_s;
+    int64_t left_rows = _ctx->kafka_info->max_batch_rows;
+    int64_t left_bytes = _ctx->kafka_info->max_batch_bytes;
 
-    int64_t left_time = _max_interval_ms;
-    int64_t left_rows = _max_batch_rows;
-    int64_t left_size = _max_batch_size;
+    LOG(INFO) << "start consumer"
+              << ". interval(s): " << left_time
+              << ", batch rows: " << left_rows
+              << ", batch size: " << left_bytes
+              << ". 
" << _ctx->brief(); MonotonicStopWatch watch; watch.start(); @@ -113,18 +129,18 @@ Status KafkaConsumer::start() { break; } - if (left_time <= 0 || left_rows <= 0 || left_size <=0) { + if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) { VLOG(3) << "kafka consume batch finished" << ". left time=" << left_time << ", left rows=" << left_rows - << ", left size=" << left_size; + << ", left bytes=" << left_bytes; _kafka_consumer_pipe->finish(); _finished = true; return Status::OK; } - // consume 1 message - RdKafka::Message *msg = consumer->consume(1000 /* timeout, ms */); + // consume 1 message at a time + RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */); switch (msg->err()) { case RdKafka::ERR_NO_ERROR: VLOG(3) << "get kafka message, offset: " << msg->offset(); @@ -133,7 +149,10 @@ Status KafkaConsumer::start() { static_cast(msg->len())); if (st.ok()) { left_rows--; - left_size -= msg->len(); + left_bytes -= msg->len(); + _ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset(); + VLOG(3) << "consume partition[ " << msg->partition() + << " - " << msg->offset(); } break; @@ -153,16 +172,16 @@ Status KafkaConsumer::start() { return st; } - left_time = _max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; + left_time = _ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000; } return Status::OK; } -Status KafkaConsumer::cancel() { +Status KafkaDataConsumer::cancel() { std::unique_lock l(_lock); if (!_init) { - return Status("consumer not being initialized"); + return Status("consumer is not initialized"); } if (_finished) { @@ -173,6 +192,4 @@ Status KafkaConsumer::cancel() { return Status::OK; } -}; - } // end namespace doris diff --git a/be/src/runtime/routine_load/kafka_consumer.h b/be/src/runtime/routine_load/data_consumer.h similarity index 57% rename from be/src/runtime/routine_load/kafka_consumer.h rename to be/src/runtime/routine_load/data_consumer.h index 4dcb28d1b53fab..8830c9ea995dd1 100644 --- a/be/src/runtime/routine_load/kafka_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -17,74 +17,82 @@ #pragma once -#include +#include -#include -#include -#include +#include "librdkafka/rdkafkacpp.h" + +#include "runtime/stream_load/stream_load_context.h" namespace doris { class KafkaConsumerPipe; -class RdKafka::KafkaConsumer; +class Status; -class KafkaConsumer { +class DataConsumer { public: - KafkaConsumer( - const std::string& brokers, - const std::string& group_id, - const std::string& client_id, - const std::string& topic, - const std::map& partition_offset, - std::shared_ptr kafka_consumer_pipe, - ): - _k_brokers(broker), - _k_group_id(group_id), - _k_client_id(client_id), - _k_topic(topic), - _k_partition_offset(partition_offset), - _kafka_consumer_pipe(kafka_consumer_pipe), + DataConsumer(StreamLoadContext* ctx): + _ctx(ctx), _init(false), _finished(false), _cancelled(false) { - + + _ctx->ref(); } - // init the KafkaConsumer with the given parameters - Status init(); + virtual ~DataConsumer() { + if (_ctx->unref()) { + delete _ctx; + } + } + // init the consumer with the given parameters + virtual Status init() = 0; + // start consuming - Status start(); + virtual Status start() = 0; // cancel the consuming process. 
// if the consumer is not initialized, or the consuming // process is already finished, call cancel() will // return ERROR - Status cancel(); + virtual Status cancel() = 0; - ~KafkaConsumer() { - if (_consumer) { - _consumer->close(); - delete _consumer; - } - } - -private: - - std::string _k_brokers; - std::string _k_group_id; - std::string _k_client_id; - std::string _k_topic; - // partition id -> offset - std::map _k_partition_offset; - std::shared_ptr _kafka_consumer_pipe; - RdKafka::KafkaConsumer* _consumer; +protected: + StreamLoadContext* _ctx; // lock to protect the following bools - std::mutex _lock + std::mutex _lock; bool _init; bool _finished; bool _cancelled; }; +class KafkaDataConsumer : public DataConsumer { +public: + KafkaDataConsumer( + StreamLoadContext* ctx, + std::shared_ptr kafka_consumer_pipe + ): + DataConsumer(ctx), + _kafka_consumer_pipe(kafka_consumer_pipe) { + } + + virtual Status init() override; + + virtual Status start() override; + + virtual Status cancel() override; + + virtual ~KafkaDataConsumer() { + if (_k_consumer) { + _k_consumer->close(); + delete _k_consumer; + } + } + +private: + std::shared_ptr _kafka_consumer_pipe; + RdKafka::KafkaConsumer* _k_consumer = nullptr; +}; + } // end namespace doris diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h index 65b65646f5a22a..7827d9a68a4f94 100644 --- a/be/src/runtime/routine_load/kafka_consumer_pipe.h +++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h @@ -26,7 +26,7 @@ #include "librdkafka/rdkafka.h" #include "exec/file_reader.h" -#include "http/message_body_sink.h" +#include "runtime/message_body_sink.h" namespace doris { @@ -38,7 +38,7 @@ class KafkaConsumerPipe : public StreamLoadPipe { } - virtual ~KafkaConsumerPipe(); + virtual ~KafkaConsumerPipe() {} Status append_with_line_delimiter(const char* data, size_t size) { Status st = append(data, size); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 884c0d17b9b170..f6c45cb964b568 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -15,44 +15,159 @@ // specific language governing permissions and limitations // under the License. 
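KafkaConsumerPipe::append_with_line_delimiter() above is the glue between Kafka messages and the line-oriented load path: each message body is appended, then a line delimiter, so the downstream line reader sees one record per message. A minimal sketch of that intent, assuming a plain "\n" delimiter (the exact delimiter handling is illustrative):

    Status append_with_line_delimiter(const char* data, size_t size) {
        Status st = append(data, size);  // the raw Kafka message body
        if (!st.ok()) {
            return st;
        }
        return append("\n", 1);          // frame it as one line for the reader
    }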
-#include "agent/agent_server.h" - -using apache::thrift::transport::TProcessor; -using std::deque; -using std::list; -using std::map; -using std::nothrow; -using std::set; -using std::string; -using std::to_string; -using std::vector; +#include "runtime/routine_load/routine_load_task_executor.h" -namespace doris { +#include "common/status.h" +#include "runtime/exec_env.h" +#include "runtime/routine_load/data_consumer.h" +#include "runtime/routine_load/kafka_consumer_pipe.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "util/uid_util.h" + +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/BackendService_types.h" +#include "gen_cpp/Types_types.h" -Status RoutineLoadTaskExecutor::submit_task(RoutineLoadTask task) { +namespace doris { +Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { std::unique_lock l(_lock); - if (_task_ids.find(task.id) != _task_ids.end()) { + if (_task_map.find(task.id) != _task_map.end()) { // already submitted + LOG(INFO) << "routine load task " << task.id << " has already been submitted"; return Status::OK; } - _task_ids.insert(task.id); + // create the context + StreamLoadContext* ctx = new StreamLoadContext(_exec_env); + ctx->load_type = TLoadType::ROUTINE_LOAD; + ctx->load_src_type = task.type; + ctx->id = UniqueId(task.id); + ctx->txn_id = task.txn_id; + ctx->db = task.db; + ctx->table = task.tbl; + ctx->label = task.label; + ctx->auth.auth_code = task.auth_code; + + // the routine load task'txn has alreay began in FE. + // so it need to rollback if encounter error. + ctx->need_rollback = true; - // 1. create a stream load context with ConsumerPipe - StreamLoadContext* ctx = new StreamLoadContext(this); - auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - plan_params, - + // set source related params + switch (task.type) { + case TLoadSourceType::KAFKA: + ctx->kafka_info = new KafkaLoadInfo(task.kafka_load_info); + break; + default: + LOG(WARNING) << "unknown load source type: " << task.type; + delete ctx; + return Status("unknown load source type"); + } + + VLOG(1) << "receive a new routine load task: " << ctx->brief(); + // register the task + ctx->ref(); + _task_map[ctx->id] = ctx; + + // offer the task to thread pool + if (!_thread_pool->offer( + boost::bind(&RoutineLoadTaskExecutor::exec_task, this, ctx, + [this] (StreamLoadContext* ctx) { + std::unique_lock l(_lock); + _task_map.erase(ctx->id); + LOG(INFO) << "finished routine load task " << ctx->brief(); + if (ctx->unref()) { + delete ctx; + } + }))) { - ); + // failed to submit task, clear and return + LOG(WARNING) << "failed to submit routine load task: " << ctx->brief(); + _task_map.erase(ctx->id); + if (ctx->unref()) { + delete ctx; + } + return Status("failed to submit routine load task"); + } else { + LOG(INFO) << "submit a new routine load task: " << ctx->brief() + << ", current tasks num: " << _task_map.size(); + return Status::OK; + } +} + +void RoutineLoadTaskExecutor::exec_task( + StreamLoadContext* ctx, ExecFinishCallback cb) { + + // create pipe and consumer + std::shared_ptr pipe; + std::shared_ptr consumer; + switch (ctx->load_src_type) { + case TLoadSourceType::KAFKA: + pipe = std::make_shared(); + consumer = std::make_shared( + ctx, std::static_pointer_cast(pipe)); + ctx->body_sink = pipe; + break; + default: + std::stringstream ss; + ss << "unknown routine load task type: " << ctx->load_type; + err_handler(ctx, Status::CANCELLED, ss.str()); + cb(ctx); + return; + } 
+#define HANDLE_ERROR(stmt, err_msg) \ + do { \ + Status _status_ = (stmt); \ + if (UNLIKELY(!_status_.ok())) { \ + err_handler(ctx, _status_, err_msg); \ + cb(ctx); \ + return; \ + } \ + } while (false); + + HANDLE_ERROR(consumer->init(), "failed to init consumer"); + + // must put pipe before executing plan fragment + HANDLE_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe"); + + // execute plan fragment, async + HANDLE_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx), + "failed to execute plan fragment"); + + // start to consume, this may block a while + HANDLE_ERROR(consumer->start(), "consuming failed"); + + // wait for consumer finished + HANDLE_ERROR(ctx->future.get(), "consume failed"); + + ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; - // 2. execute plan + // commit txn + HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed"); + + cb(ctx); +} + +void RoutineLoadTaskExecutor::err_handler( + StreamLoadContext* ctx, + const Status& st, + const std::string& err_msg) { - // 3. activate the consumer to read data + LOG(WARNING) << err_msg; + ctx->status = st; + if (ctx->need_rollback) { + _exec_env->stream_load_executor()->rollback_txn(ctx); + ctx->need_rollback = false; + } + if (ctx->body_sink.get() != nullptr) { + ctx->body_sink->cancel(); + } + return; } } // end namespace + diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 736aa94b76f2e7..4f16dbac269d91 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -17,33 +17,59 @@ #pragma once -#include "agent/agent_server.h" - -using apache::thrift::transport::TProcessor; -using std::deque; -using std::list; -using std::map; -using std::nothrow; -using std::set; -using std::string; -using std::to_string; -using std::vector; +#include +#include +#include + +#include "util/thread_pool.hpp" +#include "util/uid_util.h" namespace doris { -class RoutineLoadTaskExecutor { +class ExecEnv; +class Status; +class StreamLoadContext; +class TRoutineLoadTask; +// A routine load task executor will receive routine load +// tasks from FE, put it to a fixed thread pool. +// The thread pool will process each task and report the result +// to FE finally. 
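A condensed sketch of the lifecycle described above, simplified from submit_task() earlier in this patch (the inner lambda plays the role of the ExecFinishCallback that unregisters a finished task):

    Status submit(const TRoutineLoadTask& task) {
        StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
        ctx->ref();                          // reference held by _task_map
        _task_map[ctx->id] = ctx;
        _thread_pool->offer([this, ctx] () {
            exec_task(ctx, [this] (StreamLoadContext* done) {
                std::unique_lock<std::mutex> l(_lock);
                _task_map.erase(done->id);   // task finished, unregister it
                if (done->unref()) { delete done; }
            });
        });
        return Status::OK;
    }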
+class RoutineLoadTaskExecutor {
 public:
+    // parameter: the context of the finished task
+    typedef std::function<void(StreamLoadContext*)> ExecFinishCallback;
+
+    RoutineLoadTaskExecutor(ExecEnv* exec_env):
+        _exec_env(exec_env) {
+        _thread_pool = new ThreadPool(10, 1000);
+    }
+
+    ~RoutineLoadTaskExecutor() {
+        if (_thread_pool) {
+            delete _thread_pool;
+        }
+    }
+
     // submit a routine load task
-    Status submit_task(RoutineLoadTask task);
+    Status submit_task(const TRoutineLoadTask& task);
 
 private:
+    // execute the task
+    void exec_task(StreamLoadContext* ctx, ExecFinishCallback cb);
 
-    ThreadPool _thread_pool;
-    std::mutex _lock;
+    void err_handler(
+            StreamLoadContext* ctx,
+            const Status& st,
+            const std::string& err_msg);
 
-    // submitted tasks
-    std::set task_ids;
-}
+private:
+    ExecEnv* _exec_env;
+    ThreadPool* _thread_pool;
+
+    std::mutex _lock;
+    // task id -> load context
+    std::unordered_map<UniqueId, StreamLoadContext*> _task_map;
+};
 
 } // end namespace
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index 584269284c09b0..b2e3a0d5956782 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -28,13 +28,13 @@
 #include "common/logging.h"
 #include "exec/broker_reader.h"
 #include "exec/broker_writer.h"
-#include "exec/schema_scanner/frontend_helper.h"
 #include "olap/file_helper.h"
 #include "olap/olap_engine.h"
 #include "olap/olap_table.h"
 #include "runtime/exec_env.h"
 #include "runtime/broker_mgr.h"
 #include "util/file_utils.h"
+#include "util/frontend_helper.h"
 
 namespace doris {
 
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/stream_load/load_stream_mgr.h
similarity index 86%
rename from be/src/runtime/load_stream_mgr.h
rename to be/src/runtime/stream_load/load_stream_mgr.h
index 02dfae035b32cb..97e5777adc1779 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/stream_load/load_stream_mgr.h
@@ -21,7 +21,7 @@
 #include
 #include
 
-#include "runtime/stream_load_pipe.h" // for StreamLoadPipe
+#include "runtime/stream_load/stream_load_pipe.h" // for StreamLoadPipe
 #include "util/uid_util.h" // for std::hash for UniqueId
 
 namespace doris {
@@ -54,6 +54,15 @@ class LoadStreamMgr {
         return stream;
     }
 
+    void remove(const UniqueId& id) {
+        std::lock_guard<std::mutex> l(_lock);
+        auto it = _stream_map.find(id);
+        if (it != std::end(_stream_map)) {
+            _stream_map.erase(it);
+        }
+        return;
+    }
+
 private:
     std::mutex _lock;
     std::unordered_map<UniqueId, std::shared_ptr<StreamLoadPipe>> _stream_map;
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
new file mode 100644
index 00000000000000..45c06d122cdace
--- /dev/null
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
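The remove() method added to LoadStreamMgr above lets a dying load deregister its pipe; ~StreamLoadContext() later in this patch calls it unconditionally, and erasing a missing id is simply a no-op. Illustrative usage:

    LoadStreamMgr* mgr = _exec_env->load_stream_mgr();
    mgr->remove(ctx->id);   // safe even if the pipe was never put() or is already gone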
+ +#include "runtime/stream_load/stream_load_context.h" + +namespace doris { + +std::string StreamLoadContext::to_json() const { + rapidjson::StringBuffer s; + rapidjson::PrettyWriter writer(s); + + writer.StartObject(); + // txn id + writer.Key("TxnId"); + writer.Int64(txn_id); + + // label + writer.Key("Label"); + writer.String(label.c_str()); + + // status + writer.Key("Status"); + switch (status.code()) { + case TStatusCode::OK: + writer.String("Success"); + break; + case TStatusCode::PUBLISH_TIMEOUT: + writer.String("Publish Timeout"); + break; + case TStatusCode::LABEL_ALREADY_EXISTS: + writer.String("Label Already Exists"); + break; + default: + writer.String("Fail"); + break; + } + // msg + writer.Key("Message"); + if (status.ok()) { + writer.String("OK"); + } else { + writer.String(status.get_error_msg().c_str()); + } + // number_load_rows + writer.Key("NumberLoadedRows"); + writer.Int64(number_loaded_rows); + writer.Key("NumberFilteredRows"); + writer.Int64(number_filtered_rows); + writer.Key("LoadBytes"); + writer.Int64(receive_bytes); + writer.Key("LoadTimeMs"); + writer.Int64(load_cost_nanos / 1000000); + if (!error_url.empty()) { + writer.Key("ErrorURL"); + writer.String(error_url.c_str()); + } + writer.EndObject(); + return s.GetString(); +} + +std::string StreamLoadContext::brief(bool detail) const { + std::stringstream ss; + ss << " id=" << id << ", txn id=" << txn_id << ", label=" << label; + if (detail) { + switch(load_src_type) { + case TLoadSourceType::KAFKA: + if (kafka_info != nullptr) { + ss << ", kafka" + << ", brokers: " << kafka_info->brokers + << ", group_id: " << kafka_info->group_id + << ", client_id: " << kafka_info->client_id + << ", topic: " << kafka_info->topic + << ", partition: "; + for (auto& entry : kafka_info->begin_offset) { + ss << "[" << entry.first << ": " << entry.second << "]"; + } + } + break; + default: + break; + } + } + return ss.str(); +} + +} // end namespace diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h new file mode 100644 index 00000000000000..7d8996221ab17d --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
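For reference, StreamLoadContext::to_json() above renders a successful load roughly like this (field values invented for illustration; ErrorURL only appears when error_url is non-empty):

    {
        "TxnId": 1001,
        "Label": "routine_load_demo",
        "Status": "Success",
        "Message": "OK",
        "NumberLoadedRows": 1000,
        "NumberFilteredRows": 0,
        "LoadBytes": 40960,
        "LoadTimeMs": 215
    }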
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "gen_cpp/BackendService_types.h"
+#include "gen_cpp/FrontendService_types.h"
+
+#include "common/status.h"
+#include "common/logging.h"
+#include "common/utils.h"
+#include "runtime/exec_env.h"
+#include "runtime/stream_load/load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+// kafka related info
+class KafkaLoadInfo {
+public:
+    KafkaLoadInfo(const TKafkaLoadInfo& t_info):
+        brokers(t_info.brokers),
+        group_id(t_info.group_id),
+        client_id(t_info.client_id),
+        topic(t_info.topic),
+        max_interval_s(t_info.max_interval_s),
+        max_batch_rows(t_info.max_batch_rows),
+        max_batch_bytes(t_info.max_batch_size),
+        begin_offset(t_info.partition_begin_offset) {
+    }
+
+public:
+    std::string brokers;
+    std::string group_id;
+    std::string client_id;
+    std::string topic;
+
+    // the following members control the max progress of a consuming
+    // process. if any of them is reached, the consuming will finish.
+    int64_t max_interval_s;
+    int64_t max_batch_rows;
+    int64_t max_batch_bytes;
+
+    // partition -> begin offset, inclusive.
+    std::map<int32_t, int64_t> begin_offset;
+    // partition -> commit offset, inclusive.
+    std::map<int32_t, int64_t> cmt_offset;
+};
+
+class MessageBodySink;
+
+class StreamLoadContext {
+public:
+    StreamLoadContext(ExecEnv* exec_env) :
+        _exec_env(exec_env),
+        _refs(0) {
+        start_nanos = MonotonicNanos();
+    }
+
+    ~StreamLoadContext() {
+        if (need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(this);
+            need_rollback = false;
+        }
+
+        _exec_env->load_stream_mgr()->remove(id);
+
+        if (kafka_info != nullptr) {
+            delete kafka_info;
+        }
+    }
+
+    void rollback();
+
+    std::string to_json() const;
+
+    // return the brief info of this context.
+    // also print the load source info if detail is set to true
+    std::string brief(bool detail = false) const;
+
+    void ref() { _refs.fetch_add(1); }
+    // If unref() returns true, this object should be deleted
+    bool unref() { return _refs.fetch_sub(1) == 1; }
+
+public:
+    // load type, e.g. ROUTINE_LOAD / MANUL_LOAD (manual load)
+    TLoadType::type load_type;
+    // load data source, e.g. KAFKA / RAW
+    TLoadSourceType::type load_src_type;
+
+    // id for each load
+    UniqueId id;
+
+    std::string db;
+    std::string table;
+    std::string label;
+
+    std::string user_ip;
+
+    AuthInfo auth;
+
+    // only used to check if we receive the whole body
+    size_t body_bytes = 0;
+    size_t receive_bytes = 0;
+
+    int64_t txn_id = -1;
+
+    bool need_rollback = false;
+    // when use_streaming is true, we use stream_pipe to send source data,
+    // otherwise we save source data to a file first, then process it.
+ bool use_streaming = false; + TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN; + + std::shared_ptr body_sink; + + TStreamLoadPutResult put_result; + double max_filter_ratio = 0.0; + std::vector commit_infos; + + std::promise promise; + std::future future = promise.get_future(); + + Status status; + + int64_t number_loaded_rows = 0; + int64_t number_filtered_rows = 0; + int64_t loaded_bytes = 0; + int64_t start_nanos = 0; + int64_t load_cost_nanos = 0; + std::string error_url; + + KafkaLoadInfo* kafka_info = nullptr; + +private: + ExecEnv* _exec_env; + std::atomic _refs; +}; + +} // end namespace diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp new file mode 100644 index 00000000000000..80a176f79ad743 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/stream_load/stream_load_executor.h" + +#include "common/status.h" +#include "common/utils.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/frontend_helper.h" + +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/Types_types.h" + +namespace doris { + +#ifdef BE_TEST +TLoadTxnBeginResult k_stream_load_begin_result; +TLoadTxnCommitResult k_stream_load_commit_result; +TLoadTxnRollbackResult k_stream_load_rollback_result; +Status k_stream_load_plan_status; +#endif + +Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { + // submit this params +#ifndef BE_TEST + ctx->ref(); + auto st = _exec_env->fragment_mgr()->exec_plan_fragment( + ctx->put_result.params, + [ctx] (PlanFragmentExecutor* executor) { + ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos()); + Status status = executor->status(); + if (status.ok()) { + ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_success(); + ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered(); + int64_t num_total_rows = + ctx->number_loaded_rows + ctx->number_filtered_rows; + if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) { + status = Status("too many filtered rows"); + } + if (ctx->number_filtered_rows > 0 && + !executor->runtime_state()->get_error_log_file_path().empty()) { + + if (ctx->load_type == TLoadType::MANUL_LOAD) { + ctx->error_url = to_load_error_http_path( + executor->runtime_state()->get_error_log_file_path()); + } + } + } else { + 
LOG(WARNING) << "fragment execute failed"
+                        << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                        << ", errmsg=" << status.get_error_msg()
+                        << ctx->brief();
+                // cancel body_sink, make sender know it
+                if (ctx->body_sink != nullptr) {
+                    ctx->body_sink->cancel();
+                }
+            }
+            ctx->promise.set_value(status);
+            if (ctx->unref()) {
+                delete ctx;
+            }
+        });
+    if (!st.ok()) {
+        // no need to check unref's return value
+        ctx->unref();
+        return st;
+    }
+#else
+    ctx->promise.set_value(k_stream_load_plan_status);
+#endif
+    return Status::OK;
+}
+
+Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) {
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+
+    TLoadTxnBeginRequest request;
+    set_request_auth(&request, ctx->auth);
+    request.db = ctx->db;
+    request.tbl = ctx->table;
+    request.label = ctx->label;
+    // set timestamp
+    request.__set_timestamp(GetCurrentTimeMicros());
+
+    TLoadTxnBeginResult result;
+#ifndef BE_TEST
+    RETURN_IF_ERROR(FrontendHelper::rpc(
+            master_addr.hostname, master_addr.port,
+            [&request, &result] (FrontendServiceConnection& client) {
+                client->loadTxnBegin(result, request);
+            }));
+#else
+    result = k_stream_load_begin_result;
+#endif
+    Status status(result.status);
+    if (!status.ok()) {
+        LOG(WARNING) << "begin transaction failed, errmsg=" << status.get_error_msg()
+                << ctx->brief();
+        return status;
+    }
+    ctx->txn_id = result.txnId;
+    ctx->need_rollback = true;
+
+    return Status::OK;
+}
+
+Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+
+    TLoadTxnCommitRequest request;
+    set_request_auth(&request, ctx->auth);
+    request.db = ctx->db;
+    request.tbl = ctx->table;
+    request.txnId = ctx->txn_id;
+    request.sync = true;
+    request.commitInfos = std::move(ctx->commit_infos);
+    request.__isset.commitInfos = true;
+
+    // set attachment if any
+    TTxnCommitAttachment attachment;
+    if (collect_load_stat(ctx, &attachment)) {
+        request.txnCommitAttachment = std::move(attachment);
+        request.__isset.txnCommitAttachment = true;
+    }
+
+    TLoadTxnCommitResult result;
+#ifndef BE_TEST
+    RETURN_IF_ERROR(FrontendHelper::rpc(
+            master_addr.hostname, master_addr.port,
+            [&request, &result] (FrontendServiceConnection& client) {
+                client->loadTxnCommit(result, request);
+            }, config::txn_commit_rpc_timeout_ms));
+#else
+    result = k_stream_load_commit_result;
+#endif
+    // Return if this transaction committed successfully; otherwise, we need to try to
+    // rollback this transaction
+    Status status(result.status);
+    if (!status.ok()) {
+        LOG(WARNING) << "commit transaction failed, id=" << ctx->id
+                << ", errmsg=" << status.get_error_msg();
+        return status;
+    }
+    // commit success, set need_rollback to false
+    ctx->need_rollback = false;
+    return Status::OK;
+}
+
+void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TLoadTxnRollbackRequest request;
+    set_request_auth(&request, ctx->auth);
+    request.db = ctx->db;
+    request.tbl = ctx->table;
+    request.txnId = ctx->txn_id;
+    request.__set_reason(ctx->status.get_error_msg());
+    TLoadTxnRollbackResult result;
+
+    // set attachment if any
+    TTxnCommitAttachment attachment;
+    if (collect_load_stat(ctx, &attachment)) {
+        request.txnCommitAttachment = std::move(attachment);
+        request.__isset.txnCommitAttachment = true;
+    }
+
+#ifndef BE_TEST
+    auto rpc_st = FrontendHelper::rpc(
+            master_addr.hostname, master_addr.port,
+            [&request, &result] (FrontendServiceConnection& client) {
+                client->loadTxnRollback(result, request);
+            });
+    if (!rpc_st.ok()) {
+        LOG(WARNING) << "transaction rollback failed. errmsg=" << rpc_st.get_error_msg()
+                << ctx->brief();
+    }
+#else
+    result = k_stream_load_rollback_result;
+#endif
+}
+
+bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attach) {
+    if (ctx->load_type != TLoadType::ROUTINE_LOAD) {
+        // currently, only routine load needs to set the attachment
+        return false;
+    }
+
+    switch (ctx->load_src_type) {
+    case TLoadSourceType::KAFKA: {
+        attach->loadType = TLoadType::ROUTINE_LOAD;
+
+        TRLTaskTxnCommitAttachment rl_attach;
+        rl_attach.loadSourceType = TLoadSourceType::KAFKA;
+        rl_attach.id = ctx->id.to_thrift();
+        rl_attach.__set_loadedRows(ctx->number_loaded_rows);
+        rl_attach.__set_filteredRows(ctx->number_filtered_rows);
+        rl_attach.__set_receivedBytes(ctx->receive_bytes);
+        rl_attach.__set_loadedBytes(ctx->loaded_bytes);
+        rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000);
+
+        if (ctx->status.ok()) {
+            TKafkaRLTaskProgress kafka_progress;
+            kafka_progress.partitionCmtOffset = std::move(ctx->kafka_info->cmt_offset);
+            rl_attach.kafkaRLTaskProgress = std::move(kafka_progress);
+            rl_attach.__isset.kafkaRLTaskProgress = true;
+        }
+
+        attach->rlTaskTxnCommitAttachment = std::move(rl_attach);
+        attach->__isset.rlTaskTxnCommitAttachment = true;
+
+        return true;
+    }
+    case TLoadSourceType::RAW:
+        return false;
+    default:
+        // unknown type, should not happen
+        return false;
+    }
+    return false;
+}
+
+} // end namespace
diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h
new file mode 100644
index 00000000000000..68d49be828f6e4
--- /dev/null
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
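Taken together, begin_txn(), commit_txn() and rollback_txn() above form a simple guard: begin sets need_rollback, commit clears it, and any early exit in between is cleaned up by the context destructor. A condensed view of that contract, mirroring the calls above:

    Status load_once(StreamLoadContext* ctx, StreamLoadExecutor* executor) {
        RETURN_IF_ERROR(executor->begin_txn(ctx));   // ctx->need_rollback = true
        // ... consume from Kafka and run the plan fragment ...
        RETURN_IF_ERROR(executor->commit_txn(ctx));  // ctx->need_rollback = false
        return Status::OK;
    }   // on any early return, ~StreamLoadContext() invokes rollback_txn()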
+ +#pragma once + +namespace doris { + +class ExecEnv; +class StreamLoadContext; +class Status; +class TTxnCommitAttachment; + +class StreamLoadExecutor { + +public: + StreamLoadExecutor(ExecEnv* exec_env): + _exec_env(exec_env) { + } + + Status begin_txn(StreamLoadContext* ctx); + + Status commit_txn(StreamLoadContext* ctx); + + void rollback_txn(StreamLoadContext* ctx); + + Status execute_plan_fragment(StreamLoadContext* ctx); + +private: + // collect the load statistics from context and set them to stat + // return true if stat is set, otherwise, return false + bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment); + +private: + ExecEnv* _exec_env; +}; + +} diff --git a/be/src/runtime/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h similarity index 99% rename from be/src/runtime/stream_load_pipe.h rename to be/src/runtime/stream_load/stream_load_pipe.h index 781f4d66232a20..c422967635b2a3 100644 --- a/be/src/runtime/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -22,7 +22,7 @@ #include #include "exec/file_reader.h" -#include "http/message_body_sink.h" +#include "runtime/message_body_sink.h" #include "util/bit_util.h" #include "util/byte_buffer.h" diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 496fa89131806c..a9353fd606c749 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -34,6 +34,7 @@ #include "runtime/pull_load_task_mgr.h" #include "runtime/export_task_mgr.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/routine_load/routine_load_task_executor.h" namespace doris { @@ -228,4 +229,10 @@ void BackendService::get_tablet_stat(TTabletStatResult& result) { OLAPEngine::get_instance()->get_tablet_stat(result); } +void BackendService::submit_routine_load_task( + TStatus& t_status, const TRoutineLoadTask& task) { + Status status = _exec_env->routine_load_task_executor()->submit_task(task); + status.to_thrift(&t_status); +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 094be53c10cadf..ecb8629a3db562 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -147,6 +147,8 @@ class BackendService : public BackendServiceIf { virtual void get_tablet_stat(TTabletStatResult& result) override; + virtual void submit_routine_load_task(TStatus& t_status, const TRoutineLoadTask& task) override; + private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index ceec930eb7d81c..9ad546fd91db89 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -55,7 +55,7 @@ #include "service/http_service.h" #include #include "common/resource_tls.h" -#include "exec/schema_scanner/frontend_helper.h" +#include "util/frontend_helper.h" static void help(const char*); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 3fa44dc6fa1059..0c3884e4a65a11 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -42,7 +42,7 @@ add_library(Util STATIC parse_util.cpp path_builder.cpp # TODO: not supported on RHEL 5 -# perf-counters.cpp +# perf-counters.cpp progress_updater.cpp runtime_profile.cpp static_asserts.cpp @@ -67,13 +67,14 @@ add_library(Util STATIC null_load_error_hub.cpp time.cpp os_info.cpp -# coding_util.cpp +# coding_util.cpp cidr.cpp core_local.cpp uid_util.cpp aes_util.cpp string_util.cpp 
    md5.cpp
+    frontend_helper.cpp
 )
 
 #ADD_BE_TEST(integer-array-test)
diff --git a/be/src/util/frontend_helper.cpp b/be/src/util/frontend_helper.cpp
new file mode 100644
index 00000000000000..ddc36befc067c3
--- /dev/null
+++ b/be/src/util/frontend_helper.cpp
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/frontend_helper.h"
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/FrontendService.h"
+#include "runtime/runtime_state.h"
+#include "runtime/exec_env.h"
+#include "runtime/client_cache.h"
+#include "util/network_util.h"
+#include "util/thrift_util.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+ExecEnv* FrontendHelper::_s_exec_env;
+
+using apache::thrift::protocol::TProtocol;
+using apache::thrift::protocol::TBinaryProtocol;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransport;
+using apache::thrift::transport::TBufferedTransport;
+
+void FrontendHelper::setup(ExecEnv* exec_env) {
+    _s_exec_env = exec_env;
+}
+
+Status FrontendHelper::rpc(
+        const std::string& ip,
+        const int32_t port,
+        std::function<void (FrontendServiceConnection&)> callback,
+        int timeout_ms) {
+    TNetworkAddress address = make_network_address(ip, port);
+    Status status;
+    FrontendServiceConnection client(
+            _s_exec_env->frontend_client_cache(), address, timeout_ms, &status);
+    if (!status.ok()) {
+        LOG(WARNING) << "Connect frontend failed, address=" << address
+                << ", status=" << status.get_error_msg();
+        return status;
+    }
+    try {
+        try {
+            callback(client);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            LOG(WARNING) << "retrying call to frontend service, address="
+                    << address << ", reason=" << e.what();
+            status = client.reopen(timeout_ms);
+            if (!status.ok()) {
+                LOG(WARNING) << "client reopen failed. address=" << address
+                        << ", status=" << status.get_error_msg();
+                return status;
+            }
+            callback(client);
+        }
+    } catch (apache::thrift::TException& e) {
+        // just reopen to disable this connection
+        client.reopen(timeout_ms);
+        LOG(WARNING) << "call frontend service failed, address=" << address
+                << ", reason=" << e.what();
+        return Status(TStatusCode::THRIFT_RPC_ERROR,
+                "failed to call frontend service", false);
+    }
+    return Status::OK;
+}
+
+}
+
diff --git a/be/src/util/frontend_helper.h b/be/src/util/frontend_helper.h
new file mode 100644
index 00000000000000..d0ef6d0a8e4e00
--- /dev/null
+++ b/be/src/util/frontend_helper.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" +#include "gen_cpp/FrontendService_types.h" + +namespace doris { + +class ExecEnv; +class FrontendServiceClient; +template class ClientConnection; + +// this class is a helper for jni call. easy for unit test +class FrontendHelper { +public: + static void setup(ExecEnv* exec_env); + + // for default timeout + static Status rpc( + const std::string& ip, + const int32_t port, + std::function&)> callback) { + + return rpc(ip, port, callback, config::thrift_rpc_timeout_ms); + } + + static Status rpc( + const std::string& ip, + const int32_t port, + std::function&)> callback, + int timeout_ms); + +private: + static ExecEnv* _s_exec_env; +}; + +} + diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index b27d10ce337a8e..79443dddc0aeb9 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -192,7 +192,7 @@ struct TRecoverTabletReq { 3: optional Types.TVersion version 4: optional Types.TVersionHash version_hash } - + struct TAgentTaskRequest { 1: required TAgentServiceVersion protocol_version 2: required Types.TTaskType task_type diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 798607c20c7ba0..c0c6d2a0cc07e8 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -63,6 +63,28 @@ struct TTabletStatResult { 1: required map tablets_stats } +struct TKafkaLoadInfo { + 1: required string brokers; + 2: required string group_id; + 3: required string client_id; + 4: required string topic; + 5: optional i64 max_interval_s; + 6: optional i64 max_batch_rows; + 7: optional i64 max_batch_size; + 8: optional map partition_begin_offset; +} + +struct TRoutineLoadTask { + 1: required Types.TLoadSourceType type + 2: required Types.TUniqueId id + 3: required i64 txn_id + 4: required i64 auth_code + 5: optional string db + 6: optional string tbl + 7: optional string label + 8: optional TKafkaLoadInfo kafka_load_info +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. 
@@ -119,4 +141,6 @@ service BackendService { Status.TStatus erase_export_task(1:Types.TUniqueId task_id); TTabletStatResult get_tablet_stat(); + + Status.TStatus submit_routine_load_task(1:TRoutineLoadTask task); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index eeb846464a0c74..3007712ebaf20f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -458,6 +458,7 @@ struct TLoadTxnBeginRequest { 6: optional string user_ip 7: required string label 8: optional i64 timestamp + 9: optional i64 auth_code } struct TLoadTxnBeginResult { @@ -493,6 +494,7 @@ struct TStreamLoadPutRequest { 14: optional string columnSeparator 15: optional string partitions + 16: optional i64 auth_code } struct TStreamLoadPutResult { @@ -501,31 +503,23 @@ struct TStreamLoadPutResult { 2: optional PaloInternalService.TExecPlanFragmentParams params } -enum TRoutineLoadType { - KAFKA = 1 -} - struct TKafkaRLTaskProgress { - 1: required map partitionIdToOffset -} - -enum TTxnSourceType { - ROUTINE_LOAD_TASK = 1 + 1: required map partitionCmtOffset } struct TRLTaskTxnCommitAttachment { - 1: required TRoutineLoadType routineLoadType - 2: required i64 backendId - 3: required i64 taskSignature - 4: required i32 numOfErrorData - 5: required i32 numOfTotalData - 6: required string taskId - 7: required string jobId + 1: required Types.TLoadSourceType loadSourceType + 2: required Types.TUniqueId id + 3: optional i64 loadedRows + 4: optional i64 filteredRows + 5: optional i64 receivedBytes + 6: optional i64 loadedBytes + 7: optional i64 loadCostMs 8: optional TKafkaRLTaskProgress kafkaRLTaskProgress } struct TTxnCommitAttachment { - 1: required TTxnSourceType txnSourceType + 1: required Types.TLoadType loadType 2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment } @@ -539,7 +533,8 @@ struct TLoadTxnCommitRequest { 7: required i64 txnId 8: required bool sync 9: optional list commitInfos - 10: optional TTxnCommitAttachment txnCommitAttachment + 10: optional i64 auth_code + 11: optional TTxnCommitAttachment txnCommitAttachment } struct TLoadTxnCommitResult { @@ -555,6 +550,8 @@ struct TLoadTxnRollbackRequest { 6: optional string user_ip 7: required i64 txnId 8: optional string reason + 9: optional i64 auth_code + 10: optional TTxnCommitAttachment txnCommitAttachment } struct TLoadTxnRollbackResult { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index c9934ff74ad7ed..044b6c24e2ee28 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -353,3 +353,12 @@ struct TTabletCommitInfo { 2: required i64 backendId } +enum TLoadType { + MANUL_LOAD, + ROUTINE_LOAD, +} + +enum TLoadSourceType { + RAW, + KAFKA, +} diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index 1068cc1b57068f..3e77e29096c21a 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -507,7 +507,7 @@ build_librdkafka() { CPPFLAGS="-I${TP_INCLUDE_DIR}" \ LDFLAGS="-L${TP_LIB_DIR}" CFLAGS="-fPIC" \ - ./configure --prefix=$TP_INSTALL_DIR --enable-static + ./configure --prefix=$TP_INSTALL_DIR --enable-static --disable-ssl --disable-sasl make -j$PARALLEL && make install } From 1b77066a0f6412876f4afc214f59c132bca6abb1 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 27 Feb 2019 16:02:08 +0800 Subject: [PATCH 3/5] fix fe --- .../apache/doris/common/GenericPoolTest.java | 7 ++++ .../transaction/GlobalTransactionMgrTest.java | 42 ++++++++----------- gensrc/thrift/FrontendService.thrift | 
13 +++--- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java index 64432e386477fc..caee523999fc26 100644 --- a/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -39,6 +39,7 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPullLoadSubTaskInfo; import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TTabletStatResult; @@ -217,6 +218,12 @@ public TTabletStatResult get_tablet_stat() throws TException { // TODO Auto-generated method stub return null; } + + @Override + public TStatus submit_routine_load_task(TRoutineLoadTask task) throws TException { + // TODO Auto-generated method stub + return null; + } } @Test diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index a25da4b87f4c0e..5afdff7accdf5c 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -22,11 +22,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import com.google.common.collect.Maps; -import mockit.Deencapsulation; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; import org.apache.doris.catalog.Database; @@ -51,12 +46,14 @@ import org.apache.doris.meta.MetaContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TKafkaRLTaskProgress; +import org.apache.doris.thrift.TLoadSourceType; import org.apache.doris.thrift.TRLTaskTxnCommitAttachment; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TRoutineLoadType; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -70,6 +67,11 @@ import java.util.Map; import java.util.Set; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + public class GlobalTransactionMgrTest { private static FakeEditLog fakeEditLog; @@ -331,25 +333,21 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING); TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment(); - rlTaskTxnCommitAttachment.setBackendId(1L); - rlTaskTxnCommitAttachment.setTaskSignature(1L); - rlTaskTxnCommitAttachment.setNumOfTotalData(100); - rlTaskTxnCommitAttachment.setNumOfErrorData(1); - rlTaskTxnCommitAttachment.setTaskId("label"); + rlTaskTxnCommitAttachment.setId(new TUniqueId()); + rlTaskTxnCommitAttachment.setLoadedRows(100); + rlTaskTxnCommitAttachment.setFilteredRows(1); rlTaskTxnCommitAttachment.setJobId(Deencapsulation.getField(routineLoadJob, "id")); - rlTaskTxnCommitAttachment.setRoutineLoadType(TRoutineLoadType.KAFKA); + 
rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA); TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress(); Map kafkaProgress = Maps.newHashMap(); kafkaProgress.put(1, 10L); - tKafkaRLTaskProgress.setPartitionIdToOffset(kafkaProgress); + tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress); rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress); TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment); - RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(routineLoadJob); - new Expectations() { { catalog.getDb(1L); @@ -409,25 +407,21 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi routineLoadJob.setState(RoutineLoadJob.JobState.RUNNING); TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = new TRLTaskTxnCommitAttachment(); - rlTaskTxnCommitAttachment.setBackendId(1L); - rlTaskTxnCommitAttachment.setTaskSignature(1L); - rlTaskTxnCommitAttachment.setNumOfTotalData(100); - rlTaskTxnCommitAttachment.setNumOfErrorData(11); - rlTaskTxnCommitAttachment.setTaskId("label"); + rlTaskTxnCommitAttachment.setId(new TUniqueId()); + rlTaskTxnCommitAttachment.setLoadedRows(100); + rlTaskTxnCommitAttachment.setFilteredRows(11); rlTaskTxnCommitAttachment.setJobId(Deencapsulation.getField(routineLoadJob, "id")); - rlTaskTxnCommitAttachment.setRoutineLoadType(TRoutineLoadType.KAFKA); + rlTaskTxnCommitAttachment.setLoadSourceType(TLoadSourceType.KAFKA); TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress(); Map kafkaProgress = Maps.newHashMap(); kafkaProgress.put(1, 10L); - tKafkaRLTaskProgress.setPartitionIdToOffset(kafkaProgress); + tKafkaRLTaskProgress.setPartitionCmtOffset(kafkaProgress); rlTaskTxnCommitAttachment.setKafkaRLTaskProgress(tKafkaRLTaskProgress); TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment); - RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(routineLoadJob); - new Expectations() { { catalog.getDb(1L); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3007712ebaf20f..35e9827289205f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -510,12 +510,13 @@ struct TKafkaRLTaskProgress { struct TRLTaskTxnCommitAttachment { 1: required Types.TLoadSourceType loadSourceType 2: required Types.TUniqueId id - 3: optional i64 loadedRows - 4: optional i64 filteredRows - 5: optional i64 receivedBytes - 6: optional i64 loadedBytes - 7: optional i64 loadCostMs - 8: optional TKafkaRLTaskProgress kafkaRLTaskProgress + 3: required i64 jobId + 4: optional i64 loadedRows + 5: optional i64 filteredRows + 6: optional i64 receivedBytes + 7: optional i64 loadedBytes + 8: optional i64 loadCostMs + 9: optional TKafkaRLTaskProgress kafkaRLTaskProgress } struct TTxnCommitAttachment { From 3ca0ee6414eb5f26a1b08ec60b8b66aab725b372 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 27 Feb 2019 16:28:52 +0800 Subject: [PATCH 4/5] fix FE interface --- be/test/exec/olap_table_sink_test.cpp | 2 +- be/test/http/message_body_sink_test.cpp | 2 +- be/test/http/stream_load_test.cpp | 8 +- be/test/runtime/stream_load_pipe_test.cpp | 2 +- .../doris/load/routineload/KafkaProgress.java | 6 +- .../RLTaskTxnCommitAttachment.java | 117 +++++------------- .../load/routineload/RoutineLoadJob.java | 11 +- .../transaction/TxnCommitAttachment.java | 4 +- 8 
files changed, 46 insertions(+), 106 deletions(-) diff --git a/be/test/exec/olap_table_sink_test.cpp b/be/test/exec/olap_table_sink_test.cpp index 8f4bbab9c5a3df..8709d1ad4773ee 100644 --- a/be/test/exec/olap_table_sink_test.cpp +++ b/be/test/exec/olap_table_sink_test.cpp @@ -23,11 +23,11 @@ #include "gen_cpp/internal_service.pb.h" #include "runtime/decimal_value.h" #include "runtime/exec_env.h" -#include "runtime/load_stream_mgr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/thread_resource_mgr.h" #include "runtime/tuple_row.h" +#include "runtime/stream_load/load_stream_mgr.h" #include "service/brpc.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" diff --git a/be/test/http/message_body_sink_test.cpp b/be/test/http/message_body_sink_test.cpp index 145b71a6f69e6c..cf71f1344301de 100644 --- a/be/test/http/message_body_sink_test.cpp +++ b/be/test/http/message_body_sink_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "http/message_body_sink.h" +#include "runtime/message_body_sink.h" #include diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index a4086f29d7563e..3090ae42edd2b7 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -23,12 +23,13 @@ #include #include -#include "exec/schema_scanner/frontend_helper.h" +#include "exec/schema_scanner/schema_helper.h" #include "gen_cpp/HeartbeatService_types.h" #include "http/http_channel.h" #include "http/http_request.h" #include "runtime/exec_env.h" -#include "runtime/load_stream_mgr.h" +#include "runtime/stream_load/load_stream_mgr.h" +#include "runtime/stream_load/stream_load_executor.h" #include "runtime/thread_resource_mgr.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" @@ -81,6 +82,7 @@ class StreamLoadActionTest : public testing::Test { _env._master_info = new TMasterInfo(); _env._load_stream_mgr = new LoadStreamMgr(); _env._brpc_stub_cache = new BrpcStubCache(); + _env._stream_load_executor = new StreamLoadExecutor(&_env); _evhttp_req = evhttp_request_new(nullptr, nullptr); } @@ -93,6 +95,8 @@ class StreamLoadActionTest : public testing::Test { _env._master_info = nullptr; delete _env._thread_mgr; _env._thread_mgr = nullptr; + delete _env._stream_load_executor; + _env._stream_load_executor = nullptr; if (_evhttp_req != nullptr) { evhttp_request_free(_evhttp_req); diff --git a/be/test/runtime/stream_load_pipe_test.cpp b/be/test/runtime/stream_load_pipe_test.cpp index bed70392a1662e..175d5338632c88 100644 --- a/be/test/runtime/stream_load_pipe_test.cpp +++ b/be/test/runtime/stream_load_pipe_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-#include "runtime/stream_load_pipe.h" +#include "runtime/stream_load/stream_load_pipe.h" #include diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 9ac12e06ad3e63..97097cfbbb75ec 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,10 +17,10 @@ package org.apache.doris.load.routineload; +import org.apache.doris.thrift.TKafkaRLTaskProgress; + import com.google.common.base.Joiner; import com.google.common.collect.Maps; -import org.apache.doris.common.io.Writable; -import org.apache.doris.thrift.TKafkaRLTaskProgress; import java.io.DataInput; import java.io.DataOutput; @@ -42,7 +42,7 @@ public KafkaProgress() { } public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) { - this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionIdToOffset(); + this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset(); } public Map getPartitionIdToOffset() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java index 71f58a0a3c1213..90f1b1a795ce34 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java @@ -17,8 +17,8 @@ package org.apache.doris.load.routineload; -import org.apache.doris.common.io.Text; import org.apache.doris.thrift.TRLTaskTxnCommitAttachment; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TxnCommitAttachment; import java.io.DataInput; @@ -29,12 +29,12 @@ // "numOfTotalData": "", "taskId": "", "jobId": ""} public class RLTaskTxnCommitAttachment extends TxnCommitAttachment { - public enum RoutineLoadType { + public enum LoadSourceType { KAFKA(1); private final int flag; - private RoutineLoadType(int flag) { + private LoadSourceType(int flag) { this.flag = flag; } @@ -42,7 +42,7 @@ public int value() { return flag; } - public static RoutineLoadType valueOf(int flag) { + public static LoadSourceType valueOf(int flag) { switch (flag) { case 1: return KAFKA; @@ -52,93 +52,55 @@ public static RoutineLoadType valueOf(int flag) { } } + private long jobId; + private TUniqueId taskId; + private long filteredRows; + private long loadedRows; private RoutineLoadProgress progress; - private long backendId; - private long taskSignature; - private int numOfErrorData; - private int numOfTotalData; - private String taskId; - private String jobId; - private RoutineLoadType routineLoadType; + private LoadSourceType loadSourceType; public RLTaskTxnCommitAttachment() { } public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { - this.backendId = rlTaskTxnCommitAttachment.getBackendId(); - this.taskSignature = rlTaskTxnCommitAttachment.getTaskSignature(); - this.numOfErrorData = rlTaskTxnCommitAttachment.getNumOfErrorData(); - this.numOfTotalData = rlTaskTxnCommitAttachment.getNumOfTotalData(); - this.taskId = rlTaskTxnCommitAttachment.getTaskId(); this.jobId = rlTaskTxnCommitAttachment.getJobId(); - switch (rlTaskTxnCommitAttachment.getRoutineLoadType()) { + this.taskId = rlTaskTxnCommitAttachment.getId(); + this.filteredRows = rlTaskTxnCommitAttachment.getFilteredRows(); + this.loadedRows = rlTaskTxnCommitAttachment.getLoadedRows(); + + switch 
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 71f58a0a3c1213..90f1b1a795ce34 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -17,8 +17,8 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.common.io.Text;
 import org.apache.doris.thrift.TRLTaskTxnCommitAttachment;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
 import java.io.DataInput;
@@ -29,12 +29,12 @@
 // "numOfTotalData": "", "taskId": "", "jobId": ""}
 public class RLTaskTxnCommitAttachment extends TxnCommitAttachment {
 
-    public enum RoutineLoadType {
+    public enum LoadSourceType {
         KAFKA(1);
 
         private final int flag;
 
-        private RoutineLoadType(int flag) {
+        private LoadSourceType(int flag) {
             this.flag = flag;
         }
 
@@ -42,7 +42,7 @@ public int value() {
             return flag;
         }
 
-        public static RoutineLoadType valueOf(int flag) {
+        public static LoadSourceType valueOf(int flag) {
             switch (flag) {
                 case 1:
                     return KAFKA;
@@ -52,93 +52,55 @@ public static RoutineLoadType valueOf(int flag) {
         }
     }
 
+    private long jobId;
+    private TUniqueId taskId;
+    private long filteredRows;
+    private long loadedRows;
     private RoutineLoadProgress progress;
-    private long backendId;
-    private long taskSignature;
-    private int numOfErrorData;
-    private int numOfTotalData;
-    private String taskId;
-    private String jobId;
-    private RoutineLoadType routineLoadType;
+    private LoadSourceType loadSourceType;
 
     public RLTaskTxnCommitAttachment() {
     }
 
     public RLTaskTxnCommitAttachment(TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
-        this.backendId = rlTaskTxnCommitAttachment.getBackendId();
-        this.taskSignature = rlTaskTxnCommitAttachment.getTaskSignature();
-        this.numOfErrorData = rlTaskTxnCommitAttachment.getNumOfErrorData();
-        this.numOfTotalData = rlTaskTxnCommitAttachment.getNumOfTotalData();
-        this.taskId = rlTaskTxnCommitAttachment.getTaskId();
         this.jobId = rlTaskTxnCommitAttachment.getJobId();
-        switch (rlTaskTxnCommitAttachment.getRoutineLoadType()) {
+        this.taskId = rlTaskTxnCommitAttachment.getId();
+        this.filteredRows = rlTaskTxnCommitAttachment.getFilteredRows();
+        this.loadedRows = rlTaskTxnCommitAttachment.getLoadedRows();
+
+        switch (rlTaskTxnCommitAttachment.getLoadSourceType()) {
             case KAFKA:
+                this.loadSourceType = LoadSourceType.KAFKA;
                 this.progress = new KafkaProgress(rlTaskTxnCommitAttachment.getKafkaRLTaskProgress());
+            default:
+                break;
         }
     }
 
-    public RoutineLoadProgress getProgress() {
-        return progress;
-    }
-
-    public void setProgress(RoutineLoadProgress progress) {
-        this.progress = progress;
-    }
-
-    public long getBackendId() {
-        return backendId;
-    }
-
-    public void setBackendId(long backendId) {
-        this.backendId = backendId;
-    }
-
-    public long getTaskSignature() {
-        return taskSignature;
-    }
-
-    public void setTaskSignature(long taskSignature) {
-        this.taskSignature = taskSignature;
-    }
-
-    public int getNumOfErrorData() {
-        return numOfErrorData;
-    }
-
-    public void setNumOfErrorData(int numOfErrorData) {
-        this.numOfErrorData = numOfErrorData;
-    }
-
-    public int getNumOfTotalData() {
-        return numOfTotalData;
-    }
-
-    public void setNumOfTotalData(int numOfTotalData) {
-        this.numOfTotalData = numOfTotalData;
+    public long getJobId() {
+        return jobId;
     }
 
-    public String getTaskId() {
+    public TUniqueId getTaskId() {
         return taskId;
     }
 
-    public void setTaskId(String taskId) {
-        this.taskId = taskId;
+    public long getFilteredRows() {
+        return filteredRows;
     }
 
-    public String getJobId() {
-        return jobId;
+    public long getLoadedRows() {
+        return loadedRows;
     }
 
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
+    public RoutineLoadProgress getProgress() {
+        return progress;
     }
 
     @Override
     public String toString() {
-        return "RoutineLoadTaskTxnExtra [backendId=" + backendId
-                + ", taskSignature=" + taskSignature
-                + ", numOfErrorData=" + numOfErrorData
-                + ", numOfTotalData=" + numOfTotalData
+        return "RoutineLoadTaskTxnExtra [filteredRows=" + filteredRows
+                + ", loadedRows=" + loadedRows
                 + ", taskId=" + taskId
                 + ", jobId=" + jobId
                 + ", progress=" + progress.toString() + "]";
@@ -146,30 +108,11 @@ public String toString() {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(backendId);
-        out.writeLong(taskSignature);
-        out.writeInt(numOfErrorData);
-        out.writeInt(numOfTotalData);
-        Text.writeString(out, taskId);
-        Text.writeString(out, jobId);
-        out.writeInt(routineLoadType.value());
-        progress.write(out);
+        // TODO: think twice
     }
 
     @Override
     public void readFields(DataInput in) throws IOException {
-        backendId = in.readLong();
-        taskSignature = in.readLong();
-        numOfErrorData = in.readInt();
-        numOfTotalData = in.readInt();
-        taskId = Text.readString(in);
-        jobId = Text.readString(in);
-        routineLoadType = RoutineLoadType.valueOf(in.readInt());
-        switch (routineLoadType) {
-            case KAFKA:
-                KafkaProgress kafkaProgress = new KafkaProgress();
-                kafkaProgress.readFields(in);
-                progress = kafkaProgress;
-        }
+        // TODO: think twice
     }
 }
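Both persistence methods above are deliberately left as TODOs in this patch. Purely as a sketch of what they could look like once the format is settled, following the Text/DataOutput pattern of the removed code; the field order and the hi/lo handling of TUniqueId are assumptions, not the eventual implementation:

```java
@Override
public void write(DataOutput out) throws IOException {
    // Illustrative sketch only; the patch intentionally defers this.
    out.writeLong(jobId);
    out.writeLong(taskId.getHi()); // assumes TUniqueId is persisted via its hi/lo longs
    out.writeLong(taskId.getLo());
    out.writeLong(filteredRows);
    out.writeLong(loadedRows);
    out.writeInt(loadSourceType.value());
    progress.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
    jobId = in.readLong();
    taskId = new TUniqueId(in.readLong(), in.readLong());
    filteredRows = in.readLong();
    loadedRows = in.readLong();
    loadSourceType = LoadSourceType.valueOf(in.readInt());
    if (loadSourceType == LoadSourceType.KAFKA) {
        // mirrors the removed readFields logic for the Kafka case
        KafkaProgress kafkaProgress = new KafkaProgress();
        kafkaProgress.readFields(in);
        progress = kafkaProgress;
    }
}
```
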
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index fe0453ca792fc4..e0302138edb26f 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -34,16 +34,13 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendServiceImpl;
-import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.thrift.TLoadTxnCommitRequest;
 import org.apache.doris.thrift.TResourceInfo;
-import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.AbortTransactionException;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -421,7 +418,7 @@ private void loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
         frontendService.loadTxnCommit(request);
     }
 
-    private void updateNumOfData(int numOfErrorData, int numOfTotalData) {
+    private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
         currentErrorNum += numOfErrorData;
         currentTotalNum += numOfTotalData;
         if (currentTotalNum > BASE_OF_ERROR_RATE) {
@@ -487,12 +484,8 @@ public void onCommitted(TransactionState txnState) {
         // step2: update job progress
         updateProgress(rlTaskTxnCommitAttachment.getProgress());
 
-        // step3: remove task in agentTaskQueue
-        AgentTaskQueue.removeTask(rlTaskTxnCommitAttachment.getBackendId(), TTaskType.STREAM_LOAD,
-                rlTaskTxnCommitAttachment.getTaskSignature());
-
         // step4: if rate of error data is more than max_filter_ratio, pause job
-        updateNumOfData(rlTaskTxnCommitAttachment.getNumOfErrorData(), rlTaskTxnCommitAttachment.getNumOfTotalData());
+        updateNumOfData(rlTaskTxnCommitAttachment.getFilteredRows(), rlTaskTxnCommitAttachment.getLoadedRows());
 
         if (state == JobState.RUNNING) {
             // step5: create a new task for partitions
diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
index ee9ff2c2957e6c..784ee122512618 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
@@ -41,8 +41,8 @@ public static TxnCommitAttachment readTxnCommitAttachment(DataInput in,
 
     public static TxnCommitAttachment fromThrift(TTxnCommitAttachment txnCommitAttachment) {
         if (txnCommitAttachment != null) {
-            switch (txnCommitAttachment.txnSourceType) {
-                case ROUTINE_LOAD_TASK:
+            switch (txnCommitAttachment.getLoadType()) {
+                case ROUTINE_LOAD:
                    return new RLTaskTxnCommitAttachment(txnCommitAttachment.getRlTaskTxnCommitAttachment());
                 default:
                     return null;
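Worth noting on the updateNumOfData change above: the counters widen to long, and the call site now feeds filteredRows and loadedRows, so the denominator of the error-rate check is loaded rows rather than total consumed rows. A condensed sketch of the check these values feed; the pause branch and the maxFilterRatio field are assumed from the step4 comment, not shown in the hunk:

```java
private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
    currentErrorNum += numOfErrorData;
    currentTotalNum += numOfTotalData;
    // Once enough rows have been sampled, pause the job if the error
    // ratio exceeds the configured threshold, then reset the window.
    if (currentTotalNum > BASE_OF_ERROR_RATE) {
        if ((double) currentErrorNum / currentTotalNum > maxFilterRatio) {
            state = JobState.PAUSED; // assumed pause mechanism
        }
        currentErrorNum = 0;
        currentTotalNum = 0;
    }
}
```
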
From 093c4f365d86d86e2e549679138876931a4d8e34 Mon Sep 17 00:00:00 2001
From: morningman
Date: Wed, 27 Feb 2019 17:28:24 +0800
Subject: [PATCH 5/5] make it work

---
 .../routine_load/routine_load_task_executor.cpp  |  1 +
 be/src/runtime/stream_load/stream_load_context.h |  4 ++++
 .../runtime/stream_load/stream_load_executor.cpp |  1 +
 gensrc/thrift/BackendService.thrift              | 15 ++++++++-------
 4 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index f6c45cb964b568..5814727f4a9cd9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -43,6 +43,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
     ctx->load_type = TLoadType::ROUTINE_LOAD;
     ctx->load_src_type = task.type;
+    ctx->job_id = task.job_id;
     ctx->id = UniqueId(task.id);
     ctx->txn_id = task.txn_id;
     ctx->db = task.db;
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 7d8996221ab17d..5939dd83273f04 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -108,6 +108,10 @@ class StreamLoadContext {
     // load data source: eg: KAFKA/RAW
     TLoadSourceType::type load_src_type;
 
+    // the job this stream load task belongs to,
+    // set to -1 if there is no job
+    int64_t job_id = -1;
+
     // id for each load
     UniqueId id;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 80a176f79ad743..9fc3d388de3cf0 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -211,6 +211,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
 
     TRLTaskTxnCommitAttachment rl_attach;
     rl_attach.loadSourceType = TLoadSourceType::KAFKA;
+    rl_attach.jobId = ctx->job_id;
     rl_attach.id = ctx->id.to_thrift();
     rl_attach.__set_loadedRows(ctx->number_loaded_rows);
    rl_attach.__set_filteredRows(ctx->number_filtered_rows);
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index c0c6d2a0cc07e8..79e332eabec55a 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -76,13 +76,14 @@ struct TKafkaLoadInfo {
 
 struct TRoutineLoadTask {
     1: required Types.TLoadSourceType type
-    2: required Types.TUniqueId id
-    3: required i64 txn_id
-    4: required i64 auth_code
-    5: optional string db
-    6: optional string tbl
-    7: optional string label
-    8: optional TKafkaLoadInfo kafka_load_info
+    2: required i64 job_id
+    3: required Types.TUniqueId id
+    4: required i64 txn_id
+    5: required i64 auth_code
+    6: optional string db
+    7: optional string tbl
+    8: optional string label
+    9: optional TKafkaLoadInfo kafka_load_info
 }
 
 service BackendService {
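A caveat on the Thrift change above: inserting job_id as field 2 renumbers the former fields 2-8 to 3-9, which changes the wire encoding and is only safe because FE and BE are deployed from the same build; appending job_id as a trailing field 9 would have kept old and new binaries compatible. For illustration, FE-side construction of the renumbered struct might look like the following; setter names assume standard Thrift-generated Java, and the surrounding variables are hypothetical:

```java
// Sketch of filling the renumbered TRoutineLoadTask on the FE side.
TRoutineLoadTask buildTask(long jobId, TUniqueId taskUniqueId, long txnId, long authCode,
                           String dbName, String tableName, String label,
                           TKafkaLoadInfo kafkaLoadInfo) {
    TRoutineLoadTask task = new TRoutineLoadTask();
    task.setType(TLoadSourceType.KAFKA);
    task.setJob_id(jobId);        // new field 2
    task.setId(taskUniqueId);     // former fields 2..8 are now 3..9
    task.setTxn_id(txnId);
    task.setAuth_code(authCode);
    task.setDb(dbName);
    task.setTbl(tableName);
    task.setLabel(label);
    task.setKafka_load_info(kafkaLoadInfo);
    return task;
}
```
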