RepartitionExec produces no output if the input stream errors #437

@alamb

Describe the bug
Given a RepartitionExec whose input stream does not have a record batch ready for immediate consumption, the RepartitionExec terminates early and produces no output.

I found this while trying to figure out why this plan was failing in IOx (nothing ever read from the stream provided to RepartitionExec):

ProjectionExec: expr=[town, count]
  RepartitionExec: partitioning=RoundRobinBatch(16)
    IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate

To Reproduce
Set up a plan like this:

┌───────────────────┐        ┌───────────────────────┐
│                   │        │                       │
│    InputStream    │───────▶│   RepartitionStream   │
│                   │        │                       │
└───────────────────┘        └───────────────────────┘

Where the input stream won't produce the record batch immediately. Full reproducer below.

I expect the repartition stream to produce the same record batch that the input stream will (eventually) provide, or else to produce a meaningful error. However, I get nothing!

expected:

[
    "+------------------+",
    "| my_awesome_field |",
    "+------------------+",
    "| foo              |",
    "| bar              |",
    "+------------------+",
]

actual:

[
    "++",
    "++",
]
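
To make the expected/actual gap concrete, here is a minimal std-only sketch of the general failure mode: a consumer that treats "nothing ready yet" as "end of input". This is an analogy and not a diagnosis of RepartitionExec; the function names (`delayed_input`, `drain_without_waiting`, `drain_until_closed`) are hypothetical, and a plain `std::sync::mpsc` channel stands in for the record batch stream.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a delayed input stream: the producer thread
/// sends its items only after a pause, so nothing is ready at first.
fn delayed_input(items: Vec<&'static str>) -> Receiver<&'static str> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        for item in items {
            // Ignore send errors: the consumer may already have hung up.
            let _ = tx.send(item);
        }
    });
    rx
}

/// Consumer with the bug being illustrated: it stops as soon as nothing
/// is immediately available, even though the producer is still alive.
fn drain_without_waiting(rx: Receiver<&'static str>) -> Vec<&'static str> {
    let mut out = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => out.push(item),
            // `Empty` only means "not ready yet", but this loop gives up anyway.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    out
}

/// Consumer that waits: it blocks until the sender is dropped, so it
/// sees every item regardless of when the producer gets to run.
fn drain_until_closed(rx: Receiver<&'static str>) -> Vec<&'static str> {
    rx.into_iter().collect()
}
```

Calling `drain_without_waiting(delayed_input(vec!["foo", "bar"]))` returns an empty vector (the "actual" case above), while `drain_until_closed` on the same input returns both items (the "expected" case).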

Full Reproducer (run in repartition.rs)

    #[tokio::test]
    async fn repartition_with_delayed_stream() {
        let input = DelayedExec::new();
        let partitioning = input.output_partitioning();
        let expected_batches = vec![input.batch.clone()];
        let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();

        let expected = vec![
            "+------------------+",
            "| my_awesome_field |",
            "+------------------+",
            "| foo              |",
            "| bar              |",
            "+------------------+",
        ];

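        // Sanity check: the batch held by the input matches the expected output.
        assert_batches_eq!(&expected, &expected_batches);

        // Run the repartition and collect everything from output partition 0.
        let output_stream = exec.execute(0).await.unwrap();
        let batches = crate::physical_plan::common::collect(output_stream).await.unwrap();

        // Fails today: `batches` is empty instead of containing the input batch.
        assert_batches_eq!(&expected, &batches);
    }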

    #[derive(Debug)]
    struct DelayedExec {
        batch: RecordBatch
    }

    impl DelayedExec {
        fn new() -> Self {
            let batch =  RecordBatch::try_from_iter(vec![
                ("my_awesome_field", Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef)
            ]).unwrap();

            Self {
                batch
            }
        }
    }


    #[async_trait]
    impl ExecutionPlan for DelayedExec {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            self.batch.schema()
        }

        fn output_partitioning(&self) -> Partitioning {
            Partitioning::UnknownPartitioning(1)
        }

        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn with_new_children(
            &self,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        /// Returns a stream which does not have data immediately, but
        /// needs to yield (to allow another task to run) to get its
        /// input.
        async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
            assert_eq!(partition, 0);

            let batch = self.batch.clone();
            let schema = batch.schema();

            let (tx, rx) = tokio::sync::mpsc::channel(2);

            // task simply sends the batch
            tokio::task::spawn(async move {
                println!("Sending batch via delayed stream");
                if let Err(e) = tx.send(Ok(batch.clone())).await {
                    println!("ERROR batch via delayed stream: {}", e);
                }
            });

            // returned stream simply reads off the rx stream
            let stream = ParquetStream {
                schema,
                inner: ReceiverStream::new(rx),
            };
            Ok(Box::pin(stream))
        }
    }


    #[derive(Debug)]
    pub struct ParquetStream {
        schema: SchemaRef,
        inner: ReceiverStream<ArrowResult<RecordBatch>>,
    }

    impl Stream for ParquetStream {
        type Item = ArrowResult<RecordBatch>;

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            println!("ParquetStream::poll_next");
            let res = self.inner.poll_next_unpin(cx);
            println!("ParquetStream::poll_next() done");
            res
        }
    }

    impl RecordBatchStream for ParquetStream {
        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }
    }

    impl Drop for ParquetStream {
        fn drop(&mut self) {
            println!("ParquetStream::drop()");
        }
    }
