Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -247,6 +249,7 @@ protected void runPendingJob() throws AlterCancelException {
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
Expand Down Expand Up @@ -291,7 +294,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down Expand Up @@ -401,6 +404,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
Expand Down Expand Up @@ -479,7 +483,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
whereClause);
whereClause, objectPool);
rollupBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@

import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -238,6 +240,7 @@ protected void runPendingJob() throws AlterCancelException {

Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
Expand Down Expand Up @@ -286,7 +289,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
tbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
Expand Down Expand Up @@ -410,7 +413,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
}

tbl.readLock();

Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
Expand Down Expand Up @@ -468,7 +471,8 @@ protected void runWaitingTxnJob() throws AlterCancelException {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null);
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null,
objectPool);
schemaChangeBatchTask.addTask(rollupTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class DescriptorTable {

private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>();

private TDescriptorTable thriftDescTable = null; // serialized version of this

public DescriptorTable() {
}

Expand Down Expand Up @@ -182,6 +184,9 @@ public void materializeIntermediateSlots() {
}

public TDescriptorTable toThrift() {
if (thriftDescTable != null) {
return thriftDescTable;
}
TDescriptorTable result = new TDescriptorTable();
Map<Long, TableIf> referencedTbls = Maps.newHashMap();
for (TupleDescriptor tupleD : tupleDescs.values()) {
Expand All @@ -208,6 +213,7 @@ public TDescriptorTable toThrift() {
for (TableIf tbl : referencedTbls.values()) {
result.addToTableDescriptors(tbl.toThrift());
}
thriftDescTable = result;
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1035,6 +1036,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
} finally {
localTbl.readUnlock();
}
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
Expand Down Expand Up @@ -1067,7 +1069,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
localTbl.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

task.setInRestoreMode(true);
batchTask.addTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long

short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
Expand Down Expand Up @@ -1909,7 +1910,7 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long
compactionPolicy, timeSeriesCompactionGoalSizeMbytes,
timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds,
timeSeriesCompactionEmptyRowsetsThreshold, timeSeriesCompactionLevelThreshold,
storeRowColumn, isDynamicSchema, binlogConfig);
storeRowColumn, isDynamicSchema, binlogConfig, objectPool);

task.setStorageFormat(storageFormat);
batchTask.addTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down Expand Up @@ -827,7 +828,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class AlterReplicaTask extends AgentTask {
private Expr whereClause;
private DescriptorTable descTable;
private List<Column> baseSchemaColumns;
private Map<Object, Object> objectPool;

/**
* AlterReplicaTask constructor.
Expand All @@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause) {
DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);

this.baseTabletId = baseTabletId;
Expand All @@ -79,6 +81,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionI
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
this.objectPool = objectPool;
}

public long getBaseTabletId() {
Expand Down Expand Up @@ -125,27 +128,47 @@ public TAlterTabletReqV2 toThrift() {
}
if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setOriginColumnName(slots.get(0).getColumnName());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(entry.getKey());
if (value == null) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setOriginColumnName(slots.get(0).getColumnName());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(entry.getKey(), mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
}
if (whereClause != null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(Column.WHERE_SIGN);
if (value == null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
req.setDescTbl(descTable.toThrift());

if (baseSchemaColumns != null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
Object value = objectPool.get(baseSchemaColumns);
if (value == null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
req.setColumns(columns);
objectPool.put(baseSchemaColumns, columns);
} else {
List<TColumn> columns = (List<TColumn>) value;
req.setColumns(columns);
}
req.setColumns(columns);
}
return req;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -121,6 +122,8 @@ public class CreateReplicaTask extends AgentTask {

private BinlogConfig binlogConfig;

private Map<Object, Object> objectPool;

public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
Expand All @@ -143,7 +146,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
boolean isDynamicSchema,
BinlogConfig binlogConfig) {
BinlogConfig binlogConfig,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);

this.replicaId = replicaId;
Expand Down Expand Up @@ -188,6 +192,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
this.objectPool = objectPool;
}

public void setIsRecoverTask(boolean isRecoverTask) {
Expand Down Expand Up @@ -248,21 +253,32 @@ public TCreateTabletReq toThrift() {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
List<TColumn> tColumns = null;
Object tCols = objectPool.get(columns);
if (tCols != null) {
tColumns = (List<TColumn>) tCols;
} else {
tColumns = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
}
objectPool.put(columns, tColumns);
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
Expand All @@ -279,9 +295,15 @@ public TCreateTabletReq toThrift() {
tSchema.setVersionColIdx(versionCol);

if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
List<TOlapTableIndex> tIndexes = null;
Object value = objectPool.get(indexes);
if (value != null) {
tIndexes = (List<TOlapTableIndex>) value;
} else {
tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
}
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public void setUp() throws AnalysisException {
range2 = Range.closedOpen(pk2, pk3);

// create tasks

Map<Object, Object> objectPool = new HashMap<Object, Object>();
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null);
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, false, null, objectPool);

// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);
Expand Down