diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 6312735473936c..94d1bb2fe031e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -154,6 +154,92 @@ public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws Dd } } + private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List alterClauses, + final String clusterName, Database db) throws UserException { + stmt.rewriteAlterClause(olapTable); + + // check conflict alter ops first + alterClauses.addAll(stmt.getOps()); + AlterOperations currentAlterOps = new AlterOperations(); + currentAlterOps.checkConflict(alterClauses); + + // check cluster capacity and db quota, only need to check once. + if (currentAlterOps.needCheckCapacity()) { + Catalog.getCurrentSystemInfo().checkClusterCapacity(clusterName); + db.checkQuota(); + } + + if (olapTable.getState() != OlapTableState.NORMAL) { + throw new DdlException( + "Table[" + olapTable.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops"); + } + + boolean needProcessOutsideDatabaseLock = false; + if (currentAlterOps.hasSchemaChangeOp()) { + // if modify storage type to v2, do schema change to convert all related tablets to segment v2 format + schemaChangeHandler.process(alterClauses, clusterName, db, olapTable); + } else if (currentAlterOps.hasRollupOp()) { + materializedViewHandler.process(alterClauses, clusterName, db, olapTable); + } else if (currentAlterOps.hasPartitionOp()) { + Preconditions.checkState(alterClauses.size() == 1); + AlterClause alterClause = alterClauses.get(0); + if (alterClause instanceof DropPartitionClause) { + if (!((DropPartitionClause) alterClause).isTempPartition()) { + DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(olapTable.getName())); + } + Catalog.getCurrentCatalog().dropPartition(db, olapTable, ((DropPartitionClause) alterClause)); + } else if (alterClause instanceof ReplacePartitionClause) { + Catalog.getCurrentCatalog().replaceTempPartition(db, olapTable.getName(), (ReplacePartitionClause) alterClause); + } else if (alterClause instanceof ModifyPartitionClause) { + ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause); + // expand the partition names if it is 'Modify Partition(*)' + if (clause.isNeedExpand()) { + List partitionNames = clause.getPartitionNames(); + partitionNames.clear(); + for (Partition partition : olapTable.getPartitions()) { + partitionNames.add(partition.getName()); + } + } + Map properties = clause.getProperties(); + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) { + needProcessOutsideDatabaseLock = true; + } else { + List partitionNames = clause.getPartitionNames(); + modifyPartitionsProperty(db, olapTable, partitionNames, properties); + } + } else if (alterClause instanceof AddPartitionClause) { + needProcessOutsideDatabaseLock = true; + } else { + throw new DdlException("Invalid alter operation: " + alterClause.getOpType()); + } + } else if (currentAlterOps.hasRenameOp()) { + processRename(db, olapTable, alterClauses); + } else if (currentAlterOps.hasReplaceTableOp()) { + processReplaceTable(db, olapTable, alterClauses); + } else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) { + needProcessOutsideDatabaseLock = true; + } else { + throw new DdlException("Invalid alter operations: " + currentAlterOps); + } + + return needProcessOutsideDatabaseLock; + } + + private void processAlterExternalTable(AlterTableStmt stmt, Table externalTable, Database db) throws UserException { + stmt.rewriteAlterClause(externalTable); + + // check conflict alter ops first + List alterClauses = stmt.getOps(); + AlterOperations currentAlterOps = new AlterOperations(); + currentAlterOps.checkConflict(alterClauses); + + if (currentAlterOps.hasRenameOp()) { + processRename(db, externalTable, alterClauses); + } else if (currentAlterOps.hasSchemaChangeOp()) { + schemaChangeHandler.processExternalTable(alterClauses, db, externalTable); + } + } + public void processAlterTable(AlterTableStmt stmt) throws UserException { TableName dbTableName = stmt.getTbl(); String dbName = dbTableName.getDb(); @@ -163,7 +249,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException { if (db == null) { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName); } - List alterClauses; + List alterClauses = Lists.newArrayList(); // some operations will take long time to process, need to be done outside the database lock boolean needProcessOutsideDatabaseLock = false; @@ -175,73 +261,18 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName); } - if (table.getType() != TableType.OLAP) { - throw new DdlException("Do not support alter non-OLAP table[" + tableName + "]"); - } - OlapTable olapTable = (OlapTable) table; - stmt.rewriteAlterClause(olapTable); - - // check conflict alter ops first - alterClauses = stmt.getOps(); - AlterOperations currentAlterOps = new AlterOperations(); - currentAlterOps.checkConflict(alterClauses); - - // check cluster capacity and db quota, only need to check once. - if (currentAlterOps.needCheckCapacity()) { - Catalog.getCurrentSystemInfo().checkClusterCapacity(clusterName); - db.checkQuota(); - } - - if (olapTable.getState() != OlapTableState.NORMAL) { - throw new DdlException( - "Table[" + table.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops"); - } - - if (currentAlterOps.hasSchemaChangeOp()) { - // if modify storage type to v2, do schema change to convert all related tablets to segment v2 format - schemaChangeHandler.process(alterClauses, clusterName, db, olapTable); - } else if (currentAlterOps.hasRollupOp()) { - materializedViewHandler.process(alterClauses, clusterName, db, olapTable); - } else if (currentAlterOps.hasPartitionOp()) { - Preconditions.checkState(alterClauses.size() == 1); - AlterClause alterClause = alterClauses.get(0); - if (alterClause instanceof DropPartitionClause) { - if (!((DropPartitionClause) alterClause).isTempPartition()) { - DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName)); - } - Catalog.getCurrentCatalog().dropPartition(db, olapTable, ((DropPartitionClause) alterClause)); - } else if (alterClause instanceof ReplacePartitionClause) { - Catalog.getCurrentCatalog().replaceTempPartition(db, tableName, (ReplacePartitionClause) alterClause); - } else if (alterClause instanceof ModifyPartitionClause) { - ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause); - // expand the partition names if it is 'Modify Partition(*)' - if (clause.isNeedExpand()) { - List partitionNames = clause.getPartitionNames(); - partitionNames.clear(); - for (Partition partition : olapTable.getPartitions()) { - partitionNames.add(partition.getName()); - } - } - Map properties = clause.getProperties(); - if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) { - needProcessOutsideDatabaseLock = true; - } else { - List partitionNames = clause.getPartitionNames(); - modifyPartitionsProperty(db, olapTable, partitionNames, properties); - } - } else if (alterClause instanceof AddPartitionClause) { - needProcessOutsideDatabaseLock = true; - } else { - throw new DdlException("Invalid alter operation: " + alterClause.getOpType()); - } - } else if (currentAlterOps.hasRenameOp()) { - processRename(db, olapTable, alterClauses); - } else if (currentAlterOps.hasReplaceTableOp()) { - processReplaceTable(db, olapTable, alterClauses); - } else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) { - needProcessOutsideDatabaseLock = true; - } else { - throw new DdlException("Invalid alter operations: " + currentAlterOps); + switch (table.getType()) { + case OLAP: + OlapTable olapTable = (OlapTable) table; + needProcessOutsideDatabaseLock = processAlterOlapTable(stmt, olapTable, alterClauses, clusterName, db); + break; + case ODBC: + case MYSQL: + case ELASTICSEARCH: + processAlterExternalTable(stmt, table, db); + return; + default: + throw new DdlException("Do not support alter " + table.getType().toString() + " table[" + tableName + "]"); } } finally { db.writeUnlock(); @@ -462,6 +493,17 @@ private void processRename(Database db, OlapTable table, List alter } } + private void processRename(Database db, Table table, List alterClauses) throws DdlException { + for (AlterClause alterClause : alterClauses) { + if (alterClause instanceof TableRenameClause) { + Catalog.getCurrentCatalog().renameTable(db, table, (TableRenameClause) alterClause); + break; + } else { + Preconditions.checkState(false); + } + } + } + /** * Batch update partitions' properties * caller should hold the db lock diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 372ff2b7f11f3b..65b92463e7b36b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -380,6 +381,12 @@ public void start() { public abstract void process(List alterClauses, String clusterName, Database db, OlapTable olapTable) throws UserException; + /* + * entry function. handle alter ops for external table + */ + public void processExternalTable(List alterClauses, Database db, Table externalTable) + throws UserException {}; + /* * cancel alter ops */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 6b061ed3831735..1801007410850b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -132,6 +132,26 @@ private void processAddColumn(AddColumnClause alterClause, OlapTable olapTable, indexSchemaMap, newColNameSet); } + private void processAddColumn(AddColumnClause alterClause, Table externalTable, List newSchema) throws DdlException { + Column column = alterClause.getColumn(); + ColumnPosition columnPos = alterClause.getColPos(); + Set newColNameSet = Sets.newHashSet(column.getName()); + + addColumnInternal(column, columnPos, newSchema, newColNameSet); + } + + private void processAddColumns(AddColumnsClause alterClause, Table externalTable, List newSchema) throws DdlException { + List columns = alterClause.getColumns(); + Set newColNameSet = Sets.newHashSet(); + for (Column column : alterClause.getColumns()) { + newColNameSet.add(column.getName()); + } + + for (Column newColumn : columns) { + addColumnInternal(newColumn, null, newSchema, newColNameSet); + } + } + private void processAddColumns(AddColumnsClause alterClause, OlapTable olapTable, Map> indexSchemaMap) throws DdlException { List columns = alterClause.getColumns(); @@ -158,6 +178,31 @@ private void processAddColumns(AddColumnsClause alterClause, OlapTable olapTable } } + private void processDropColumn(DropColumnClause alterClause, Table externalTable, List newSchema) throws DdlException { + String dropColName = alterClause.getColName(); + + // find column in base index and remove it + boolean found = false; + Iterator baseIter = newSchema.iterator(); + while (baseIter.hasNext()) { + Column column = baseIter.next(); + if (column.getName().equalsIgnoreCase(dropColName)) { + if (newSchema.size() > 1) { + baseIter.remove(); + found = true; + } else { + throw new DdlException("Do not allow remove last column of table: " + externalTable.getName() + + " column: " + dropColName); + } + break; + } + } + + if (!found) { + throw new DdlException("Column does not exists: " + dropColName); + } + } + private void processDropColumn(DropColumnClause alterClause, OlapTable olapTable, Map> indexSchemaMap, List indexes) throws DdlException { String dropColName = alterClause.getColName(); @@ -294,6 +339,75 @@ private void processDropColumn(DropColumnClause alterClause, OlapTable olapTable } } + // User can modify column type and column position + private void processModifyColumn(ModifyColumnClause alterClause, Table externalTable, List newSchema) throws DdlException { + Column modColumn = alterClause.getColumn(); + ColumnPosition columnPos = alterClause.getColPos(); + + // find modified column + String newColName = modColumn.getName(); + boolean hasColPos = (columnPos != null && !columnPos.isFirst()); + boolean found = false; + boolean typeChanged = false; + + int modColIndex = -1; + int lastColIndex = -1; + for (int i = 0; i < newSchema.size(); i++) { + Column col = newSchema.get(i); + if (col.getName().equalsIgnoreCase(newColName)) { + modColIndex = i; + found = true; + if (!col.equals(modColumn)) { + typeChanged = true; + } + } + if (hasColPos) { + if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) { + lastColIndex = i; + } + } else { + // save the last Key position + if (col.isKey()) { + lastColIndex = i; + } + } + } + // mod col not find + if (!found) { + throw new DdlException("Column[" + newColName + "] does not exists"); + } + + // last col not find + if (hasColPos && lastColIndex == -1) { + throw new DdlException("Column[" + columnPos.getLastCol() + "] does not exists"); + } + + // check if add to first + if (columnPos != null && columnPos.isFirst()) { + lastColIndex = -1; + hasColPos = true; + } + + Column oriColumn = newSchema.get(modColIndex); + // retain old column name + modColumn.setName(oriColumn.getName()); + + // handle the move operation in 'indexForFindingColumn' if has + if (hasColPos) { + // move col + if (lastColIndex > modColIndex) { + newSchema.add(lastColIndex + 1, modColumn); + newSchema.remove(modColIndex); + } else if (lastColIndex < modColIndex) { + newSchema.remove(modColIndex); + newSchema.add(lastColIndex + 1, modColumn); + } else { + throw new DdlException("Column[" + columnPos.getLastCol() + "] modify position is invalid"); + } + } else { + newSchema.set(modColIndex, modColumn); + } + } // User can modify column type and column position private void processModifyColumn(ModifyColumnClause alterClause, OlapTable olapTable, Map> indexSchemaMap) throws DdlException { @@ -481,6 +595,38 @@ private void processModifyColumn(ModifyColumnClause alterClause, OlapTable olapT } } + private void processReorderColumn(ReorderColumnsClause alterClause, Table externalTable, List newSchema) throws DdlException { + List orderedColNames = alterClause.getColumnsByPos(); + + newSchema.clear(); + List targetIndexSchema = externalTable.getBaseSchema(); + + // check and create new ordered column list + Set colNameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + for (String colName : orderedColNames) { + Column oneCol = null; + for (Column column : targetIndexSchema) { + if (column.getName().equalsIgnoreCase(colName) && column.isVisible()) { + oneCol = column; + break; + } + } + if (oneCol == null) { + throw new DdlException("Column[" + colName + "] not exists"); + } + newSchema.add(oneCol); + if (colNameSet.contains(colName)) { + throw new DdlException("Reduplicative column[" + colName + "]"); + } else { + colNameSet.add(colName); + } + } + + if (newSchema.size() != targetIndexSchema.size()) { + throw new DdlException("Reorder stmt should contains all columns"); + } + } + private void processReorderColumn(ReorderColumnsClause alterClause, OlapTable olapTable, Map> indexSchemaMap) throws DdlException { List orderedColNames = alterClause.getColumnsByPos(); @@ -533,6 +679,64 @@ private void processReorderColumn(ReorderColumnsClause alterClause, OlapTable ol indexSchemaMap.put(targetIndexId, newSchema); } + /* + * Add 'newColumn' to specified index. + * Modified schema will be saved in 'indexSchemaMap' + */ + private void addColumnInternal(Column newColumn, ColumnPosition columnPos, + List modIndexSchema, + Set newColNameSet) throws DdlException { + String newColName = newColumn.getName(); + int posIndex = -1; + boolean hasPos = (columnPos != null && !columnPos.isFirst()); + + for (int i = 0; i < modIndexSchema.size(); i++) { + Column col = modIndexSchema.get(i); + if (col.getName().equalsIgnoreCase(newColName)) { + if (!newColNameSet.contains(newColName)) { + // if this is not a base index, we should check if user repeatedly add columns + throw new DdlException("Repeatedly add column: " + newColName); + } + // this is a base index, and the column we check here is added by previous 'add column clause' + // in same ALTER stmt. + // so here we will check if the 2 columns is exactly same. if not, throw exception + if (!col.equals(newColumn)) { + throw new DdlException("Repeatedly add same column with different definition: " + newColName); + } + + // column already exist, return + return; + } + + if (hasPos) { + // after the field + if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) { + posIndex = i; + } + } + } + + // check if lastCol was found + if (hasPos && posIndex == -1) { + throw new DdlException("Column[" + columnPos.getLastCol() + "] does not found"); + } + + // check if add to first + if (columnPos != null && columnPos.isFirst()) { + posIndex = -1; + hasPos = true; + } + + if (hasPos) { + // key + modIndexSchema.add(posIndex + 1, newColumn); + } else { + modIndexSchema.add(newColumn); + } + + checkRowLength(modIndexSchema); + } + /* * Add 'newColumn' to specified index. * Modified schema will be saved in 'indexSchemaMap' @@ -1469,6 +1673,39 @@ public void process(List alterClauses, String clusterName, Database createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); } + @Override + public void processExternalTable(List alterClauses, Database db, Table externalTable) + throws UserException { + // copy the external table schema columns + List newSchema = Lists.newArrayList(); + newSchema.addAll(externalTable.getBaseSchema(true)); + + for (AlterClause alterClause : alterClauses) { + if (alterClause instanceof AddColumnClause) { + // add column + processAddColumn((AddColumnClause) alterClause, externalTable, newSchema); + } else if (alterClause instanceof AddColumnsClause) { + // add columns + processAddColumns((AddColumnsClause) alterClause, externalTable, newSchema); + } else if (alterClause instanceof DropColumnClause) { + // drop column and drop indexes on this column + processDropColumn((DropColumnClause) alterClause, externalTable, newSchema); + } else if (alterClause instanceof ModifyColumnClause) { + // modify column + processModifyColumn((ModifyColumnClause) alterClause, externalTable, newSchema); + } else if (alterClause instanceof ReorderColumnsClause) { + // reorder column + processReorderColumn((ReorderColumnsClause) alterClause, externalTable, newSchema); + } else { + Preconditions.checkState(false); + } + } // end for alter clauses + // replace the old column list + externalTable.setNewFullSchema(newSchema); + // refresh external table column in edit log + Catalog.getCurrentCatalog().refreshExternalTableSchema(db, externalTable, newSchema); + } + private void sendClearAlterTask(Database db, OlapTable olapTable) { AgentBatchTask batchTask = new AgentBatchTask(); db.readLock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java index 96f87dd7be58b2..0433d3c2000994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStmt.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -167,6 +168,23 @@ public void rewriteAlterClause(OlapTable table) throws UserException { ops = clauses; } + public void rewriteAlterClause(Table table) throws UserException { + List clauses = new ArrayList<>(); + for (AlterClause alterClause : ops) { + if (alterClause instanceof TableRenameClause || + alterClause instanceof AddColumnClause || + alterClause instanceof AddColumnsClause || + alterClause instanceof DropColumnClause || + alterClause instanceof ModifyColumnClause || + alterClause instanceof ReorderColumnsClause) { + clauses.add(alterClause); + } else { + throw new AnalysisException( table.getType().toString() + " [" + table.getName() + "] " + + "do not support " + alterClause.getOpType().toString() + " clause now"); + } + } + ops = clauses; + } @Override public String toSql() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 3744cae8722323..2b73e27ccdd2e7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -179,6 +179,7 @@ import org.apache.doris.persist.OperationType; import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.persist.RecoverInfo; +import org.apache.doris.persist.RefreshExternalTableInfo; import org.apache.doris.persist.ReplacePartitionOperationLog; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.persist.SetReplicaStatusOperationLog; @@ -4297,6 +4298,11 @@ public void replayCreateTable(String dbName, Table table) { } } + public void replayAlterExteranlTableSchema(String dbName, String tableName, List newSchema) throws DdlException { + Database db = this.fullNameToDb.get(dbName); + db.allterExternalTableSchemaWithLock(tableName, newSchema); + } + private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, long versionHash, short replicationNum, TabletMeta tabletMeta, Set tabletIdSet) throws DdlException { @@ -5066,9 +5072,12 @@ public void cancelBackup(CancelBackupStmt stmt) throws DdlException { } // entry of rename table operation - public void renameTable(Database db, OlapTable table, TableRenameClause tableRenameClause) throws DdlException { - if (table.getState() != OlapTableState.NORMAL) { - throw new DdlException("Table[" + table.getName() + "] is under " + table.getState()); + public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException { + if (table instanceof OlapTable) { + OlapTable olapTable = (OlapTable) table; + if ( olapTable.getState() != OlapTableState.NORMAL) { + throw new DdlException("Table[" + olapTable.getName() + "] is under " + olapTable.getState()); + } } String tableName = table.getName(); @@ -5082,9 +5091,9 @@ public void renameTable(Database db, OlapTable table, TableRenameClause tableRen throw new DdlException("Table name[" + newTableName + "] is already used"); } - table.checkAndSetName(newTableName, false); + table.setName(newTableName); - db.dropTable(table.getName()); + db.dropTable(tableName); db.createTable(table); TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName); @@ -5092,6 +5101,13 @@ public void renameTable(Database db, OlapTable table, TableRenameClause tableRen LOG.info("rename table[{}] to {}", tableName, newTableName); } + public void refreshExternalTableSchema(Database db, Table table, List newSchema) { + RefreshExternalTableInfo refreshExternalTableInfo = new RefreshExternalTableInfo(db.getFullName(), + table.getName(), newSchema); + editLog.logRefreshExternalTableSchema(refreshExternalTableInfo); + LOG.info("refresh db[{}] table[{}] for schema change", db.getFullName(), table.getName()); + } + public void replayRenameTable(TableInfo tableInfo) throws DdlException { long dbId = tableInfo.getDbId(); long tableId = tableInfo.getTableId(); @@ -5100,7 +5116,7 @@ public void replayRenameTable(TableInfo tableInfo) throws DdlException { Database db = getDb(dbId); db.writeLock(); try { - OlapTable table = (OlapTable) db.getTable(tableId); + Table table = db.getTable(tableId); String tableName = table.getName(); db.dropTable(tableName); table.setName(newTableName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index bf3ed7a551f4e4..d2067f2566503b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -305,6 +305,20 @@ public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfN } } + public void allterExternalTableSchemaWithLock(String tableName, List newSchema) throws DdlException{ + writeLock(); + try { + if (!nameToTable.containsKey(tableName)) { + throw new DdlException("Do not contain proper table " + tableName + " in refresh table"); + } else { + Table table = nameToTable.get(tableName); + table.setNewFullSchema(newSchema); + } + } finally { + writeUnlock(); + } + } + public boolean createTable(Table table) { boolean result = true; String tableName = table.getName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 59c741432e948e..5edf32959dc1ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -137,6 +137,10 @@ public String getName() { return name; } + public void setName(String newName) { + name = newName; + } + public TableType getType() { return type; } @@ -149,6 +153,7 @@ public List getFullSchema() { public List getBaseSchema() { return getBaseSchema(Util.showHiddenColumns()); } + public List getBaseSchema(boolean full) { if (full) { return fullSchema; diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 74d1f21bec4249..8ecc4da5db6209 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -70,6 +70,7 @@ import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.persist.PrivInfo; import org.apache.doris.persist.RecoverInfo; +import org.apache.doris.persist.RefreshExternalTableInfo; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.persist.ReplacePartitionOperationLog; import org.apache.doris.persist.ReplaceTableOperationLog; @@ -176,6 +177,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: { + data = RefreshExternalTableInfo.read(in); + isRead = true; + break; + } case OperationType.OP_ADD_PARTITION: { data = new PartitionPersistInfo(); ((PartitionPersistInfo) data).readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java index 34f39d9f438a5a..b6741b24595f9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java @@ -40,6 +40,7 @@ import org.apache.doris.persist.OperationType; import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.persist.RecoverInfo; +import org.apache.doris.persist.RefreshExternalTableInfo; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.persist.Storage; import org.apache.doris.persist.TableInfo; @@ -235,6 +236,11 @@ private JournalEntity getJournalEntity(DataInputStream in, short opCode) throws ret.setData(info); break; } + case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: { + RefreshExternalTableInfo info = RefreshExternalTableInfo.read(in); + ret.setData(info); + break; + } case OperationType.OP_ADD_PARTITION: { PartitionPersistInfo info = new PartitionPersistInfo(); info.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index df32a4601b1f57..35988d42b7f9b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -180,6 +180,13 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { catalog.replayCreateTable(info.getDbName(), info.getTable()); break; } + case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: { + RefreshExternalTableInfo info = (RefreshExternalTableInfo) journal.getData(); + LOG.info("Begin to unprotect alter external table schema. db = " + + info.getDbName() + " table = " + info.getTableName()); + catalog.replayAlterExteranlTableSchema(info.getDbName(), info.getTableName(), info.getNewSchema()); + break; + } case OperationType.OP_DROP_TABLE: { DropInfo info = (DropInfo) journal.getData(); Database db = catalog.getDb(info.getDbId()); @@ -924,6 +931,10 @@ public void logCreateTable(CreateTableInfo info) { logEdit(OperationType.OP_CREATE_TABLE, info); } + public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) { + logEdit(OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA, info); + } + public void logAddPartition(PartitionPersistInfo info) { logEdit(OperationType.OP_ADD_PARTITION, info); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2455df197c42e3..53bb587cc5bd25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -183,4 +183,7 @@ public class OperationType { // resource 276~290 public static final short OP_CREATE_RESOURCE = 276; public static final short OP_DROP_RESOURCE = 277; + + // alter external table + public static final short OP_ALTER_EXTERNAL_TABLE_SCHEMA = 280; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RefreshExternalTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RefreshExternalTableInfo.java new file mode 100644 index 00000000000000..a2e327df5d1902 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RefreshExternalTableInfo.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import com.google.gson.annotations.SerializedName; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.io.Text; + +import org.apache.doris.persist.gson.GsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +public class RefreshExternalTableInfo implements Writable { + public static final Logger LOG = LoggerFactory.getLogger(RefreshExternalTableInfo.class); + + @SerializedName(value = "dbName") + private String dbName; + @SerializedName(value = "tableName") + private String tableName; + @SerializedName(value = "newSchema") + private List newSchema; + + public RefreshExternalTableInfo() { + // for persist + } + + public RefreshExternalTableInfo(String dbName, String tableName, List newSchema) { + this.dbName = dbName; + this.tableName = tableName; + this.newSchema = newSchema; + + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public List getNewSchema() { + return newSchema; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static RefreshExternalTableInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, RefreshExternalTableInfo.class); + } + + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof RefreshExternalTableInfo)) { + return false; + } + + RefreshExternalTableInfo info = (RefreshExternalTableInfo) obj; + + return (dbName.equals(info.dbName)) + && (tableName.equals(info.tableName)) && (newSchema.equals(newSchema)); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index c30ecbc74a3d66..41cd00441fd851 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -27,7 +27,10 @@ import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; @@ -140,6 +143,25 @@ public static void beforeClass() throws Exception { ")\n" + "DISTRIBUTED BY HASH(k2) BUCKETS 3\n" + "PROPERTIES('replication_num' = '1');"); + + Config.enable_odbc_table = true; + createTable("create external table test.odbc_table\n" + + "( `k1` bigint(20) COMMENT \"\",\n" + + " `k2` datetime COMMENT \"\",\n" + + " `k3` varchar(20) COMMENT \"\",\n" + + " `k4` varchar(100) COMMENT \"\",\n" + + " `k5` float COMMENT \"\"\n" + + ")ENGINE=ODBC\n" + + "PROPERTIES (\n" + + "\"host\" = \"127.0.0.1\",\n" + + "\"port\" = \"3306\",\n" + + "\"user\" = \"root\",\n" + + "\"password\" = \"123\",\n" + + "\"database\" = \"db1\",\n" + + "\"table\" = \"tbl1\",\n" + + "\"driver\" = \"Oracle Driver\",\n" + + "\"odbc_type\" = \"oracle\"\n" + + ");"); } @AfterClass @@ -548,4 +570,76 @@ public void testReplaceTable() throws Exception { Assert.assertNotNull(replace3.getIndexIdByName("r1")); Assert.assertNotNull(replace3.getIndexIdByName("r2")); } + + public void testExternalTableAlterOperations() throws Exception { + // external table do not support partition operation + String stmt = "alter table test.odbc_table add partition p3 values less than('2020-04-01'), add partition p4 values less than('2020-05-01')"; + alterTable(stmt, true); + + // external table do not support rollup + stmt = "alter table test.odbc_table add rollup r1 (k1)"; + alterTable(stmt, true); + + // external table support add column + stmt = "alter table test.odbc_table add column k6 INT KEY after k1, add column k7 TINYINT KEY after k6"; + alterTable(stmt, false); + Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); + Table odbc_table = db.getTable("odbc_table"); + Assert.assertEquals(odbc_table.getBaseSchema().size(), 7); + Assert.assertEquals(odbc_table.getBaseSchema().get(1).getDataType(), PrimitiveType.INT); + Assert.assertEquals(odbc_table.getBaseSchema().get(2).getDataType(), PrimitiveType.TINYINT); + + // external table support drop column + stmt = "alter table test.odbc_table drop column k7"; + alterTable(stmt, false); + db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); + odbc_table = db.getTable("odbc_table"); + Assert.assertEquals(odbc_table.getBaseSchema().size(), 6); + + // external table support modify column + stmt = "alter table test.odbc_table modify column k6 bigint after k5"; + alterTable(stmt, false); + db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); + odbc_table = db.getTable("odbc_table"); + Assert.assertEquals(odbc_table.getBaseSchema().size(), 6); + Assert.assertEquals(odbc_table.getBaseSchema().get(5).getDataType(), PrimitiveType.BIGINT); + + // external table support reorder column + db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); + odbc_table = db.getTable("odbc_table"); + Assert.assertTrue(odbc_table.getBaseSchema().stream(). + map(column -> column.getName()). + reduce("", (totalName, columnName) -> totalName + columnName).equals("k1k2k3k4k5k6")); + stmt = "alter table test.odbc_table order by (k6, k5, k4, k3, k2, k1)"; + alterTable(stmt, false); + Assert.assertTrue(odbc_table.getBaseSchema().stream(). + map(column -> column.getName()). + reduce("", (totalName, columnName) -> totalName + columnName).equals("k6k5k4k3k2k1")); + + // external table support drop column + stmt = "alter table test.odbc_table drop column k6"; + alterTable(stmt, false); + stmt = "alter table test.odbc_table drop column k5"; + alterTable(stmt, false); + stmt = "alter table test.odbc_table drop column k4"; + alterTable(stmt, false); + stmt = "alter table test.odbc_table drop column k3"; + alterTable(stmt, false); + stmt = "alter table test.odbc_table drop column k2"; + alterTable(stmt, false); + // do not allow drop last column + Assert.assertEquals(odbc_table.getBaseSchema().size(), 1); + stmt = "alter table test.odbc_table drop column k1"; + alterTable(stmt, true); + Assert.assertEquals(odbc_table.getBaseSchema().size(), 1); + + // external table support rename operation + stmt = "alter table test.odbc_table rename oracle_table"; + alterTable(stmt, false); + db = Catalog.getCurrentCatalog().getDb("default_cluster:test"); + odbc_table = db.getTable("oracle_table"); + Assert.assertTrue(odbc_table != null); + odbc_table = db.getTable("odbc_table"); + Assert.assertTrue(odbc_table == null); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/RefreshExternalTableInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/RefreshExternalTableInfoTest.java new file mode 100644 index 00000000000000..4bb56e0fdaf3f4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/RefreshExternalTableInfoTest.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.doris.catalog.FakeCatalog; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.FeConstants; + +public class RefreshExternalTableInfoTest { + private Catalog catalog; + + private FakeCatalog fakeCatalog; + + @Before + public void setUp() { + fakeCatalog = new FakeCatalog(); + catalog = Deencapsulation.newInstance(Catalog.class); + + FakeCatalog.setCatalog(catalog); + FakeCatalog.setMetaVersion(FeConstants.meta_version); + } + + @Test + public void testSerialization() throws Exception { + // 1. Write objects to file + File file = new File("./RefreshExteranlTableInfo"); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + List columns = new ArrayList(); + Column column2 = new Column("column2", + ScalarType.createType(PrimitiveType.TINYINT), false, AggregateType.MIN, "", ""); + columns.add(column2); + columns.add(new Column("column3", + ScalarType.createType(PrimitiveType.SMALLINT), false, AggregateType.SUM, "", "")); + columns.add(new Column("column4", + ScalarType.createType(PrimitiveType.INT), false, AggregateType.REPLACE, "", "")); + columns.add(new Column("column5", + ScalarType.createType(PrimitiveType.BIGINT), false, AggregateType.REPLACE, "", "")); + columns.add(new Column("column6", + ScalarType.createType(PrimitiveType.FLOAT), false, AggregateType.REPLACE, "", "")); + columns.add(new Column("column7", + ScalarType.createType(PrimitiveType.DOUBLE), false, AggregateType.REPLACE, "", "")); + columns.add(new Column("column8", ScalarType.createChar(10), true, null, "", "")); + columns.add(new Column("column9", ScalarType.createVarchar(10), true, null, "", "")); + columns.add(new Column("column10", ScalarType.createType(PrimitiveType.DATE), true, null, "", "")); + columns.add(new Column("column11", ScalarType.createType(PrimitiveType.DATETIME), true, null, "", "")); + + RefreshExternalTableInfo info = new RefreshExternalTableInfo("db1", "table1", columns); + info.write(dos); + + dos.flush(); + dos.close(); + + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + RefreshExternalTableInfo rInfo1 = RefreshExternalTableInfo.read(dis); + Assert.assertTrue(rInfo1.getDbName().equals(info.getDbName())); + Assert.assertTrue(rInfo1.getTableName().equals(info.getTableName())); + Assert.assertTrue(rInfo1.getNewSchema().equals(info.getNewSchema())); + + // 3. delete files + dis.close(); + file.delete(); + } +}