diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 4b82c34644f90..b40c8788320a5 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -25,7 +25,6 @@ use std::{ }; use crate::error::{ballista_error, BallistaError, Result}; -use crate::memory_stream::MemoryStream; use crate::serde::protobuf::{self}; use crate::serde::scheduler::{ Action, ExecutePartition, ExecutePartitionResult, PartitionId, PartitionStats, diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index 4d401eca03ffe..ea3381deb0885 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::{any::Any, pin::Pin}; use crate::client::BallistaClient; -use crate::memory_stream::MemoryStream; use crate::serde::scheduler::{PartitionLocation, PartitionStats}; use crate::utils::WrappedStream; diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 0962615d96f7a..6a6bc3e92aaa2 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -28,7 +28,6 @@ use std::time::Instant; use std::{any::Any, pin::Pin}; use crate::error::BallistaError; -use crate::memory_stream::MemoryStream; use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; @@ -47,6 +46,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::common::IPCWriter; use datafusion::physical_plan::hash_utils::create_hashes; +use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::metrics::{ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index 6de8dbab0a119..e14c1ebf0e656 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::{any::Any, pin::Pin}; -use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionLocation; use async_trait::async_trait; diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs index 4e51067ec9767..bc7be4f88651f 100644 --- a/ballista/rust/core/src/lib.rs +++ b/ballista/rust/core/src/lib.rs @@ -27,7 +27,6 @@ pub mod client; pub mod config; pub mod error; pub mod execution_plans; -pub mod memory_stream; pub mod utils; #[macro_use] diff --git a/ballista/rust/core/src/memory_stream.rs b/ballista/rust/core/src/memory_stream.rs deleted file mode 100644 index 0c0ba4b4a88bc..0000000000000 --- a/ballista/rust/core/src/memory_stream.rs +++ /dev/null @@ -1,92 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This is copied from DataFusion because it is declared as `pub(crate)`. See -//! https://issues.apache.org/jira/browse/ARROW-11276. - -use std::task::{Context, Poll}; - -use datafusion::arrow::{datatypes::SchemaRef, error::Result, record_batch::RecordBatch}; -use datafusion::physical_plan::RecordBatchStream; -use futures::Stream; - -/// Iterator over batches - -pub struct MemoryStream { - /// Vector of record batches - data: Vec, - /// Schema representing the data - schema: SchemaRef, - /// Optional projection for which columns to load - projection: Option>, - /// Index into the data - index: usize, -} - -impl MemoryStream { - /// Create an iterator for a vector of record batches - - pub fn try_new( - data: Vec, - schema: SchemaRef, - projection: Option>, - ) -> Result { - Ok(Self { - data, - schema, - projection, - index: 0, - }) - } -} - -impl Stream for MemoryStream { - type Item = Result; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(if self.index < self.data.len() { - self.index += 1; - - let batch = &self.data[self.index - 1]; - - // apply projection - let next_batch = match &self.projection { - Some(projection) => batch.project(projection)?, - None => batch.clone(), - }; - - Some(Ok(next_batch)) - } else { - None - }) - } - - fn size_hint(&self) -> (usize, Option) { - (self.data.len(), Some(self.data.len())) - } -} - -impl RecordBatchStream for MemoryStream { - /// Get the schema - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index e2084610ceeec..efe175f4b6b8d 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -26,7 +26,6 @@ use crate::error::{BallistaError, Result}; use crate::execution_plans::{ DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec, }; -use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; use crate::config::BallistaConfig; diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs index ebc7c011970b3..0c92627ba98cb 100644 --- a/datafusion/src/execution/mod.rs +++ b/datafusion/src/execution/mod.rs @@ -20,6 +20,6 @@ pub mod context; pub mod dataframe_impl; pub(crate) mod disk_manager; -pub(crate) mod memory_manager; +pub mod memory_manager; pub mod options; pub mod runtime_env; diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs index 8e32b097630f1..8e5f37953d754 100644 --- a/datafusion/src/physical_plan/memory.rs +++ b/datafusion/src/physical_plan/memory.rs @@ -147,7 +147,7 @@ impl MemoryExec { } /// Iterator over batches -pub(crate) struct MemoryStream { +pub struct MemoryStream { /// Vector of record batches data: Vec, /// Schema representing the data