diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 762abfe3b6061..71d890805be88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -5989,6 +5989,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val LEGACY_PRESERVE_HIVE_COLUMN_ORDER =
+    buildConf("spark.sql.legacy.preserveHiveColumnOrder")
+      .internal()
+      .doc("When true, tables created by HiveExternalCatalog keep the Hive-style column " +
+        "order, with partition columns placed at the end. When false, the user-specified " +
+        "column order is preserved. This does not affect tables with provider = `hive`.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c7a60151c496..378b934ae1bda 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf.LEGACY_PRESERVE_HIVE_COLUMN_ORDER
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -259,6 +260,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
     }
 
+    if (!conf.get(LEGACY_PRESERVE_HIVE_COLUMN_ORDER)) {
+      tableProperties.put(LEGACY_COLUMN_ORDER, "false")
+    }
+
     // we have to set the table schema here so that the table schema JSON
     // string in the table properties still uses the original schema
     val hiveTable = tableDefinition.copy(
@@ -893,12 +898,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schemaFromTableProps =
       getSchemaFromTableProperties(table.properties).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
-    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+    val schema = if (isLegacyColumnOrder(table)) {
+      reorderSchema(schema = schemaFromTableProps, partColumnNames)
+    } else {
+      schemaFromTableProps
+    }
 
     table.copy(
       provider = Some(provider),
       storage = storageWithoutHiveGeneratedProperties,
-      schema = reorderedSchema,
+      schema = schema,
       partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG),
@@ -1430,6 +1439,8 @@ object HiveExternalCatalog {
   val EMPTY_DATA_SCHEMA = new StructType()
     .add("col", "array<string>", nullable = true, comment = "from deserializer")
 
+  val LEGACY_COLUMN_ORDER = "legacy_column_order"
+
   private def getColumnNamesByType(
       props: Map[String, String],
       colType: String,
@@ -1489,4 +1500,14 @@ object HiveExternalCatalog {
     case st: StringType => st.isUTF8BinaryCollation
     case _ => true
   }
+
+  // Whether the table was created with the legacy (Hive-style) column order.
+  // Absence of the table property means the legacy order is preserved.
+  private def isLegacyColumnOrder(table: CatalogTable): Boolean = {
+    val legacyColumnOrder = table.properties.get(LEGACY_COLUMN_ORDER)
+    legacyColumnOrder match {
+      case Some(l) if l.equalsIgnoreCase("false") => false
+      case _ => true
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 13e8d3721d81e..1324a9aa2dde3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
+import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
@@ -3432,4 +3433,71 @@ class HiveDDLSuite
         any[String], any[String], any[StructType])
     }
   }
+
+  test("SPARK-52638: Allow preserving Hive-style column order to be configurable") {
+    val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
+    Seq(true, false).foreach { preserveOrder =>
+      withSQLConf(SQLConf.LEGACY_PRESERVE_HIVE_COLUMN_ORDER.key -> preserveOrder.toString) {
+        withTable("t1") {
+          val identifier = Identifier.of(Array("default"), "t1")
+          val outputSchema = new StructType()
+            .add("a", IntegerType, true, "comment1")
+            .add("b", IntegerType, true, "comment2")
+            .add("c", IntegerType, true, "comment3")
+            .add("d", IntegerType, true, "comment4")
+          catalog.createTable(
+            identifier,
+            new TableInfo.Builder()
+              .withProperties(Map.empty.asJava)
+              .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
+              .withPartitions(Array(Expressions.identity("a")))
+              .build()
+          )
+          val table1 = catalog.loadTable(identifier)
+          val cols = table1.columns()
+
+          if (preserveOrder) {
+            // Legacy Hive-style order: the partition column `a` moves to the end.
+            assert(cols.length == 4)
+            assert(cols(0).name() == "b")
+            assert(cols(1).name() == "c")
+            assert(cols(2).name() == "d")
+            assert(cols(3).name() == "a")
+          } else {
+            // User-specified order is kept and recorded in a table property.
+            assert(cols.length == 4)
+            assert(cols(0).name() == "a")
+            assert(cols(1).name() == "b")
+            assert(cols(2).name() == "c")
+            assert(cols(3).name() == "d")
+            assert(table1.properties().get("legacy_column_order") == "false")
+          }
+
+          catalog.alterTable(
+            identifier,
+            TableChange.addColumn(Array("e"), IntegerType)
+          )
+
+          val table2 = catalog.loadTable(identifier)
+          val cols2 = table2.columns()
+          if (preserveOrder) {
+            assert(cols2.length == 5)
+            assert(cols2(0).name() == "b")
+            assert(cols2(1).name() == "c")
+            assert(cols2(2).name() == "d")
+            assert(cols2(3).name() == "e")
+            assert(cols2(4).name() == "a")
+          } else {
+            assert(cols2.length == 5)
+            assert(cols2(0).name() == "a")
+            assert(cols2(1).name() == "b")
+            assert(cols2(2).name() == "c")
+            assert(cols2(3).name() == "d")
+            assert(cols2(4).name() == "e")
+            assert(table2.properties().get("legacy_column_order") == "false")
+          }
+        }
+      }
+    }
+  }
 }
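
For context, a minimal sketch of the user-visible behavior this flag toggles. This is not part of the patch: the table name `t` and the spark-shell invocation are illustrative assumptions.

    // Sketch only. Assumes a Hive-enabled session started with the flag in
    // SparkConf, e.g.:
    //   spark-shell --conf spark.sql.legacy.preserveHiveColumnOrder=false
    // (HiveExternalCatalog reads the flag from its SparkConf, so it applies
    // to data source tables created and read back through the Hive catalog.)
    spark.sql("CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (a)")

    // With the flag set to false, the user-specified column order survives
    // the round-trip through HiveExternalCatalog:
    assert(spark.table("t").schema.fieldNames.toSeq == Seq("a", "b", "c"))

    // Under the legacy default (true), the partition column would be moved
    // to the end and the same lookup would yield Seq("b", "c", "a").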