
Conversation

@lidavidm
Member

This binds InMemoryDataset to Python, allowing us to create datasets from iterables of record batches and various other objects, and to write them back out.
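
For illustration, a minimal sketch of the round trip this enables, assuming the ds.dataset factory accepts in-memory objects (as in the examples later in this thread) and using ds.write_dataset for the write-back; the output path is a placeholder:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": range(3), "b": ["x", "y", "z"]})

# Wrap an in-memory table (or batches) as a dataset...
in_memory = ds.dataset(table)

# ...then write it back out like any other dataset.
ds.write_dataset(in_memory, "/tmp/in_memory_out", format="parquet")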

@lidavidm
Member Author

CC @jorisvandenbossche perhaps?

Member

@jorisvandenbossche left a comment


Really cool, thanks!

Added a few minor inline comments.

I assume this is existing behaviour of the C++ InMemoryDataset, but now that you've added bindings for it I tried it out, and I'm noticing some surprising behaviour (or at least behaviour that differs from other datasets) regarding multiple scans.

Listing fragments only works a single time:

In [26]: table = pa.table({"a": range(10), 'b': np.random.randn(10)})

In [27]: dataset = ds.dataset(table)

In [28]: list(dataset.get_fragments())
Out[28]: [<pyarrow._dataset.Fragment at 0x7fe157128d70>]

In [29]: list(dataset.get_fragments())
Out[29]: []

Scanning a fragment results in an empty table? (I had expected that it would work at least the first time)

In [30]: dataset = ds.dataset(table)

In [31]: fragment = list(dataset.get_fragments())[0]

In [32]: fragment.to_table().to_pandas()
Out[32]: 
Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Scanning the dataset twice gives an empty table the second time:

In [33]: dataset = ds.dataset(table)

In [34]: dataset.to_table().to_pandas()
Out[34]: 
   a         b
0  0  0.114496
1  1 -0.402515
2  2 -0.581416
3  3 -0.919706
4  4 -1.335101
5  5  0.743333
6  6 -0.156003
7  7 -0.273285
8  8 -0.901662
9  9 -1.873260

In [35]: dataset.to_table().to_pandas()
Out[35]: 
Empty DataFrame
Columns: [a, b]
Index: []

I suppose some of this is the expected behaviour (eg since the iterator is only consumed a single time), but it might then be worth documenting this more explicitly, since with FileSystemDatasets you can happily do multiple scans (eg with different filters).
But raising an error instead of returning empty results could also be more user friendly IMO.

Member


Suggested change
f'Item has schema {item.schema} which which does not '
f'Item has schema {item.schema} which does not '

Member


And I would maybe add some \n around the inserted schema to improve readability of the message
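
For illustration only, the reworded message might look something like this (the exception type and the trailing text are placeholders, not the actual code):

raise TypeError(
    f'Item has schema\n{item.schema}\nwhich does not '
    f'match the expected schema\n{schema}')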

Comment on lines 735 to 738
Member


Could also pass this to an InMemoryDataset? Then _filesystemdataset_write could potentially be simplified so that it doesn't have to deal with a list of batches/tables.
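
Roughly what that could look like (a sketch only; _ensure_dataset is a hypothetical helper, and the exact isinstance checks would depend on what _filesystemdataset_write currently accepts):

import pyarrow.dataset as ds

def _ensure_dataset(data, schema=None):
    # Hypothetical helper: wrap tables, batches, or lists of batches in an
    # in-memory dataset up front, so the write path only ever sees a Dataset.
    if isinstance(data, ds.Dataset):
        return data
    return ds.dataset(data, schema=schema)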

@lidavidm
Member Author

Hmm, I think the behavior is specifically because we're passing a reader, which of course is single-shot. In some cases we can't avoid this (e.g. if you pass in a Flight reader or an iterator). But InMemoryDataset allows passing a record batch iterator factory, which we should take advantage of as much as possible, so at least in the case where we get a list of batches, the dataset should be re-readable.
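
To make the distinction concrete, a rough sketch, assuming ds.dataset accepts both a list of batches and a one-shot iterator of batches (plus a schema), as the PR description suggests; exact signatures may differ:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"a": range(10)})
batches = table.to_batches()

# A list of batches can back a fresh iterator for each scan,
# so repeated scans should keep returning data.
rereadable = ds.dataset(batches, schema=table.schema)

# A plain iterator (or reader) is single-shot: once the first scan
# consumes it, a second scan has nothing left to read.
one_shot = ds.dataset(iter(batches), schema=table.schema)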

@lidavidm
Member Author

Aha, the reason scanning a fragment gives an empty table is that the fragment gets constructed with an empty schema, due to a spot of undefined behavior:

InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
                                   Expression partition_expression)
    : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(),
                       std::move(record_batches), std::move(partition_expression)) {}

Because the evaluation of the constructor arguments is unsequenced, the by-value RecordBatchVector parameter may be move-constructed from record_batches before empty() and record_batches[0]->schema() are evaluated, so the delegating constructor can end up reading a moved-from (empty) vector and passing along an empty schema.

@lidavidm
Member Author

This should be good (minus the known flaky integration test). As suggested I've changed _filesystemdataset_write such that it only needs to handle datasets now.

Member

@jorisvandenbossche left a comment


Updates look good!

@lidavidm
Member Author

lidavidm commented Apr 5, 2021

@westonpace would it be easier if we held this until ARROW-7001 is through? If we think ARROW-7001 won't make it, we can merge this now; otherwise we can rework it after ARROW-7001 lands.

@westonpace
Member

Is the concern just that we will need to rebase this into ARROW-7001? It doesn't look like it should be too challenging to rebase so I'd say go for it. Or is there some other concern I'm missing?

@lidavidm
Member Author

lidavidm commented Apr 5, 2021

Ah yeah, I just wanted to make sure it wasn't creating a lot of additional work for you in that PR.

@westonpace
Member

Don't worry about it. Go for it.

@lidavidm
Member Author

lidavidm commented Apr 6, 2021

@jorisvandenbossche any other comments?

@jorisvandenbossche
Member

Nope, I approved above, so go ahead and merge!

@lidavidm closed this in 3274d08 on Apr 6, 2021
@westonpace
Member

Postmortem comments now that I'm reviewing this in more detail to merge 😃

  • If you accept a record batch reader then "in-memory" could be misleading. There is nothing preventing you from passing an IPC reader of any kind. In the future we might want to rename this to something like ExternalDataset or PipedDataset or StreamingDataset.

  • Until we interface with Python async there is no way to really scan this asynchronously. I can either scan it on a background thread or use the CPU thread and simply pray that the reader doesn't block. For now I'll do the latter but going forwards maybe we should split this into two different dataset classes? An InMemoryDataset which wraps a list of batches (or table[s]) and a piped dataset which wraps a reader/iterator? The latter would be consumed by an I/O thread while the former would just get consumed on the CPU thread.

@lidavidm
Member Author

lidavidm commented Apr 6, 2021

Both points sound good to me. I filed ARROW-12231 to keep track.

pachadotdev pushed a commit to pachadotdev/arrow that referenced this pull request Apr 6, 2021
This binds InMemoryDataset to Python, allowing us to create and write back out datasets from iterables of record batches and various other objects.

Closes apache#9802 from lidavidm/arrow-10882

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>