From 90b2160f2fb468678749db244c446f73d2d364c2 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Fri, 15 Nov 2024 18:00:19 +0800 Subject: [PATCH 1/8] fix 1 --- be/src/util/arrow/row_batch.cpp | 9 +++- .../serde/data_type_bitmap_serde.cpp | 44 +++++++++++++++-- .../data_types/serde/data_type_bitmap_serde.h | 5 +- .../serde/data_type_date64_serde.cpp | 2 +- .../data_types/serde/data_type_hll_serde.cpp | 26 ++++++++-- .../data_types/serde/data_type_ipv6_serde.cpp | 2 +- .../serde/data_type_number_serde.cpp | 2 +- .../serde/data_type_quantilestate_serde.h | 49 +++++++++++++++++-- .../apache/doris/analysis/OutFileClause.java | 17 ++----- 9 files changed, 124 insertions(+), 32 deletions(-) diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index dd11d5ae46f740..a0cd77aee41931 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr #include #include @@ -96,6 +97,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr result.writeEndBinary(); } +void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, + arrow::ArrayBuilder* array_builder, int start, + int end, const cctz::time_zone& ctz) const { + const auto& col = assert_cast(column); + auto& builder = assert_cast(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& bitmap_value = const_cast(col.get_element(string_i)); + std::string memory_buffer(bitmap_value.getSizeInBytes(), '0'); + bitmap_value.write_to(memory_buffer.data()); + checkArrowStatus( + builder.Append(memory_buffer.data(), static_cast(memory_buffer.size())), + column.get_name(), 
array_builder->type()->name()); + } + } +} + void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const { auto& col = reinterpret_cast(column); auto blob = static_cast(arg); @@ -148,11 +169,28 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + if (!ptr) { + return Status::InternalError( + "malloc memory error when write largeint column data to orc file."); + } + StringRef bufferRef; + bufferRef.data = ptr; + bufferRef.size = BUFFER_UNIT_SIZE; + size_t offset = 0; + buffer_list.emplace_back(bufferRef); + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast(ele.data); - cur_batch->length[row_id] = ele.size; + auto bitmap_value = const_cast(col_data.get_element(row_id)); + size_t len = bitmap_value.getSizeInBytes(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + bitmap_value.write_to(const_cast(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index a4be5b8ec204f1..3fc96c40b9ddeb 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -63,10 +63,7 @@ class DataTypeBitMapSerDe : public DataTypeSerDe { void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, - const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); - } + const cctz::time_zone& ctz) const 
override; void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 6cb5b31ae387a2..48a4b2c16785e9 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -299,6 +299,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con bufferRef.size = BUFFER_UNIT_SIZE; size_t offset = 0; const size_t begin_off = offset; + buffer_list.emplace_back(bufferRef); for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -321,7 +322,6 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con } } - buffer_list.emplace_back(bufferRef); cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index c22bb31862e41c..2c4794d7fe6741 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include "arrow/array/builder_binary.h" @@ -139,7 +140,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const NullMa arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, const cctz::time_zone& ctz) const { const auto& col = assert_cast(column); - auto& builder = assert_cast(*array_builder); + auto& builder = assert_cast(*array_builder); for (size_t string_i = start; string_i < end; ++string_i) { if (null_map && (*null_map)[string_i]) { checkArrowStatus(builder.AppendNull(), column.get_name(), @@ -198,11 +199,28 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const auto& col_data = assert_cast(column); orc::StringVectorBatch* 
cur_batch = dynamic_cast(orc_col_batch); + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + if (!ptr) { + return Status::InternalError( + "malloc memory error when write largeint column data to orc file."); + } + StringRef bufferRef; + bufferRef.data = ptr; + bufferRef.size = BUFFER_UNIT_SIZE; + size_t offset = 0; + buffer_list.emplace_back(bufferRef); + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - const auto& ele = col_data.get_data_at(row_id); - cur_batch->data[row_id] = const_cast(ele.data); - cur_batch->length[row_id] = ele.size; + auto hll_value = const_cast(col_data.get_element(row_id)); + size_t len = hll_value.max_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + hll_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index ac4dbc030432b8..643d136c22e0c6 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -197,6 +197,7 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const bufferRef.size = BUFFER_UNIT_SIZE; size_t offset = 0; const size_t begin_off = offset; + buffer_list.emplace_back(bufferRef); for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -218,7 +219,6 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const data_off += cur_batch->length[row_id]; } } - buffer_list.emplace_back(bufferRef); cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index 9416fc9a8b3020..9b2ad5676f8a58 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -352,6 +352,7 @@ Status DataTypeNumberSerDe::write_column_to_orc(const std::string& timezone, bufferRef.size = BUFFER_UNIT_SIZE; size_t offset = 0; const size_t begin_off = offset; + buffer_list.emplace_back(bufferRef); for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -373,7 +374,6 @@ Status DataTypeNumberSerDe::write_column_to_orc(const std::string& timezone, data_off += cur_batch->length[row_id]; } } - buffer_list.emplace_back(bufferRef); cur_batch->numElements = end - start; } else if constexpr (std::is_same_v || std::is_same_v) { // tinyint/boolean WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch) diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index 9608bf1cb7899a..f15e6423c571f8 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -102,8 +103,21 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe { void write_column_to_arrow(const IColumn& column, const NullMap* null_map, arrow::ArrayBuilder* array_builder, int64_t start, int64_t end, const cctz::time_zone& ctz) const override { - throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, - "write_column_to_arrow with type " + column.get_name()); + const auto& col = assert_cast(column); + auto& builder = assert_cast(*array_builder); + for (size_t string_i = start; string_i < end; ++string_i) { + if (null_map && (*null_map)[string_i]) { + checkArrowStatus(builder.AppendNull(), column.get_name(), + array_builder->type()->name()); + } else { + auto& quantile_state_value = const_cast(col.get_element(string_i)); + std::string memory_buffer(quantile_state_value.get_serialized_size(), '0'); + quantile_state_value.serialize((uint8_t*)memory_buffer.data()); + 
checkArrowStatus(builder.Append(memory_buffer.data(), + static_cast(memory_buffer.size())), + column.get_name(), array_builder->type()->name()); + } + } } void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start, int end, const cctz::time_zone& ctz) const override { @@ -126,7 +140,36 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe { const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, std::vector& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto& col_data = assert_cast(column); + orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); + + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + if (!ptr) { + return Status::InternalError( + "malloc memory error when write largeint column data to orc file."); + } + StringRef bufferRef; + bufferRef.data = ptr; + bufferRef.size = BUFFER_UNIT_SIZE; + size_t offset = 0; + buffer_list.emplace_back(bufferRef); + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto quantilestate_value = const_cast(col_data.get_element(row_id)); + size_t len = quantilestate_value.get_serialized_size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + quantilestate_value.serialize((uint8_t*)(bufferRef.data) + offset); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } private: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index ef65b405853765..201860f423d614 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -36,7 +36,6 @@ import org.apache.doris.common.util.Util; import 
org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TParquetCompressionType; @@ -302,11 +301,8 @@ private String dorisTypeToOrcTypeMap(Type dorisType) throws AnalysisException { break; case HLL: case BITMAP: - if (!(ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary())) { - break; - } - orcType = "string"; + case QUANTILE_STATE: + orcType = "binary"; break; case DATEV2: orcType = "date"; @@ -455,13 +451,8 @@ private void analyzeForOrcFormat(List resultExprs, List colLabels) break; case HLL: case BITMAP: - if (ConnectContext.get() != null && ConnectContext.get() - .getSessionVariable().isReturnObjectDataAsBinary()) { - checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); - } else { - throw new AnalysisException("Orc format does not support column type: " - + resultType.getPrimitiveType()); - } + case QUANTILE_STATE: + checkOrcType(schema.second, "binary", true, resultType.getPrimitiveType().toString()); break; case STRUCT: checkOrcType(schema.second, "struct", false, resultType.getPrimitiveType().toString()); From 02d6ac94ae31c046b47bf86db4516cc832d7cfcc Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Mon, 18 Nov 2024 17:31:59 +0800 Subject: [PATCH 2/8] fix 2 --- .../outfile/test_outfile_complex_type.out | 17 ++++ .../outfile/test_outfile_complex_type.groovy | 92 +++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 regression-test/data/export_p0/outfile/test_outfile_complex_type.out create mode 100644 regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy diff --git a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out 
b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out new file mode 100644 index 00000000000000..aa22d97c0e17e7 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 0501F300000000000000 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F 050604000000000000000100000000000000050000000000000002000000000000006FA10600000000000300000000000000 +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 050186D6120000000000 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 0501388E109B15080000 +20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000 +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000 + +-- !select_load_orc -- +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 0501F300000000000000 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F 050604000000000000000100000000000000050000000000000002000000000000006FA10600000000000300000000000000 +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 050186D6120000000000 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 0501388E109B15080000 +20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000 +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000 + diff --git a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy new file mode 100644 index 00000000000000..3f3f270e8fb5d0 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_outfile_complex_type", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_complex_type_table" + def outFilePath = "${bucket}/outfile/complex_type/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... 
+ def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `price` quantile_state QUANTILE_UNION NOT NULL COMMENT "", + `hll_t` hll hll_union, + `device_id` bitmap BITMAP_UNION + ) ENGINE=OLAP + AGGREGATE KEY(`dt`, `id`) + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, to_quantile_state(1, 2048), hll_hash(1), to_bitmap(243)), + (20220201,1, to_quantile_state(-1, 2048), hll_hash(2), bitmap_from_array([1,2,3,4,5,434543])), + (20220201,2, to_quantile_state(0, 2048), hll_hash(3), to_bitmap(1234566)), + (20220201,3, to_quantile_state(1, 2048), hll_hash(4), to_bitmap(8888888888888)), + (20220201,4, to_quantile_state(2, 2048), hll_hash(5), to_bitmap(98392819412234)), + (20220201,5, to_quantile_state(3, 2048), hll_hash(6), to_bitmap(253234234)); + """ + + // parquet file format + def outfile_url = outfile_to_S3("parquet") + qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.parquet", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "parquet", + "region" = "${region}" + ); + """ + + // orc file foramt + outfile_url = outfile_to_S3("orc") + qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.orc", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = 
"orc", + "region" = "${region}" + ); + """ +} \ No newline at end of file From c9267bda118be5ef303c03c1f1536f3b68d58ed2 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Tue, 19 Nov 2024 20:14:18 +0800 Subject: [PATCH 3/8] fix 3 --- .../serde/data_type_bitmap_serde.cpp | 24 ++++ .../data_types/serde/data_type_bitmap_serde.h | 8 +- .../data_types/serde/data_type_hll_serde.cpp | 31 ++---- .../serde/data_type_jsonb_serde.cpp | 20 +++- .../serde/data_type_object_serde.cpp | 28 +++++ .../data_types/serde/data_type_object_serde.h | 4 +- .../serde/data_type_quantilestate_serde.h | 16 ++- .../outfile/test_outfile_complex_type.out | 8 ++ .../test_outfile_jsonb_and_variant.out | 25 +++++ .../outfile/test_outfile_complex_type.groovy | 26 ++++- .../test_outfile_jsonb_and_variant.groovy | 104 ++++++++++++++++++ 11 files changed, 255 insertions(+), 39 deletions(-) create mode 100644 regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out create mode 100644 regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index b7efd8da674879..bb9332d8d6ba17 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -28,6 +28,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { @@ -35,6 +36,29 @@ namespace vectorized { class IColumn; #include "common/compile_check_begin.h" +Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int start_idx, + int end_idx, BufferWritable& bw, + FormatOptions& options) const { + SERIALIZE_COLUMN_TO_JSON(); +} + +Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num, + BufferWritable& bw, + FormatOptions& options) const { + /** + * For 
null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); +} + Status DataTypeBitMapSerDe::deserialize_column_from_json_vector( IColumn& column, std::vector& slices, int* num_deserialized, const FormatOptions& options) const { diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h index 3fc96c40b9ddeb..24c2e6f930d203 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h @@ -36,14 +36,10 @@ class DataTypeBitMapSerDe : public DataTypeSerDe { DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}; Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, - FormatOptions& options) const override { - return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); - } + FormatOptions& options) const override; Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx, - BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); - } + BufferWritable& bw, FormatOptions& options) const override; Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override; diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index 2c4794d7fe6741..f63124f291219f 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ 
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -49,28 +49,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int64_t Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, FormatOptions& options) const { - if (!options._output_object_data) { - /** - * For null values in ordinary types, we use \N to represent them; - * for null values in nested types, we use null to represent them, just like the json format. - */ - if (_nesting_level >= 2) { - bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), - strlen(NULL_IN_COMPLEX_TYPE.c_str())); - } else { - bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), - strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); - } - return Status::OK(); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); } - auto col_row = check_column_const_set_readability(column, row_num); - ColumnPtr ptr = col_row.first; - row_num = col_row.second; - auto& data = const_cast(assert_cast(*ptr).get_element(row_num)); - std::unique_ptr buf = - std::make_unique_for_overwrite(data.max_serialized_size()); - size_t size = data.serialize((uint8*)buf.get()); - bw.write(buf.get(), size); return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 10218e4164d5c1..ea66d391e655ff 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -142,7 +142,25 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const 
std::string& timezone, cons orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, std::vector& buffer_list) const { - return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name()); + auto* cur_batch = dynamic_cast(orc_col_batch); + const auto& string_column = assert_cast(column); + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + std::string_view string_ref = string_column.get_data_at(row_id).to_string_view(); + auto* serialized_value = new std::string(); + *serialized_value = + JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size()); + auto len = serialized_value->length(); + StringRef bufferRef(*serialized_value); + buffer_list.emplace_back(bufferRef); + cur_batch->data[row_id] = const_cast(bufferRef.data); + cur_batch->length[row_id] = len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); } void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target, diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index f67194372857cf..35fe26e8f255b4 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -20,6 +20,7 @@ #include #include +#include #include "common/exception.h" #include "common/status.h" @@ -164,6 +165,33 @@ void DataTypeObjectSerDe::write_column_to_arrow(const IColumn& column, const Nul } } +Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column, + const NullMap* null_map, + orc::ColumnVectorBatch* orc_col_batch, int start, + int end, + std::vector& buffer_list) const { + const auto* var = check_and_get_column(column); + orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); + + for (size_t row_id = start; row_id < end; row_id++) { + if (cur_batch->notNull[row_id] == 1) { + auto* serialized_value = new 
std::string(); + if (!var->serialize_one_row_to_string(row_id, serialized_value)) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}", + var->dump_structure()); + } + auto len = serialized_value->length(); + StringRef bufferRef(*serialized_value); + buffer_list.emplace_back(bufferRef); + cur_batch->data[row_id] = const_cast(bufferRef.data); + cur_batch->length[row_id] = len; + } + } + + cur_batch->numElements = end - start; + return Status::OK(); +} + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/data_types/serde/data_type_object_serde.h b/be/src/vec/data_types/serde/data_type_object_serde.h index 414755ef0f8d04..c08d4d0af0d2c3 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.h +++ b/be/src/vec/data_types/serde/data_type_object_serde.h @@ -89,9 +89,7 @@ class DataTypeObjectSerDe : public DataTypeSerDe { Status write_column_to_orc(const std::string& timezone, const IColumn& column, const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch, int64_t start, int64_t end, - std::vector& buffer_list) const override { - return Status::NotSupported("write_column_to_orc with type " + column.get_name()); - } + std::vector& buffer_list) const override; private: template diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index f15e6423c571f8..d64552e46a87d6 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -33,6 +33,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/string_ref.h" +#include "vec/data_types/serde/data_type_nullable_serde.h" namespace doris { @@ -44,12 +45,23 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe { Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, FormatOptions& options) const override { - return 
Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name()); + /** + * For null values in ordinary types, we use \N to represent them; + * for null values in nested types, we use null to represent them, just like the json format. + */ + if (_nesting_level >= 2) { + bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(), + strlen(NULL_IN_COMPLEX_TYPE.c_str())); + } else { + bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), + strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str())); + } + return Status::OK(); } Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx, BufferWritable& bw, FormatOptions& options) const override { - return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name()); + SERIALIZE_COLUMN_TO_JSON(); } Status deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const override { diff --git a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out index aa22d97c0e17e7..914602586b3ae7 100644 --- a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out +++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out @@ -15,3 +15,11 @@ 20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000 20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000 +-- !select_load_orc -- +20220201 0 \N \N \N +20220201 1 \N \N \N +20220201 2 \N \N \N +20220201 3 \N \N \N +20220201 4 \N \N \N +20220201 5 \N \N \N + diff --git a/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out new file mode 100644 index 00000000000000..d25830939645f6 --- /dev/null +++ b/regression-test/data/export_p0/outfile/test_outfile_jsonb_and_variant.out @@ -0,0 +1,25 @@ +-- This file is automatically 
generated. You should know what you did if you want to edit this +-- !select_load_parquet -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_orc -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + +-- !select_load_orc -- +20220201 0 {"k1":"100"} {"k1":"100"} +20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"} +20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"} +20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123} +20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"} +20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"} + diff --git a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy index 3f3f270e8fb5d0..e9bf4a6fed0f7b 100644 --- a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy +++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy @@ -69,23 +69,37 @@ suite("test_outfile_complex_type", "p0") { """ // parquet file format - def outfile_url = outfile_to_S3("parquet") + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( - "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), 
outfile_url.length() - 1)}0.parquet", + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}", - "format" = "parquet", + "format" = "${format}", "region" = "${region}" ); """ // orc file foramt - outfile_url = outfile_to_S3("orc") + format = "orc" + outfile_url = outfile_to_S3("${format}") qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( - "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.orc", + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}", - "format" = "orc", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file foramt + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", "region" = "${region}" ); """ diff --git a/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy new file mode 100644 index 00000000000000..ed3019436aef29 --- /dev/null +++ b/regression-test/suites/export_p0/outfile/test_outfile_jsonb_and_variant.groovy @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_outfile_jsonb_and_variant", "p0") { + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + + def export_table_name = "test_outfile_jsonb_and_variant_table" + def outFilePath = "${bucket}/outfile/jsonb_and_variant/exp_" + + def outfile_to_S3 = { format -> + // select ... into outfile ... + def res = sql """ + SELECT * FROM ${export_table_name} t + INTO OUTFILE "s3://${outFilePath}" + FORMAT AS ${format} + PROPERTIES ( + "s3.endpoint" = "${s3_endpoint}", + "s3.region" = "${region}", + "s3.secret_key"="${sk}", + "s3.access_key" = "${ak}" + ); + """ + + return res[0][3] + } + + sql """ DROP TABLE IF EXISTS ${export_table_name} """ + sql """ + CREATE TABLE `${export_table_name}` ( + `dt` int(11) NULL COMMENT "", + `id` int(11) NULL COMMENT "", + `json_col` JSON NULL COMMENT "", + `variant_col` variant NULL COMMENT "" + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`dt`) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + INSERT INTO `${export_table_name}` values + (20220201,0, '{"k1": "100"}', '{"k1": "100"}'), + (20220201,1, '{"k1": "100", "k2": "123"}', '{"k1": "100", "k2": "123"}'), + (20220201,2, '{"k1": "100", "abc": "567"}', '{"k1": "100", "abc": "567"}'), + (20220201,3, '{"k1": "100", "k3": 123}', '{"k1": "100", "k3": 123}'), + (20220201,4, '{"k1": "100", "doris": "nereids"}', '{"k1": "100", "doris": "nereids"}'), + (20220201,5, '{"k1": "100", "doris": "pipeline"}', '{"k1": "100", "doris": 
"pipeline"}'); + """ + + // parquet file format + def format = "parquet" + def outfile_url = outfile_to_S3("${format}") + qt_select_load_parquet """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // orc file format + format = "orc" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ + + // csv file format + format = "csv" + outfile_url = outfile_to_S3("${format}") + qt_select_load_orc """ SELECT * FROM S3 ( + "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", + "ACCESS_KEY"= "${ak}", + "SECRET_KEY" = "${sk}", + "format" = "${format}", + "region" = "${region}" + ); + """ +} \ No newline at end of file From 20c9c5676945d2ac0b04b52cfb033d4e2b370a93 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Wed, 20 Nov 2024 15:29:40 +0800 Subject: [PATCH 4/8] fix 4 --- be/src/vec/data_types/serde/data_type_bitmap_serde.cpp | 10 +++++----- be/src/vec/data_types/serde/data_type_object_serde.cpp | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index bb9332d8d6ba17..8c677425310138 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -36,13 +36,13 @@ namespace vectorized { class IColumn; #include "common/compile_check_begin.h" -Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int start_idx, -
int end_idx, BufferWritable& bw, +Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int64_t start_idx, + int64_t end_idx, BufferWritable& bw, FormatOptions& options) const { SERIALIZE_COLUMN_TO_JSON(); } -Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num, +Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw, FormatOptions& options) const { /** @@ -122,8 +122,8 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr } void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map, - arrow::ArrayBuilder* array_builder, int start, - int end, const cctz::time_zone& ctz) const { + arrow::ArrayBuilder* array_builder, int64_t start, + int64_t end, const cctz::time_zone& ctz) const { const auto& col = assert_cast(column); auto& builder = assert_cast(*array_builder); for (size_t string_i = start; string_i < end; ++string_i) { diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 35fe26e8f255b4..03c0b771abe8eb 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -167,8 +167,8 @@ void DataTypeObjectSerDe::write_column_to_arrow(const IColumn& column, const Nul Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column, const NullMap* null_map, - orc::ColumnVectorBatch* orc_col_batch, int start, - int end, + orc::ColumnVectorBatch* orc_col_batch, + int64_t start, int64_t end, std::vector& buffer_list) const { const auto* var = check_and_get_column(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); From f3c9d90e5300bd5d0e0c5a5070cdee43bba30017 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Thu, 21 Nov 2024 09:55:38 +0800 Subject: [PATCH 5/8] fix 5 --- 
.../main/java/org/apache/doris/analysis/OutFileClause.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 201860f423d614..026e4da29b59da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -323,6 +323,8 @@ private String dorisTypeToOrcTypeMap(Type dorisType) throws AnalysisException { case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: orcType = "string"; break; case DECIMALV2: @@ -441,6 +443,8 @@ private void analyzeForOrcFormat(List resultExprs, List colLabels) case DATE: case DATETIME: case IPV6: + case VARIANT: + case JSONB: checkOrcType(schema.second, "string", true, resultType.getPrimitiveType().toString()); break; case DECIMAL32: From d325d5249e590abceda6d3ca3dc5e3cdf8e59017 Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Fri, 22 Nov 2024 14:18:30 +0800 Subject: [PATCH 6/8] fix memory problem --- .../serde/data_type_jsonb_serde.cpp | 28 ++++++++++++++----- .../serde/data_type_object_serde.cpp | 24 ++++++++++++---- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index ea66d391e655ff..75e89e9dcf3d41 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -23,6 +23,7 @@ #include #include +#include #include "arrow/array/builder_binary.h" #include "common/exception.h" @@ -145,17 +146,30 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons auto* cur_batch = dynamic_cast(orc_col_batch); const auto& string_column = assert_cast(column); + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + if (!ptr) { + return Status::InternalError( + "malloc memory error when write 
largeint column data to orc file."); + } + StringRef bufferRef; + bufferRef.data = ptr; + bufferRef.size = BUFFER_UNIT_SIZE; + size_t offset = 0; + buffer_list.emplace_back(bufferRef); + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { std::string_view string_ref = string_column.get_data_at(row_id).to_string_view(); - auto* serialized_value = new std::string(); - *serialized_value = - JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size()); - auto len = serialized_value->length(); - StringRef bufferRef(*serialized_value); - buffer_list.emplace_back(bufferRef); - cur_batch->data[row_id] = const_cast(bufferRef.data); + auto serialized_value = std::make_unique( + JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size())); + auto len = serialized_value->size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; cur_batch->length[row_id] = len; + offset += len; } } diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index 03c0b771abe8eb..d489739c622a00 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -173,18 +173,32 @@ Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, con const auto* var = check_and_get_column(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); + if (!ptr) { + return Status::InternalError( + "malloc memory error when write largeint column data to orc file."); + } + StringRef bufferRef; + bufferRef.data = ptr; + bufferRef.size = BUFFER_UNIT_SIZE; + size_t offset = 0; + buffer_list.emplace_back(bufferRef); + for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - auto* 
serialized_value = new std::string(); - if (!var->serialize_one_row_to_string(row_id, serialized_value)) { + auto serialized_value = std::make_unique(); + if (!var->serialize_one_row_to_string(row_id, serialized_value.get())) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}", var->dump_structure()); } auto len = serialized_value->length(); - StringRef bufferRef(*serialized_value); - buffer_list.emplace_back(bufferRef); - cur_batch->data[row_id] = const_cast(bufferRef.data); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + memcpy(const_cast(bufferRef.data) + offset, serialized_value->data(), len); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; cur_batch->length[row_id] = len; + offset += len; } } From 005a47c98156457559d99b12be67abfe0fea80cf Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Mon, 25 Nov 2024 14:30:32 +0800 Subject: [PATCH 7/8] refactor --- .../serde/data_type_bitmap_serde.cpp | 11 +----- .../serde/data_type_date64_serde.cpp | 20 +--------- .../data_types/serde/data_type_hll_serde.cpp | 11 +----- .../data_types/serde/data_type_ipv6_serde.cpp | 37 ++++++------------- .../serde/data_type_jsonb_serde.cpp | 11 +----- .../serde/data_type_number_serde.cpp | 36 +++++------------- .../serde/data_type_object_serde.cpp | 11 +----- .../serde/data_type_quantilestate_serde.h | 11 +----- be/src/vec/data_types/serde/data_type_serde.h | 12 ++++++ 9 files changed, 40 insertions(+), 120 deletions(-) diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp index 8c677425310138..5d024a418340e8 100644 --- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp @@ -193,16 +193,7 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = 
(char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_date64_serde.cpp b/be/src/vec/data_types/serde/data_type_date64_serde.cpp index 48a4b2c16785e9..6cc83faf7a5b52 100644 --- a/be/src/vec/data_types/serde/data_type_date64_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_date64_serde.cpp @@ -289,17 +289,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con auto& col_data = static_cast&>(column).get_data(); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 0) { @@ -311,16 +301,10 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con REALLOC_MEMORY_FOR_ORC_WRITER() + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; cur_batch->length[row_id] = len; offset += len; } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; - } - } cur_batch->numElements = end - start; return Status::OK(); diff --git 
a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp index f63124f291219f..42260b092605e1 100644 --- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp @@ -188,16 +188,7 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp index 643d136c22e0c6..e899de93c90ce0 100644 --- a/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_ipv6_serde.cpp @@ -187,38 +187,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const std::vector& buffer_list) const { const auto& col_data = assert_cast(column).get_data(); orc::StringVectorBatch* cur_batch = assert_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); - - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); - size_t len = 
ipv6_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + INIT_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast(bufferRef.data) + offset, ipv6_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + std::string ipv6_str = IPv6Value::to_string(col_data[row_id]); + size_t len = ipv6_str.size(); + + REALLOC_MEMORY_FOR_ORC_WRITER() + + strcpy(const_cast(bufferRef.data) + offset, ipv6_str.c_str()); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } + cur_batch->numElements = end - start; return Status::OK(); } diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp index 75e89e9dcf3d41..adc041f511198e 100644 --- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp @@ -146,16 +146,7 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons auto* cur_batch = dynamic_cast(orc_col_batch); const auto& string_column = assert_cast(column); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp index 9b2ad5676f8a58..55c7b2c9505dae 100644 --- a/be/src/vec/data_types/serde/data_type_number_serde.cpp +++ 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp @@ -342,38 +342,22 @@ Status DataTypeNumberSerDe::write_column_to_orc(const std::string& timezone, if constexpr (std::is_same_v) { // largeint orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - const size_t begin_off = offset; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 0) { - continue; - } - std::string value_str = fmt::format("{}", col_data[row_id]); - size_t len = value_str.size(); + if (cur_batch->notNull[row_id] == 1) { + std::string value_str = fmt::format("{}", col_data[row_id]); + size_t len = value_str.size(); - REALLOC_MEMORY_FOR_ORC_WRITER() + REALLOC_MEMORY_FOR_ORC_WRITER() - strcpy(const_cast(bufferRef.data) + offset, value_str.c_str()); - offset += len; - cur_batch->length[row_id] = len; - } - size_t data_off = 0; - for (size_t row_id = start; row_id < end; row_id++) { - if (cur_batch->notNull[row_id] == 1) { - cur_batch->data[row_id] = const_cast(bufferRef.data) + begin_off + data_off; - data_off += cur_batch->length[row_id]; + strcpy(const_cast(bufferRef.data) + offset, value_str.c_str()); + cur_batch->data[row_id] = const_cast(bufferRef.data) + offset; + cur_batch->length[row_id] = len; + offset += len; } } + cur_batch->numElements = end - start; } else if constexpr (std::is_same_v || std::is_same_v) { // tinyint/boolean WRITE_INTEGRAL_COLUMN_TO_ORC(orc::ByteVectorBatch) diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index d489739c622a00..fc536d9ef0df7b 100644 --- 
a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -173,16 +173,7 @@ Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, con const auto* var = check_and_get_column(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h index d64552e46a87d6..d3526ba389925f 100644 --- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h +++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h @@ -155,16 +155,7 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe { auto& col_data = assert_cast(column); orc::StringVectorBatch* cur_batch = dynamic_cast(orc_col_batch); - char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); - if (!ptr) { - return Status::InternalError( - "malloc memory error when write largeint column data to orc file."); - } - StringRef bufferRef; - bufferRef.data = ptr; - bufferRef.size = BUFFER_UNIT_SIZE; - size_t offset = 0; - buffer_list.emplace_back(bufferRef); + INIT_MEMORY_FOR_ORC_WRITER() for (size_t row_id = start; row_id < end; row_id++) { if (cur_batch->notNull[row_id] == 1) { diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index f0e9eb27961439..a9200d1fccf316 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -77,6 +77,18 @@ struct ColumnVectorBatch; 
++*num_deserialized; \ } +#define INIT_MEMORY_FOR_ORC_WRITER() \ + char* ptr = (char*)malloc(BUFFER_UNIT_SIZE); \ + if (!ptr) { \ + return Status::InternalError( \ + "malloc memory error when write largeint column data to orc file."); \ + } \ + StringRef bufferRef; \ + bufferRef.data = ptr; \ + bufferRef.size = BUFFER_UNIT_SIZE; \ + size_t offset = 0; \ + buffer_list.emplace_back(bufferRef); + #define REALLOC_MEMORY_FOR_ORC_WRITER() \ while (bufferRef.size - BUFFER_RESERVED_SIZE < offset + len) { \ char* new_ptr = (char*)malloc(bufferRef.size + BUFFER_UNIT_SIZE); \ From 74a1c2c2854594bad66b5b52613b687d082c45ae Mon Sep 17 00:00:00 2001 From: TieweiFang Date: Wed, 27 Nov 2024 16:03:53 +0800 Subject: [PATCH 8/8] fix test --- .../outfile/test_outfile_complex_type.out | 26 +++++++++---------- .../outfile/test_outfile_complex_type.groovy | 8 +++--- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out index 914602586b3ae7..cd6f000b6c5052 100644 --- a/regression-test/data/export_p0/outfile/test_outfile_complex_type.out +++ b/regression-test/data/export_p0/outfile/test_outfile_complex_type.out @@ -1,21 +1,21 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this -- !select_load_parquet -- -20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 0501F300000000000000 -20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F 050604000000000000000100000000000000050000000000000002000000000000006FA10600000000000300000000000000 -20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 050186D6120000000000 -20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 0501388E109B15080000 -20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000 -20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000 +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C -- !select_load_orc -- -20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 0501F300000000000000 -20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F 050604000000000000000100000000000000050000000000000002000000000000006FA10600000000000300000000000000 -20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 050186D6120000000000 -20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 0501388E109B15080000 -20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000 -20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000 +20220201 0 0000004501000000000000F03F 0101675D86AC33FA8CD6 +20220201 1 0000004501000000000000F0BF 01010B3C52B765A11A2F +20220201 2 00000045010000000000000000 0101DDEA60F9C89AA329 +20220201 3 0000004501000000000000F03F 0101EF81F59130F8B748 +20220201 4 00000045010000000000000040 010114CAA737BD54146E +20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C --- !select_load_orc -- +-- 
!select_load_csv -- 20220201 0 \N \N \N 20220201 1 \N \N \N 20220201 2 \N \N \N diff --git a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy index e9bf4a6fed0f7b..49f81732791126 100644 --- a/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy +++ b/regression-test/suites/export_p0/outfile/test_outfile_complex_type.groovy @@ -71,7 +71,7 @@ suite("test_outfile_complex_type", "p0") { // parquet file format def format = "parquet" def outfile_url = outfile_to_S3("${format}") - qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( + qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}", @@ -83,7 +83,7 @@ suite("test_outfile_complex_type", "p0") { // orc file foramt format = "orc" outfile_url = outfile_to_S3("${format}") - qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 ( + qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t) FROM S3 ( "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}", @@ -92,10 +92,10 @@ suite("test_outfile_complex_type", "p0") { ); """ - // orc file foramt + // csv file format format = "csv" outfile_url = outfile_to_S3("${format}") - qt_select_load_orc """ SELECT * FROM S3 ( + qt_select_load_csv """ SELECT * FROM S3 ( "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}",