diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index bdbbf8cc94d2d6..3c93dd94941ca8 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -39,10 +39,9 @@ constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31); // Embed column_values and brpc request serialization string in controller attachment. template Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) { - auto block = brpc_request->block(); - Status st = request_embed_attachment(brpc_request, block.column_values(), closure); - block.set_column_values(""); - return st; + std::string column_values = std::move(*brpc_request->mutable_block()->mutable_column_values()); + brpc_request->mutable_block()->mutable_column_values()->clear(); + return request_embed_attachment(brpc_request, column_values, closure); } inline bool enable_http_send_block(const PTransmitDataParams& request) { @@ -90,7 +89,9 @@ Status request_embed_attachment(Params* brpc_request, const std::string& data, C // step1: serialize brpc_request to string, and append to attachment. std::string req_str; - brpc_request->SerializeToString(&req_str); + if (!brpc_request->SerializeToString(&req_str)) { + return Status::InternalError("failed to serialize the request"); + } int64_t req_str_size = req_str.size(); attachment.append(&req_str_size, sizeof(req_str_size)); attachment.append(req_str); diff --git a/regression-test/suites/load_p2/test_large_data_by_rpc.groovy b/regression-test/suites/load_p2/test_large_data_by_rpc.groovy new file mode 100644 index 00000000000000..4c470ce08d6e45 --- /dev/null +++ b/regression-test/suites/load_p2/test_large_data_by_rpc.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +suite("test_large_data_by_rpc", "p2") { + def tableName = "test_large_data_by_rpc" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `id` INT NULL, + `id1` INT NULL, + `id2` INT NULL, + `array1` ARRAY NULL, + `map1` MAP NULL, + `struct1` STRUCT NULL, + `json1` JSON NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `id1`, `id2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', '|' + set 'compress_type', 'GZ' + + file """${getS3Url()}/regression/load/data/large_data_by_rpc.csv.gz""" + + time 30000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, 3000) + } + } +}