Description
Describe the bug
If one of the output streams of the repartition operator is not completely consumed, the repartition exec may return an error on one of the other streams.
So roughly the picture looks like:
                   ┌───────────────┐
                   │   Consumer    │
                   └───────────────┘
                           │
              ┌────────────┴─────────────┐
              │                          │
              ▼                          ▼
   ┌────────────────────┐     ┌────────────────────┐
   │RepartitionStream 0 │     │RepartitionStream 1 │
   └────────────────────┘     └────────────────────┘
              │                          │
              │                          ├───────────────┐
              │                          │               ▼
           ┌──┤                          │    ┌────────────────────┐
           │  └──────────────────────────┼───▶│   InputStream B    │
           │                             │    └────────────────────┘
           ▼                             │
┌────────────────────┐                   │
│   InputStream A    │◀──────────────────┘
└────────────────────┘
If RepartitionStream 0 is dropped before both InputStream A and InputStream B have completed, the repartition exec may still try to send a batch to RepartitionStream 0, find the channel closed, and report an error, which is then seen by RepartitionStream 1.
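To make the failure mode concrete, here is a minimal sketch that uses `std::sync::mpsc` channels as a stand-in for the channels inside the repartition exec. It is not the DataFusion code: the function names `distribute_strict` and `distribute_tolerant` are hypothetical, batches are modeled as plain `i32` values, and the tolerant variant is only one possible way to handle a dropped output, not necessarily the right fix.

```rust
use std::sync::mpsc::Sender;

/// Round-robin `batches` across `outputs`, stopping at the first send that
/// fails. This models the behavior described above: a single dropped
/// receiver turns into an error for the whole operation.
/// Assumes `outputs` is non-empty.
fn distribute_strict(batches: &[i32], outputs: &[Sender<i32>]) -> Result<(), String> {
    for (i, batch) in batches.iter().enumerate() {
        let idx = i % outputs.len();
        outputs[idx]
            .send(*batch)
            .map_err(|_| format!("output {} is closed", idx))?;
    }
    Ok(())
}

/// Round-robin `batches` across `outputs`, silently skipping any output
/// whose receiver has been dropped. Returns the number of batches that
/// were actually delivered. Assumes `outputs` is non-empty.
fn distribute_tolerant(batches: &[i32], outputs: &[Sender<i32>]) -> usize {
    let mut delivered = 0;
    for (i, batch) in batches.iter().enumerate() {
        let idx = i % outputs.len();
        // A send error only means the receiver is gone; ignore it.
        if outputs[idx].send(*batch).is_ok() {
            delivered += 1;
        }
    }
    delivered
}
```

With two outputs where the first receiver has been dropped, `distribute_strict` returns an error on the very first batch, while `distribute_tolerant` keeps feeding the remaining live output.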
To Reproduce
I am working on a reproducer.
Reproducing this error is made more challenging by the fact that the repartition stream uses unbounded channels, so it is very timing dependent.
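The timing dependence can be illustrated with a small sketch, again using `std::sync::mpsc::channel` (which is also unbounded) as a stand-in for the real channels. The helper name `send_before_and_after_drop` is hypothetical. A send into an unbounded channel succeeds even if nothing is ever read, so the error only appears when the send happens to run after the receiver has been dropped.

```rust
use std::sync::mpsc::channel;

/// Returns whether a send succeeded (a) while the receiver was still alive
/// but not reading, and (b) after the receiver was dropped.
fn send_before_and_after_drop() -> (bool, bool) {
    let (tx, rx) = channel::<u32>();
    // The receiver never reads, but the unbounded buffer accepts the value.
    let before = tx.send(1).is_ok();
    // Once the receiver is gone, the next send reports an error.
    drop(rx);
    let after = tx.send(2).is_ok();
    (before, after)
}
```

Whether a real run hits the first or the second case depends on how the producer and consumer tasks are scheduled relative to each other.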
Expected behavior
No errors should be produced
Additional context
We have a test that fails intermittently: https://github.com/influxdata/influxdb_iox/issues/1735
The 'ExecutionPlan(PlaceHolder)' node in the plan below is an extension node that behaves like LIMIT, in that it may decide to stop consuming its input after producing some output.
The plan being run looks like:
ExecutionPlan(PlaceHolder)
  ProjectionExec: expr=[borough, city, state]
    CoalesceBatchesExec: target_batch_size=500
      FilterExec: 1 <= time AND time < 550 AND CAST(state AS Utf8) = NY
        RepartitionExec: partitioning=RoundRobinBatch(4)
          IOxReadFilterNode: table_name=o2, chunks=1 predicate=Predicate exprs: [TimestampNanosecond(1) LtEq #time, #time Lt TimestampNanosecond(550), #state Eq Utf8("NY")]
While I have recently been changing RepartitionExec as part of #521, it appears the error behavior predates that change. However, the error is now passed up to the caller.