diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5bb51a92977af..1192856ae7796 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -195,6 +195,8 @@ private[sql] class AvroDeserializer(
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other =>
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index a43d171fb52d3..5c0d64b4d55eb 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -360,4 +360,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       None,
       new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
   }
+
+  test("AvroDeserializer with binary type") {
+    val jsonFormatSchema =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "record",
+        |  "fields" : [
+        |    {"name": "a", "type": "bytes"}
+        |  ]
+        |}
+      """.stripMargin
+    val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+    val avroRecord = new GenericData.Record(avroSchema)
+    val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("a", bb)
+
+    val expected = InternalRow(Array[Byte](97, 48, 53))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+  }
 }
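
Reviewer note (not part of the patch): reading a `ByteBuffer` with `get` advances its position to the limit, so without the added `rewind()` a second deserialization of the same Avro record would see `remaining == 0` and produce an empty byte array. A minimal standalone Scala sketch of that `java.nio.ByteBuffer` behavior, using the same payload as the new test (the object name is illustrative only):

```scala
import java.nio.ByteBuffer

object ByteBufferRewindDemo extends App {
  // Same payload as the test record's "a" field.
  val buf = ByteBuffer.wrap(Array[Byte](97, 48, 53))

  // First read: copies the bytes out and advances the position to the limit.
  val first = new Array[Byte](buf.remaining)
  buf.get(first)
  assert(first.sameElements(Array[Byte](97, 48, 53)))
  assert(buf.remaining == 0) // the buffer now looks "empty"

  // rewind() resets the position to 0 without changing the contents,
  // so the same buffer can be read again.
  buf.rewind()
  val second = new Array[Byte](buf.remaining)
  buf.get(second)
  assert(second.sameElements(Array[Byte](97, 48, 53)))
}
```

This is also why the test deserializes the same record twice: the first `checkDeserialization` call drains the buffer, and the second only passes because `rewind()` restored the position.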