Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
Expand All @@ -86,6 +85,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -388,7 +388,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
long expiration = (createTimeMs + timeoutMs) / 1000;
Expand All @@ -401,14 +401,13 @@ protected void runWaitingTxnJob() throws AlterCancelException {
// the rollup task will transform the data before visible version(included).
long visibleVersion = partition.getVisibleVersion();

Map<String, Expr> defineExprs = Maps.newHashMap();
MaterializedIndex rollupIndex = entry.getValue();
Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
long rollupTabletId = rollupTablet.getId();
long baseTabletId = tabletIdMap.get(rollupTabletId);

Map<String, Expr> defineExprs = Maps.newHashMap();

DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
Map<String, SlotDescriptor> descMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
Expand Down Expand Up @@ -470,7 +469,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
tcloumnsPool, whereClause, expiration, vaultId);
objectPool, whereClause, expiration, vaultId);
rollupBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
Expand All @@ -76,6 +75,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -418,7 +418,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
long expiration = (createTimeMs + timeoutMs) / 1000;
Expand Down Expand Up @@ -478,12 +478,13 @@ protected void runWaitingTxnJob() throws AlterCancelException {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool,
null, expiration, vaultId);
schemaChangeBatchTask.addTask(rollupTask);
}
}
}

} // end for partitions
} finally {
tbl.readUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* The new replica can be a rollup replica, or a shadow replica of schema change.
*/
public class AlterReplicaTask extends AgentTask {

private long baseTabletId;
private long newReplicaId;
private int baseSchemaHash;
Expand All @@ -53,7 +54,7 @@ public class AlterReplicaTask extends AgentTask {
private Map<String, Expr> defineExprs;
private Expr whereClause;
private DescriptorTable descTable;
private Map<Object, List<TColumn>> tcloumnsPool;
private Map<Object, Object> objectPool;
private List<Column> baseSchemaColumns;

private long expiration;
Expand All @@ -67,7 +68,7 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool,
DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, Object> objectPool,
Expr whereClause, long expiration, String vaultId) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);

Expand All @@ -85,7 +86,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
this.tcloumnsPool = tcloumnsPool;
this.objectPool = objectPool;
this.expiration = expiration;
this.vaultId = vaultId;
}
Expand Down Expand Up @@ -134,32 +135,51 @@ public TAlterTabletReqV2 toThrift() {
default:
break;
}

if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(entry.getKey());
if (value == null) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(entry.getKey(), mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
}

if (whereClause != null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(Column.WHERE_SIGN);
if (value == null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(Column.WHERE_SIGN, mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
req.setDescTbl(descTable.toThrift());

if (baseSchemaColumns != null) {
List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
if (columns == null) {
columns = new ArrayList<TColumn>();
Object value = objectPool.get(baseSchemaColumns);
if (value == null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
tcloumnsPool.put(baseSchemaColumns, columns);
objectPool.put(baseSchemaColumns, columns);
req.setColumns(columns);
} else {
List<TColumn> columns = (List<TColumn>) value;
req.setColumns(columns);
}
req.setColumns(columns);
}
req.setStorageVaultId(this.vaultId);
return req;
Expand Down