
Conversation

@westonpace (Member) commented Feb 19, 2022

  • The primary public API change is to change the behavior of ScanBatches. Previously the main thread would block (via MakeGeneratorIterator) while the work was actually done on a CPU thread. This change makes it so that the main thread is used (via IterateGenerator) to do the work instead. No CPU threads will then be required.
  • The above change is nice on its own but another significant benefit is that we can start to simplify the exec plan code. The exec plan was starting to adopt a "two paths" approach where we would sometimes submit tasks to the executor and sometimes just run them inline. With this change we can always submit tasks to the executor and exec nodes don't need to worry about whether or not they are running serially.
  • The ExecContext will now always have an executor. It is no longer valid to pass nullptr. I added a DCHECK to ensure this.
  • The default ExecContext will now use the CPU thread pool. Previously it was using nullptr (even in some cases where the test was marked "parallel"). This affected a number of tests and I had to add some sorting logic as the results are no longer deterministically ordered.
  • There were a number of places in the scanner where we were defaulting to "CPU executor" that needed to change to use a given executor (to make sure the work stayed on the serial executor in the serial path).
  • The aggregate node and the join node already had logic that was roughly "if there is no executor then do not schedule tasks". Eventually we want to change this to always schedule tasks, and this PR should enable that, but to keep changes gradual I left the logic in place and changed it to "if the executor's capacity is 1 then do not schedule tasks". The ExecPlan's use_threads function reflects this.
  • Some nodes were forwarding the StopProducing signal. While generally harmless this could lead to bugs. The ExecPlan is responsible for calling StopProducing on each node in a certain order. There is no need to forward the signal.
  • There is a little bit of ambiguity in the API now (although it was kind of always there). The ScanOptions is used to create a ScanNode. There is a field ScanOptions::use_threads but it doesn't make sense for a single node to have its own setting so this is ignored in the ScanNode. We still use this field however because we also use a ScanOptions to create an entire ExecPlan in some cases and, in those cases, we do respect the value of this field.
  • The scan node used to have a require_sequenced_output flag. It was not being used and I removed it. The original problem was as follows: if you are sequencing your final plan output, and you are applying backpressure, but you are not sequencing the scan itself, then it is possible to deadlock. For example: if you output batches 2, 3, 4, and 5 and then hit backpressure, you will never relieve the backpressure because you can never output batch 1. Backpressure has since been removed from general scanning (it's only wired in for dataset writes) and so this feature is untested at the moment. Furthermore, should we add it back in, we could solve all of this much more easily by simply requiring the backpressure limit to be greater than the readahead limit.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace westonpace force-pushed the experiment/serial-generator branch 2 times, most recently from c4d7f07 to 2978c1b Compare March 4, 2022 22:00
@westonpace westonpace force-pushed the experiment/serial-generator branch from a7af71c to 4fd18dc Compare March 18, 2022 03:28
@westonpace westonpace force-pushed the experiment/serial-generator branch from 4fd18dc to e3f588f Compare March 25, 2022 02:17
lidavidm pushed a commit that referenced this pull request Mar 25, 2022
… item only after all outstanding futures have completed

Unfortunately, this seems to have made the merge generator, which was already quite complicated, even more complicated.  I'd welcome any suggestions for simplification.  In the meantime, even though this one generator is more complicated, I think this allows us to simplify code using async generators considerably.

This is a prerequisite for #12468 because there is no way to keep the serial generator alive after the async generator has been destroyed (we can't use shared_ptr in this case)

Closes #12662 from westonpace/feature/ARROW-15968--only-emit-terminal-items-when-outstanding-tasks-finished

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
@pitrou (Member) commented Aug 9, 2022

@westonpace What's the status on this PR?

@westonpace (Member, Author) commented

@pitrou Still blocked by a cancelable scanner (which is in the works). I'll close this until ready.

@westonpace westonpace closed this Aug 9, 2022
@rok (Member) commented Nov 21, 2022

@westonpace is it correct to assume that the DeclarationTo* methods will respect ExecContext.use_threads=false once this merges? (It seems they currently don't.)

@westonpace (Member, Author) commented

> @westonpace is it correct to assume that the DeclarationTo* methods will respect ExecContext.use_threads=false once this merges? (It seems they currently don't.)

@rok

I think I'd be more in favor of just ripping out ExecContext.use_threads.
