Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{any::Any, sync::Arc};

use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, divide, divide_scalar, multiply, subtract,
add, divide, divide_scalar, modulus, modulus_scalar, multiply, subtract,
};
use arrow::compute::kernels::boolean::{and_kleene, or_kleene};
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
Expand Down Expand Up @@ -341,14 +341,11 @@ fn common_binary_type(
}
// for math expressions, the final value of the coercion is also the return type
// because coercion favours higher information types
Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => {
numerical_coercion(lhs_type, rhs_type)
}
Operator::Modulus => {
return Err(DataFusionError::NotImplemented(
"Modulus operator is still not supported".to_string(),
))
}
Operator::Plus
| Operator::Minus
| Operator::Modulus
| Operator::Divide
| Operator::Multiply => numerical_coercion(lhs_type, rhs_type),
};

// re-write the error message of failed coercions to include the operator's information
Expand Down Expand Up @@ -389,12 +386,11 @@ pub fn binary_operator_data_type(
| Operator::GtEq
| Operator::LtEq => Ok(DataType::Boolean),
// math operations return the same value as the common coerced type
Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => {
Ok(common_type)
}
Operator::Modulus => Err(DataFusionError::NotImplemented(
"Modulus operator is still not supported".to_string(),
)),
Operator::Plus
| Operator::Minus
| Operator::Divide
| Operator::Multiply
| Operator::Modulus => Ok(common_type),
}
}

Expand Down Expand Up @@ -454,6 +450,9 @@ impl PhysicalExpr for BinaryExpr {
Operator::Divide => {
binary_primitive_array_op_scalar!(array, scalar.clone(), divide)
}
Operator::Modulus => {
binary_primitive_array_op_scalar!(array, scalar.clone(), modulus)
}
// if scalar operation is not supported - fallback to array implementation
_ => None,
}
Expand Down Expand Up @@ -503,6 +502,7 @@ impl PhysicalExpr for BinaryExpr {
Operator::Minus => binary_primitive_array_op!(left, right, subtract),
Operator::Multiply => binary_primitive_array_op!(left, right, multiply),
Operator::Divide => binary_primitive_array_op!(left, right, divide),
Operator::Modulus => binary_primitive_array_op!(left, right, modulus),
Operator::And => {
if left_data_type == DataType::Boolean {
boolean_op!(left, right, and_kleene)
Expand All @@ -525,9 +525,6 @@ impl PhysicalExpr for BinaryExpr {
)));
}
}
Operator::Modulus => Err(DataFusionError::NotImplemented(
"Modulus operator is still not supported".to_string(),
)),
};
result.map(|a| ColumnarValue::Array(a))
}
Expand Down Expand Up @@ -964,6 +961,25 @@ mod tests {
Ok(())
}

#[test]
fn modulus_op() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));

apply_arithmetic::<Int32Type>(
schema,
vec![a, b],
Operator::Modulus,
Int32Array::from(vec![0, 0, 2, 8, 0]),
)?;

Ok(())
}

fn apply_arithmetic<T: ArrowNumericType>(
schema: SchemaRef,
data: Vec<ArrayRef>,
Expand Down