ARROW-8039: [Python] Use dataset API in existing parquet readers and tests #6303
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on JIRA? Then could you also rename the pull request title in the following format? See also:
bkietz left a comment
Looks great, a few typos/questions.
Personally, I'd prefer to flip the condition from use_dataset=False to use_legacy_dataset=True
python/pyarrow/tests/test_parquet.py
This seems like it could be fixed just for ParquetDatasetV2 by deduplicating the column names; is that unfavorable?
Yes, that should be easy to do.
But I would prefer that we first decide what we want for the new Datasets API (deduplicate the passed columns, or return the duplicated columns) and follow that here; otherwise it creates an inconsistency between this and the pyarrow.dataset API. So I left it as a TODO for now (the TODO being to bring this up and take a decision).
I think we will not deduplicate the column names in C++ @fsaintjacques
Fields within a schema may have duplicated field names so it seems unlikely that we'll move amenities like deduplication to the lowest level.
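(For illustration, a minimal sketch of the column-name deduplication being discussed here, assuming it were done on the Python side; this is not the behavior that was decided on.)

```python
def deduplicate_columns(columns):
    # Keep the first occurrence of each requested column name, preserving order.
    return list(dict.fromkeys(columns))

assert deduplicate_columns(["a", "b", "a", "c"]) == ["a", "b", "c"]
```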
python/pyarrow/tests/test_parquet.py
Is this critical? This would be fairly easy to recover with an option like discover_dictionaries in partition schema discovery
Similar to the above: I would prefer that we first decide what we want long term, rather than exactly mimicking the old API (certainly given that it is opt-in for now).
Given the variable dictionary support we have now, returning dictionary encoded fields shouldn't (in principle?) be too hard and also should be faster than duplicating a string many times. In the worst case, a record batch would have a field having all 0 indices and a dictionary with a single value.
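(As a small illustration of the point above, not part of the PR itself: dictionary encoding stores each distinct value once plus integer indices.)

```python
import pyarrow as pa

# A partition column materialized as plain strings repeats the value per row...
plain = pa.array(["2020-01"] * 5)

# ...while its dictionary-encoded form stores the value once plus small indices.
encoded = plain.dictionary_encode()
print(encoded.type)        # dictionary<values=string, indices=int32, ordered=0>
print(encoded.indices)     # all indices point at the single dictionary entry
print(encoded.dictionary)  # ["2020-01"]
```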
I like that suggestion.
Working on reviewing this, will get you feedback as soon as I can.
python/pyarrow/tests/test_parquet.py
We'll have to discuss how to expose metadata in pyarrow.dataset. IMO this should be a property of ParquetFileFragment and not of a Dataset
We'll have to discuss how to expose metadata in pyarrow.dataset. IMO this should be a property of ParquetFileFragment and not of a Dataset
Yes, indeed, this could be something specific on the ParquetFileFragment, although a ParquetDataset can still have a "metadata" that maps to all metadata of all files/row groups, if available (eg if the dataset was created from a _metadata file with all this information).
And with the future work on storing statistics on fragments, we might also get a parquet-independent access to part of the metadata.
_metadata serves as a consolidated mapping from paths or row groups to metadata, so I'd say that even in that case the metadata derived from it is a per-Fragment property.
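(For context, a hedged sketch of how per-file Parquet metadata is accessed through pyarrow.parquet today; the path is hypothetical, and how this would surface on ParquetFileFragment was still an open question at this point.)

```python
import pyarrow.parquet as pq

# Per-file Parquet metadata (row groups, statistics, schema) for a single file.
md = pq.read_metadata("dataset/part-0.parquet")  # hypothetical path
print(md.num_rows, md.num_row_groups)

# The same object is available from an opened file.
print(pq.ParquetFile("dataset/part-0.parquet").metadata)
```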
wesm left a comment
This is good progress, thank you all for working on this. I'm not sure what to do longer term about this test suite, which was a bit of a rat's nest already
Can we collect somewhere a list of what features of the old ParquetDataset are not supported, and then we can decide what must be implemented or what will be left on the chopping block?
python/pyarrow/parquet.py
Having all the arguments and their defaults duplicated here and then the additional checking in ParquetDatasetV2 is a bit unsatisfying. Is there a better way?
The __new__ was needed (as far as I remember and understand) to get pickling to work for the old ParquetDataset
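(A hedged sketch of the __new__-based dispatch being discussed, with illustrative names rather than the actual pyarrow source; keeping the constructor arguments optional also keeps cls.__new__(cls), which pickle calls, working.)

```python
class _ParquetDatasetV2:
    # Stand-in for the dataset-API-backed implementation.
    def __init__(self, path_or_paths=None, **kwargs):
        self.path = path_or_paths


class ParquetDataset:
    def __new__(cls, path_or_paths=None, use_legacy_dataset=True, **kwargs):
        if not use_legacy_dataset:
            # Returning a non-subclass instance means ParquetDataset.__init__
            # is never run; the V2 class fully constructs itself.
            return _ParquetDatasetV2(path_or_paths, **kwargs)
        return super().__new__(cls)

    def __init__(self, path_or_paths=None, use_legacy_dataset=True, **kwargs):
        self.path = path_or_paths
```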
python/pyarrow/parquet.py
May want to try to extract this and the same section from the legacy reader (can do so later)
python/pyarrow/parquet.py
Is discovery / metadata analysis multithreaded by default?
Thanks for the feedback! Some non-inline responses for easier visibility:
Splitting it into multiple files might also help (e.g. pure parquet tests (ParquetFile, metadata, statistics) vs parquet dataset tests). And at some point, if we can remove the old dataset code, that will also help of course ;)
Yes, will do.
Right now, this is not multithreaded (also not optional). There is a JIRA for this: https://issues.apache.org/jira/browse/ARROW-8137
In the datasets API, there is an option to control this. The main problem here is that in the new datasets API, no deterministic row order (order of the record batches) is guaranteed when using multithreading. So for testing equality of tables, we turned multithreading off for now (see e.g. https://issues.apache.org/jira/browse/ARROW-7719, and #6319 for a PR that fixed failing tests due to this in test_dataset.py); a small sketch follows below. Personally, I would prefer to see deterministic results in the Datasets API as well (IMO this is what users will expect, although this can also be handled to some extent by the library that uses this code, e.g. dask), but there was some disagreement about this. I don't think this discussion was captured in a JIRA (except the one about failing tests).
It indeed should be possible (@bkietz fixed a bug preventing this), so you can already manually specify the partitioning schema with dictionary types, and that works now. But we could also have an option to automatically do this?
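(Sketches of the two points above, assuming the pyarrow.dataset API; the use_threads and infer_dictionary option names and the path are assumptions on my part and may differ between versions.)

```python
import pyarrow.dataset as ds

# Disable multithreading to get a deterministic row order when comparing
# tables in tests (assumes a use_threads flag on to_table).
dataset = ds.dataset("dataset_root", format="parquet")  # hypothetical path
table = dataset.to_table(use_threads=False)

# Ask partition discovery to produce dictionary-encoded partition fields
# instead of plain strings (assumes an infer_dictionary discovery option).
part = ds.HivePartitioning.discover(infer_dictionary=True)
dataset = ds.dataset("dataset_root", format="parquet", partitioning=part)
```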
bkietz left a comment
nit
All green here.
python/pyarrow/parquet.py
This function also supports passing in as List[Tuple]. These predicates
are evaluated as a conjunction. To express OR in predicates, one must
use the (preferred) List[List[Tuple]] notation.
implements partition-level (hive) filtering, i.e., to prevent the
I find the hive mention a bit confusing here. Could you add a clearer note on what the user can filter on, depending on use_legacy_dataset?
👍
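(A hedged example of the List[List[Tuple]] filter notation from the docstring excerpt above; the path and column names are hypothetical.)

```python
import pyarrow.parquet as pq

# DNF filters: the outer list is OR, each inner list is AND.
# (year == 2020 AND month > 6) OR (year == 2021)
table = pq.read_table(
    "dataset_root",              # hypothetical path
    use_legacy_dataset=False,
    filters=[
        [("year", "=", 2020), ("month", ">", 6)],
        [("year", "=", 2021)],
    ],
)
```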
    "Dataset API".format(keyword))

# map old filesystems to new one
# TODO(dataset) deal with other file systems
Would it be hard to map HadoopFileSystem and S3FSWrapper as well?
Either way, please file a JIRA ticket for it.
The question might also be whether we actually want to add such a mapping (for LocalFileSystem I mainly did it to get the tests passing right now).
In the end, we also want people to switch to the new filesystems, so maybe we should rather give that message (certainly if they opt in with use_legacy_dataset=False, they can also use the new filesystem objects?).
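(A small sketch of that suggestion, assuming read_table accepts the new filesystem objects when opting in; the path is hypothetical.)

```python
import pyarrow.fs
import pyarrow.parquet as pq

# Pass one of the new pyarrow.fs filesystems directly instead of relying on
# a mapping from the legacy filesystem classes.
fs = pyarrow.fs.LocalFileSystem()
table = pq.read_table("dataset_root", filesystem=fs, use_legacy_dataset=False)
```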
kszucs left a comment
Just minor comments, nicely done @jorisvandenbossche!
bkietz left a comment
Just a few suggestions which might improve clarity
# map old filesystems to new one
# TODO(dataset) deal with other file systems
if isinstance(filesystem, LocalFileSystem):
    filesystem = pyarrow.fs.LocalFileSystem(use_mmap=memory_map)
For a follow up ticket, maybe memory_map should default to None so that we can reconcile it with an instance of pa.fs.LocalFileSystem
Suggested change:
filesystem = pyarrow.fs.LocalFileSystem(use_mmap=memory_map)
elif isinstance(filesystem, pyarrow.fs.LocalFileSystem):
    assert memory_map is None
?
maybe memory_map should default to None so that we can reconcile it with an instance of pa.fs.LocalFileSystem
Yeah, if we keep it as an option to the filesystem, that sounds like a good idea. But there has also been some discussion recently about not tying it to the filesystem.
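(A hedged sketch of the reconciliation idea from this thread, not the actual pyarrow code: with memory_map defaulting to None, it only applies when the filesystem is constructed here.)

```python
import pyarrow.fs


def _ensure_new_filesystem(filesystem=None, memory_map=None):
    # An explicitly passed new-style LocalFileSystem is used as-is; combining
    # it with memory_map would be ambiguous, so that is rejected.
    if isinstance(filesystem, pyarrow.fs.LocalFileSystem):
        if memory_map is not None:
            raise ValueError("pass either memory_map or a LocalFileSystem instance")
        return filesystem
    # Only when no filesystem is given do we construct one, honoring memory_map.
    if filesystem is None:
        return pyarrow.fs.LocalFileSystem(use_mmap=bool(memory_map))
    return filesystem
```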
bkietz left a comment
Thanks again @jorisvandenbossche!
merging
I finally listed the open TODO items from the discussions in this PR / the skipped tests, and opened JIRAs where this was not yet the case:
This is testing to optionally use the dataset API in the pyarrow parquet reader implementation (read_table and ParquetDataset().read()). Currently, it is enabled by passing use_legacy_dataset=False (the mechanism to opt in is to be discussed), which allows running our existing parquet tests with this (the approach I now took is to parametrize the existing tests for use_legacy_dataset True/False). This allows users to opt in through either read_table or ParquetDataset (see the sketch below), with the idea that at some point the default for use_legacy_dataset would switch from True to False.
Long term, I think we certainly want to keep pq.read_table (and I think we will also be able to support most of its keywords). The future for pq.ParquetDataset is less clear (it has a lot of API that is tied to the Python implementation, e.g. the ParquetDatasetPiece, PartitionSet, ParquetPartitions, ... classes). We probably want to move people towards a "new" ParquetDataset that is more consistent with the new general datasets API. Therefore, right now ParquetDataset(use_legacy_dataset=False) does not yet try to provide all those features, but just the read() method. We can later see which extra features we add and how advanced users of ParquetDataset can move to the new API.
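For reference, a minimal sketch of the opt-in described above (the "dataset_root" path is hypothetical):

```python
import pyarrow.parquet as pq

# Read through the new dataset API instead of the legacy Python implementation.
table = pq.read_table("dataset_root", use_legacy_dataset=False)

# Or, equivalently, through ParquetDataset:
table = pq.ParquetDataset("dataset_root", use_legacy_dataset=False).read()
```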