Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions rust/arrow/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ impl From<ArrayDataRef> for BinaryArray {

impl<'a> From<Vec<&'a str>> for BinaryArray {
fn from(v: Vec<&'a str>) -> Self {
let mut offsets = vec![];
let mut values = vec![];
let mut offsets = Vec::with_capacity(v.len() + 1);
let mut values = Vec::new();
let mut length_so_far = 0;
offsets.push(length_so_far);
for s in &v {
Expand All @@ -627,6 +627,26 @@ impl<'a> From<Vec<&'a str>> for BinaryArray {
}
}

impl<'a> From<Vec<&[u8]>> for BinaryArray {
fn from(v: Vec<&[u8]>) -> Self {
let mut offsets = Vec::with_capacity(v.len() + 1);
let mut values = Vec::new();
let mut length_so_far = 0;
offsets.push(length_so_far);
for s in &v {
length_so_far += s.len() as i32;
offsets.push(length_so_far as i32);
values.extend_from_slice(s);
}
let array_data = ArrayData::builder(DataType::Utf8)
.len(v.len())
.add_buffer(Buffer::from(offsets.to_byte_slice()))
.add_buffer(Buffer::from(&values[..]))
.build();
BinaryArray::from(array_data)
}
}

/// Creates a `BinaryArray` from `List<u8>` array
impl From<ListArray> for BinaryArray {
fn from(v: ListArray) -> Self {
Expand Down Expand Up @@ -1155,6 +1175,36 @@ mod tests {
}
}

#[test]
fn test_binary_array_from_u8_slice() {
let values: Vec<&[u8]> = vec![
&[b'h', b'e', b'l', b'l', b'o'],
&[],
&[b'p', b'a', b'r', b'q', b'u', b'e', b't'],
];

// Array data: ["hello", "", "parquet"]
let binary_array = BinaryArray::from(values);

assert_eq!(3, binary_array.len());
assert_eq!(0, binary_array.null_count());
assert_eq!([b'h', b'e', b'l', b'l', b'o'], binary_array.value(0));
assert_eq!("hello", binary_array.get_string(0));
assert_eq!([] as [u8; 0], binary_array.value(1));
assert_eq!("", binary_array.get_string(1));
assert_eq!(
[b'p', b'a', b'r', b'q', b'u', b'e', b't'],
binary_array.value(2)
);
assert_eq!("parquet", binary_array.get_string(2));
assert_eq!(5, binary_array.value_offset(2));
assert_eq!(7, binary_array.value_length(2));
for i in 0..3 {
assert!(binary_array.is_valid(i));
assert!(!binary_array.is_null(i));
}
}

#[test]
#[should_panic(
expected = "BinaryArray can only be created from List<u8> arrays, mismatched \
Expand Down
181 changes: 179 additions & 2 deletions rust/arrow/src/compute/array_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

//! Defines primitive computations on arrays, e.g. addition, equality, boolean logic.

use std::cmp;
use std::ops::Add;
use std::sync::Arc;

use crate::array::{Array, BooleanArray, PrimitiveArray};
use crate::datatypes::ArrowNumericType;
use crate::array::{
Array, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, PrimitiveArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
use crate::datatypes::{ArrowNumericType, DataType};
use crate::error::{ArrowError, Result};

/// Returns the minimum value in the array, according to the natural order.
Expand Down Expand Up @@ -204,6 +210,101 @@ where
Ok(b.finish())
}

macro_rules! filter_array {
($array:expr, $filter:expr, $array_type:ident) => {{
let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
let mut builder = $array_type::builder(b.len());
for i in 0..b.len() {
if $filter.value(i) {
if b.is_null(i) {
builder.append_null()?;
} else {
builder.append_value(b.value(i))?;
}
}
}
Ok(Arc::new(builder.finish()))
}};
}

pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
match array.data_type() {
DataType::UInt8 => filter_array!(array, filter, UInt8Array),
DataType::UInt16 => filter_array!(array, filter, UInt16Array),
DataType::UInt32 => filter_array!(array, filter, UInt32Array),
DataType::UInt64 => filter_array!(array, filter, UInt64Array),
DataType::Int8 => filter_array!(array, filter, Int8Array),
DataType::Int16 => filter_array!(array, filter, Int16Array),
DataType::Int32 => filter_array!(array, filter, Int32Array),
DataType::Int64 => filter_array!(array, filter, Int64Array),
DataType::Float32 => filter_array!(array, filter, Float32Array),
DataType::Float64 => filter_array!(array, filter, Float64Array),
DataType::Boolean => filter_array!(array, filter, BooleanArray),
DataType::Utf8 => {
let b = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let mut values: Vec<&[u8]> = Vec::with_capacity(b.len());
for i in 0..b.len() {
if filter.value(i) {
values.push(b.value(i));
}
}
Ok(Arc::new(BinaryArray::from(values)))
}
other => Err(ArrowError::ComputeError(format!(
"filter not supported for {:?}",
other
))),
}
}

macro_rules! limit_array {
($array:expr, $num_elements:expr, $array_type:ident) => {{
let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
let mut builder = $array_type::builder($num_elements);
for i in 0..$num_elements {
if b.is_null(i) {
builder.append_null()?;
} else {
builder.append_value(b.value(i))?;
}
}
Ok(Arc::new(builder.finish()))
}};
}

/// Returns the array, taking only the number of elements specified
///
/// Returns the whole array if the number of elements specified is larger than the length of the array
pub fn limit(array: &Array, num_elements: usize) -> Result<ArrayRef> {
let num_elements_safe: usize = cmp::min(array.len(), num_elements);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last nit, we could return the array as ArrayRef immediately if the limit >= len. I sold have thought of it earlier, my apologies.

I'm happy with everything else

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you can help me here: how can I wrap the reference to array in an Arc<Array> / ArrayRef? Since the reference to array can be freed at any time it would leave the Arc invalid, so I have to specify the lifetime somehow. Can I do that with Arc?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you're right, I missed that part. We can improve limit when we have zero-copy array slicing 👍🏾


match array.data_type() {
DataType::UInt8 => limit_array!(array, num_elements_safe, UInt8Array),
DataType::UInt16 => limit_array!(array, num_elements_safe, UInt16Array),
DataType::UInt32 => limit_array!(array, num_elements_safe, UInt32Array),
DataType::UInt64 => limit_array!(array, num_elements_safe, UInt64Array),
DataType::Int8 => limit_array!(array, num_elements_safe, Int8Array),
DataType::Int16 => limit_array!(array, num_elements_safe, Int16Array),
DataType::Int32 => limit_array!(array, num_elements_safe, Int32Array),
DataType::Int64 => limit_array!(array, num_elements_safe, Int64Array),
DataType::Float32 => limit_array!(array, num_elements_safe, Float32Array),
DataType::Float64 => limit_array!(array, num_elements_safe, Float64Array),
DataType::Boolean => limit_array!(array, num_elements_safe, BooleanArray),
DataType::Utf8 => {
let b = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let mut values: Vec<&[u8]> = Vec::with_capacity(num_elements_safe);
for i in 0..num_elements_safe {
values.push(b.value(i));
}
Ok(Arc::new(BinaryArray::from(values)))
}
other => Err(ArrowError::ComputeError(format!(
"limit not supported for {:?}",
other
))),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -358,4 +459,80 @@ mod tests {
assert_eq!(5, min(&a).unwrap());
assert_eq!(9, max(&a).unwrap());
}

#[test]
fn test_filter_array() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = BooleanArray::from(vec![true, false, false, true, false]);
let c = filter(&a, &b).unwrap();
let d = c.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(2, d.len());
assert_eq!(5, d.value(0));
assert_eq!(8, d.value(1));
}

#[test]
fn test_filter_binary_array() {
let a = BinaryArray::from(vec!["hello", " ", "world", "!"]);
let b = BooleanArray::from(vec![true, false, true, false]);
let c = filter(&a, &b).unwrap();
let d = c.as_ref().as_any().downcast_ref::<BinaryArray>().unwrap();
assert_eq!(2, d.len());
assert_eq!("hello", d.get_string(0));
assert_eq!("world", d.get_string(1));
}

#[test]
fn test_filter_array_with_null() {
let a = Int32Array::from(vec![Some(5), None]);
let b = BooleanArray::from(vec![false, true]);
let c = filter(&a, &b).unwrap();
let d = c.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(1, d.len());
assert_eq!(true, d.is_null(0));
}

#[test]
fn test_limit_array() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = limit(&a, 3).unwrap();
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(3, c.len());
assert_eq!(5, c.value(0));
assert_eq!(6, c.value(1));
assert_eq!(7, c.value(2));
}

#[test]
fn test_limit_binary_array() {
let a = BinaryArray::from(vec!["hello", " ", "world", "!"]);
let b = limit(&a, 2).unwrap();
let c = b.as_ref().as_any().downcast_ref::<BinaryArray>().unwrap();
assert_eq!(2, c.len());
assert_eq!("hello", c.get_string(0));
assert_eq!(" ", c.get_string(1));
}

#[test]
fn test_limit_array_with_null() {
let a = Int32Array::from(vec![None, Some(5)]);
let b = limit(&a, 1).unwrap();
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(1, c.len());
assert_eq!(true, c.is_null(0));
}

#[test]
fn test_limit_array_with_limit_too_large() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = limit(&a, 6).unwrap();
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();

assert_eq!(5, c.len());
assert_eq!(a.value(0), c.value(0));
assert_eq!(a.value(1), c.value(1));
assert_eq!(a.value(2), c.value(2));
assert_eq!(a.value(3), c.value(3));
assert_eq!(a.value(4), c.value(4));
}
}
Loading