Conversation

@westonpace
Member

No description provided.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace
Member Author

There is an entirely different implementation we could take (pass the schema in an init method before anything runs):

class ARROW_EXPORT SinkNodeConsumer {
 public:
  virtual ~SinkNodeConsumer() = default;
  /// \brief Initialize the consumer with the schema of the batches that will arrive
  virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
  /// \brief Consume a batch of data
  virtual Status Consume(ExecBatch batch) = 0;
  /// \brief Signal to the consumer that the last batch has been delivered
  ///
  /// The returned future should only finish when all outstanding tasks have completed
  virtual Future<> Finish() = 0;
};

Pros:

  • More obvious that the schema will never change

Cons:

  • More methods that have to be implemented

Opinions? @lidavidm @vibhatha @paleolimbot
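
For illustration, the Init/Consume/Finish contract proposed above could be exercised roughly as follows. This is only a sketch: `Status`, `Schema`, and `ExecBatch` are simplified stand-ins so the example compiles without Arrow, `Finish` returns a plain `Status` instead of `Future<>`, and `ColumnCheckingConsumer` and `DemoInitConsumeFinish` are hypothetical names that do not appear in the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins so the sketch compiles without Arrow.
// These are NOT the real arrow::Status, arrow::Schema, or ExecBatch.
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Invalid(std::string message) {
    return Status(false, std::move(message));
  }
  bool ok() const { return ok_; }

 private:
  Status(bool ok, std::string message) : ok_(ok), message_(std::move(message)) {}
  bool ok_;
  std::string message_;
};

struct Schema {
  std::vector<std::string> field_names;
};

struct ExecBatch {
  std::vector<std::vector<int>> columns;  // one vector per column
};

// Same shape as the proposed interface, except Finish() is synchronous here.
class SinkNodeConsumer {
 public:
  virtual ~SinkNodeConsumer() = default;
  virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
  virtual Status Consume(ExecBatch batch) = 0;
  virtual Status Finish() = 0;
};

// Hypothetical consumer: remembers the schema once, then validates each batch.
class ColumnCheckingConsumer : public SinkNodeConsumer {
 public:
  Status Init(const std::shared_ptr<Schema>& schema) override {
    schema_ = schema;
    return Status::OK();
  }
  Status Consume(ExecBatch batch) override {
    if (batch.columns.size() != schema_->field_names.size()) {
      return Status::Invalid("batch does not match the schema given to Init");
    }
    ++batches_seen_;
    return Status::OK();
  }
  Status Finish() override { return Status::OK(); }
  int batches_seen() const { return batches_seen_; }

 private:
  std::shared_ptr<Schema> schema_;
  int batches_seen_ = 0;
};

// One schema, many batches: Init once, Consume repeatedly, Finish once.
inline int DemoInitConsumeFinish() {
  auto schema = std::make_shared<Schema>();
  schema->field_names = {"x", "y"};

  std::vector<int> column = {1, 2, 3};
  ExecBatch matching;
  matching.columns = {column, column};  // two columns, like the schema
  ExecBatch mismatched;
  mismatched.columns = {column};        // only one column

  ColumnCheckingConsumer consumer;
  Status init_status = consumer.Init(schema);
  assert(init_status.ok());
  Status first = consumer.Consume(matching);
  Status second = consumer.Consume(matching);
  Status third = consumer.Consume(mismatched);
  Status finish_status = consumer.Finish();
  assert(first.ok() && second.ok() && !third.ok() && finish_status.ok());
  return consumer.batches_seen();
}
```

Calling `DemoInitConsumeFinish()` counts only the batches that matched the schema handed over in `Init`.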

@lidavidm
Member

I would probably prefer Init/Consume/Finish, we could default-implement Init if it's a concern

@paleolimbot
Member

Taking this with the grain of salt that I first heard of a SinkNodeConsumer a few weeks ago, passing the schema in virtual Status Init(const std::shared_ptr<Schema>& schema) = 0; makes the most sense to me (it lines up with the C Stream interface's approach too...one schema, many batches).

@vibhatha
Contributor

This one is cleaner. I also prefer this method for consuming_sink. One question, does this take care of ‘SinkNode’ and ‘OrderBySink’ node too? We may also need to know the schema of the batches we are going to get out of a given sink, am I right? I am just thinking about the general case for sink nodes.

@vibhatha
Contributor

Another Advantage is we may be solving this issue too. https://issues.apache.org/jira/browse/ARROW-15297

@westonpace @lidavidm what do you think?

@westonpace
Member Author

> One question, does this take care of ‘SinkNode’ and ‘OrderBySink’ node too?

It doesn't currently fix SinkNode or OrderBySinkNode. We could add an init-style schema callback to the options objects for those types. I'll try it out.

> Another Advantage is we may be solving this issue too. https://issues.apache.org/jira/browse/ARROW-15297

Almost. It looks like the Python test that was failing on that issue is failing here too. I think we can move towards WriteNodeOptions taking either an optional schema (which would replace the one retrieved from the node; we could add a check in the init step to make sure they are compatible) or just a dictionary of metadata that we append to the schema in the init step.
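
The two options floated here could look something like the sketch below. Everything in it is an assumption made for illustration: `Schema` is a stand-in struct rather than `arrow::Schema`, `WriteOptionsSketch` is not the real `WriteNodeOptions`, and "compatible" is reduced to "same field names in the same order".

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Stand-in for a schema: field names plus key/value metadata. Not arrow::Schema.
struct Schema {
  std::vector<std::string> field_names;
  std::map<std::string, std::string> metadata;
};

// Hypothetical options object carrying only extra metadata (second option).
struct WriteOptionsSketch {
  std::map<std::string, std::string> custom_metadata;  // may be empty
};

// First option: a user-supplied schema replaces the node's schema, but only
// after a compatibility check. Here that check is just a field-name comparison.
inline bool FieldsCompatible(const Schema& from_node, const Schema& from_user) {
  return from_node.field_names == from_user.field_names;
}

// Second option: keep the node's schema and append the metadata in the init step.
inline std::shared_ptr<Schema> ApplyCustomMetadata(
    const std::shared_ptr<Schema>& node_schema, const WriteOptionsSketch& options) {
  auto result = std::make_shared<Schema>(*node_schema);  // copy, fields untouched
  for (const auto& entry : options.custom_metadata) {
    result->metadata[entry.first] = entry.second;  // caller-provided keys win
  }
  return result;
}

inline bool DemoCustomMetadata() {
  auto node_schema = std::make_shared<Schema>();
  node_schema->field_names = {"a", "b"};

  WriteOptionsSketch options;
  options.custom_metadata["origin"] = "test";

  std::shared_ptr<Schema> written = ApplyCustomMetadata(node_schema, options);

  Schema user_schema;
  user_schema.field_names = {"a", "c"};  // differs from the node's fields

  return written->field_names == node_schema->field_names &&
         written->metadata.at("origin") == "test" &&
         node_schema->metadata.empty() &&
         FieldsCompatible(*node_schema, *written) &&
         !FieldsCompatible(*node_schema, user_schema);
}
```

The metadata-only variant never has to reject anything, which may be why it is the simpler of the two to wire into an init step.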

@vibhatha
Contributor

It looks like this PR can cover this ground if we address these issues.
@westonpace Thanks a lot for taking the time on this. I am +1 for this modification.
Shall we also add a few test cases to show the functionality?

@westonpace
Member Author

westonpace commented Mar 29, 2022

> I would probably prefer Init/Consume/Finish, we could default-implement Init if it's a concern

I've updated this PR to use an Init method. I did not default-implement as I'm not sure any meaningful consumer can do much without the schema.

I could add a default implementation that actually stored the schema (in a protected variable) but I kind of prefer pure-virtual interfaces for the public API, though maybe that is just my Java/C# background. Happy to change if anyone has any strong opinion on the matter.
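
For comparison, the default-implemented alternative mentioned here (storing the schema in a protected variable) might look like the sketch below. It is not what the PR does; the PR keeps `Init` pure virtual. `Schema` and `Status` are stand-ins, and `SinkNodeConsumerWithDefault` and `WidthChecker` are hypothetical names.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-ins so the sketch compiles without Arrow; not the real types.
struct Schema {
  std::vector<std::string> field_names;
};

class Status {
 public:
  explicit Status(bool ok = true) : ok_(ok) {}
  bool ok() const { return ok_; }

 private:
  bool ok_;
};

// Hypothetical variant of the interface with a default-implemented Init.
class SinkNodeConsumerWithDefault {
 public:
  virtual ~SinkNodeConsumerWithDefault() = default;

  // Default behaviour: just remember the schema for subclasses to read.
  virtual Status Init(const std::shared_ptr<Schema>& schema) {
    schema_ = schema;
    return Status();
  }
  virtual Status Consume(std::vector<int> row) = 0;

 protected:
  std::shared_ptr<Schema> schema_;  // null until Init has been called
};

// A subclass gets schema access without writing its own Init.
class WidthChecker : public SinkNodeConsumerWithDefault {
 public:
  Status Consume(std::vector<int> row) override {
    bool matches =
        schema_ != nullptr && row.size() == schema_->field_names.size();
    return Status(matches);
  }
};

inline bool DemoDefaultInit() {
  WidthChecker consumer;
  bool before_init = consumer.Consume({1, 2}).ok();  // no schema stored yet

  auto schema = std::make_shared<Schema>();
  schema->field_names = {"a", "b"};
  Status init_status = consumer.Init(schema);
  bool after_init = consumer.Consume({1, 2}).ok();

  return !before_init && init_status.ok() && after_init;
}
```

The trade-off is visible in the base class: implementers write less, but the public interface now carries state, which is the part the pure-virtual design avoids.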

> Another Advantage is we may be solving this issue too. https://issues.apache.org/jira/browse/ARROW-15297

This changes the schema to an (optional) custom_metadata which I think means we do indeed solve ARROW-15297.

> Shall we also add a few test cases to show the functionality?

@vibhatha I've added a test to make sure we test the failure path (i.e. Init returns a non-ok status). I'm not sure what other test cases you had in mind.
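
A failure-path check of this kind could be sketched as follows. This is not the test added in the PR: the types are stand-ins that compile without Arrow, `Finish` returns a plain `Status` rather than `Future<>`, and `RejectingConsumer` and `RunSink` are hypothetical. The behaviour shown is the one a failing `Init` should presumably trigger: the error surfaces and no batch is consumed.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins; not the real Arrow types.
struct Schema {
  std::vector<std::string> field_names;
};
struct ExecBatch {
  std::vector<int> values;
};

class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Invalid(std::string message) {
    return Status(false, std::move(message));
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  Status(bool ok, std::string message) : ok_(ok), message_(std::move(message)) {}
  bool ok_;
  std::string message_;
};

class SinkNodeConsumer {
 public:
  virtual ~SinkNodeConsumer() = default;
  virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
  virtual Status Consume(ExecBatch batch) = 0;
  virtual Status Finish() = 0;  // Future<> in the real interface
};

// Hypothetical consumer whose Init always fails.
class RejectingConsumer : public SinkNodeConsumer {
 public:
  Status Init(const std::shared_ptr<Schema>&) override {
    return Status::Invalid("schema rejected");
  }
  Status Consume(ExecBatch) override {
    ++consumed_;
    return Status::OK();
  }
  Status Finish() override { return Status::OK(); }
  int consumed() const { return consumed_; }

 private:
  int consumed_ = 0;
};

// Hypothetical driver: stops at the first non-ok status.
inline Status RunSink(SinkNodeConsumer& consumer,
                      const std::shared_ptr<Schema>& schema,
                      const std::vector<ExecBatch>& batches) {
  Status st = consumer.Init(schema);
  if (!st.ok()) return st;  // nothing is consumed when Init fails
  for (const ExecBatch& batch : batches) {
    st = consumer.Consume(batch);
    if (!st.ok()) return st;
  }
  return consumer.Finish();
}

inline bool DemoInitFailure() {
  RejectingConsumer consumer;
  auto schema = std::make_shared<Schema>();
  std::vector<ExecBatch> batches(3);
  Status st = RunSink(consumer, schema, batches);
  return !st.ok() && st.message() == "schema rejected" &&
         consumer.consumed() == 0;
}
```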

@@ -304,10 +310,9 @@ class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
 public:
  TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
                       std::shared_ptr<Schema> output_schema)
Member

We can remove this second argument now right?

Member Author

Good catch. I've removed it.

Contributor

@vibhatha left a comment

Looks good to me @westonpace. This will help in exposing the Substrait plan very neatly. I cannot think of any additional test cases, since this basically provides a method to extract the schema in user code.

@westonpace
Member Author

It seems an R change managed to sneak in that exposes a little dilemma. The output schema includes the augmented fields, even though the Substrait plan didn't call for them.

I'm still processing how we want to handle this but I'm thinking right now we just make sure not to include augmented fields when running against a Substrait plan.

@paleolimbot
Member

I did notice there were unexpected fields (although because there was no schema I had no idea what they were!). At least the ability to turn them off would be nice (or else we're still stuck having to calculate the column names in advance to know what columns not to pass on to the user).

@westonpace force-pushed the feature/ARROW-16033--pass-schema-to-sink-consumer branch from 5f04ffd to aad8039 on April 5, 2022 00:58
@westonpace
Member Author

I added an explicit selection step to drop those columns. I think the proper fix will have to be done in a follow-up. I added a comment to the R test to this effect. @paleolimbot mind taking a quick look for sanity's sake? Then I will merge.
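
As a rough picture of what such a selection step has to do, the sketch below keeps only the user-visible column names. It rests on an assumption not stated in this thread: that the augmented fields are the ones whose names start with a double underscore (for example `__fragment_index`). `DropAugmentedFields` is a hypothetical helper, and the change in the PR was made on the R side rather than in C++.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: returns the names that do not look like augmented
// fields. Assumes augmented fields are exactly those prefixed with "__".
inline std::vector<std::string> DropAugmentedFields(
    const std::vector<std::string>& field_names) {
  std::vector<std::string> kept;
  for (const std::string& name : field_names) {
    // compare(0, 2, "__") is 0 only when the first two characters are "__".
    bool augmented = name.compare(0, 2, "__") == 0;
    if (!augmented) kept.push_back(name);
  }
  return kept;
}

inline bool DemoDropAugmentedFields() {
  std::vector<std::string> from_plan = {"a", "__fragment_index", "b",
                                        "__batch_index", "_single"};
  std::vector<std::string> expected = {"a", "b", "_single"};
  return DropAugmentedFields(from_plan) == expected;
}
```

With the schema now passed to `Init`, a filter like this can run once up front instead of guessing the column names in advance.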

aad8039

Member

@paleolimbot left a comment

This is much easier than what I was doing before...thank you!

@westonpace closed this in 45a97e1 on Apr 5, 2022
@ursabot

ursabot commented Apr 6, 2022

Benchmark runs are scheduled for baseline = 9e08c50 and contender = 45a97e1. 45a97e1 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.59% ⬆️0.0%] test-mac-arm
[Failed ⬇️0.36% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.6% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 45a97e13 ec2-t3-xlarge-us-east-2: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/451
[Finished] 45a97e13 test-mac-arm: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/436
[Failed] 45a97e13 ursa-i9-9960x: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/437
[Finished] 45a97e13 ursa-thinkcentre-m75q: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/446
[Finished] 9e08c503 ec2-t3-xlarge-us-east-2: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/450
[Finished] 9e08c503 test-mac-arm: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/435
[Failed] 9e08c503 ursa-i9-9960x: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/436
[Finished] 9e08c503 ursa-thinkcentre-m75q: https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/445
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

jonkeane pushed a commit that referenced this pull request Apr 8, 2022
…ROW_R_WITH_ENGINE

After ARROW-16033 (#12721) we get this compiler warning when compiling with `ARROW_R_WITH_ENGINE`:

```
   compute-exec.cpp:304:17: warning: 'Init' overrides a member function but is not marked 'override' [-Winconsistent-missing-override]
     arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) {
                   ^
   /Users/deweydunnington/.r-arrow-dev-build/dist/include/arrow/compute/exec/options.h:153:18: note: overridden virtual function is here
     virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
                    ^
   1 warning generated.
```

This PR just adds the requisite `override`.
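
A minimal illustration of what the added keyword buys, using hypothetical stand-in classes rather than the actual R bindings: with `override`, the compiler verifies that `Init` really matches a virtual function in the base class, so a signature drift becomes a compile error rather than a silently separate function.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-in for a schema; not arrow::Schema.
struct Schema {
  std::vector<std::string> field_names;
};

// Hypothetical base with the same Init shape as the warning above
// (returning bool instead of a Status to keep the sketch small).
class ConsumerBase {
 public:
  virtual ~ConsumerBase() = default;
  virtual bool Init(const std::shared_ptr<Schema>& schema) = 0;
};

class RConsumerSketch : public ConsumerBase {
 public:
  // Without `override` this still compiles, but clang emits
  // -Winconsistent-missing-override when other members are marked.
  bool Init(const std::shared_ptr<Schema>& schema) override {
    field_count_ = schema->field_names.size();
    return true;
  }
  std::size_t field_count() const { return field_count_; }

 private:
  std::size_t field_count_ = 0;
};

inline std::size_t DemoOverride() {
  auto schema = std::make_shared<Schema>();
  schema->field_names = {"a", "b", "c"};

  RConsumerSketch consumer;
  ConsumerBase& as_base = consumer;  // call through the base interface
  bool ok = as_base.Init(schema);
  return ok ? consumer.field_count() : 0;
}
```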

Closes #12823 from paleolimbot/r-minor-override

Authored-by: Dewey Dunnington <dewey@fishandwhistle.net>
Signed-off-by: Jonathan Keane <jkeane@gmail.com>