diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000000..54c87c05e67e70
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,84 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "en"
+}
+---
+
+
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP
+
+
+
+### Description
+
+This statement is used to modify the colocation group.
+
+Syntax:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+ property_list
+);
+```
+
+NOTE:
+
+1. If the colocate group is global, that is, its name starts with `__global__`, then it does not belong to any database;
+
+2. property_list is a colocation group attribute, currently only supports modifying `replication_num` and `replication_allocation`. After modifying these two attributes of the colocation group, at the same time, change the attribute `default.replication_allocation`, the attribute `dynamic.replication_allocation` of the table of the group, and the `replication_allocation` of the existing partition to be the same as it.
+
+### Example
+
+1. Modify the number of copies of a global group
+
+ ```sql
+ # Set "colocate_with" = "__global__foo" when creating the table
+
+ ALTER COLOCATE GROUP __global__foo
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+2. Modify the number of copies of a non-global group
+
+ ```sql
+ # Set "colocate_with" = "bar" when creating the table, and the Database is "example_db"
+
+ ALTER COLOCATE GROUP example_db.bar
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+### Keywords
+
+```sql
+ALTER, COLOCATE, GROUP
+```
+
+### Best Practice
diff --git a/docs/sidebars.json b/docs/sidebars.json
index cfc10dbb5174fe..0caf967961af3a 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -924,6 +924,7 @@
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-VIEW",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-STORAGE-POLICY",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-RESOURCE",
+ "sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-SQL-BLOCK-RULE",
"sql-manual/sql-reference/Data-Definition-Statements/Alter/PAUSE-JOB",
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
new file mode 100644
index 00000000000000..2b5ca2cc727296
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-COLOCATE-GROUP.md
@@ -0,0 +1,88 @@
+---
+{
+"title": "ALTER-COLOCATE-GROUP",
+"language": "zh-CN"
+}
+---
+
+
+
+## ALTER-COLOCATE-GROUP
+
+### Name
+
+ALTER COLOCATE GROUP
+
+
+
+### Description
+
+该语句用于修改 Colocation Group 的属性。
+
+语法:
+
+```sql
+ALTER COLOCATE GROUP [database.]group
+SET (
+ property_list
+);
+```
+
+注意:
+
+1. 如果colocate group是全局的,即它的名称是以 `__global__` 开头的,那它不属于任何一个Database;
+
+2. property_list 是colocation group属性,目前只支持修改`replication_num` 和 `replication_allocation`。
+ 修改colocation group的这两个属性修改之后,同时把该group的表的属性`default.replication_allocation` 、
+ 属性`dynamic.replication_allocation `、以及已有分区的`replication_allocation`改成跟它一样。
+
+
+
+### Example
+
+1. 修改一个全局group的副本数
+
+ ```sql
+ # 建表时设置 "colocate_with" = "__global__foo"
+
+ ALTER COLOCATE GROUP __global__foo
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+2. 修改一个非全局group的副本数
+
+ ```sql
+ # 建表时设置 "colocate_with" = "bar",且表属于Database example_db
+
+ ALTER COLOCATE GROUP example_db.bar
+ SET (
+ "replication_num"="1"
+ );
+ ```
+
+### Keywords
+
+```sql
+ALTER, COLOCATE , GROUP
+```
+
+### Best Practice
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
index 1bc19780f6c72a..e3c7c17b660764 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-WORKLOAD-GROUP.md
@@ -1,6 +1,6 @@
---
{
-"title": "ALTER-WORKLOAD -GROUP",
+"title": "ALTER-WORKLOAD-GROUP",
"language": "zh-CN"
}
---
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
under the License.
-->
-## ALTER-WORKLOAD -GROUP
+## ALTER-WORKLOAD-GROUP
### Name
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 72c99d3fbbada5..eb26ec2992630e 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -297,6 +297,7 @@ terminal String
KW_CLUSTERS,
KW_COLLATE,
KW_COLLATION,
+ KW_COLOCATE,
KW_COLUMN,
KW_COLUMNS,
KW_COMMENT,
@@ -812,6 +813,7 @@ nonterminal ArrayList opt_common_hints;
nonterminal String optional_on_ident;
nonterminal String opt_job_starts;
nonterminal String opt_job_ends;
+nonterminal ColocateGroupName colocate_group_name;
nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
@@ -1347,6 +1349,10 @@ alter_stmt ::=
{:
RESULT = new AlterResourceStmt(resourceName, properties);
:}
+ | KW_ALTER KW_COLOCATE KW_GROUP colocate_group_name:colocateGroupName KW_SET LPAREN key_value_map:properties RPAREN
+ {:
+ RESULT = new AlterColocateGroupStmt(colocateGroupName, properties);
+ :}
| KW_ALTER KW_WORKLOAD KW_GROUP ident_or_text:workloadGroupName opt_properties:properties
{:
RESULT = new AlterWorkloadGroupStmt(workloadGroupName, properties);
@@ -5642,6 +5648,17 @@ table_name ::=
{: RESULT = new TableName(ctl, db, tbl); :}
;
+colocate_group_name ::=
+ ident:group
+ {:
+ RESULT = new ColocateGroupName(null, group);
+ :}
+ | ident:db DOT ident:group
+ {:
+ RESULT = new ColocateGroupName(db, group);
+ :}
+ ;
+
encryptkey_name ::=
ident:name
{:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
new file mode 100644
index 00000000000000..e268322dcc8f9f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColocateGroupStmt.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+import java.util.Map;
+
+public class AlterColocateGroupStmt extends DdlStmt {
+ private final ColocateGroupName colocateGroupName;
+ private final Map properties;
+
+ public AlterColocateGroupStmt(ColocateGroupName colocateGroupName, Map properties) {
+ this.colocateGroupName = colocateGroupName;
+ this.properties = properties;
+ }
+
+ public ColocateGroupName getColocateGroupName() {
+ return colocateGroupName;
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ colocateGroupName.analyze(analyzer);
+
+ String dbName = colocateGroupName.getDb();
+ if (Strings.isNullOrEmpty(dbName)) {
+ if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(
+ ConnectContext.get(), PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
+ }
+ } else {
+ if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(
+ ConnectContext.get(), dbName, PrivPredicate.ADMIN)) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR,
+ ConnectContext.get().getQualifiedUser(), dbName);
+ }
+ }
+
+ if (properties == null || properties.isEmpty()) {
+ throw new AnalysisException("Colocate group properties can't be null");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ALTER COLOCATE GROUP ").append(colocateGroupName.toSql()).append(" ");
+ sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
new file mode 100644
index 00000000000000..b7f0c0afd34a53
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColocateGroupName.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+
+import com.google.common.base.Strings;
+
+public class ColocateGroupName {
+ private String db;
+ private String group;
+
+ public ColocateGroupName(String db, String group) {
+ this.db = db;
+ this.group = group;
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if (GroupId.isGlobalGroupName(group)) {
+ if (!Strings.isNullOrEmpty(db)) {
+ throw new AnalysisException("group that name starts with `" + GroupId.GLOBAL_COLOCATE_PREFIX + "`"
+ + " is a global group, it doesn't belong to any specific database");
+ }
+ } else {
+ if (Strings.isNullOrEmpty(db)) {
+ if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+ }
+ db = analyzer.getDefaultDb();
+ }
+ db = ClusterNamespace.getFullName(analyzer.getClusterName(), db);
+ }
+ }
+
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ if (!Strings.isNullOrEmpty(db)) {
+ sb.append("`").append(db).append("`.");
+ }
+ sb.append("`").append(group).append("`");
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
index b5004973c37f1c..57d512b9789d9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateGroupSchema.java
@@ -66,6 +66,10 @@ public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
+ public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+ this.replicaAlloc = replicaAlloc;
+ }
+
public List getDistributionColTypes() {
return distributionColTypes;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 23703278fd890b..fcefcff132a7a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -17,10 +17,16 @@
package org.apache.doris.catalog;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DynamicPartitionUtil;
+import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -249,10 +255,34 @@ public void addBackendsPerBucketSeq(GroupId groupId, Map>>
}
}
- public void addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, List> backendsPerBucketSeq) {
+ public void setBackendsPerBucketSeq(GroupId groupId, Map>> backendsPerBucketSeq) {
writeLock();
try {
+ Map>> backendsPerBucketSeqMap = group2BackendsPerBucketSeq.row(groupId);
+ if (backendsPerBucketSeqMap != null) {
+ backendsPerBucketSeqMap.clear();
+ }
+ for (Map.Entry>> entry : backendsPerBucketSeq.entrySet()) {
+ group2BackendsPerBucketSeq.put(groupId, entry.getKey(), entry.getValue());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public boolean addBackendsPerBucketSeqByTag(GroupId groupId, Tag tag, List> backendsPerBucketSeq,
+ ReplicaAllocation originReplicaAlloc) {
+ writeLock();
+ try {
+ ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+ // replica allocation has outdate
+ if (groupSchema != null && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+ LOG.info("replica allocation has outdate for group {}, old replica alloc {}, new replica alloc {}",
+ groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc());
+ return false;
+ }
group2BackendsPerBucketSeq.put(groupId, tag, backendsPerBucketSeq);
+ return true;
} finally {
writeUnlock();
}
@@ -277,12 +307,20 @@ public void markGroupUnstable(GroupId groupId, String reason, boolean needEditLo
}
}
- public void markGroupStable(GroupId groupId, boolean needEditLog) {
+ public void markGroupStable(GroupId groupId, boolean needEditLog, ReplicaAllocation originReplicaAlloc) {
writeLock();
try {
if (!group2Tables.containsKey(groupId)) {
return;
}
+ // replica allocation is outdate
+ ColocateGroupSchema groupSchema = group2Schema.get(groupId);
+ if (groupSchema != null && originReplicaAlloc != null
+ && !originReplicaAlloc.equals(groupSchema.getReplicaAlloc())) {
+ LOG.warn("mark group {} failed, replica alloc has outdate, old replica alloc {}, new replica alloc {}",
+ groupId, originReplicaAlloc.getAllocMap(), groupSchema.getReplicaAlloc());
+ return;
+ }
if (unstableGroups.remove(groupId)) {
group2ErrMsgs.put(groupId, "");
if (needEditLog) {
@@ -604,13 +642,23 @@ public void replayMarkGroupUnstable(ColocatePersistInfo info) {
}
public void replayMarkGroupStable(ColocatePersistInfo info) {
- markGroupStable(info.getGroupId(), false);
+ markGroupStable(info.getGroupId(), false, null);
}
public void replayRemoveTable(ColocatePersistInfo info) {
removeTable(info.getTableId());
}
+ public void replayModifyReplicaAlloc(ColocatePersistInfo info) throws UserException {
+ writeLock();
+ try {
+ modifyColocateGroupReplicaAllocation(info.getGroupId(), info.getReplicaAlloc(),
+ info.getBackendsPerBucketSeq(), false);
+ } finally {
+ writeUnlock();
+ }
+ }
+
// only for test
public void clear() {
writeLock();
@@ -633,7 +681,22 @@ public List> getInfos() {
List info = Lists.newArrayList();
GroupId groupId = entry.getValue();
info.add(groupId.toString());
- info.add(entry.getKey());
+ String dbName = "";
+ if (groupId.dbId != 0) {
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(groupId.dbId);
+ if (db != null) {
+ dbName = db.getFullName();
+ int index = dbName.indexOf(":");
+ if (index > 0) {
+ dbName = dbName.substring(index + 1); //use short db name
+ }
+ }
+ }
+ String groupName = entry.getKey();
+ if (!GroupId.isGlobalGroupName(groupName)) {
+ groupName = dbName + "." + groupName.substring(groupName.indexOf("_") + 1);
+ }
+ info.add(groupName);
info.add(Joiner.on(", ").join(group2Tables.get(groupId)));
ColocateGroupSchema groupSchema = group2Schema.get(groupId);
info.add(String.valueOf(groupSchema.getBucketsNum()));
@@ -756,4 +819,124 @@ public void setErrMsgForGroup(GroupId groupId, String message) {
public Map getTable2Group() {
return table2Group;
}
+
+ public void alterColocateGroup(AlterColocateGroupStmt stmt) throws UserException {
+ writeLock();
+ try {
+ Map properties = stmt.getProperties();
+ String dbName = stmt.getColocateGroupName().getDb();
+ String groupName = stmt.getColocateGroupName().getGroup();
+ long dbId = 0;
+ if (!GroupId.isGlobalGroupName(groupName)) {
+ Database db = (Database) Env.getCurrentInternalCatalog().getDbOrMetaException(dbName);
+ dbId = db.getId();
+ }
+ String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
+ ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
+ if (groupSchema == null) {
+ throw new DdlException("Not found colocate group " + stmt.getColocateGroupName().toSql());
+ }
+
+ GroupId groupId = groupSchema.getGroupId();
+
+ if (properties.size() > 1) {
+ throw new DdlException("Can only set one colocate group property at a time");
+ }
+
+ if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM)
+ || properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
+ ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
+ Preconditions.checkState(!replicaAlloc.isNotSet());
+ Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+ Map>> backendsPerBucketSeq = getBackendsPerBucketSeq(groupId);
+ Map>> newBackendsPerBucketSeq = Maps.newHashMap();
+ for (Map.Entry>> entry : backendsPerBucketSeq.entrySet()) {
+ List> newList = Lists.newArrayList();
+ for (List backends : entry.getValue()) {
+ newList.add(Lists.newArrayList(backends));
+ }
+ newBackendsPerBucketSeq.put(entry.getKey(), newList);
+ }
+ try {
+ ColocateTableCheckerAndBalancer.modifyGroupReplicaAllocation(replicaAlloc,
+ newBackendsPerBucketSeq, groupSchema.getBucketsNum());
+ } catch (Exception e) {
+ LOG.warn("modify group [{}, {}] to replication allocation {} failed, bucket seq {}",
+ fullGroupName, groupId, replicaAlloc, backendsPerBucketSeq, e);
+ throw new DdlException(e.getMessage());
+ }
+ backendsPerBucketSeq = newBackendsPerBucketSeq;
+ Preconditions.checkState(backendsPerBucketSeq.size() == replicaAlloc.getAllocMap().size());
+ modifyColocateGroupReplicaAllocation(groupSchema.getGroupId(), replicaAlloc,
+ backendsPerBucketSeq, true);
+ } else {
+ throw new DdlException("Unknown colocate group property: " + properties.keySet());
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void modifyColocateGroupReplicaAllocation(GroupId groupId, ReplicaAllocation replicaAlloc,
+ Map>> backendsPerBucketSeq, boolean needEditLog) throws UserException {
+ ColocateGroupSchema groupSchema = getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.warn("not found group {}", groupId);
+ return;
+ }
+
+ List tableIds = getAllTableIds(groupId);
+ for (Long tableId : tableIds) {
+ long dbId = groupId.dbId;
+ if (dbId == 0) {
+ dbId = groupId.getDbIdByTblId(tableId);
+ }
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+ OlapTable table = (OlapTable) db.getTableNullable(tableId);
+ if (table == null || !isColocateTable(table.getId())) {
+ continue;
+ }
+ table.writeLock();
+ try {
+ Map tblProperties = Maps.newHashMap();
+ tblProperties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
+ replicaAlloc.toCreateStmt());
+ table.setReplicaAllocation(tblProperties);
+ if (table.dynamicPartitionExists()) {
+ TableProperty tableProperty = table.getTableProperty();
+ // Merge the new properties with origin properties, and then analyze them
+ Map origDynamicProperties = tableProperty.getOriginDynamicPartitionProperty();
+ origDynamicProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION,
+ replicaAlloc.toCreateStmt());
+ Map analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition(
+ origDynamicProperties, table, db);
+ tableProperty.modifyTableProperties(analyzedDynamicPartition);
+ tableProperty.buildDynamicProperty();
+ }
+ for (ReplicaAllocation alloc : table.getPartitionInfo().getPartitionReplicaAllocations().values()) {
+ Map allocMap = alloc.getAllocMap();
+ allocMap.clear();
+ allocMap.putAll(replicaAlloc.getAllocMap());
+ }
+ } finally {
+ table.writeUnlock();
+ }
+ }
+
+ if (!backendsPerBucketSeq.equals(group2BackendsPerBucketSeq.row(groupId))) {
+ markGroupUnstable(groupId, "change replica allocation", false);
+ }
+ groupSchema.setReplicaAlloc(replicaAlloc);
+ setBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
+
+ if (needEditLog) {
+ ColocatePersistInfo info = ColocatePersistInfo.createForModifyReplicaAlloc(groupId,
+ replicaAlloc, backendsPerBucketSeq);
+ Env.getCurrentEnv().getEditLog().logColocateModifyRepliaAlloc(info);
+ }
+ LOG.info("modify group {} replication allocation to {}, is replay {}", groupId, replicaAlloc, !needEditLog);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index b7ca3c622cc7b0..34f80a91038e26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -261,6 +261,10 @@ public void setStoragePolicy(long partitionId, String storagePolicy) {
idToStoragePolicy.put(partitionId, storagePolicy);
}
+ public Map getPartitionReplicaAllocations() {
+ return idToReplicaAllocation;
+ }
+
public ReplicaAllocation getReplicaAllocation(long partitionId) {
if (!idToReplicaAllocation.containsKey(partitionId)) {
LOG.debug("failed to get replica allocation for partition: {}", partitionId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 5c18c2bd468263..4ec8993be0d2b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -35,6 +35,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.resource.Tag;
@@ -183,7 +184,12 @@ private void relocateAndBalanceGroup() {
List> balancedBackendsPerBucketSeq = Lists.newArrayList();
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex,
infoService, statistic, balancedBackendsPerBucketSeq)) {
- colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq);
+ if (!colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, balancedBackendsPerBucketSeq,
+ replicaAlloc)) {
+ LOG.warn("relocate group {} succ, but replica allocation has change, old replica alloc {}",
+ groupId, replicaAlloc);
+ continue;
+ }
Map>> balancedBackendsPerBucketSeqMap = Maps.newHashMap();
balancedBackendsPerBucketSeqMap.put(tag, balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo
@@ -219,6 +225,8 @@ private void matchGroup() {
continue;
}
+ ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
String unstableReason = null;
OUT:
for (Long tableId : tableIds) {
@@ -237,8 +245,6 @@ private void matchGroup() {
olapTable.readLock();
try {
for (Partition partition : olapTable.getPartitions()) {
- ReplicaAllocation replicaAlloc
- = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
short replicationNum = replicaAlloc.getTotalReplicaNum();
long visibleVersion = partition.getVisibleVersion();
// Here we only get VISIBLE indexes. All other indexes are not queryable.
@@ -269,8 +275,7 @@ private void matchGroup() {
TabletSchedCtx tabletCtx = new TabletSchedCtx(
TabletSchedCtx.Type.REPAIR,
db.getId(), tableId, partition.getId(), index.getId(), tablet.getId(),
- olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()),
- System.currentTimeMillis());
+ replicaAlloc, System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(st);
tabletCtx.setPriority(Priority.NORMAL);
@@ -299,7 +304,7 @@ private void matchGroup() {
// mark group as stable or unstable
if (Strings.isNullOrEmpty(unstableReason)) {
- colocateIndex.markGroupStable(groupId, true);
+ colocateIndex.markGroupStable(groupId, true, replicaAlloc);
} else {
colocateIndex.markGroupUnstable(groupId, unstableReason, true);
}
@@ -521,6 +526,122 @@ private List> getHostsPerBucketSeq(List> backendsPerBuck
return hostsPerBucketSeq;
}
+ public static void modifyGroupReplicaAllocation(ReplicaAllocation replicaAlloc,
+ Map>> backendBucketsSeq, int bucketNum) throws Exception {
+ Map allocMap = replicaAlloc.getAllocMap();
+ List deleteTags = Lists.newArrayList();
+ for (Tag tag : backendBucketsSeq.keySet()) {
+ if (!allocMap.containsKey(tag)) {
+ deleteTags.add(tag);
+ }
+ Preconditions.checkState(bucketNum == backendBucketsSeq.get(tag).size(),
+ bucketNum + " vs " + backendBucketsSeq.get(tag).size());
+ }
+ deleteTags.forEach(tag -> backendBucketsSeq.remove(tag));
+
+ for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
+ if (!backendBucketsSeq.containsKey(tag)) {
+ List> tagBackendBucketsSeq = Lists.newArrayList();
+ for (int i = 0; i < bucketNum; i++) {
+ tagBackendBucketsSeq.add(Lists.newArrayList());
+ }
+ backendBucketsSeq.put(tag, tagBackendBucketsSeq);
+ }
+ }
+
+ Map backendToBucketNum = Maps.newHashMap();
+ backendBucketsSeq.values().forEach(tagBackendIds ->
+ tagBackendIds.forEach(backendIds ->
+ backendIds.forEach(backendId -> backendToBucketNum.put(
+ backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1))));
+
+ for (Tag tag : backendBucketsSeq.keySet()) {
+ List> tagBackendBucketsSeq = backendBucketsSeq.get(tag);
+ int oldReplicaNum = tagBackendBucketsSeq.get(0).size();
+ for (List backendIdsOneBucket : tagBackendBucketsSeq) {
+ Preconditions.checkState(backendIdsOneBucket.size() == oldReplicaNum,
+ backendIdsOneBucket.size() + " vs " + oldReplicaNum);
+ }
+
+ int newReplicaNum = allocMap.get(tag);
+ if (newReplicaNum == oldReplicaNum) {
+ continue;
+ }
+
+ List backends = Env.getCurrentSystemInfo().getBackendsByTag(tag);
+ Set availableBeIds = backends.stream().filter(be -> be.isScheduleAvailable())
+ .map(be -> be.getId()).collect(Collectors.toSet());
+
+ for (Long backendId : availableBeIds) {
+ if (!backendToBucketNum.containsKey(backendId)) {
+ backendToBucketNum.put(backendId, 0);
+ }
+ }
+
+ for (int i = 0; i < tagBackendBucketsSeq.size(); i++) {
+ modifyGroupBucketReplicas(tag, newReplicaNum, tagBackendBucketsSeq.get(i),
+ availableBeIds, backendToBucketNum);
+ }
+ }
+ }
+
+ private static void modifyGroupBucketReplicas(Tag tag, int newReplicaNum, List backendIds,
+ Set availableBeIds, Map backendToBucketNum) throws Exception {
+ final boolean smallIdFirst = Math.random() < 0.5;
+ if (backendIds.size() > newReplicaNum) {
+ backendIds.sort((id1, id2) -> {
+ boolean alive1 = availableBeIds.contains(id1);
+ boolean alive2 = availableBeIds.contains(id2);
+ if (alive1 != alive2) {
+ return alive1 ? -1 : 1;
+ }
+ int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+ int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+ if (bucketNum1 != bucketNum2) {
+ return Integer.compare(bucketNum1, bucketNum2);
+ }
+
+ return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1);
+ });
+
+ for (int i = backendIds.size() - 1; i >= newReplicaNum; i--) {
+ long backendId = backendIds.get(i);
+ backendIds.remove(i);
+ backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) - 1);
+ }
+ }
+
+ if (backendIds.size() < newReplicaNum) {
+ Set candBackendSet = Sets.newHashSet();
+ candBackendSet.addAll(availableBeIds);
+ candBackendSet.removeAll(backendIds);
+ if (backendIds.size() + candBackendSet.size() < newReplicaNum) {
+ throw new UserException("Can not add backend for tag: " + tag);
+ }
+
+ List candBackendList = Lists.newArrayList(candBackendSet);
+ candBackendList.sort((id1, id2) -> {
+ int bucketNum1 = backendToBucketNum.getOrDefault(id1, 0);
+ int bucketNum2 = backendToBucketNum.getOrDefault(id2, 0);
+ if (bucketNum1 != bucketNum2) {
+ return Integer.compare(bucketNum1, bucketNum2);
+ }
+
+ return smallIdFirst ? Long.compare(id1, id2) : Long.compare(id2, id1);
+ });
+
+ int addNum = newReplicaNum - backendIds.size();
+ for (int i = 0; i < addNum; i++) {
+ long backendId = candBackendList.get(i);
+ backendIds.add(backendId);
+ backendToBucketNum.put(backendId, backendToBucketNum.getOrDefault(backendId, 0) + 1);
+ }
+ }
+
+ Preconditions.checkState(newReplicaNum == backendIds.size(),
+ newReplicaNum + " vs " + backendIds.size());
+ }
+
private List> getSortedBackendReplicaNumPairs(List allAvailBackendIds,
Set unavailBackendIds, LoadStatisticForTag statistic, List flatBackendsPerBucketSeq) {
// backend id -> replica num, and sorted by replica num, descending.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index d6a8e1efa0e8f6..ee9da3ac10008f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -19,6 +19,7 @@
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
@@ -490,15 +491,20 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throw new SchedException(Status.UNRECOVERABLE, "index does not exist");
}
+ ReplicaAllocation replicaAlloc = null;
Tablet tablet = idx.getTablet(tabletId);
Preconditions.checkNotNull(tablet);
- ReplicaAllocation replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
-
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
if (groupId == null) {
throw new SchedException(Status.UNRECOVERABLE, "colocate group does not exist");
}
+ ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ throw new SchedException(Status.UNRECOVERABLE,
+ "colocate group schema " + groupId + " does not exist");
+ }
+ replicaAlloc = groupSchema.getReplicaAlloc();
int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
if (tabletOrderIdx == -1) {
@@ -512,6 +518,7 @@ private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
+ replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
List aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
@@ -1484,14 +1491,18 @@ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
return;
}
- replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
boolean isColocateTable = colocateTableIndex.isColocateTable(tbl.getId());
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
if (groupId == null) {
return;
}
+ ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ return;
+ }
+ replicaAlloc = groupSchema.getReplicaAlloc();
int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
if (tabletOrderIdx == -1) {
tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
@@ -1504,6 +1515,7 @@ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
+ replicaAlloc = tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
List aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
infoService, partition.getVisibleVersion(), replicaAlloc, aliveBeIds);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index 93f54483cbfa5e..3ce3ff74c7adaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.proc;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -185,6 +186,10 @@ static class DBTabletStatistic {
++tabletNum;
Tablet.TabletStatus res = null;
if (groupId != null) {
+ ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema != null) {
+ replicaAlloc = groupSchema.getReplicaAlloc();
+ }
Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i);
res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
backendsSet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
index 9e51d38de5cfcc..b7c2a615aac280 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/ColocateMetaService.java
@@ -114,7 +114,7 @@ public Object group_stable(HttpServletRequest request, HttpServletResponse respo
if ("POST".equalsIgnoreCase(method)) {
colocateIndex.markGroupUnstable(groupId, "mark unstable via http api", true);
} else if ("DELETE".equalsIgnoreCase(method)) {
- colocateIndex.markGroupStable(groupId, true);
+ colocateIndex.markGroupStable(groupId, true, null);
}
return ResponseEntityBuilder.ok();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index ce2768b46eac29..80ee1a19aac5f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -469,6 +469,7 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
+ case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC:
case OperationType.OP_COLOCATE_ADD_TABLE:
case OperationType.OP_COLOCATE_REMOVE_TABLE:
case OperationType.OP_COLOCATE_BACKENDS_PER_BUCKETSEQ:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 2e6be68604160a..4461ba19473a02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -19,6 +19,7 @@
import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -1172,6 +1173,10 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI
int tabletOrderIdx = materializedIndex.getTabletOrderIdx(tabletId);
Preconditions.checkState(tabletOrderIdx != -1, "get tablet materializedIndex for %s fail", tabletId);
Set backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+ ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(groupId);
+ if (groupSchema != null) {
+ replicaAlloc = groupSchema.getReplicaAlloc();
+ }
TabletStatus status =
tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet);
if (status == TabletStatus.HEALTHY) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
index 459be6460524ea..429d4e0e1a6b94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ColocatePersistInfo.java
@@ -18,6 +18,7 @@
package org.apache.doris.persist;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -45,29 +46,38 @@ public class ColocatePersistInfo implements Writable {
private long tableId;
@SerializedName(value = "backendsPerBucketSeq")
private Map>> backendsPerBucketSeq = Maps.newHashMap();
+ @SerializedName(value = "replicaAlloc")
+ private ReplicaAllocation replicaAlloc = new ReplicaAllocation();
- private ColocatePersistInfo(GroupId groupId, long tableId, Map>> backendsPerBucketSeq) {
+ private ColocatePersistInfo(GroupId groupId, long tableId, Map>> backendsPerBucketSeq,
+ ReplicaAllocation replicaAlloc) {
this.groupId = groupId;
this.tableId = tableId;
this.backendsPerBucketSeq = backendsPerBucketSeq;
+ this.replicaAlloc = replicaAlloc;
}
public static ColocatePersistInfo createForAddTable(GroupId groupId, long tableId,
Map>> backendsPerBucketSeq) {
- return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq);
+ return new ColocatePersistInfo(groupId, tableId, backendsPerBucketSeq, new ReplicaAllocation());
}
public static ColocatePersistInfo createForBackendsPerBucketSeq(GroupId groupId,
Map>> backendsPerBucketSeq) {
- return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq);
+ return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, new ReplicaAllocation());
}
public static ColocatePersistInfo createForMarkUnstable(GroupId groupId) {
- return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+ return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new ReplicaAllocation());
}
public static ColocatePersistInfo createForMarkStable(GroupId groupId) {
- return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
+ return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap(), new ReplicaAllocation());
+ }
+
+ public static ColocatePersistInfo createForModifyReplicaAlloc(GroupId groupId, ReplicaAllocation replicaAlloc,
+ Map>> backendsPerBucketSeq) {
+ return new ColocatePersistInfo(groupId, -1L, backendsPerBucketSeq, replicaAlloc);
}
public static ColocatePersistInfo read(DataInput in) throws IOException {
@@ -87,6 +97,10 @@ public Map>> getBackendsPerBucketSeq() {
return backendsPerBucketSeq;
}
+ public ReplicaAllocation getReplicaAlloc() {
+ return replicaAlloc;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -129,7 +143,7 @@ public boolean equals(Object obj) {
ColocatePersistInfo info = (ColocatePersistInfo) obj;
return tableId == info.tableId && groupId.equals(info.groupId) && backendsPerBucketSeq.equals(
- info.backendsPerBucketSeq);
+ info.backendsPerBucketSeq) && replicaAlloc.equals(info.replicaAlloc);
}
@Override
@@ -138,6 +152,7 @@ public String toString() {
sb.append("table id: ").append(tableId);
sb.append(" group id: ").append(groupId);
sb.append(" backendsPerBucketSeq: ").append(backendsPerBucketSeq);
+ sb.append(" replicaAlloc: ").append(replicaAlloc);
return sb.toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 014a3c79feaa17..6e714eb3bfcb9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -605,6 +605,11 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
env.getColocateTableIndex().replayMarkGroupStable(info);
break;
}
+ case OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC: {
+ final ColocatePersistInfo info = (ColocatePersistInfo) journal.getData();
+ env.getColocateTableIndex().replayModifyReplicaAlloc(info);
+ break;
+ }
case OperationType.OP_MODIFY_TABLE_COLOCATE: {
final TablePropertyInfo info = (TablePropertyInfo) journal.getData();
env.replayModifyTableColocate(info);
@@ -1548,6 +1553,10 @@ public void logTruncateTable(TruncateTableInfo info) {
Env.getCurrentEnv().getBinlogManager().addTruncateTable(info, logId);
}
+ public void logColocateModifyRepliaAlloc(ColocatePersistInfo info) {
+ logEdit(OperationType.OP_COLOCATE_MOD_REPLICA_ALLOC, info);
+ }
+
public void logColocateAddTable(ColocatePersistInfo info) {
logEdit(OperationType.OP_COLOCATE_ADD_TABLE, info);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 27cb57d214da9e..c5e784cb9d7eff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -189,6 +189,7 @@ public class OperationType {
// modify database/table/tablet/replica meta
public static final short OP_SET_REPLICA_VERSION = 141;
+ public static final short OP_COLOCATE_MOD_REPLICA_ALLOC = 142;
// routine load 200
public static final short OP_CREATE_ROUTINE_LOAD_JOB = 200;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 9056ac4e802ec5..2af8feda27a5a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -31,6 +31,7 @@
import org.apache.doris.analysis.AdminSetTableStatusStmt;
import org.apache.doris.analysis.AlterCatalogNameStmt;
import org.apache.doris.analysis.AlterCatalogPropertyStmt;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
@@ -319,6 +320,8 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
+ } else if (ddlStmt instanceof AlterColocateGroupStmt) {
+ env.getColocateTableIndex().alterColocateGroup((AlterColocateGroupStmt) ddlStmt);
} else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
env.getWorkloadGroupMgr().alterWorkloadGroup((AlterWorkloadGroupStmt) ddlStmt);
} else if (ddlStmt instanceof CreatePolicyStmt) {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 0f8eaa5d9bc4d4..f743d5edd4e6b0 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -147,6 +147,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("clusters", new Integer(SqlParserSymbols.KW_CLUSTERS));
keywordMap.put("collate", new Integer(SqlParserSymbols.KW_COLLATE));
keywordMap.put("collation", new Integer(SqlParserSymbols.KW_COLLATION));
+ keywordMap.put("colocate", new Integer(SqlParserSymbols.KW_COLOCATE));
keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN));
keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS));
keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
index a3ddd9991a89e3..d681376d24061b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.alter;
+import org.apache.doris.analysis.AlterColocateGroupStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
@@ -26,6 +27,8 @@
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.analysis.ShowCreateMaterializedViewStmt;
+import org.apache.doris.catalog.ColocateGroupSchema;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
@@ -36,10 +39,13 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.Type;
+import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -47,6 +53,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
@@ -70,18 +77,36 @@ public class AlterTest {
private static String runningDir = "fe/mocked/AlterTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
- private static Backend be;
+
+ private static Map backendTags;
@BeforeClass
public static void beforeClass() throws Exception {
FeConstants.runningUnitTest = true;
FeConstants.default_scheduler_interval_millisecond = 100;
+ FeConstants.tablet_checker_interval_ms = 100;
+ FeConstants.tablet_checker_interval_ms = 100;
Config.dynamic_partition_check_interval_seconds = 1;
Config.disable_storage_medium_check = true;
Config.enable_storage_policy = true;
- UtFrameUtils.createDorisCluster(runningDir);
+ Config.disable_balance = true;
+ Config.schedule_batch_size = 400;
+ Config.schedule_slot_num_per_hdd_path = 100;
+ UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5);
+
+ List backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+
+ Map tagMap = Maps.newHashMap();
+ tagMap.put(Tag.TYPE_LOCATION, "group_a");
+ backends.get(2).setTagMap(tagMap);
+ backends.get(3).setTagMap(tagMap);
+
+ tagMap = Maps.newHashMap();
+ tagMap.put(Tag.TYPE_LOCATION, "group_b");
+ backends.get(4).setTagMap(tagMap);
- be = Env.getCurrentSystemInfo().getIdToBackend().values().asList().get(0);
+ backendTags = Maps.newHashMap();
+ backends.forEach(be -> backendTags.put(be.getId(), be.getLocationTag()));
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
@@ -435,21 +460,16 @@ public void testConflictAlterOperations() throws Exception {
// set un-partitioned table's real replication num
// first we need to change be's tag
- Map originTagMap = be.getTagMap();
- Map tagMap = Maps.newHashMap();
- tagMap.put(Tag.TYPE_LOCATION, "group1");
- be.setTagMap(tagMap);
OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("tbl2");
Partition partition = tbl2.getPartition(tbl2.getName());
Assert.assertEquals(Short.valueOf("1"),
Short.valueOf(tbl2.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()));
- stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group1:1');";
+ stmt = "alter table test.tbl2 set ('replication_allocation' = 'tag.location.group_a:1');";
alterTable(stmt, false);
Assert.assertEquals((short) 1, (short) tbl2.getPartitionInfo().getReplicaAllocation(partition.getId())
- .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1")));
+ .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a")));
Assert.assertEquals((short) 1, (short) tbl2.getTableProperty().getReplicaAllocation()
- .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group1")));
- be.setTagMap(originTagMap);
+ .getReplicaNumByTag(Tag.createNotCheck(Tag.TYPE_LOCATION, "group_a")));
Thread.sleep(5000); // sleep to wait dynamic partition scheduler run
// add partition without set replication num, and default num is 3.
@@ -1202,6 +1222,145 @@ public void testDropInUseResource() throws Exception {
Env.getCurrentEnv().getResourceMgr().dropResource(stmt);
}
+ @Test
+ public void testModifyColocateGroupReplicaAlloc() throws Exception {
+ Config.enable_round_robin_create_tablet = true;
+
+ createTable("CREATE TABLE test.col_tbl0\n" + "(\n" + " k1 date,\n" + " k2 int,\n" + " v1 int \n"
+ + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n"
+ + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+ + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_0');");
+
+ createTable("CREATE TABLE test.col_tbl1\n" + "(\n" + " k1 date,\n" + " k2 int,\n" + " v1 int \n"
+ + ") ENGINE=OLAP\n" + "UNIQUE KEY (k1,k2)\n" + "PARTITION BY RANGE(k1)\n" + "(\n"
+ + " PARTITION p1 values less than('2020-02-01'),\n"
+ + " PARTITION p2 values less than('2020-03-01')\n" + ")\n" + "DISTRIBUTED BY HASH(k2) BUCKETS 4\n"
+ + "PROPERTIES('replication_num' = '2', 'colocate_with' = 'mod_group_1');");
+
+ createTable("CREATE TABLE test.col_tbl2 (\n"
+ + "`uuid` varchar(255) NULL,\n"
+ + "`action_datetime` date NULL\n"
+ + ")\n"
+ + "DUPLICATE KEY(uuid)\n"
+ + "PARTITION BY RANGE(action_datetime)()\n"
+ + "DISTRIBUTED BY HASH(uuid) BUCKETS 4\n"
+ + "PROPERTIES\n"
+ + "(\n"
+ + "\"replication_num\" = \"2\",\n"
+ + "\"colocate_with\" = \"mod_group_2\",\n"
+ + "\"dynamic_partition.enable\" = \"true\",\n"
+ + "\"dynamic_partition.time_unit\" = \"DAY\",\n"
+ + "\"dynamic_partition.end\" = \"2\",\n"
+ + "\"dynamic_partition.prefix\" = \"p\",\n"
+ + "\"dynamic_partition.buckets\" = \"4\",\n"
+ + "\"dynamic_partition.replication_num\" = \"2\"\n"
+ + ");\n");
+
+ Env env = Env.getCurrentEnv();
+ Database db = env.getInternalCatalog().getDbOrMetaException("default_cluster:test");
+ OlapTable tbl2 = (OlapTable) db.getTableOrMetaException("col_tbl2");
+ for (int j = 0; true; j++) {
+ Thread.sleep(2000);
+ if (tbl2.getAllPartitions().size() > 0) {
+ break;
+ }
+ if (j >= 5) {
+ Assert.assertTrue("dynamic table not create partition", false);
+ }
+ }
+
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ ReplicaAllocation newReplicaAlloc = new ReplicaAllocation();
+ newReplicaAlloc.put(Tag.DEFAULT_BACKEND_TAG, (short) 1);
+ newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_a"), (short) 1);
+ newReplicaAlloc.put(Tag.create(Tag.TYPE_LOCATION, "group_b"), (short) 1);
+
+ for (int i = 0; i < 3; i++) {
+ String groupName = "mod_group_" + i;
+ String sql = "alter colocate group test." + groupName
+ + " set ( 'replication_allocation' = '" + newReplicaAlloc.toCreateStmt() + "')";
+ String fullGroupName = GroupId.getFullGroupName(db.getId(), groupName);
+ AlterColocateGroupStmt stmt = (AlterColocateGroupStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+ DdlExecutor.execute(env, stmt);
+
+ ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+ Assert.assertNotNull(groupSchema);
+ Assert.assertEquals(newReplicaAlloc, groupSchema.getReplicaAlloc());
+
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+ Assert.assertEquals(newReplicaAlloc, tbl.getDefaultReplicaAllocation());
+ for (Partition partition : tbl.getAllPartitions()) {
+ Assert.assertEquals(newReplicaAlloc,
+ tbl.getPartitionInfo().getReplicaAllocation(partition.getId()));
+ }
+
+ if (i == 2) {
+ Assert.assertEquals(newReplicaAlloc,
+ tbl.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation());
+ }
+ }
+
+ Config.enable_round_robin_create_tablet = false;
+
+ for (int k = 0; true; k++) {
+ Thread.sleep(1000); // sleep to wait dynamic partition scheduler run
+ boolean allStable = true;
+ for (int i = 0; i < 3; i++) {
+ String fullGroupName = GroupId.getFullGroupName(db.getId(), "mod_group_" + i);
+ ColocateGroupSchema groupSchema = env.getColocateTableIndex().getGroupSchema(fullGroupName);
+ Assert.assertNotNull(groupSchema);
+
+ if (env.getColocateTableIndex().isGroupUnstable(groupSchema.getGroupId())) {
+ allStable = false;
+ if (k >= 120) {
+ Assert.assertTrue(fullGroupName + " is unstable", false);
+ }
+ continue;
+ }
+
+ Map backendReplicaNum = Maps.newHashMap();
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("col_tbl" + i);
+ int tabletNum = 0;
+ for (Partition partition : tbl.getAllPartitions()) {
+ for (MaterializedIndex idx : partition.getMaterializedIndices(
+ MaterializedIndex.IndexExtState.VISIBLE)) {
+ for (Tablet tablet : idx.getTablets()) {
+ Map allocMap = Maps.newHashMap();
+ tabletNum++;
+ for (Replica replica : tablet.getReplicas()) {
+ long backendId = replica.getBackendId();
+ Tag tag = backendTags.get(backendId);
+ Assert.assertNotNull(tag);
+ short oldNum = allocMap.getOrDefault(tag, (short) 0);
+ allocMap.put(tag, (short) (oldNum + 1));
+ backendReplicaNum.put(backendId, backendReplicaNum.getOrDefault(backendId, 0) + 1);
+ }
+ Assert.assertEquals(newReplicaAlloc.getAllocMap(), allocMap);
+ }
+ }
+ }
+
+ Assert.assertTrue(tabletNum > 0);
+
+ for (Map.Entry entry : backendReplicaNum.entrySet()) {
+ long backendId = entry.getKey();
+ int replicaNum = entry.getValue();
+ Tag tag = backendTags.get(backendId);
+ int sameTagReplicaNum = tabletNum * newReplicaAlloc.getAllocMap().getOrDefault(tag, (short) 0);
+ int sameTagBeNum = (int) (backendTags.values().stream().filter(t -> t.equals(tag)).count());
+ Assert.assertEquals("backend " + backendId + " failed: " + " all backend replica num: "
+ + backendReplicaNum + ", all backend tag: " + backendTags,
+ sameTagReplicaNum / sameTagBeNum, replicaNum);
+ }
+ }
+
+ if (allStable) {
+ break;
+ }
+ }
+ }
+
@Test
public void testShowMV() throws Exception {
createMV("CREATE MATERIALIZED VIEW test_mv as select k1 from test.show_test group by k1;", false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 407171a69c7ec9..8bb8581fd85220 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -255,12 +255,16 @@ public static void createDorisClusterWithMultiTag(String runningDir, int backend
FeConstants.runningUnitTest = true;
FeConstants.enableInternalSchemaDb = false;
int feRpcPort = startFEServer(runningDir);
+ List bes = Lists.newArrayList();
for (int i = 0; i < backendNum; i++) {
String host = "127.0.0." + (i + 1);
createBackend(host, feRpcPort);
}
+ System.out.println("after create backend");
+ checkBEHeartbeat(bes);
// sleep to wait first heartbeat
- Thread.sleep(6000);
+ // Thread.sleep(6000);
+ System.out.println("after create backend2");
}
public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
@@ -296,6 +300,7 @@ beHttpPort, beArrowFlightSqlPort, new DefaultHeartbeatServiceImpl(beThriftPort,
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setDataUsedCapacityB(480000);
+ diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
diff --git a/regression-test/suites/alter_p2/test_alter_colocate_group.groovy b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
new file mode 100644
index 00000000000000..1f5b8496630b40
--- /dev/null
+++ b/regression-test/suites/alter_p2/test_alter_colocate_group.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("test_alter_colocate_group") {
+ sql "DROP DATABASE IF EXISTS test_alter_colocate_group_db FORCE"
+ test {
+ sql """
+ ALTER COLOCATE GROUP test_alter_colocate_group_db.bad_group_1
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "unknown databases"
+ }
+ test {
+ sql """
+ ALTER COLOCATE GROUP bad_group_2
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "Not found colocate group `default_cluster:regression_test_alter_p2`.`bad_group_2`"
+ }
+ test {
+ sql """
+ ALTER COLOCATE GROUP bad_db.__global__bad_group_3
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "group that name starts with `__global__` is a global group, it doesn't belong to any specific database"
+ }
+ test {
+ sql """
+ ALTER COLOCATE GROUP __global__bad_group_4
+ SET ( "replication_num" = "1" );
+ """
+
+ exception "Not found colocate group `__global__bad_group_4`"
+ }
+
+ sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+
+ sql """
+ CREATE TABLE tbl1
+ (
+ k1 int,
+ k2 int
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 6
+ PROPERTIES
+ (
+ "colocate_with" = "group_1",
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ CREATE TABLE tbl2
+ (
+ k1 date,
+ k2 int
+ )
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 values less than('2020-02-01'),
+ PARTITION p2 values less than('2020-03-01')
+ )
+ DISTRIBUTED BY HASH(k2) BUCKETS 5
+ PROPERTIES
+ (
+ "colocate_with" = "group_2",
+ "replication_num" = "1"
+ );
+ """
+
+ sql """
+ CREATE TABLE tbl3
+ (
+ `uuid` varchar(255) NULL,
+ `action_datetime` date NULL
+ )
+ DUPLICATE KEY(uuid)
+ PARTITION BY RANGE(action_datetime)()
+ DISTRIBUTED BY HASH(uuid) BUCKETS 4
+ PROPERTIES
+ (
+ "colocate_with" = "group_3",
+ "replication_num" = "1",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.end" = "2",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "4",
+ "dynamic_partition.replication_num" = "1"
+ );
+ """
+
+ def checkGroupsReplicaAlloc = { groupName, replicaNum ->
+ // groupName -> replicaAlloc
+ def allocMap = [:]
+ def groups = sql """ show proc "/colocation_group" """
+ for (def group : groups) {
+ allocMap[group[1]] = group[4]
+ }
+
+ assertEquals("tag.location.default: ${replicaNum}".toString(), allocMap[groupName])
+ }
+
+ def checkTableReplicaAlloc = { tableName, hasDynamicPart, replicaNum ->
+ def result = sql """ show create table ${tableName} """
+ def createTbl = result[0][1].toString()
+ assertTrue(createTbl.indexOf("\"replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+ if (hasDynamicPart) {
+ assertTrue(createTbl.indexOf(
+ "\"dynamic_partition.replication_allocation\" = \"tag.location.default: ${replicaNum}\"") > 0)
+ }
+
+ result = sql """ show partitions from ${tableName} """
+ assertTrue(result.size() > 0)
+ for (int i = 0; i < result.size(); i++) {
+ assertEquals("${replicaNum}".toString(), result[i][9].toString())
+ }
+ }
+
+ for (int i = 1; i <= 3; i++) {
+ def groupName = "regression_test_alter_p2.group_${i}"
+ checkGroupsReplicaAlloc(groupName, 1)
+
+ def tableName = "tbl${i}"
+ def hasDynamicPart = i == 3
+ checkTableReplicaAlloc(tableName, hasDynamicPart, 1)
+
+ test {
+ sql """
+ ALTER COLOCATE GROUP ${groupName}
+ SET ( "replication_num" = "100" );
+ """
+
+ exception "Failed to find enough host"
+ }
+
+ test {
+ sql """
+ ALTER COLOCATE GROUP ${groupName}
+ SET ( "replication_num" = "3" );
+ """
+ }
+
+ checkGroupsReplicaAlloc(groupName, 3)
+ checkTableReplicaAlloc(tableName, hasDynamicPart, 3)
+ }
+
+ sql " DROP TABLE IF EXISTS tbl1 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl2 FORCE; "
+ sql " DROP TABLE IF EXISTS tbl3 FORCE; "
+}