From 4041454910c7df8593ec8f5327bbe00ff0ed082a Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Mon, 23 Oct 2023 15:04:13 +0800 Subject: [PATCH 1/4] support multi table load --- be/src/pipeline/pipeline_fragment_context.cpp | 13 +++++++++---- be/src/pipeline/pipeline_fragment_context.h | 2 ++ .../pipeline_x/pipeline_x_fragment_context.cpp | 13 +++++++++---- be/src/runtime/plan_fragment_executor.cpp | 13 +++++++++---- be/src/runtime/plan_fragment_executor.h | 2 ++ .../apache/doris/service/FrontendServiceImpl.java | 8 +++++--- gensrc/thrift/PaloInternalService.thrift | 3 +++ 7 files changed, 39 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index bf0c1b80df5b77..530eef0264a6ad 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -249,13 +249,18 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re fragment_context->set_is_report_success(request.query_options.is_report_success); } - auto* desc_tbl = _query_ctx->desc_tbl; - _runtime_state->set_desc_tbl(desc_tbl); + if (request.is_multi_table_load) { + RETURN_IF_ERROR( + DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); + } else { + _desc_tbl = _query_ctx->desc_tbl; + } + _runtime_state->set_desc_tbl(_desc_tbl); // 2. Create ExecNode to build pipeline with PipelineFragmentContext RETURN_IF_ERROR_OR_CATCH_EXCEPTION( ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(), - request.fragment.plan, *desc_tbl, &_root_plan)); + request.fragment.plan, *_desc_tbl, &_root_plan)); // Set senders of exchange nodes before pipeline build std::vector exch_nodes; @@ -311,7 +316,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, request, idx, _root_plan->row_desc(), - _runtime_state.get(), &_sink, *desc_tbl)); + _runtime_state.get(), &_sink, *_desc_tbl)); } _root_pipeline = fragment_context->add_pipeline(); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 80c38880bf332a..577cbc419298a1 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -216,6 +216,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this> _tasks; bool _group_commit; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 2c852eecac90f9..b60c0c0ab52e56 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -200,8 +200,13 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_load_job_id(request.load_job_id); } - auto* desc_tbl = _query_ctx->desc_tbl; - _runtime_state->set_desc_tbl(desc_tbl); + if (request.is_multi_table_load) { + RETURN_IF_ERROR( + DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); + } else { + _desc_tbl = _query_ctx->desc_tbl; + } + _runtime_state->set_desc_tbl(_desc_tbl); _runtime_state->set_num_per_fragment_instances(request.num_senders); // 2. Build pipelines with operators in this fragment. @@ -215,7 +220,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r } RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink( _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, - request, root_pipeline->output_row_desc(), _runtime_state.get(), *desc_tbl, + request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl, root_pipeline->id())); RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast(root_pipeline->set_sink(_sink)); @@ -402,7 +407,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_load_job_id(request.load_job_id); } - _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl); + _runtime_states[i]->set_desc_tbl(_desc_tbl); _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); std::map pipeline_id_to_task; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 265fad8882d2fc..82a3437a1e6cd5 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -165,13 +165,18 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { } // set up desc tbl - DescriptorTbl* desc_tbl = _query_ctx->desc_tbl; - _runtime_state->set_desc_tbl(desc_tbl); + if (request.is_multi_table_load) { + RETURN_IF_ERROR( + DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); + } else { + _desc_tbl = _query_ctx->desc_tbl; + } + _runtime_state->set_desc_tbl(_desc_tbl); // set up plan DCHECK(request.__isset.fragment); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree( - _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan)); + _runtime_state.get(), obj_pool(), request.fragment.plan, *_desc_tbl, &_plan)); // set #senders of exchange nodes before calling Prepare() std::vector exch_nodes; @@ -222,7 +227,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (request.fragment.__isset.output_sink) { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params, - row_desc(), runtime_state(), &_sink, *desc_tbl)); + row_desc(), runtime_state(), &_sink, *_desc_tbl)); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state())); RuntimeProfile* sink_profile = _sink->profile(); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 3bfcbe005af6a7..b3a9f6e9405962 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -242,6 +242,8 @@ class PlanFragmentExecutor { bool _group_commit = false; + DescriptorTbl* _desc_tbl; + ObjectPool* obj_pool() { return _runtime_state->obj_pool(); } // typedef for TPlanFragmentExecParams.per_node_scan_ranges diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 2cb8337db72ad2..ac95cbdcbe3dbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2013,12 +2013,14 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ for (OlapTable table : olapTables) { int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); if (enablePipelineLoad) { - planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, table, timeoutMs, - index, true)); + TPipelineFragmentParams planFragmentParams = generatePipelineStreamLoadPut(request, db, fullDbName, + table, timeoutMs, index, true); + planFragmentParams.setIsMultiTableLoad(true); + planFragmentParamsList.add(planFragmentParams); } else { TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, table, timeoutMs, index, true); - + planFragmentParams.setIsMultiTableLoad(true); planFragmentParamsList.add(planFragmentParams); } multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index a56e4f98a4fca2..5b0992e6b35f35 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -449,6 +449,8 @@ struct TExecPlanFragmentParams { 24: optional map file_scan_params 25: optional i64 wal_id + + 26: optional bool is_multi_table_load = false; } struct TExecPlanFragmentParamsList { @@ -664,6 +666,7 @@ struct TPipelineFragmentParams { // scan node id -> scan range params, only for external file scan 29: optional map file_scan_params 30: optional bool group_commit = false; + 31: optional bool is_multi_table_load = false; } struct TPipelineFragmentParamsList { From f5fcb2f1cef6fc3b55685be4ebcc9268d63fc274 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Mon, 23 Oct 2023 17:03:47 +0800 Subject: [PATCH 2/4] add regression test --- .../kafka/scripts/multi_table_csv1.csv | 2 + .../kafka/scripts/multi_table_json1.json | 2 + .../thirdparties/run-thirdparties-docker.sh | 4 +- .../routine_load/test_routine_load.out | 14 ++- .../routine_load/test_routine_load.groovy | 89 +++++++++++++++++++ 5 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv create mode 100644 docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv new file mode 100644 index 00000000000000..e69084077c8424 --- /dev/null +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv @@ -0,0 +1,2 @@ +routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} +routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json new file mode 100644 index 00000000000000..dea7aa722ba1c5 --- /dev/null +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json @@ -0,0 +1,2 @@ +routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} +routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} \ No newline at end of file diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh index de55f9c3396526..8e8230608b240f 100755 --- a/docker/thirdparties/run-thirdparties-docker.sh +++ b/docker/thirdparties/run-thirdparties-docker.sh @@ -256,7 +256,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then local ip_host="$2" local backup_dir=/home/work/pipline/backup_center - declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv") + declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone" "multi_table_csv" "multi_table_csv1") for topic in "${topics[@]}"; do while IFS= read -r line; do @@ -267,7 +267,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv" done - declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json") + declare -a json_topics=("basic_data_json" "basic_array_data_json" "basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json" "multi_table_json1") for json_topic in "${json_topics[@]}"; do echo ${json_topics} diff --git a/regression-test/data/load_p0/routine_load/test_routine_load.out b/regression-test/data/load_p0/routine_load/test_routine_load.out index 161af660b4741d..4288223ca02476 100644 --- a/regression-test/data/load_p0/routine_load/test_routine_load.out +++ b/regression-test/data/load_p0/routine_load/test_routine_load.out @@ -986,4 +986,16 @@ 49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} -- !sql_multi_table_one_data -- -8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N \ No newline at end of file +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N + +-- !sql_multi_table -- +49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {} + +-- !sql_multi_table -- +49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 {} + +-- !sql_multi_table -- +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N + +-- !sql_multi_table -- +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 \N \N 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme \N true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy index 4b659df4eff984..d9560e312c56d0 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy @@ -31,6 +31,11 @@ suite("test_routine_load","p0") { "dup_tbl_basic_multi_table", ] + def multiTables1 = [ + "dup_tbl_basic", + "uniq_tbl_basic", + ] + def jobs = [ "dup_tbl_basic_job", "uniq_tbl_basic_job", @@ -127,6 +132,11 @@ suite("test_routine_load","p0") { "multi_table_json", ] + def multiTableJobName1 = [ + "multi_table_csv1", + "multi_table_json1", + ] + def formats = [ "csv", "json", @@ -980,4 +990,83 @@ suite("test_routine_load","p0") { j++ } } + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def j = 0 + for (String jobName in multiTableJobName1) { + try { + for (String tableName in multiTables1) { + sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text + sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text + } + + sql """ + CREATE ROUTINE LOAD ${jobName} + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "format" = "${formats[j]}", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${jobName}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + i = 0 + for (String tableName in multiTables1) { + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state == "NEED_SCHEDULE") { + continue; + } + assertEquals(res[0][8].toString(), "RUNNING") + break; + } + + def count = 0 + def tableName1 = "routine_load_" + tableName + while (true) { + def res = sql "select count(*) from ${tableName1}" + def state = sql "show routine load for ${jobName}" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(5000) + count++ + } + + if (i <= 3) { + qt_sql_multi_table "select * from ${tableName1} order by k00,k01" + } else { + qt_sql_multi_table "select * from ${tableName1} order by k00" + } + + i++ + } + } finally { + sql "stop routine load for ${jobName}" + for (String tableName in multiTables1) { + sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text + } + } + j++ + } + } } From 5de07341006a6fc90067cb1d41f449253c843081 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Wed, 25 Oct 2023 11:00:40 +0800 Subject: [PATCH 3/4] update --- be/src/pipeline/pipeline_fragment_context.cpp | 6 +++--- .../pipeline_x/pipeline_x_fragment_context.cpp | 6 +++--- be/src/runtime/plan_fragment_executor.cpp | 6 +++--- .../kafka/scripts/multi_table_csv1.csv | 3 ++- .../kafka/scripts/multi_table_json1.json | 3 ++- .../apache/doris/service/FrontendServiceImpl.java | 12 ++++-------- 6 files changed, 17 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 530eef0264a6ad..493aa2d6e54a2e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -249,11 +249,11 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re fragment_context->set_is_report_success(request.query_options.is_report_success); } - if (request.is_multi_table_load) { + if (request.is_simplified_param) { + _desc_tbl = _query_ctx->desc_tbl; + } else { RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); - } else { - _desc_tbl = _query_ctx->desc_tbl; } _runtime_state->set_desc_tbl(_desc_tbl); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index b60c0c0ab52e56..10aae2e88dbb96 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -200,11 +200,11 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_load_job_id(request.load_job_id); } - if (request.is_multi_table_load) { + if (request.is_simplified_param) { + _desc_tbl = _query_ctx->desc_tbl; + } else { RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); - } else { - _desc_tbl = _query_ctx->desc_tbl; } _runtime_state->set_desc_tbl(_desc_tbl); _runtime_state->set_num_per_fragment_instances(request.num_senders); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 82a3437a1e6cd5..abc972e471d024 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -165,11 +165,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { } // set up desc tbl - if (request.is_multi_table_load) { + if (request.is_simplified_param) { + _desc_tbl = _query_ctx->desc_tbl; + } else { RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); - } else { - _desc_tbl = _query_ctx->desc_tbl; } _runtime_state->set_desc_tbl(_desc_tbl); diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv index e69084077c8424..1df0d787733b07 100644 --- a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv @@ -1,2 +1,3 @@ routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} -routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} \ No newline at end of file +routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} +routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json index dea7aa722ba1c5..0099b0b5432501 100644 --- a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json +++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json @@ -1,2 +1,3 @@ routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} -routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} \ No newline at end of file +routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} +routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", "k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": "3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", "k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", "k16": "", "k17": "PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", "k18": "\\N"} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ac95cbdcbe3dbc..5d36690a79dd44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2013,15 +2013,11 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ for (OlapTable table : olapTables) { int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); if (enablePipelineLoad) { - TPipelineFragmentParams planFragmentParams = generatePipelineStreamLoadPut(request, db, fullDbName, - table, timeoutMs, index, true); - planFragmentParams.setIsMultiTableLoad(true); - planFragmentParamsList.add(planFragmentParams); + planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, + table, timeoutMs, index, true)); } else { - TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, - table, timeoutMs, index, true); - planFragmentParams.setIsMultiTableLoad(true); - planFragmentParamsList.add(planFragmentParams); + planFragmentParamsList.add(generatePlanFragmentParams(request, db, fullDbName, + table, timeoutMs, index, true)); } multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); } From 7691f3b8fae4425b1e1b749fce456661f71d5897 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Wed, 25 Oct 2023 11:27:35 +0800 Subject: [PATCH 4/4] update --- be/src/pipeline/pipeline_fragment_context.cpp | 1 + .../pipeline_x/pipeline_x_fragment_context.cpp | 1 + be/src/runtime/plan_fragment_executor.cpp | 1 + .../org/apache/doris/service/FrontendServiceImpl.java | 10 ++++++---- gensrc/thrift/PaloInternalService.thrift | 3 --- 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 493aa2d6e54a2e..09128e8f74a158 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -252,6 +252,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (request.is_simplified_param) { _desc_tbl = _query_ctx->desc_tbl; } else { + DCHECK(request.__isset.desc_tbl); RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 10aae2e88dbb96..8fb506f86e1bd1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r if (request.is_simplified_param) { _desc_tbl = _query_ctx->desc_tbl; } else { + DCHECK(request.__isset.desc_tbl); RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index abc972e471d024..57d524541a1966 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -168,6 +168,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (request.is_simplified_param) { _desc_tbl = _query_ctx->desc_tbl; } else { + DCHECK(request.__isset.desc_tbl); RETURN_IF_ERROR( DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 5d36690a79dd44..2cb8337db72ad2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2013,11 +2013,13 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ for (OlapTable table : olapTables) { int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); if (enablePipelineLoad) { - planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, - table, timeoutMs, index, true)); + planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, table, timeoutMs, + index, true)); } else { - planFragmentParamsList.add(generatePlanFragmentParams(request, db, fullDbName, - table, timeoutMs, index, true)); + TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, + table, timeoutMs, index, true); + + planFragmentParamsList.add(planFragmentParams); } multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 5b0992e6b35f35..a56e4f98a4fca2 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -449,8 +449,6 @@ struct TExecPlanFragmentParams { 24: optional map file_scan_params 25: optional i64 wal_id - - 26: optional bool is_multi_table_load = false; } struct TExecPlanFragmentParamsList { @@ -666,7 +664,6 @@ struct TPipelineFragmentParams { // scan node id -> scan range params, only for external file scan 29: optional map file_scan_params 30: optional bool group_commit = false; - 31: optional bool is_multi_table_load = false; } struct TPipelineFragmentParamsList {