diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index 995aa18bc43..c2e01fb6ccc 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -85,10 +85,11 @@ pub fn flight_data_from_arrow_schema( impl TryFrom<&FlightData> for Schema { type Error = ArrowError; fn try_from(data: &FlightData) -> Result { - convert::schema_from_bytes(&data.data_header[..]).ok_or_else(|| { - ArrowError::ParseError( - "Unable to convert flight data to Arrow schema".to_string(), - ) + convert::schema_from_bytes(&data.data_header[..]).map_err(|err| { + ArrowError::ParseError(format!( + "Unable to convert flight data to Arrow schema: {}", + err + )) }) } } @@ -99,10 +100,11 @@ impl TryFrom<&FlightData> for Schema { impl TryFrom<&SchemaResult> for Schema { type Error = ArrowError; fn try_from(data: &SchemaResult) -> Result { - convert::schema_from_bytes(&data.schema[..]).ok_or_else(|| { - ArrowError::ParseError( - "Unable to convert schema result to Arrow schema".to_string(), - ) + convert::schema_from_bytes(&data.schema[..]).map_err(|err| { + ArrowError::ParseError(format!( + "Unable to convert schema result to Arrow schema: {}", + err + )) }) } } @@ -113,7 +115,18 @@ pub fn flight_data_to_arrow_batch( schema: SchemaRef, ) -> Option> { // check that the data_header is a record batch message - let message = arrow::ipc::get_root_as_message(&data.data_header[..]); + let res = arrow::ipc::root_as_message(&data.data_header[..]); + + // Catch error. + if let Err(err) = res { + return Some(Err(ArrowError::ParseError(format!( + "Unable to get root as message: {:?}", + err + )))); + } + + let message = res.unwrap(); + let dictionaries_by_field = Vec::new(); message diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index a29583e5550..7142a0bc862 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -47,7 +47,7 @@ regex = "1.3" lazy_static = "1.4" packed_simd = { version = "0.3.4", optional = true, package = "packed_simd_2" } chrono = "0.4" -flatbuffers = "0.6" +flatbuffers = "^0.8" hex = "0.4" prettytable-rs = { version = "0.8.0", optional = true } lexical-core = "^0.7" diff --git a/rust/arrow/regen.sh b/rust/arrow/regen.sh index 14596c6a9a0..4bc35a4852f 100755 --- a/rust/arrow/regen.sh +++ b/rust/arrow/regen.sh @@ -24,48 +24,42 @@ pushd $DIR/../../ # As of 2020-12-06, the snapshot flatc version is not changed since "1.12.0", # so let's build flatc from source. +echo "Build flatc from source ..." + FB_URL="https://github.com/google/flatbuffers" -FB_COMMIT="2046bffa40400904c926c2a5bedab67a8d6b7e08" +FB_COMMIT="05192553f434d10c5f585aeb6a07a55a6ac702a5" FB_DIR="rust/arrow/.flatbuffers" FLATC="$FB_DIR/bazel-bin/flatc" -if [ ! -e "$FLATC" ]; then - echo "$FLATC: not found, let's build it ..." - - if [ -z $(which bazel) ]; then - echo "bazel is required to build flatc" - exit 1 - fi +if [ -z $(which bazel) ]; then + echo "bazel is required to build flatc" + exit 1 +fi - echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')" +echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')" - if [ ! -e $FB_DIR ]; then - echo "git clone $FB_URL ..." - git clone -b master --no-tag --depth 1 $FB_URL $FB_DIR - else - echo "git pull $FB_URL ..." - git -C $FB_DIR pull - fi +if [ ! -e $FB_DIR ]; then + echo "git clone $FB_URL ..." + git clone -b master --no-tag --depth 1 $FB_URL $FB_DIR +else + echo "git pull $FB_URL ..." + git -C $FB_DIR pull +fi - echo "hard reset to $FB_COMMIT" - git -C $FB_DIR reset --hard $FB_COMMIT +echo "hard reset to $FB_COMMIT" +git -C $FB_DIR reset --hard $FB_COMMIT - pushd $FB_DIR - echo "run: bazel build :flatc ..." - bazel build :flatc - popd -fi +pushd $FB_DIR +echo "run: bazel build :flatc ..." +bazel build :flatc +popd # Execute the code generation: -$FLATC --rust -o rust/arrow/src/ipc/gen/ format/*.fbs +$FLATC --filename-suffix "" --rust -o rust/arrow/src/ipc/gen/ format/*.fbs # Now the files are wrongly named so we have to change that. popd pushd $DIR/src/ipc/gen -for f in `ls *_generated.rs`; do - adj_length=$((${#f}-13)) - mv $f "${f:0:$adj_length}.rs" -done PREFIX=$(cat <<'HEREDOC' // Licensed to the Apache Software Foundation (ASF) under one @@ -98,7 +92,7 @@ SCHEMA_IMPORT="\nuse crate::ipc::gen::Schema::*;" SPARSE_TENSOR_IMPORT="\nuse crate::ipc::gen::SparseTensor::*;" TENSOR_IMPORT="\nuse crate::ipc::gen::Tensor::*;" -# For flatbuffer(1.12.0+), remove: use crate::${name}_generated::\*; +# For flatbuffer(1.12.0+), remove: use crate::${name}::\*; names=("File" "Message" "Schema" "SparseTensor" "Tensor") # Remove all generated lines we don't need @@ -126,11 +120,13 @@ for f in `ls *.rs`; do # required by flatc 1.12.0+ sed -i '' "/\#\!\[allow(unused_imports, dead_code)\]/d" $f for name in ${names[@]}; do - sed -i '' "/use crate::${name}_generated::\*;/d" $f + sed -i '' "/use crate::${name}::\*;/d" $f + sed -i '' "s/use self::flatbuffers::Verifiable;/use flatbuffers::Verifiable;/g" $f done - # Replace all occurrences of type__ with type_ + # Replace all occurrences of "type__" with "type_", "TYPE__" with "TYPE_". sed -i '' 's/type__/type_/g' $f + sed -i '' 's/TYPE__/TYPE_/g' $f # Some files need prefixes if [[ $f == "File.rs" ]]; then diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index b1368fc2fa6..f003d6ebb79 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -18,6 +18,7 @@ //! Utilities for converting between IPC types and native Arrow types use crate::datatypes::{DataType, DateUnit, Field, IntervalUnit, Schema, TimeUnit}; +use crate::error::{ArrowError, Result}; use crate::ipc; use flatbuffers::{ @@ -123,9 +124,20 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema { } /// Deserialize an IPC message into a schema -pub fn schema_from_bytes(bytes: &[u8]) -> Option { - let ipc = ipc::get_root_as_message(bytes); - ipc.header_as_schema().map(fb_to_schema) +pub fn schema_from_bytes(bytes: &[u8]) -> Result { + if let Ok(ipc) = ipc::root_as_message(bytes) { + if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) { + Ok(schema) + } else { + Err(ArrowError::IoError( + "Unable to get head as schema".to_string(), + )) + } + } else { + Err(ArrowError::IoError( + "Unable to get root as message".to_string(), + )) + } } /// Get the Arrow data type from the flatbuffer Field table @@ -777,7 +789,7 @@ mod tests { let fb = schema_to_fb(&schema); // read back fields - let ipc = ipc::get_root_as_schema(fb.finished_data()); + let ipc = ipc::root_as_schema(fb.finished_data()).unwrap(); let schema2 = fb_to_schema(ipc); assert_eq!(schema, schema2); } @@ -794,7 +806,7 @@ mod tests { 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, 0, 0, 0, 0, ]; - let ipc = ipc::get_root_as_message(&bytes[..]); + let ipc = ipc::root_as_message(&bytes[..]).unwrap(); let schema = ipc.header_as_schema().unwrap(); // a message generated from Rust, same as the Python one @@ -806,7 +818,7 @@ mod tests { 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0, ]; - let ipc2 = ipc::get_root_as_message(&bytes[..]); + let ipc2 = ipc::root_as_message(&bytes[..]).unwrap(); let schema2 = ipc.header_as_schema().unwrap(); assert_eq!(schema, schema2); diff --git a/rust/arrow/src/ipc/gen/File.rs b/rust/arrow/src/ipc/gen/File.rs index af798ceae86..3f8e2787bdd 100644 --- a/rust/arrow/src/ipc/gen/File.rs +++ b/rust/arrow/src/ipc/gen/File.rs @@ -42,6 +42,7 @@ impl std::fmt::Debug for Block { } } +impl flatbuffers::SimpleToVerifyInSlice for Block {} impl flatbuffers::SafeSliceAccess for Block {} impl<'a> flatbuffers::Follow<'a> for Block { type Inner = &'a Block; @@ -79,6 +80,16 @@ impl<'b> flatbuffers::Push for &'b Block { } } +impl<'a> flatbuffers::Verifiable for Block { + #[inline] + fn run_verifier<'o, 'b>( + v: &mut flatbuffers::Verifier<'o, 'b>, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.in_buffer::(pos) + } +} impl Block { pub fn new(_offset: i64, _metaDataLength: i32, _bodyLength: i64) -> Self { Block { @@ -166,12 +177,12 @@ impl<'a> Footer<'a> { #[inline] pub fn schema(&self) -> Option> { self._tab - .get::>>(Footer::VT_SCHEMA, None) + .get::>(Footer::VT_SCHEMA, None) } #[inline] pub fn dictionaries(&self) -> Option<&'a [Block]> { self._tab - .get::>>( + .get::>>( Footer::VT_DICTIONARIES, None, ) @@ -180,7 +191,7 @@ impl<'a> Footer<'a> { #[inline] pub fn recordBatches(&self) -> Option<&'a [Block]> { self._tab - .get::>>( + .get::>>( Footer::VT_RECORDBATCHES, None, ) @@ -192,11 +203,42 @@ impl<'a> Footer<'a> { &self, ) -> Option>>> { self._tab.get::>>, + flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>, >>(Footer::VT_CUSTOM_METADATA, None) } } +impl flatbuffers::Verifiable for Footer<'_> { + #[inline] + fn run_verifier<'o, 'b>( + v: &mut flatbuffers::Verifier<'o, 'b>, + pos: usize, + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::(&"version", Self::VT_VERSION, false)? + .visit_field::>( + &"schema", + Self::VT_SCHEMA, + false, + )? + .visit_field::>>( + &"dictionaries", + Self::VT_DICTIONARIES, + false, + )? + .visit_field::>>( + &"recordBatches", + Self::VT_RECORDBATCHES, + false, + )? + .visit_field::>, + >>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)? + .finish(); + Ok(()) + } +} pub struct FooterArgs<'a> { pub version: MetadataVersion, pub schema: Option>>, @@ -302,15 +344,79 @@ impl std::fmt::Debug for Footer<'_> { } } #[inline] +#[deprecated(since = "1.13", note = "Deprecated in favor of `root_as...` methods.")] pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - flatbuffers::get_root::>(buf) + unsafe { flatbuffers::root_unchecked::>(buf) } } #[inline] +#[deprecated(since = "1.13", note = "Deprecated in favor of `root_as...` methods.")] pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - flatbuffers::get_size_prefixed_root::>(buf) + unsafe { flatbuffers::size_prefixed_root_unchecked::>(buf) } } +#[inline] +/// Verifies that a buffer of bytes contains a `Footer` +/// and returns it. +/// Note that verification is still experimental and may not +/// catch every error, or be maximally performant. For the +/// previous, unchecked, behavior use +/// `root_as_footer_unchecked`. +pub fn root_as_footer(buf: &[u8]) -> Result { + flatbuffers::root::