40 changes: 15 additions & 25 deletions rust/arrow/src/array/array_binary.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::From;
use std::convert::{From, TryInto};
use std::fmt;
use std::mem;
use std::{any::Any, iter::FromIterator};
@@ -462,22 +462,11 @@ impl DecimalArray {
(self.value_offset_at(offset + 1) - pos) as usize,
)
};
Self::from_bytes_to_i128(raw_val)
}

fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "DecimalArray supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
let as_array = raw_val.try_into();
match as_array {
Ok(v) if raw_val.len() == 16 => i128::from_le_bytes(v),
_ => panic!("DecimalArray elements are not 128bit integers."),
}
i128::from_be_bytes(result)
}

/// Returns the byte size per value for Decimal arrays with a given precision
pub fn calc_fixed_byte_size(precision: usize) -> i32 {
(10.0_f64.powi(precision as i32).log2() / 8.0).ceil() as i32
}

/// Returns the offset for the element at index `i`.
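
For reference, the switch from the big-endian `from_bytes_to_i128` helper to `try_into` plus `i128::from_le_bytes` can be checked in isolation. A minimal sketch (not part of the diff), reusing the byte patterns from the updated tests below:

```rust
fn main() {
    // 8_887_000_000 == 0x2_11B4_DBC0, stored little-endian across all 16 bytes.
    let raw: [u8; 16] = [192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
    assert_eq!(i128::from_le_bytes(raw), 8_887_000_000);

    // Negative values rely on two's-complement sign extension: the unused high bytes are 0xFF.
    let raw_neg: [u8; 16] = [
        64, 36, 75, 238, 253, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    ];
    assert_eq!(i128::from_le_bytes(raw_neg), -8_887_000_000);
}
```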
@@ -549,7 +538,7 @@ impl From<ArrayDataRef> for DecimalArray {
DataType::Decimal(precision, scale) => (*precision, *scale),
_ => panic!("Expected data type to be Decimal"),
};
let length = Self::calc_fixed_byte_size(precision);
let length = 16;
Self {
data,
value_data: RawPtrBox::new(value_data),
@@ -950,26 +939,27 @@ mod tests {

#[test]
fn test_decimal_array() {
let values: [u8; 20] = [
0, 0, 0, 0, 0, 2, 17, 180, 219, 192, 255, 255, 255, 255, 255, 253, 238, 75,
36, 64,
// let val_8887: [u8; 16] = [192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
// let val_neg_8887: [u8; 16] = [64, 36, 75, 238, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255];
let values: [u8; 32] = [
192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 238, 253,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
];

let array_data = ArrayData::builder(DataType::Decimal(23, 6))
.len(2)
.add_buffer(Buffer::from(&values[..]))
.build();
let decimal_array = DecimalArray::from(array_data);
assert_eq!(8_887_000_000, decimal_array.value(0));
assert_eq!(-8_887_000_000, decimal_array.value(1));
assert_eq!(10, decimal_array.value_length());
assert_eq!(16, decimal_array.value_length());
}

#[test]
fn test_decimal_array_fmt_debug() {
let values: [u8; 20] = [
0, 0, 0, 0, 0, 2, 17, 180, 219, 192, 255, 255, 255, 255, 255, 253, 238, 75,
36, 64,
let values: [u8; 32] = [
192, 219, 180, 17, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 36, 75, 238, 253,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
];
let array_data = ArrayData::builder(DataType::Decimal(23, 6))
.len(2)
8 changes: 4 additions & 4 deletions rust/arrow/src/array/builder.rs
@@ -1971,7 +1971,7 @@ impl DecimalBuilder {
/// array
pub fn new(capacity: usize, precision: usize, scale: usize) -> Self {
let values_builder = UInt8Builder::new(capacity);
let byte_width = DecimalArray::calc_fixed_byte_size(precision);
let byte_width = 16;
Self {
builder: FixedSizeListBuilder::new(values_builder, byte_width),
precision,
@@ -2005,7 +2005,7 @@ impl DecimalBuilder {
"DecimalBuilder only supports values up to 16 bytes.".to_string(),
));
}
let res = v.to_be_bytes();
let res = v.to_le_bytes();
let start_byte = 16 - size;
Ok(res[start_byte..16].to_vec())
}
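
A hedged usage sketch of the builder with the new fixed 16-byte width; the `append_value` and `finish` method names are assumed from the surrounding builder code rather than shown in this diff:

```rust
use arrow::array::DecimalBuilder;
use arrow::error::Result;

fn main() -> Result<()> {
    // capacity 2, precision 23, scale 6 -- matching DataType::Decimal(23, 6) used in the tests
    let mut builder = DecimalBuilder::new(2, 23, 6);
    builder.append_value(8_887_000_000)?;
    builder.append_value(-8_887_000_000)?;
    let array = builder.finish();

    assert_eq!(array.value(0), 8_887_000_000);
    assert_eq!(array.value(1), -8_887_000_000);
    assert_eq!(array.value_length(), 16); // always 16 bytes per value now
    Ok(())
}
```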
@@ -3612,8 +3612,8 @@ mod tests {
assert_eq!(&DataType::Decimal(23, 6), decimal_array.data_type());
assert_eq!(3, decimal_array.len());
assert_eq!(1, decimal_array.null_count());
assert_eq!(20, decimal_array.value_offset(2));
assert_eq!(10, decimal_array.value_length());
assert_eq!(32, decimal_array.value_offset(2));
assert_eq!(16, decimal_array.value_length());
}

#[test]
6 changes: 2 additions & 4 deletions rust/arrow/src/array/equal/decimal.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::{array::ArrayData, array::DecimalArray, datatypes::DataType};
use crate::{array::ArrayData, datatypes::DataType};

use super::utils::equal_len;

@@ -27,9 +27,7 @@ pub(super) fn decimal_equal(
len: usize,
) -> bool {
let size = match lhs.data_type() {
DataType::Decimal(precision, _) => {
DecimalArray::calc_fixed_byte_size(*precision) as usize
}
DataType::Decimal(_, _) => 16,
_ => unreachable!(),
};

1 change: 0 additions & 1 deletion rust/arrow/src/array/equal_json.rs
@@ -900,7 +900,6 @@ mod tests {
"#,
)
.unwrap();
println!("{:?}", arrow_array);
assert!(arrow_array.eq(&json_array));
assert!(json_array.eq(&arrow_array));

17 changes: 17 additions & 0 deletions rust/arrow/src/datatypes.rs
@@ -895,6 +895,23 @@ impl DataType {
))
}
}
Some(s) if s == "decimal" => {
// return a decimal type, reading precision and scale from the map
let precision = match map.get("precision") {
Some(p) => Ok(p.as_u64().unwrap() as usize),
None => Err(ArrowError::ParseError(
"Expecting a precision for decimal".to_string(),
)),
};
let scale = match map.get("scale") {
Some(s) => Ok(s.as_u64().unwrap() as usize),
_ => Err(ArrowError::ParseError(
"Expecting a scale for decimal".to_string(),
)),
};

Ok(DataType::Decimal(precision?, scale?))
}
Some(s) if s == "floatingpoint" => match map.get("precision") {
Some(p) if p == "HALF" => Ok(DataType::Float16),
Some(p) if p == "SINGLE" => Ok(DataType::Float32),
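For context, the `decimal` branch added above reads a type object of the following shape from the Arrow integration-JSON schema. A small sketch (the surrounding parsing entry point on `DataType` is assumed and not called here):

```rust
use serde_json::json;

fn main() {
    // The new branch matches on name == "decimal" and requires "precision" and "scale";
    // a missing key produces an ArrowError::ParseError.
    let type_json = json!({
        "name": "decimal",
        "precision": 23,
        "scale": 6
    });
    // Parsing this object should yield DataType::Decimal(23, 6).
    println!("{}", type_json);
}
```
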
22 changes: 22 additions & 0 deletions rust/arrow/src/ipc/convert.rs
@@ -97,6 +97,12 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
let len = c_fields.len();
for i in 0..len {
let c_field: ipc::Field = c_fields.get(i);
match c_field.type_type() {

Contributor Author (@sweb):

@alamb could you take another look at my attempt to add the unimplemented path for big endian?

I am not happy with placing the check in fb_to_schema and would have preferred to put it in get_data_type but I found no way to pass on the endianness from the schema.

Contributor (@alamb), Dec 3, 2020:

I see the problem -- yes, since the endianness is on the schema object, not the field, and the field is all that is passed around here, there is no way to know the details of the schema.

I personally think this code is fine, if a bit un-ideal, and it could be cleaned up in the future. My only worry is that it would get lost / broken during such a cleanup.

What would you think about adding a test that triggers the error? Then we could be sure that any future cleanups will not break the check.

Thanks again @sweb

Contributor Author (@sweb):

@alamb thank you for being so nice about it - I was just too lazy to add a test and should receive full scrutiny ;)

This is partly due to the fact that I am not very familiar with flatbuffers and still do not fully understand how to create the appropriate flatbuffer to test this. As a temporary solution, I have added two tests to ipc::reader that use the BigEndian files in arrow-ipc-stream/integration/1.0.0-bigendian. The one for decimal fails; the others work. I hope this is okay for now, until I am able to construct the correct schema message to test this directly in ipc::convert.

While adding the big-endian tests for the other types, I noticed that the contents are not equal to the JSON content, which is why the test does not contain an equality check. There may be problems with Big Endian for other types as well.

ipc::Type::Decimal if fb.endianness() == ipc::Endianness::Big => {
unimplemented!("Big Endian is not supported for Decimal!")
}
_ => (),
};
fields.push(c_field.into());
}

@@ -270,6 +276,10 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType {

DataType::Struct(fields)
}
ipc::Type::Decimal => {
let fsb = field.type_as_decimal().unwrap();
DataType::Decimal(fsb.precision() as usize, fsb.scale() as usize)
}
t => unimplemented!("Type {:?} not supported", t),
}
}
@@ -562,6 +572,17 @@ pub(crate) fn get_fb_field_type<'a>(
// type in the DictionaryEncoding metadata in the parent field
get_fb_field_type(value_type, is_nullable, fbb)
}
Decimal(precision, scale) => {
let mut builder = ipc::DecimalBuilder::new(fbb);
builder.add_precision(*precision as i32);
builder.add_scale(*scale as i32);
builder.add_bitWidth(128);
FBFieldType {
type_type: ipc::Type::Decimal,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
t => unimplemented!("Type {:?} not supported", t),
}
}
@@ -738,6 +759,7 @@ mod tests {
123,
true,
),
Field::new("decimal<usize, usize>", DataType::Decimal(10, 6), false),
],
md,
);
51 changes: 51 additions & 0 deletions rust/arrow/src/ipc/reader.rs
@@ -337,6 +337,19 @@ fn create_primitive_array(
}
builder.build()
}
Decimal(_, _) => {
// read 2 buffers: validity (optional) and 16-byte-per-value data
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..2].to_vec())
.offset(0);
if null_count > 0 {
builder = builder
.null_count(null_count)
.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
t => panic!("Data type {:?} either unsupported or not primitive", t),
};

@@ -978,6 +991,7 @@ mod tests {
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
@@ -994,6 +1008,42 @@
});
}

#[test]
#[should_panic(expected = "Big Endian is not supported for Decimal!")]
fn read_decimal_be_file_should_panic() {
let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
testdata
))
.unwrap();
FileReader::try_new(file).unwrap();
}

#[test]
fn read_generated_be_files_should_work() {
// complementary to the previous test
let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file",
testdata, path
))
.unwrap();

FileReader::try_new(file).unwrap();
});
}

#[test]
fn read_generated_streams() {
let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
@@ -1006,6 +1056,7 @@
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
2 changes: 2 additions & 0 deletions rust/arrow/src/ipc/writer.rs
@@ -821,6 +821,7 @@ mod tests {
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
@@ -865,6 +866,7 @@ mod tests {
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
4 changes: 4 additions & 0 deletions rust/arrow/src/util/integration_util.rs
@@ -331,6 +331,10 @@ impl ArrowJsonBatch {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
}
DataType::Decimal(_, _) => {
let arr = arr.as_any().downcast_ref::<DecimalArray>().unwrap();
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
}
DataType::Dictionary(ref key_type, _) => match key_type.as_ref() {
DataType::Int8 => {
let arr = arr
13 changes: 7 additions & 6 deletions rust/parquet/src/arrow/schema.rs
@@ -400,12 +400,13 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_length(*length)
.build()
}
DataType::Decimal(_, _) => {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
.with_length(10)
.build()
}
DataType::Decimal(precision, _) => Type::primitive_type_builder(
name,
PhysicalType::FIXED_LEN_BYTE_ARRAY,
)
.with_repetition(repetition)
.with_length((10.0_f64.powi(*precision as i32).log2() / 8.0).ceil() as i32)
.build(),
DataType::Utf8 | DataType::LargeUtf8 => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::UTF8)
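
A worked example of the length formula used above (a sketch, not part of the diff): for precision 10, ceil(log2(10^10) / 8) = ceil(33.22 / 8) = 5, so the parquet FIXED_LEN_BYTE_ARRAY gets 5 bytes rather than the fixed 16 bytes used in memory.

```rust
fn main() {
    // Same arithmetic as the .with_length(...) call above.
    let precision: i32 = 10;
    let byte_len = (10.0_f64.powi(precision).log2() / 8.0).ceil() as i32;
    assert_eq!(byte_len, 5);

    // Precision 38 (the usual maximum for a 128-bit decimal) needs the full 16 bytes.
    let wide = (10.0_f64.powi(38).log2() / 8.0).ceil() as i32;
    assert_eq!(wide, 16);
}
```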