Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions rust/arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ pub fn flight_data_from_arrow_schema(
impl TryFrom<&FlightData> for Schema {
type Error = ArrowError;
fn try_from(data: &FlightData) -> Result<Self> {
convert::schema_from_bytes(&data.data_header[..]).ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data to Arrow schema".to_string(),
)
convert::schema_from_bytes(&data.data_header[..]).map_err(|err| {
ArrowError::ParseError(format!(
"Unable to convert flight data to Arrow schema: {}",
err
))
})
}
}
Expand All @@ -99,10 +100,11 @@ impl TryFrom<&FlightData> for Schema {
impl TryFrom<&SchemaResult> for Schema {
type Error = ArrowError;
fn try_from(data: &SchemaResult) -> Result<Self> {
convert::schema_from_bytes(&data.schema[..]).ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert schema result to Arrow schema".to_string(),
)
convert::schema_from_bytes(&data.schema[..]).map_err(|err| {
ArrowError::ParseError(format!(
"Unable to convert schema result to Arrow schema: {}",
err
))
})
}
}
Expand All @@ -113,7 +115,18 @@ pub fn flight_data_to_arrow_batch(
schema: SchemaRef,
) -> Option<Result<RecordBatch>> {
// check that the data_header is a record batch message
let message = arrow::ipc::get_root_as_message(&data.data_header[..]);
let res = arrow::ipc::root_as_message(&data.data_header[..]);

// Catch error.
if let Err(err) = res {
return Some(Err(ArrowError::ParseError(format!(
"Unable to get root as message: {:?}",
err
))));
}

let message = res.unwrap();

let dictionaries_by_field = Vec::new();

message
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ regex = "1.3"
lazy_static = "1.4"
packed_simd = { version = "0.3.4", optional = true, package = "packed_simd_2" }
chrono = "0.4"
flatbuffers = "0.6"
flatbuffers = "^0.8"
hex = "0.4"
prettytable-rs = { version = "0.8.0", optional = true }
lexical-core = "^0.7"
Expand Down
58 changes: 27 additions & 31 deletions rust/arrow/regen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,42 @@ pushd $DIR/../../
# As of 2020-12-06, the snapshot flatc version is not changed since "1.12.0",
# so let's build flatc from source.

echo "Build flatc from source ..."

FB_URL="https://github.com/google/flatbuffers"
FB_COMMIT="2046bffa40400904c926c2a5bedab67a8d6b7e08"
FB_COMMIT="05192553f434d10c5f585aeb6a07a55a6ac702a5"
FB_DIR="rust/arrow/.flatbuffers"
FLATC="$FB_DIR/bazel-bin/flatc"

if [ ! -e "$FLATC" ]; then
echo "$FLATC: not found, let's build it ..."

if [ -z $(which bazel) ]; then
echo "bazel is required to build flatc"
exit 1
fi
if [ -z $(which bazel) ]; then
echo "bazel is required to build flatc"
exit 1
fi

echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')"
echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')"

if [ ! -e $FB_DIR ]; then
echo "git clone $FB_URL ..."
git clone -b master --no-tag --depth 1 $FB_URL $FB_DIR
else
echo "git pull $FB_URL ..."
git -C $FB_DIR pull
fi
if [ ! -e $FB_DIR ]; then
echo "git clone $FB_URL ..."
git clone -b master --no-tag --depth 1 $FB_URL $FB_DIR
else
echo "git pull $FB_URL ..."
git -C $FB_DIR pull
fi

echo "hard reset to $FB_COMMIT"
git -C $FB_DIR reset --hard $FB_COMMIT
echo "hard reset to $FB_COMMIT"
git -C $FB_DIR reset --hard $FB_COMMIT

pushd $FB_DIR
echo "run: bazel build :flatc ..."
bazel build :flatc
popd
fi
pushd $FB_DIR
echo "run: bazel build :flatc ..."
bazel build :flatc
popd

# Execute the code generation:
$FLATC --rust -o rust/arrow/src/ipc/gen/ format/*.fbs
$FLATC --filename-suffix "" --rust -o rust/arrow/src/ipc/gen/ format/*.fbs

# Now the files are wrongly named so we have to change that.
popd
pushd $DIR/src/ipc/gen
for f in `ls *_generated.rs`; do
adj_length=$((${#f}-13))
mv $f "${f:0:$adj_length}.rs"
done

PREFIX=$(cat <<'HEREDOC'
// Licensed to the Apache Software Foundation (ASF) under one
Expand Down Expand Up @@ -98,7 +92,7 @@ SCHEMA_IMPORT="\nuse crate::ipc::gen::Schema::*;"
SPARSE_TENSOR_IMPORT="\nuse crate::ipc::gen::SparseTensor::*;"
TENSOR_IMPORT="\nuse crate::ipc::gen::Tensor::*;"

# For flatbuffer(1.12.0+), remove: use crate::${name}_generated::\*;
# For flatbuffer(1.12.0+), remove: use crate::${name}::\*;
names=("File" "Message" "Schema" "SparseTensor" "Tensor")

# Remove all generated lines we don't need
Expand Down Expand Up @@ -126,11 +120,13 @@ for f in `ls *.rs`; do
# required by flatc 1.12.0+
sed -i '' "/\#\!\[allow(unused_imports, dead_code)\]/d" $f
for name in ${names[@]}; do
sed -i '' "/use crate::${name}_generated::\*;/d" $f
sed -i '' "/use crate::${name}::\*;/d" $f
sed -i '' "s/use self::flatbuffers::Verifiable;/use flatbuffers::Verifiable;/g" $f
done

# Replace all occurrences of type__ with type_
# Replace all occurrences of "type__" with "type_", "TYPE__" with "TYPE_".
sed -i '' 's/type__/type_/g' $f
sed -i '' 's/TYPE__/TYPE_/g' $f

# Some files need prefixes
if [[ $f == "File.rs" ]]; then
Expand Down
24 changes: 18 additions & 6 deletions rust/arrow/src/ipc/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Utilities for converting between IPC types and native Arrow types

use crate::datatypes::{DataType, DateUnit, Field, IntervalUnit, Schema, TimeUnit};
use crate::error::{ArrowError, Result};
use crate::ipc;

use flatbuffers::{
Expand Down Expand Up @@ -123,9 +124,20 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
}

/// Deserialize an IPC message into a schema
pub fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
let ipc = ipc::get_root_as_message(bytes);
ipc.header_as_schema().map(fb_to_schema)
pub fn schema_from_bytes(bytes: &[u8]) -> Result<Schema> {
if let Ok(ipc) = ipc::root_as_message(bytes) {
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
Ok(schema)
} else {
Err(ArrowError::IoError(
"Unable to get head as schema".to_string(),
))
}
} else {
Err(ArrowError::IoError(
"Unable to get root as message".to_string(),
))
}
}

/// Get the Arrow data type from the flatbuffer Field table
Expand Down Expand Up @@ -777,7 +789,7 @@ mod tests {
let fb = schema_to_fb(&schema);

// read back fields
let ipc = ipc::get_root_as_schema(fb.finished_data());
let ipc = ipc::root_as_schema(fb.finished_data()).unwrap();
let schema2 = fb_to_schema(ipc);
assert_eq!(schema, schema2);
}
Expand All @@ -794,7 +806,7 @@ mod tests {
4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
0, 0, 0, 0,
];
let ipc = ipc::get_root_as_message(&bytes[..]);
let ipc = ipc::root_as_message(&bytes[..]).unwrap();
let schema = ipc.header_as_schema().unwrap();

// a message generated from Rust, same as the Python one
Expand All @@ -806,7 +818,7 @@ mod tests {
8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
0, 0,
];
let ipc2 = ipc::get_root_as_message(&bytes[..]);
let ipc2 = ipc::root_as_message(&bytes[..]).unwrap();
let schema2 = ipc.header_as_schema().unwrap();

assert_eq!(schema, schema2);
Expand Down
118 changes: 112 additions & 6 deletions rust/arrow/src/ipc/gen/File.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl std::fmt::Debug for Block {
}
}

impl flatbuffers::SimpleToVerifyInSlice for Block {}
impl flatbuffers::SafeSliceAccess for Block {}
impl<'a> flatbuffers::Follow<'a> for Block {
type Inner = &'a Block;
Expand Down Expand Up @@ -79,6 +80,16 @@ impl<'b> flatbuffers::Push for &'b Block {
}
}

impl<'a> flatbuffers::Verifiable for Block {
#[inline]
fn run_verifier<'o, 'b>(
v: &mut flatbuffers::Verifier<'o, 'b>,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use flatbuffers::Verifiable;
v.in_buffer::<Self>(pos)
}
}
impl Block {
pub fn new(_offset: i64, _metaDataLength: i32, _bodyLength: i64) -> Self {
Block {
Expand Down Expand Up @@ -166,12 +177,12 @@ impl<'a> Footer<'a> {
#[inline]
pub fn schema(&self) -> Option<Schema<'a>> {
self._tab
.get::<flatbuffers::ForwardsUOffset<Schema<'a>>>(Footer::VT_SCHEMA, None)
.get::<flatbuffers::ForwardsUOffset<Schema>>(Footer::VT_SCHEMA, None)
}
#[inline]
pub fn dictionaries(&self) -> Option<&'a [Block]> {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<Block>>>(
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, Block>>>(
Footer::VT_DICTIONARIES,
None,
)
Expand All @@ -180,7 +191,7 @@ impl<'a> Footer<'a> {
#[inline]
pub fn recordBatches(&self) -> Option<&'a [Block]> {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<Block>>>(
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, Block>>>(
Footer::VT_RECORDBATCHES,
None,
)
Expand All @@ -192,11 +203,42 @@ impl<'a> Footer<'a> {
&self,
) -> Option<flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<KeyValue<'a>>>> {
self._tab.get::<flatbuffers::ForwardsUOffset<
flatbuffers::Vector<flatbuffers::ForwardsUOffset<KeyValue<'a>>>,
flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset<KeyValue>>,
>>(Footer::VT_CUSTOM_METADATA, None)
}
}

impl flatbuffers::Verifiable for Footer<'_> {
#[inline]
fn run_verifier<'o, 'b>(
v: &mut flatbuffers::Verifier<'o, 'b>,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<MetadataVersion>(&"version", Self::VT_VERSION, false)?
.visit_field::<flatbuffers::ForwardsUOffset<Schema>>(
&"schema",
Self::VT_SCHEMA,
false,
)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, Block>>>(
&"dictionaries",
Self::VT_DICTIONARIES,
false,
)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, Block>>>(
&"recordBatches",
Self::VT_RECORDBATCHES,
false,
)?
.visit_field::<flatbuffers::ForwardsUOffset<
flatbuffers::Vector<'_, flatbuffers::ForwardsUOffset<KeyValue>>,
>>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)?
.finish();
Ok(())
}
}
pub struct FooterArgs<'a> {
pub version: MetadataVersion,
pub schema: Option<flatbuffers::WIPOffset<Schema<'a>>>,
Expand Down Expand Up @@ -302,15 +344,79 @@ impl std::fmt::Debug for Footer<'_> {
}
}
#[inline]
#[deprecated(since = "1.13", note = "Deprecated in favor of `root_as...` methods.")]
pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> {
flatbuffers::get_root::<Footer<'a>>(buf)
unsafe { flatbuffers::root_unchecked::<Footer<'a>>(buf) }
}

#[inline]
#[deprecated(since = "1.13", note = "Deprecated in favor of `root_as...` methods.")]
pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> {
flatbuffers::get_size_prefixed_root::<Footer<'a>>(buf)
unsafe { flatbuffers::size_prefixed_root_unchecked::<Footer<'a>>(buf) }
}

#[inline]
/// Verifies that a buffer of bytes contains a `Footer`
/// and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_footer_unchecked`.
pub fn root_as_footer(buf: &[u8]) -> Result<Footer, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root::<Footer>(buf)
}
#[inline]
/// Verifies that a buffer of bytes contains a size prefixed
/// `Footer` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `size_prefixed_root_as_footer_unchecked`.
pub fn size_prefixed_root_as_footer(
buf: &[u8],
) -> Result<Footer, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root::<Footer>(buf)
}
#[inline]
/// Verifies, with the given options, that a buffer of bytes
/// contains a `Footer` and returns it.
/// Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_footer_unchecked`.
pub fn root_as_footer_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Footer<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::root_with_opts::<Footer<'b>>(opts, buf)
}
#[inline]
/// Verifies, with the given verifier options, that a buffer of
/// bytes contains a size prefixed `Footer` and returns
/// it. Note that verification is still experimental and may not
/// catch every error, or be maximally performant. For the
/// previous, unchecked, behavior use
/// `root_as_footer_unchecked`.
pub fn size_prefixed_root_as_footer_with_opts<'b, 'o>(
opts: &'o flatbuffers::VerifierOptions,
buf: &'b [u8],
) -> Result<Footer<'b>, flatbuffers::InvalidFlatbuffer> {
flatbuffers::size_prefixed_root_with_opts::<Footer<'b>>(opts, buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a Footer and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid `Footer`.
pub unsafe fn root_as_footer_unchecked(buf: &[u8]) -> Footer {
flatbuffers::root_unchecked::<Footer>(buf)
}
#[inline]
/// Assumes, without verification, that a buffer of bytes contains a size prefixed Footer and returns it.
/// # Safety
/// Callers must trust the given bytes do indeed contain a valid size prefixed `Footer`.
pub unsafe fn size_prefixed_root_as_footer_unchecked(buf: &[u8]) -> Footer {
flatbuffers::size_prefixed_root_unchecked::<Footer>(buf)
}
#[inline]
pub fn finish_footer_buffer<'a, 'b>(
fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>,
Expand Down
Loading