diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 1b97259e555..35ab289cc33 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1568,8 +1568,7 @@ def _temp_path(): generate_custom_metadata_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), # TODO(ARROW-10259) + .skip_category('JS'), generate_duplicate_fieldnames_case() .skip_category('Go') diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index ac5410c2583..30cce75d00c 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -2974,7 +2974,7 @@ mod tests { #[test] #[should_panic( - expected = "Data type List(Field { name: \"item\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false }) is not currently supported" + expected = "Data type List(Field { name: \"item\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }) is not currently supported" )] fn test_struct_array_builder_from_schema_unsupported_type() { let mut fields = Vec::new(); diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index b613a4d167a..7b16d95a868 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -22,6 +22,7 @@ //! * [`Field`](crate::datatypes::Field) to describe one field within a schema. //! * [`DataType`](crate::datatypes::DataType) to describe the type of a field. +use std::collections::BTreeMap; use std::collections::HashMap; use std::default::Default; use std::fmt; @@ -193,6 +194,9 @@ pub struct Field { nullable: bool, dict_id: i64, dict_is_ordered: bool, + /// A map of key-value pairs containing additional custom meta data. + #[serde(skip_serializing_if = "Option::is_none")] + metadata: Option>, } pub trait ArrowNativeType: @@ -1279,6 +1283,7 @@ impl Field { nullable, dict_id: 0, dict_is_ordered: false, + metadata: None, } } @@ -1296,9 +1301,29 @@ impl Field { nullable, dict_id, dict_is_ordered, + metadata: None, } } + /// Sets the `Field`'s optional custom metadata. + /// The metadata is set as `None` for empty map. + #[inline] + pub fn set_metadata(&mut self, metadata: Option>) { + // To make serde happy, convert Some(empty_map) to None. + self.metadata = None; + if let Some(v) = metadata { + if !v.is_empty() { + self.metadata = Some(v); + } + } + } + + /// Returns the immutable reference to the `Field`'s optional custom metadata. + #[inline] + pub const fn metadata(&self) -> &Option> { + &self.metadata + } + /// Returns an immutable reference to the `Field`'s name #[inline] pub const fn name(&self) -> &String { @@ -1363,6 +1388,68 @@ impl Field { )); } }; + + // Referenced example file: testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_custom_metadata.json.gz + let metadata = match map.get("metadata") { + Some(&Value::Array(ref values)) => { + let mut res: BTreeMap = BTreeMap::new(); + for value in values { + match value.as_object() { + Some(map) => { + if map.len() != 2 { + return Err(ArrowError::ParseError( + "Field 'metadata' must have exact two entries for each key-value map".to_string(), + )); + } + if let (Some(k), Some(v)) = + (map.get("key"), map.get("value")) + { + if let (Some(k_str), Some(v_str)) = + (k.as_str(), v.as_str()) + { + res.insert( + k_str.to_string().clone(), + v_str.to_string().clone(), + ); + } else { + return Err(ArrowError::ParseError("Field 'metadata' must have map value of string type".to_string())); + } + } else { + return Err(ArrowError::ParseError("Field 'metadata' lacks map keys named \"key\" or \"value\"".to_string())); + } + } + _ => { + return Err(ArrowError::ParseError( + "Field 'metadata' contains non-object key-value pair".to_string(), + )); + } + } + } + Some(res) + } + // We also support map format, because Schema's metadata supports this. + // See https://github.com/apache/arrow/pull/5907 + Some(&Value::Object(ref values)) => { + let mut res: BTreeMap = BTreeMap::new(); + for (k, v) in values { + if let Some(str_value) = v.as_str() { + res.insert(k.clone(), str_value.to_string().clone()); + } else { + return Err(ArrowError::ParseError( + format!("Field 'metadata' contains non-string value for key {}", k), + )); + } + } + Some(res) + } + Some(_) => { + return Err(ArrowError::ParseError( + "Field `metadata` is not json array".to_string(), + )); + } + _ => None, + }; + // if data_type is a struct or list, get its children let data_type = match data_type { DataType::List(_) @@ -1461,6 +1548,7 @@ impl Field { data_type, dict_id, dict_is_ordered, + metadata, }) } _ => Err(ArrowError::ParseError( @@ -1500,6 +1588,7 @@ impl Field { } /// Merge field into self if it is compatible. Struct will be merged recursively. + /// NOTE: `self` may be updated to unexpected state in case of merge failure. /// /// Example: /// @@ -1511,6 +1600,28 @@ impl Field { /// assert!(field.is_nullable()); /// ``` pub fn try_merge(&mut self, from: &Field) -> Result<()> { + // merge metadata + match (self.metadata(), from.metadata()) { + (Some(self_metadata), Some(from_metadata)) => { + let mut merged = self_metadata.clone(); + for (key, from_value) in from_metadata { + if let Some(self_value) = self_metadata.get(key) { + if self_value != from_value { + return Err(ArrowError::SchemaError(format!( + "Fail to merge field due to conflicting metadata data value for key {}", key), + )); + } + } else { + merged.insert(key.clone(), from_value.clone()); + } + } + self.set_metadata(Some(merged)); + } + (None, Some(from_metadata)) => { + self.set_metadata(Some(from_metadata.clone())); + } + _ => {} + } if from.dict_id != self.dict_id { return Err(ArrowError::SchemaError( "Fail to merge schema Field due to conflicting dict_id".to_string(), @@ -1614,9 +1725,10 @@ impl Field { } } +// TODO: improve display with crate https://crates.io/crates/derive_more ? impl fmt::Display for Field { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}: {:?}", self.name, self.data_type) + write!(f, "{:?}", self) } } @@ -1973,9 +2085,20 @@ mod tests { #[test] fn serde_struct_type() { + let kv_array = [("k".to_string(), "v".to_string())]; + let field_metadata: BTreeMap = kv_array.iter().cloned().collect(); + + // Non-empty map: should be converted as JSON obj { ... } + let mut first_name = Field::new("first_name", DataType::Utf8, false); + first_name.set_metadata(Some(field_metadata)); + + // Empty map: should be omitted. + let mut last_name = Field::new("last_name", DataType::Utf8, false); + last_name.set_metadata(Some(BTreeMap::default())); + let person = DataType::Struct(vec![ - Field::new("first_name", DataType::Utf8, false), - Field::new("last_name", DataType::Utf8, false), + first_name, + last_name, Field::new( "address", DataType::Struct(vec![ @@ -1993,7 +2116,7 @@ mod tests { assert_eq!( "{\"Struct\":[\ - {\"name\":\"first_name\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\ + {\"name\":\"first_name\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{\"k\":\"v\"}},\ {\"name\":\"last_name\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\ {\"name\":\"address\",\"data_type\":{\"Struct\":\ [{\"name\":\"street\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false},\ @@ -2687,12 +2810,14 @@ mod tests { #[test] fn create_schema_string() { let schema = person_schema(); - assert_eq!(schema.to_string(), "first_name: Utf8, \ - last_name: Utf8, \ - address: Struct([\ - Field { name: \"street\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false }, \ - Field { name: \"zip\", data_type: UInt16, nullable: false, dict_id: 0, dict_is_ordered: false }]), \ - interests: Dictionary(Int32, Utf8)") + assert_eq!(schema.to_string(), + "Field { name: \"first_name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: Some({\"k\": \"v\"}) }, \ + Field { name: \"last_name\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \ + Field { name: \"address\", data_type: Struct([\ + Field { name: \"street\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \ + Field { name: \"zip\", data_type: UInt16, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }\ + ]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \ + Field { name: \"interests\", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 123, dict_is_ordered: true, metadata: None }") } #[test] @@ -2710,6 +2835,14 @@ mod tests { assert_eq!(first_name.dict_id(), None); assert_eq!(first_name.dict_is_ordered(), None); + let metadata = first_name.metadata(); + assert!(metadata.is_some()); + let md = metadata.as_ref().unwrap(); + assert_eq!(md.len(), 1); + let key = md.get("k"); + assert!(key.is_some()); + assert_eq!(key.unwrap(), "v"); + let interests = &schema.fields()[3]; assert_eq!(interests.name(), "interests"); assert_eq!( @@ -2791,6 +2924,20 @@ mod tests { assert!(schema2 != schema3); assert!(schema2 != schema4); assert!(schema3 != schema4); + + let mut f = Field::new("c1", DataType::Utf8, false); + f.set_metadata(Some( + [("foo".to_string(), "bar".to_string())] + .iter() + .cloned() + .collect(), + )); + let schema5 = Schema::new(vec![ + f, + Field::new("c2", DataType::Float64, true), + Field::new("c3", DataType::LargeBinary, true), + ]); + assert!(schema1 != schema5); } #[test] @@ -2816,8 +2963,13 @@ mod tests { } fn person_schema() -> Schema { + let kv_array = [("k".to_string(), "v".to_string())]; + let field_metadata: BTreeMap = kv_array.iter().cloned().collect(); + let mut first_name = Field::new("first_name", DataType::Utf8, false); + first_name.set_metadata(Some(field_metadata)); + Schema::new(vec![ - Field::new("first_name", DataType::Utf8, false), + first_name, Field::new("last_name", DataType::Utf8, false), Field::new( "address", @@ -2837,6 +2989,98 @@ mod tests { ]) } + #[test] + fn test_try_merge_field_with_metadata() { + // 1. Different values for the same key should cause error. + let metadata1: BTreeMap = + [("foo".to_string(), "bar".to_string())] + .iter() + .cloned() + .collect(); + let mut f1 = Field::new("first_name", DataType::Utf8, false); + f1.set_metadata(Some(metadata1)); + + let metadata2: BTreeMap = + [("foo".to_string(), "baz".to_string())] + .iter() + .cloned() + .collect(); + let mut f2 = Field::new("first_name", DataType::Utf8, false); + f2.set_metadata(Some(metadata2)); + + assert!( + Schema::try_merge(&[Schema::new(vec![f1]), Schema::new(vec![f2])]).is_err() + ); + + // 2. None + Some + let mut f1 = Field::new("first_name", DataType::Utf8, false); + let metadata2: BTreeMap = + [("missing".to_string(), "value".to_string())] + .iter() + .cloned() + .collect(); + let mut f2 = Field::new("first_name", DataType::Utf8, false); + f2.set_metadata(Some(metadata2)); + + assert!(f1.try_merge(&f2).is_ok()); + assert!(f1.metadata.is_some()); + assert_eq!(f1.metadata.unwrap(), f2.metadata.unwrap()); + + // 3. Some + Some + let mut f1 = Field::new("first_name", DataType::Utf8, false); + f1.set_metadata(Some( + [("foo".to_string(), "bar".to_string())] + .iter() + .cloned() + .collect(), + )); + let mut f2 = Field::new("first_name", DataType::Utf8, false); + f2.set_metadata(Some( + [("foo2".to_string(), "bar2".to_string())] + .iter() + .cloned() + .collect(), + )); + + assert!(f1.try_merge(&f2).is_ok()); + assert!(f1.metadata.is_some()); + assert_eq!( + f1.metadata.unwrap(), + [ + ("foo".to_string(), "bar".to_string()), + ("foo2".to_string(), "bar2".to_string()) + ] + .iter() + .cloned() + .collect() + ); + + // 4. Some + None. + let mut f1 = Field::new("first_name", DataType::Utf8, false); + f1.set_metadata(Some( + [("foo".to_string(), "bar".to_string())] + .iter() + .cloned() + .collect(), + )); + let f2 = Field::new("first_name", DataType::Utf8, false); + assert!(f1.try_merge(&f2).is_ok()); + assert!(f1.metadata.is_some()); + assert_eq!( + f1.metadata.unwrap(), + [("foo".to_string(), "bar".to_string())] + .iter() + .cloned() + .collect() + ); + + // 5. None + None. + let mut f1 = Field::new("first_name", DataType::Utf8, false); + let f2 = Field::new("first_name", DataType::Utf8, false); + assert!(f1.try_merge(&f2).is_ok()); + assert!(f1.metadata.is_none()); + } + #[test] fn test_schema_merge() -> Result<()> { let merged = Schema::try_merge(&[ diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index f003d6ebb79..8733b0ee35c 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -24,7 +24,7 @@ use crate::ipc; use flatbuffers::{ FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, }; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use DataType::*; @@ -72,7 +72,7 @@ pub fn schema_to_fb_offset<'a>( /// Convert an IPC Field to Arrow Field impl<'a> From> for Field { fn from(field: ipc::Field) -> Field { - if let Some(dictionary) = field.dictionary() { + let mut arrow_field = if let Some(dictionary) = field.dictionary() { Field::new_dict( field.name().unwrap(), get_data_type(field, true), @@ -86,7 +86,21 @@ impl<'a> From> for Field { get_data_type(field, true), field.nullable(), ) + }; + + let mut metadata = None; + if let Some(list) = field.custom_metadata() { + let mut metadata_map = BTreeMap::default(); + for kv in list { + if let (Some(k), Some(v)) = (kv.key(), kv.value()) { + metadata_map.insert(k.to_string(), v.to_string()); + } + } + metadata = Some(metadata_map); } + + arrow_field.set_metadata(metadata); + arrow_field } } @@ -313,6 +327,23 @@ pub(crate) fn build_field<'a>( fbb: &mut FlatBufferBuilder<'a>, field: &Field, ) -> WIPOffset> { + // Optional custom metadata. + let mut fb_metadata = None; + if let Some(metadata) = field.metadata() { + if !metadata.is_empty() { + let mut kv_vec = vec![]; + for (k, v) in metadata { + let kv_args = ipc::KeyValueArgs { + key: Some(fbb.create_string(k.as_str())), + value: Some(fbb.create_string(v.as_str())), + }; + let kv_offset = ipc::KeyValue::create(fbb, &kv_args); + kv_vec.push(kv_offset); + } + fb_metadata = Some(fbb.create_vector(&kv_vec)); + } + }; + let fb_field_name = fbb.create_string(field.name().as_str()); let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); @@ -343,6 +374,11 @@ pub(crate) fn build_field<'a>( Some(children) => field_builder.add_children(children), }; field_builder.add_type_(field_type.type_); + + if let Some(fb_metadata) = fb_metadata { + field_builder.add_custom_metadata(fb_metadata); + } + field_builder.finish() } @@ -655,9 +691,17 @@ mod tests { .iter() .cloned() .collect(); + let field_md: BTreeMap = [("k".to_string(), "v".to_string())] + .iter() + .cloned() + .collect(); let schema = Schema::new_with_metadata( vec![ - Field::new("uint8", DataType::UInt8, false), + { + let mut f = Field::new("uint8", DataType::UInt8, false); + f.set_metadata(Some(field_md)); + f + }, Field::new("uint16", DataType::UInt16, true), Field::new("uint32", DataType::UInt32, false), Field::new("uint64", DataType::UInt64, true), diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 6a6e7ea77de..4a4b865e8f4 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -20,7 +20,7 @@ //! These utilities define structs that read the integration JSON format for integration testing purposes. use serde_derive::{Deserialize, Serialize}; -use serde_json::{Number as VNumber, Value}; +use serde_json::{Map as SJMap, Number as VNumber, Value}; use crate::array::*; use crate::datatypes::*; @@ -60,13 +60,30 @@ pub struct ArrowJsonField { impl From<&Field> for ArrowJsonField { fn from(field: &Field) -> Self { + let metadata_value = match field.metadata() { + Some(kv_list) => { + let mut array = Vec::new(); + for (k, v) in kv_list { + let mut kv_map = SJMap::new(); + kv_map.insert(k.clone(), Value::String(v.clone())); + array.push(Value::Object(kv_map)); + } + if !array.is_empty() { + Some(Value::Array(array)) + } else { + None + } + } + _ => None, + }; + Self { name: field.name().to_string(), field_type: field.data_type().to_json(), nullable: field.is_nullable(), children: vec![], dictionary: None, // TODO: not enough info - metadata: None, // TODO(ARROW-10259) + metadata: metadata_value, } } } @@ -709,7 +726,30 @@ mod tests { let millis_tz = Some("America/New_York".to_string()); let micros_tz = Some("UTC".to_string()); let nanos_tz = Some("Africa/Johannesburg".to_string()); + let schema = Schema::new(vec![ + { + let mut f = + Field::new("bools-with-metadata-map", DataType::Boolean, true); + f.set_metadata(Some( + [("k".to_string(), "v".to_string())] + .iter() + .cloned() + .collect(), + )); + f + }, + { + let mut f = + Field::new("bools-with-metadata-vec", DataType::Boolean, true); + f.set_metadata(Some( + [("k2".to_string(), "v2".to_string())] + .iter() + .cloned() + .collect(), + )); + f + }, Field::new("bools", DataType::Boolean, true), Field::new("int8s", DataType::Int8, true), Field::new("int16s", DataType::Int16, true), @@ -779,6 +819,10 @@ mod tests { ), ]); + let bools_with_metadata_map = + BooleanArray::from(vec![Some(true), None, Some(false)]); + let bools_with_metadata_vec = + BooleanArray::from(vec![Some(true), None, Some(false)]); let bools = BooleanArray::from(vec![Some(true), None, Some(false)]); let int8s = Int8Array::from(vec![Some(1), None, Some(3)]); let int16s = Int16Array::from(vec![Some(1), None, Some(3)]); @@ -867,6 +911,8 @@ mod tests { let record_batch = RecordBatch::try_new( Arc::new(schema.clone()), vec![ + Arc::new(bools_with_metadata_map), + Arc::new(bools_with_metadata_vec), Arc::new(bools), Arc::new(int8s), Arc::new(int16s), diff --git a/rust/arrow/test/data/integration.json b/rust/arrow/test/data/integration.json index 193636ff136..7e4a22cddba 100644 --- a/rust/arrow/test/data/integration.json +++ b/rust/arrow/test/data/integration.json @@ -1,6 +1,31 @@ { "schema": { "fields": [ + { + "name": "bools-with-metadata-map", + "type": { + "name": "bool" + }, + "nullable": true, + "metadata": { + "k": "v" + }, + "children": [] + }, + { + "name": "bools-with-metadata-vec", + "type": { + "name": "bool" + }, + "nullable": true, + "metadata": [ + { + "key": "k2", + "value": "v2" + } + ], + "children": [] + }, { "name": "bools", "type": { @@ -301,6 +326,34 @@ { "count": 3, "columns": [ + { + "name": "bools-with-metadata-map", + "count": 3, + "VALIDITY": [ + 1, + 0, + 1 + ], + "DATA": [ + true, + true, + false + ] + }, + { + "name": "bools-with-metadata-vec", + "count": 3, + "VALIDITY": [ + 1, + 0, + 1 + ], + "DATA": [ + true, + true, + false + ] + }, { "name": "bools", "count": 3, diff --git a/rust/datafusion/src/logical_plan/dfschema.rs b/rust/datafusion/src/logical_plan/dfschema.rs index b6c1d21752d..0305f07acdc 100644 --- a/rust/datafusion/src/logical_plan/dfschema.rs +++ b/rust/datafusion/src/logical_plan/dfschema.rs @@ -390,7 +390,9 @@ mod tests { fn from_qualified_schema_into_arrow_schema() -> Result<()> { let schema = DFSchema::try_from_qualified("t1", &test_schema_1())?; let arrow_schema: Schema = schema.into(); - assert_eq!("t1.c0: Boolean, t1.c1: Boolean", arrow_schema.to_string()); + let expected = "Field { name: \"t1.c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \ + Field { name: \"t1.c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }"; + assert_eq!(expected, arrow_schema.to_string()); Ok(()) } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index fd414890fb6..ad866809b1b 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -860,7 +860,8 @@ mod tests { data_type: Int32, \ nullable: false, \ dict_id: 0, \ - dict_is_ordered: false } }\ + dict_is_ordered: false, \ + metadata: None } }\ ] }, \ ExecutionPlan schema: Schema { fields: [\ Field { \ @@ -868,7 +869,8 @@ mod tests { data_type: Int32, \ nullable: false, \ dict_id: 0, \ - dict_is_ordered: false }\ + dict_is_ordered: false, \ + metadata: None }\ ], metadata: {} }"; match plan { Ok(_) => panic!("Expected planning failure"),