From 8dc4e9b595d4c6b77b1cf6125629e6e557a0e2bc Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Wed, 4 Jun 2025 23:42:20 +0800 Subject: [PATCH 1/2] Fix shuffle writing rows containing null struct fields --- native/core/src/execution/shuffle/row.rs | 66 ++++++++++++++----- .../exec/CometColumnarShuffleSuite.scala | 30 +++++++++ 2 files changed, 80 insertions(+), 16 deletions(-) diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index bb1401e263..e19136b37e 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -444,25 +444,18 @@ pub(crate) fn append_field( // Appending value into struct field builder of Arrow struct builder. let field_builder = struct_builder.field_builder::(idx).unwrap(); - if row.is_null_row() { - // The row is null. + let nested_row = if row.is_null_row() || row.is_null_at(idx) { + // The row is null, or the field in the row is null, i.e., a null nested row. + // Append a null value to the row builder. field_builder.append_null(); + SparkUnsafeRow::default() } else { - let is_null = row.is_null_at(idx); + field_builder.append(true); + row.get_struct(idx, fields.len()) + }; - let nested_row = if is_null { - // The field in the row is null, i.e., a null nested row. - // Append a null value to the row builder. - field_builder.append_null(); - SparkUnsafeRow::default() - } else { - field_builder.append(true); - row.get_struct(idx, fields.len()) - }; - - for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), field_builder, &nested_row, field_idx)?; - } + for (field_idx, field) in fields.into_iter().enumerate() { + append_field(field.data_type(), field_builder, &nested_row, field_idx)?; } } DataType::Map(field, _) => { @@ -3302,3 +3295,44 @@ fn make_batch(arrays: Vec, row_count: usize) -> Result + val testData = "{}\n" + val path = Paths.get(dir.toString, "test.json") + Files.write(path, testData.getBytes) + + // Define the nested struct schema + val readSchema = StructType( + Array( + StructField( + "metaData", + StructType( + Array(StructField( + "format", + StructType(Array(StructField("provider", StringType, nullable = true))), + nullable = true))), + nullable = true))) + + // Read JSON with custom schema and repartition, this will repartition rows that contain + // null struct fields. + val df = spark.read.format("json").schema(readSchema).load(path.toString).repartition(2) + assert(df.count() == 1) + val row = df.collect()(0) + assert(row.getAs[org.apache.spark.sql.Row]("metaData") == null) + } + } + /** * Checks that `df` produces the same answer as Spark does, and has the `expectedNum` Comet * exchange operators. From 0fcd7ebcccbd7c2cd8b4a8d98ac21a8e8914deef Mon Sep 17 00:00:00 2001 From: Kristin Cowalcijk Date: Thu, 5 Jun 2025 00:15:25 +0800 Subject: [PATCH 2/2] Ignore miri for test_append_null_struct_field_to_struct_builder --- native/core/src/execution/shuffle/row.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index e19136b37e..c98cc54387 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3319,6 +3319,7 @@ mod test { } #[test] + #[cfg_attr(miri, ignore)] // Unaligned memory access in SparkUnsafeRow fn test_append_null_struct_field_to_struct_builder() { let data_type = DataType::Struct(Fields::from(vec![ Field::new("a", DataType::Boolean, true),