From 6f6478877942893092731878182dd87733b1f9e5 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Mon, 18 Dec 2023 10:31:16 +0800 Subject: [PATCH 1/3] [Fix](memtable) fix `shrink_memtable_by_agg` should also update `_row_in_blocks` (#28536) Otherwise using the stale `_row_in_blocks` will result in heap-buffer-overflow ``` ==2695213==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x62900122e210 at pc 0x56524744aecf bp 0x7f62c595ef7 0 sp 0x7f62c595ef68 READ of size 8 at 0x62900122e210 thread T1627 (MemTableFlushTh) #0 0x56524744aece in doris::vectorized::ColumnVector::insert_indices_from(doris::vectorized::IColumn const&, unsigned int const*, unsigned int const*) /mnt/disk2/lihangyu/doris/be/src/vec/columns/column_vector.cpp:378:33 #1 0x5652472a7538 in doris::vectorized::ColumnNullable::insert_indices_from(doris::vectorized::IColumn const&, unsigned int const*, unsigned int const*) /mnt/disk2/lihangyu/doris/be/src/vec/columns/column_nullable.cpp:310:25 #2 0x56524782a62a in doris::vectorized::MutableBlock::add_rows(doris::vectorized::Block const*, unsigned int const*, unsigned int const*) /mnt/disk2/lihangyu/doris/be/src/vec/core/block.cpp:961:14 #3 0x565233f187ae in doris::MemTable::_put_into_output(doris::vectorized::Block&) /mnt/disk2/lihangyu/doris/be/src/olap/memtable.cpp:248:27 #4 0x565233f1db66 in doris::MemTable::to_block() /mnt/disk2/lihangyu/doris/be/src/olap/memtable.cpp:496:13 #5 0x565233efae60 in doris::FlushToken::_do_flush_memtable(doris::MemTable*, int, long*) /mnt/disk2/lihangyu/doris/be/src/olap/memtable_flush_executor.cpp:121:62 #6 0x565233efc8d6 in doris::FlushToken::_flush_memtable(doris::MemTable*, int, long) /mnt/disk2/lihangyu/doris/be/src/olap/memtable_flush_executor.cpp:150:16 #7 0x565233f0c5eb in doris::MemtableFlushTask::run() /mnt/disk2/lihangyu/doris/be/src/olap/memtable_flush_executor.cpp:58:23 ``` --- be/src/olap/memtable.cpp | 2 + ...st_insert_with_aggregation_memtable.groovy | 139 ++++++++++++++++++ 2 files 
changed, 141 insertions(+) create mode 100644 regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 578ee4a7857c70..2e21005fe62ddb 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -426,6 +426,8 @@ void MemTable::_aggregate() { _output_mutable_block = vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); _output_mutable_block.clear_column_data(); + _row_in_blocks = temp_row_in_blocks; + _last_sorted_pos = _row_in_blocks.size(); } } diff --git a/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy new file mode 100644 index 00000000000000..4e6d838a17e6fc --- /dev/null +++ b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_insert_with_aggregation_memtable", "nonConcurrent") { + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string:[:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def testTable = "test_memtable_enable_with_aggregate" + sql """ DROP TABLE IF EXISTS ${testTable}""" + def testTableDDL = """ + create 
table ${testTable} + ( + `id` LARGEINT NOT NULL, + `k1` DATE NOT NULL, + `k2` VARCHAR(20), + `k3` SMALLINT, + `k4` TINYINT, + `k5` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00", + `k6` BIGINT SUM DEFAULT "0", + `k7` INT MAX DEFAULT "0", + `k8` INT MIN DEFAULT "99999" + ) + AGGREGATE KEY(`id`, `k1`, `k2`, `k3`, `k4`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + def insert_sql = """ + insert into ${testTable} values + (10000,"2017-10-01","北京",20,0,"2017-10-01 06:00:00",20,10,10), + (10000,"2017-10-01","北京",20,0,"2017-10-01 07:00:00",15,2,2), + (10001,"2017-10-01","北京",30,1,"2017-10-01 17:05:45",2,22,22), + (10002,"2017-10-02","上海",20,1,"2017-10-02 12:59:12",200,5,5), + (10003,"2017-10-02","广州",32,0,"2017-10-02 11:20:00",30,11,11), + (10004,"2017-10-01","深圳",35,0,"2017-10-01 10:00:15",100,3,3), + (10004,"2017-10-03","深圳",35,0,"2017-10-03 10:20:22",11,6,6); + """ + + sql testTableDDL + sql "sync" + sql insert_sql + sql "sync" + qt_sql "select * from ${testTable} order by id asc" + + // store the original value + get_be_param("enable_shrink_memory") + get_be_param("write_buffer_size_for_agg") + + // the original value is false + set_be_param("enable_shrink_memory", "true") + // the original value is 400MB + set_be_param("write_buffer_size_for_agg", "512") // change it to 0.5KB + sql """ DROP TABLE IF EXISTS ${testTable}""" + sql testTableDDL + sql "sync" + sql insert_sql + sql "sync" + qt_sql "select * from ${testTable} order by id asc" + + // test with mv + def table_name = "agg_shrink" + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v text + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 4 + properties("replication_num" = "1"); + """ + set_be_param("write_buffer_size_for_agg", "10240") // change it to 10KB + sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : 
[[123]]}' FROM numbers("number" = "4096")""" + sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}""" + sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}""" + createMV("""create materialized view var_cnt as select k, count(k) from ${table_name} group by k""") + sql """INSERT INTO ${table_name} SELECT k, v from ${table_name} limit 8101""" + + reset_be_param("enable_shrink_memory") + reset_be_param("write_buffer_size_for_agg") + +} \ No newline at end of file From 9d362bf327eb4244a32ebdc00ce41f9c713c2f9c Mon Sep 17 00:00:00 2001 From: eldenmoon <15605149486@163.com> Date: Tue, 19 Dec 2023 15:28:59 +0800 Subject: [PATCH 2/3] make enable_shrink_memory modifiable --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index dafc613d77ce06..40f5a2b6b66e9b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1030,7 +1030,7 @@ DEFINE_mInt32(s3_write_buffer_whole_size, "524288000"); DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000"); //disable shrink memory by default -DEFINE_Bool(enable_shrink_memory, "false"); +DEFINE_mBool(enable_shrink_memory, "false"); DEFINE_mInt32(schema_cache_capacity, "1024"); DEFINE_mInt32(schema_cache_sweep_time_sec, "100"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 65b9de8f12e721..7d7119ecfc11ef 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1067,7 +1067,7 @@ DECLARE_mInt32(s3_write_buffer_whole_size); // the max number of cached file handle for block segemnt DECLARE_mInt64(file_cache_max_file_reader_cache_size); //enable shrink memory -DECLARE_Bool(enable_shrink_memory); +DECLARE_mBool(enable_shrink_memory); // enable cache for high concurrent point query work load DECLARE_mInt32(schema_cache_capacity); DECLARE_mInt32(schema_cache_sweep_time_sec); From 1a8cd1b094c7642e627b6d3a6ff7a9d3dde3f592 Mon Sep 17 00:00:00 2001 
From: lihangyu <15605149486@163.com> Date: Tue, 19 Dec 2023 20:45:16 +0800 Subject: [PATCH 3/3] [Fix](memtable) fix `shrink_memtable_by_agg` without duplicated keys (#28660) Remove the duplicated logic: ``` vectorized::Block in_block = _input_mutable_block.to_block(); _put_into_output(in_block); ``` `_input_mutable_block.to_block()` moves `_input_mutable_block`, which then leads to `flush` running with an empty block --- be/src/olap/memtable.cpp | 5 +---- .../test_insert_with_aggregation_memtable.out | 17 +++++++++++++++++ ...test_insert_with_aggregation_memtable.groovy | 2 ++ 3 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2e21005fe62ddb..0f1a9ac831f254 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -437,10 +437,7 @@ void MemTable::shrink_memtable_by_agg() { return; } size_t same_keys_num = _sort(); - if (same_keys_num == 0) { - vectorized::Block in_block = _input_mutable_block.to_block(); - _put_into_output(in_block); - } else { + if (same_keys_num != 0) { _aggregate(); } } diff --git a/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out new file mode 100644 index 00000000000000..030677444b31de --- /dev/null +++ b/regression-test/data/load_p0/insert/test_insert_with_aggregation_memtable.out @@ -0,0 +1,17 @@ +-- This file is automatically generated.
You should know what you did if you want to edit this +-- !sql -- +10000 2017-10-01 北京 20 0 2017-10-01T07:00 35 10 2 +10001 2017-10-01 北京 30 1 2017-10-01T17:05:45 2 22 22 +10002 2017-10-02 上海 20 1 2017-10-02T12:59:12 200 5 5 +10003 2017-10-02 广州 32 0 2017-10-02T11:20 30 11 11 +10004 2017-10-01 深圳 35 0 2017-10-01T10:00:15 100 3 3 +10004 2017-10-03 深圳 35 0 2017-10-03T10:20:22 11 6 6 + +-- !sql -- +10000 2017-10-01 北京 20 0 2017-10-01T07:00 35 10 2 +10001 2017-10-01 北京 30 1 2017-10-01T17:05:45 2 22 22 +10002 2017-10-02 上海 20 1 2017-10-02T12:59:12 200 5 5 +10003 2017-10-02 广州 32 0 2017-10-02T11:20 30 11 11 +10004 2017-10-01 深圳 35 0 2017-10-01T10:00:15 100 3 3 +10004 2017-10-03 深圳 35 0 2017-10-03T10:20:22 11 6 6 + diff --git a/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy index 4e6d838a17e6fc..bbfca8fa5f657c 100644 --- a/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy +++ b/regression-test/suites/load_p0/insert/test_insert_with_aggregation_memtable.groovy @@ -132,6 +132,8 @@ suite("test_insert_with_aggregation_memtable", "nonConcurrent") { sql """INSERT INTO ${table_name} SELECT k, v from ${table_name}""" createMV("""create materialized view var_cnt as select k, count(k) from ${table_name} group by k""") sql """INSERT INTO ${table_name} SELECT k, v from ${table_name} limit 8101""" + // insert with no duplicate + sql """INSERT INTO ${table_name} SELECT *, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}' FROM numbers("number" = "4096"); """ reset_be_param("enable_shrink_memory") reset_be_param("write_buffer_size_for_agg")