
Conversation

@2010YOUY01
Contributor

@2010YOUY01 2010YOUY01 commented Jul 24, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Now the NLJ execution logic is

  1. Buffer all left input
  2. Read one right batch at a time, and do a Cartesian product (all-left-batches x right-batch) to fetch the index combination
  3. Incrementally do the join on those pre-computed indices

This approach includes one extra step that computes a very large Cartesian product, which can be both memory-consuming and inefficient. A better approach is to perform the join directly on small intermediates.
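To make the contrast concrete, here is a minimal plain-Rust sketch (illustrative only, not the operator's actual code) of the two shapes: materializing the full index product up front versus joining one left row at a time against the right batch:

```rust
/// Old shape: build the full Cartesian index product before filtering.
/// Memory is O(left_rows * right_rows) regardless of join selectivity.
fn cartesian_indices(left_rows: usize, right_rows: usize) -> Vec<(usize, usize)> {
    let mut out = Vec::with_capacity(left_rows * right_rows);
    for l in 0..left_rows {
        for r in 0..right_rows {
            out.push((l, r));
        }
    }
    out
}

/// New shape: evaluate the predicate one left row at a time, so the
/// intermediate never exceeds (1 left row x right batch).
fn streaming_join(
    left: &[i64],
    right: &[i64],
    pred: impl Fn(i64, i64) -> bool,
) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    for &l in left {
        out.extend(right.iter().filter(|&&r| pred(l, r)).map(|&r| (l, r)));
    }
    out
}
```

The second shape never holds more than one (left-row x right-batch) intermediate, which is what keeps memory bounded.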

What changes are included in this PR?

Summary

This PR introduces a somewhat unusual change: it rewrites the Nested Loop Join operator from scratch.

Motivation

The original implementation performs a Cartesian product of (all-left-batches x right-batch), materializes that intermediate result for predicate evaluation, and then materializes the (potentially very large) final result all at once. This design is inherently inefficient, and although many patches have attempted to alleviate the problem, the fundamental issue remains.

A key challenge is that the original design and the ideal design (i.e., one that produces small intermediates during execution) are fundamentally different. As a result, it's practically impossible to make small incremental changes that fully address the inefficiency. These patches may also increase code complexity, making long-term maintenance more difficult.

Example of Prior Work

Here's a recent example of a small patch intended to improve the situation:
#16443
Even with careful engineering, I still feel the entropy in the code increases.

Why a Rewrite?

Since NLJ is a relatively straightforward operator, a full rewrite seemed worthwhile. This allows for a clean, simplified design focused on current goals—performance and memory efficiency—without being constrained by the legacy implementation.

Current Status and Outlook

The rewrite is going smoother than I expected: the current minimal implementation, which reuses existing utilities (despite some inefficiencies), is already up to 2× faster and is likely significantly more memory-efficient (although memory usage has not yet been measured).

I also noticed several more optimization opportunities to make it even faster. The implementation now passes the regular tests, but several join_fuzz tests are still failing; after fixing those we can iterate on more optimizations.

Benchmark Result

The NLJ micro-bench: #16819

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃   pr-16819 ┃ nlj-rewrite-tmp ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  219.41 ms │        88.63 ms │ +2.48x faster │
│ QQuery 2     │  265.81 ms │       116.56 ms │ +2.28x faster │
│ QQuery 3     │  363.44 ms │       206.17 ms │ +1.76x faster │
│ QQuery 4     │  995.84 ms │       401.15 ms │ +2.48x faster │
│ QQuery 5     │  640.90 ms │       267.34 ms │ +2.40x faster │
│ QQuery 6     │ 5677.61 ms │      1766.28 ms │ +3.21x faster │
│ QQuery 7     │  666.71 ms │       415.68 ms │ +1.60x faster │
│ QQuery 8     │ 5632.40 ms │      1755.71 ms │ +3.21x faster │
│ QQuery 9     │  673.28 ms │       452.82 ms │ +1.49x faster │
│ QQuery 10    │ 2185.64 ms │      1247.23 ms │ +1.75x faster │
└──────────────┴────────────┴─────────────────┴───────────────┘

Implementation

The design/implementation doc can be found in the source.

Next Steps

Given there are still many opportunities to make it even faster, I plan to include several major optimizations in the current PR to save some review bandwidth (the core logic stays a contiguous few hundred LoC, even if we include more optimizations in it).

  • TODO: doc about the major potential optimizations
  • Fix bug that caused join_fuzz tests to fail
  • Memory-limited case (maybe this can be split to a separate PR 🤔 , but I want to first do some POC to ensure it won't cause big structural change)
  • Update order properties with tests

Potential Optimizations

  • Eliminate redundant indices <--> batch conversions for the buffered right batch

Update: the above change improved the speedup from 2X to 3X!
For the following ideas, I've checked the profiling flamegraph; they're not on the critical path.

- [ ] Don't concat input batches into a single one in JoinLeftData
- [ ] Batch bitmap ops
  Currently we maintain a concurrent bitmap for matched rows in the left buffer; every once in a while each partition grabs the lock, sets matched bits one by one, then releases the lock.
  A more efficient way: each partition first sets a local bitmap (like a 512B chunk), then applies all of it on each lock acquisition. (If matches are rare, maybe we can skip this step.)
- [ ] Arrow: in BatchCoalescer, support appending the joined result directly into the output buffer, without extra copies.
  Potential new interface: push_joined_batch(left_batch, right_batch, left_indices, right_indices)
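The batched bitmap idea could look roughly like this (a minimal sketch with illustrative names, not DataFusion's actual types): each partition accumulates matched-left-row bits locally, then merges the whole chunk into the shared bitmap under a single lock acquisition.

```rust
use std::sync::Mutex;

/// Shared matched-rows bitmap over the buffered left side (one bit per row).
pub struct SharedMatchedBitmap {
    words: Mutex<Vec<u64>>,
}

impl SharedMatchedBitmap {
    pub fn new(num_rows: usize) -> Self {
        Self {
            words: Mutex::new(vec![0u64; (num_rows + 63) / 64]),
        }
    }

    /// One locked pass applies every bit the partition set locally,
    /// instead of taking the lock once per bit.
    pub fn merge_local(&self, local: &[u64]) {
        let mut shared = self.words.lock().unwrap();
        for (s, l) in shared.iter_mut().zip(local.iter()) {
            *s |= *l;
        }
    }

    pub fn is_matched(&self, row: usize) -> bool {
        let shared = self.words.lock().unwrap();
        (shared[row / 64] >> (row % 64)) & 1 == 1
    }
}
```

Each partition would fill its local `Vec<u64>` while probing, then call `merge_local` once per flush, trading per-bit lock traffic for one locked OR pass.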

Would love to hear your thoughts on this @UBarney @jonathanc-n @ding-young , and looking forward to collaborating to make NLJ as fast as possible.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Jul 24, 2025
@2010YOUY01 2010YOUY01 marked this pull request as draft July 24, 2025 12:41
@ding-young
Contributor

ding-young commented Jul 25, 2025

I'll need some time to read through the code, but reduced memory usage is also impressive 👍

  • v1 (old)
Query     Time (ms)     Peak RSS  Peak Commit  Page Faults
--------------------------------------------------------------
1            657.40    1112.0 MB      10.6 GB            0
2            833.66    1691.1 MB      10.6 GB            0
3           1688.13       3.4 GB      10.8 GB            0
4           4176.38      16.2 GB      20.1 GB            0
5           1629.22      23.2 GB      57.2 GB            0
6          17687.00      18.9 GB      26.1 GB            0
7           1245.64      23.3 GB      39.2 GB            0
8          19420.73      20.1 GB      31.3 GB            0
9           3467.28      10.4 GB      19.9 GB            0
10          5414.65      44.7 GB      46.0 GB            0
  • v2 (nlj-rewrite-tmp)
Query     Time (ms)     Peak RSS  Peak Commit  Page Faults
--------------------------------------------------------------
1            383.61     104.4 MB       5.0 GB            0
2            645.60     471.2 MB       8.0 GB            0
3            763.10    1738.5 MB       6.0 GB            0
4           1973.21       5.9 GB       9.6 GB            0
5            644.86     145.1 MB       4.0 GB            0
6           6043.58     149.3 MB       2.0 GB            0
7            543.70     154.5 MB       7.0 GB            0
8           6516.91     177.4 MB       4.0 GB            0
9           1571.97     140.4 MB       6.0 GB            0
10          3098.54      22.8 GB      26.9 GB            0

I ran cargo run --profile release-nonlto --bin mem_profile nlj in https://github.com/ding-young/datafusion/tree/nlj-rewrite-tmp-review and adjusted v1/v2 with static USE_NLJ_V2: bool = false;

@2010YOUY01
Contributor Author

2010YOUY01 commented Jul 26, 2025

I'll need some time to read through the code, but reduced memory usage is also impressive 👍 […]

Thanks for the profiler 👍🏼 I'll review that soon.

I think Q4 and Q10 should not consume that much memory 😅

-- q10:
SELECT *
FROM range(30000) AS t1
FULL JOIN range(30000) AS t2
ON (t1.value + t2.value) % 10 <> 0;

It is supposed to buffer only about 30k × 8 bytes, so ~20 GB RSS is not expected.
Let me double-check the implementation...

UPDATE: maybe it's just because the NLJ benchmark buffers the entire result (since we're only measuring the NLJ operator's memory usage, the downstream operator could throw results away immediately). I'll fix that and confirm.
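As a rough sanity check (my own back-of-envelope arithmetic, not output from the benchmark harness), the expected left-side buffer for q10 is tiny compared to the observed RSS:

```rust
/// range(30000) produces 30_000 Int64 values at 8 bytes each,
/// so the buffered left side should be on the order of 240 KB.
fn expected_left_buffer_bytes(rows: usize, bytes_per_value: usize) -> usize {
    rows * bytes_per_value
}
```

240 KB is orders of magnitude below the tens of GB reported, which is why buffering the full join output in the benchmark harness is the more likely culprit.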

@2010YOUY01 2010YOUY01 changed the title WIP: Limit intermediate data size for NestedLoopJoin (up to 2X faster) WIP: Rewrite NestedLoopJoin to limit intermediate size (up to 2X faster) Jul 26, 2025
@UBarney
Contributor

UBarney commented Jul 26, 2025

I'll need some time to read through the code, but reduced memory usage is also impressive 👍 […]

Thanks for the profiler 👍🏼 I'll review that soon.

I think Q4 and Q10 should not consume that much memory 😅 […] It is supposed to only buffer (30k x 8Bytes), 20G RSS is not expected. […]

@ding-young For q10, the maxRSS I got using /usr/bin/time -v is lower than yours (16712272 kB ≈ 16.7 GB, vs 44.7 GB).

The code for ./target/release/join_limit_join_batch_size should be identical to the "v1 (old)" code.

√ devhomeinsp  ~/c/datafusion > /usr/bin/time -v  ./target/release/join_limit_join_batch_size --maxrows 1 -c '        SELECT *
        FROM range(30000) AS t1
        FULL JOIN range(30000) AS t2
        ON (t1.value + t2.value) % 10 <> 0;'
DataFusion CLI v48.0.0
+-------+-------+
| value | value |
+-------+-------+
| 0     | 24576 |
| .             |
| .             |
| .             |
+-------+-------+
810000000 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 4.202 seconds.

        Command being timed: "./target/release/join_limit_join_batch_size --maxrows 1 -c         SELECT *
        FROM range(30000) AS t1
        FULL JOIN range(30000) AS t2
        ON (t1.value + t2.value) % 10 <> 0;"
        User time (seconds): 9.26
        System time (seconds): 4.74
        Percent of CPU this job got: 322%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:04.34
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 16712272
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 3028619
        Voluntary context switches: 72488
        Involuntary context switches: 28
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

@2010YOUY01
Contributor Author

Run extended tests

@2010YOUY01
Contributor Author

Re: #16889 (comment)

I think the reason is: datafusion-cli doesn't buffer the final output, while the NLJ bench currently buffers all output (I'll fix that).

Contributor

@UBarney UBarney left a comment


I didn't expect this PR to have such a big performance improvement. Like #16443, I still don't understand why there is a performance improvement.

/// include decoding Parquet files), so it tries to buffer more left
/// batches at once to minimize the scan passes.
/// 2. Read right side batch one at a time. For each iteration, it only
/// evaluates the join filter on (1-left-row x right-batch), and puts the
Contributor

@UBarney UBarney Jul 26, 2025


With this join method, it seems we can't preserve the order of the right table.

#16364 (comment)

Contributor Author


I think it's okay to relax that property. The new implementation can also preserve left order in some cases.

Contributor

@UBarney UBarney Jul 27, 2025


The new implementation can also preserve left order in some cases

Only when right_table.num_rows() < batch_size ?

For join types like inner join, we are no longer preserving the order of the right table. I'm not sure if this is a breaking change. It's possible that some users rely on this behavior, especially those who create an ExecutionPlan directly, bypassing the DataFusion optimizer. (Are there any users who do this?)

Contributor Author


Yes, we should mark it as a breaking change and add to the upgrade guide.
If anyone is relying heavily on this ordering property for optimization, we can keep the old impl (maybe rename to RightOrderPreservingNLJ) and use a configuration to control it.

@2010YOUY01
Contributor Author

I didn't expect this PR to have such a big performance improvement. Like #16443, I still don't understand why there is a performance improvement.

I’m aware of two key differences:

  • Fewer redundant steps for indices <--> batches
  • Always keeping the right batch in cache (the original implementation performs a left-chunk × right-row iteration)

However, I was just hoping to clean up the codebase a bit; I also didn’t expect it to be an easy 2X.

@UBarney
Contributor

UBarney commented Jul 27, 2025

I’m aware of two key differences: fewer redundant steps for indices <--> batches, and always keeping the right batch in cache. […]

I can't use perf to analyze cache misses in my Hyper-V VM.

Details
sudo perf stat -e cycles,instructions,cache-references,cache-misses ./target/release/1_left_row_join_right_batch -c '        SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;'

 Performance counter stats for './target/release/1_left_row_join_right_batch -c         SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;':

   <not supported>      cycles
   <not supported>      instructions
   <not supported>      cache-references
   <not supported>      cache-misses

       0.662652693 seconds time elapsed

      11.216489000 seconds user
       0.082434000 seconds sys

However, using the time command, I discovered that the previous version had a significantly higher number of Minor (reclaiming a frame) page faults (3,207,160 vs 5,133) and much greater system time (19.36s vs 0.06s).

Details
 /usr/bin/time -v ./target/release/join_limit_join_batch_size -c '        SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;'

        Command being timed: "./target/release/join_limit_join_batch_size -c         SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;"
        User time (seconds): 27.68
        System time (seconds): 19.36
        Percent of CPU this job got: 2058%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:02.28
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 23530920
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 3207160
        Voluntary context switches: 629
        Involuntary context switches: 1771
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

 /usr/bin/time -v ./target/release/1_left_row_join_right_batch -c '        SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;'

        Command being timed: "./target/release/1_left_row_join_right_batch -c         SELECT *                            FROM range(10000) AS t1
        JOIN range(200000) AS t2
        ON (t1.value + t2.value) % 1000 = 0;"
        User time (seconds): 11.50
        System time (seconds): 0.06
        Percent of CPU this job got: 1896%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 0:00.61
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 135744
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 5133
        Voluntary context switches: 461
        Involuntary context switches: 574
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0

My speculation is that the previous version suffered from memory-management overhead due to its large allocations: perf also indicated that the kernel function clear_page_erms (which zeroes out a page of memory using a fast CPU instruction) was the top CPU consumer.

sudo perf report --no-children

Samples: 231K of event 'cpu-clock:ppp', Event count (approx.): 57968750000
  Overhead  Command          Shared Object                       Symbol
+   29.95%  tokio-runtime-w  [kernel.kallsyms]                   [k] clear_page_erms
+    9.11%  swapper          [kernel.kallsyms]                   [k] pv_native_safe_halt
+    6.03%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000000df5b44
+    4.14%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000002bd63d5
+    3.90%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000002bd63e5
+    2.96%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000002bd61e7
+    2.94%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000002bd61eb
+    2.65%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000000e088a4
+    2.64%  tokio-runtime-w  join_limit_join_batch_size          [.] 0x0000000002bd61bc

@2010YOUY01 2010YOUY01 changed the title WIP: Rewrite NestedLoopJoin to limit intermediate size (up to 2X faster) WIP: Rewrite NestedLoopJoin to limit intermediate size (up to 3.2X faster) Jul 28, 2025
@2010YOUY01
Contributor Author

I tried eliminating redundant batch transformations, and the speedup went from 2X to 3X!

Through the flamegraph, I can see that over 50% of the time is now spent evaluating the join filter (e.g. (c1 + c2) % 10 = 1). It seems there may not be any more low-hanging fruit for further speed improvements.

After a few more chores, this should be ready for review:

  • Figure out the output ordering properties and update tests
  • Metrics accounting
  • Polish (more comments and clean-up)

@2010YOUY01
Contributor Author

Move to the final PR #16996, so closing this one.

@2010YOUY01 2010YOUY01 closed this Jul 31, 2025