Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,13 @@ message IntervalMonthDayNanoValue {
int64 nanos = 3;
}

message StructValue {
// Note that a null struct value must have one or more fields, so we
// encode a null StructValue as one witth an empty field_values
// list.
repeated ScalarValue field_values = 2;
repeated Field fields = 3;
}

message ScalarValue{
oneof value {
Expand Down Expand Up @@ -773,6 +780,7 @@ message ScalarValue{
bytes large_binary_value = 29;
int64 time64_value = 30;
IntervalMonthDayNanoValue interval_month_day_nano = 31;
StructValue struct_value = 32;
}
}

Expand Down
22 changes: 22 additions & 0 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,28 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
)),
Value::StructValue(v) => {
// all structs must have at least 1 field, so we treat
// an empty values list as NULL
let values = if v.field_values.is_empty() {
None
} else {
Some(
v.field_values
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<ScalarValue>, _>>()?,
)
};

let fields = v
.fields
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<Field>, _>>()?;

Self::Struct(values, Box::new(fields))
}
})
}
}
Expand Down
22 changes: 22 additions & 0 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ mod roundtrip_tests {
#[test]
fn scalar_values_error_serialization() {
let should_fail_on_seralize: Vec<ScalarValue> = vec![
// Should fail due to empty values
ScalarValue::Struct(
Some(vec![]),
Box::new(vec![Field::new("item", DataType::Int16, true)]),
),
// Should fail due to inconsistent types
ScalarValue::new_list(
Some(vec![
Expand Down Expand Up @@ -514,6 +519,23 @@ mod roundtrip_tests {
ScalarValue::Binary(None),
ScalarValue::LargeBinary(Some(b"bar".to_vec())),
ScalarValue::LargeBinary(None),
ScalarValue::Struct(
Some(vec![
ScalarValue::Int32(Some(23)),
ScalarValue::Boolean(Some(false)),
]),
Box::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Boolean, false),
]),
),
ScalarValue::Struct(
None,
Box::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("a", DataType::Boolean, false),
]),
),
];

for test_case in should_pass.into_iter() {
Expand Down
32 changes: 25 additions & 7 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ impl Error {
}
}

fn invalid_scalar_value(value: &ScalarValue) -> Self {
Self::InvalidScalarValue(value.to_owned())
}

fn invalid_scalar_type(data_type: &DataType) -> Self {
Self::InvalidScalarType(data_type.to_owned())
}
Expand Down Expand Up @@ -1214,9 +1210,31 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
protobuf::ScalarValue { value: Some(value) }
}

datafusion::scalar::ScalarValue::Struct(_, _) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
datafusion::scalar::ScalarValue::Struct(values, fields) => {
// encode null as empty field values list
let field_values = if let Some(values) = values {
if values.is_empty() {
return Err(Error::InvalidScalarValue(val.clone()));
}
values
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<protobuf::ScalarValue>, _>>()?
} else {
vec![]
};

let fields = fields
.iter()
.map(|f| f.try_into())
.collect::<Result<Vec<protobuf::Field>, _>>()?;

protobuf::ScalarValue {
value: Some(Value::StructValue(protobuf::StructValue {
field_values,
fields,
})),
}
}

datafusion::scalar::ScalarValue::Dictionary(index_type, val) => {
Expand Down