From 9ec29cfde72140dba93d872de479c91d259f1ff0 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 26 Jul 2023 22:28:47 +0800 Subject: [PATCH 1/5] update --- .../olap/rowset/segment_v2/segment_writer.cpp | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index be1330cb4a1faf..6838f9f823cbaa 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -55,6 +55,7 @@ #include "vec/common/schema_util.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" #include "vec/io/reader_buffer.h" #include "vec/jsonb/serialize.h" #include "vec/olap/olap_data_convertor.h" @@ -364,6 +365,16 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(num_rows); + const vectorized::Int8* delete_sign_column_data = nullptr; + if (const vectorized::ColumnWithTypeAndName* delete_sign_column = + block->try_get_by_name(DELETE_SIGN); + delete_sign_column != nullptr) { + delete_sign_column_data = + reinterpret_cast(*(delete_sign_column->column)) + .get_data() + .data(); + } + std::vector specified_rowsets; { std::shared_lock rlock(_tablet->get_header_lock()); @@ -412,10 +423,19 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* LOG(WARNING) << "failed to lookup row key, error: " << st; return st; } - // partial update should not contain invisible columns - use_default_or_null_flag.emplace_back(false); - _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - _tablet->prepare_to_read(loc, pos, &_rssid_to_rid); + + // if the delete sign is marked, it means that the value columns of the row + // will not be read. So we don't need to read the missing values from the previous rows. + // But we still need to mark the previous row on delete bitmap + if (delete_sign_column_data != nullptr && delete_sign_column_data[pos - row_pos] != 0) { + has_default_or_nullable = true; + use_default_or_null_flag.emplace_back(true); + } else { + // partial update should not contain invisible columns + use_default_or_null_flag.emplace_back(false); + _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); + _tablet->prepare_to_read(loc, pos, &_rssid_to_rid); + } if (st.is()) { // although we need to mark delete current row, we still need to read missing columns @@ -551,14 +571,18 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f // if the column has default value, fiil it with default value // otherwise, if the column is nullable, fill it with null value const auto& tablet_column = _tablet_schema->column(cids_missing[i]); - CHECK(tablet_column.has_default_value() || tablet_column.is_nullable()); if (tablet_column.has_default_value()) { mutable_full_columns[cids_missing[i]]->insert_from( *mutable_default_value_columns[i].get(), 0); - } else { + } else if (tablet_column.is_nullable()) { auto nullable_column = assert_cast( mutable_full_columns[cids_missing[i]].get()); nullable_column->insert_null_elements(1); + } else { + // If the control flow reaches this branch, the column neither has default value + // nor is nullable. It means that the row's delete sign is marked, and the value + // columns are useless and won't be read. So we can just put arbitary values in the cells + mutable_full_columns[cids_missing[i]]->insert_default(); } } continue; From 967892f7a2832d835c8ac1ae893e45b76f727a16 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 26 Jul 2023 23:24:20 +0800 Subject: [PATCH 2/5] fix and add tests --- .../olap/rowset/segment_v2/segment_writer.cpp | 4 +- .../delete/delete_mow_partial_update.out | 6 +- .../test_partial_update_delete.out | 22 +++++++ .../delete/delete_mow_partial_update.groovy | 2 +- .../test_partial_update_delete.groovy | 60 +++++++++++++++++++ 5 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 6838f9f823cbaa..97ae8bb1fd2848 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -367,7 +367,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* use_default_or_null_flag.reserve(num_rows); const vectorized::Int8* delete_sign_column_data = nullptr; if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - block->try_get_by_name(DELETE_SIGN); + full_block.try_get_by_name(DELETE_SIGN); delete_sign_column != nullptr) { delete_sign_column_data = reinterpret_cast(*(delete_sign_column->column)) @@ -579,7 +579,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f mutable_full_columns[cids_missing[i]].get()); nullable_column->insert_null_elements(1); } else { - // If the control flow reaches this branch, the column neither has default value + // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value // columns are useless and won't be read. So we can just put arbitary values in the cells mutable_full_columns[cids_missing[i]]->insert_default(); diff --git a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out index 0b7c6bf68e9513..2da22fb0667d70 100644 --- a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out +++ b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out @@ -15,12 +15,12 @@ 5 5 -- !sql -- +1 \N 1 1 1 0 -1 1 1 +2 \N 1 2 2 0 -2 2 1 +3 \N 1 3 3 0 -3 3 1 4 4 0 5 5 0 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out new file mode 100644 index 00000000000000..091109620d7f6a --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out @@ -0,0 +1,22 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !with_delete_sign -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + diff --git a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy index b70bfc2986b7dc..eb8aa67634d48b 100644 --- a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy +++ b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy @@ -60,5 +60,5 @@ suite('nereids_delete_mow_partial_update') { sql "set skip_delete_sign=true;" sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" - qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;" + qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid,v1,__DORIS_DELETE_SIGN__;" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy new file mode 100644 index 00000000000000..d275e25ef3b520 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('test_partial_update_delete') { + sql 'set enable_nereids_planner=false' + sql "set experimental_enable_nereids_planner=false;" + sql 'set enable_nereids_dml=false' + + def tableName1 = "test_partial_update_delete1" + sql "DROP TABLE IF EXISTS ${tableName1};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + def tableName2 = "test_partial_update_delete2" + sql "DROP TABLE IF EXISTS ${tableName2};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k` BIGINT NULL + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select * from ${tableName1} order by k1;" + sql "insert into ${tableName2} values(1),(2),(3);" + sql "delete from ${tableName1} A using ${tableName2} B where A.k1=B.k;" + qt_sql "select * from ${tableName1} order by k1;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" +} From 99c1ae510cb9601ef155ded31272a8a6a5d8b7bb Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 27 Jul 2023 10:12:19 +0800 Subject: [PATCH 3/5] update --- be/src/olap/rowset/segment_v2/segment_writer.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 97ae8bb1fd2848..edc8c43a10287b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -369,10 +369,11 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* if (const vectorized::ColumnWithTypeAndName* delete_sign_column = full_block.try_get_by_name(DELETE_SIGN); delete_sign_column != nullptr) { - delete_sign_column_data = - reinterpret_cast(*(delete_sign_column->column)) - .get_data() - .data(); + auto& delete_sign_col = + reinterpret_cast(*(delete_sign_column->column)); + if (delete_sign_col.size() == num_rows) { + delete_sign_column_data = delete_sign_col.get_data().data(); + } } std::vector specified_rowsets; From eec47a006138d35daa54a4017726ac1695a2be91 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 27 Jul 2023 22:47:10 +0800 Subject: [PATCH 4/5] update --- .../doris/planner/StreamLoadPlanner.java | 3 ++ .../delete/delete_mow_partial_update.out | 21 ++++++++++ .../delete/partial_update_delete.csv | 3 ++ .../partial_update/partial_update_delete.csv | 3 ++ .../test_partial_update_delete.out | 21 ++++++++++ .../delete/delete_mow_partial_update.groovy | 38 ++++++++++++++++++ .../test_partial_update_delete.groovy | 39 +++++++++++++++++++ 7 files changed, 128 insertions(+) create mode 100644 regression-test/data/nereids_p0/delete/partial_update_delete.csv create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index ac5d6c595c0954..25a5d5d9c48060 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -171,6 +171,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } } + if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { + partialUpdateInputColumns.add(Column.DELETE_SIGN); + } } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { diff --git a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out index 2da22fb0667d70..2f8e157a9449c0 100644 --- a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out +++ b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out @@ -24,3 +24,24 @@ 4 4 0 5 5 0 +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + diff --git a/regression-test/data/nereids_p0/delete/partial_update_delete.csv b/regression-test/data/nereids_p0/delete/partial_update_delete.csv new file mode 100644 index 00000000000000..5f5fbe759f10b9 --- /dev/null +++ b/regression-test/data/nereids_p0/delete/partial_update_delete.csv @@ -0,0 +1,3 @@ +1 +2 +3 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv new file mode 100644 index 00000000000000..5f5fbe759f10b9 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv @@ -0,0 +1,3 @@ +1 +2 +3 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out index 091109620d7f6a..89faa7fed05223 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out @@ -20,3 +20,24 @@ 4 4 4 4 4 0 5 5 5 5 5 0 +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +1 \N \N \N \N 1 +1 1 1 1 1 0 +2 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + diff --git a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy index eb8aa67634d48b..7bfc06120ab410 100644 --- a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy +++ b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy @@ -61,4 +61,42 @@ suite('nereids_delete_mow_partial_update') { sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid,v1,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName3 = "test_partial_update_delete3" + sql "DROP TABLE IF EXISTS ${tableName3};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_colunms', 'true' + set 'merge_type', 'DELETE' + + file 'partial_update_delete.csv' + time 10000 + } + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" } diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy index d275e25ef3b520..4022bb6d98e046 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy @@ -57,4 +57,43 @@ suite('test_partial_update_delete') { sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName3 = "test_partial_update_delete3" + sql "DROP TABLE IF EXISTS ${tableName3};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_colunms', 'true' + set 'merge_type', 'DELETE' + + file 'partial_update_delete.csv' + time 10000 + } + qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + } From e7d2aea281963335a71fc2bce7e1a6378cacef12 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 28 Jul 2023 10:25:59 +0800 Subject: [PATCH 5/5] add to planForPipeline --- .../java/org/apache/doris/planner/StreamLoadPlanner.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 25a5d5d9c48060..ccdd7b336e1ff7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -376,6 +376,11 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns + col.getName()); } partialUpdateInputColumns.add(col.getName()); + if (destTable.hasSequenceCol() && (taskInfo.hasSequenceCol() || ( + destTable.getSequenceMapCol() != null + && destTable.getSequenceMapCol().equalsIgnoreCase(col.getName())))) { + partialUpdateInputColumns.add(Column.SEQUENCE_COL); + } existInExpr = true; break; } @@ -384,6 +389,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } } + if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { + partialUpdateInputColumns.add(Column.DELETE_SIGN); + } } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) {