From 99e6ed77529382a8ea1c6dc72011062ecc6f7637 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 1 Jun 2020 19:33:01 +0800 Subject: [PATCH 01/11] submit-spark-load-be-part --- be/src/agent/agent_server.cpp | 3 +- be/src/agent/task_worker_pool.cpp | 21 +- be/src/olap/olap_common.h | 1 + be/src/olap/push_handler.cpp | 280 +++++++++++- be/src/olap/push_handler.h | 45 ++ be/src/olap/task/engine_batch_load_task.cpp | 19 +- be/test/olap/CMakeLists.txt | 1 + be/test/olap/push_handler_test.cpp | 459 ++++++++++++++++++++ 8 files changed, 807 insertions(+), 22 deletions(-) create mode 100644 be/test/olap/push_handler_test.cpp diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index b30ed70cb2a447..08e7b5baaf96fd 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -158,7 +158,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const vectorsubmit_task(task); } else if (task.push_req.push_type == TPushType::DELETE) { _delete_workers->submit_task(task); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 42f178a37252bd..e96c847f2a9444 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -618,7 +618,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { #endif LOG(INFO) << "get push task. signature: " << agent_task_req.signature - << " priority: " << priority; + << " priority: " << priority << " push_type: " << push_req.push_type; vector tablet_infos; EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req.signature, &status); @@ -645,7 +645,8 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { } if (status == DORIS_SUCCESS) { - VLOG(3) << "push ok.signature: " << agent_task_req.signature; + VLOG(3) << "push ok. signature: " << agent_task_req.signature + << ", push_type: " << push_req.push_type; error_msgs.push_back("push success"); ++_s_report_version; @@ -1104,14 +1105,14 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) disk.__set_used(root_path_info.is_used); disks[root_path_info.path] = disk; - DorisMetrics::instance()->disks_total_capacity.set_metric( - root_path_info.path, root_path_info.disk_capacity); - DorisMetrics::instance()->disks_avail_capacity.set_metric( - root_path_info.path, root_path_info.available); - DorisMetrics::instance()->disks_data_used_capacity.set_metric( - root_path_info.path, root_path_info.data_used_capacity); - DorisMetrics::instance()->disks_state.set_metric( - root_path_info.path, root_path_info.is_used ? 1L : 0L); + DorisMetrics::instance()->disks_total_capacity.set_metric(root_path_info.path, + root_path_info.disk_capacity); + DorisMetrics::instance()->disks_avail_capacity.set_metric(root_path_info.path, + root_path_info.available); + DorisMetrics::instance()->disks_data_used_capacity.set_metric(root_path_info.path, + root_path_info.data_used_capacity); + DorisMetrics::instance()->disks_state.set_metric(root_path_info.path, + root_path_info.is_used ? 1L : 0L); } request.__set_disks(disks); diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 3a62b633417a9b..961571b947fc9c 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -169,6 +169,7 @@ enum PushType { PUSH_NORMAL = 1, PUSH_FOR_DELETE = 2, PUSH_FOR_LOAD_DELETE = 3, + PUSH_NORMAL_V2 = 4, }; enum ReaderType { diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 90b7fb840e82d8..a5230bca0ac92b 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -23,12 +23,16 @@ #include +#include "common/status.h" +#include "exec/parquet_scanner.h" +#include "olap/row.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "runtime/exec_env.h" using std::list; using std::map; @@ -184,9 +188,16 @@ OLAPStatus PushHandler::_do_streaming_ingestion( } // write - res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, - &(tablet_vars->at(0).rowset_to_add), - &(tablet_vars->at(1).rowset_to_add)); + if (push_type == PUSH_NORMAL_V2) { + res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, + &(tablet_vars->at(0).rowset_to_add), + &(tablet_vars->at(1).rowset_to_add)); + + } else { + res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, + &(tablet_vars->at(0).rowset_to_add), + &(tablet_vars->at(1).rowset_to_add)); + } if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res << ", failed to process realtime push." @@ -248,6 +259,149 @@ void PushHandler::_get_tablet_infos(const vector& tablet_vars, } } +OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, + TabletSharedPtr new_tablet, + RowsetSharedPtr* cur_rowset, + RowsetSharedPtr* new_rowset) { + OLAPStatus res = OLAP_SUCCESS; + PushBrokerReader* reader = nullptr; + Schema* schema = nullptr; + uint32_t num_rows = 0; + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + + do { + VLOG(3) << "start to convert delta file."; + + // 1. init RowsetBuilder of cur_tablet for current push + VLOG(3) << "init rowset builder. tablet=" << cur_tablet->full_name() + << ", block_row_size=" << cur_tablet->num_rows_per_row_block(); + RowsetWriterContext context; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.tablet_uid = cur_tablet->tablet_uid(); + context.tablet_id = cur_tablet->tablet_id(); + context.partition_id = _request.partition_id; + context.tablet_schema_hash = cur_tablet->schema_hash(); + context.rowset_type = StorageEngine::instance()->default_rowset_type(); + context.rowset_path_prefix = cur_tablet->tablet_path(); + context.tablet_schema = &(cur_tablet->tablet_schema()); + context.rowset_state = PREPARED; + context.txn_id = _request.transaction_id; + context.load_id = load_id; + // although the spark load output files are fully sorted, + // but it depends on thirparty implementation, so we conservatively + // set this value to OVERLAP_UNKNOWN + context.segments_overlap = OVERLAP_UNKNOWN; + + std::unique_ptr rowset_writer; + res = RowsetFactory::create_rowset_writer(context, &rowset_writer); + if (OLAP_SUCCESS != res) { + LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name() + << ", txn_id=" << _request.transaction_id + << ", res=" << res; + break; + } + + // 2. Init PushBrokerReader to read broker file if exist, + // in case of empty push this will be skipped. + std::string path = _request.broker_scan_range.ranges[0].path; + LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path + << ", file size=" << _request.broker_scan_range.ranges[0].file_size; + if (!path.empty()) { + reader = new(std::nothrow) PushBrokerReader(); + if (reader == nullptr) { + LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + // init schema + schema = new(std::nothrow) Schema(cur_tablet->tablet_schema()); + if (schema == nullptr) { + LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + // init Reader + if (OLAP_SUCCESS != (res = reader->init(schema, + _request.broker_scan_range, + _request.desc_tbl))) { + LOG(WARNING) << "fail to init reader. res=" << res + << ", tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_PUSH_INIT_ERROR; + break; + } + + // 3. Init Row + uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size()); + ContiguousRow row(schema, tuple_buf); + + // 4. Read data from broker and write into SegmentGroup of cur_tablet + // Convert from raw to delta + VLOG(3) << "start to convert etl file to delta."; + while (!reader->eof()) { + res = reader->next(&row); + if (OLAP_SUCCESS != res) { + LOG(WARNING) << "read next row failed." + << " res=" << res << " read_rows=" << num_rows; + break; + } else { + if (reader->eof()) { + break; + } + if (OLAP_SUCCESS != (res = rowset_writer->add_row(row))) { + LOG(WARNING) << "fail to attach row to rowset_writer. " + << "res=" << res + << ", tablet=" << cur_tablet->full_name() + << ", read_rows=" << num_rows; + break; + } + num_rows++; + } + } + + reader->print_profile(); + reader->finalize(); + } + + if (rowset_writer->flush() != OLAP_SUCCESS) { + LOG(WARNING) << "failed to finalize writer"; + break; + } + *cur_rowset = rowset_writer->build(); + if (*cur_rowset == nullptr) { + LOG(WARNING) << "fail to build rowset"; + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + _write_bytes += (*cur_rowset)->data_disk_size(); + _write_rows += (*cur_rowset)->num_rows(); + + // 5. Convert data for schema change tables + VLOG(10) << "load to related tables of schema_change if possible."; + if (new_tablet != nullptr) { + SchemaChangeHandler schema_change; + res = schema_change.schema_version_convert(cur_tablet, new_tablet, + cur_rowset, new_rowset); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to change schema version for delta." + << "[res=" << res << " new_tablet='" + << new_tablet->full_name() << "']"; + } + } + } while (0); + + SAFE_DELETE(reader); + SAFE_DELETE(schema); + VLOG(10) << "convert delta file end. res=" << res + << ", tablet=" << cur_tablet->full_name() + << ", processed_rows" << num_rows; + return res; +} + OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet, RowsetSharedPtr* cur_rowset, @@ -761,6 +915,126 @@ OLAPStatus LzoBinaryReader::_next_block() { return res; } +OLAPStatus PushBrokerReader::init(const Schema* schema, + const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl) { + // init schema + _schema = schema; + + // init runtime state, runtime profile, counter + TUniqueId dummy_id; + dummy_id.hi = 0; + dummy_id.lo = 0; + TPlanFragmentExecParams params; + params.fragment_instance_id = dummy_id; + params.query_id = dummy_id; + TExecPlanFragmentParams fragment_params; + fragment_params.params = params; + fragment_params.protocol_version = PaloInternalServiceVersion::V1; + TQueryOptions query_options; + TQueryGlobals query_globals; + _runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals, + ExecEnv::GetInstance())); + DescriptorTbl* desc_tbl = NULL; + Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_state->set_desc_tbl(desc_tbl); + status = _runtime_state->init_mem_trackers(dummy_id); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_profile.reset(new RuntimeProfile(_runtime_state->obj_pool(), "PushBrokerReader")); + _mem_tracker.reset(new MemTracker(-1)); + _mem_pool.reset(new MemPool(_mem_tracker.get())); + _counter.reset(new ScannerCounter()); + + // init scanner + BaseScanner *scanner = nullptr; + switch (t_scan_range.ranges[0].format_type) { + case TFileFormatType::FORMAT_PARQUET: + scanner = new ParquetScanner(_runtime_state.get(), + _runtime_profile.get(), + t_scan_range.params, + t_scan_range.ranges, + t_scan_range.broker_addresses, + _counter.get()); + break; + default: + LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _scanner.reset(scanner); + status = _scanner->open(); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to open scanner, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + + // init tuple + auto tuple_id = t_scan_range.params.dest_tuple_id; + _tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id; + return OLAP_ERR_PUSH_INIT_ERROR; + } + + int tuple_buffer_size = _tuple_desc->byte_size(); + void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size); + if (tuple_buffer == nullptr) { + LOG(WARNING) << "Allocate memory for tuple failed"; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _tuple = reinterpret_cast(tuple_buffer); + + _ready = true; + return OLAP_SUCCESS; +} + +OLAPStatus PushBrokerReader::next(ContiguousRow* row) { + if (!_ready || row == nullptr) { + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + + memset(_tuple, 0, _tuple_desc->num_null_bytes()); + // Get from scanner + Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Scanner get next tuple failed"; + return OLAP_ERR_PUSH_INPUT_DATA_ERROR; + } + if (_eof) { + return OLAP_SUCCESS; + } + //LOG(INFO) << "row data: " << _tuple->to_string(*_tuple_desc); + + auto slot_descs = _tuple_desc->slots(); + size_t num_key_columns = _schema->num_key_columns(); + for (size_t i = 0; i < slot_descs.size(); ++i) { + auto cell = row->cell(i); + const SlotDescriptor* slot = slot_descs[i]; + bool is_null = _tuple->is_null(slot->null_indicator_offset()); + const void* value = _tuple->get_slot(slot->tuple_offset()); + _schema->column(i)->consume(&cell, (const char*)value, is_null, + _mem_pool.get(), _runtime_state->obj_pool()); + if (i >= num_key_columns) { + _schema->column(i)->agg_finalize(&cell, _mem_pool.get()); + } + } + + return OLAP_SUCCESS; +} + +void PushBrokerReader::print_profile() { + std::stringstream ss; + _runtime_profile->pretty_print(&ss); + LOG(INFO) << ss.str(); +} + string PushHandler::_debug_version_list(const Versions& versions) const { std::ostringstream txt; txt << "Versions: "; diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 127a084624c3e9..05501c0c791856 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -22,8 +22,10 @@ #include #include +#include "exec/base_scanner.h" #include "gen_cpp/AgentService_types.h" #include "gen_cpp/MasterService_types.h" +#include "gen_cpp/PaloInternalService_types.h" #include "olap/file_helper.h" #include "olap/merger.h" #include "olap/olap_common.h" @@ -60,6 +62,11 @@ class PushHandler { int64_t write_bytes() const { return _write_bytes; } int64_t write_rows() const { return _write_rows; } private: + OLAPStatus _convert_v2( + TabletSharedPtr cur_tablet, + TabletSharedPtr new_tablet_vec, + RowsetSharedPtr* cur_rowset, + RowsetSharedPtr* new_rowset); // Convert local data file to internal formatted delta, // return new delta's SegmentGroup OLAPStatus _convert( @@ -209,6 +216,44 @@ class LzoBinaryReader: public IBinaryReader { size_t _next_row_start; }; +class PushBrokerReader { +public: + PushBrokerReader() + : _ready(false), + _eof(false) {} + ~PushBrokerReader() {} + + OLAPStatus init(const Schema* schema, + const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl); + OLAPStatus next(ContiguousRow* row); + void print_profile(); + + OLAPStatus finalize() { + _ready = false; + return OLAP_SUCCESS; + } + bool eof() { + return _eof; + } + MemPool* mem_pool() { + return _mem_pool.get(); + } + +private: + bool _ready; + bool _eof; + TupleDescriptor* _tuple_desc; + Tuple* _tuple; + const Schema* _schema; + std::unique_ptr _runtime_state; + std::unique_ptr _runtime_profile; + std::unique_ptr _mem_tracker; + std::unique_ptr _mem_pool; + std::unique_ptr _counter; + std::unique_ptr _scanner; +}; + } // namespace doris #endif // DORIS_BE_SRC_OLAP_PUSH_HANDLER_H diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 99517779625e76..7346f598d5ed98 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -59,7 +59,8 @@ EngineBatchLoadTask::~EngineBatchLoadTask() { OLAPStatus EngineBatchLoadTask::execute() { AgentStatus status = DORIS_SUCCESS; - if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE) { + if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE + || _push_req.push_type == TPushType::LOAD_V2) { status = _init(); if (status == DORIS_SUCCESS) { uint32_t retry_time = 0; @@ -116,18 +117,18 @@ AgentStatus EngineBatchLoadTask::_init() { return DORIS_PUSH_INVALID_TABLE; } - // Empty remote_path - if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) { - _is_init = true; - return status; - } - // check disk capacity - if (_push_req.push_type == TPushType::LOAD) { + if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) { if (tablet->data_dir()->reach_capacity_limit(_push_req.__isset.http_file_size)) { return DORIS_DISK_REACH_CAPACITY_LIMIT; } } + + // Empty remote_path + if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) { + _is_init = true; + return status; + } // Check remote path _remote_file_path = _push_req.http_file_path; @@ -310,6 +311,8 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, PushType type = PUSH_NORMAL; if (request.push_type == TPushType::LOAD_DELETE) { type = PUSH_FOR_LOAD_DELETE; + } else if (request.push_type == TPushType::LOAD_V2) { + type = PUSH_NORMAL_V2; } int64_t duration_ns = 0; diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index fc9f2a883af5c9..8e30250f783847 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -88,3 +88,4 @@ ADD_BE_TEST(memory/schema_test) ADD_BE_TEST(memory/column_test) ADD_BE_TEST(memory/partial_row_batch_test) ADD_BE_TEST(memory/mem_tablet_test) +ADD_BE_TEST(push_handler_test) diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp new file mode 100644 index 00000000000000..ec0d0a8d558436 --- /dev/null +++ b/be/test/olap/push_handler_test.cpp @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exprs/cast_functions.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "olap/push_handler.h" +#include "olap/row.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/user_function_cache.h" + +namespace doris { + +class PushHandlerTest : public testing::Test { +public: + PushHandlerTest() { + init(); + } + static void SetUpTestCase() { + UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + } +protected: + virtual void SetUp() {} + virtual void TearDown() {} + +private: + void init(); + Schema create_schema(); + int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + void create_expr_info(); + TDescriptorTable init_desc_table(); + + TDescriptorTable _t_desc_table; + TBrokerScanRangeParams _params; +}; + +Schema PushHandlerTest::create_schema() { + std::vector columns; + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_SMALLINT, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_VARCHAR, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_SUM, OLAP_FIELD_TYPE_BIGINT, true); + Schema schema(columns, 3); + return schema; +} + +#define TUPLE_ID_DST 0 +#define TUPLE_ID_SRC 1 +#define CLOMN_NUMBERS 4 +#define DST_TUPLE_SLOT_ID_START 1 +#define SRC_TUPLE_SLOT_ID_START 5 +int PushHandlerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + const char *clomnNames[] = {"k1_int", "k2_smallint", "k3_varchar", "v_bigint"}; + for (int i = 0; i < CLOMN_NUMBERS; i++) + { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + slot_desc.byteOffset = i*16+8; // 8 bytes for null + slot_desc.nullIndicatorBit = i%8; + slot_desc.colName = clomnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + { + // TTupleDescriptor source + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_SRC; + t_tuple_desc.byteSize = CLOMN_NUMBERS*16+8; // 8 bytes for null + t_tuple_desc.numNullBytes = 1; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +int PushHandlerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + {//k1_int + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 4; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 1; + slot_desc.colName = "k1_int"; + slot_desc.slotIdx = 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {// k2_smallint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::SMALLINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 2; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 0; + slot_desc.colName = "k2_smallint"; + slot_desc.slotIdx = 0; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {//k3_varchar + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 16; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 3; + slot_desc.colName = "k3_varchar"; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {// v_bigint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 8; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 2; + slot_desc.colName = "v_bigint"; + slot_desc.slotIdx = 2; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_DST; + t_tuple_desc.byteSize = 32; + t_tuple_desc.numNullBytes = 1; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +TDescriptorTable PushHandlerTest::init_desc_table() { + TDescriptorTable t_desc_table; + int next_slot_id = 1; + next_slot_id = create_dst_tuple(t_desc_table, next_slot_id); + next_slot_id = create_src_tuple(t_desc_table, next_slot_id); + return t_desc_table; +} + +void PushHandlerTest::create_expr_info() { + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + // k1_int VARCHAR --> INT + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttoint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START); + } + // k2_smallint VARCHAR --> SMALLINT + { + TTypeDesc smallint_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::SMALLINT); + node.__set_scalar_type(scalar_type); + smallint_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = smallint_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttosmallint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = smallint_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttosmallint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_small_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1); + } + // k3_varchar VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2); + } + // v_bigint VARCHAR --> BIGINT + { + TTypeDesc bigint_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + bigint_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = bigint_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttobigint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = bigint_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttobigint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 3; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3); + } + + _params.__set_dest_tuple_id(TUPLE_ID_DST); + _params.__set_src_tuple_id(TUPLE_ID_SRC); +} + +void PushHandlerTest::init() { + create_expr_info(); + _t_desc_table = init_desc_table(); +} + +TEST_F(PushHandlerTest, PushBrokerReaderNormal) { + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_PARQUET; + range.splittable = false; + range.path = "./be/test/olap/test_data/push_broker_reader.parquet"; + range.file_type = TFileType::FILE_LOCAL; + broker_scan_range.ranges.push_back(range); + + ExecEnv::GetInstance()->_thread_mgr = new ThreadResourceMgr(); + + Schema schema = create_schema(); + // data + // k1_int k2_smallint varchar bigint + // 0 0 a0 0 + // 0 2 a1 3 + // 1 4 a2 6 + PushBrokerReader reader; + reader.init(&schema, broker_scan_range, _t_desc_table); + uint8_t* tuple_buf = reader.mem_pool()->allocate(schema.schema_size()); + ContiguousRow row(&schema, tuple_buf); + + // line 1 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(0, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(0, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a0", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(0, *(int64_t*)row.cell(3).cell_ptr()); + + // line 2 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(0, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(2, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a1", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(3, *(int64_t*)row.cell(3).cell_ptr()); + + // line 3 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(1, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(4, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a2", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(6, *(int64_t*)row.cell(3).cell_ptr()); + + // eof + reader.next(&row); + ASSERT_TRUE(reader.eof()); +} +} // namespace doris + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 00332c2d29b4a4b0b56d6500a4287b9e20f735c1 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 1 Jun 2020 23:30:27 +0800 Subject: [PATCH 02/11] fix run-ut.sh --- run-ut.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/run-ut.sh b/run-ut.sh index dca8eaec789522..ed0665a3fd9578 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -307,6 +307,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test +${DORIS_TEST_BINARY_DIR}/olap/push_handler_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test From 6c9a7dbb50768ac3ceb6cb48dc91a0331e831193 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 4 Jun 2020 13:50:10 +0800 Subject: [PATCH 03/11] fix ut test data --- .../olap/test_data/push_broker_reader.parquet | Bin 0 -> 1133 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 be/test/olap/test_data/push_broker_reader.parquet diff --git a/be/test/olap/test_data/push_broker_reader.parquet b/be/test/olap/test_data/push_broker_reader.parquet new file mode 100644 index 0000000000000000000000000000000000000000..371c7474c6381f77b56ec225095d1bbacb194a41 GIT binary patch literal 1133 zcmb7^Pmhy87{&)m5Va;|vqOf27(#5*O)7!bWlPhG#)C0EY`m&zhB9s0LfZveW14#K zGi*G1(|9(<&*wXo77j*N(lRse`#!(<^8lC6UOQOA5Ao>`8VMmqM{o)7#|WQbwT)#+ z$XL;d#G?di5K>XAGJ?QeLxg+nj!=_8_ZYXh4i!2sDRt!t!Y42GI_M(a<4MrjxKjdL z)Dmfjh}4E8IV~oVpHiE5sgvJ3pc>#_sUnIDS$+{^-{#ByLlm-)$exNG2!mQteUM{O zd;7AUu4pnJrQ;ckDOL~`8zPbC&L~`D`@#KrXOzY?iXhT35UxYPy?J-Ep~-kglLqWd zf{?Nq`QenSR9VxtVN=mzf^%7sv;z-|s;qt%)GtC)8T{qcwKlg5i~PWNgHZFEP|%P@ zstd}IAJtC+Q~ zx4T`t1eETVlytUC_^!ALy<5d!(<-hgsc8I(xj_*mF7#o(P;c{U( zmSbCv(el?}6d1JU1(V*y^ST}1WfRx!b_3hBU5}mHW6!_vE^MFqfpew};VW|dq37^Q IUBZXvH-%x=H~;_u literal 0 HcmV?d00001 From c97c3b0d3d92d0481e074120de3a19467aacf7c7 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 4 Jun 2020 15:17:59 +0800 Subject: [PATCH 04/11] add comments --- be/src/olap/push_handler.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index a5230bca0ac92b..4c692d5e85e07e 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -947,8 +947,9 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); return OLAP_ERR_PUSH_INIT_ERROR; } - _runtime_profile.reset(new RuntimeProfile(_runtime_state->obj_pool(), "PushBrokerReader")); - _mem_tracker.reset(new MemTracker(-1)); + _runtime_profile.reset(_runtime_state->runtime_profile()); + _runtime_profile->set_name("PushBrokerReader"); + _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker())); _mem_pool.reset(new MemPool(_mem_tracker.get())); _counter.reset(new ScannerCounter()); @@ -1010,17 +1011,22 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) { if (_eof) { return OLAP_SUCCESS; } - //LOG(INFO) << "row data: " << _tuple->to_string(*_tuple_desc); auto slot_descs = _tuple_desc->slots(); size_t num_key_columns = _schema->num_key_columns(); + + // finalize row for (size_t i = 0; i < slot_descs.size(); ++i) { auto cell = row->cell(i); const SlotDescriptor* slot = slot_descs[i]; bool is_null = _tuple->is_null(slot->null_indicator_offset()); const void* value = _tuple->get_slot(slot->tuple_offset()); + // try execute init method defined in aggregateInfo + // by default it only copies data into cell _schema->column(i)->consume(&cell, (const char*)value, is_null, _mem_pool.get(), _runtime_state->obj_pool()); + // if column(i) is a value column, try execute finalize method defined in aggregateInfo + // to convert data into final format if (i >= num_key_columns) { _schema->column(i)->agg_finalize(&cell, _mem_pool.get()); } From 32770821db2c5912dfcefab5e2c7707a495c3dc2 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 4 Jun 2020 15:41:09 +0800 Subject: [PATCH 05/11] change functionName finalize to close --- be/src/olap/push_handler.cpp | 2 +- be/src/olap/push_handler.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 4c692d5e85e07e..33c8d3093ed2cd 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -522,7 +522,7 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, } } - reader->finalize(); + reader->close(); if (!reader->validate_checksum()) { LOG(WARNING) << "pushed delta file has wrong checksum."; diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 05501c0c791856..a48716ef54f809 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -229,7 +229,7 @@ class PushBrokerReader { OLAPStatus next(ContiguousRow* row); void print_profile(); - OLAPStatus finalize() { + OLAPStatus close() { _ready = false; return OLAP_SUCCESS; } From b0cbadd4e2ba4dd9d9254d345fc083df735412e4 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 4 Jun 2020 15:43:45 +0800 Subject: [PATCH 06/11] fix ut --- be/test/olap/push_handler_test.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp index ec0d0a8d558436..dedc93b30c43a1 100644 --- a/be/test/olap/push_handler_test.cpp +++ b/be/test/olap/push_handler_test.cpp @@ -450,6 +450,8 @@ TEST_F(PushHandlerTest, PushBrokerReaderNormal) { // eof reader.next(&row); ASSERT_TRUE(reader.eof()); + + reader.close(); } } // namespace doris From 395ada00a75e6b3952ba03697d6e49c5c0a34c71 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 5 Jun 2020 13:28:44 +0800 Subject: [PATCH 07/11] reset function name to finalize --- be/src/olap/push_handler.cpp | 2 +- be/src/olap/push_handler.h | 2 +- be/test/olap/push_handler_test.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 33c8d3093ed2cd..4c692d5e85e07e 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -522,7 +522,7 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, } } - reader->close(); + reader->finalize(); if (!reader->validate_checksum()) { LOG(WARNING) << "pushed delta file has wrong checksum."; diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index a48716ef54f809..05501c0c791856 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -229,7 +229,7 @@ class PushBrokerReader { OLAPStatus next(ContiguousRow* row); void print_profile(); - OLAPStatus close() { + OLAPStatus finalize() { _ready = false; return OLAP_SUCCESS; } diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp index dedc93b30c43a1..f855667c639616 100644 --- a/be/test/olap/push_handler_test.cpp +++ b/be/test/olap/push_handler_test.cpp @@ -451,7 +451,7 @@ TEST_F(PushHandlerTest, PushBrokerReaderNormal) { reader.next(&row); ASSERT_TRUE(reader.eof()); - reader.close(); + reader.finalize(); } } // namespace doris From 9500dd7a20bfa112093346acb1cd1341dad0fd8a Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 5 Jun 2020 13:39:52 +0800 Subject: [PATCH 08/11] set function name to close --- be/src/olap/push_handler.cpp | 2 +- be/src/olap/push_handler.h | 2 +- be/test/olap/push_handler_test.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 4c692d5e85e07e..b6c461ac2faaaa 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -363,7 +363,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, } reader->print_profile(); - reader->finalize(); + reader->close(); } if (rowset_writer->flush() != OLAP_SUCCESS) { diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 05501c0c791856..a48716ef54f809 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -229,7 +229,7 @@ class PushBrokerReader { OLAPStatus next(ContiguousRow* row); void print_profile(); - OLAPStatus finalize() { + OLAPStatus close() { _ready = false; return OLAP_SUCCESS; } diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp index f855667c639616..dedc93b30c43a1 100644 --- a/be/test/olap/push_handler_test.cpp +++ b/be/test/olap/push_handler_test.cpp @@ -451,7 +451,7 @@ TEST_F(PushHandlerTest, PushBrokerReaderNormal) { reader.next(&row); ASSERT_TRUE(reader.eof()); - reader.finalize(); + reader.close(); } } // namespace doris From 1479c7d9885f2e68e923763edf49032253040b97 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 22 Jun 2020 14:43:49 +0800 Subject: [PATCH 09/11] use unique_ptr --- be/src/olap/olap_common.h | 8 ++++---- be/src/olap/push_handler.cpp | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 961571b947fc9c..ad1f5885dbd355 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -166,10 +166,10 @@ enum OLAPCompressionType { }; enum PushType { - PUSH_NORMAL = 1, - PUSH_FOR_DELETE = 2, - PUSH_FOR_LOAD_DELETE = 3, - PUSH_NORMAL_V2 = 4, + PUSH_NORMAL = 1, // for broker/hadoop load + PUSH_FOR_DELETE = 2, // for delete + PUSH_FOR_LOAD_DELETE = 3, // not use + PUSH_NORMAL_V2 = 4, // for spark load }; enum ReaderType { diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index b6c461ac2faaaa..bf8e6ec9ee54d9 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -264,7 +264,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset) { OLAPStatus res = OLAP_SUCCESS; - PushBrokerReader* reader = nullptr; + std::unique_ptr reader; Schema* schema = nullptr; uint32_t num_rows = 0; PUniqueId load_id; @@ -309,7 +309,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path << ", file size=" << _request.broker_scan_range.ranges[0].file_size; if (!path.empty()) { - reader = new(std::nothrow) PushBrokerReader(); + reader = std::make_unique(); if (reader == nullptr) { LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name(); res = OLAP_ERR_MALLOC_ERROR; @@ -394,7 +394,6 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, } } while (0); - SAFE_DELETE(reader); SAFE_DELETE(schema); VLOG(10) << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name() From e7d643d9c7317c19c917a053a51f2a50e3510924 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 22 Jun 2020 18:45:38 +0800 Subject: [PATCH 10/11] fix unique_ptr --- be/src/olap/push_handler.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index bf8e6ec9ee54d9..0d61cd45a6c415 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -264,8 +264,6 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset) { OLAPStatus res = OLAP_SUCCESS; - std::unique_ptr reader; - Schema* schema = nullptr; uint32_t num_rows = 0; PUniqueId load_id; load_id.set_hi(0); @@ -308,8 +306,9 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, std::string path = _request.broker_scan_range.ranges[0].path; LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path << ", file size=" << _request.broker_scan_range.ranges[0].file_size; + if (!path.empty()) { - reader = std::make_unique(); + std::unique_ptr reader(new(std::nothrow) PushBrokerReader()); if (reader == nullptr) { LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name(); res = OLAP_ERR_MALLOC_ERROR; @@ -317,7 +316,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, } // init schema - schema = new(std::nothrow) Schema(cur_tablet->tablet_schema()); + std::unique_ptr schema(new(std::nothrow) Schema(cur_tablet->tablet_schema())); if (schema == nullptr) { LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name(); res = OLAP_ERR_MALLOC_ERROR; @@ -394,7 +393,6 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, } } while (0); - SAFE_DELETE(schema); VLOG(10) << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name() << ", processed_rows" << num_rows; From 3bf38c090aa6ab0c43e472edca0112790f079189 Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Mon, 22 Jun 2020 19:34:16 +0800 Subject: [PATCH 11/11] fix some mistake --- be/src/olap/push_handler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 0d61cd45a6c415..62e80d8e9005d8 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -324,7 +324,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, } // init Reader - if (OLAP_SUCCESS != (res = reader->init(schema, + if (OLAP_SUCCESS != (res = reader->init(schema.get(), _request.broker_scan_range, _request.desc_tbl))) { LOG(WARNING) << "fail to init reader. res=" << res @@ -335,7 +335,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, // 3. Init Row uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size()); - ContiguousRow row(schema, tuple_buf); + ContiguousRow row(schema.get(), tuple_buf); // 4. Read data from broker and write into SegmentGroup of cur_tablet // Convert from raw to delta