From d583cdf53457d4be9fe1d4fe6ef35fc5854e7d35 Mon Sep 17 00:00:00 2001
From: daidai
Date: Tue, 11 Feb 2025 11:56:58 +0800
Subject: [PATCH 1/2] [fix](parquet) Fix data column and null map column not
 equal when reading Parquet complex type cross-page data

---
 .../format/parquet/vparquet_column_reader.cpp | 23 +++++++-
 .../format/parquet/vparquet_column_reader.h   | 14 ++---
 .../tvf/{test_tvf_p2.out => test_tvf_p0.out}  |  9 ++++
 .../hive/test_parquet_complex_cross_page.out  | 10 ++++
 ...{test_tvf_p2.groovy => test_tvf_p0.groovy} | 24 ++++++++-
 .../test_parquet_complex_cross_page.groovy    | 52 +++++++++++++++++++
 6 files changed, 122 insertions(+), 10 deletions(-)
 rename regression-test/data/external_table_p0/tvf/{test_tvf_p2.out => test_tvf_p0.out} (98%)
 create mode 100644 regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
 rename regression-test/suites/external_table_p0/tvf/{test_tvf_p2.groovy => test_tvf_p0.groovy} (78%)
 create mode 100644 regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy

diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index ade98ada48bd1f..c36fda023a2c28 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -329,6 +329,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
             // just read the remaining values of the last row in previous page,
             // so there's no a new row should be read.
             batch_size = 0;
+            /*
+             * This function is called repeatedly to fill one batch, and each call does
+             * `_rep_levels.resize(0); _def_levels.resize(0);`. As a result, the reader's
+             * definition and repetition levels hold only the tail of the batch (the
+             * earlier parts are dropped). Therefore, when the definition and repetition
+             * levels are used to fill the null_map for structs and maps, this function
+             * must not be called multiple times before the null_map is filled.
+             * TODO:
+             * Consider reading the entire batch of data at once, which would make this
+             * function easier to use correctly. However, if the data spans multiple
+             * pages, memory usage may increase significantly.
+             */
         } else {
             _rep_levels.resize(0);
             _def_levels.resize(0);
@@ -835,7 +847,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             continue;
         }
 
-        _read_column_names.insert(doris_name);
+        _read_column_names.emplace_back(doris_name);
 
         //        select_vector.reset();
         size_t field_rows = 0;
@@ -847,6 +859,15 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
                                             is_dict_filter));
             *read_rows = field_rows;
             *eof = field_eof;
+            /*
+             * Because `_read_nested_column` may lose definition and repetition levels
+             * when a row of data spans multiple pages, it is crucial to fill the
+             * struct's null_map later from the levels of the first column read
+             * (for that column, `_read_nested_column` is not called repeatedly).
+             *
+             * It is worth mentioning that, in theory, any sub-column could be chosen to
+             * fill the null_map, and selecting the shortest one would perform best.
+             */
         } else {
             while (field_rows < *read_rows && !field_eof) {
                 size_t loop_rows = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 5ced83a498e258..613bf57413e589 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -298,24 +298,24 @@ class StructColumnReader : public ParquetColumnReader {
         if (!_read_column_names.empty()) {
             // can't use _child_readers[*_read_column_names.begin()]
             // because the operator[] of std::unordered_map is not const :(
-            return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
+            return _child_readers.find(_read_column_names.front())->second->get_rep_level();
         }
         return _child_readers.begin()->second->get_rep_level();
     }
 
     const std::vector<level_t>& get_def_level() const override {
         if (!_read_column_names.empty()) {
-            return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
+            return _child_readers.find(_read_column_names.front())->second->get_def_level();
         }
         return _child_readers.begin()->second->get_def_level();
     }
 
     Statistics statistics() override {
         Statistics st;
-        for (const auto& reader : _child_readers) {
-            // make sure the field is read
-            if (_read_column_names.find(reader.first) != _read_column_names.end()) {
-                Statistics cst = reader.second->statistics();
+        for (const auto& column_name : _read_column_names) {
+            auto reader = _child_readers.find(column_name);
+            if (reader != _child_readers.end()) {
+                Statistics cst = reader->second->statistics();
                 st.merge(cst);
             }
         }
@@ -332,7 +332,7 @@ class StructColumnReader : public ParquetColumnReader {
 
 private:
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
-    std::set<std::string> _read_column_names;
+    std::vector<std::string> _read_column_names;
 };
 
 };  // namespace doris::vectorized
diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_p2.out b/regression-test/data/external_table_p0/tvf/test_tvf_p0.out
similarity index 98%
rename from regression-test/data/external_table_p0/tvf/test_tvf_p2.out
rename to regression-test/data/external_table_p0/tvf/test_tvf_p0.out
index 53b454df858e2a..5ec7cc860da33b 100644
--- a/regression-test/data/external_table_p0/tvf/test_tvf_p2.out
+++ b/regression-test/data/external_table_p0/tvf/test_tvf_p0.out
@@ -65,3 +65,12 @@
 
 -- !viewfs --
 25001	25001	25001
+-- !row_cross_pages_2 --
+149923	149923
+
+-- !row_cross_pages_3 --
+74815	74815
+
+-- !row_cross_pages_4 --
+457	457
+
diff --git a/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out b/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
new file mode 100644
index 00000000000000..e68420444ba294
--- /dev/null
+++ b/regression-test/data/external_table_p2/hive/test_parquet_complex_cross_page.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+1
+
+-- !2 --
+5000
+
+-- !3 --
+5000
+
diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
similarity index 78%
rename from regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy
rename to regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
index f68fe55e859c90..990ef03cc50520 100644
--- a/regression-test/suites/external_table_p0/tvf/test_tvf_p2.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_tvf_p0.groovy
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
+suite("test_tvf_p0", "p0,external,tvf,external_docker,hive") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String nameNodeHost = context.config.otherConfigs.get("externalEnvIp")
@@ -46,7 +46,7 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
                 "format" = "orc");
             """
 
-        // a row of complex type may be stored across more pages
+        // (1): a row of a complex type may be stored across multiple pages
         qt_row_cross_pages """select count(id), count(m1), count(m2)
                 from hdfs(
                 "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
@@ -73,5 +73,25 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
                 "format" = "parquet",
                 "fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/",
                 "fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""
+
+        // (2): a row of a complex type may be stored across multiple pages
+        qt_row_cross_pages_2 """select count(id), count(experiment)
+            from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet");
+        """ // 149923
+
+        qt_row_cross_pages_3 """select count(id), count(experiment)
+            from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet") where id > 49923;
+        """ // 74815
+
+        qt_row_cross_pages_4 """select count(id), count(experiment)
+            from hdfs(
+                "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
+                "format" = "parquet") where id < 300;
+        """ // 457
+
     }
 }
diff --git a/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy b/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy
new file mode 100644
index 00000000000000..685f5f3204d2d4
--- /dev/null
+++ b/regression-test/suites/external_table_p2/hive/test_parquet_complex_cross_page.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_parquet_complex_cross_page", "p2,external,hive,external_remote,external_remote_hive") {
+
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    // Hudi and Hive use the same catalog in p2.
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable test")
+        return;
+    }
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+    String hms_catalog_name = "test_parquet_complex_cross_page"
+
+    sql """drop catalog if exists ${hms_catalog_name};"""
+    sql """
+        CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+        PROPERTIES (
+            ${props}
+            ,'hive.version' = '3.1.3'
+        );
+    """
+
+    logger.info("catalog " + hms_catalog_name + " created")
+    sql """switch ${hms_catalog_name};"""
+    logger.info("switched to catalog " + hms_catalog_name)
+    sql """ use regression;"""
+
+    sql """ set dry_run_query=true; """
+
+    qt_1 """ SELECT * FROM test_parquet_complex_cross_page WHERE device_id='DZ692' and format_time between 1737693770300 and 1737693770500
+             and date between '20250124' and '20250124' and project='GA20230001' ; """
+    qt_2 """ SELECT functions_pnc_ssm_road_di_objects from test_parquet_complex_cross_page ; """
+    qt_3 """ select * from test_parquet_complex_cross_page ; """
+
+    sql """drop catalog ${hms_catalog_name};"""
+}

From c55c27cb7bf2c7cef393da201093b923d974be12 Mon Sep 17 00:00:00 2001
From: daidai
Date: Mon, 17 Feb 2025 19:57:29 +0800
Subject: [PATCH 2/2] add comments

---
 be/src/vec/exec/format/parquet/vparquet_column_reader.h | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 613bf57413e589..838d4724a877f8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -298,6 +298,12 @@ class StructColumnReader : public ParquetColumnReader {
         if (!_read_column_names.empty()) {
             // can't use _child_readers[*_read_column_names.begin()]
             // because the operator[] of std::unordered_map is not const :(
+            /*
+             * Because `_read_nested_column` may lose definition and repetition levels
+             * when a row of data spans multiple pages, the struct's null_map must
+             * later be filled from the levels of the first column read, that is,
+             * `_read_column_names.front()`.
+             */
             return _child_readers.find(_read_column_names.front())->second->get_rep_level();
         }
         return _child_readers.begin()->second->get_rep_level();
@@ -333,6 +339,7 @@ class StructColumnReader : public ParquetColumnReader {
 private:
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
     std::vector<std::string> _read_column_names;
+    // Need to use a vector instead of a set; see `get_rep_level()` for the reason.
 };
 
 };  // namespace doris::vectorized
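
---
Note for reviewers: below is a minimal, self-contained C++ sketch (not part of
the patch; illustrative names only, not the Doris API) of how definition
levels drive a struct's null_map, and why levels truncated by repeated
`_read_nested_column` calls trigger the "data column and null map column not
equal" error this patch fixes.

#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

// A row's struct value is null when its definition level is below the level
// at which the struct itself is defined (illustrative rule; real readers also
// consult repetition levels for nested arrays and maps).
std::vector<uint8_t> fill_null_map(const std::vector<level_t>& def_levels,
                                   level_t struct_def_level) {
    std::vector<uint8_t> null_map;
    null_map.reserve(def_levels.size());
    for (level_t def : def_levels) {
        null_map.push_back(def < struct_def_level ? 1 : 0);
    }
    return null_map;
}

int main() {
    // A batch of 6 rows; rows 3 and 6 are null structs (definition level 0).
    std::vector<level_t> full_batch = {1, 1, 0, 1, 1, 0};
    // If the reader is invoked several times per batch and clears its level
    // buffers on every call, only the tail of the batch survives:
    std::vector<level_t> truncated = {1, 1, 0};

    auto good = fill_null_map(full_batch, 1); // 6 entries, matches 6 data rows
    auto bad = fill_null_map(truncated, 1);   // 3 entries for 6 data rows

    std::cout << "null_map from full levels:      " << good.size() << " rows\n";
    std::cout << "null_map from truncated levels: " << bad.size() << " rows\n";
    // The size mismatch above is the data/null_map inequality; the patch
    // avoids it by taking levels from the first-read child column, which is
    // read exactly once per batch.
    return 0;
}

Running the sketch prints 6 rows for the full levels and 3 for the truncated
ones. It also shows why `_read_column_names` became an ordered vector: with a
vector, `front()` deterministically names the once-read child whose levels are
complete, which an unordered set cannot guarantee.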