Conversation

@jorgecarleitao
Member

Currently, `mergeExec` uses `tokio::spawn` to parallelize the work, calling `tokio::spawn` once per logical thread. However, `tokio::spawn` returns a task/future, which the tokio runtime then schedules on its thread pool.

Therefore, there is no need to limit the number of tasks to the number of logical threads: tokio's runtime is itself responsible for that. In particular, since we use `rt-threaded`, tokio already creates a thread pool sized to the number of logical threads available.

This PR removes the coupling, in `mergeExec`, between the number of logical threads (`max_concurrency`) and the number of created tasks. I observe no change in performance:

Benchmark results:

```
Switched to branch 'simplify_merge'
Your branch is up to date with 'origin/simplify_merge'.
   Compiling datafusion v2.0.0-SNAPSHOT (/Users/jorgecarleitao/projects/arrow/rust/datafusion)
    Finished bench [optimized] target(s) in 38.02s
     Running /Users/jorgecarleitao/projects/arrow/rust/target/release/deps/aggregate_query_sql-5241a705a1ff29ae
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
                        time:   [715.17 us 722.60 us 730.19 us]
                        change: [-8.3167% -5.2253% -2.2675%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by 15 12
                        time:   [5.6538 ms 5.6695 ms 5.6892 ms]
                        change: [+0.1012% +0.5308% +0.9913%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  4 (4.00%) high mild
  6 (6.00%) high severe

aggregate_query_group_by_with_filter 15 12
                        time:   [2.6598 ms 2.6665 ms 2.6751 ms]
                        change: [-0.5532% -0.1446% +0.2679%] (p = 0.51 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
```

@jorgecarleitao
Member Author

Note this important caveat in Tokio's documentation:

> If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as rayon. You can use an oneshot channel to send the result back to Tokio when the rayon task finishes.

I.e. Tokio's `spawn` is suitable for non-blocking, non-CPU-bound tasks. This may explain why we do not see a significant performance change from using `tokio::spawn`: most of our tasks are CPU-bound.

@andygrove
Member

andygrove commented Oct 13, 2020

This looks great, but how do we control the concurrency now? The `concurrency` attribute in `ExecutionConfig` is now effectively unused. I think we should either create the tokio thread pool explicitly using the `ExecutionConfig.concurrency` value, or remove this config entirely since it no longer has any effect.

Without being able to control the concurrency level it will be difficult (but not impossible) to run scalability benchmarks to see how we scale with an increasing concurrency level.

@jorgecarleitao
Member Author

Ah, I see the use-case now.

Ok, I think that tokio's recommendation is:

> Instances of Runtime can be created using new or Builder. However, most users will use the #[tokio::main] annotation on their entry point instead.

Thus, IMO we should remove `#[tokio::main]` and initialize the runtime with its `Builder`, configuring the size of the thread pool, whenever we want to expose the concurrency configuration.

Note that in the benchmarks we can even specify a different concurrency parameter per benchmark (in case we want to perform some scaling).
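A configuration sketch of that idea, assuming tokio 0.2's `runtime::Builder` API (the `concurrency` value and the body of `main` are illustrative, not code from this PR; requires the `tokio` crate with the `rt-threaded` feature):

```rust
use tokio::runtime::Builder;

fn main() {
    // Build a runtime whose thread pool matches ExecutionConfig.concurrency,
    // instead of relying on #[tokio::main]'s default of one thread per core.
    let concurrency = 4; // e.g. taken from ExecutionConfig.concurrency

    let mut rt = Builder::new()
        .threaded_scheduler()
        .core_threads(concurrency)
        .enable_all()
        .build()
        .expect("failed to build tokio runtime");

    rt.block_on(async {
        // run the query here
    });
}
```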

@andygrove
Member

I wonder if the scope of this is too much. I really like this change and would like to see it merged. Perhaps we just remove the concurrency config and let users of DataFusion create their own tokio runtimes?

@andygrove
Member

It would make sense to create the tokio runtime explicitly in the benchmark crate and in the CLI, but in other cases we are being called as a library and should assume that the user has created the tokio runtime, I think.

@jorgecarleitao
Member Author

@andygrove , just to understand, we should avoid merging to master until the release, right? (I have this feeling, but I am not super sure xD)

@andygrove
Member

@jorgecarleitao I see no reason to wait before merging this (unless you want to remove the concurrency attribute from ExecutionConfig as part of this PR since it no longer has any effect).

@jorgecarleitao
Member Author

Ah, ok, I mistakenly understood that we were in a "feature freeze" until the release (which is why I have not been merging stuff these days).

@andygrove
Member

andygrove commented Oct 13, 2020 via email

@alamb
Contributor

alamb commented Oct 13, 2020

> Note that in the benchmarks, we can even specify different concurrency parameter depending on the benchmark (in case we want to perform some scaling).

For what it is worth, one thing I plan to do as part of my internal project is to limit concurrent CPU-bound tasks by using a bounded `tokio::sync::mpsc` channel to cap the maximum number of concurrent tasks.

kszucs pushed a commit that referenced this pull request Oct 19, 2020

Closes #8453 from jorgecarleitao/simplify_merge

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
@jorgecarleitao jorgecarleitao deleted the simplify_merge branch October 28, 2020 04:16
