From bd917cebdc023658e6ec9d4f98b88ea96c7c17ce Mon Sep 17 00:00:00 2001
From: "wangzixuan.wzxuan"
Date: Fri, 24 Jun 2022 13:07:31 +0800
Subject: [PATCH 1/3] [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer

---
 .../spark/sql/avro/AvroDeserializer.scala      |  2 ++
 .../AvroCatalystDataConversionSuite.scala      | 25 +++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5bb51a92977af..1192856ae7796 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -195,6 +195,8 @@ private[sql] class AvroDeserializer(
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other =>
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index a43d171fb52d3..1ceeccd0ae253 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.nio.ByteBuffer
+
 class AvroCatalystDataConversionSuite extends SparkFunSuite
   with SharedSparkSession
   with ExpressionEvalHelper {
@@ -360,4 +362,27 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       None,
       new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
   }
+
+  test("AvroDeserializer with binary type") {
+    val jsonFormatSchema =
+      """
+        |{ "type": "record",
+        |  "name": "record",
+        |  "fields" : [{
+        |    "name": "a",
+        |    "type": {
+        |      "type": "bytes"
+        |    }
+        |  }]
+        |}
+      """.stripMargin
+    val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+    val avroRecord = new GenericData.Record(avroSchema)
+    val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("a", bb)
+
+    val expected = InternalRow(Array[Byte](97, 48, 53))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+  }
 }
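Note on PATCH 1/3: java.nio.ByteBuffer#get(Array[Byte]) advances the buffer's position, so once AvroDeserializer has copied the bytes out, a second read of the same buffer sees zero remaining bytes and produces an empty array; rewinding restores the position. The sketch below is a minimal, standalone illustration of that buffer behaviour, not code from Spark, and the object name ByteBufferRewindDemo is made up for the example.

import java.nio.ByteBuffer

// Hypothetical demo object (not part of Spark): shows why the deserializer needs rewind().
object ByteBufferRewindDemo {
  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.wrap(Array[Byte](97, 48, 53))

    // First read: copies all remaining bytes and moves the position to the limit.
    val first = new Array[Byte](buf.remaining)
    buf.get(first)
    println(first.toSeq)      // the three bytes 97, 48, 53
    println(buf.remaining)    // 0 -- a second read would now return nothing

    // Reset the position, as the patch does after copying the bytes out.
    buf.rewind()
    val second = new Array[Byte](buf.remaining)
    buf.get(second)
    println(second.toSeq)     // the same three bytes again
  }
}

This is also why the test added in PATCH 1/3 calls checkDeserialization twice on the same record: the second call only succeeds once the position has been rewound.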
"bytes" - | } - | }] + | "fields" : [ + | {"name": "a", "type": "bytes"} + | ] |} """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) From 83094a82e36fa767b47d05c621d3d47c228a0c8a Mon Sep 17 00:00:00 2001 From: "wangzixuan.wzxuan" Date: Sat, 25 Jun 2022 13:31:01 +0800 Subject: [PATCH 3/3] fix code style --- .../spark/sql/avro/AvroCatalystDataConversionSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index c4a38728a57e2..5c0d64b4d55eb 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -36,8 +36,6 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import java.nio.ByteBuffer - class AvroCatalystDataConversionSuite extends SparkFunSuite with SharedSparkSession with ExpressionEvalHelper { @@ -376,7 +374,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) val avroRecord = new GenericData.Record(avroSchema) - val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53)) + val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53)) avroRecord.put("a", bb) val expected = InternalRow(Array[Byte](97, 48, 53))