From 419c618ca7c2466614dfac408b3b872937f9a017 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 2 Oct 2016 00:03:23 -0700
Subject: [PATCH 1/5] [SPARK-17112][SQL] "select null" via JDBC triggers
 IllegalArgumentException in Thriftserver

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e555ebd623f72..d03a2f91ba635 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -61,7 +61,8 @@ private[hive] class SparkExecuteStatementOperation(
     } else {
       logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
-        new FieldSchema(attr.name, attr.dataType.catalogString, "")
+        val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
+        new FieldSchema(attr.name, attrTypeString, "")
       }
       new TableSchema(schema.asJava)
     }

From 5df2e06d913bb7e1ce6e39fbf261716c6dc2519e Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 2 Oct 2016 23:06:26 -0700
Subject: [PATCH 2/5] Add testcase.

---
 .../SparkExecuteStatementOperation.scala      | 28 ++++++----
 .../SparkExecuteStatementOperationSuite.scala | 55 +++++++++++++++++++
 2 files changed, 71 insertions(+), 12 deletions(-)
 create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d03a2f91ba635..9a2a0ee5d3ff5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -55,18 +55,7 @@ private[hive] class SparkExecuteStatementOperation(
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
-  private lazy val resultSchema: TableSchema = {
-    if (result == null || result.queryExecution.analyzed.output.size == 0) {
-      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
-    } else {
-      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-      val schema = result.queryExecution.analyzed.output.map { attr =>
-        val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
-        new FieldSchema(attr.name, attrTypeString, "")
-      }
-      new TableSchema(schema.asJava)
-    }
-  }
+  private lazy val resultSchema: TableSchema = SparkExecuteStatementOperation.getTableSchema(result)
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
@@ -283,3 +272,18 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 }
+
+object SparkExecuteStatementOperation extends Logging {
+  def getTableSchema(result: DataFrame): TableSchema = {
+    if (result == null || result.queryExecution.analyzed.output.isEmpty) {
+      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
+    } else {
+      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+      val schema = result.queryExecution.analyzed.output.map { attr =>
+        val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
+        new FieldSchema(attr.name, attrTypeString, "")
+      }
+      new TableSchema(schema.asJava)
+    }
+  }
+}

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
new file mode 100644
index 0000000000000..84f5699555620
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+class SparkExecuteStatementOperationSuite
+  extends SparkFunSuite with BeforeAndAfterAll with Logging {
+
+  private var spark: SparkSession = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession
+      .builder
+      .master("local[1]")
+      .appName("SparkExecuteStatementOperationSuite")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
+    val df = spark.sql("select null, if(true,null,null)")
+    val columns = SparkExecuteStatementOperation.getTableSchema(df).getColumnDescriptors()
+    assert(columns.size() == 2)
+    assert(columns.get(0).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
+    assert(columns.get(1).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
+  }
+}

From b3deec26fdcbf37e66d8f563dfe309afd42dbad4 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Oct 2016 00:09:35 -0700
Subject: [PATCH 3/5] Simplify testsuite.
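
Change `getTableSchema` to take the analyzed plan's `Seq[Attribute]` instead
of a whole DataFrame, so the suite can construct attributes directly and no
longer needs to start a SparkSession. A minimal sketch of the intended call
(the attribute name is illustrative, mirroring the simplified suite below):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.NullType

    val attrs = Seq(AttributeReference("NULL", NullType, nullable = true)())
    // NullType maps to Hive's "void", which resolves to NULL_TYPE instead of
    // triggering IllegalArgumentException.
    val schema = SparkExecuteStatementOperation.getTableSchema(attrs)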
---
 .../SparkExecuteStatementOperation.scala      | 25 ++++++++------
 .../SparkExecuteStatementOperationSuite.scala | 34 ++++---------------
 2 files changed, 20 insertions(+), 39 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 9a2a0ee5d3ff5..e963d7cf26190 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -34,6 +34,7 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command.SetCommand
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -55,7 +56,13 @@ private[hive] class SparkExecuteStatementOperation(
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
-  private lazy val resultSchema: TableSchema = SparkExecuteStatementOperation.getTableSchema(result)
+  private lazy val resultSchema: TableSchema = {
+    if (result == null || result.queryExecution.analyzed.output.isEmpty) {
+      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
+    } else {
+      SparkExecuteStatementOperation.getTableSchema(result.queryExecution.analyzed.output)
+    }
+  }
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
@@ -274,16 +281,12 @@ private[hive] class SparkExecuteStatementOperation(
 }
 
 object SparkExecuteStatementOperation extends Logging {
-  def getTableSchema(result: DataFrame): TableSchema = {
-    if (result == null || result.queryExecution.analyzed.output.isEmpty) {
-      new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
-    } else {
-      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-      val schema = result.queryExecution.analyzed.output.map { attr =>
-        val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
-        new FieldSchema(attr.name, attrTypeString, "")
-      }
-      new TableSchema(schema.asJava)
+  def getTableSchema(output: Seq[Attribute]): TableSchema = {
+    logInfo(s"Result Schema: ${output}")
+    val schema = output.map { attr =>
+      val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
+      new FieldSchema(attr.name, attrTypeString, "")
     }
+    new TableSchema(schema.asJava)
   }
 }

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index 84f5699555620..2aa20d7d7fc09 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -17,37 +17,15 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import org.scalatest.BeforeAndAfterAll
-
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-class SparkExecuteStatementOperationSuite
-  extends SparkFunSuite with BeforeAndAfterAll with Logging {
-
-  private var spark: SparkSession = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    spark = SparkSession
-      .builder
-      .master("local[1]")
-      .appName("SparkExecuteStatementOperationSuite")
-      .getOrCreate()
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      spark.stop()
-    } finally {
-      super.afterAll()
-    }
-  }
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.NullType
 
+class SparkExecuteStatementOperationSuite extends SparkFunSuite {
   test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
-    val df = spark.sql("select null, if(true,null,null)")
-    val columns = SparkExecuteStatementOperation.getTableSchema(df).getColumnDescriptors()
+    val c1 = AttributeReference("NULL", NullType, nullable = true)()
+    val c2 = AttributeReference("(IF(true, NULL, NULL))", NullType, nullable = true)()
+    val columns = SparkExecuteStatementOperation.getTableSchema(Seq(c1, c2)).getColumnDescriptors()
     assert(columns.size() == 2)
     assert(columns.get(0).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
     assert(columns.get(1).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)

From 32b2bb239ed99a2ef35d2a68118479778169f29c Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Oct 2016 00:50:39 -0700
Subject: [PATCH 4/5] Address comments.

---
 .../SparkExecuteStatementOperation.scala      | 15 +++++++--------
 .../SparkExecuteStatementOperationSuite.scala | 10 +++++-----
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e963d7cf26190..aab7d55066c2d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -34,7 +34,6 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command.SetCommand
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -60,7 +59,8 @@ private[hive] class SparkExecuteStatementOperation(
     if (result == null || result.queryExecution.analyzed.output.isEmpty) {
       new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
     } else {
-      SparkExecuteStatementOperation.getTableSchema(result.queryExecution.analyzed.output)
+      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+      SparkExecuteStatementOperation.getTableSchema(result.queryExecution.analyzed.schema)
     }
   }
 
@@ -280,12 +280,11 @@ private[hive] class SparkExecuteStatementOperation(
   }
 }
 
-object SparkExecuteStatementOperation extends Logging {
-  def getTableSchema(output: Seq[Attribute]): TableSchema = {
-    logInfo(s"Result Schema: ${output}")
-    val schema = output.map { attr =>
-      val attrTypeString = if (attr.dataType == NullType) "void" else attr.dataType.catalogString
-      new FieldSchema(attr.name, attrTypeString, "")
+object SparkExecuteStatementOperation {
+  def getTableSchema(structType: StructType): TableSchema = {
+    val schema = structType.map { field =>
+      val attrTypeString = if (field.dataType == NullType) "void" else field.dataType.catalogString
+      new FieldSchema(field.name, attrTypeString, "")
     }
     new TableSchema(schema.asJava)
   }

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index 2aa20d7d7fc09..32ded0d254ef8 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.types.NullType
+import org.apache.spark.sql.types.{NullType, StructField, StructType}
 
 class SparkExecuteStatementOperationSuite extends SparkFunSuite {
   test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
-    val c1 = AttributeReference("NULL", NullType, nullable = true)()
-    val c2 = AttributeReference("(IF(true, NULL, NULL))", NullType, nullable = true)()
-    val columns = SparkExecuteStatementOperation.getTableSchema(Seq(c1, c2)).getColumnDescriptors()
+    val field1 = StructField("NULL", NullType)
+    val field2 = StructField("(IF(true, NULL, NULL))", NullType)
+    val tableSchema = StructType(Seq(field1, field2))
+    val columns = SparkExecuteStatementOperation.getTableSchema(tableSchema).getColumnDescriptors()
     assert(columns.size() == 2)
     assert(columns.get(0).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)
     assert(columns.get(1).getType() == org.apache.hive.service.cli.Type.NULL_TYPE)

From 424a601dcf734e93c40ca83529af3e5565d32e7f Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Oct 2016 11:11:50 -0700
Subject: [PATCH 5/5] Use `result.schema`.

---
 .../hive/thriftserver/SparkExecuteStatementOperation.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index aab7d55066c2d..aeabd6a15881d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -56,11 +56,11 @@ private[hive] class SparkExecuteStatementOperation(
   private var statementId: String = _
 
   private lazy val resultSchema: TableSchema = {
-    if (result == null || result.queryExecution.analyzed.output.isEmpty) {
+    if (result == null || result.schema.isEmpty) {
       new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
     } else {
-      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-      SparkExecuteStatementOperation.getTableSchema(result.queryExecution.analyzed.schema)
+      logInfo(s"Result Schema: ${result.schema}")
+      SparkExecuteStatementOperation.getTableSchema(result.schema)
     }
   }
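
A quick end-to-end check of the series (not part of the patches; the object
name, host, port, and credentials below are illustrative assumptions). With a
Thrift server running and the Hive JDBC driver on the classpath, reading the
result metadata for `select null` should now report "void" columns instead of
the server failing with IllegalArgumentException:

    import java.sql.DriverManager

    object SelectNullCheck {
      def main(args: Array[String]): Unit = {
        // Assumed connection details; adjust to the running Thrift server.
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
        try {
          val rs = conn.createStatement().executeQuery("SELECT NULL, IF(true, NULL, NULL)")
          val meta = rs.getMetaData
          // Before SPARK-17112, fetching metadata for a NullType column failed
          // server-side; with these patches each column reports type "void".
          (1 to meta.getColumnCount).foreach { i =>
            println(s"${meta.getColumnName(i)}: ${meta.getColumnTypeName(i)}")
          }
        } finally {
          conn.close()
        }
      }
    }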