From e9b4f7015c7c4b8da4e5a4e355428c07b8cb175b Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 13:27:21 -0800
Subject: [PATCH 1/6] Failed test.

---
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0bd82773f3a55..948e6080e09a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -591,4 +591,22 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
     }
   }
+
+  test("SPARK-6024 wide schema support") {
+    val schema = StructType((1 to 1000).map(i => StructField(s"c_${i}", StringType, true)))
+    assert(
+      schema.json.size > conf.schemaStringLengthThreshold,
+      "To correctly test the fix of SPARK-6024, the value of " +
+        s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
+    // Manually create a metastore data source table.
+    catalog.createDataSourceTable(
+      tableName = "wide_schema",
+      userSpecifiedSchema = Some(schema),
+      provider = "json",
+      options = Map("path" -> "just a dummy path"),
+      isExternal = false)
+
+    val actualSchema = table("wide_schema").schema
+    assert(schema === actualSchema)
+  }
 }

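The test above relies on the JSON form of a wide schema being longer than what the metastore can store in a single table property (4000 characters by default). A standalone sketch of that size check, not part of the patch series, assuming a Spark version where the type API lives in org.apache.spark.sql.types:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object WideSchemaSize {
      def main(args: Array[String]): Unit = {
        // Roughly the schema the test builds: many nullable string columns.
        val schema = StructType((1 to 1000).map(i => StructField(s"c_$i", StringType, nullable = true)))
        val json = schema.json
        // A metastore table-property value is limited to 4000 characters by default,
        // so a single property cannot hold this string.
        println(s"schema JSON length = ${json.length}, fits in one property: ${json.length <= 4000}")
      }
    }
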
From 12bacaeffb28f6766891540673a531da744d9f77 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 13:45:39 -0800
Subject: [PATCH 2/6] If the JSON string of a schema is too large, split it
 before storing it in metastore.

---
 .../scala/org/apache/spark/sql/SQLConf.scala  | 10 +++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 45 ++++++++++++++++---
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  2 +
 3 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index a08c0f5ce3ff4..4815620c6fe57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -51,6 +51,11 @@ private[spark] object SQLConf {
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
 
+  // This is used to control when we split a schema's JSON string into multiple pieces
+  // so that it fits into the metastore's table properties (by default, a property value
+  // has a length restriction of 4000 characters). We split the JSON string of a schema
+  // if its length exceeds the threshold.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  // Do not use a value larger than 4000 as the default value of this property.
+  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
+  private[spark] def schemaStringLengthThreshold: Int =
+    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+
   private[spark] def dataFrameEagerAnalysis: Boolean =
     getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8af5a4848fd44..8c5184b6d8a9f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,13 +69,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = table.getProperty("spark.sql.sources.schema")
-      val userSpecifiedSchema =
-        if (schemaString == null) {
-          None
-        } else {
-          Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+      val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
+        .orElse {
+          // If spark.sql.sources.schema is not defined, we either split the schema into multiple
+          // parts or the schema was not defined. To determine whether the schema was defined,
+          // we check spark.sql.sources.schema.numOfParts.
+          Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+            case Some(numOfParts) =>
+              val parts = (0 until numOfParts.toInt).map { index =>
+                Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+                  .getOrElse("Could not read schema from the metastore because it is corrupted.")
+              }
+              // Stitch all parts back into a single schema string in the JSON representation.
+              Some(parts.mkString)
+            case None => None // The schema was not defined.
+          }
         }
+
+      val userSpecifiedSchema =
+        schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
       val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +131,26 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     tbl.setProperty("spark.sql.sources.provider", provider)
 
     if (userSpecifiedSchema.isDefined) {
-      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+      val threshold = hive.conf.schemaStringLengthThreshold
+      val schemaJsonString = userSpecifiedSchema.get.json
+      // Check if the size of the JSON string of the schema exceeds the threshold.
+      if (schemaJsonString.size > threshold) {
+        // Need to split the string.
+        val parts = schemaJsonString.grouped(threshold).toSeq
+        // First, record the total number of parts we have.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+        // Second, write every part to the table properties.
+        parts.zipWithIndex.foreach {
+          case (part, index) =>
+            tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+        }
+      } else {
+        // The length is less than the threshold, so just put it in a single table property.
+        tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
+        // We use spark.sql.sources.schema instead of spark.sql.sources.schema.part.0
+        // because users may have already created data source tables in the metastore.
+        tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 948e6080e09a3..03413755d7c2c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -606,6 +606,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       options = Map("path" -> "just a dummy path"),
       isExternal = false)
 
+    invalidateTable("wide_schema")
+
     val actualSchema = table("wide_schema").schema
     assert(schema === actualSchema)
   }

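At its core, the change in PATCH 2/6 is a split-and-stitch round trip over table properties: the write path chops the schema's JSON string into threshold-sized chunks stored under numbered keys plus a part count, and the read path concatenates the chunks back in index order. A minimal sketch of that round trip, using an ordinary Map in place of the metastore table properties (the key names mirror the ones in the patch; nothing here touches Hive or Spark):

    object SchemaPropertySplitSketch {
      private val threshold = 4000

      // Write side: record the number of parts, then each threshold-sized chunk under an indexed key.
      def writeProps(schemaJson: String): Map[String, String] = {
        val parts = schemaJson.grouped(threshold).toSeq
        val indexed = parts.zipWithIndex.map { case (part, index) =>
          s"spark.sql.sources.schema.part.$index" -> part
        }
        (("spark.sql.sources.schema.numOfParts" -> parts.size.toString) +: indexed).toMap
      }

      // Read side: if a part count is present, stitch the chunks back together in index order.
      def readSchemaJson(props: Map[String, String]): Option[String] =
        props.get("spark.sql.sources.schema.numOfParts").map { numOfParts =>
          (0 until numOfParts.toInt).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
        }

      def main(args: Array[String]): Unit = {
        val json = "x" * 10000 // stand-in for a long schema JSON string
        assert(readSchemaJson(writeProps(json)) == Some(json)) // the round trip is lossless
      }
    }
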
From cc1d4724e2a8ab28f564265133638265497dd0e3 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 14:03:04 -0800
Subject: [PATCH 3/6] Make the schema wider.

---
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 03413755d7c2c..00306f1cd7f86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -593,7 +593,8 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   }
 
   test("SPARK-6024 wide schema support") {
-    val schema = StructType((1 to 1000).map(i => StructField(s"c_${i}", StringType, true)))
+    // We will need 80 splits for this schema if the threshold is 4000.
+    val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
     assert(
       schema.json.size > conf.schemaStringLengthThreshold,
       "To correctly test the fix of SPARK-6024, the value of " +

From 143927af48fdb67e1cb1fb22cdabc82df403f0db Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 15:33:14 -0800
Subject: [PATCH 4/6] Simplify code.

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 47 ++++++------------
 1 file changed, 15 insertions(+), 32 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8c5184b6d8a9f..721341a5e714f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,22 +69,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = Option(table.getProperty("spark.sql.sources.schema"))
-        .orElse {
-          // If spark.sql.sources.schema is not defined, we either split the schema into multiple
-          // parts or the schema was not defined. To determine whether the schema was defined,
-          // we check spark.sql.sources.schema.numOfParts.
-          Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
-            case Some(numOfParts) =>
-              val parts = (0 until numOfParts.toInt).map { index =>
-                Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
-                  .getOrElse("Could not read schema from the metastore because it is corrupted.")
-              }
-              // Stitch all parts back into a single schema string in the JSON representation.
-              Some(parts.mkString)
-            case None => None // The schema was not defined.
-          }
-        }
+      val schemaString = Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
+        case Some(numOfParts) =>
+          val parts = (0 until numOfParts.toInt).map { index =>
+            Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
+              .getOrElse("Could not read schema from the metastore because it is corrupted.")
+          }
+          // Stitch all parts back into a single schema string in the JSON representation.
+          Some(parts.mkString)
+        case None => None // The schema was not defined.
+      }
 
       val userSpecifiedSchema =
         schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
@@ -133,23 +127,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     if (userSpecifiedSchema.isDefined) {
       val threshold = hive.conf.schemaStringLengthThreshold
       val schemaJsonString = userSpecifiedSchema.get.json
-      // Check if the size of the JSON string of the schema exceeds the threshold.
-      if (schemaJsonString.size > threshold) {
-        // Need to split the string.
-        val parts = schemaJsonString.grouped(threshold).toSeq
-        // First, record the total number of parts we have.
-        tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
-        // Second, write every part to the table properties.
-        parts.zipWithIndex.foreach {
-          case (part, index) =>
-            tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
-        }
-      } else {
-        // The length is less than the threshold, so just put it in a single table property.
-        tbl.setProperty("spark.sql.sources.schema.numOfParts", "1")
-        // We use spark.sql.sources.schema instead of spark.sql.sources.schema.part.0
-        // because users may have already created data source tables in the metastore.
-        tbl.setProperty("spark.sql.sources.schema", schemaJsonString)
-      }
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
+      parts.zipWithIndex.foreach {
+        case (part, index) =>
+          tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

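The simplification in PATCH 4/6 drops the short-schema special case because grouped never yields an empty result for a non-empty string: a JSON string shorter than the threshold simply comes back as a single part, so unconditionally writing numbered parts plus a part count covers both cases. A quick plain-Scala check of that property (for example in the REPL; not part of the patch):

    val threshold = 4000

    // A short string still yields exactly one chunk, so the unconditional split covers it.
    assert("short schema json".grouped(threshold).toSeq == Seq("short schema json"))

    // A long string is cut into threshold-sized chunks plus a final remainder.
    val long = "x" * 9000
    assert(long.grouped(threshold).toSeq.map(_.length) == Seq(4000, 4000, 1000))
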
From 73e71b4f12aa35cfd14f7c8e9d3748c516439c48 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 17:55:25 -0800
Subject: [PATCH 5/6] Address comments.

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 30 ++++++++++---------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 721341a5e714f..88e0f3aea6da1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,19 +69,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val schemaString = Option(table.getProperty("spark.sql.sources.schema.numOfParts")) match {
-        case Some(numOfParts) =>
-          val parts = (0 until numOfParts.toInt).map { index =>
-            Option(table.getProperty(s"spark.sql.sources.schema.part.${index}"))
-              .getOrElse("Could not read schema from the metastore because it is corrupted.")
+      val userSpecifiedSchema =
+        Option(table.getProperty("spark.sql.sources.schema.numParts")).flatMap { numParts =>
+          val parts = (0 until numParts.toInt).map { index =>
+            val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
+            if (part == null) {
+              throw new AnalysisException(
+                "Could not read schema from the metastore because it is corrupted.")
+            }
+
+            part
           }
-          // Stitch all parts back into a single schema string in the JSON representation.
-          Some(parts.mkString)
-        case None => None // The schema was not defined.
-      }
+          // Stitch all parts back into a single schema string in the JSON representation
+          // and convert it back to a StructType.
+          Some(DataType.fromJson(parts.mkString).asInstanceOf[StructType])
+        }
 
-      val userSpecifiedSchema =
-        schemaString.flatMap(s => Some(DataType.fromJson(s).asInstanceOf[StructType]))
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
       val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -129,10 +132,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val schemaJsonString = userSpecifiedSchema.get.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
-      tbl.setProperty("spark.sql.sources.schema.numOfParts", parts.size.toString)
-      parts.zipWithIndex.foreach {
-        case (part, index) =>
-          tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+      tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
       }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }

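PATCH 5/6 also changes how a missing part is handled: the earlier revisions spliced an error message into the reassembled JSON via getOrElse, which would only surface later as a confusing parse failure, whereas this revision fails fast. A sketch of that read-side validation against a plain property map, with IllegalStateException standing in for Spark's AnalysisException:

    def readSchemaJson(props: Map[String, String]): Option[String] =
      props.get("spark.sql.sources.schema.numParts").map { numParts =>
        val parts = (0 until numParts.toInt).map { index =>
          props.getOrElse(
            s"spark.sql.sources.schema.part.$index",
            // Fail fast instead of concatenating an error message into the JSON string.
            throw new IllegalStateException(
              "Could not read schema from the metastore because it is corrupted."))
        }
        parts.mkString
      }
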
From 4882e6f70a58120533397460e3023074490827a4 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 26 Feb 2015 19:19:57 -0800
Subject: [PATCH 6/6] Address comments.

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 88e0f3aea6da1..d3ad364328265 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -70,20 +70,21 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         client.getTable(in.database, in.name)
       }
       val userSpecifiedSchema =
-        Option(table.getProperty("spark.sql.sources.schema.numParts")).flatMap { numParts =>
+        Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
           val parts = (0 until numParts.toInt).map { index =>
             val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
             if (part == null) {
               throw new AnalysisException(
-                "Could not read schema from the metastore because it is corrupted.")
+                s"Could not read schema from the metastore because it is corrupted " +
+                  s"(missing part ${index} of the schema).")
             }
 
             part
           }
           // Stitch all parts back into a single schema string in the JSON representation
           // and convert it back to a StructType.
-          Some(DataType.fromJson(parts.mkString).asInstanceOf[StructType])
-        }
+          DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+        }
 
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...