Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/io/reader_buffer.h"
#include "vec/jsonb/serialize.h"
#include "vec/olap/olap_data_convertor.h"
Expand Down Expand Up @@ -364,6 +365,17 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
bool has_default_or_nullable = false;
std::vector<bool> use_default_or_null_flag;
use_default_or_null_flag.reserve(num_rows);
const vectorized::Int8* delete_sign_column_data = nullptr;
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
full_block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
if (delete_sign_col.size() == num_rows) {
delete_sign_column_data = delete_sign_col.get_data().data();
}
}

std::vector<RowsetSharedPtr> specified_rowsets;
{
std::shared_lock rlock(_tablet->get_header_lock());
Expand Down Expand Up @@ -412,10 +424,19 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
LOG(WARNING) << "failed to lookup row key, error: " << st;
return st;
}
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
_tablet->prepare_to_read(loc, pos, &_rssid_to_rid);

// if the delete sign is marked, it means that the value columns of the row
// will not be read. So we don't need to read the missing values from the previous rows.
// But we still need to mark the previous row on delete bitmap
if (delete_sign_column_data != nullptr && delete_sign_column_data[pos - row_pos] != 0) {
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
} else {
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
_tablet->prepare_to_read(loc, pos, &_rssid_to_rid);
}

if (st.is<ALREADY_EXIST>()) {
// although we need to mark delete current row, we still need to read missing columns
Expand Down Expand Up @@ -551,14 +572,18 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
// if the column has a default value, fill it with the default value;
// otherwise, if the column is nullable, fill it with null value;
// as a last resort (rows whose delete sign is marked, so the value
// columns will never be read) fill it with an arbitrary default value
const auto& tablet_column = _tablet_schema->column(cids_missing[i]);
CHECK(tablet_column.has_default_value() || tablet_column.is_nullable());
if (tablet_column.has_default_value()) {
mutable_full_columns[cids_missing[i]]->insert_from(
*mutable_default_value_columns[i].get(), 0);
} else {
} else if (tablet_column.is_nullable()) {
auto nullable_column = assert_cast<vectorized::ColumnNullable*>(
mutable_full_columns[cids_missing[i]].get());
nullable_column->insert_null_elements(1);
} else {
// If the control flow reaches this branch, the column neither has a default value
// nor is nullable. It means that the row's delete sign is marked, and the value
// columns are useless and won't be read. So we can just put arbitrary values in the cells
mutable_full_columns[cids_missing[i]]->insert_default();
}
}
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
Expand Down Expand Up @@ -373,6 +376,11 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
+ col.getName());
}
partialUpdateInputColumns.add(col.getName());
if (destTable.hasSequenceCol() && (taskInfo.hasSequenceCol() || (
destTable.getSequenceMapCol() != null
&& destTable.getSequenceMapCol().equalsIgnoreCase(col.getName())))) {
partialUpdateInputColumns.add(Column.SEQUENCE_COL);
}
existInExpr = true;
break;
}
Expand All @@ -381,6 +389,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,33 @@
5 5

-- !sql --
1 \N 1
1 1 0
1 1 1
2 \N 1
2 2 0
2 2 1
3 \N 1
3 3 0
3 3 1
4 4 0
5 5 0

-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5

-- !sql --
4 4 4 4 4
5 5 5 5 5

-- !sql --
1 \N \N \N \N 1
1 1 1 1 1 0
2 \N \N \N \N 1
2 2 2 2 2 0
3 \N \N \N \N 1
3 3 3 3 3 0
4 4 4 4 4 0
5 5 5 5 5 0

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1
2
3
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1
2
3
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5

-- !sql --
4 4 4 4 4
5 5 5 5 5

-- !with_delete_sign --
1 \N \N \N \N 1
1 1 1 1 1 0
2 \N \N \N \N 1
2 2 2 2 2 0
3 \N \N \N \N 1
3 3 3 3 3 0
4 4 4 4 4 0
5 5 5 5 5 0

-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5

-- !sql --
4 4 4 4 4
5 5 5 5 5

-- !sql --
1 \N \N \N \N 1
1 1 1 1 1 0
2 \N \N \N \N 1
2 2 2 2 2 0
3 \N \N \N \N 1
3 3 3 3 3 0
4 4 4 4 4 0
5 5 5 5 5 0

Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,43 @@ suite('nereids_delete_mow_partial_update') {
sql "set skip_delete_sign=true;"
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;"
qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid,v1,__DORIS_DELETE_SIGN__;"

sql "set skip_delete_sign=false;"
sql "set skip_storage_engine_merge=false;"
sql "set skip_delete_bitmap=false;"
def tableName3 = "test_partial_update_delete3"
sql "DROP TABLE IF EXISTS ${tableName3};"
sql """ CREATE TABLE IF NOT EXISTS ${tableName3} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int,
`c4` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1"
);"""
sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;"
streamLoad {
table "${tableName3}"

set 'column_separator', ','
set 'format', 'csv'
set 'columns', 'k1'
set 'partial_colunms', 'true'
set 'merge_type', 'DELETE'

file 'partial_update_delete.csv'
time 10000
}
qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;"
sql "set skip_delete_sign=true;"
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite('test_partial_update_delete') {
    // This case targets the legacy planner; disable all nereids switches.
    sql 'set enable_nereids_planner=false'
    sql "set experimental_enable_nereids_planner=false;"
    sql 'set enable_nereids_dml=false'

    // tableName1: merge-on-write unique-key table used to verify that
    // "delete ... using ..." marks rows through the hidden delete-sign column.
    def tableName1 = "test_partial_update_delete1"
    sql "DROP TABLE IF EXISTS ${tableName1};"
    sql """ CREATE TABLE IF NOT EXISTS ${tableName1} (
            `k1` int NOT NULL,
            `c1` int,
            `c2` int,
            `c3` int,
            `c4` int
            )UNIQUE KEY(k1)
        DISTRIBUTED BY HASH(k1) BUCKETS 1
        PROPERTIES (
            "enable_unique_key_merge_on_write" = "true",
            "disable_auto_compaction" = "true",
            "replication_num" = "1"
        );"""

    // tableName2: supplies the set of keys to delete from tableName1.
    def tableName2 = "test_partial_update_delete2"
    sql "DROP TABLE IF EXISTS ${tableName2};"
    sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
            `k` BIGINT NULL
            ) UNIQUE KEY(k)
        DISTRIBUTED BY HASH(k) BUCKETS 1
        PROPERTIES (
            "enable_unique_key_merge_on_write" = "true",
            "disable_auto_compaction" = "true",
            "replication_num" = "1"
        );"""

    sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
    qt_sql "select * from ${tableName1} order by k1;"
    sql "insert into ${tableName2} values(1),(2),(3);"
    sql "delete from ${tableName1} A using ${tableName2} B where A.k1=B.k;"
    qt_sql "select * from ${tableName1} order by k1;"

    // Bypass the delete sign / storage merge / delete bitmap so the hidden
    // __DORIS_DELETE_SIGN__ column becomes visible and deleted rows can be
    // inspected directly.
    sql "set skip_delete_sign=true;"
    sql "set skip_storage_engine_merge=true;"
    sql "set skip_delete_bitmap=true;"
    qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"

    sql "set skip_delete_sign=false;"
    sql "set skip_storage_engine_merge=false;"
    sql "set skip_delete_bitmap=false;"

    // tableName3: verify deleting rows via stream load with merge_type=DELETE
    // while supplying only the key column (partial update path).
    def tableName3 = "test_partial_update_delete3"
    sql "DROP TABLE IF EXISTS ${tableName3};"
    sql """ CREATE TABLE IF NOT EXISTS ${tableName3} (
            `k1` int NOT NULL,
            `c1` int,
            `c2` int,
            `c3` int,
            `c4` int
            )UNIQUE KEY(k1)
        DISTRIBUTED BY HASH(k1) BUCKETS 1
        PROPERTIES (
            "enable_unique_key_merge_on_write" = "true",
            "disable_auto_compaction" = "true",
            "replication_num" = "1"
        );"""
    sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
    qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;"
    streamLoad {
        table "${tableName3}"

        set 'column_separator', ','
        set 'format', 'csv'
        set 'columns', 'k1'
        // BUG FIX: header was misspelled 'partial_colunms'; the correct
        // stream-load header is 'partial_columns' — with the typo the load
        // would not be treated as a partial update.
        set 'partial_columns', 'true'
        set 'merge_type', 'DELETE'

        file 'partial_update_delete.csv'
        time 10000
    }
    qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;"

    // Inspect the delete-sign column again to confirm the stream-load DELETE
    // marked rows 1-3 rather than physically removing them.
    sql "set skip_delete_sign=true;"
    sql "set skip_storage_engine_merge=true;"
    sql "set skip_delete_bitmap=true;"
    qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
}