diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 2f047965653..19f23e08dd6 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -21,6 +21,7 @@ //! information regarding data-types and memory layouts see //! [here](https://arrow.apache.org/docs/memory_layout.html). +use std::collections::HashMap; use std::fmt; use std::mem::size_of; #[cfg(feature = "simd")] @@ -901,12 +902,18 @@ impl fmt::Display for Field { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Schema { pub(crate) fields: Vec, + /// A map of key-value pairs containing additional meta data. + #[serde(default)] + pub(crate) metadata: HashMap, } impl Schema { /// Creates an empty `Schema` pub fn empty() -> Self { - Self { fields: vec![] } + Self { + fields: vec![], + metadata: HashMap::new(), + } } /// Creates a new `Schema` from a sequence of `Field` values @@ -922,7 +929,30 @@ impl Schema { /// let schema = Schema::new(vec![field_a, field_b]); /// ``` pub fn new(fields: Vec) -> Self { - Self { fields } + Self::new_with_metadata(fields, HashMap::new()) + } + /// Creates a new `Schema` from a sequence of `Field` values + /// and adds additional metadata in form of key value pairs. + /// + /// # Example + /// + /// ``` + /// # extern crate arrow; + /// # use arrow::datatypes::{Field, DataType, Schema}; + /// # use std::collections::HashMap; + /// let field_a = Field::new("a", DataType::Int64, false); + /// let field_b = Field::new("b", DataType::Boolean, false); + /// + /// let mut metadata: HashMap = HashMap::new(); + /// metadata.insert("row_count".to_string(), "100".to_string()); + /// + /// let schema = Schema::new_with_metadata(vec![field_a, field_b], metadata); + /// ``` + pub fn new_with_metadata( + fields: Vec, + metadata: HashMap, + ) -> Self { + Self { fields, metadata } } /// Returns an immutable reference of the vector of `Field` instances @@ -936,6 +966,11 @@ impl Schema { &self.fields[i] } + /// Returns an immutable reference to the Map of custom metadata key-value pairs. + pub fn metadata(&self) -> &HashMap { + &self.metadata + } + /// Look up a column by name and return a immutable reference to the column along with /// it's index pub fn column_with_name(&self, name: &str) -> Option<(usize, &Field)> { @@ -949,6 +984,7 @@ impl Schema { pub fn to_json(&self) -> Value { json!({ "fields": self.fields.iter().map(|field| field.to_json()).collect::>(), + "metadata": serde_json::to_value(&self.metadata).unwrap() }) } @@ -956,21 +992,51 @@ impl Schema { pub fn from(json: &Value) -> Result { match *json { Value::Object(ref schema) => { - if let Some(Value::Array(fields)) = schema.get("fields") { - let fields: Result> = - fields.iter().map(|f| Field::from(f)).collect(); - Ok(Schema::new(fields?)) + let fields = if let Some(Value::Array(fields)) = schema.get("fields") { + fields + .iter() + .map(|f| Field::from(f)) + .collect::>()? } else { return Err(ArrowError::ParseError( "Schema fields should be an array".to_string(), )); - } + }; + + let metadata = if let Some(value) = schema.get("metadata") { + Self::from_metadata(value)? + } else { + HashMap::default() + }; + + Ok(Self { fields, metadata }) } _ => Err(ArrowError::ParseError( "Invalid json value type for schema".to_string(), )), } } + + /// Parse a `metadata` definition from a JSON representation + fn from_metadata(json: &Value) -> Result> { + if let Value::Object(md) = json { + md.iter() + .map(|(k, v)| { + if let Value::String(v) = v { + Ok((k.to_string(), v.to_string())) + } else { + Err(ArrowError::ParseError( + "metadata `value` field must be a string".to_string(), + )) + } + }) + .collect::>() + } else { + Err(ArrowError::ParseError( + "`metadata` field must be an object".to_string(), + )) + } + } } impl fmt::Display for Schema { @@ -1171,49 +1237,59 @@ mod tests { #[test] fn schema_json() { - let schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::Binary, false), - Field::new("c3", DataType::FixedSizeBinary(3), false), - Field::new("c4", DataType::Boolean, false), - Field::new("c5", DataType::Date32(DateUnit::Day), false), - Field::new("c6", DataType::Date64(DateUnit::Millisecond), false), - Field::new("c7", DataType::Time32(TimeUnit::Second), false), - Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false), - Field::new("c9", DataType::Time32(TimeUnit::Microsecond), false), - Field::new("c10", DataType::Time32(TimeUnit::Nanosecond), false), - Field::new("c11", DataType::Time64(TimeUnit::Second), false), - Field::new("c12", DataType::Time64(TimeUnit::Millisecond), false), - Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false), - Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false), - Field::new("c15", DataType::Timestamp(TimeUnit::Second), false), - Field::new("c16", DataType::Timestamp(TimeUnit::Millisecond), false), - Field::new("c17", DataType::Timestamp(TimeUnit::Microsecond), false), - Field::new("c18", DataType::Timestamp(TimeUnit::Nanosecond), false), - Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), - Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), - Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), - Field::new( - "c22", - DataType::FixedSizeList((Box::new(DataType::Boolean), 5)), - false, - ), - Field::new( - "c23", - DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( - vec![], - ))))), - true, - ), - Field::new( - "c24", - DataType::Struct(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::UInt16, false), - ]), - false, - ), - ]); + // Add some custom metadata + let metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); + + let schema = Schema::new_with_metadata( + vec![ + Field::new("c1", DataType::Utf8, false), + Field::new("c2", DataType::Binary, false), + Field::new("c3", DataType::FixedSizeBinary(3), false), + Field::new("c4", DataType::Boolean, false), + Field::new("c5", DataType::Date32(DateUnit::Day), false), + Field::new("c6", DataType::Date64(DateUnit::Millisecond), false), + Field::new("c7", DataType::Time32(TimeUnit::Second), false), + Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("c9", DataType::Time32(TimeUnit::Microsecond), false), + Field::new("c10", DataType::Time32(TimeUnit::Nanosecond), false), + Field::new("c11", DataType::Time64(TimeUnit::Second), false), + Field::new("c12", DataType::Time64(TimeUnit::Millisecond), false), + Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("c15", DataType::Timestamp(TimeUnit::Second), false), + Field::new("c16", DataType::Timestamp(TimeUnit::Millisecond), false), + Field::new("c17", DataType::Timestamp(TimeUnit::Microsecond), false), + Field::new("c18", DataType::Timestamp(TimeUnit::Nanosecond), false), + Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), + Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), + Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), + Field::new( + "c22", + DataType::FixedSizeList((Box::new(DataType::Boolean), 5)), + false, + ), + Field::new( + "c23", + DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( + vec![], + ))))), + true, + ), + Field::new( + "c24", + DataType::Struct(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::UInt16, false), + ]), + false, + ), + ], + metadata, + ); let expected = schema.to_json(); let json = r#"{ @@ -1491,7 +1567,10 @@ mod tests { } ] } - ] + ], + "metadata" : { + "Key": "Value" + } }"#; let value: Value = serde_json::from_str(&json).unwrap(); assert_eq!(expected, value); @@ -1501,6 +1580,41 @@ mod tests { let schema2 = Schema::from(&value).unwrap(); assert_eq!(schema, schema2); + + // Check that empty metadata produces empty value in JSON and can be parsed + let json = r#"{ + "fields": [ + { + "name": "c1", + "nullable": false, + "type": { + "name": "utf8" + }, + "children": [] + } + ], + "metadata": {} + }"#; + let value: Value = serde_json::from_str(&json).unwrap(); + let schema = Schema::from(&value).unwrap(); + assert!(schema.metadata.is_empty()); + + // Check that metadata field is not required in the JSON. + let json = r#"{ + "fields": [ + { + "name": "c1", + "nullable": false, + "type": { + "name": "utf8" + }, + "children": [] + } + ] + }"#; + let value: Value = serde_json::from_str(&json).unwrap(); + let schema = Schema::from(&value).unwrap(); + assert!(schema.metadata.is_empty()); } #[test] diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index a928a00f0f8..c8ee0310334 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -23,6 +23,7 @@ use crate::ipc; use flatbuffers::{ FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, }; +use std::collections::HashMap; /// Serialize a schema in IPC format fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { @@ -45,11 +46,24 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { fields.push(field_builder.finish()); } + let mut custom_metadata = vec![]; + for (k, v) in schema.metadata() { + let fb_key_name = fbb.create_string(k.as_str()); + let fb_val_name = fbb.create_string(v.as_str()); + + let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb); + kv_builder.add_key(fb_key_name); + kv_builder.add_value(fb_val_name); + custom_metadata.push(kv_builder.finish()); + } + let fb_field_list = fbb.create_vector(&fields); + let fb_metadata_list = fbb.create_vector(&custom_metadata); let root = { let mut builder = ipc::SchemaBuilder::new(&mut fbb); builder.add_fields(fb_field_list); + builder.add_custom_metadata(fb_metadata_list); builder.finish() }; @@ -78,7 +92,20 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema { let c_field: ipc::Field = c_fields.get(i); fields.push(c_field.into()); } - Schema::new(fields) + + let mut metadata: HashMap = HashMap::default(); + if let Some(md_fields) = fb.custom_metadata() { + let len = md_fields.len(); + for i in 0..len { + let kv = md_fields.get(i); + let k_str = kv.key(); + let v_str = kv.value(); + if k_str.is_some() && v_str.is_some() { + metadata.insert(k_str.unwrap().to_string(), v_str.unwrap().to_string()); + } + } + } + Schema::new_with_metadata(fields, metadata) } /// Get the Arrow data type from the flatbuffer Field table @@ -346,73 +373,84 @@ mod tests { #[test] fn convert_schema_round_trip() { - let schema = Schema::new(vec![ - Field::new("uint8", DataType::UInt8, false), - Field::new("uint16", DataType::UInt16, true), - Field::new("uint32", DataType::UInt32, false), - Field::new("uint64", DataType::UInt64, true), - Field::new("int8", DataType::Int8, true), - Field::new("int16", DataType::Int16, false), - Field::new("int32", DataType::Int32, true), - Field::new("int64", DataType::Int64, false), - Field::new("float16", DataType::Float16, true), - Field::new("float32", DataType::Float32, false), - Field::new("float64", DataType::Float64, true), - Field::new("bool", DataType::Boolean, false), - Field::new("date32", DataType::Date32(DateUnit::Day), false), - Field::new("date64", DataType::Date64(DateUnit::Millisecond), true), - Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true), - Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false), - Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false), - Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true), - Field::new("timestamp[s]", DataType::Timestamp(TimeUnit::Second), false), - Field::new( - "timestamp[ms]", - DataType::Timestamp(TimeUnit::Millisecond), - true, - ), - Field::new( - "timestamp[us]", - DataType::Timestamp(TimeUnit::Microsecond), - false, - ), - Field::new( - "timestamp[ns]", - DataType::Timestamp(TimeUnit::Nanosecond), - true, - ), - Field::new("utf8", DataType::Utf8, false), - Field::new("list[u8]", DataType::List(Box::new(DataType::UInt8)), true), - Field::new( - "list[struct]", - DataType::List(Box::new(DataType::Struct(vec![ - Field::new("float32", DataType::UInt8, false), - Field::new("int32", DataType::Int32, true), - Field::new("bool", DataType::Boolean, true), - ]))), - false, - ), - Field::new( - "struct]>]>", - DataType::Struct(vec![ - Field::new("int64", DataType::Int64, true), - Field::new( - "list[struct]>]", - DataType::List(Box::new(DataType::Struct(vec![ - Field::new("date32", DataType::Date32(DateUnit::Day), true), - Field::new( - "list[struct<>]", - DataType::List(Box::new(DataType::Struct(vec![]))), - false, - ), - ]))), - false, - ), - ]), - false, - ), - Field::new("struct<>", DataType::Struct(vec![]), true), - ]); + let md: HashMap = [("Key".to_string(), "value".to_string())] + .iter() + .cloned() + .collect(); + let schema = Schema::new_with_metadata( + vec![ + Field::new("uint8", DataType::UInt8, false), + Field::new("uint16", DataType::UInt16, true), + Field::new("uint32", DataType::UInt32, false), + Field::new("uint64", DataType::UInt64, true), + Field::new("int8", DataType::Int8, true), + Field::new("int16", DataType::Int16, false), + Field::new("int32", DataType::Int32, true), + Field::new("int64", DataType::Int64, false), + Field::new("float16", DataType::Float16, true), + Field::new("float32", DataType::Float32, false), + Field::new("float64", DataType::Float64, true), + Field::new("bool", DataType::Boolean, false), + Field::new("date32", DataType::Date32(DateUnit::Day), false), + Field::new("date64", DataType::Date64(DateUnit::Millisecond), true), + Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true), + Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true), + Field::new("timestamp[s]", DataType::Timestamp(TimeUnit::Second), false), + Field::new( + "timestamp[ms]", + DataType::Timestamp(TimeUnit::Millisecond), + true, + ), + Field::new( + "timestamp[us]", + DataType::Timestamp(TimeUnit::Microsecond), + false, + ), + Field::new( + "timestamp[ns]", + DataType::Timestamp(TimeUnit::Nanosecond), + true, + ), + Field::new("utf8", DataType::Utf8, false), + Field::new("list[u8]", DataType::List(Box::new(DataType::UInt8)), true), + Field::new( + "list[struct]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new("float32", DataType::UInt8, false), + Field::new("int32", DataType::Int32, true), + Field::new("bool", DataType::Boolean, true), + ]))), + false, + ), + Field::new( + "struct]>]>", + DataType::Struct(vec![ + Field::new("int64", DataType::Int64, true), + Field::new( + "list[struct]>]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new( + "date32", + DataType::Date32(DateUnit::Day), + true, + ), + Field::new( + "list[struct<>]", + DataType::List(Box::new(DataType::Struct(vec![]))), + false, + ), + ]))), + false, + ), + ]), + false, + ), + Field::new("struct<>", DataType::Struct(vec![]), true), + ], + md, + ); let fb = schema_to_fb(&schema);