From 50963c37c0e293fddbe5444cc17f52d6e0841e4e Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 15 Jan 2026 13:51:40 -0800 Subject: [PATCH 1/4] fix: empty array offset should be 0 --- python/pyspark/sql/tests/arrow/test_arrow.py | 34 +++++++++++++++ .../sql/execution/arrow/ArrowWriter.scala | 11 +++++ .../execution/arrow/ArrowWriterSuite.scala | 42 +++++++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py b/python/pyspark/sql/tests/arrow/test_arrow.py index a29408980faba..0217c5a450053 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow.py +++ b/python/pyspark/sql/tests/arrow/test_arrow.py @@ -1854,6 +1854,40 @@ def test_toArrow_with_compression_codec_large_dataset(self): self.assertEqual(t.column_names, ["id", "str_col", "mod_col"]) + + def test_toPandas_double_nested_array_empty_outer(self): + schema = StructType([StructField("data", ArrayType(ArrayType(StringType())))]) + df = self.spark.createDataFrame([Row(data=[])], schema=schema) + pdf = df.toPandas() + self.assertEqual(len(pdf), 1) + self.assertEqual(len(pdf["data"][0]), 0) + + def test_toPandas_array_of_map_empty_outer(self): + schema = StructType([StructField("data", ArrayType(MapType(StringType(), StringType())))]) + df = self.spark.createDataFrame([Row(data=[])], schema=schema) + pdf = df.toPandas() + self.assertEqual(len(pdf), 1) + self.assertEqual(len(pdf["data"][0]), 0) + + def test_toPandas_triple_nested_array_empty_outer(self): + # SPARK-55056: This triggers SIGSEGV without the fix. + # When the outer array is empty, the second-level ArrayWriter is never + # invoked, so its count stays 0. Arrow format requires ListArray offset + # buffer to have N+1 entries even when N=0, but getBufferSizeFor(0) + # returns 0 and the buffer is omitted in IPC serialization. 
+ schema = StructType([StructField("data", ArrayType(ArrayType(ArrayType(StringType()))))]) + df = self.spark.createDataFrame([Row(data=[])], schema=schema) + pdf = df.toPandas() + self.assertEqual(len(pdf), 1) + self.assertEqual(len(pdf["data"][0]), 0) + + def test_toPandas_nested_array_with_map_empty_outer(self): + schema = StructType([StructField("data", ArrayType(ArrayType(MapType(StringType(), StringType()))))]) + df = self.spark.createDataFrame([Row(data=[])], schema=schema) + pdf = df.toPandas() + self.assertEqual(len(pdf), 1) + self.assertEqual(len(pdf["data"][0]), 0) + @unittest.skipIf( not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 8d68e74dbf874..3f71edcd48011 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -402,6 +402,17 @@ private[arrow] class ArrayWriter( } override def finish(): Unit = { + // SPARK-55056: Arrow format requires ListArray offset buffer to have N+1 entries. + // Even when N=0, the buffer must contain [0]. When the outer array is empty, + // nested ArrayWriters are never invoked, so their count stays 0. Then + // getBufferSizeFor(0) returns 0, and the offset buffer is omitted in IPC + // serialization — violating Arrow spec. Simulate one empty write to ensure + // the offset buffer is properly initialized. 
+ if (count == 0) { + valueVector.startNewValue(0) + valueVector.endValue(0, 0) + count = 1 + } super.finish() elementWriter.finish() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala index 2c0c0494bbacf..bb8b8bc28e06b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala @@ -875,4 +875,46 @@ class ArrowWriterSuite extends SparkFunSuite { assert(map2.keyArray().array().mkString(",") == Array(1).mkString(",")) assert(stringRepr(map2) == Array("bob", "40").mkString(",")) } + + test("SPARK-55056: triple nested array with empty outer array") { + // Schema: array<array<array<string>>> + // This triggers SIGSEGV without the fix. When the outer array is empty, + // the second-level ArrayWriter is never invoked, so its count stays 0. + // Arrow format requires ListArray offset buffer to have N+1 entries even + // when N=0, but getBufferSizeFor(0) returns 0 and the buffer is omitted. + val schema = new StructType() + .add("data", ArrayType(ArrayType(ArrayType(StringType)))) + val writer = ArrowWriter.create(schema, null) + assert(writer.schema === schema) + + // Write a row with an empty outer array + writer.write(InternalRow(ArrayData.toArrayData(Array.empty))) + writer.finish() + + val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0)) + val array0 = reader.getArray(0) + assert(array0.numElements() === 0) + + writer.root.close() + } + + test("SPARK-55056: nested array with map inside empty outer array") { + // Schema: array<array<map<string, string>>> + // Regression test - two-level array with map does not trigger the issue, + // but we keep this test to ensure the fix doesn't break normal cases. 
+ val schema = new StructType() + .add("data", ArrayType(ArrayType(MapType(StringType, StringType)))) + val writer = ArrowWriter.create(schema, null) + assert(writer.schema === schema) + + // Write a row with an empty outer array + writer.write(InternalRow(ArrayData.toArrayData(Array.empty))) + writer.finish() + + val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0)) + val array0 = reader.getArray(0) + assert(array0.numElements() === 0) + + writer.root.close() + } } From b4b7a4b4f356b5b2e22951ef9381d6ab74bc40fe Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 15 Jan 2026 13:52:01 -0800 Subject: [PATCH 2/4] fix: format --- python/pyspark/sql/tests/arrow/test_arrow.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py b/python/pyspark/sql/tests/arrow/test_arrow.py index 0217c5a450053..76a29afd4063e 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow.py +++ b/python/pyspark/sql/tests/arrow/test_arrow.py @@ -1853,8 +1853,6 @@ def test_toArrow_with_compression_codec_large_dataset(self): self.assertEqual(t.num_rows, 10000) self.assertEqual(t.column_names, ["id", "str_col", "mod_col"]) - - def test_toPandas_double_nested_array_empty_outer(self): schema = StructType([StructField("data", ArrayType(ArrayType(StringType())))]) df = self.spark.createDataFrame([Row(data=[])], schema=schema) @@ -1882,12 +1880,15 @@ def test_toPandas_triple_nested_array_empty_outer(self): self.assertEqual(len(pdf["data"][0]), 0) def test_toPandas_nested_array_with_map_empty_outer(self): - schema = StructType([StructField("data", ArrayType(ArrayType(MapType(StringType(), StringType()))))]) + schema = StructType( + [StructField("data", ArrayType(ArrayType(MapType(StringType(), StringType()))))] + ) df = self.spark.createDataFrame([Row(data=[])], schema=schema) pdf = df.toPandas() self.assertEqual(len(pdf), 1) self.assertEqual(len(pdf["data"][0]), 0) + 
@unittest.skipIf( not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), From 9c926d3ec042a5e9b3e712d42679762a205675ac Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:41:39 -0800 Subject: [PATCH 3/4] fix: change constructor instead --- .../spark/sql/execution/arrow/ArrowWriter.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 3f71edcd48011..e0cf8c8b4d42e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -387,6 +387,11 @@ private[arrow] class ArrayWriter( val valueVector: ListVector, val elementWriter: ArrowFieldWriter) extends ArrowFieldWriter { + // SPARK-55056: Arrow format requires ListArray offset buffer to have N+1 entries. + // Even when N=0, the buffer must contain [0]. Initialize offset buffer at construction + // to ensure it exists even if no elements are written. + valueVector.getOffsetBuffer.setInt(0, 0) + override def setNull(): Unit = { } @@ -402,17 +407,6 @@ private[arrow] class ArrayWriter( } override def finish(): Unit = { - // SPARK-55056: Arrow format requires ListArray offset buffer to have N+1 entries. - // Even when N=0, the buffer must contain [0]. When the outer array is empty, - // nested ArrayWriters are never invoked, so their count stays 0. Then - // getBufferSizeFor(0) returns 0, and the offset buffer is omitted in IPC - // serialization — violating Arrow spec. Simulate one empty write to ensure - // the offset buffer is properly initialized. 
- if (count == 0) { - valueVector.startNewValue(0) - valueVector.endValue(0, 0) - count = 1 - } super.finish() elementWriter.finish() } From 33ef75212bb2d3d7b903879fca13054fb1f69fec Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:44:27 -0800 Subject: [PATCH 4/4] fix: add the same for reset --- .../org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index e0cf8c8b4d42e..cb7e407cf819d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -413,6 +413,8 @@ private[arrow] class ArrayWriter( override def reset(): Unit = { super.reset() + // Re-initialize offset buffer after reset (see constructor comment) + valueVector.getOffsetBuffer.setInt(0, 0) elementWriter.reset() } }