diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 471579c56f4eb..0a92d0d1b2ea3 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -739,6 +739,13 @@ message IntervalMonthDayNanoValue { int64 nanos = 3; } +message StructValue { + // Note that a null struct value must have one or more fields, so we + // encode a null StructValue as one with an empty field_values + // list. + repeated ScalarValue field_values = 2; + repeated Field fields = 3; +} message ScalarValue{ oneof value { @@ -773,6 +780,7 @@ message ScalarValue{ bytes large_binary_value = 29; int64 time64_value = 30; IntervalMonthDayNanoValue interval_month_day_nano = 31; + StructValue struct_value = 32; } } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index b89839f2e5d47..3eeb30edf6490 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -814,6 +814,28 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some( IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos), )), + Value::StructValue(v) => { + // all structs must have at least 1 field, so we treat + // an empty values list as NULL + let values = if v.field_values.is_empty() { + None + } else { + Some( + v.field_values + .iter() + .map(|v| v.try_into()) + .collect::, _>>()?, + ) + }; + + let fields = v + .fields + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + + Self::Struct(values, Box::new(fields)) + } }) } } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index feefbe27d4d92..e3b6c848a2b14 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -315,6 +315,11 @@ mod roundtrip_tests { #[test] fn scalar_values_error_serialization() { let should_fail_on_seralize: Vec = vec![ + // Should fail due to empty values + ScalarValue::Struct( + Some(vec![]), 
+ Box::new(vec![Field::new("item", DataType::Int16, true)]), + ), // Should fail due to inconsistent types ScalarValue::new_list( Some(vec![ @@ -514,6 +519,23 @@ mod roundtrip_tests { ScalarValue::Binary(None), ScalarValue::LargeBinary(Some(b"bar".to_vec())), ScalarValue::LargeBinary(None), + ScalarValue::Struct( + Some(vec![ + ScalarValue::Int32(Some(23)), + ScalarValue::Boolean(Some(false)), + ]), + Box::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Boolean, false), + ]), + ), + ScalarValue::Struct( + None, + Box::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("a", DataType::Boolean, false), + ]), + ), ]; for test_case in should_pass.into_iter() { diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 7c40017ba5c03..47b779fffc74d 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -113,10 +113,6 @@ impl Error { } } - fn invalid_scalar_value(value: &ScalarValue) -> Self { - Self::InvalidScalarValue(value.to_owned()) - } - fn invalid_scalar_type(data_type: &DataType) -> Self { Self::InvalidScalarType(data_type.to_owned()) } @@ -1214,9 +1210,31 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { protobuf::ScalarValue { value: Some(value) } } - datafusion::scalar::ScalarValue::Struct(_, _) => { - // not yet implemented (TODO file ticket) - return Err(Error::invalid_scalar_value(val)); + datafusion::scalar::ScalarValue::Struct(values, fields) => { + // encode null as empty field values list + let field_values = if let Some(values) = values { + if values.is_empty() { + return Err(Error::InvalidScalarValue(val.clone())); + } + values + .iter() + .map(|v| v.try_into()) + .collect::, _>>()? 
+ } else { + vec![] + }; + + let fields = fields + .iter() + .map(|f| f.try_into()) + .collect::, _>>()?; + + protobuf::ScalarValue { + value: Some(Value::StructValue(protobuf::StructValue { + field_values, + fields, + })), + } } datafusion::scalar::ScalarValue::Dictionary(index_type, val) => {