ARROW-15779: [Python] Create python bindings for Substrait consumer #12672
Conversation
Hmm, did you mean to delete the testing submodule?

@lidavidm No, I wrongly checked in files from the testing submodule; I just wanted to remove that.
  return Status::OK();
}

Future<> SubstraitSinkConsumer::Finish() {
Isn't there already a sink that outputs to a reader? Why do we need a custom implementation here?
I do know there is a sink that outputs a std::shared_ptr<arrow::Table>. Could you please point me to the implementation you mean? I might have missed it.
My guess is that you are thinking of SinkNode which is very similar to this class.
Right now the Substrait consumer always uses a ConsumingSinkNode and thus it needs a "consumer factory".
Another potential implementation would be for the Substrait consumer to take in a "sink node factory" instead (or we could have both implementations). That might be more flexible in the long term. In that case we could reuse SinkNode here.
So we have SinkNode which is a "node that shoves batches into a push generator" and we have SubstraitSinkConsumer which is a "consumer that shoves batches into a push generator".
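(For illustration only, not the PR's code: a minimal sketch of such a "consumer that shoves batches into a push generator", assuming the Consume/Finish shape of compute::SinkNodeConsumer; header paths and exact signatures vary across Arrow versions.)

#include "arrow/compute/exec/exec_plan.h"  // assumed location of SinkNodeConsumer
#include "arrow/util/async_generator.h"    // PushGenerator

namespace cp = arrow::compute;

// Pushes every batch it receives into a push generator; a RecordBatchReader
// (or any other async consumer) can then drain the generator on the other side.
class BatchPushingConsumer : public cp::SinkNodeConsumer {
 public:
  using Producer =
      arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer;

  explicit BatchPushingConsumer(Producer producer) : producer_(std::move(producer)) {}

  arrow::Status Consume(cp::ExecBatch batch) override {
    // Hand the batch over to whoever is reading from the generator.
    producer_.Push(std::move(batch));
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override {
    producer_.Close();  // signal end-of-stream to the reading side
    return arrow::Future<>::MakeFinished();
  }

 private:
  Producer producer_;
};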
We might want to support both as well, since a consumer is much easier to implement than a node.
Just to be clear, that last comment was about potential modifications to the Substrait consumer (e.g. we might want the Substrait consumer to support both a consumer factory API and a sink node factory API)
@westonpace good point. In this PR, should we continue with the ConsumingSink approach and later on, in another PR, think about supporting a factory approach?
Yes, let's keep this PR focused on the ConsumingSink approach. We can worry about other changes later.
Should we create a JIRA for that one or should it be an open discussion before it becomes a ticket? Interested in that piece :)
Please create a ticket. It can be our place for open discussion
@westonpace created a ticket here for open discussion: https://issues.apache.org/jira/browse/ARROW-16036
I'd like to work on this one. I think the usability piece of this PR can be further improved with this integration.
ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
                         substrait_json, in_schema));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
EXPECT_GT(table->num_rows(), 0);
Shouldn't we know the number of expected rows?
So this means we should know the exact answer to the query. It depends on the data file, right? Assuming the file is static, we can set a fixed expected value. What do you think?
An alternative is to read the file directly using the Parquet API and check the values.
We can probably just assume the # of rows is static.
Ping?
WIP
Do you plan to address this or not? It's not a big deal either way
Yes, I am. Will update it today.
cc @lidavidm I updated the test case and also added a note that modifying the test file could cause a test failure. I think being exact is better than checking for a non-zero value; thank you for pointing this out in the first place. Also, in Python we verify this exactly by using the Parquet reader; using the Parquet reader here seemed like overkill. So one way or the other, the objective is covered :)
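(A minimal sketch of the tightened assertion, for illustration only; kExpectedRows below is a placeholder, not a value taken from the PR, and must match the actual row count of the static test file.)

// Assumption: the test Parquet file is static, so the exact row count is known.
// kExpectedRows is a placeholder value, not the real count from the PR.
constexpr int64_t kExpectedRows = 12;

ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
                         substrait_json, in_schema));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
// NOTE: if the test data file is regenerated, update kExpectedRows as well.
EXPECT_EQ(table->num_rows(), kExpectedRows);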
westonpace left a comment:
Thanks for working on this, I've added some thoughts on the C++ implementation.
python/pyarrow/_engine.pyx:

Parameters
----------
plan : bytes
(The docstring still doesn't describe this parameter fully.)
westonpace left a comment:
This is getting close. I think the overall approach looks good now. Just a few cleanups:
Also, is this comment resolved? https://github.com/apache/arrow/pull/12672/files#r836413387
    std::string& substrait_json);

/// \brief Retrieve a RecordBatchReader from a Substrait plan in Buffer.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
Personally I'd prefer something like ExecuteJsonPlan and ExecuteSerializedPlan over overloads
And again, I think any use of Buffer here in a parameter, method name, or docstring is confusing at best; we should clarify what exactly it's meant to be.
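(A rough sketch of that naming suggestion; the signatures below are illustrative guesses, not the API that was ultimately adopted.)

/// \brief Execute a Substrait plan given as a JSON string and return a reader
/// over the results. (Illustrative declaration only.)
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteJsonPlan(
    const std::string& substrait_json, const std::shared_ptr<Schema>& schema);

/// \brief Execute a Substrait plan given as a protobuf-serialized buffer and
/// return a reader over the results. (Illustrative declaration only.)
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
    const std::shared_ptr<Buffer>& serialized_plan, const std::shared_ptr<Schema>& schema);

Either shape keeps the JSON path clearly separate from the protobuf path, which would also resolve the ambiguity about what the Buffer is meant to contain.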
// Path is supposed to start with "/..."
file_path = "file://" + file_path;
#endif
std::cout << "File Path : >>>>" << file_path << std::endl;
Remove this print?
Yes, of course. I'm still debugging the Windows CI breaks; I will remove it after the fix.
All tests are failing; it seems like this needs to be rebased against Weston's recent refactoring?

Thank you. Let me do that.

cc @lidavidm @westonpace following this refactor should be changed to
There are still test failures. Can we just do the 'dumb' thing that seems to work? (See arrow/cpp/src/parquet/test_util.cc, lines 42 to 72 at dc97883.)
Or else it looks like we have to fix

Also @vibhatha, it is possible to set up MinGW under a Windows VM so that you can debug more quickly…unless you only have an ARM machine?

@lidavidm Yes, I only have an ARM machine.

Benchmark runs are scheduled for baseline = cc2265a and contender = c544a8b. c544a8b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
The PR includes the initial integration of Substrait into the Python engine module (experimental): comments and suggestions are much appreciated to improve this component.