Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions datafusion/jit/src/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,13 @@ impl JIT {
builder.seal_block(entry_block);

// Walk the AST and declare all variables.
let variables =
declare_variables(&mut builder, &params, &the_return, &stmts, entry_block);
let variables = declare_variables(
&mut builder,
&params,
the_return.as_ref(),
&stmts,
entry_block,
);

// Now translate the statements of the function body.
let mut trans = FunctionTranslator {
Expand Down Expand Up @@ -652,7 +657,7 @@ fn typed_zero(typ: JITType, builder: &mut FunctionBuilder) -> Value {
fn declare_variables(
builder: &mut FunctionBuilder,
params: &[(String, JITType)],
the_return: &Option<(String, JITType)>,
the_return: Option<&(String, JITType)>,
stmts: &[Stmt],
entry_block: Block,
) -> HashMap<String, Variable> {
Expand Down
10 changes: 6 additions & 4 deletions datafusion/physical-expr/src/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ fn digest_process(
))),
},
ColumnarValue::Scalar(scalar) => match scalar {
ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
.digest_scalar(&a.as_ref().map(|s: &String| s.as_bytes()))),
ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => {
Ok(digest_algorithm
.digest_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
}
ScalarValue::Binary(a) | ScalarValue::LargeBinary(a) => Ok(digest_algorithm
.digest_scalar(&a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
.digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for function {}",
other, digest_algorithm,
Expand Down Expand Up @@ -112,7 +114,7 @@ macro_rules! digest_to_scalar {

impl DigestAlgorithm {
/// digest an optional string to its hash value, null values are returned as is
fn digest_scalar(self, value: &Option<&[u8]>) -> ColumnarValue {
fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
ColumnarValue::Scalar(match self {
Self::Md5 => digest_to_scalar!(Md5, value),
Self::Sha224 => digest_to_scalar!(Sha224, value),
Expand Down
16 changes: 11 additions & 5 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,16 @@ pub fn parse_expr(
.when_then_expr
.iter()
.map(|e| {
let when_expr =
parse_required_expr_inner(&e.when_expr, registry, "when_expr")?;
let then_expr =
parse_required_expr_inner(&e.then_expr, registry, "then_expr")?;
let when_expr = parse_required_expr_inner(
e.when_expr.as_ref(),
registry,
"when_expr",
)?;
let then_expr = parse_required_expr_inner(
e.then_expr.as_ref(),
registry,
"then_expr",
)?;
Ok((Box::new(when_expr), Box::new(then_expr)))
})
.collect::<Result<Vec<(Box<Expr>, Box<Expr>)>, Error>>()?;
Expand Down Expand Up @@ -1352,7 +1358,7 @@ fn parse_required_expr(
}

fn parse_required_expr_inner(
p: &Option<protobuf::LogicalExprNode>,
p: Option<&protobuf::LogicalExprNode>,
registry: &dyn FunctionRegistry,
field: impl Into<String>,
) -> Result<Expr, Error> {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ pub(crate) fn parse_physical_expr(
.map(|e| {
Ok((
parse_required_physical_expr(
&e.when_expr,
e.when_expr.as_ref(),
registry,
"when_expr",
input_schema,
)?,
parse_required_physical_expr(
&e.then_expr,
e.then_expr.as_ref(),
registry,
"then_expr",
input_schema,
Expand Down Expand Up @@ -237,7 +237,7 @@ fn parse_required_physical_box_expr(
}

fn parse_required_physical_expr(
expr: &Option<protobuf::PhysicalExprNode>,
expr: Option<&protobuf::PhysicalExprNode>,
registry: &dyn FunctionRegistry,
field: &str,
input_schema: &Schema,
Expand Down
83 changes: 48 additions & 35 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,43 +912,53 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
let data_type = val.get_datatype();
match val {
scalar::ScalarValue::Boolean(val) => {
create_proto_scalar(val, &data_type, |s| Value::BoolValue(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::BoolValue(*s))
}
scalar::ScalarValue::Float32(val) => {
create_proto_scalar(val, &data_type, |s| Value::Float32Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float32Value(*s))
}
scalar::ScalarValue::Float64(val) => {
create_proto_scalar(val, &data_type, |s| Value::Float64Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Float64Value(*s))
}
scalar::ScalarValue::Int8(val) => {
create_proto_scalar(val, &data_type, |s| Value::Int8Value(*s as i32))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::Int8Value(*s as i32)
})
}
scalar::ScalarValue::Int16(val) => {
create_proto_scalar(val, &data_type, |s| Value::Int16Value(*s as i32))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::Int16Value(*s as i32)
})
}
scalar::ScalarValue::Int32(val) => {
create_proto_scalar(val, &data_type, |s| Value::Int32Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int32Value(*s))
}
scalar::ScalarValue::Int64(val) => {
create_proto_scalar(val, &data_type, |s| Value::Int64Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Int64Value(*s))
}
scalar::ScalarValue::UInt8(val) => {
create_proto_scalar(val, &data_type, |s| Value::Uint8Value(*s as u32))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::Uint8Value(*s as u32)
})
}
scalar::ScalarValue::UInt16(val) => {
create_proto_scalar(val, &data_type, |s| Value::Uint16Value(*s as u32))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::Uint16Value(*s as u32)
})
}
scalar::ScalarValue::UInt32(val) => {
create_proto_scalar(val, &data_type, |s| Value::Uint32Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint32Value(*s))
}
scalar::ScalarValue::UInt64(val) => {
create_proto_scalar(val, &data_type, |s| Value::Uint64Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Uint64Value(*s))
}
scalar::ScalarValue::Utf8(val) => {
create_proto_scalar(val, &data_type, |s| Value::Utf8Value(s.to_owned()))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::Utf8Value(s.to_owned())
})
}
scalar::ScalarValue::LargeUtf8(val) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::LargeUtf8Value(s.to_owned())
})
}
Expand Down Expand Up @@ -977,10 +987,10 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::Date32(val) => {
create_proto_scalar(val, &data_type, |s| Value::Date32Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
}
datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -992,7 +1002,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand Down Expand Up @@ -1022,10 +1032,10 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}),
},
datafusion::scalar::ScalarValue::Date64(val) => {
create_proto_scalar(val, &data_type, |s| Value::Date64Value(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date64Value(*s))
}
datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1035,7 +1045,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1047,27 +1057,31 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::IntervalYearMonth(val) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::IntervalYearmonthValue(*s)
})
}
datafusion::scalar::ScalarValue::IntervalDayTime(val) => {
create_proto_scalar(val, &data_type, |s| Value::IntervalDaytimeValue(*s))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::IntervalDaytimeValue(*s)
})
}
datafusion::scalar::ScalarValue::Null => Ok(protobuf::ScalarValue {
value: Some(Value::NullValue((&data_type).try_into()?)),
}),

scalar::ScalarValue::Binary(val) => {
create_proto_scalar(val, &data_type, |s| Value::BinaryValue(s.to_owned()))
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::BinaryValue(s.to_owned())
})
}
scalar::ScalarValue::LargeBinary(val) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::LargeBinaryValue(s.to_owned())
})
}
scalar::ScalarValue::FixedSizeBinary(length, val) => {
create_proto_scalar(val, &data_type, |s| {
create_proto_scalar(val.as_ref(), &data_type, |s| {
Value::FixedSizeBinaryValue(protobuf::ScalarFixedSizeBinary {
values: s.to_owned(),
length: *length,
Expand All @@ -1076,7 +1090,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}

datafusion::scalar::ScalarValue::Time32Second(v) => {
create_proto_scalar(v, &data_type, |v| {
create_proto_scalar(v.as_ref(), &data_type, |v| {
Value::Time32Value(protobuf::ScalarTime32Value {
value: Some(
protobuf::scalar_time32_value::Value::Time32SecondValue(*v),
Expand All @@ -1086,7 +1100,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}

datafusion::scalar::ScalarValue::Time32Millisecond(v) => {
create_proto_scalar(v, &data_type, |v| {
create_proto_scalar(v.as_ref(), &data_type, |v| {
Value::Time32Value(protobuf::ScalarTime32Value {
value: Some(
protobuf::scalar_time32_value::Value::Time32MillisecondValue(
Expand All @@ -1098,7 +1112,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}

datafusion::scalar::ScalarValue::Time64Microsecond(v) => {
create_proto_scalar(v, &data_type, |v| {
create_proto_scalar(v.as_ref(), &data_type, |v| {
Value::Time64Value(protobuf::ScalarTime64Value {
value: Some(
protobuf::scalar_time64_value::Value::Time64MicrosecondValue(
Expand All @@ -1110,7 +1124,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
}

datafusion::scalar::ScalarValue::Time64Nanosecond(v) => {
create_proto_scalar(v, &data_type, |v| {
create_proto_scalar(v.as_ref(), &data_type, |v| {
Value::Time64Value(protobuf::ScalarTime64Value {
value: Some(
protobuf::scalar_time64_value::Value::Time64NanosecondValue(
Expand Down Expand Up @@ -1286,16 +1300,15 @@ impl From<&IntervalUnit> for protobuf::IntervalUnit {
/// Creates a scalar protobuf value from an optional value (T), and
/// encoding None as the appropriate datatype
fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
v: &Option<I>,
v: Option<&I>,
null_arrow_type: &DataType,
constructor: T,
) -> Result<protobuf::ScalarValue, Error> {
let value =
v.as_ref()
.map(constructor)
.unwrap_or(protobuf::scalar_value::Value::NullValue(
null_arrow_type.try_into()?,
));
let value = v
.map(constructor)
.unwrap_or(protobuf::scalar_value::Value::NullValue(
null_arrow_type.try_into()?,
));

Ok(protobuf::ScalarValue { value: Some(value) })
}