From 2d80a4f9d51e6e6f0d1ca1815a07289b416fd257 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Wed, 23 Jul 2025 14:16:00 +0800
Subject: [PATCH 1/3] add case

---
 ...test_partial_update_publish_all_del.groovy | 146 ++++++++++++++++++
 1 file changed, 146 insertions(+)
 create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy

diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy
new file mode 100644
index 00000000000000..6a7d9c2276d7ec
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_publish_all_del") {
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    def tableName = "test_partial_update_publish_all_del"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL COMMENT "user id",
+            `name` varchar(65533) NULL COMMENT "user name",
+            `score` int(11) NULL COMMENT "user score",
+            `test` int(11) NULL COMMENT "null test",
+            `dft` int(11) DEFAULT "4321")
+            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true")
+    """
+
+    sql """insert into ${tableName} values
+        (2, "doris2", 2000, 223, 2),
+        (1, "doris", 1000, 123, 1),
+        (5, "doris5", 5000, 523, 5),
+        (4, "doris4", 4000, 423, 4),
+        (3, "doris3", 3000, 323, 3);"""
+    qt_sql "select * from ${tableName} order by id;"
+
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.text
+        def json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(0, code)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = { txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals("VISIBLE", st)
+    }
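+
+    // The three stream loads below run with two_phase_commit enabled so the
+    // test controls exactly when each transaction is committed and published;
+    // wait_for_publish polls `show transaction` until the txn turns VISIBLE.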
+
+    def txnId1, txnId2, txnId3
+
+    String data1 = """1,"ddddddddddd"\n2,"eeeeee"\n3,"aaaaa"\n4,"bbbbbbbb"\n5,"cccccccccccc"\n"""
+    streamLoad {
+        table "${tableName}"
+        set 'two_phase_commit', 'true'
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,name'
+
+        inputStream new ByteArrayInputStream(data1.getBytes())
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+
+    }
+
+    String data2 = """1,10\n2,20\n3,30\n4,40\n5,50\n"""
+    streamLoad {
+        table "${tableName}"
+        set 'two_phase_commit', 'true'
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,dft'
+
+        inputStream new ByteArrayInputStream(data2.getBytes())
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    // every row in this load carries the delete sign
+    String data3 = """1,1\n3,1\n5,1\n"""
+    streamLoad {
+        table "${tableName}"
+        set 'two_phase_commit', 'true'
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'id,__DORIS_DELETE_SIGN__'
+
+        inputStream new ByteArrayInputStream(data3.getBytes())
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            def json = parseJson(result)
+            txnId3 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 60)
+    do_streamload_2pc_commit(txnId2)
+    wait_for_publish(txnId2, 60)
+    do_streamload_2pc_commit(txnId3)
+    wait_for_publish(txnId3, 60)
+
+    qt_sql "select * from ${tableName} order by id;"
+}
\ No newline at end of file

From 655b50ff7ddd9037c4e24ba83fcf15078dde30a3 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Wed, 23 Jul 2025 14:16:07 +0800
Subject: [PATCH 2/3] fix

---
 be/src/olap/base_tablet.cpp                   | 53 ++++++++++---------
 .../test_partial_update_publish_all_del.out   | 12 +++++
 2 files changed, 40 insertions(+), 25 deletions(-)
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.out

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index c61cc4365fa2db..6f1600c39ae226 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1075,37 +1075,40 @@ Status BaseTablet::generate_new_block_for_partial_update(
             // before, even the `strict_mode` is true (which requires partial update
             // load job can't insert new keys), this "new" key MUST be written into
             // the new generated segment file.
-            bool use_default = false;
             bool new_row_delete_sign =
                     (new_block_delete_signs != nullptr && new_block_delete_signs[idx]);
-            bool old_row_delete_sign = (old_block_delete_signs != nullptr &&
-                                        old_block_delete_signs[read_index_old[idx]] != 0);
-            if (old_row_delete_sign) {
-                if (!rowset_schema->has_sequence_col()) {
-                    use_default = true;
-                } else if (have_input_seq_column || !rs_column.is_seqeunce_col()) {
-                    // to keep the sequence column value not decreasing, we should read values of seq column
-                    // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
-                    // it may cause the merge-on-read based compaction to produce incorrect results
-                    use_default = true;
-                }
-            }
-
             if (new_row_delete_sign) {
                 mutable_column->insert_default();
-            } else if (use_default) {
-                if (rs_column.has_default_value()) {
-                    mutable_column->insert_from(*default_value_block.get_by_position(i).column, 0);
-                } else if (rs_column.is_nullable()) {
-                    assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
-                            mutable_column.get())
-                            ->insert_default();
+            } else {
+                bool use_default = false;
+                bool old_row_delete_sign = (old_block_delete_signs != nullptr &&
+                                            old_block_delete_signs[read_index_old.at(idx)] != 0);
+                if (old_row_delete_sign) {
+                    if (!rowset_schema->has_sequence_col()) {
+                        use_default = true;
+                    } else if (have_input_seq_column || !rs_column.is_seqeunce_col()) {
+                        // to keep the sequence column value not decreasing, we should read values of seq column
+                        // from old rows even if the old row is deleted when the input doesn't specify the sequence column,
+                        // otherwise it may cause the merge-on-read based compaction to produce incorrect results
+                        use_default = true;
+                    }
+                }
+
+                if (use_default) {
+                    if (rs_column.has_default_value()) {
+                        mutable_column->insert_from(*default_value_block.get_by_position(i).column,
+                                                    0);
+                    } else if (rs_column.is_nullable()) {
+                        assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
+                                mutable_column.get())
+                                ->insert_default();
+                    } else {
+                        mutable_column->insert(rs_column.get_vec_type()->get_default());
+                    }
                 } else {
-                    mutable_column->insert(rs_column.get_vec_type()->get_default());
+                    mutable_column->insert_from(*old_block.get_by_position(i).column,
+                                                read_index_old[idx]);
                 }
-            } else {
-                mutable_column->insert_from(*old_block.get_by_position(i).column,
-                                            read_index_old[idx]);
             }
         }
     }

diff --git a/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.out b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.out
new file mode 100644
index 00000000000000..52388a27a0ceae
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	doris	1000	123	1
+2	doris2	2000	223	2
+3	doris3	3000	323	3
+4	doris4	4000	423	4
+5	doris5	5000	523	5
+
+-- !sql --
+2	"eeeeee"	2000	223	20
+4	"bbbbbbbb"	4000	423	40
+

From 5dd7fdcb59f4068708c8b74ca814c90752e8f24d Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Wed, 23 Jul 2025 15:21:26 +0800
Subject: [PATCH 3/3] fix

---
 .../publish/test_partial_update_publish_all_del.groovy | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy
index 6a7d9c2276d7ec..f1a153fd036e4a 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/publish/test_partial_update_publish_all_del.groovy
@@ -16,6 +16,10 @@
 // under the License.
 
 suite("test_partial_update_publish_all_del") {
+    if (isCloudMode()) {
+        logger.info("skip test_partial_update_publish_all_del in cloud mode")
+        return
+    }
     def dbName = context.config.getDbNameByFile(context.file)
 
     def tableName = "test_partial_update_publish_all_del"
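
A note on the shape of the fix in patch 2/3: the rewritten loop in
generate_new_block_for_partial_update tests the incoming row's delete sign
first, so read_index_old is consulted only for rows the load does not delete,
and the old-row delete-sign lookup now goes through the bounds-checked
read_index_old.at(idx) instead of operator[]. The toy program below is an
illustrative sketch of that branch order only: it uses plain C++ stand-ins
(the names new_row_delete_sign and read_index_old follow the patch; everything
else is hypothetical) rather than Doris' Block and column machinery, and it
models an extreme of the case the new test exercises, where every incoming row
carries __DORIS_DELETE_SIGN__ and (in this toy) no old-row mapping exists:

#include <cstdio>
#include <vector>

int main() {
    // The regression case: keys 1, 3 and 5 all carry the delete sign, so
    // there is no old-row index entry for any of them.
    std::vector<bool> new_row_delete_sign = {true, true, true};
    std::vector<size_t> read_index_old; // deliberately empty

    for (size_t idx = 0; idx < new_row_delete_sign.size(); ++idx) {
        if (new_row_delete_sign[idx]) {
            // Patched order: a row deleted by this load is filled with
            // column defaults, and read_index_old is never indexed for it.
            std::printf("row %zu: insert_default()\n", idx);
        } else {
            // Only surviving rows read back old values; .at() fails loudly
            // (std::out_of_range) on a missing mapping instead of misreading.
            std::printf("row %zu: insert_from(old_block, %zu)\n", idx,
                        read_index_old.at(idx));
        }
    }
    return 0;
}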