
Conversation

@lidavidm (Member) commented Apr 16, 2021

This isolates the one-shot portion of InMemoryDataset to Scanner, so that it is more clearly used only for writing data from a source that cannot be re-read.

@lidavidm (Member Author)

CC @westonpace

@westonpace (Member)

It's a pity; if we were not crossing languages, I think we could require an iterable as input (something that can be invoked to generate a (potentially one-shot) iterator) instead of an iterator, and then we wouldn't need this distinction. This looks good, but do we want to open a second JIRA for the post-ARROW-12289 follow-up, or just wait and do that work in this JIRA later?
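
For concreteness, a minimal standalone sketch of that distinction (toy types, not Arrow's actual classes): an iterable can be invoked repeatedly to produce fresh one-shot iterators, while a bare iterator can be consumed only once.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

using Batch = int;  // stand-in for a record batch
using BatchIterator = std::function<std::optional<Batch>()>;  // consumable once
using BatchIterable = std::function<BatchIterator()>;         // re-invokable

// An iterable backed by in-memory data: every invocation produces a fresh
// iterator, so the data can be scanned any number of times.
BatchIterable MakeVectorIterable(std::vector<Batch> batches) {
  return [batches]() -> BatchIterator {
    auto index = std::make_shared<std::size_t>(0);
    return [batches, index]() -> std::optional<Batch> {
      if (*index >= batches.size()) return std::nullopt;
      return batches[(*index)++];
    };
  };
}

int main() {
  BatchIterable iterable = MakeVectorIterable({1, 2, 3});
  for (int pass = 0; pass < 2; ++pass) {  // two full scans both succeed
    BatchIterator it = iterable();
    while (auto batch = it()) std::cout << *batch << ' ';
    std::cout << '\n';
  }
}
```

A one-shot source (e.g. a network stream) can only supply the bare BatchIterator, never the BatchIterable, which is exactly the distinction being discussed.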

@lidavidm (Member Author)

If the other side is something like a Flight stream, then I think we'd still have the distinction, unfortunately.

I am happy to wait until we have AsyncScanner merged and then I can update this.

@westonpace (Member)

How would a Flight stream even work with the datasets API? What would the fragments be? How would it know when the file has ended? I think I need to fit this into my mental model.

@lidavidm (Member Author)

Sorry, so this was originally added to support writing data from a generator, which could be something like a Flight stream (i.e. a record batch reader). But writing data in Datasets consumes a scanner, so you end up having to support one-shot datasets. I agree that supporting reading data from Flight is an entirely different matter and would be modeled differently (presumably as an iterable, as you suggest, corresponding to an RPC with a fixed set of parameters).

@westonpace (Member)

Ah, that's right. And for the writing-from-memory case, we want to free up the memory after we write it, so an iterable would be out of the question.
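
A toy sketch of that constraint (hypothetical types, not the actual Arrow implementation): a one-shot generator transfers ownership of each batch as it is consumed, so memory is freed while the write proceeds, and there is nothing left to hand to a second iterator.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Batch {
  std::vector<int> values;
};

// A one-shot generator over in-memory batches: ownership of each batch is
// handed to the caller as it is consumed, so its memory can be released
// during the write. Afterwards there is nothing left to rewind to.
class OneShotGenerator {
 public:
  explicit OneShotGenerator(std::vector<std::unique_ptr<Batch>> batches)
      : batches_(std::move(batches)) {}

  // Returns nullptr once exhausted.
  std::unique_ptr<Batch> Next() {
    if (index_ >= batches_.size()) return nullptr;
    return std::move(batches_[index_++]);
  }

 private:
  std::vector<std::unique_ptr<Batch>> batches_;
  std::size_t index_ = 0;
};
```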

@westonpace (Member)

Hmm... maybe "streaming" isn't the most intuitive name, then. Technically all the file-based datasets are "streaming": if a user were copying a dataset, for example, we would stream the data a few batches at a time. Should we just use SingleShotDataset?

@lidavidm (Member Author)

Sounds good to me.

@lidavidm (Member Author)

Rebased to pick up ARROW-12289; OneShotDataset now uses its own Fragment implementation so that ScanBatchesAsync uses a background thread, to avoid blocking in the async scanner.
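
Roughly the shape of that change, as a standalone sketch (toy code, not the actual Arrow machinery): the blocking pull from the one-shot source runs on a background thread, so the async scanner only ever waits on a future.

```cpp
#include <future>
#include <optional>

struct RecordBatch {};

// Stand-in for a blocking pull from a one-shot source such as a
// RecordBatchReader; here it simply reports end-of-stream.
std::optional<RecordBatch> BlockingRead() { return std::nullopt; }

// Each call schedules the blocking read on a background thread and returns
// a future, so the caller's own threads are never blocked on the read.
std::future<std::optional<RecordBatch>> ReadBatchAsync() {
  return std::async(std::launch::async, [] { return BlockingRead(); });
}

int main() {
  auto future = ReadBatchAsync();
  // ... other work can proceed while the read runs in the background ...
  auto batch = future.get();
  (void)batch;
  return 0;
}
```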

@lidavidm (Member Author)

@westonpace do you want to look over the ScanBatchesAsync implementation here?

@lidavidm force-pushed the arrow-12231 branch 2 times, most recently from b0a97ec to f1106c2 (May 4, 2021)
@bkietz (Member) commented May 4, 2021

To me, this seems less like a subclass of dataset and more like a subclass of Scanner: IMHO it's not intuitive that a dataset would ever be single-shot. Instead, I think it'd make more sense to add Scanner::MakeFromRecordBatchReader or so, and (probably) add single-shot-ness to the contract of Scanner.
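
A sketch of what such a factory could look like (the name follows the "or so" suggestion above; the types are stand-ins, not Arrow's actual declarations):

```cpp
#include <memory>
#include <utility>

struct RecordBatchReader {};  // stand-in for arrow::RecordBatchReader

class Scanner {
 public:
  // Hypothetical factory, per the suggestion above: the scanner takes
  // ownership of the reader, and since the reader is consumed as it is
  // read, "can be scanned exactly once" becomes part of the contract.
  static std::unique_ptr<Scanner> MakeFromRecordBatchReader(
      std::shared_ptr<RecordBatchReader> reader) {
    return std::unique_ptr<Scanner>(new Scanner(std::move(reader)));
  }

 private:
  explicit Scanner(std::shared_ptr<RecordBatchReader> reader)
      : reader_(std::move(reader)) {}

  std::shared_ptr<RecordBatchReader> reader_;
};
```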

@westonpace (Member)

> To me, this seems less like a subclass of dataset and more like a subclass of Scanner: IMHO it's not intuitive that a dataset would ever be single-shot. Instead, I think it'd make more sense to add Scanner::MakeFromRecordBatchReader or so, and (probably) add single-shot-ness to the contract of Scanner.

I'm not sure I agree. I agree with "it's not intuitive that a dataset would ever be single-shot". I don't agree that it makes any more sense for Scanner to be single-shot. I think the core non-intuitive piece is the concept of a "one-shot iterable".

In my mental model:

Dataset -> Iterable<Fragment>
Fragment -> Iterable<RecordBatch>
Scanner -> Map<Dataset, Iterable<RecordBatch>>

So Scanner is just a "map" function which is generally (Python being the exception) reusable.

Perhaps I will revisit my original suggestion of having the input to a dataset be an iterable (InMemoryDataset::RecordBatchGenerator is already sort of an "iterable" interface), where the in-memory variants are one-shot iterables. The user-facing Python API could remain as-is: a list of batches or tables, or an iterable of batches or tables, would be converted into a RecordBatchReader, and a one-shot implementation of InMemoryDataset::RecordBatchGenerator would consume the reader and then return an invalid status the next time InMemoryDataset::RecordBatchGenerator::Get is called.

Although that takes us back pretty close to where we started 😬
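
That model, rendered as toy interfaces (illustrative only, not Arrow's real signatures):

```cpp
#include <memory>
#include <vector>

struct RecordBatch {};

// Fragment -> Iterable<RecordBatch>: each ReadBatches() call starts a fresh
// read (a one-shot fragment would be the exception).
class Fragment {
 public:
  virtual ~Fragment() = default;
  virtual std::vector<RecordBatch> ReadBatches() = 0;
};

// Dataset -> Iterable<Fragment>
class Dataset {
 public:
  virtual ~Dataset() = default;
  virtual std::vector<std::shared_ptr<Fragment>> GetFragments() = 0;
};

// Scanner -> Map<Dataset, Iterable<RecordBatch>>: conceptually just the
// function that flattens a dataset's fragments into batches.
std::vector<RecordBatch> Scan(Dataset& dataset) {
  std::vector<RecordBatch> batches;
  for (const auto& fragment : dataset.GetFragments())
    for (const auto& batch : fragment->ReadBatches()) batches.push_back(batch);
  return batches;
}
```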

@lidavidm (Member Author) commented May 5, 2021

I think you could argue that Fragment is just Iterable<RecordBatch> while Scanner is Iterator<RecordBatch>. While a Scanner is usually a rewindable (but not random-access) iterator, it only guarantees ForwardIterator. Furthermore, it's pretty simple to implement a one-shot scanner by having it wrap a (non-public-API) one-shot fragment (like the one implemented here).

Or put another way, if we limit the one-shotness to the Scanner, then we can hide the odd nonconforming Dataset/Fragment from the public API.
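
A hedged sketch of that wrapping (toy types; the real implementation would presumably return an invalid Status rather than throw):

```cpp
#include <stdexcept>
#include <utility>
#include <vector>

struct RecordBatch {};

// A toy one-shot fragment: the first scan hands out the batches, any later
// scan fails. Keeping this type out of the public API lets Scanner expose a
// plain forward-iteration contract while Dataset/Fragment stay rewindable.
class OneShotFragment {
 public:
  explicit OneShotFragment(std::vector<RecordBatch> batches)
      : batches_(std::move(batches)) {}

  std::vector<RecordBatch> Scan() {
    if (consumed_) throw std::runtime_error("fragment was already scanned");
    consumed_ = true;
    return std::move(batches_);
  }

 private:
  std::vector<RecordBatch> batches_;
  bool consumed_ = false;
};
```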

@westonpace (Member)

So this would be a scanner that doesn't use fragments or datasets at all? Then would the Python API change? Right now users pass batches/tables to the dataset function and then later get a scanner with scan.

So this change would be creating a scanner directly from a table/batches and bypassing the creation of a "dataset" entirely?

I think that makes a lot of sense.

@lidavidm (Member Author) commented May 5, 2021

Right (though the implementation would just be a SyncScanner wrapping a OneShotFragment).

In fact, Joris already refactored the Python side to have write take a Scanner. So in all cases, a Scanner gets passed to C++ (batches/etc. get turned into an InMemoryDataset and scanned; iterators get turned into a scanner directly). I'll rebase again to make sure everything still fits together.

@bkietz (Member) left a review:

LGTM

DCHECK_OK(Filter(scan_options_->filter));
}

class ARROW_DS_EXPORT OneShotScanTask : public ScanTask {
A reviewer (Member) commented on the diff above:

Instead of exporting these, I think we can keep them in an anonymous namespace.
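
For context, the suggested shape (illustrative only):

```cpp
// No export macro needed: the anonymous namespace gives the helper internal
// linkage, so it is visible only within this translation unit and stays out
// of the library's public ABI.
namespace {

class OneShotScanTask /* : public ScanTask */ {
  // ... implementation details ...
};

}  // namespace
```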

@lidavidm changed the title from "ARROW-12231: [C++][Python][Dataset] Differentiate one-shot datasets" to "ARROW-12231: [C++][Python][Dataset] Isolate one-shot data to scanner" (May 6, 2021)
@westonpace (Member) left a review:

Ok, I like this latest approach. Right now there is a fallback in AsyncScanner::Finish to always use the sync scanner when creating a scanner from a fragment, because I wasn't sure if we wanted to keep that API. This PR uses that API, so I created ARROW-12664, which I'll address soon.

@jorisvandenbossche (Member)

LGTM; the Scanner.from_batches approach instead of a single-shot dataset seems like a nice solution.
