ARROW-16083: [C++] Implement AsofJoin execution node #13028
Conversation
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW. Opening JIRAs ahead of time contributes to the openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? See also:
@westonpace Not sure if you are the best person to review this. I think I got the basics working but I feel it's also a bit messy, so I am looking for any feedback that can help improve it. Especially around 1. ergonomics of managing the thread/execution model and 2. reusing existing utils/classes for certain things. The algorithm itself is not complicated IMO, so it's more a question of "how do I do it in the Arrow way".
The original implementation of this was a research proof of concept that I was experimenting with. I'd expect it to be significantly revised for a production version. There are parts of the code and some complexity that aren't necessary for this version, and I'd expect changes (for example: potentially using the task mechanism) to make it consistent with other arrow ExecNodes.
Thank you for the contribution. Yes, there would need to be some significant style changes but it would be a welcome addition. I'll try and give it a more detailed look tomorrow and play around with it. At a glance a few thoughts:
Thanks @westonpace
I see. I will try to change this.
Hmm, not sure if I follow what you mean... The memo store just stores a row/index into the batch (it only keeps the latest row per batch for each key). Even if there are many keys they might just point to the same batch, so it should be a pretty lightweight data structure (a couple of pointers per key). Maybe your concern is many key values pointing to different batches?
Cool, thanks. I wasn't sure if I should reuse
I haven't put too much thought into it and want to leave it out of the scope of this PR. (Seems like adding foundational logic for ordered data could be a separate PR.) At a high level, I think having the upstream provide a batch index and the downstream node reorder them and spill out to disk is a reasonable approach; I'm just not sure if there is another solution that is simpler.
Yes - @rtpsw did some POC on Substrait + Asof Join and managed to get it to work. We did change from name-based to index-based in the Substrait plan IIRC.
Cool, thanks.
Ok, managed to look at it a bit more today. Your sidecar processing thread is probably fine for a first approach. Eventually we will probably want to get rid of it in favor of a different approach; the main advantage of that would just be to avoid the scheduling conflicts / context switches that we would have by introducing another busy thread.

Yes, I misunderstood and my concern about running out of memory was not valid. I'd have to think a bit more on how that would work, so I could be way off base here too.

I'll leave it to you on how you want to proceed next. If you want to do more optimization and benchmarking you can. If you want to get something that works going first then I think we can focus on getting this cleaned up and migrated over to the Arrow style (see https://arrow.apache.org/docs/developers/cpp/development.html#code-style-linting-and-ci ). We will also want some more tests. I think my personal preference would be to get something in that is clean with a lot of tests first. Then get some benchmarks in place. Then we can play around with performance.
Thanks @westonpace, I will work on this more today. Yeah, I'd like to get something working and cleaned up first with some baseline performance.
westonpace
left a comment
Not adding anything new here. Just making an official "review" comment so that you can click "re-request review" on Github when you're ready for me to take a look again.
@westonpace I am trying to extend the implementation to support multiple keys and key types and wonder if you can give some pointers. Basically I think I would create a "mapper" that maps an input "row" to the "key" and use that as the hash map key for the given row. This mapper would
I also took a look at aggregation for other options but didn't find anything obvious. Does such a "mapper" class already exist in Arrow compute that I can use for this purpose?
So if you have three key columns then the string will be the bytes for the first column, followed by the bytes for the second column, followed by the bytes for the third column. This string can indeed be used as the key for a hash map. This approach works ok, but is not the most performant. A newer version is being integrated which uses
All of this should not be confused with
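To make that concrete, here is a minimal standalone sketch of that byte-concatenation idea (hypothetical helper names, a plain `std::unordered_map` rather than Arrow's internal encoders, and fixed-width int64 keys only; variable-width columns would additionally need a length prefix to keep keys unambiguous):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical fixed-width "columns": every key column holds int64_t values.
using Int64Column = std::vector<int64_t>;

// Concatenate the raw bytes of each key column's value for `row`, in column
// order: three key columns -> bytes of col 0, then col 1, then col 2.
std::string EncodeRowKey(const std::vector<const Int64Column*>& key_columns,
                         size_t row) {
  std::string key;
  key.reserve(key_columns.size() * sizeof(int64_t));
  for (const Int64Column* col : key_columns) {
    const int64_t value = (*col)[row];
    key.append(reinterpret_cast<const char*>(&value), sizeof(value));
  }
  return key;
}

int main() {
  Int64Column c0 = {1, 1, 2};
  Int64Column c1 = {10, 20, 10};
  std::unordered_map<std::string, size_t> latest_row_for_key;
  for (size_t row = 0; row < c0.size(); ++row) {
    // The encoded string is then usable as an ordinary hash-map key.
    latest_row_for_key[EncodeRowKey({&c0, &c1}, row)] = row;
  }
  return 0;
}
```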
Thanks @westonpace, that's super helpful. I will take a look at those classes. Sounds like it's probably worthwhile to take a look at
I took a stab at using it. I will probably wait until that is merged and use the utility functions added by that PR, e.g.
@westonpace reading back your comments - I wonder if you can explain a bit more about the "thread-per-core" model here?
The term seems to be used in different contexts so I want to make sure I understand it correctly.
Sure. "Thread per core" is probably a bit of a misnomer too, but I haven't found a nicer term yet. The default thread pool size is std::thread::hardware_concurrency, which is the maximum number of concurrent threads the hardware supports, so we do not over-allocate threads.

When dealing with I/O you normally want to make sure the system is doing useful work while the I/O is happening. One possible solution is the synchronous approach where you create a pool with a lot of threads, more than your CPU can handle. When I/O is encountered you simply block synchronously on the I/O and let the OS schedule a different thread onto the hardware. We don't do that today.

Instead we take an asynchronous approach. To implement this we actually have two thread pools. The I/O thread pool is sized based on how many concurrent I/O requests make sense (e.g. not very many for HDD and a lot for S3). It is expected that these threads are usually in a waiting state. The second thread pool (the one that, by default, drives the execution engine) is the CPU thread pool. This thread pool (again, by default) has a fixed size based on the processor hardware. It's very important not to block a CPU thread because that usually means you are under-utilizing the hardware.
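If it helps, a rough sketch of where the two pools show up in the C++ API (header paths and defaults as I understand them around this PR's timeframe; treat this as illustrative rather than a stable reference):

```cpp
#include <arrow/compute/exec.h>      // arrow::compute::ExecContext
#include <arrow/io/interfaces.h>     // arrow::io::default_io_context()
#include <arrow/memory_pool.h>
#include <arrow/util/thread_pool.h>  // arrow::internal::GetCpuThreadPool()

int main() {
  // CPU pool: fixed size (hardware concurrency by default); never block these
  // threads on I/O or the hardware will be under-utilized.
  arrow::internal::ThreadPool* cpu_pool = arrow::internal::GetCpuThreadPool();

  // I/O pool: sized for the number of concurrent I/O requests that make sense
  // for the storage backend; its threads are expected to spend time waiting.
  const arrow::io::IOContext& io_ctx = arrow::io::default_io_context();
  arrow::internal::Executor* io_executor = io_ctx.executor();

  // The execution engine is driven by handing it an ExecContext bound to the
  // CPU pool (passing nullptr instead means "run serially on the caller").
  arrow::compute::ExecContext exec_ctx(arrow::default_memory_pool(), cpu_pool);

  (void)io_executor;
  (void)exec_ctx;
  return 0;
}
```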
The second thing you will often see mentioned is the "morsel / batch" model. When reading data in you often want to read it in largish blocks of data (counter-intuitively, these large blocks are referred to as "morsels"). This can lead to an execution engine that roughly runs each operator over an entire morsel before moving on to the next operator. However, since each operator is often going over the same data, and morsels are often bigger than CPU caches, this can be inefficient. Instead the ideal approach is to slice each morsel into small, cache-sized batches and push each batch through the whole pipeline of operators (see the sketch below).

This is the model we are trying to work towards in the current execution engine (the hash join is pretty close, the projection and filter nodes still have some work to do before they can handle small batches). Also note that only the outermost loop is parallel. The "morsel" (the larger chunk) is the unit of parallelism. For data sources that have very large row groups we actually have another round of slicing but that's slightly off-topic.
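My paraphrase of those two loop shapes as a compilable sketch (the original snippets are not captured above, so the structure, not the names, is the point here):

```cpp
#include <vector>

struct Batch { /* a cache-sized slice of a morsel */ };
struct Morsel { std::vector<Batch> batches; };  // largish block read from the data source

void RunOperator(int /*op*/, Batch& /*batch*/) { /* filter / project / join step */ }

// Operator-at-a-time: every operator makes a full pass over the morsel before the
// next operator starts, so the morsel keeps falling out of the CPU cache.
void OperatorAtATime(Morsel& morsel, int num_ops) {
  for (int op = 0; op < num_ops; ++op) {
    for (Batch& batch : morsel.batches) RunOperator(op, batch);
  }
}

// Morsel-driven: the morsel is the unit of parallelism (the outer loop is the one
// that would be parallelized); each cache-sized batch is pushed through the whole
// operator pipeline while it is still hot in cache.
void MorselDriven(std::vector<Morsel>& morsels, int num_ops) {
  for (Morsel& morsel : morsels) {
    for (Batch& batch : morsel.batches) {
      for (int op = 0; op < num_ops; ++op) RunOperator(op, batch);
    }
  }
}

int main() { return 0; }
```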
Thanks for the explanation!
@github-actions autotune
@westonpace I am sort of stuck on this PR and can use some help.
I am not sure what causes this issue or if this is something I did - any suggestions?
(I didn't run the exact cmake command from the development doc because it gives me a missing OpenSSL error when configuring some subcomponents), but nothing seems to have changed.
Thank you for asking. I think I probably should have been clearer. It sounds like you might be doing more than is needed (we should maybe update our developer docs or put some work into getting these pieces working again).
Yes, this particular GTest issue is not related to your change. I think it's being worked on but you don't need to worry about it.
I wouldn't worry about running clang-tidy. As long as you don't get any compiler warnings and pass
I don't know that it works for anyone. I would say that our philosophy is iwyu but we don't have any way of enforcing it. So this is more of an "if you have any doubts, use the rules of iwyu to know what to include", but don't stress too much if you might have missed something. If you remove
Thanks @westonpace, that's very helpful. I think I know how to proceed now (pending the CI failure issue).
@westonpace I have pushed another revision and addressed most of the comments (and replied to the ones that I didn't address or have questions about). Please take another look, thank you!
// Build the result
assert(sizeof(size_t) >= sizeof(int64_t));  // Make takes signed int64_t for num_rows
// TODO: check n_rows for cast
I think I would probably just do...
DCHECK_LE(n_rows, std::numeric_limits<int32_t>::max());
...but I wouldn't be surprised if we have a shortcut for that somewhere that I'm just not aware of yet.
join.inputs.emplace_back(Declaration{
    "source", SourceNodeOptions{l_batches.schema, l_batches.gen(false, false)}});
join.inputs.emplace_back(Declaration{
    "source", SourceNodeOptions{r0_batches.schema, r0_batches.gen(false, false)}});
join.inputs.emplace_back(Declaration{
    "source", SourceNodeOptions{r1_batches.schema, r1_batches.gen(false, false)}});
We should have at least some testing with slow / parallel inputs. I don't think a slow input is capable of reordering data, but it might be, so we can hold off if that is the case.
Hmm, I tried to change it to a parallel test by
auto exec_ctx =
arrow::internal::make_unique<ExecContext>(default_memory_pool(), arrow::internal::GetCpuThreadPool());
but then I see something weird happening - the node seems to receive data out of order. (Note for k=0, input data is out of order).
(k=0) time=([
2000,
2000
])
InputFinished find
InputFinished END
(k=1) time=([
1500,
2000,
2500
])
InputReceived END
(k=0) time=([
0,
0,
500,
1000,
1500,
1500
])
My understanding was that even with parallel execution, each data source should be single-threaded, which doesn't seem to be the case. Before I dig too deep, I wonder if my understanding is correct about how arrow::internal::GetCpuThreadPool() works?
My thinking was to leave the executor as nullptr but change things like l_batches.gen(false, false) to l_batches.gen(false, true). Each input should still be ordered but since there is some delay there might be some variation in arrival order. I think right now you will always get (assuming two batches per input)...
l_batches::0
l_batches::1
l_batches::end
r0_batches::0
r0_batches::1
r0_batches::end
r1_batches::0
r1_batches::1
r1_batches::end
I was hoping it would be possible that adding slow could yield something like...
l_batches::0
r0_batches::0
l_batches::1
r1_batches::0
r0_batches::1
l_batches::end
r0_batches::end
r1_batches::1
r1_batches::end
However, if that is not what you are seeing, then we don't need to dig too deeply and can worry about it more when we add support for parallelism.
sounds good
namespace arrow {
namespace compute {

BatchesWithSchema GenerateBatchesFromString(
Rather than copy this from hash join node tests can we move this method to src/arrow/compute/exec/test_util? It seems generally useful.
Good call. Moved
| schema({field("time", int64()), field("key", int32()), field("r0_v0", utf8())})); | ||
| } | ||
|
|
||
| } // namespace compute |
This testing is probably ok to get started. However, the node has to deal with synchronization and has several corner cases. I think it would be good someday to have a test that generates a couple hundred batches of "structured" random data that we can later verify. I don't think we need to do this right now but it is something to keep in mind.
auto rb = *batch.ToRecordBatch(input->output_schema());

_state.at(k)->push(rb);
_process.push(true);
Yes, that seems right. Not sure what I was thinking originally.
Looking at it with fresh eyes I think there is a minor possibility that batches_processed_ could be concurrently updated if a single input had multiple empty batches arriving at the same time but it might be easier to change batches_processed_ to an atomic counter instead of using a mutex.
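A rough sketch of that swap (only `batches_processed_` comes from the discussion above; the class shape and method names here are made up for illustration):

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>

class InputStateSketch {
 public:
  // Mutex-based version: correct, but every (possibly empty) batch pays for a lock.
  void MarkBatchProcessedLocked() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++batches_processed_guarded_;
  }

  // Atomic version: concurrent InputReceived calls can bump the counter without a
  // mutex, which is enough when the counter is the only state they share.
  void MarkBatchProcessedAtomic() { batches_processed_.fetch_add(1); }

 private:
  std::mutex mutex_;
  int64_t batches_processed_guarded_ = 0;
  std::atomic<int64_t> batches_processed_{0};
};

int main() { return 0; }
```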
westonpace
left a comment
I think this is getting close. A few more minor changes but nothing significant. The tests are nice and clean but they might not be complete enough to flesh out all race conditions and corner cases. That can be something we grow out with the feature set too.
iChauster
left a comment
This prints the name of the unsupported type rather than the address.
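In other words, roughly this difference (a standalone illustration, assuming `type` is a `std::shared_ptr<arrow::DataType>`; this is not the actual suggested diff):

```cpp
#include <iostream>
#include <memory>

#include <arrow/api.h>

int main() {
  std::shared_ptr<arrow::DataType> type = arrow::timestamp(arrow::TimeUnit::NANO);
  // Streaming the raw pointer prints an address such as "0x55f3...", which is not
  // helpful in an error message.
  std::cout << type.get() << std::endl;
  // ToString() (or streaming *type) prints the readable name, e.g. "timestamp[ns]".
  std::cout << type->ToString() << std::endl;
  return 0;
}
```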
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com> Co-authored-by: Ivan Chau <ivanmchau@gmail.com>
@westonpace Sorry for the delay - I took some time off and have now gotten back to this. I left a question regarding the parallel execution test and wonder if you have some thoughts - appreciate your help.
iChauster
left a comment
Requires these adjustments to build
No problem. I'm actually in the middle of moving so I have somewhat spotty availability this week (and probably next week) anyways. I've responded to your question.
@westonpace This should be good to go. I addressed all your comments and left follow-ups. CI is also green. Let me know if you have more comments. Li
@westonpace Gentle ping - anything else you would like me to change?
@icexelloss Apologies, I've spent this week (and most of the last) moving. I'm in my new location now so I should be able to get to this soon (ideally tomorrow).
@westonpace No worries. Appreciate the heads up.
westonpace
left a comment
Thanks for getting this created. This is a cool new capability and a good starting point. Looking forward to seeing new changes built on top of this.
## Overview

This is a work-in-progress implementation of the AsofJoin node in Arrow C++ compute. The code needs quite a bit of clean up, but I have worked on this long enough that I think I would benefit from some inputs/comments from Arrow maintainers about the high-level design before I potentially spend too much time in the wrong direction.

All credit to @stmrtn (Steven Martin), who is the original author of the code.

## Implementation

There is quite a bit of code; here is how it works at a high level.

Classes:

* `InputState`: A class that handles queuing for input batches and purging unneeded batches. There is one input state per input table.
* `MemoStore`: A class that is responsible for advancing the row index and getting the latest row for each key given a timestamp (the latest row whose timestamp is <= the given timestamp).
* `CompositeReferenceTable`: A class that is responsible for storing temporary output rows and producing RecordBatches from those rows.

Algorithm:

* The node takes one left side table and n right side tables, and produces a joined table.
* It is currently assumed that each input table will call `InputReceived` with time-ordered batches. `InputReceived` will queue the batches inside `InputState` (it doesn't do any work). There is a separate process thread that wakes up when there is new input and attempts to produce an output batch. If the current data is not enough to produce the output batch (i.e., we have not received all the potential right side rows that could be a match for the current left batch), it will wait for new inputs.
* The process thread works as follows (see the sketch after this description):
  1. Advance the left row index for the current batch. Then advance the right tables to get the latest right row (i.e., the latest right row with timestamp <= left row timestamp).
  2. Once the advances are done, produce the output row for the current left row.
  3. Go to 1 until the current left batch is processed.
  4. Output a batch for the current left batch.
  5. Purge batches that are no longer needed.
  6. Wait until enough batches are received to process the next left batch.

Entry point for the algorithm is `process()`.

## TODO

- [x] More tests
- [x] Decide if we can replace `CompositeReferenceTable` with something that already exists (perhaps `RowEncoder`?)
- [x] Life cycle management for the process thread (or whether or not we should have it)
- [x] Lint & code style
- [x] Handle null results properly
- [x] Handle errors properly (e.g., unsupported types)
- [x] Clean up debug statements

## Follow up

- Handle more datatypes (both key and value)
- Handle multiple keys
- Change from column name to column index for time and key column (Substrait integration)
- Look into using `table_builder` to reproduce the materialize logic.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
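For orientation, here is a heavily simplified, compilable sketch of the per-row loop described under "Algorithm" above (one right table, a single int32 key column, no payload columns; all names are illustrative and this is not the node's actual code):

```cpp
#include <cstdint>
#include <deque>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

// Minimal stand-ins: one timestamp column and one key column, no payloads.
struct Row { int64_t time; int32_t key; };

// Plays the role of the right-side InputState plus MemoStore for one right table.
struct RightInput {
  std::deque<Row> queued;                 // rows received so far, in timestamp order
  std::unordered_map<int32_t, Row> memo;  // latest row per key with time <= current left time

  // Step 1: advance up to the left row's timestamp, memoizing the latest row per key.
  void AdvanceTo(int64_t left_time) {
    while (!queued.empty() && queued.front().time <= left_time) {
      memo[queued.front().key] = queued.front();
      queued.pop_front();
    }
  }
};

// Steps 2-4: for each left row, join against the memoized right row for its key.
// The "output" here is just (left time, matched right time or null).
std::vector<std::pair<int64_t, std::optional<int64_t>>> ProcessLeftBatch(
    const std::vector<Row>& left_batch, RightInput& right) {
  std::vector<std::pair<int64_t, std::optional<int64_t>>> out;
  for (const Row& l : left_batch) {
    right.AdvanceTo(l.time);
    auto it = right.memo.find(l.key);
    out.emplace_back(l.time, it == right.memo.end()
                                 ? std::optional<int64_t>{}
                                 : std::optional<int64_t>{it->second.time});
  }
  // Steps 5-6 (purging right batches that can no longer match and waiting for enough
  // input to handle the next left batch) are omitted from this sketch.
  return out;
}

int main() { return 0; }
```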