diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
index 79eb89c620c04f..8567fa209cbae4 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "olap/tablet_schema.h"
+#include "util/string_util.h"
 
 namespace doris {
 
@@ -27,6 +28,7 @@ struct PartialUpdateInfo {
               int64_t timestamp_ms, const std::string& timezone) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
+        this->timestamp_ms = timestamp_ms;
         this->timezone = timezone;
         missing_cids.clear();
@@ -43,8 +45,37 @@ struct PartialUpdateInfo {
             }
         }
         this->is_strict_mode = is_strict_mode;
+        _generate_default_values_for_missing_cids(tablet_schema);
     }
+
+private:
+    void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            auto cur_cid = missing_cids[i];
+            const auto& column = tablet_schema.column(cur_cid);
+            if (column.has_default_value()) {
+                std::string default_value;
+                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                             to_lower(tablet_schema.column(cur_cid).default_value())
+                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
+                                     std::string::npos)) {
+                    vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
+                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dtv.debug_string();
+                } else {
+                    default_value = tablet_schema.column(cur_cid).default_value();
+                }
+                default_values.emplace_back(default_value);
+            } else {
+                // keep an empty string as a placeholder for columns without a default
+                default_values.emplace_back();
+            }
+        }
+        CHECK_EQ(missing_cids.size(), default_values.size());
+    }
+
+public:
     bool is_partial_update {false};
     std::set<std::string> partial_update_input_columns;
     std::vector<uint32_t> missing_cids;
@@ -55,5 +86,8 @@ struct PartialUpdateInfo {
     bool is_strict_mode {false};
     int64_t timestamp_ms {0};
     std::string timezone;
+
+    // default values for missing cids; empty string for columns without a default
+    std::vector<std::string> default_values;
 };
 } // namespace doris
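As context for the new `_generate_default_values_for_missing_cids` helper: defaults are now resolved once per load and cached as strings, so the segment writer and the publish-phase conflict handler both see the same CURRENT_TIMESTAMP value. A minimal standalone sketch of that caching idea, using hypothetical simplified types rather than the Doris API:

    #include <string>
    #include <vector>

    struct ColumnDef {
        bool has_default;
        std::string default_value; // e.g. "CURRENT_TIMESTAMP" or a literal
    };

    // Resolve each missing column's default once per load. Columns without a
    // default get an empty placeholder so indexes stay aligned with the
    // missing-cid list (this mirrors the CHECK_EQ in the patch above).
    std::vector<std::string> resolve_defaults(const std::vector<ColumnDef>& missing,
                                              const std::string& now_string) {
        std::vector<std::string> out;
        out.reserve(missing.size());
        for (const auto& col : missing) {
            if (!col.has_default) {
                out.emplace_back(); // placeholder, never read for these columns
            } else if (col.default_value == "CURRENT_TIMESTAMP") {
                out.push_back(now_string); // stamp once, reuse everywhere
            } else {
                out.push_back(col.default_value);
            }
        }
        return out;
    }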
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index c9fc6a939cbdb9..31f22c48be47a3 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -625,7 +625,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -635,19 +635,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     for (auto i = 0; i < cids_missing.size(); ++i) {
         const auto& column = _tablet_schema->column(cids_missing[i]);
         if (column.has_default_value()) {
-            std::string default_value;
-            if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
-                                 FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                         to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                 .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                 std::string::npos)) {
-                vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
-                dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                  _opts.rowset_ctx->partial_update_info->timezone);
-                default_value = dtv.debug_string();
-            } else {
-                default_value =
-                        _tablet_schema->column(cids_missing[i]).default_value();
-            }
+            const auto& default_value =
+                    _opts.rowset_ctx->partial_update_info->default_values[i];
             vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                       default_value.size());
             old_value_block.get_by_position(i).type->from_string(
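The tablet.cpp hunk below applies the same cached defaults at publish time: when a conflicting key's newest version carries the delete sign, the old row must not be read back. A standalone sketch of the fill rule that hunk implements, with simplified stand-in types (not Doris's vectorized columns):

    #include <optional>
    #include <string>

    // One output cell; std::nullopt models SQL NULL.
    using Cell = std::optional<std::string>;

    // For a delete-signed conflict, ignore the stale old value: use the column
    // default if one exists, NULL if the column is nullable, otherwise the
    // type's default value (modeled here as an empty string).
    Cell fill_missing_cell(bool old_row_deleted, const Cell& old_value,
                           const std::optional<std::string>& column_default,
                           bool nullable) {
        if (!old_row_deleted) return old_value;     // normal path: reuse old value
        if (column_default) return *column_default; // delete-signed: take default
        if (nullable) return std::nullopt;          // no default: NULL if allowed
        return std::string{};                       // stand-in for insert_default()
    }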
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 97cbfea955456b..1698ff1603f8cc 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3292,8 +3292,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     auto partial_update_info = rowset_writer->get_partial_update_info();
     DCHECK(partial_update_info);
     RETURN_IF_ERROR(generate_new_block_for_partial_update(
-            rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
-            read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+            rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
+            rsid_to_rowset, &block));
     sort_block(block, ordered_block);
     int64_t size;
     RETURN_IF_ERROR(rowset_writer->flush_single_memtable(&ordered_block, &size));
@@ -3359,9 +3359,8 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
 }
 
 Status Tablet::generate_new_block_for_partial_update(
-        TabletSchemaSPtr rowset_schema, const std::vector<uint32_t>& missing_cids,
-        const std::vector<uint32_t>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
-        const PartialUpdateReadPlan& read_plan_update,
+        TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+        const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
         vectorized::Block* output_block) {
     // do partial update related works
@@ -3371,6 +3370,8 @@ Status Tablet::generate_new_block_for_partial_update(
     // 4. mark current keys deleted
     CHECK(output_block);
     auto full_mutable_columns = output_block->mutate_columns();
+    const auto& missing_cids = partial_update_info->missing_cids;
+    const auto& update_cids = partial_update_info->update_cids;
     auto old_block = rowset_schema->create_block_by_cids(missing_cids);
     auto update_block = rowset_schema->create_block_by_cids(update_cids);
@@ -3382,10 +3383,57 @@ Status Tablet::generate_new_block_for_partial_update(
     RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
                                          rsid_to_rowset, update_block, &read_index_update));
 
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+    }
+
+    // build default value block
+    auto default_value_block = old_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+    if (delete_sign_column_data != nullptr) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            const auto& column = rowset_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                const auto& default_value = partial_update_info->default_values[i];
+                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+        }
+    }
+
     // build full block
     CHECK(read_index_old.size() == read_index_update.size());
+    for (auto i = 0; i < missing_cids.size(); ++i) {
+        const auto& rs_column = rowset_schema->column(missing_cids[i]);
         for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+            // If the conflicting update is a delete sign, the key no longer
+            // exists, so we should not read old values from the deleted data
+            // and should use the default value instead.
+            // NOTE: since we are now in the publish phase, all data was committed
+            // earlier; even if `strict_mode` is true (which forbids a partial
+            // update load job from inserting new keys), this "new" key MUST be
+            // written into the newly generated segment file.
+            if (delete_sign_column_data != nullptr &&
+                delete_sign_column_data[read_index_old[idx]] != 0) {
+                auto& mutable_column = full_mutable_columns[missing_cids[i]];
+                if (rs_column.has_default_value()) {
+                    mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
+                } else if (rs_column.is_nullable()) {
+                    assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+                            ->insert_null_elements(1);
+                } else {
+                    mutable_column->insert_default();
+                }
+                continue;
+            }
             full_mutable_columns[missing_cids[i]]->insert_from(
                     *old_block.get_columns_with_type_and_name()[i].column.get(),
                     read_index_old[idx]);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index fd781722d1e079..0518e32ae8cbcc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -491,8 +491,8 @@ class Tablet : public BaseTablet {
     void prepare_to_read(const RowLocation& row_location, size_t pos,
                          PartialUpdateReadPlan* read_plan);
     Status generate_new_block_for_partial_update(
-            TabletSchemaSPtr rowset_schema, const std::vector<uint32_t>& missing_cids,
-            const std::vector<uint32_t>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
+            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+            const PartialUpdateReadPlan& read_plan_ori,
             const PartialUpdateReadPlan& read_plan_update,
             const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
             vectorized::Block* output_block);
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
new file mode 100644
index 00000000000000..62aa3a38c16311
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
@@ -0,0 +1,5 @@
+1,10,1
+2,20,0
+3,30,1
+4,40,0
+5,50,1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index 8d3e69bbe26ac9..784dbd69536a52 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -10,7 +10,7 @@
 2 2 2 2 2
 4 4 4 4 4
 
--- !with_delete_sign --
+-- !1 --
 1 \N \N \N \N 1
 1 1 1 1 1 0
 2 2 2 2 2 0
@@ -21,12 +21,51 @@
 5 5 5 5 5 0
 6 \N \N \N \N 1
 
+-- !2 --
+1 \N \N \N \N 1
+2 2 2 2 
2 0 +3 \N \N \N \N 1 +4 4 4 4 4 0 +5 \N \N \N \N 1 +6 \N \N \N \N 1 + +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !after_delete -- +2 2 2 2 2 +4 4 4 4 4 + +-- !1 -- +1 1 1 1 1 0 +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + +-- !2 -- +1 1 1 1 1 1 +2 2 2 2 2 0 +3 3 3 3 3 1 +4 4 4 4 4 0 +5 5 5 5 5 1 +6 \N \N \N \N 1 + -- !1 -- 1 1 1 -- !2 -- -- !3 -- +1 2 \N -- !1 -- 1 1 1 1 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out new file mode 100644 index 00000000000000..faa4ca1d0bb148 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql -- +2 20 \N \N foo +4 40 \N \N foo + +-- !sql -- +1 100 10 \N foo +2 20 20 \N foo +3 100 30 \N foo +4 40 40 \N foo +5 100 50 \N foo + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy index a9c389290b7c5f..6635c67f055afa 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy @@ -65,68 +65,67 @@ suite('test_partial_update_delete_sign') { sql "set skip_storage_engine_merge=true;" sql "set skip_delete_bitmap=true;" sql "sync" - // // skip_delete_bitmap=true, skip_delete_sign=true - // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // // skip_delete_bitmap=false, skip_delete_sign=true - // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + // skip_delete_bitmap=true, skip_delete_sign=true + qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=true;" + sql "set skip_delete_bitmap=false;" + sql "sync" + // skip_delete_bitmap=false, skip_delete_sign=true + qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" sql "drop table if exists ${tableName1};" - // sql "set skip_delete_sign=false;" - // sql "set skip_storage_engine_merge=false;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // def tableName2 = "test_partial_update_delete_sign2" - // sql "DROP TABLE IF EXISTS ${tableName2};" - // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( - // `k1` int NOT NULL, - // `c1` int, - // `c2` int, - // `c3` int, - // `c4` int - // )UNIQUE KEY(k1) - // DISTRIBUTED BY HASH(k1) BUCKETS 1 - // PROPERTIES ( - // "enable_unique_key_merge_on_write" = "true", - // "disable_auto_compaction" = "true", - // "replication_num" = "1", - // "function_column.sequence_col" = 'c4' - // );""" - - // sql "insert into ${tableName2} 
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" - // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;" - // streamLoad { - // table "${tableName2}" - - // set 'column_separator', ',' - // set 'format', 'csv' - // set 'partial_columns', 'true' - // set 'columns', 'k1,__DORIS_DELETE_SIGN__' - - // file 'delete_sign.csv' - // time 10000 // limit inflight 10s - // } - // sql "sync" - // qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_storage_engine_merge=true;" - // sql "set skip_delete_bitmap=true;" - // sql "sync" - // // skip_delete_bitmap=true, skip_delete_sign=true - // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - - // sql "set skip_delete_sign=true;" - // sql "set skip_delete_bitmap=false;" - // sql "sync" - // // skip_delete_bitmap=false, skip_delete_sign=true - // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - // sql "drop table if exists ${tableName2};" + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + sql "sync" + def tableName2 = "test_partial_update_delete_sign2" + sql "DROP TABLE IF EXISTS ${tableName2};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "function_column.sequence_col" = 'c4' + );""" + + sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' /* NOTE: it's a partial update */ + set 'columns', 'k1,__DORIS_DELETE_SIGN__' + + file 'delete_sign.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + sql "sync" + // skip_delete_bitmap=true, skip_delete_sign=true + qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + + sql "set skip_delete_sign=true;" + sql "set skip_delete_bitmap=false;" + sql "sync" + // skip_delete_bitmap=false, skip_delete_sign=true + qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + sql "drop table if exists ${tableName2};" // partial update a row that has been deleted by delete sign(table without sequence column) diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy new file mode 100644 index 00000000000000..327ad574743bba --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy @@ -0,0 +1,151 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_delete_sign_with_conflict") {
+    def dbName = context.config.getDbNameByFile(context.file)
+    def tableName = "test_partial_update_delete_sign_with_conflict"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int NOT NULL,
+            `c1` int default '100',
+            `c2` int,
+            `c3` int,
+            `c4` varchar(100) default 'foo'
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // NOTE: use stream load 2PC to construct the publish conflict
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+        log.info("stream load execute 2pc: ${command}")
+
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.text
+        def json2pc = parseJson(out)
+        log.info("stream load 2pc result: ${out}".toString())
+        assertEquals(0, code)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = { txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals("VISIBLE", st)
+    }
+
+    // concurrent load 1
+    String txnId1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'k1,c1,label_c2'
+        set 'merge_type', 'MERGE'
+        set 'delete', 'label_c2=1'
+        set 'strict_mode', 'false'
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel_with_delete_sign.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    // concurrent load 2
+    String txnId2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,c2'
+        set 'strict_mode', 'false'
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel3.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+    sql "sync;"
+
+    // complete load 1 first
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 10)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // publish will retry until success
+    // FE retries may take longer, so wait up to 20 secs
+    do_streamload_2pc_commit(txnId2)
+    wait_for_publish(txnId2, 20)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+}
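For readers skimming the test: `wait_for_publish` polls `show transaction` until the load becomes VISIBLE, because the second 2PC commit only succeeds after the FE retries the conflicting publish. A language-neutral restatement of that loop as a small C++ sketch (hypothetical helper; the status supplier is assumed to be provided by the caller):

    #include <chrono>
    #include <functional>
    #include <string>
    #include <thread>

    // Poll once per second until the transaction reaches a terminal state or
    // the budget runs out; the caller then asserts the result is "VISIBLE".
    // The budget must cover more than one publish attempt (the test allows
    // 20 seconds for the second transaction for exactly this reason).
    std::string wait_for_terminal_status(const std::function<std::string()>& fetch_status,
                                         int budget_seconds) {
        std::string status = "PREPARE";
        while (status != "VISIBLE" && status != "ABORTED" && budget_seconds-- > 0) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            status = fetch_status();
        }
        return status;
    }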