Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ exclude = ["ballista-cli", "datafusion-cli"]
[profile.release]
codegen-units = 1
lto = true

[patch.crates-io]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pinned revision is the commit right after apache/arrow-rs#1682.

parquet = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" }
2 changes: 1 addition & 1 deletion ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "13" }
arrow = { version = "14.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "8.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }

arrow-flight = { version = "13" }
arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
Expand Down
11 changes: 9 additions & 2 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Client API for sending requests to executors.

use std::collections::HashMap;
use std::sync::Arc;

use std::{
Expand All @@ -31,6 +32,7 @@ use crate::serde::scheduler::Action;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::{
datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
Expand Down Expand Up @@ -131,11 +133,16 @@ impl BallistaClient {
struct FlightDataStream {
stream: Streaming<FlightData>,
schema: SchemaRef,
dictionaries_by_id: HashMap<i64, ArrayRef>,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
Self {
stream,
schema,
dictionaries_by_id: HashMap::new(),
}
}
}

Expand All @@ -154,7 +161,7 @@ impl Stream for FlightDataStream {
flight_data_to_arrow_batch(
&flight_data_chunk,
self.schema.clone(),
&[],
&self.dictionaries_by_id,
)
});
Some(converted_chunk)
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
anyhow = "1"
arrow = { version = "13" }
arrow-flight = { version = "13" }
arrow = { version = "14.0.0" }
arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "13" }
arrow = { version = "14.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "8.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "13" }
arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion datafusion-examples/examples/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

Expand Down Expand Up @@ -62,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// all the remaining stream messages should be dictionary and record batches
let mut results = vec![];
let dictionaries_by_field = vec![None; schema.fields().len()];
let dictionaries_by_field = HashMap::new();
while let Some(flight_data) = stream.message().await? {
let record_batch = flight_data_to_arrow_batch(
&flight_data,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "13", features = ["prettyprint"] }
arrow = { version = "14.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.83.0", optional = true }
ordered-float = "3.0"
parquet = { version = "13", features = ["arrow"], optional = true }
parquet = { version = "14.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.17"
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "13", features = ["prettyprint"] }
arrow = { version = "14.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
Expand All @@ -73,7 +73,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "13", features = ["arrow"] }
parquet = { version = "14.0.0", features = ["arrow"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "13", features = ["prettyprint"] }
arrow = { version = "14.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
10 changes: 5 additions & 5 deletions datafusion/core/src/from_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
//! This file essentially exists to ease the transition onto arrow2

use arrow::array::{
ArrayData, BinaryOffsetSizeTrait, BooleanArray, GenericBinaryArray,
GenericStringArray, PrimitiveArray, StringOffsetSizeTrait,
ArrayData, BooleanArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
PrimitiveArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{ArrowPrimitiveType, DataType};
Expand Down Expand Up @@ -50,7 +50,7 @@ where
/// default implementation for binary array types, adapted from `From<Vec<_>>`
impl<S, I, OffsetSize> FromSlice<S, I> for GenericBinaryArray<OffsetSize>
where
OffsetSize: BinaryOffsetSizeTrait,
OffsetSize: OffsetSizeTrait,
S: AsRef<[I]>,
I: AsRef<[u8]>,
{
Expand All @@ -69,7 +69,7 @@ where
offsets.push(length_so_far);
values.extend_from_slice(s);
}
let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
let array_data = ArrayData::builder(Self::get_data_type())
.len(slice.len())
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_buffer(Buffer::from_slice_ref(&values));
Expand All @@ -81,7 +81,7 @@ where
/// default implementation for utf8 array types, adapted from `From<Vec<_>>`
impl<S, I, OffsetSize> FromSlice<S, I> for GenericStringArray<OffsetSize>
where
OffsetSize: StringOffsetSizeTrait,
OffsetSize: OffsetSizeTrait,
S: AsRef<[I]>,
I: AsRef<str>,
{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "13", features = ["prettyprint"] }
arrow = { version = "14.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "8.0.0" }
sqlparser = "0.17"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
arrow = { version = "13" }
arrow = { version = "14.0.0" }
cranelift = "0.83.0"
cranelift-jit = "0.83.0"
cranelift-module = "0.83.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "13", features = ["prettyprint"] }
arrow = { version = "14.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
Expand Down
16 changes: 8 additions & 8 deletions datafusion/physical-expr/src/aggregate/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use super::hyperloglog::HyperLogLog;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use arrow::array::{
ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray,
PrimitiveArray, StringOffsetSizeTrait,
ArrayRef, BinaryArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
PrimitiveArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
Expand Down Expand Up @@ -119,15 +119,15 @@ impl AggregateExpr for ApproxDistinct {
#[derive(Debug)]
struct BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: OffsetSizeTrait,
{
hll: HyperLogLog<Vec<u8>>,
phantom_data: PhantomData<T>,
}

impl<T> BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: OffsetSizeTrait,
{
/// new approx_distinct accumulator
pub fn new() -> Self {
Expand All @@ -141,15 +141,15 @@ where
#[derive(Debug)]
struct StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: OffsetSizeTrait,
{
hll: HyperLogLog<String>,
phantom_data: PhantomData<T>,
}

impl<T> StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: OffsetSizeTrait,
{
/// new approx_distinct accumulator
pub fn new() -> Self {
Expand Down Expand Up @@ -259,7 +259,7 @@ macro_rules! downcast_value {

impl<T> Accumulator for BinaryHLLAccumulator<T>
where
T: BinaryOffsetSizeTrait,
T: OffsetSizeTrait,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &GenericBinaryArray<T> =
Expand All @@ -275,7 +275,7 @@ where

impl<T> Accumulator for StringHLLAccumulator<T>
where
T: StringOffsetSizeTrait,
T: OffsetSizeTrait,
{
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array: &GenericStringArray<T> =
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-expr/src/crypto_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

use arrow::{
array::{
Array, ArrayRef, BinaryArray, GenericStringArray, StringArray,
StringOffsetSizeTrait,
Array, ArrayRef, BinaryArray, GenericStringArray, OffsetSizeTrait, StringArray,
},
datatypes::DataType,
};
Expand Down Expand Up @@ -127,7 +126,7 @@ impl DigestAlgorithm {
/// digest a string array to their hash values
fn digest_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
where
T: StringOffsetSizeTrait,
T: OffsetSizeTrait,
{
let input_value = value
.as_any()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! DateTime expressions

use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
array::{Array, ArrayRef, GenericStringArray, OffsetSizeTrait, PrimitiveArray},
compute::kernels::cast_utils::string_to_timestamp_nanos,
datatypes::{
ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
Expand Down Expand Up @@ -57,7 +57,7 @@ pub(crate) fn unary_string_to_primitive_function<'a, T, O, F>(
) -> Result<PrimitiveArray<O>>
where
O: ArrowPrimitiveType,
T: StringOffsetSizeTrait,
T: OffsetSizeTrait,
F: Fn(&'a str) -> Result<O::Native>,
{
if args.len() != 1 {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ where
.collect())
}

fn is_distinct_from_utf8<OffsetSize: StringOffsetSizeTrait>(
fn is_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
left: &GenericStringArray<OffsetSize>,
right: &GenericStringArray<OffsetSize>,
) -> Result<BooleanArray> {
Expand Down Expand Up @@ -1397,7 +1397,7 @@ where
.collect())
}

fn is_not_distinct_from_utf8<OffsetSize: StringOffsetSizeTrait>(
fn is_not_distinct_from_utf8<OffsetSize: OffsetSizeTrait>(
left: &GenericStringArray<OffsetSize>,
right: &GenericStringArray<OffsetSize>,
) -> Result<BooleanArray> {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow::array::GenericStringArray;
use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, StringOffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array,
Int64Array, Int8Array, OffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
};
use arrow::datatypes::ArrowPrimitiveType;
Expand Down Expand Up @@ -250,14 +250,14 @@ fn not_in_list_primitive<T: ArrowPrimitiveType>(
}

// whether each value on the left (can be null) is contained in the non-null list
fn in_list_utf8<OffsetSize: StringOffsetSizeTrait>(
fn in_list_utf8<OffsetSize: OffsetSizeTrait>(
array: &GenericStringArray<OffsetSize>,
values: &[&str],
) -> Result<BooleanArray> {
compare_op_scalar!(array, values, |x, v: &[&str]| v.contains(&x))
}

fn not_in_list_utf8<OffsetSize: StringOffsetSizeTrait>(
fn not_in_list_utf8<OffsetSize: OffsetSizeTrait>(
array: &GenericStringArray<OffsetSize>,
values: &[&str],
) -> Result<BooleanArray> {
Expand Down Expand Up @@ -341,7 +341,7 @@ impl InListExpr {

/// Compare for specific utf8 types
#[allow(clippy::unnecessary_wraps)]
fn compare_utf8<T: StringOffsetSizeTrait>(
fn compare_utf8<T: OffsetSizeTrait>(
&self,
array: ArrayRef,
list_values: Vec<ColumnarValue>,
Expand Down
Loading