From 080ef69602bd2318152ea34fc602bc50cbc02bd8 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 11 Jun 2018 17:18:15 -0700 Subject: [PATCH 1/8] [SPARK-24423] Add a new option for JDBC sources --- .../datasources/jdbc/JDBCOptions.scala | 49 +++++++++- .../execution/datasources/jdbc/JDBCRDD.scala | 4 +- .../datasources/jdbc/JDBCRelation.scala | 4 +- .../jdbc/JdbcRelationProvider.scala | 9 +- .../datasources/jdbc/JdbcUtils.scala | 10 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 96 ++++++++++++++++++- .../spark/sql/jdbc/JDBCWriteSuite.scala | 14 ++- 7 files changed, 165 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index a73a97c06fe5a..c4094db537b11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -65,13 +65,38 @@ class JDBCOptions( // Required parameters // ------------------------------------------------------------ require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.") + // a JDBC URL val url = parameters(JDBC_URL) - // name of table - val table = parameters(JDBC_TABLE_NAME) + val tableName = parameters.get(JDBC_TABLE_NAME) + val query = parameters.get(JDBC_QUERY_STRING) + // Following two conditions make sure that : + // 1. One of the option (dbtable or query) must be specified. + // 2. Both of them can not be specified at the same time as they are conflicting in nature. + require( + tableName.isDefined || query.isDefined, + s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." + ) + + require( + !(tableName.isDefined && query.isDefined), + s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + ) + + // table name or a table expression. + val tableExpression = tableName.map(_.trim).getOrElse { + // We have ensured in the code above that either dbtable or query is specified. + query.get match { + case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" + case subq => subq + } + } + + require(tableExpression.nonEmpty, + s"One of the option `$JDBC_TABLE_NAME` or `$JDBC_QUERY_STRING` should not be empty string." + ) + - // ------------------------------------------------------------ // Optional parameters // ------------------------------------------------------------ val driverClass = { @@ -109,6 +134,20 @@ class JDBCOptions( s"When reading JDBC data sources, users need to specify all or none for the following " + s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + s"and '$JDBC_NUM_PARTITIONS'") + + require(!(query.isDefined && partitionColumn.isDefined), + s""" + |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together. + |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify + |the partition columns using the supplied subquery alias to resolve any ambiguity. 
+ |Example : + |spark.read.format("jdbc") + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "subq.c1" + | .load() + """.stripMargin + ) + val fetchSize = { val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt require(size >= 0, @@ -150,6 +189,7 @@ class JDBCOptions( } object JDBCOptions { + private val curId = new java.util.concurrent.atomic.AtomicLong(0L) private val jdbcOptionNames = collection.mutable.Set[String]() private def newOption(name: String): String = { @@ -159,6 +199,7 @@ object JDBCOptions { val JDBC_URL = newOption("url") val JDBC_TABLE_NAME = newOption("dbtable") + val JDBC_QUERY_STRING = newOption("query") val JDBC_DRIVER_CLASS = newOption("driver") val JDBC_PARTITION_COLUMN = newOption("partitionColumn") val JDBC_LOWER_BOUND = newOption("lowerBound") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 0bab3689e5d0e..7609a48231ea0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -51,7 +51,7 @@ object JDBCRDD extends Logging { */ def resolveTable(options: JDBCOptions): StructType = { val url = options.url - val table = options.table + val table = options.tableExpression val dialect = JdbcDialects.get(url) val conn: Connection = JdbcUtils.createConnectionFactory(options)() try { @@ -296,7 +296,7 @@ private[jdbc] class JDBCRDD( val myWhereClause = getWhereClause(part) - val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause" + val sqlText = s"SELECT $columnList FROM ${options.tableExpression} $myWhereClause" stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) stmt.setFetchSize(options.fetchSize) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index b84543ccd7869..ab49ecebbd7ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -189,12 +189,12 @@ private[sql] case class JDBCRelation( override def insert(data: DataFrame, overwrite: Boolean): Unit = { data.write .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append) - .jdbc(jdbcOptions.url, jdbcOptions.table, jdbcOptions.asProperties) + .jdbc(jdbcOptions.url, jdbcOptions.tableExpression, jdbcOptions.asProperties) } override def toString: String = { val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else "" // credentials should not be included in the plan output, table information is sufficient. 
- s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo + s"JDBCRelation(${jdbcOptions.tableExpression})" + partitioningInfo } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 2b488bb7121dc..38a4829cf68de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -60,6 +60,10 @@ class JdbcRelationProvider extends CreatableRelationProvider parameters: Map[String, String], df: DataFrame): BaseRelation = { val options = new JDBCOptions(parameters) + require( + options.tableName.isDefined, + s"Option '${JDBCOptions.JDBC_TABLE_NAME}' is required. " + + s"Option '${JDBCOptions.JDBC_QUERY_STRING}' is not applicable while writing.") val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis val conn = JdbcUtils.createConnectionFactory(options)() @@ -75,7 +79,7 @@ class JdbcRelationProvider extends CreatableRelationProvider saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, options.table, options) + dropTable(conn, options.tableExpression, options) createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } @@ -86,7 +90,8 @@ class JdbcRelationProvider extends CreatableRelationProvider case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.") + s"Table or view '${options.tableExpression}' already exists. " + + s"SaveMode: ErrorIfExists.") case SaveMode.Ignore => // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 433443007cfd8..66edbf00373ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -74,7 +74,7 @@ object JdbcUtils extends Logging { // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. 
Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.tableExpression)) try { statement.setQueryTimeout(options.queryTimeout) statement.executeQuery() @@ -105,7 +105,7 @@ object JdbcUtils extends Logging { val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - statement.executeUpdate(dialect.getTruncateQuery(options.table)) + statement.executeUpdate(dialect.getTruncateQuery(options.tableExpression)) } finally { statement.close() } @@ -255,7 +255,7 @@ object JdbcUtils extends Logging { val dialect = JdbcDialects.get(options.url) try { - val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table)) + val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableExpression)) try { statement.setQueryTimeout(options.queryTimeout) Some(getSchema(statement.executeQuery(), dialect)) @@ -811,7 +811,7 @@ object JdbcUtils extends Logging { isCaseSensitive: Boolean, options: JDBCOptions): Unit = { val url = options.url - val table = options.table + val table = options.tableExpression val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -841,7 +841,7 @@ object JdbcUtils extends Logging { options: JDBCOptions): Unit = { val strSchema = schemaString( df, options.url, options.createTableColumnTypes) - val table = options.table + val table = options.tableExpression val createTableOptions = options.createTableOptions // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 6ea61f02a8206..5af6244f629a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.DataSourceScanExec @@ -33,13 +33,13 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils} import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.functions._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class JDBCSuite extends SparkFunSuite +class JDBCSuite extends QueryTest with BeforeAndAfter with PrivateMethodTester with SharedSQLContext { import testImplicits._ @@ -1099,7 +1099,7 @@ class JDBCSuite extends SparkFunSuite test("SPARK-19318: Connection properties keys should be case-sensitive.") { def testJdbcOptions(options: JDBCOptions): Unit = { // Spark JDBC data source options are case-insensitive - assert(options.table == "t1") + 
assert(options.tableExpression == "t1") // When we convert it to properties, it should be case-sensitive. assert(options.asProperties.size == 3) assert(options.asProperties.get("customkey") == null) @@ -1255,4 +1255,92 @@ class JDBCSuite extends SparkFunSuite testIncorrectJdbcPartitionColumn(testH2Dialect.quoteIdentifier("ThEiD")) } } + + test("query JDBC option - negative tests") { + val query = "SELECT * FROM test.people WHERE theid = 1" + // load path + val e1 = intercept[RuntimeException] { + val df = spark.read.format("jdbc") + .option("Url", urlWithUserAndPass) + .option("query", query) + .option("dbtable", "test.people") + .load() + }.getMessage + assert(e1.contains("Both 'dbtable' and 'query' can not be specified.")) + + // jdbc api path + val properties = new Properties() + properties.setProperty(JDBCOptions.JDBC_QUERY_STRING, query) + val e2 = intercept[RuntimeException] { + spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect() + }.getMessage + assert(e2.contains("Both 'dbtable' and 'query' can not be specified.")) + + val e3 = intercept[RuntimeException] { + sql( + s""" + |CREATE OR REPLACE TEMPORARY VIEW queryOption + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE', + | user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) + }.getMessage + assert(e3.contains("Both 'dbtable' and 'query' can not be specified.")) + + val e4 = intercept[RuntimeException] { + val df = spark.read.format("jdbc") + .option("Url", urlWithUserAndPass) + .option("query", "") + .load() + }.getMessage + assert(e4.contains("One of the option `dbtable` or `query` should not be empty string.")) + + // Option query and partitioncolumn are not allowed together. + val expectedErrorMsg = + s""" + |Options 'query' and 'partitionColumn' can not be specified together. + |Please define the query using `dbtable` option instead and make sure to qualify + |the partition columns using the supplied subquery alias to resolve any ambiguity. + |Example : + |spark.read.format("jdbc") + | .option("dbtable", "(select c1, c2 from t1) as subq") + | .option("partitionColumn", "subq.c1" + | .load() + """.stripMargin + val e5 = intercept[RuntimeException] { + sql( + s""" + |CREATE OR REPLACE TEMPORARY VIEW queryOption + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass', + | partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3') + """.stripMargin.replaceAll("\n", " ")) + }.getMessage + assert(e5.contains(expectedErrorMsg)) + } + + test("query JDBC option") { + val query = "SELECT name, theid FROM test.people WHERE theid = 1" + // query option to pass on the query string. + val df = spark.read.format("jdbc") + .option("Url", urlWithUserAndPass) + .option("query", query) + .load() + checkAnswer( + df, + Row("fred", 1) :: Nil) + + // query option in the create table path. 
+ sql( + s""" + |CREATE OR REPLACE TEMPORARY VIEW queryOption + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) + + checkAnswer( + sql("select name, theid from queryOption"), + Row("fred", 1) :: Nil) + + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 1c2c92d1f0737..b751ec2de4825 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -293,13 +293,23 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { test("save errors if dbtable is not specified") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val e = intercept[RuntimeException] { + val e1 = intercept[RuntimeException] { + df.write.format("jdbc") + .option("url", url1) + .options(properties.asScala) + .save() + }.getMessage + assert(e1.contains("Option 'dbtable' or 'query' is required")) + + val e2 = intercept[RuntimeException] { df.write.format("jdbc") .option("url", url1) .options(properties.asScala) + .option("query", "select * from TEST.SAVETEST") .save() }.getMessage - assert(e.contains("Option 'dbtable' is required")) + val msg = "Option 'dbtable' is required. Option 'query' is not applicable while writing." + assert(e2.contains(msg)) } test("save errors if wrong user/password combination") { From 3375b949be23ca3d3bc85b08ccf23b8cad9f32f5 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 19 Jun 2018 22:35:43 -0700 Subject: [PATCH 2/8] Add doc --- docs/sql-programming-guide.md | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 196b814420be1..05ad7df709553 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1302,9 +1302,18 @@ the following case-insensitive options: dbtable - The JDBC table that should be read. Note that anything that is valid in a FROM clause of - a SQL query can be used. For example, instead of a full table you could also use a - subquery in parentheses. + The JDBC table that should be read from or written into. Note that when using it in the read + path anything that is valid in a FROM clause of a SQL query can be used. + For example, instead of a full table you could also use a subquery in parentheses. + + + + query + + A query that will be used to read data into Spark. The specified query will be parenthesized and used + as a subquery in the FROM clause. Spark will also assign a alias to the subquery clause. + As an example, spark will issue a query of the following form to the datasource.
+ SELECT <columns> FROM (<user_specified_query>) spark_generated_alias From 037f46fcffe77ba9a32273478fb5c402a0da3ed7 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 20 Jun 2018 16:47:52 -0700 Subject: [PATCH 3/8] code review --- docs/sql-programming-guide.md | 8 +++++--- .../sql/execution/datasources/jdbc/JDBCOptions.scala | 10 +++++----- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 4 ++-- .../sql/execution/datasources/jdbc/JDBCRelation.scala | 4 ++-- .../datasources/jdbc/JdbcRelationProvider.scala | 4 ++-- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 +++++----- .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 ++-- 7 files changed, 23 insertions(+), 21 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 05ad7df709553..c3db12d36648b 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1304,7 +1304,8 @@ the following case-insensitive options: The JDBC table that should be read from or written into. Note that when using it in the read path anything that is valid in a FROM clause of a SQL query can be used. - For example, instead of a full table you could also use a subquery in parentheses. + For example, instead of a full table you could also use a subquery in parentheses. Its not + allowed to specify `dbtable` and `query` options at the same time. @@ -1312,8 +1313,9 @@ the following case-insensitive options: A query that will be used to read data into Spark. The specified query will be parenthesized and used as a subquery in the FROM clause. Spark will also assign a alias to the subquery clause. - As an example, spark will issue a query of the following form to the datasource.
- SELECT <columns> FROM (<user_specified_query>) spark_generated_alias + As an example, spark will issue a query of the following form to the datasource.
+ SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
+ Its not allowed to specify `dbtable` and `query` options at the same time. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index c4094db537b11..3aae7a0f497dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -84,16 +84,16 @@ class JDBCOptions( ) // table name or a table expression. - val tableExpression = tableName.map(_.trim).getOrElse { + val tableOrQuery = tableName.map(_.trim).getOrElse { // We have ensured in the code above that either dbtable or query is specified. query.get match { - case subq if subq.nonEmpty => s"(${subq}) spark_gen_${curId.getAndIncrement()}" - case subq => subq + case subQuery if subQuery.nonEmpty => s"(${subQuery}) spark_gen_${curId.getAndIncrement()}" + case subQuery => subQuery } } - require(tableExpression.nonEmpty, - s"One of the option `$JDBC_TABLE_NAME` or `$JDBC_QUERY_STRING` should not be empty string." + require(tableOrQuery.nonEmpty, + s"Empty string is not allowed in either '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' options" ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 7609a48231ea0..1b3b17c75e756 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -51,7 +51,7 @@ object JDBCRDD extends Logging { */ def resolveTable(options: JDBCOptions): StructType = { val url = options.url - val table = options.tableExpression + val table = options.tableOrQuery val dialect = JdbcDialects.get(url) val conn: Connection = JdbcUtils.createConnectionFactory(options)() try { @@ -296,7 +296,7 @@ private[jdbc] class JDBCRDD( val myWhereClause = getWhereClause(part) - val sqlText = s"SELECT $columnList FROM ${options.tableExpression} $myWhereClause" + val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) stmt.setFetchSize(options.fetchSize) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index ab49ecebbd7ed..97e2d255cb7be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -189,12 +189,12 @@ private[sql] case class JDBCRelation( override def insert(data: DataFrame, overwrite: Boolean): Unit = { data.write .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append) - .jdbc(jdbcOptions.url, jdbcOptions.tableExpression, jdbcOptions.asProperties) + .jdbc(jdbcOptions.url, jdbcOptions.tableOrQuery, jdbcOptions.asProperties) } override def toString: String = { val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else "" // credentials should not be included in the plan output, table information is sufficient. 
- s"JDBCRelation(${jdbcOptions.tableExpression})" + partitioningInfo + s"JDBCRelation(${jdbcOptions.tableOrQuery})" + partitioningInfo } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 38a4829cf68de..2cf52078e1cf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -79,7 +79,7 @@ class JdbcRelationProvider extends CreatableRelationProvider saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, options.tableExpression, options) + dropTable(conn, options.tableOrQuery, options) createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } @@ -90,7 +90,7 @@ class JdbcRelationProvider extends CreatableRelationProvider case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '${options.tableExpression}' already exists. " + + s"Table or view '${options.tableOrQuery}' already exists. " + s"SaveMode: ErrorIfExists.") case SaveMode.Ignore => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 66edbf00373ff..f8cffb7037919 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -74,7 +74,7 @@ object JdbcUtils extends Logging { // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. 
Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.tableExpression)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.tableOrQuery)) try { statement.setQueryTimeout(options.queryTimeout) statement.executeQuery() @@ -105,7 +105,7 @@ object JdbcUtils extends Logging { val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - statement.executeUpdate(dialect.getTruncateQuery(options.tableExpression)) + statement.executeUpdate(dialect.getTruncateQuery(options.tableOrQuery)) } finally { statement.close() } @@ -255,7 +255,7 @@ object JdbcUtils extends Logging { val dialect = JdbcDialects.get(options.url) try { - val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableExpression)) + val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableOrQuery)) try { statement.setQueryTimeout(options.queryTimeout) Some(getSchema(statement.executeQuery(), dialect)) @@ -811,7 +811,7 @@ object JdbcUtils extends Logging { isCaseSensitive: Boolean, options: JDBCOptions): Unit = { val url = options.url - val table = options.tableExpression + val table = options.tableOrQuery val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -841,7 +841,7 @@ object JdbcUtils extends Logging { options: JDBCOptions): Unit = { val strSchema = schemaString( df, options.url, options.createTableColumnTypes) - val table = options.tableExpression + val table = options.tableOrQuery val createTableOptions = options.createTableOptions // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5af6244f629a7..b233a66752e6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1099,7 +1099,7 @@ class JDBCSuite extends QueryTest test("SPARK-19318: Connection properties keys should be case-sensitive.") { def testJdbcOptions(options: JDBCOptions): Unit = { // Spark JDBC data source options are case-insensitive - assert(options.tableExpression == "t1") + assert(options.tableOrQuery == "t1") // When we convert it to properties, it should be case-sensitive. assert(options.asProperties.size == 3) assert(options.asProperties.get("customkey") == null) @@ -1293,7 +1293,7 @@ class JDBCSuite extends QueryTest .option("query", "") .load() }.getMessage - assert(e4.contains("One of the option `dbtable` or `query` should not be empty string.")) + assert(e4.contains("Empty string is not allowed in either 'dbtable' or 'query' options")) // Option query and partitioncolumn are not allowed together. 
val expectedErrorMsg = From cb447f0f3b3ef9a05cbba8a3551538432d398326 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 25 Jun 2018 00:24:26 -0700 Subject: [PATCH 4/8] code review - sean --- docs/sql-programming-guide.md | 21 ++++++++-- .../datasources/jdbc/JDBCOptions.scala | 39 +++++++++++++------ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++-- 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index c3db12d36648b..7c4ef41cc8907 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1304,7 +1304,7 @@ the following case-insensitive options: The JDBC table that should be read from or written into. Note that when using it in the read path anything that is valid in a FROM clause of a SQL query can be used. - For example, instead of a full table you could also use a subquery in parentheses. Its not + For example, instead of a full table you could also use a subquery in parentheses. It is not allowed to specify `dbtable` and `query` options at the same time. @@ -1312,10 +1312,23 @@ the following case-insensitive options: query A query that will be used to read data into Spark. The specified query will be parenthesized and used - as a subquery in the FROM clause. Spark will also assign a alias to the subquery clause. - As an example, spark will issue a query of the following form to the datasource.
+ as a subquery in the FROM clause. Spark will also assign an alias to the subquery clause. + As an example, spark will issue a query of the following form to the JDBC Source.
SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
- Its not allowed to specify `dbtable` and `query` options at the same time. + Below are couple of restrictions while using this option.
    +
  1. It is not allowed to specify `dbtable` and `query` options at the same time.
  2. +
  3. It is not allowed to specify `query` and `partitionColumn` options at the same time. When specifying
+ `partitionColumn` option is required, the subquery can be specified using `dbtable` option instead and
+ partition columns can be qualified using the subquery alias provided as part of `dbtable`.
    + Example:
    + + spark.read.format("jdbc")
    +    .option("dbtable", "(select c1, c2 from t1) as subq")
    +    .option("partitionColumn", "subq.c1"
    +    .load() +
  4. +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 3aae7a0f497dc..42ab759aab168 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -65,14 +65,14 @@ class JDBCOptions( // Required parameters // ------------------------------------------------------------ require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") - // a JDBC URL val url = parameters(JDBC_URL) val tableName = parameters.get(JDBC_TABLE_NAME) val query = parameters.get(JDBC_QUERY_STRING) - // Following two conditions make sure that : + // Following code checks to make sure that : // 1. One of the option (dbtable or query) must be specified. // 2. Both of them can not be specified at the same time as they are conflicting in nature. + /* require( tableName.isDefined || query.isDefined, s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." @@ -80,21 +80,38 @@ class JDBCOptions( require( !(tableName.isDefined && query.isDefined), - s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified." + s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." ) - - // table name or a table expression. - val tableOrQuery = tableName.map(_.trim).getOrElse { - // We have ensured in the code above that either dbtable or query is specified. - query.get match { - case subQuery if subQuery.nonEmpty => s"(${subQuery}) spark_gen_${curId.getAndIncrement()}" - case subQuery => subQuery - } + */ + // table name or a table subquery. + val tableOrQuery = (tableName, query) match { + case (Some(name), Some(subquery)) => + throw new IllegalArgumentException( + s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." + ) + case (None, None) => + throw new IllegalArgumentException( + s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." 
+ ) + case (Some(name), None) => + if (name.isEmpty) { + throw new IllegalArgumentException(s"Option '${JDBC_TABLE_NAME}' can not be empty.") + } else { + name.trim + } + case (None, Some(subquery)) => + if (subquery.isEmpty) { + throw new IllegalArgumentException(s"Option `${JDBC_QUERY_STRING}` can not be empty.") + } else { + s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}" + } } + /* require(tableOrQuery.nonEmpty, s"Empty string is not allowed in either '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' options" ) + */ // Optional parameters diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b233a66752e6b..c4915158845e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1266,7 +1266,7 @@ class JDBCSuite extends QueryTest .option("dbtable", "test.people") .load() }.getMessage - assert(e1.contains("Both 'dbtable' and 'query' can not be specified.")) + assert(e1.contains("Both 'dbtable' and 'query' can not be specified at the same time.")) // jdbc api path val properties = new Properties() @@ -1274,7 +1274,7 @@ class JDBCSuite extends QueryTest val e2 = intercept[RuntimeException] { spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect() }.getMessage - assert(e2.contains("Both 'dbtable' and 'query' can not be specified.")) + assert(e2.contains("Both 'dbtable' and 'query' can not be specified at the same time.")) val e3 = intercept[RuntimeException] { sql( @@ -1285,7 +1285,7 @@ class JDBCSuite extends QueryTest | user 'testUser', password 'testPass') """.stripMargin.replaceAll("\n", " ")) }.getMessage - assert(e3.contains("Both 'dbtable' and 'query' can not be specified.")) + assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time.")) val e4 = intercept[RuntimeException] { val df = spark.read.format("jdbc") @@ -1293,7 +1293,7 @@ class JDBCSuite extends QueryTest .option("query", "") .load() }.getMessage - assert(e4.contains("Empty string is not allowed in either 'dbtable' or 'query' options")) + assert(e4.contains("Option `query` can not be empty.")) // Option query and partitioncolumn are not allowed together. val expectedErrorMsg = From c3feadac3edd94c9557bd0f3a940f0eef61274f1 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 25 Jun 2018 15:46:44 -0700 Subject: [PATCH 5/8] Code review --- .../datasources/jdbc/JDBCOptions.scala | 51 +++++++++---------- .../jdbc/JdbcRelationProvider.scala | 10 ++-- .../datasources/jdbc/JdbcUtils.scala | 16 +++--- 3 files changed, 36 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 42ab759aab168..292ef148fa115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType * Options for the JDBC data source. 
*/ class JDBCOptions( - @transient private val parameters: CaseInsensitiveMap[String]) + @transient val parameters: CaseInsensitiveMap[String]) extends Serializable { import JDBCOptions._ @@ -67,24 +67,8 @@ class JDBCOptions( require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.") // a JDBC URL val url = parameters(JDBC_URL) - val tableName = parameters.get(JDBC_TABLE_NAME) - val query = parameters.get(JDBC_QUERY_STRING) - // Following code checks to make sure that : - // 1. One of the option (dbtable or query) must be specified. - // 2. Both of them can not be specified at the same time as they are conflicting in nature. - /* - require( - tableName.isDefined || query.isDefined, - s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." - ) - - require( - !(tableName.isDefined && query.isDefined), - s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." - ) - */ // table name or a table subquery. - val tableOrQuery = (tableName, query) match { + val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match { case (Some(name), Some(subquery)) => throw new IllegalArgumentException( s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time." @@ -107,13 +91,6 @@ class JDBCOptions( } } - /* - require(tableOrQuery.nonEmpty, - s"Empty string is not allowed in either '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' options" - ) - */ - - // Optional parameters // ------------------------------------------------------------ val driverClass = { @@ -152,7 +129,7 @@ class JDBCOptions( s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " + s"and '$JDBC_NUM_PARTITIONS'") - require(!(query.isDefined && partitionColumn.isDefined), + require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined), s""" |Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together. |Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify @@ -232,3 +209,25 @@ object JDBCOptions { val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement") } + +class JdbcOptionsInWrite( + @transient override val parameters: CaseInsensitiveMap[String]) + extends JDBCOptions(parameters) { + + import JDBCOptions._ + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + def this(url: String, table: String, parameters: Map[String, String]) = { + this(CaseInsensitiveMap(parameters ++ Map( + JDBCOptions.JDBC_URL -> url, + JDBCOptions.JDBC_TABLE_NAME -> table))) + } + + require( + parameters.get(JDBC_TABLE_NAME).isDefined, + s"Option '${JDBCOptions.JDBC_TABLE_NAME}' is required. 
" + + s"Option '${JDBCOptions.JDBC_QUERY_STRING}' is not applicable while writing.") + + val destinationTable = parameters(JDBC_TABLE_NAME) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 2cf52078e1cf3..0207ddd36844d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -59,11 +59,7 @@ class JdbcRelationProvider extends CreatableRelationProvider mode: SaveMode, parameters: Map[String, String], df: DataFrame): BaseRelation = { - val options = new JDBCOptions(parameters) - require( - options.tableName.isDefined, - s"Option '${JDBCOptions.JDBC_TABLE_NAME}' is required. " + - s"Option '${JDBCOptions.JDBC_QUERY_STRING}' is not applicable while writing.") + val options = new JdbcOptionsInWrite(parameters) val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis val conn = JdbcUtils.createConnectionFactory(options)() @@ -79,7 +75,7 @@ class JdbcRelationProvider extends CreatableRelationProvider saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, options.tableOrQuery, options) + dropTable(conn, options.destinationTable, options) createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } @@ -90,7 +86,7 @@ class JdbcRelationProvider extends CreatableRelationProvider case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '${options.tableOrQuery}' already exists. " + + s"Table or view '${options.destinationTable}' already exists. " + s"SaveMode: ErrorIfExists.") case SaveMode.Ignore => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index f8cffb7037919..982b06c40aa51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -67,14 +67,14 @@ object JdbcUtils extends Logging { /** * Returns true if the table already exists in the JDBC database. */ - def tableExists(conn: Connection, options: JDBCOptions): Boolean = { + def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = { val dialect = JdbcDialects.get(options.url) // Somewhat hacky, but there isn't a good way to identify whether a table exists for all // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.tableOrQuery)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.destinationTable)) try { statement.setQueryTimeout(options.queryTimeout) statement.executeQuery() @@ -100,12 +100,12 @@ object JdbcUtils extends Logging { /** * Truncates a table from the JDBC database without side effects. 
*/ - def truncateTable(conn: Connection, options: JDBCOptions): Unit = { + def truncateTable(conn: Connection, options: JdbcOptionsInWrite): Unit = { val dialect = JdbcDialects.get(options.url) val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - statement.executeUpdate(dialect.getTruncateQuery(options.tableOrQuery)) + statement.executeUpdate(dialect.getTruncateQuery(options.destinationTable)) } finally { statement.close() } @@ -809,9 +809,9 @@ object JdbcUtils extends Logging { df: DataFrame, tableSchema: Option[StructType], isCaseSensitive: Boolean, - options: JDBCOptions): Unit = { + options: JdbcOptionsInWrite): Unit = { val url = options.url - val table = options.tableOrQuery + val table = options.destinationTable val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -838,10 +838,10 @@ object JdbcUtils extends Logging { def createTable( conn: Connection, df: DataFrame, - options: JDBCOptions): Unit = { + options: JdbcOptionsInWrite): Unit = { val strSchema = schemaString( df, options.url, options.createTableColumnTypes) - val table = options.tableOrQuery + val table = options.destinationTable val createTableOptions = options.createTableOptions // Create the table if the table does not exist. // To allow certain options to append when create a new table, which can be From 765de0bbdc179b17fe79c8f14df5e1f4c740a861 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 25 Jun 2018 15:55:04 -0700 Subject: [PATCH 6/8] conflict --- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index c4915158845e2..0389273d6cdfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils} import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper -import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ From 57f9e3fa6609271a4710858da64af444c56cff08 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 25 Jun 2018 23:38:21 -0700 Subject: [PATCH 7/8] Code review --- .../datasources/jdbc/JDBCOptions.scala | 53 ++++++++++--------- .../jdbc/JdbcRelationProvider.scala | 4 +- .../datasources/jdbc/JdbcUtils.scala | 8 +-- 3 files changed, 33 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 292ef148fa115..b52cf1f87e16e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -75,22 +75,23 @@ class JDBCOptions( ) case (None, None) => throw new IllegalArgumentException( - s"Option '$JDBC_TABLE_NAME' or '${JDBC_QUERY_STRING}' is required." 
+ s"Option '$JDBC_TABLE_NAME' or '$JDBC_QUERY_STRING' is required." ) case (Some(name), None) => if (name.isEmpty) { - throw new IllegalArgumentException(s"Option '${JDBC_TABLE_NAME}' can not be empty.") + throw new IllegalArgumentException(s"Option '$JDBC_TABLE_NAME' can not be empty.") } else { name.trim } case (None, Some(subquery)) => if (subquery.isEmpty) { - throw new IllegalArgumentException(s"Option `${JDBC_QUERY_STRING}` can not be empty.") + throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.") } else { s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}" } } + // ------------------------------------------------------------ // Optional parameters // ------------------------------------------------------------ val driverClass = { @@ -182,6 +183,28 @@ class JDBCOptions( val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT) } +class JdbcOptionsInWrite( + @transient override val parameters: CaseInsensitiveMap[String]) + extends JDBCOptions(parameters) { + + import JDBCOptions._ + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + def this(url: String, table: String, parameters: Map[String, String]) = { + this(CaseInsensitiveMap(parameters ++ Map( + JDBCOptions.JDBC_URL -> url, + JDBCOptions.JDBC_TABLE_NAME -> table))) + } + + require( + parameters.get(JDBC_TABLE_NAME).isDefined, + s"Option '$JDBC_TABLE_NAME' is required. " + + s"Option '$JDBC_QUERY_STRING' is not applicable while writing.") + + val table = parameters(JDBC_TABLE_NAME) +} + object JDBCOptions { private val curId = new java.util.concurrent.atomic.AtomicLong(0L) private val jdbcOptionNames = collection.mutable.Set[String]() @@ -208,26 +231,4 @@ object JDBCOptions { val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement") -} - -class JdbcOptionsInWrite( - @transient override val parameters: CaseInsensitiveMap[String]) - extends JDBCOptions(parameters) { - - import JDBCOptions._ - - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) - - def this(url: String, table: String, parameters: Map[String, String]) = { - this(CaseInsensitiveMap(parameters ++ Map( - JDBCOptions.JDBC_URL -> url, - JDBCOptions.JDBC_TABLE_NAME -> table))) - } - - require( - parameters.get(JDBC_TABLE_NAME).isDefined, - s"Option '${JDBCOptions.JDBC_TABLE_NAME}' is required. 
" + - s"Option '${JDBCOptions.JDBC_QUERY_STRING}' is not applicable while writing.") - - val destinationTable = parameters(JDBC_TABLE_NAME) -} +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 0207ddd36844d..782d626c1573c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -75,7 +75,7 @@ class JdbcRelationProvider extends CreatableRelationProvider saveTable(df, tableSchema, isCaseSensitive, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it - dropTable(conn, options.destinationTable, options) + dropTable(conn, options.table, options) createTable(conn, df, options) saveTable(df, Some(df.schema), isCaseSensitive, options) } @@ -86,7 +86,7 @@ class JdbcRelationProvider extends CreatableRelationProvider case SaveMode.ErrorIfExists => throw new AnalysisException( - s"Table or view '${options.destinationTable}' already exists. " + + s"Table or view '${options.table}' already exists. " + s"SaveMode: ErrorIfExists.") case SaveMode.Ignore => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 982b06c40aa51..b81737eda475b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -74,7 +74,7 @@ object JdbcUtils extends Logging { // SQL database systems using JDBC meta data calls, considering "table" could also include // the database name. Query used to find table exists can be overridden by the dialects. Try { - val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.destinationTable)) + val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table)) try { statement.setQueryTimeout(options.queryTimeout) statement.executeQuery() @@ -105,7 +105,7 @@ object JdbcUtils extends Logging { val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - statement.executeUpdate(dialect.getTruncateQuery(options.destinationTable)) + statement.executeUpdate(dialect.getTruncateQuery(options.table)) } finally { statement.close() } @@ -811,7 +811,7 @@ object JdbcUtils extends Logging { isCaseSensitive: Boolean, options: JdbcOptionsInWrite): Unit = { val url = options.url - val table = options.destinationTable + val table = options.table val dialect = JdbcDialects.get(url) val rddSchema = df.schema val getConnection: () => Connection = createConnectionFactory(options) @@ -841,7 +841,7 @@ object JdbcUtils extends Logging { options: JdbcOptionsInWrite): Unit = { val strSchema = schemaString( df, options.url, options.createTableColumnTypes) - val table = options.destinationTable + val table = options.table val createTableOptions = options.createTableOptions // Create the table if the table does not exist. 
// To allow certain options to append when create a new table, which can be From c083e13289b302efbbe6bc6c915a3b47268a63fc Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 25 Jun 2018 23:40:38 -0700 Subject: [PATCH 8/8] style --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index b52cf1f87e16e..eea966d30948b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -231,4 +231,4 @@ object JDBCOptions { val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement") -} \ No newline at end of file +}
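
A minimal end-to-end sketch of the option this series adds, put together from the docs and tests above; the H2 URL, the credentials, and the TEST.PEOPLE_COPY destination table are illustrative placeholders rather than values taken from the patches:

    // Read path: 'query' passes a free-form statement; Spark wraps it as
    //   SELECT <columns> FROM (<user_specified_query>) __SPARK_GEN_JDBC_SUBQUERY_NAME_<n>
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:h2:mem:testdb0")      // placeholder JDBC URL
      .option("user", "testUser")
      .option("password", "testPass")
      .option("query", "SELECT name, theid FROM test.people WHERE theid = 1")
      .load()

    // Write path still requires 'dbtable'; passing 'query' on write is rejected by JdbcOptionsInWrite.
    df.write.format("jdbc")
      .option("url", "jdbc:h2:mem:testdb0")
      .option("user", "testUser")
      .option("password", "testPass")
      .option("dbtable", "TEST.PEOPLE_COPY")     // placeholder destination table
      .mode("append")
      .save()

Note that 'query' cannot be combined with 'partitionColumn'; for a partitioned read the same subquery has to go through 'dbtable' with a subquery alias, as the error message added in JDBCOptions.scala describes.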