diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 9f371fbd0310c1..2403687fe8ec48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1001,6 +1001,9 @@ public void complete() throws UserException {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
+ if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
+ txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index df1abe1ec9edb5..821878584921d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -230,6 +230,9 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
throw new UserException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes(table);
+ if (isPartialUpdate()) {
+ txnState.setSchemaForPartialUpdate(table);
+ }
}
} finally {
MetaLockUtils.readUnlockTables(tableList);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 9b3ac1b845e163..86fe00a01a9a58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -899,6 +899,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserExc
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
+ if (isPartialUpdate) {
+ txnState.setSchemaForPartialUpdate((OlapTable) table);
+ }
return planParams;
} finally {
@@ -919,6 +922,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) thr
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
+ if (isPartialUpdate) {
+ txnState.setSchemaForPartialUpdate((OlapTable) table);
+ }
return planParams;
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 83273dc696199b..f7c30f3820bc34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -176,6 +176,9 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
+ if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
+ state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
+ }
executor.setProfileType(ProfileType.LOAD);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e6a883e07c728d..514b9df525dc7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2221,6 +2221,9 @@ private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
+ if (request.isPartialUpdate()) {
+ txnState.setSchemaForPartialUpdate(table);
+ }
}
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
@@ -2284,6 +2287,9 @@ private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequ
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
+ if (request.isPartialUpdate()) {
+ txnState.setSchemaForPartialUpdate(table);
+ }
}
return plan;
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 0fc355260eae91..e6d266e43eeca8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -674,6 +674,35 @@ public void commitTransaction(List
tableList, long transactionId, List tableCommitInfoIterator
+ = transactionState.getIdToTableCommitInfos().values().iterator();
+ while (tableCommitInfoIterator.hasNext()) {
+ TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+ long tableId = tableCommitInfo.getTableId();
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ if (table != null && table instanceof OlapTable) {
+ if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+ throw new TransactionCommitFailedException("transaction [" + transactionId
+ + "] check schema compatibility failed, partial update can't commit with"
+ + " old schema sucessfully .");
+ }
+ }
+ }
+ } else {
+ for (Table table : tableList) {
+ if (table instanceof OlapTable) {
+ if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+ throw new TransactionCommitFailedException("transaction [" + transactionId
+ + "] check schema compatibility failed, partial update can't commit with"
+ + " old schema sucessfully .");
+ }
+ }
+ }
+ }
+ }
+
Set errorReplicaIds = Sets.newHashSet();
Set totalInvolvedBackends = Sets.newHashSet();
Map> tableToPartition = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 5d95917e58d515..2b01a612534c3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,7 +17,9 @@
package org.apache.doris.transaction;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
@@ -45,8 +47,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -265,6 +269,24 @@ public String toString() {
// no need to persist.
private String errMsg = "";
+ public class SchemaInfo {
+ public List schema;
+ public int schemaVersion;
+
+ public SchemaInfo(OlapTable olapTable) {
+ Map indexIdToMeta = olapTable.getIndexIdToMeta();
+ for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
+ schema = indexMeta.getSchema();
+ schemaVersion = indexMeta.getSchemaVersion();
+ break;
+ }
+ }
+ }
+
+ private boolean isPartialUpdate = false;
+ // table id -> schema info
+ private Map txnSchemas = new HashMap<>();
+
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
@@ -725,4 +747,47 @@ public void clearErrorMsg() {
public String getErrMsg() {
return this.errMsg;
}
+
+ public void setSchemaForPartialUpdate(OlapTable olapTable) {
+ // the caller should hold the read lock of the table
+ isPartialUpdate = true;
+ txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
+ }
+
+ public boolean isPartialUpdate() {
+ return isPartialUpdate;
+ }
+
+ public SchemaInfo getTxnSchema(long id) {
+ return txnSchemas.get(id);
+ }
+
+ public boolean checkSchemaCompatibility(OlapTable olapTable) {
+ SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
+ SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
+ if (txnSchemaInfo == null) {
+ return true;
+ }
+ if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
+ return true;
+ }
+ for (Column txnCol : txnSchemaInfo.schema) {
+ if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
+ continue;
+ }
+ int uniqueId = txnCol.getUniqueId();
+ Optional currentCol = currentSchemaInfo.schema.stream()
+ .filter(col -> col.getUniqueId() == uniqueId).findFirst();
+ // for now Doris's light schema change only supports adding columns,
+ // dropping columns, and type conversions that increase the varchar length
+ if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
+ if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
+ LOG.warn("Check schema compatibility failed, txnId={}, table={}",
+ transactionId, olapTable.getName());
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
new file mode 100644
index 00000000000000..23f43edc5856d5
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
@@ -0,0 +1,21 @@
+0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv
new file mode 100644
index 00000000000000..5bc6c8de802793
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv
@@ -0,0 +1,21 @@
+0
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out
new file mode 100644
index 00000000000000..9ad9f8333a0117
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out
@@ -0,0 +1,70 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+0 \N \N \N \N \N
+1 \N \N \N \N \N
+10 \N \N \N \N \N
+11 \N \N \N \N \N
+12 \N \N \N \N \N
+13 \N \N \N \N \N
+14 \N \N \N \N \N
+15 \N \N \N \N \N
+16 \N \N \N \N \N
+17 \N \N \N \N \N
+18 \N \N \N \N \N
+19 \N \N \N \N \N
+2 \N \N \N \N \N
+20 \N \N \N \N \N
+3 \N \N \N \N \N
+4 \N \N \N \N \N
+5 \N \N \N \N \N
+6 \N \N \N \N \N
+7 \N \N \N \N \N
+8 \N \N \N \N \N
+9 \N \N \N \N \N
+
+-- !sql --
+0 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+1 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+10 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+11 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+12 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+13 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+14 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+15 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+16 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+17 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+18 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+19 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+2 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+20 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+3 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+4 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+5 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+6 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+7 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+8 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+9 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+
+-- !sql --
+0 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+1 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+10 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+11 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+12 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+13 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+14 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+15 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+16 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+17 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+18 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+19 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+2 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+20 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+3 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+4 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+5 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+6 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+7 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+8 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+9 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy
new file mode 100644
index 00000000000000..f63ebe9a45ecf1
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy
@@ -0,0 +1,181 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_2pc_schema_change", "p0") {
+
+ def tableName = "test_partial_update_2pc_schema_change"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ k1 varchar(20) not null,
+ v1 varchar(20),
+ v2 varchar(20),
+ v3 varchar(20),
+ v4 varchar(20),
+ v5 varchar(20))
+ UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 4
+ PROPERTIES(
+ "replication_num" = "1",
+ "light_schema_change" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true")"""
+
+
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'columns', "k1"
+
+ file 'concurrency_update3.csv'
+ time 10000 // limit inflight 10s
+ }
+ qt_sql """ select * from ${tableName} order by k1;"""
+
+
+ def wait_for_schema_change = {
+ def try_times=100
+ while(true){
+ def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+ Thread.sleep(10)
+ if(res[0][9].toString() == "FINISHED"){
+ break;
+ }
+ assert(try_times>0)
+ try_times--
+ }
+ }
+
+ InetSocketAddress address = context.config.feHttpInetSocketAddress
+ String user = context.config.feHttpUser
+ String password = context.config.feHttpPassword
+ String db = context.config.getDbNameByFile(context.file)
+
+ def do_streamload_2pc = { txn_id, txn_operation, name ->
+ HttpClients.createDefault().withCloseable { client ->
+ RequestBuilder requestBuilder = RequestBuilder.put("http://${address.hostString}:${address.port}/api/${db}/${name}/_stream_load_2pc")
+ String encoding = Base64.getEncoder()
+ .encodeToString((user + ":" + (password == null ? "" : password)).getBytes("UTF-8"))
+ requestBuilder.setHeader("Authorization", "Basic ${encoding}")
+ requestBuilder.setHeader("Expect", "100-Continue")
+ requestBuilder.setHeader("txn_id", "${txn_id}")
+ requestBuilder.setHeader("txn_operation", "${txn_operation}")
+
+ String backendStreamLoadUri = null
+ client.execute(requestBuilder.build()).withCloseable { resp ->
+ resp.withCloseable {
+ String body = EntityUtils.toString(resp.getEntity())
+ def respCode = resp.getStatusLine().getStatusCode()
+ // should redirect to backend
+ if (respCode != 307) {
+ throw new IllegalStateException("Expect frontend stream load response code is 307, " +
+ "but meet ${respCode}\nbody: ${body}")
+ }
+ backendStreamLoadUri = resp.getFirstHeader("location").getValue()
+ }
+ }
+
+ requestBuilder.setUri(backendStreamLoadUri)
+ try{
+ client.execute(requestBuilder.build()).withCloseable { resp ->
+ resp.withCloseable {
+ String body = EntityUtils.toString(resp.getEntity())
+ def respCode = resp.getStatusLine().getStatusCode()
+ if (respCode != 200) {
+ throw new IllegalStateException("Expect backend stream load response code is 200, " +
+ "but meet ${respCode}\nbody: ${body}")
+ }
+ }
+ }
+ } catch (Throwable t) {
+ log.info("StreamLoad Exception: ", t)
+ }
+ }
+ }
+
+ String txnId
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,tmp,v1=substr(tmp,1,20)'
+ set 'strict_mode', "false"
+ set 'two_phase_commit', 'true'
+ file 'concurrency_update2.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+
+ sql """ alter table ${tableName} modify column v2 varchar(40);"""
+ wait_for_schema_change()
+
+ sql """ alter table ${tableName} drop column v3;"""
+ wait_for_schema_change()
+
+ sql """ alter table ${tableName} add column v6 varchar(50);"""
+ wait_for_schema_change()
+
+ sql """ alter table ${tableName} rename column v4 renamed_v4;"""
+ wait_for_schema_change()
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,tmp,v2=substr(tmp,1,40)'
+ set 'strict_mode', "false"
+ file 'concurrency_update2.csv'
+ time 10000 // limit inflight 10s
+ }
+
+ qt_sql """ select * from ${tableName} order by k1;"""
+
+ do_streamload_2pc(txnId, "commit", tableName)
+
+ qt_sql """ select * from ${tableName} order by k1;"""
+
+ sql "drop table if exists ${tableName};"
+}