From 57f59d40121e45cdceaef7d077878b08dfd55825 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sun, 16 Aug 2015 23:30:00 +0800
Subject: [PATCH 1/2] Load JDBC driver in DataFrameReader.jdbc and DataFrameWriter.jdbc

---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 6 ++++++
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 ++
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala    | 5 +++--
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala  | 4 ++--
 4 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9ea955b010017..67b1059ab358a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -124,6 +124,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
    *
+   * Note: Options in this reader will be ignored.
+   *
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
@@ -138,6 +140,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
+   * Note: Options in this reader will be ignored.
+   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param columnName the name of a column of integral type that will be used for partitioning.
@@ -173,6 +177,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
+   * Note: Options in this reader will be ignored.
+   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param predicates Condition in the where clause for each partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5fa11da4c38cd..7d557d436d700 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -237,6 +237,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
+   * Note: Options in this writer will be ignored.
+   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 8eab6a0adccc4..e537d631f4559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
    */
   def resolveTable(url: String, table: String, properties: Properties): StructType = {
     val dialect = JdbcDialects.get(url)
-    val conn: Connection = DriverManager.getConnection(url, properties)
+    val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
     try {
       val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
       try {
@@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
    * getConnector is run on the driver code, while the function it returns
    * is run on the executor.
    *
-   * @param driver - The class name of the JDBC driver for the given url.
+   * @param driver - The class name of the JDBC driver for the given url, or null if the class name
+   *                 is not necessary.
    * @param url - The JDBC url to connect to.
    *
    * @return A function that loads the driver and connects to the url.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8ee3b8bda8fc7..2d0e736ee4766 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, DriverManager, PreparedStatement}
+import java.sql.{Connection, PreparedStatement}
 import java.util.Properties
 
 import scala.util.Try
@@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
    * Establishes a JDBC connection.
    */
   def createConnection(url: String, connectionProperties: Properties): Connection = {
-    DriverManager.getConnection(url, connectionProperties)
+    JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
   }
 
   /**

From adf75dee6d4ed61075755163303a9072802d0bdd Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Mon, 17 Aug 2015 14:59:18 +0800
Subject: [PATCH 2/2] Add extraOptions to the connection properties

---
 .../org/apache/spark/sql/DataFrameReader.scala | 14 +++++++-------
 .../org/apache/spark/sql/DataFrameWriter.scala | 12 ++++++++----
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 67b1059ab358a..6dc7bfe333498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -124,8 +124,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
    * url named table and connection properties.
    *
-   * Note: Options in this reader will be ignored.
-   *
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
@@ -140,8 +138,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
-   * Note: Options in this reader will be ignored.
-   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param columnName the name of a column of integral type that will be used for partitioning.
@@ -177,8 +173,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
-   * Note: Options in this reader will be ignored.
-   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param predicates Condition in the where clause for each partition.
@@ -203,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       table: String,
       parts: Array[Partition],
       connectionProperties: Properties): DataFrame = {
-    val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
+    val props = new Properties()
+    extraOptions.foreach { case (key, value) =>
+      props.put(key, value)
+    }
+    // connectionProperties should override settings in extraOptions
+    props.putAll(connectionProperties)
+    val relation = JDBCRelation(url, table, parts, props)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7d557d436d700..f0bf1be506411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -237,8 +237,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
    * your external database systems.
    *
-   * Note: Options in this writer will be ignored.
-   *
    * @param url JDBC database url of the form `jdbc:subprotocol:subname`
    * @param table Name of the table in the external database.
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
@@ -246,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *                             should be included.
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
-    val conn = JdbcUtils.createConnection(url, connectionProperties)
+    val props = new Properties()
+    extraOptions.foreach { case (key, value) =>
+      props.put(key, value)
+    }
+    // connectionProperties should override settings in extraOptions
+    props.putAll(connectionProperties)
+    val conn = JdbcUtils.createConnection(url, props)
 
     try {
       var tableExists = JdbcUtils.tableExists(conn, table)
@@ -274,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       conn.close()
     }
 
-    JdbcUtils.saveTable(df, url, table, connectionProperties)
+    JdbcUtils.saveTable(df, url, table, props)
   }
 
   /**
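
Note on patch 1: resolveTable and JdbcUtils.createConnection now go through
JDBCRDD.getConnector instead of calling DriverManager.getConnection directly,
so the class named by the "driver" connection property is loaded explicitly
before a connection is opened. A minimal sketch of the idea, assuming the
factory merely registers the named class and then delegates to DriverManager;
the object below is illustrative, not Spark's exact implementation:

    import java.sql.{Connection, DriverManager}
    import java.util.Properties

    object ConnectorSketch {
      // Built on the driver side; the returned zero-argument closure is
      // what actually opens the connection, e.g. on an executor.
      def getConnector(driverClass: String, url: String, props: Properties): () => Connection =
        () => {
          // Per the updated scaladoc, driverClass may be null when
          // DriverManager can already resolve the url on its own.
          if (driverClass != null) {
            Class.forName(driverClass) // loading the class registers the driver
          }
          DriverManager.getConnection(url, props)
        }
    }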
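
Note on patch 2: options set on the reader or writer (extraOptions) are copied
into a fresh Properties object first, and the user-supplied connectionProperties
are applied last via putAll, so explicit connection properties win over
identically named options. A hypothetical usage sketch (the url, table name,
and credential values are placeholders only):

    import java.util.Properties

    val props = new Properties()
    props.put("user", "test")

    val df = sqlContext.read
      .option("driver", "org.postgresql.Driver") // forwarded via extraOptions
      .jdbc("jdbc:postgresql://localhost/testdb", "people", props)

If the same key appears in both places, for example "user" set with .option
and also present in props, the value from props is the one that reaches the
JDBC driver.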