From cc2b408aca58dfec3241d0c9cc45984ac76a4271 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 19 Sep 2022 15:52:36 -0400 Subject: [PATCH 1/3] Add serialization of `ScalarValue::Struct` --- datafusion/proto/proto/datafusion.proto | 8 +++++++ datafusion/proto/src/from_proto.rs | 20 ++++++++++++++++ datafusion/proto/src/lib.rs | 17 ++++++++++++++ datafusion/proto/src/to_proto.rs | 31 +++++++++++++++++++------ 4 files changed, 69 insertions(+), 7 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 471579c56f4e..861de2965c0b 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -739,6 +739,13 @@ message IntervalMonthDayNanoValue { int64 nanos = 3; } +message StructValue { + // encode null explicitly to distinguish a struct with no fields (is + // that possible?) + bool is_null = 1; + repeated ScalarValue field_values = 2; + repeated Field fields = 3; +} message ScalarValue{ oneof value { @@ -773,6 +780,7 @@ message ScalarValue{ bytes large_binary_value = 29; int64 time64_value = 30; IntervalMonthDayNanoValue interval_month_day_nano = 31; + StructValue struct_value = 32; } } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index b89839f2e5d4..77561f61bf59 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -814,6 +814,26 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some( IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos), )), + Value::StructValue(v) => { + let values = if v.is_null { + None + } else { + Some( + v.field_values + .iter() + .map(|v| v.try_into()) + .collect::, _>>()?, + ) + }; + + let fields = v + .fields + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + + Self::Struct(values, Box::new(fields)) + } }) } } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 4dc496932fe2..0534c86527c0 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -517,6 +517,23 @@ mod roundtrip_tests { ScalarValue::Binary(None), ScalarValue::LargeBinary(Some(b"bar".to_vec())), ScalarValue::LargeBinary(None), + ScalarValue::Struct( + Some(vec![ + ScalarValue::Int32(Some(23)), + ScalarValue::Boolean(Some(false)), + ]), + Box::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Boolean, false), + ]), + ), + ScalarValue::Struct( + None, + Box::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("a", DataType::Boolean, false), + ]), + ), ]; for test_case in should_pass.into_iter() { diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 7c40017ba5c0..78db5d0a0050 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -113,10 +113,6 @@ impl Error { } } - fn invalid_scalar_value(value: &ScalarValue) -> Self { - Self::InvalidScalarValue(value.to_owned()) - } - fn invalid_scalar_type(data_type: &DataType) -> Self { Self::InvalidScalarType(data_type.to_owned()) } @@ -1214,9 +1210,30 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { protobuf::ScalarValue { value: Some(value) } } - datafusion::scalar::ScalarValue::Struct(_, _) => { - // not yet implemented (TODO file ticket) - return Err(Error::invalid_scalar_value(val)); + datafusion::scalar::ScalarValue::Struct(values, fields) => { + let is_null = values.is_none(); + + let field_values = if let Some(values) = values { + values + .iter() + .map(|v| v.try_into()) + .collect::, _>>()? + } else { + vec![] + }; + + let fields = fields + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + + protobuf::ScalarValue { + value: Some(Value::StructValue(protobuf::StructValue { + is_null, + field_values, + fields, + })), + } } datafusion::scalar::ScalarValue::Dictionary(index_type, val) => { From 88370bf7f624ec2c02243699364f63b06ce11e79 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 28 Sep 2022 15:22:25 -0400 Subject: [PATCH 2/3] Remove explicit is_null encoding --- datafusion/proto/proto/datafusion.proto | 6 +++--- datafusion/proto/src/from_proto.rs | 4 +++- datafusion/proto/src/lib.rs | 5 +++++ datafusion/proto/src/to_proto.rs | 7 ++++--- parquet-testing | 2 +- testing | 2 +- 6 files changed, 17 insertions(+), 9 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 861de2965c0b..0a92d0d1b2ea 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -740,9 +740,9 @@ message IntervalMonthDayNanoValue { } message StructValue { - // encode null explicitly to distinguish a struct with no fields (is - // that possible?) - bool is_null = 1; + // Note that a null struct value must have one or more fields, so we + // encode a null StructValue as one witth an empty field_values + // list. repeated ScalarValue field_values = 2; repeated Field fields = 3; } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 77561f61bf59..3eeb30edf649 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -815,7 +815,9 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos), )), Value::StructValue(v) => { - let values = if v.is_null { + // all structs must have at least 1 field, so we treat + // an empty values list as NULL + let values = if v.field_values.is_empty() { None } else { Some( diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index fb2c35088adb..e3b6c848a2b1 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -315,6 +315,11 @@ mod roundtrip_tests { #[test] fn scalar_values_error_serialization() { let should_fail_on_seralize: Vec = vec![ + // Should fail due to empty values + ScalarValue::Struct( + Some(vec![]), + Box::new(vec![Field::new("item", DataType::Int16, true)]), + ), // Should fail due to inconsistent types ScalarValue::new_list( Some(vec![ diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 78db5d0a0050..47b779fffc74 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1211,9 +1211,11 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { } datafusion::scalar::ScalarValue::Struct(values, fields) => { - let is_null = values.is_none(); - + // encode null as empty field values list let field_values = if let Some(values) = values { + if values.is_empty() { + return Err(Error::InvalidScalarValue(val.clone())); + } values .iter() .map(|v| v.try_into()) @@ -1229,7 +1231,6 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { protobuf::ScalarValue { value: Some(Value::StructValue(protobuf::StructValue { - is_null, field_values, fields, })), diff --git a/parquet-testing b/parquet-testing index a11fc8f148f8..ddd898958803 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf +Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de diff --git a/testing b/testing index 5bab2f264a23..a8f7be380531 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88 +Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20 From eaee348e0858cdbe4a1f7bbd885bbf8e8daaada2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 28 Sep 2022 15:26:01 -0400 Subject: [PATCH 3/3] Restore submodules --- parquet-testing | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-testing b/parquet-testing index ddd898958803..a11fc8f148f8 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit ddd898958803cb89b7156c6350584d1cda0fe8de +Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf diff --git a/testing b/testing index a8f7be380531..5bab2f264a23 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20 +Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88