Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame = {
val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val relation = JDBCRelation(url, table, parts, props)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,13 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* should be included.
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
val conn = JdbcUtils.createConnection(url, connectionProperties)
val props = new Properties()
extraOptions.foreach { case (key, value) =>
props.put(key, value)
}
// connectionProperties should override settings in extraOptions
props.putAll(connectionProperties)
val conn = JdbcUtils.createConnection(url, props)

try {
var tableExists = JdbcUtils.tableExists(conn, table)
Expand Down Expand Up @@ -272,7 +278,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close()
}

JdbcUtils.saveTable(df, url, table, connectionProperties)
JdbcUtils.saveTable(df, url, table, props)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private[sql] object JDBCRDD extends Logging {
*/
def resolveTable(url: String, table: String, properties: Properties): StructType = {
val dialect = JdbcDialects.get(url)
val conn: Connection = DriverManager.getConnection(url, properties)
val conn: Connection = getConnector(properties.getProperty("driver"), url, properties)()
try {
val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
try {
Expand Down Expand Up @@ -171,7 +171,8 @@ private[sql] object JDBCRDD extends Logging {
* getConnector is run on the driver code, while the function it returns
* is run on the executor.
*
* @param driver - The class name of the JDBC driver for the given url.
* @param driver - The class name of the JDBC driver for the given url, or null if the class name
* is not necessary.
* @param url - The JDBC url to connect to.
*
* @return A function that loads the driver and connects to the url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.sql.{Connection, PreparedStatement}
import java.util.Properties

import scala.util.Try
Expand All @@ -36,7 +36,7 @@ object JdbcUtils extends Logging {
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties)
JDBCRDD.getConnector(connectionProperties.getProperty("driver"), url, connectionProperties)()
}

/**
Expand Down