
Conversation

@westonpace
Member

This also adds a utility method arrow::compute::DeclarationToReader.
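For context, a minimal usage sketch of that utility (the exact header path and overload set here are my assumptions, not necessarily what landed):

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Sketch only: run a declaration (e.g. a scan + filter + project pipeline) and
// consume the result through a pull-based RecordBatchReader.
arrow::Status ConsumePlan(arrow::compute::Declaration plan) {
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::compute::DeclarationToReader(std::move(plan)));
  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    // ... use `batch` here; it is null once the stream is exhausted ...
  } while (batch != nullptr);
  return arrow::Status::OK();
}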

This does not fully satisfy ARROW-17288 as it only adapts the CSV file format. However, this was the most complicated case, and I had to convert some of the common test utilities as well. So I am hopeful the other formats will be more straightforward.

Member

@rok rok left a comment

Thanks for doing this Weston!

It's a little unclear to me what happens type-wise when CSV column names are duplicated and the types of the duplicated columns don't match. Do we currently raise or promote?

Otherwise I'm not that familiar with the codebase so just some naming comments.

});
}

// TODO: Should use exec context owned by the plan
Member

Does this need to be addressed?

Member Author

Ah, no, it no longer needs to be addressed. The plan now uses the batch converter's context, so there is a single exec context, which is what the TODO comment was referring to.

Future<std::shared_ptr<InspectedFragment>> FileFormat::InspectFragment(
const FileSource& source, const FragmentScanOptions* format_options,
compute::ExecContext* exec_context) const {
return Status::NotImplemented("The new scanner is not yet supported on this format");
Member

Which scanner?

Member Author

I changed this to read "scan2 node", although I hope we don't end up needing a scan2 node and this can simply replace the scan node.

Future<std::shared_ptr<FragmentScanner>> FileFormat::BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options, compute::ExecContext* exec_context) const {
return Status::NotImplemented("The new scanner is not yet supported on this format");
Member

As above, which scanner?

@westonpace
Member Author

It's a little unclear to me what happens type-wise when CSV column names are duplicated and the types of the duplicated columns don't match. Do we currently raise or promote?

This should be fully configurable by the schema evolution strategy.

  • The user will ask for a column in the dataset schema and we will know the name of that column
  • The CSV reader will supply its list of column names (which will contain the duplicate)
  • The evolution strategy will be given that list (the inspected fragment), the dataset schema, and the requested columns in the dataset schema and must tell us which columns to load from the CSV.

I think the default behavior picks one arbitrarily and tries to load it. If the type for that column in the file doesn't match the type for the requested column in the dataset schema then a runtime error should be generated.

Happy to change the default to simply give an error when the fragment has duplicate column names. At the moment I don't think we actually infer the types, but we could, so that we could support a "find the first column with the right name and a compatible type" behavior.
In most cases, if the fragments have duplicate column names, any amount of guessing on our part isn't likely to be correct.

It sounds like a good test case to have, though, to verify whatever it is that we do.
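To illustrate the "error on duplicate names" option, here is a hypothetical helper (not the actual evolution-strategy API):

#include <string>
#include <vector>
#include "arrow/result.h"
#include "arrow/status.h"

// Given the fragment's column names (from inspection) and one requested
// dataset-schema column, fail loudly instead of picking a duplicate arbitrarily.
arrow::Result<int> ResolveFragmentColumn(const std::vector<std::string>& fragment_names,
                                         const std::string& requested_name) {
  int found = -1;
  for (int i = 0; i < static_cast<int>(fragment_names.size()); ++i) {
    if (fragment_names[i] != requested_name) continue;
    if (found >= 0) {
      return arrow::Status::Invalid("Column '", requested_name,
                                    "' appears more than once in the fragment");
    }
    found = i;
  }
  if (found < 0) {
    // Not present in the fragment; the evolution strategy could fill with nulls.
    return -1;
  }
  return found;
}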

@rok
Member

rok commented Nov 22, 2022

I think the default behavior picks one arbitrarily and tries to load it. If the type for that column in the file doesn't match the type for the requested column in the dataset schema then a runtime error should be generated.

That sounds OK correctness-wise, but it also seems like it could make execution non-deterministic for preventable reasons?

Member

@lidavidm lidavidm left a comment

Overall looks good to me.

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;

/// \brief Get the size (in bytes) of the file or buffer
Member

With compression, is this the compressed or uncompressed size?

Member Author

Hmmm... the same question exists for "encoded size" vs. "decoded size". I believe the answer is "uncompressed and decoded". The goal is to limit how much data is loaded after a scanner is paused. I will update this comment.

Member Author

@westonpace westonpace Nov 27, 2022

Ah, I misunderstood and thought we were talking about a different size. This size could pose a problem for compressed streams. It should ideally be the uncompressed size but I don't think that is easily available.

/// This method adds any fields that are needed by `filter` which are not already
/// included in the list of columns. Any new fields added will be added to the end
/// in no particular order.
static Status AddFieldsNeededForFilter(ScanV2Options* options);
Member

Why isn't this just a regular method?

Member

Also, when would you ever not want this? I suppose it's separate because the filter isn't provided upon construction - maybe we instead want a Validate method or something that can update state like this and check any other invariants?

Member Author

@westonpace westonpace Nov 22, 2022

There is a (potentially rare) possibility that a format could fully satisfy the best effort filter. In that case, if the field is only needed for filtering, we shouldn't load it.

Although, in that case, @jacques-n has convinced me that we might want two filter fields: best_effort_filter, which does not have to be satisfied (and so the columns it references should always be selected), and filter, which MUST be satisfied (the format should error if it cannot) and whose referenced columns don't have to be in the columns list.

Either way, the main reason I'm not automagically including this is because I am trying to move away from the concept of a scan node automagically adding more columns than the user asked for. I will be doing something similar in ARROW-18386 so that the __filename, etc. fields are only included if asked for.

The goal is that a scan request with X columns will emit batches with X columns.

Why isn't this just a regular method?

I could be convinced otherwise. In my head, all exec node options objects are PODs, and I never know whether a POD should have methods or not. I think, however, it would be fine to make this an immutable method:

Result<ScanV2Options> AddFieldsNeededForFilter() const;
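For what it's worth, usage of that immutable form would look roughly like this (purely a sketch; the filter, the header paths, and constructing ScanV2Options from a dataset are assumptions on my part, not the final API):

#include "arrow/compute/exec/expression.h"
#include "arrow/dataset/scanner.h"

arrow::Result<arrow::dataset::ScanV2Options> MakeOptions(
    std::shared_ptr<arrow::dataset::Dataset> dataset) {
  arrow::dataset::ScanV2Options options(std::move(dataset));
  // Only the filter references "x"; the caller did not select it explicitly.
  options.filter = arrow::compute::greater(arrow::compute::field_ref("x"),
                                           arrow::compute::literal(0));
  // The immutable variant would return a copy whose column list also covers
  // any fields the filter needs (here, "x").
  return options.AddFieldsNeededForFilter();
}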

Would that be preferred?

Member

Oops, sorry, I missed this. The reasoning makes sense. I suppose if it's meant to be an immutable POD, then the latter method makes more sense to me. But that's kind of a nit.

Comment on lines +186 to +187
/// If the filter references fields that are not included in `columns` this may or may
/// not be an error, depending on the format.
Member

Hmm, how would this not be an error? (I suppose if the format just ignores the filter?)

Member Author

Yes, formats like CSV and IPC, which just ignore the filter, will not error. Even using Parquet as an example, one could filter on statistics for a column without ever actually loading the column. That's generally not what is desired, though, since the filter is best-effort and the column would then be needed to fulfill the filter in memory.
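A toy illustration of that point (hypothetical structs, not Parquet's actual statistics API):

#include <cstdint>
#include <optional>

// A format could prove that no row in a row group satisfies `x > threshold`
// using only the column's max statistic, without ever reading column x.
struct ColumnStats {
  std::optional<int64_t> min;
  std::optional<int64_t> max;
};

bool CanSkipRowGroup(const ColumnStats& stats, int64_t threshold) {
  // If every value of x is <= threshold, the filter `x > threshold` matches nothing.
  return stats.max.has_value() && *stats.max <= threshold;
}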

for (const auto& field : fields_referenced) {
// Note: this will fail if the field reference is ambiguous or the field doesn't
// exist in the dataset schema
ARROW_ASSIGN_OR_RAISE(auto field_path, field.FindOne(*options->dataset->schema()));
Member

IIRC FindOne is linear, so this operation overall is quadratic. Might be worth considering (as a follow up) how this pattern behaves on very wide datasets (1000-10000 columns) since we've gotten reports of issues at that size before.
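One possible mitigation, sketched here only as a follow-up idea (not part of this PR): build a name-to-index map once per schema so each lookup is O(1) instead of a linear scan.

#include <string>
#include <unordered_map>
#include "arrow/type.h"

std::unordered_map<std::string, int> BuildNameIndex(const arrow::Schema& schema) {
  std::unordered_map<std::string, int> index;
  index.reserve(schema.num_fields());
  for (int i = 0; i < schema.num_fields(); ++i) {
    // Duplicate names would need explicit handling (e.g. store a vector of indices).
    index.emplace(schema.field(i)->name(), i);
  }
  return index;
}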

@lidavidm
Member

I also lean towards having an explicit error if the column is ambiguous. Trying to type-match also feels somewhat brittle to me.

@westonpace
Member Author

I also lean towards having an explicit error if the column is ambiguous. Trying to type-match also feels somewhat brittle to me.

There was a slightly related discussion here: #13938 (comment) where the following was suggested:

Hmm, it would be nice if this could work even without disabling schema evolution. Perhaps a heuristic is possible?

  • if a field name is unique, do as usual
  • if a field name is non-unique, require that it has the same number of occurrences in both schema, and iterate on those pairs in order
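A sketch of how that pairing heuristic could work (illustrative only, not this PR's behavior):

#include <map>
#include <string>
#include <utility>
#include <vector>
#include "arrow/result.h"
#include "arrow/status.h"

// Pair dataset-schema column indices with fragment column indices: unique names
// match normally; duplicated names must occur the same number of times on both
// sides and are paired in order of appearance.
arrow::Result<std::vector<std::pair<int, int>>> PairColumns(
    const std::vector<std::string>& dataset_names,
    const std::vector<std::string>& fragment_names) {
  std::map<std::string, std::vector<int>> dataset_occ, fragment_occ;
  for (int i = 0; i < static_cast<int>(dataset_names.size()); ++i) {
    dataset_occ[dataset_names[i]].push_back(i);
  }
  for (int i = 0; i < static_cast<int>(fragment_names.size()); ++i) {
    fragment_occ[fragment_names[i]].push_back(i);
  }
  std::vector<std::pair<int, int>> pairs;
  for (const auto& [name, dataset_idxs] : dataset_occ) {
    auto it = fragment_occ.find(name);
    if (it == fragment_occ.end()) continue;  // missing: evolution can fill with nulls
    const std::vector<int>& fragment_idxs = it->second;
    if ((dataset_idxs.size() > 1 || fragment_idxs.size() > 1) &&
        dataset_idxs.size() != fragment_idxs.size()) {
      return arrow::Status::Invalid("Field '", name,
                                    "' occurs a different number of times in the two schemas");
    }
    for (size_t k = 0; k < dataset_idxs.size(); ++k) {
      pairs.emplace_back(dataset_idxs[k], fragment_idxs[k]);
    }
  }
  return pairs;
}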

@lidavidm
Member

That also seems reasonable. As long as the behavior is well-specified.

@lidavidm
Member

It's also something that could be left for a future PR, IMO

@westonpace
Member Author

It's also something that could be left for a future PR, IMO

ARROW-18388

@westonpace westonpace force-pushed the feature/ARROW-17288--update-scanners-for-existing-formats branch from 64ec95b to 99ca767 on November 27, 2022 at 08:12
Comment on lines 81 to 82
return reader_->ReadNextAsync().Then(
[](const std::shared_ptr<RecordBatch>& batch) { return batch; });
Member

Hmm, isn't the Then here redundant?

Member Author

@westonpace westonpace Dec 8, 2022

Good catch. Removed.

Member

@pitrou pitrou left a comment

Some comments while I skim through this. This isn't a comprehensive review at all.


EXPECT_EQ(without_named_refs, expected);
}
} ExpectRemovesRefsTo;
Member

Am I missing something that makes this function better/more useful than a simple function?

Member Author

No, I was copying ExpectFoldsTo, but I see there are other ExpectXyz methods in this file which aren't structs, so I changed it to a plain method for simplicity.

/// Tasks will be left in a queue until the next call and no work will be done between
/// calls.
template <typename T>
Iterator<T> IterateSynchronously(
Member

Can you add basic tests for this?

Member Author

Done. The underlying IterateGenerator has quite a few tests already so I just tested the basic functionality in this method.

@westonpace westonpace force-pushed the feature/ARROW-17288--update-scanners-for-existing-formats branch from 99ca767 to 49010b0 on December 8, 2022 at 00:24
@westonpace
Member Author

I believe I have addressed the review comments and CI is green again. I will probably merge this soon. We can still catch / change things as we address the other formats (and hook in the bindings) too.

@westonpace westonpace requested a review from pitrou December 8, 2022 04:25
@westonpace westonpace merged commit d1a550c into apache:master Dec 8, 2022
kou added a commit to kou/arrow that referenced this pull request Dec 24, 2022
`arrow::dataset::FileFormat`'s constructor API was changed in ARROW-17288/apacheGH-14663.
kou added a commit that referenced this pull request Dec 25, 2022
`arrow::dataset::FileFormat`'s constructor API was changed in ARROW-17288/GH-14663.
* Closes: #14990

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
EpsilonPrime pushed a commit to EpsilonPrime/arrow that referenced this pull request Jan 5, 2023
…5086)

`arrow::dataset::FileFormat`'s constructor API was changed in ARROW-17288/apacheGH-14663.
* Closes: apache#14990

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
// This should be called in increasing order but let's verify that in case it changes.
// It would be easy enough to handle out of order but no need for that complexity at
// the moment.
DCHECK_EQ(scanned_so_far_++, batch_number);

Will ScanNode::ScanBatchTask call ScanBatch out of order and cause this check to fail?
