diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index b30ed70cb2a447..08e7b5baaf96fd 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -158,7 +158,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, const vectorsubmit_task(task); } else if (task.push_req.push_type == TPushType::DELETE) { _delete_workers->submit_task(task); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 42f178a37252bd..e96c847f2a9444 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -618,7 +618,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { #endif LOG(INFO) << "get push task. signature: " << agent_task_req.signature - << " priority: " << priority; + << " priority: " << priority << " push_type: " << push_req.push_type; vector tablet_infos; EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req.signature, &status); @@ -645,7 +645,8 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { } if (status == DORIS_SUCCESS) { - VLOG(3) << "push ok.signature: " << agent_task_req.signature; + VLOG(3) << "push ok. signature: " << agent_task_req.signature + << ", push_type: " << push_req.push_type; error_msgs.push_back("push success"); ++_s_report_version; @@ -1104,14 +1105,14 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) disk.__set_used(root_path_info.is_used); disks[root_path_info.path] = disk; - DorisMetrics::instance()->disks_total_capacity.set_metric( - root_path_info.path, root_path_info.disk_capacity); - DorisMetrics::instance()->disks_avail_capacity.set_metric( - root_path_info.path, root_path_info.available); - DorisMetrics::instance()->disks_data_used_capacity.set_metric( - root_path_info.path, root_path_info.data_used_capacity); - DorisMetrics::instance()->disks_state.set_metric( - root_path_info.path, root_path_info.is_used ? 1L : 0L); + DorisMetrics::instance()->disks_total_capacity.set_metric(root_path_info.path, + root_path_info.disk_capacity); + DorisMetrics::instance()->disks_avail_capacity.set_metric(root_path_info.path, + root_path_info.available); + DorisMetrics::instance()->disks_data_used_capacity.set_metric(root_path_info.path, + root_path_info.data_used_capacity); + DorisMetrics::instance()->disks_state.set_metric(root_path_info.path, + root_path_info.is_used ? 1L : 0L); } request.__set_disks(disks); diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 3a62b633417a9b..ad1f5885dbd355 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -166,9 +166,10 @@ enum OLAPCompressionType { }; enum PushType { - PUSH_NORMAL = 1, - PUSH_FOR_DELETE = 2, - PUSH_FOR_LOAD_DELETE = 3, + PUSH_NORMAL = 1, // for broker/hadoop load + PUSH_FOR_DELETE = 2, // for delete + PUSH_FOR_LOAD_DELETE = 3, // not use + PUSH_NORMAL_V2 = 4, // for spark load }; enum ReaderType { diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 90b7fb840e82d8..62e80d8e9005d8 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -23,12 +23,16 @@ #include +#include "common/status.h" +#include "exec/parquet_scanner.h" +#include "olap/row.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "runtime/exec_env.h" using std::list; using std::map; @@ -184,9 +188,16 @@ OLAPStatus PushHandler::_do_streaming_ingestion( } // write - res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, - &(tablet_vars->at(0).rowset_to_add), - &(tablet_vars->at(1).rowset_to_add)); + if (push_type == PUSH_NORMAL_V2) { + res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, + &(tablet_vars->at(0).rowset_to_add), + &(tablet_vars->at(1).rowset_to_add)); + + } else { + res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet, + &(tablet_vars->at(0).rowset_to_add), + &(tablet_vars->at(1).rowset_to_add)); + } if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res << ", failed to process realtime push." @@ -248,6 +259,146 @@ void PushHandler::_get_tablet_infos(const vector& tablet_vars, } } +OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, + TabletSharedPtr new_tablet, + RowsetSharedPtr* cur_rowset, + RowsetSharedPtr* new_rowset) { + OLAPStatus res = OLAP_SUCCESS; + uint32_t num_rows = 0; + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + + do { + VLOG(3) << "start to convert delta file."; + + // 1. init RowsetBuilder of cur_tablet for current push + VLOG(3) << "init rowset builder. tablet=" << cur_tablet->full_name() + << ", block_row_size=" << cur_tablet->num_rows_per_row_block(); + RowsetWriterContext context; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.tablet_uid = cur_tablet->tablet_uid(); + context.tablet_id = cur_tablet->tablet_id(); + context.partition_id = _request.partition_id; + context.tablet_schema_hash = cur_tablet->schema_hash(); + context.rowset_type = StorageEngine::instance()->default_rowset_type(); + context.rowset_path_prefix = cur_tablet->tablet_path(); + context.tablet_schema = &(cur_tablet->tablet_schema()); + context.rowset_state = PREPARED; + context.txn_id = _request.transaction_id; + context.load_id = load_id; + // although the spark load output files are fully sorted, + // but it depends on thirparty implementation, so we conservatively + // set this value to OVERLAP_UNKNOWN + context.segments_overlap = OVERLAP_UNKNOWN; + + std::unique_ptr rowset_writer; + res = RowsetFactory::create_rowset_writer(context, &rowset_writer); + if (OLAP_SUCCESS != res) { + LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name() + << ", txn_id=" << _request.transaction_id + << ", res=" << res; + break; + } + + // 2. Init PushBrokerReader to read broker file if exist, + // in case of empty push this will be skipped. + std::string path = _request.broker_scan_range.ranges[0].path; + LOG(INFO) << "tablet=" << cur_tablet->full_name() << ", file path=" << path + << ", file size=" << _request.broker_scan_range.ranges[0].file_size; + + if (!path.empty()) { + std::unique_ptr reader(new(std::nothrow) PushBrokerReader()); + if (reader == nullptr) { + LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + // init schema + std::unique_ptr schema(new(std::nothrow) Schema(cur_tablet->tablet_schema())); + if (schema == nullptr) { + LOG(WARNING) << "fail to create schema. tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + // init Reader + if (OLAP_SUCCESS != (res = reader->init(schema.get(), + _request.broker_scan_range, + _request.desc_tbl))) { + LOG(WARNING) << "fail to init reader. res=" << res + << ", tablet=" << cur_tablet->full_name(); + res = OLAP_ERR_PUSH_INIT_ERROR; + break; + } + + // 3. Init Row + uint8_t* tuple_buf = reader->mem_pool()->allocate(schema->schema_size()); + ContiguousRow row(schema.get(), tuple_buf); + + // 4. Read data from broker and write into SegmentGroup of cur_tablet + // Convert from raw to delta + VLOG(3) << "start to convert etl file to delta."; + while (!reader->eof()) { + res = reader->next(&row); + if (OLAP_SUCCESS != res) { + LOG(WARNING) << "read next row failed." + << " res=" << res << " read_rows=" << num_rows; + break; + } else { + if (reader->eof()) { + break; + } + if (OLAP_SUCCESS != (res = rowset_writer->add_row(row))) { + LOG(WARNING) << "fail to attach row to rowset_writer. " + << "res=" << res + << ", tablet=" << cur_tablet->full_name() + << ", read_rows=" << num_rows; + break; + } + num_rows++; + } + } + + reader->print_profile(); + reader->close(); + } + + if (rowset_writer->flush() != OLAP_SUCCESS) { + LOG(WARNING) << "failed to finalize writer"; + break; + } + *cur_rowset = rowset_writer->build(); + if (*cur_rowset == nullptr) { + LOG(WARNING) << "fail to build rowset"; + res = OLAP_ERR_MALLOC_ERROR; + break; + } + + _write_bytes += (*cur_rowset)->data_disk_size(); + _write_rows += (*cur_rowset)->num_rows(); + + // 5. Convert data for schema change tables + VLOG(10) << "load to related tables of schema_change if possible."; + if (new_tablet != nullptr) { + SchemaChangeHandler schema_change; + res = schema_change.schema_version_convert(cur_tablet, new_tablet, + cur_rowset, new_rowset); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to change schema version for delta." + << "[res=" << res << " new_tablet='" + << new_tablet->full_name() << "']"; + } + } + } while (0); + + VLOG(10) << "convert delta file end. res=" << res + << ", tablet=" << cur_tablet->full_name() + << ", processed_rows" << num_rows; + return res; +} + OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet, RowsetSharedPtr* cur_rowset, @@ -761,6 +912,132 @@ OLAPStatus LzoBinaryReader::_next_block() { return res; } +OLAPStatus PushBrokerReader::init(const Schema* schema, + const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl) { + // init schema + _schema = schema; + + // init runtime state, runtime profile, counter + TUniqueId dummy_id; + dummy_id.hi = 0; + dummy_id.lo = 0; + TPlanFragmentExecParams params; + params.fragment_instance_id = dummy_id; + params.query_id = dummy_id; + TExecPlanFragmentParams fragment_params; + fragment_params.params = params; + fragment_params.protocol_version = PaloInternalServiceVersion::V1; + TQueryOptions query_options; + TQueryGlobals query_globals; + _runtime_state.reset(new RuntimeState(fragment_params, query_options, query_globals, + ExecEnv::GetInstance())); + DescriptorTbl* desc_tbl = NULL; + Status status = DescriptorTbl::create(_runtime_state->obj_pool(), t_desc_tbl, &desc_tbl); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to create descriptor table, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_state->set_desc_tbl(desc_tbl); + status = _runtime_state->init_mem_trackers(dummy_id); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + _runtime_profile.reset(_runtime_state->runtime_profile()); + _runtime_profile->set_name("PushBrokerReader"); + _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker())); + _mem_pool.reset(new MemPool(_mem_tracker.get())); + _counter.reset(new ScannerCounter()); + + // init scanner + BaseScanner *scanner = nullptr; + switch (t_scan_range.ranges[0].format_type) { + case TFileFormatType::FORMAT_PARQUET: + scanner = new ParquetScanner(_runtime_state.get(), + _runtime_profile.get(), + t_scan_range.params, + t_scan_range.ranges, + t_scan_range.broker_addresses, + _counter.get()); + break; + default: + LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _scanner.reset(scanner); + status = _scanner->open(); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to open scanner, msg: " << status.get_error_msg(); + return OLAP_ERR_PUSH_INIT_ERROR; + } + + // init tuple + auto tuple_id = t_scan_range.params.dest_tuple_id; + _tuple_desc = _runtime_state->desc_tbl().get_tuple_descriptor(tuple_id); + if (_tuple_desc == nullptr) { + std::stringstream ss; + LOG(WARNING) << "Failed to get tuple descriptor, tuple_id: " << tuple_id; + return OLAP_ERR_PUSH_INIT_ERROR; + } + + int tuple_buffer_size = _tuple_desc->byte_size(); + void* tuple_buffer = _mem_pool->allocate(tuple_buffer_size); + if (tuple_buffer == nullptr) { + LOG(WARNING) << "Allocate memory for tuple failed"; + return OLAP_ERR_PUSH_INIT_ERROR; + } + _tuple = reinterpret_cast(tuple_buffer); + + _ready = true; + return OLAP_SUCCESS; +} + +OLAPStatus PushBrokerReader::next(ContiguousRow* row) { + if (!_ready || row == nullptr) { + return OLAP_ERR_INPUT_PARAMETER_ERROR; + } + + memset(_tuple, 0, _tuple_desc->num_null_bytes()); + // Get from scanner + Status status = _scanner->get_next(_tuple, _mem_pool.get(), &_eof); + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Scanner get next tuple failed"; + return OLAP_ERR_PUSH_INPUT_DATA_ERROR; + } + if (_eof) { + return OLAP_SUCCESS; + } + + auto slot_descs = _tuple_desc->slots(); + size_t num_key_columns = _schema->num_key_columns(); + + // finalize row + for (size_t i = 0; i < slot_descs.size(); ++i) { + auto cell = row->cell(i); + const SlotDescriptor* slot = slot_descs[i]; + bool is_null = _tuple->is_null(slot->null_indicator_offset()); + const void* value = _tuple->get_slot(slot->tuple_offset()); + // try execute init method defined in aggregateInfo + // by default it only copies data into cell + _schema->column(i)->consume(&cell, (const char*)value, is_null, + _mem_pool.get(), _runtime_state->obj_pool()); + // if column(i) is a value column, try execute finalize method defined in aggregateInfo + // to convert data into final format + if (i >= num_key_columns) { + _schema->column(i)->agg_finalize(&cell, _mem_pool.get()); + } + } + + return OLAP_SUCCESS; +} + +void PushBrokerReader::print_profile() { + std::stringstream ss; + _runtime_profile->pretty_print(&ss); + LOG(INFO) << ss.str(); +} + string PushHandler::_debug_version_list(const Versions& versions) const { std::ostringstream txt; txt << "Versions: "; diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 127a084624c3e9..a48716ef54f809 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -22,8 +22,10 @@ #include #include +#include "exec/base_scanner.h" #include "gen_cpp/AgentService_types.h" #include "gen_cpp/MasterService_types.h" +#include "gen_cpp/PaloInternalService_types.h" #include "olap/file_helper.h" #include "olap/merger.h" #include "olap/olap_common.h" @@ -60,6 +62,11 @@ class PushHandler { int64_t write_bytes() const { return _write_bytes; } int64_t write_rows() const { return _write_rows; } private: + OLAPStatus _convert_v2( + TabletSharedPtr cur_tablet, + TabletSharedPtr new_tablet_vec, + RowsetSharedPtr* cur_rowset, + RowsetSharedPtr* new_rowset); // Convert local data file to internal formatted delta, // return new delta's SegmentGroup OLAPStatus _convert( @@ -209,6 +216,44 @@ class LzoBinaryReader: public IBinaryReader { size_t _next_row_start; }; +class PushBrokerReader { +public: + PushBrokerReader() + : _ready(false), + _eof(false) {} + ~PushBrokerReader() {} + + OLAPStatus init(const Schema* schema, + const TBrokerScanRange& t_scan_range, + const TDescriptorTable& t_desc_tbl); + OLAPStatus next(ContiguousRow* row); + void print_profile(); + + OLAPStatus close() { + _ready = false; + return OLAP_SUCCESS; + } + bool eof() { + return _eof; + } + MemPool* mem_pool() { + return _mem_pool.get(); + } + +private: + bool _ready; + bool _eof; + TupleDescriptor* _tuple_desc; + Tuple* _tuple; + const Schema* _schema; + std::unique_ptr _runtime_state; + std::unique_ptr _runtime_profile; + std::unique_ptr _mem_tracker; + std::unique_ptr _mem_pool; + std::unique_ptr _counter; + std::unique_ptr _scanner; +}; + } // namespace doris #endif // DORIS_BE_SRC_OLAP_PUSH_HANDLER_H diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 99517779625e76..7346f598d5ed98 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -59,7 +59,8 @@ EngineBatchLoadTask::~EngineBatchLoadTask() { OLAPStatus EngineBatchLoadTask::execute() { AgentStatus status = DORIS_SUCCESS; - if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE) { + if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE + || _push_req.push_type == TPushType::LOAD_V2) { status = _init(); if (status == DORIS_SUCCESS) { uint32_t retry_time = 0; @@ -116,18 +117,18 @@ AgentStatus EngineBatchLoadTask::_init() { return DORIS_PUSH_INVALID_TABLE; } - // Empty remote_path - if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) { - _is_init = true; - return status; - } - // check disk capacity - if (_push_req.push_type == TPushType::LOAD) { + if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_V2) { if (tablet->data_dir()->reach_capacity_limit(_push_req.__isset.http_file_size)) { return DORIS_DISK_REACH_CAPACITY_LIMIT; } } + + // Empty remote_path + if (!_push_req.__isset.http_file_path || !_push_req.__isset.http_file_size) { + _is_init = true; + return status; + } // Check remote path _remote_file_path = _push_req.http_file_path; @@ -310,6 +311,8 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, PushType type = PUSH_NORMAL; if (request.push_type == TPushType::LOAD_DELETE) { type = PUSH_FOR_LOAD_DELETE; + } else if (request.push_type == TPushType::LOAD_V2) { + type = PUSH_NORMAL_V2; } int64_t duration_ns = 0; diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index fc9f2a883af5c9..8e30250f783847 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -88,3 +88,4 @@ ADD_BE_TEST(memory/schema_test) ADD_BE_TEST(memory/column_test) ADD_BE_TEST(memory/partial_row_batch_test) ADD_BE_TEST(memory/mem_tablet_test) +ADD_BE_TEST(push_handler_test) diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp new file mode 100644 index 00000000000000..dedc93b30c43a1 --- /dev/null +++ b/be/test/olap/push_handler_test.cpp @@ -0,0 +1,461 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exprs/cast_functions.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "olap/push_handler.h" +#include "olap/row.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/user_function_cache.h" + +namespace doris { + +class PushHandlerTest : public testing::Test { +public: + PushHandlerTest() { + init(); + } + static void SetUpTestCase() { + UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + } +protected: + virtual void SetUp() {} + virtual void TearDown() {} + +private: + void init(); + Schema create_schema(); + int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + void create_expr_info(); + TDescriptorTable init_desc_table(); + + TDescriptorTable _t_desc_table; + TBrokerScanRangeParams _params; +}; + +Schema PushHandlerTest::create_schema() { + std::vector columns; + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_INT, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_SMALLINT, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_VARCHAR, true); + columns.emplace_back(OLAP_FIELD_AGGREGATION_SUM, OLAP_FIELD_TYPE_BIGINT, true); + Schema schema(columns, 3); + return schema; +} + +#define TUPLE_ID_DST 0 +#define TUPLE_ID_SRC 1 +#define CLOMN_NUMBERS 4 +#define DST_TUPLE_SLOT_ID_START 1 +#define SRC_TUPLE_SLOT_ID_START 5 +int PushHandlerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + const char *clomnNames[] = {"k1_int", "k2_smallint", "k3_varchar", "v_bigint"}; + for (int i = 0; i < CLOMN_NUMBERS; i++) + { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + slot_desc.byteOffset = i*16+8; // 8 bytes for null + slot_desc.nullIndicatorBit = i%8; + slot_desc.colName = clomnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + { + // TTupleDescriptor source + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_SRC; + t_tuple_desc.byteSize = CLOMN_NUMBERS*16+8; // 8 bytes for null + t_tuple_desc.numNullBytes = 1; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +int PushHandlerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + {//k1_int + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 4; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 1; + slot_desc.colName = "k1_int"; + slot_desc.slotIdx = 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {// k2_smallint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::SMALLINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 2; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 0; + slot_desc.colName = "k2_smallint"; + slot_desc.slotIdx = 0; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {//k3_varchar + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 16; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 3; + slot_desc.colName = "k3_varchar"; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + {// v_bigint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = -1; + slot_desc.byteOffset = 8; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 2; + slot_desc.colName = "v_bigint"; + slot_desc.slotIdx = 2; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_DST; + t_tuple_desc.byteSize = 32; + t_tuple_desc.numNullBytes = 1; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +TDescriptorTable PushHandlerTest::init_desc_table() { + TDescriptorTable t_desc_table; + int next_slot_id = 1; + next_slot_id = create_dst_tuple(t_desc_table, next_slot_id); + next_slot_id = create_src_tuple(t_desc_table, next_slot_id); + return t_desc_table; +} + +void PushHandlerTest::create_expr_info() { + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + // k1_int VARCHAR --> INT + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttoint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttoint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START); + } + // k2_smallint VARCHAR --> SMALLINT + { + TTypeDesc smallint_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::SMALLINT); + node.__set_scalar_type(scalar_type); + smallint_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = smallint_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttosmallint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = smallint_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttosmallint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_small_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1); + } + // k3_varchar VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2); + } + // v_bigint VARCHAR --> BIGINT + { + TTypeDesc bigint_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + bigint_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = bigint_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttobigint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = bigint_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttobigint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_big_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 3; + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3); + } + + _params.__set_dest_tuple_id(TUPLE_ID_DST); + _params.__set_src_tuple_id(TUPLE_ID_SRC); +} + +void PushHandlerTest::init() { + create_expr_info(); + _t_desc_table = init_desc_table(); +} + +TEST_F(PushHandlerTest, PushBrokerReaderNormal) { + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_PARQUET; + range.splittable = false; + range.path = "./be/test/olap/test_data/push_broker_reader.parquet"; + range.file_type = TFileType::FILE_LOCAL; + broker_scan_range.ranges.push_back(range); + + ExecEnv::GetInstance()->_thread_mgr = new ThreadResourceMgr(); + + Schema schema = create_schema(); + // data + // k1_int k2_smallint varchar bigint + // 0 0 a0 0 + // 0 2 a1 3 + // 1 4 a2 6 + PushBrokerReader reader; + reader.init(&schema, broker_scan_range, _t_desc_table); + uint8_t* tuple_buf = reader.mem_pool()->allocate(schema.schema_size()); + ContiguousRow row(&schema, tuple_buf); + + // line 1 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(0, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(0, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a0", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(0, *(int64_t*)row.cell(3).cell_ptr()); + + // line 2 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(0, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(2, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a1", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(3, *(int64_t*)row.cell(3).cell_ptr()); + + // line 3 + reader.next(&row); + ASSERT_FALSE(reader.eof()); + ASSERT_EQ(1, *(int32_t*)row.cell(0).cell_ptr()); + ASSERT_EQ(4, *(int16_t*)row.cell(1).cell_ptr()); + ASSERT_EQ("a2", ((Slice*)row.cell(2).cell_ptr())->to_string()); + ASSERT_EQ(6, *(int64_t*)row.cell(3).cell_ptr()); + + // eof + reader.next(&row); + ASSERT_TRUE(reader.eof()); + + reader.close(); +} +} // namespace doris + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/test_data/push_broker_reader.parquet b/be/test/olap/test_data/push_broker_reader.parquet new file mode 100644 index 00000000000000..371c7474c6381f Binary files /dev/null and b/be/test/olap/test_data/push_broker_reader.parquet differ diff --git a/run-ut.sh b/run-ut.sh index dca8eaec789522..ed0665a3fd9578 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -307,6 +307,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test +${DORIS_TEST_BINARY_DIR}/olap/push_handler_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test