Conversation

@westonpace (Member) commented Aug 2, 2022

Primary Goal: Create a scanner that "cancels" properly. In other words, when the scan node is marked finished, all scan-related thread tasks will be finished as well. This differs from the current model, where I/O tasks are allowed to keep parts of the scan alive via captures of shared_ptr state.

Secondary Goal: Remove our dependency on the merged generator and make the scanner more accessible. The merged generator is complicated, does not support cancellation, and is currently understood by only a very small set of people.

Secondary Goal: Add interfaces for schema evolution. This wasn't originally a goal but arose from my attempt to codify and normalize what we are currently doing. These interfaces should eventually allow for things like filling a missing field with a default value or using the parquet column id for field resolution.

Performance isn't a goal for this rework, but ideally it should not degrade performance.

github-actions bot commented Aug 2, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace (Member Author):

This is still very much a draft. However, I have the basic path working. I will need to migrate over the existing scanner tests and test the various failure paths (as well as stress test for race conditions; there is potential for plenty here).

I'm very curious what people think about the new scan options as well as the new evolution interfaces.

@westonpace westonpace requested a review from lidavidm August 2, 2022 18:25
@westonpace (Member Author):

CC @save-buffer @marsupialtail, who might be interested in this as well, since we have spoken on similar topics.

@lidavidm (Member) left a comment:

In general this looks quite reasonable and is much easier to follow than the generator-based approach.

Comment on lines +78 to +88

Member:

Hmm, wouldn't we want to keep this responsibility in a more common place?

Member Author:

I suppose the scanner can do this "field pruning" just before the batch is sent for evolution. I'm not entirely sure how the scanner would know which fields need to be pruned, though. The fragment scanner could perhaps expose a boolean flag indicating whether or not it will do the pruning.

Member:

Could it be treated as part of the 'schema evolution' functionality, since that's already being used to fill in missing fields?

Member Author:

It could be. I think I'd like to leave it for now and see how it looks when we add CSV support for the new scanner.

Member:

This is basically to read the footer ahead of time?

So, ParquetFileFragment would attach the actual Parquet footer in the structure it returns, the Arrow fragment would return the schema, the CSV fragment would throw its hands up, etc.?

@westonpace (Member Author) commented Aug 4, 2022:

Yes, exactly. I'm not a huge fan of this because now fragment scanning is broken into three steps...

FragmentScanner: Inspect fragment (new step)
Scanner: Create evolution
FragmentScanner: Open reader (where we previously determined column names / schema)
Scanner: Start iteration task
FragmentScanner: Scan

I could push the evolution creation into the fragment scanner but that makes a new step that every fragment scanner has to do and so I think that would be worse.

Your point on "actual parquet footer" and "the schema" is also correct. My "prototypical future evolution strategy" that I am trying to make sure we plan for is to resolve field references by parquet column id. So it would work in this fashion:

  • User specifies numeric field references against the dataset schema
  • Evolution strategy has a lookup table from numeric position in dataset schema to column id
  • On inspection, physical column ids are inserted into the schema and passed to the evolution
  • Evolution uses the lookup table combined with the physical column ids to determine the physical columns that are being asked for

So this way the inspect step would need to return something with the column IDs. This could be the parquet footer. However, it also could be the schema extracted from the footer as we have a mechanism for encoding those column IDs into the schema today. This way we could leave the door open to someone adding column IDs to IPC files as well.

Member:

Sounds good to me.

The Parquet/Arrow readers, IIRC, let you open a file and provide an already-parsed footer to at least skip that step.

Member:

I don't see the problem with splitting scanning into these steps, FWIW. I think this would also let us down the line support schemes where metadata (footers) are cached in an index separately from the actual data (which would let us skip a synchronous I/O step)

@lidavidm (Member) left a comment:

There's still a good number of FIXMEs scattered about - do you plan to address those here or convert them into jiras?

/// arguments but also receives a pointer to the unmodified Expression as a second
/// argument. If no arguments were modified the unmodified Expression* will be nullptr.
template <typename PreVisit, typename PostVisitCall>
Result<Expression> Modify(Expression expr, const PreVisit& pre,
Member:

nit: did we have to move this out of the internal header?

Member Author:

If I didn't, then I ended up needing expression_internal.h in partition.cc. I'm not entirely sure why, but including expression_internal.h in two different .cc files in the same module caused duplicate symbol errors. Not on Modify (which should be fine because it is templated) but on things like KnownFieldValues and CallNotNull. Adding Modify didn't require any additional includes in expression.h, so it doesn't seem too heavyweight an addition, but I could be missing some reason we didn't want it exposed.

Member:

Probably because those weren't declared static, only inline

Member Author:

I'll see if I can get it to work with appropriate addition of static

Member Author:

I couldn't mark the structs as static. I moved Modify to compute/exec/util.h, which is also a private header, and I renamed it ModifyExpression so it is clearer.

/// decoded into the Arrow format.
virtual int64_t EstimatedDataBytes(int batch_number) = 0;
/// \brief The number of batches in the fragment to scan
virtual int NumBatches() = 0;
Member:

How would something like CSV implement this?

Member:

Should NumBatches and EstimatedDataBytes be const?

Member Author:

How would something like CSV implement this?

It's # of batches instead of # of rows so it should just be ceil(file_size_bytes / block_size). That being said, this is a change from the previous fragment implementation which didn't require fragments to know the # of batches ahead of time. It will be a problem for something like the python iterator-based scanner.

It is possible to make this work without knowing the # of batches up front. However, it creates a bit more complexity for two reasons:

  1. It makes it harder to know the batch index (I'm also working on ordered execution) because you can't know the batch index until previous fragments have been fully scanned (where this implementation just requires that previous fragments have been inspected).
  2. It makes it difficult to know when we should stop spawning tasks to read further in the current fragment and start spawning tasks to read the next fragment. If we do not know NumBatches we have to rely on a fragment scanner returning an end-of-fragment future quickly. However, this should also be possible.

My hope is that an iterator-based dataset could easily just be an iterator-based source node and so it's ok for the scan node to have this expectation. If we were adapting flight to the scan node do you know if this is something that could cause a problem? Does a flight stream have a known # of batches before we even start consuming it? Or do we not know the # of batches until we have finished reading it?

Member:

There is no way to know the number of batches, no. And I'd like to avoid reimplementing parallel scans for Flight/ADBC data sources…

Member:

Hmm, and I guess (fragment_index, batch_index, is_last) pushes too much complexity into the ordered execution bits?

Member:

Actually I guess I'm curious how this will work with iterator-based sources in general. Flight/ADBC would give you a set of iterators, where there is no parallelism within an iterator. And if you wanted to support Arrow stream files for some reason, that'd have the same structure.

On the other hand, Flight/ADBC have a fixed number of fragments up front.

Maybe a separate source node for those sources might make sense, with an optional 'merge' step if you want to use ordered execution with them? It would be kind of a shame to force them outside of the Dataset framework entirely, IMO, but having two different interfaces would also be annoying. I'm kind of just typing out things on the fly here…

Member Author:

I think the worst case would be if the fragments are large, there is no parallelism within an iterator, and the user wants ordered execution. This will be a problem regardless of approach. We will need to queue a large number of batches in memory for any kind of resequencing and, as a result, will probably fall back to serial execution.

Hmm, and I guess (fragment_index, batch_index, is_last) pushes too much complexity into the ordered execution bits?

It might not be too bad. I'll take another look.

struct InspectedFragment {
explicit InspectedFragment(std::vector<std::string> column_names)
: column_names(std::move(column_names)) {}
std::vector<std::string> column_names;
Member:

Hmm, why don't we care about the types here?

Member Author:

My original thinking was that, with CSV, we can't know the types at this point. Plus, for the basic evolution strategy, we don't need to know the types. With that strategy all we need to know is the column names.

However, looking at this with fresh eyes, I think we do know the types with CSV. We don't re-infer with each new fragment and we assert that the schema matches the dataset schema (you cannot do a type-widening evolution with CSV for example).

In that case maybe I can get rid of InspectedFragment entirely.

The other reason for InspectedFragment was for the case where we know more than types+names. For example, we might know the parquet column ids for each column. However, we should be able to encode that information in a schema.

Member:

It may make sense to keep it around; encoding everything in a schema might be unwieldy. (Though I suppose it buys you serialization for free.)

@westonpace westonpace force-pushed the feature/ARROW-17287--initial-exec-plan-scan-node branch from cc4212b to dc6fbf7 Compare September 27, 2022 23:01
…ified scan node to call downstream pipeline on a new thread task.
@westonpace westonpace merged commit ec579df into apache:master Oct 3, 2022
ursabot commented Oct 3, 2022

Benchmark runs are scheduled for baseline = 89c0214 and contender = ec579df. ec579df is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.03% ⬆️0.0%] test-mac-arm
[Failed ⬇️0.27% ⬆️0.27%] ursa-i9-9960x
[Finished ⬇️1.45% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] ec579df6 ec2-t3-xlarge-us-east-2
[Failed] ec579df6 test-mac-arm
[Failed] ec579df6 ursa-i9-9960x
[Finished] ec579df6 ursa-thinkcentre-m75q
[Finished] 89c0214f ec2-t3-xlarge-us-east-2
[Finished] 89c0214f test-mac-arm
[Failed] 89c0214f ursa-i9-9960x
[Finished] 89c0214f ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
