From 3d06aa71cad4de68d40a70ba5e17d542ae337fdb Mon Sep 17 00:00:00 2001 From: zhannngchen <48427519+zhannngchen@users.noreply.github.com> Date: Fri, 14 Jun 2024 09:27:09 +0800 Subject: [PATCH 1/4] [fix](partial update) partial update should not read old fileds from rows with delete sign (#36210) Issue Number: close #34296 1. When partial update filling in the missing fields, if a load job previously wrote data with a delete sign, it will also read out the data in the column with the delete sign, so that the newly written data will also become invisible 2. This problem was fixed in #24877, but was introduced again in #26721, and was never found because the case was changed to the wrong output in 3. The fix in #24877 didn't take into account the handling of concurrent conflicts in the publish phase, the current PR adds this part of the handling, and adds the corresponding case. --- be/src/olap/partial_update_info.h | 33 ++++ .../olap/rowset/segment_v2/segment_writer.cpp | 17 +- be/src/olap/tablet.cpp | 60 ++++++- be/src/olap/tablet.h | 4 +- ...rtial_update_parallel_with_delete_sign.csv | 5 + .../test_partial_update_delete_sign.out | 82 +++++++++- ...rtial_update_delete_sign_with_conflict.out | 19 +++ .../test_partial_update_delete_sign.groovy | 117 +++++++------- ...al_update_delete_sign_with_conflict.groovy | 151 ++++++++++++++++++ 9 files changed, 405 insertions(+), 83 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 79eb89c620c04f..b1d4287fcb6e0f 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -27,6 +27,7 @@ struct PartialUpdateInfo { int64_t timestamp_ms, const std::string& timezone) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; + this->timestamp_ms = timestamp_ms; this->timezone = timezone; missing_cids.clear(); @@ -43,8 +44,37 @@ struct PartialUpdateInfo { } } this->is_strict_mode = is_strict_mode; + _generate_default_values_for_missing_cids(tablet_schema); + } + +private: + void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) { + for (auto i = 0; i < missing_cids.size(); ++i) { + auto cur_cid = missing_cids[i]; + const auto& column = tablet_schema.column(cur_cid); + if (column.has_default_value()) { + std::string default_value; + if (UNLIKELY(tablet_schema.column(cur_cid).type() == + FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && + to_lower(tablet_schema.column(cur_cid).default_value()) + .find(to_lower("CURRENT_TIMESTAMP")) != + std::string::npos)) { + DateV2Value dtv; + dtv.from_unixtime(timestamp_ms / 1000, timezone); + default_value = dtv.debug_string(); + } else { + default_value = tablet_schema.column(cur_cid).default_value(); + } + default_values.emplace_back(default_value); + } else { + // place an empty string here + default_values.emplace_back(); + } + } + CHECK_EQ(missing_cids.size(), default_values.size()); } +public: bool is_partial_update {false}; std::set partial_update_input_columns; std::vector missing_cids; @@ -55,5 +85,8 @@ struct PartialUpdateInfo { bool is_strict_mode {false}; int64_t timestamp_ms {0}; std::string timezone; + + // default values for missing cids + std::vector default_values; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index c9fc6a939cbdb9..31f22c48be47a3 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -625,7 +625,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f const vectorized::Int8* delete_sign_column_data = nullptr; if (const vectorized::ColumnWithTypeAndName* delete_sign_column = old_value_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) { + delete_sign_column != nullptr) { auto& delete_sign_col = reinterpret_cast(*(delete_sign_column->column)); delete_sign_column_data = delete_sign_col.get_data().data(); @@ -635,19 +635,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f for (auto i = 0; i < cids_missing.size(); ++i) { const auto& column = _tablet_schema->column(cids_missing[i]); if (column.has_default_value()) { - std::string default_value; - if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() == - FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && - to_lower(_tablet_schema->column(cids_missing[i]).default_value()) - .find(to_lower("CURRENT_TIMESTAMP")) != - std::string::npos)) { - vectorized::DateV2Value dtv; - dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000, - _opts.rowset_ctx->partial_update_info->timezone); - default_value = dtv.debug_string(); - } else { - default_value = _tablet_schema->column(cids_missing[i]).default_value(); - } + const auto& default_value = + _opts.rowset_ctx->partial_update_info->default_values[i]; vectorized::ReadBuffer rb(const_cast(default_value.c_str()), default_value.size()); old_value_block.get_by_position(i).type->from_string( diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 97cbfea955456b..811746d5445a62 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3292,8 +3292,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, auto partial_update_info = rowset_writer->get_partial_update_info(); DCHECK(partial_update_info); RETURN_IF_ERROR(generate_new_block_for_partial_update( - rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids, - read_plan_ori, read_plan_update, rsid_to_rowset, &block)); + rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update, + rsid_to_rowset, &block)); sort_block(block, ordered_block); int64_t size; RETURN_IF_ERROR(rowset_writer->flush_single_memtable(&ordered_block, &size)); @@ -3358,10 +3358,9 @@ std::vector Tablet::get_rowset_by_ids( return rowsets; } -Status Tablet::generate_new_block_for_partial_update( - TabletSchemaSPtr rowset_schema, const std::vector& missing_cids, - const std::vector& update_cids, const PartialUpdateReadPlan& read_plan_ori, - const PartialUpdateReadPlan& read_plan_update, +Status BaseTablet::generate_new_block_for_partial_update( + TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, + const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map& rsid_to_rowset, vectorized::Block* output_block) { // do partial update related works @@ -3371,6 +3370,8 @@ Status Tablet::generate_new_block_for_partial_update( // 4. mark current keys deleted CHECK(output_block); auto full_mutable_columns = output_block->mutate_columns(); + const auto& missing_cids = partial_update_info->missing_cids; + const auto& update_cids = partial_update_info->update_cids; auto old_block = rowset_schema->create_block_by_cids(missing_cids); auto update_block = rowset_schema->create_block_by_cids(update_cids); @@ -3382,10 +3383,57 @@ Status Tablet::generate_new_block_for_partial_update( RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, rsid_to_rowset, update_block, &read_index_update)); + const vectorized::Int8* delete_sign_column_data = nullptr; + if (const vectorized::ColumnWithTypeAndName* delete_sign_column = + old_block.try_get_by_name(DELETE_SIGN); + delete_sign_column != nullptr) { + auto& delete_sign_col = + reinterpret_cast(*(delete_sign_column->column)); + delete_sign_column_data = delete_sign_col.get_data().data(); + } + + // build default value block + auto default_value_block = old_block.clone_empty(); + auto mutable_default_value_columns = default_value_block.mutate_columns(); + if (delete_sign_column_data != nullptr) { + for (auto i = 0; i < missing_cids.size(); ++i) { + const auto& column = rowset_schema->column(missing_cids[i]); + if (column.has_default_value()) { + const auto& default_value = partial_update_info->default_values[i]; + vectorized::ReadBuffer rb(const_cast(default_value.c_str()), + default_value.size()); + RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string( + rb, mutable_default_value_columns[i].get())); + } + } + } + // build full block CHECK(read_index_old.size() == read_index_update.size()); + for (auto i = 0; i < missing_cids.size(); ++i) { + const auto& rs_column = rowset_schema->column(missing_cids[i]); for (auto idx = 0; idx < read_index_old.size(); ++idx) { + // if the conflict update is a delete sign, which means that the key is + // not exist now, we should not read old values from the deleted data, + // and should use default value instead. + // NOTE: since now we are in the publishing phase, all data is commited + // before, even the `strict_mode` is true (which requires partial update + // load job can't insert new keys), this "new" key MUST be written into + // the new generated segment file. + if (delete_sign_column_data != nullptr && + delete_sign_column_data[read_index_old[idx]] != 0) { + auto& mutable_column = full_mutable_columns[missing_cids[i]]; + if (rs_column.has_default_value()) { + mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0); + } else if (rs_column.is_nullable()) { + assert_cast(mutable_column.get()) + ->insert_null_elements(1); + } else { + mutable_column->insert_default(); + } + continue; + } full_mutable_columns[missing_cids[i]]->insert_from( *old_block.get_columns_with_type_and_name()[i].column.get(), read_index_old[idx]); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index fd781722d1e079..0518e32ae8cbcc 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -491,8 +491,8 @@ class Tablet : public BaseTablet { void prepare_to_read(const RowLocation& row_location, size_t pos, PartialUpdateReadPlan* read_plan); Status generate_new_block_for_partial_update( - TabletSchemaSPtr rowset_schema, const std::vector& missing_cids, - const std::vector& update_cids, const PartialUpdateReadPlan& read_plan_ori, + TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, + const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map& rsid_to_rowset, vectorized::Block* output_block); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv new file mode 100644 index 00000000000000..62aa3a38c16311 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv @@ -0,0 +1,5 @@ +1,10,1 +2,20,0 +3,30,1 +4,40,0 +5,50,1 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out index 8d3e69bbe26ac9..784dbd69536a52 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out @@ -10,7 +10,7 @@ 2 2 2 2 2 4 4 4 4 4 --- !with_delete_sign -- +-- !1 -- 1 \N \N \N \N 1 1 1 1 1 1 0 2 2 2 2 2 0 @@ -21,12 +21,51 @@ 5 5 5 5 5 0 6 \N \N \N \N 1 +-- !2 -- +1 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +4 4 4 4 4 0 +5 \N \N \N \N 1 +6 \N \N \N \N 1 + +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !after_delete -- +2 2 2 2 2 +4 4 4 4 4 + +-- !1 -- +1 1 1 1 1 0 +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + +-- !2 -- +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + -- !1 -- 1 1 1 -- !2 -- -- !3 -- +1 2 \N -- !1 -- 1 1 1 1 @@ -47,7 +86,7 @@ 2 2 2 2 2 4 4 4 4 4 --- !with_delete_sign -- +-- !1 -- 1 \N \N \N \N 1 1 1 1 1 1 0 2 2 2 2 2 0 @@ -58,12 +97,51 @@ 5 5 5 5 5 0 6 \N \N \N \N 1 +-- !2 -- +1 \N \N \N \N 1 +2 2 2 2 2 0 +3 \N \N \N \N 1 +4 4 4 4 4 0 +5 \N \N \N \N 1 +6 \N \N \N \N 1 + +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !after_delete -- +2 2 2 2 2 +4 4 4 4 4 + +-- !1 -- +1 1 1 1 1 0 +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + +-- !2 -- +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + -- !1 -- 1 1 1 -- !2 -- -- !3 -- +1 2 \N -- !1 -- 1 1 1 1 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out new file mode 100644 index 00000000000000..faa4ca1d0bb148 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +2 20 \N \N foo +4 40 \N \N foo + +-- !sql -- +1 100 10 \N foo +2 20 20 \N foo +3 100 30 \N foo +4 40 40 \N foo +5 100 50 \N foo + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy index a9c389290b7c5f..6635c67f055afa 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy @@ -65,68 +65,67 @@ suite('test_partial_update_delete_sign') { sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" sql "sync" - // // skip_delete_bitmap=true, skip_delete_sign=true - // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // // skip_delete_bitmap=false, skip_delete_sign=true - // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + // skip_delete_bitmap=true, skip_delete_sign=true + qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=true;" + sql "set skip_delete_bitmap=false;" + sql "sync" + // skip_delete_bitmap=false, skip_delete_sign=true + qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" sql "drop table if exists ${tableName1};" - // sql "set skip_delete_sign=false;" - // sql "set skip_storage_engine_merge=false;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // def tableName2 = "test_partial_update_delete_sign2" - // sql "DROP TABLE IF EXISTS ${tableName2};" - // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( - // `k1` int NOT NULL, - // `c1` int, - // `c2` int, - // `c3` int, - // `c4` int - // )UNIQUE KEY(k1) - // DISTRIBUTED BY HASH(k1) BUCKETS 1 - // PROPERTIES ( - // "enable_unique_key_merge_on_write" = "true", - // "disable_auto_compaction" = "true", - // "replication_num" = "1", - // "function_column.sequence_col" = 'c4' - // );""" - - // sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" - // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;" - // streamLoad { - // table "${tableName2}" - - // set 'column_separator', ',' - // set 'format', 'csv' - // set 'partial_columns', 'true' - // set 'columns', 'k1,__DORIS_DELETE_SIGN__' - - // file 'delete_sign.csv' - // time 10000 // limit inflight 10s - // } - // sql "sync" - // qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_storage_engine_merge=true;" - // sql "set skip_delete_bitmap=true;" - // sql "sync" - // // skip_delete_bitmap=true, skip_delete_sign=true - // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // // skip_delete_bitmap=false, skip_delete_sign=true - // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - // sql "drop table if exists ${tableName2};" + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + sql "sync" + def tableName2 = "test_partial_update_delete_sign2" + sql "DROP TABLE IF EXISTS ${tableName2};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "function_column.sequence_col" = 'c4' + );""" + + sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' /* NOTE: it's a partial update */ + set 'columns', 'k1,__DORIS_DELETE_SIGN__' + + file 'delete_sign.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + sql "sync" + // skip_delete_bitmap=true, skip_delete_sign=true + qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=true;" + sql "set skip_delete_bitmap=false;" + sql "sync" + // skip_delete_bitmap=false, skip_delete_sign=true + qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + sql "drop table if exists ${tableName2};" // partial update a row that has been deleted by delete sign(table without sequence column) diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy new file mode 100644 index 00000000000000..7e2cd9cdfe308a --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy @@ -0,0 +1,151 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils + +suite("test_partial_update_delete_sign_with_conflict") { + def dbName = context.config.getDbNameByFile(context.file) + def tableName = "test_partial_update_delete_sign_with_conflict" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int NOT NULL, + `c1` int default 100, + `c2` int, + `c3` int, + `c4` varchar(100) default 'foo' + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + sql "sync;" + qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;" + + // NOTE: use streamload 2pc to construct the conflict of publish + def do_streamload_2pc_commit = { txnId -> + def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" + + " -H txn_id:${txnId}" + + " -H txn_operation:commit" + + " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc" + log.info("http_stream execute 2pc: ${command}") + + def process = command.execute() + code = process.waitFor() + out = process.text + json2pc = parseJson(out) + log.info("http_stream 2pc result: ${out}".toString()) + assertEquals(code, 0) + assertEquals("success", json2pc.status.toLowerCase()) + } + + def wait_for_publish = {txnId, waitSecond -> + String st = "PREPARE" + while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) { + Thread.sleep(1000) + waitSecond -= 1 + def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}" + assertNotNull(result) + st = result[0].TransactionStatus + } + log.info("Stream load with txn ${txnId} is ${st}") + assertEquals(st, "VISIBLE") + } + + // concurrent load 1 + String txnId1 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1,c1,label_c2' + set 'merge_type', 'MERGE' + set 'delete', 'label_c2=1' + set 'strict_mode', 'false' + set 'two_phase_commit', 'true' + file 'partial_update_parallel_with_delete_sign.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + txnId1 = json.TxnId + assertEquals("success", json.Status.toLowerCase()) + } + } + + String txnId2 + // concurrent load 2 + streamLoad { + table "${tableName}" + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k1,c2' + set 'strict_mode', "false" + set 'two_phase_commit', 'true' + file 'partial_update_parallel3.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + txnId2 = json.TxnId + assertEquals("success", json.Status.toLowerCase()) + } + } + sql "sync;" + + // complete load 1 first + do_streamload_2pc_commit(txnId1) + wait_for_publish(txnId1, 10) + + sql "sync;" + qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;" + + // publish will retry until success + // FE retry may take logger time, wait for 20 secs + do_streamload_2pc_commit(txnId2) + wait_for_publish(txnId2, 20) + + sql "sync;" + qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;" +} From 5efc2825c5597610b9d6792f59e05b40bf1c0fc4 Mon Sep 17 00:00:00 2001 From: Chen Zhang Date: Mon, 24 Jun 2024 20:32:58 +0800 Subject: [PATCH 2/4] fix compile --- be/src/olap/partial_update_info.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index b1d4287fcb6e0f..8968cf858369f9 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -59,7 +59,7 @@ struct PartialUpdateInfo { to_lower(tablet_schema.column(cur_cid).default_value()) .find(to_lower("CURRENT_TIMESTAMP")) != std::string::npos)) { - DateV2Value dtv; + vectorized::DateV2Value dtv; dtv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dtv.debug_string(); } else { From 8ed491004ab1c52f1b9d9e63b3a56e7ee9bb3c7a Mon Sep 17 00:00:00 2001 From: Chen Zhang Date: Mon, 24 Jun 2024 21:00:51 +0800 Subject: [PATCH 3/4] fix compile --- be/src/olap/partial_update_info.h | 1 + be/src/olap/tablet.cpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 8968cf858369f9..8567fa209cbae4 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -18,6 +18,7 @@ #pragma once #include "olap/tablet_schema.h" +#include "util/string_util.h" namespace doris { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 811746d5445a62..1698ff1603f8cc 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3358,7 +3358,7 @@ std::vector Tablet::get_rowset_by_ids( return rowsets; } -Status BaseTablet::generate_new_block_for_partial_update( +Status Tablet::generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map& rsid_to_rowset, From 70dfbcd62d621318fc0eb0d902a616eef1fbf428 Mon Sep 17 00:00:00 2001 From: Chen Zhang Date: Tue, 25 Jun 2024 10:53:15 +0800 Subject: [PATCH 4/4] fix case fail --- .../test_partial_update_delete_sign_with_conflict.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy index 7e2cd9cdfe308a..327ad574743bba 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy @@ -41,7 +41,7 @@ suite("test_partial_update_delete_sign_with_conflict") { sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( `k1` int NOT NULL, - `c1` int default 100, + `c1` int default '100', `c2` int, `c3` int, `c4` varchar(100) default 'foo'