From 12b115b2151728ad0372c01766b576048dc42b4f Mon Sep 17 00:00:00 2001 From: Thomas D'Silva Date: Tue, 1 Jan 2019 14:11:14 +0800 Subject: [PATCH 1/2] [SPARK-26499][SQL] JdbcUtils.makeGetter does not handle ByteType MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …Type ## What changes were proposed in this pull request? Modifed JdbcUtils.makeGetter to handle ByteType. ## How was this patch tested? Added a new test to JDBCSuite that maps ```TINYINT``` to ```ByteType```. Closes #23400 from twdsilva/tiny_int_support. Authored-by: Thomas D'Silva Signed-off-by: Hyukjin Kwon --- .../datasources/jdbc/JdbcUtils.scala | 4 +++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index edea549748b47..922bef284c98e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -438,6 +438,10 @@ object JdbcUtils extends Logging { (rs: ResultSet, row: InternalRow, pos: Int) => row.setShort(pos, rs.getShort(pos + 1)) + case ByteType => + (rs: ResultSet, row: InternalRow, pos: Int) => + row.update(pos, rs.getByte(pos + 1)) + case StringType => (rs: ResultSet, row: InternalRow, pos: Int) => // TODO(davies): use getBytes for better performance, if the encoding is UTF-8 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 94333e8716897..66e347dfe9412 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -57,6 +57,20 @@ class JDBCSuite extends QueryTest Some(StringType) } + val testH2DialectTinyInt = new JdbcDialect { + override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2") + override def getCatalystType( + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder): Option[DataType] = { + sqlType match { + case java.sql.Types.TINYINT => Some(ByteType) + case _ => None + } + } + } + before { Utils.classForName("org.h2.Driver") // Extra properties that will be specified for our database. We need these to test @@ -685,6 +699,17 @@ class JDBCSuite extends QueryTest JdbcDialects.unregisterDialect(testH2Dialect) } + test("Map TINYINT to ByteType via JdbcDialects") { + JdbcDialects.registerDialect(testH2DialectTinyInt) + val df = spark.read.jdbc(urlWithUserAndPass, "test.inttypes", new Properties()) + val rows = df.collect() + assert(rows.length === 2) + assert(rows(0).get(2).isInstanceOf[Byte]) + assert(rows(0).getByte(2) === 3) + assert(rows(1).isNullAt(2)) + JdbcDialects.unregisterDialect(testH2DialectTinyInt) + } + test("Default jdbc dialect registration") { assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) From f7ba5f8c071afe901865e55336cee6054401280a Mon Sep 17 00:00:00 2001 From: shivsood Date: Thu, 14 Nov 2019 15:47:03 -0800 Subject: [PATCH 2/2] fixed setUpdate to setByte and added PR prefix to testcase --- .../apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 2 +- .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 922bef284c98e..c0f628ff04108 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -440,7 +440,7 @@ object JdbcUtils extends Logging { case ByteType => (rs: ResultSet, row: InternalRow, pos: Int) => - row.update(pos, rs.getByte(pos + 1)) + row.setByte(pos, rs.getByte(pos + 1)) case StringType => (rs: ResultSet, row: InternalRow, pos: Int) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 66e347dfe9412..2dcedc3fc1cc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -699,7 +699,7 @@ class JDBCSuite extends QueryTest JdbcDialects.unregisterDialect(testH2Dialect) } - test("Map TINYINT to ByteType via JdbcDialects") { + test("SPARK-26499: Map TINYINT to ByteType via JdbcDialects") { JdbcDialects.registerDialect(testH2DialectTinyInt) val df = spark.read.jdbc(urlWithUserAndPass, "test.inttypes", new Properties()) val rows = df.collect()