
Conversation

@fjetter (Member) commented Feb 16, 2024

Closes #856

What isn't implemented yet is primarily statistics collection, compaction, and row group splitting.

The approach I'm taking here is pretty straightforward. I take the pyarrow filesystem and implement the bare essentials to instantiate a parquet dataset and get the minimal metadata from the bucket and files (e.g. all paths, the schema). The expression then effectively tracks the Fragment objects that are created by pyarrow. The Fragments expose all the API we truly care about (a rough sketch of this flow follows the list below).
They can...

  • load parquet metadata (not implemented yet)
  • read the parquet file as pandas
  • accept low level configuration options
  • split themselves by row group (not implemented yet)
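A minimal sketch of that flow in plain pyarrow; the bucket path, region, and column names are placeholders, not what the implementation actually uses:

import pyarrow.dataset as ds
from pyarrow import fs as pa_fs

# Instantiate the pyarrow filesystem directly (backed by C++ threads, no GIL/event loop involvement).
filesystem = pa_fs.S3FileSystem(region="us-east-2")  # placeholder region

# Build a parquet dataset; this lists the bucket and infers the schema.
dataset = ds.dataset(
    "my-bucket/tpch/lineitem",  # placeholder path
    format="parquet",
    filesystem=filesystem,
)

# The expression tracks these Fragment objects; each one can be read on its own.
fragments = list(dataset.get_fragments())
table = fragments[0].to_table(columns=["l_orderkey", "l_quantity"])  # placeholder columns
df = table.to_pandas()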

I think the most notable change, besides relying directly and explicitly on the pyarrow filesystem (which is backed by C++ threads, i.e. no GIL/event loop blockage), is that I am configuring the pre_buffer cache options explicitly (this may also be possible with the existing implementation, but I'm a little lost about how kwargs are passed through).

This is not really a cache but rather a control for how many concurrent IO requests we are issuing and how large/small those are allowed to become. The config options are briefly documented here: https://github.com/apache/arrow/blob/a61f4af724cd06c3a9b4abd20491345997e532c0/python/pyarrow/io.pxi#L2139

The TL;DR is primarily that the default hole_size_limit is a couple of orders of magnitude too small, such that we get incredibly fragmented read requests.
Arrow actually ships with a utility, CacheOptions.from_network_metrics, that makes it easy to calculate appropriate config options.

Docs are in the C++ code here https://github.com/apache/arrow/blob/a61f4af724cd06c3a9b4abd20491345997e532c0/cpp/src/arrow/io/caching.cc#L49C28-L103, which also describes the math they are using, and I think that math makes sense. There is more documentation there about the lazy and prefetch_limit options as well.
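As a rough sketch of my reading of that math (the formulas below are my paraphrase of the C++ docs, so treat them as an approximation rather than Arrow's exact implementation):

def cache_options_from_metrics(ttfb_s, bandwidth_mib_s, utilization=0.9, max_request_mib=64):
    # A hole is worth downloading if transferring it is cheaper than paying
    # another first-byte latency: break-even at latency * bandwidth.
    hole_size_limit_mib = ttfb_s * bandwidth_mib_s
    # To keep a connection utilized at `utilization`, the transfer time must
    # dominate the latency: size >= bandwidth * ttfb * u / (1 - u).
    ideal_request_mib = bandwidth_mib_s * ttfb_s * utilization / (1 - utilization)
    range_size_limit_mib = min(ideal_request_mib, max_request_mib)
    return hole_size_limit_mib, range_size_limit_mib

# 100 ms total request latency and 50 MiB/s per connection -> (5.0, 45.0) MiB,
# matching the CacheOptions.from_network_metrics output further down.
print(cache_options_from_metrics(0.1, 50))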

I started measuring S3 latencies for our benchmarking bucket (coiled-runtime-ci)

[chart: S3 request latency metrics for coiled-runtime-ci]

Depending on how hot/cold the bucket is, the FirstByteLatency fluctuates between 15ms and 60ms, whereas total request latency is somewhere between 30ms and 200ms. Plugging this into the equation and assuming 50MiB/s per connection, we get:

import pyarrow as pa
from dask.utils import format_bytes

copts = pa.CacheOptions.from_network_metrics(
    100,  # time to first byte in ms; I think we should take TotalRequestLatency as a guiding point
    50,   # 50 MiB/s per connection
)
print(f"hole_size_limit={format_bytes(copts.hole_size_limit)}")
print(f"range_size_limit={format_bytes(copts.range_size_limit)}")

hole_size_limit=5.00 MiB
range_size_limit=45.00 MiB

So, while the range limit is fine, you'll notice that the hole size is in the MiB range whereas the default is in kiB. Effectively, we're crippling ourselves with column projections because we're introducing more latency through extra requests than we're saving on data transfer.
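To make that concrete, a back-of-the-envelope example; the row-group layout below is made up, the latency and bandwidth are the numbers measured above, and it ignores that the two fragmented requests could partially overlap:

# Hypothetical layout: we project 2 of 16 columns; the two projected column
# chunks are ~1 MiB each and separated by a ~1 MiB gap of unwanted columns.
latency_s = 0.1        # total request latency measured above
bandwidth_mib_s = 50   # per-connection bandwidth assumed above

# kiB-sized hole_size_limit (the default): the gap is never coalesced -> 2 requests.
fragmented_s = 2 * latency_s + 2 * (1 / bandwidth_mib_s)

# MiB-sized hole_size_limit: both ranges fuse into one request that also
# transfers the 1 MiB hole.
coalesced_s = 1 * latency_s + 3 * (1 / bandwidth_mib_s)

print(f"fragmented: {fragmented_s * 1000:.0f} ms, coalesced: {coalesced_s * 1000:.0f} ms")
# fragmented: 240 ms, coalesced: 160 ms -- per row group, and real datasets
# have many row groups per file.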


How does this translate to benchmarks?

Well, I haven't run many yet and am about to set up a full TPC-H run. So far I've looked at two queries.

Q1

On the pyarrow-generated dataset, this change isn't doing that much. The files are reasonably large and the fragmentation doesn't hurt us that badly. Also, with current main only about 40-50% of this query is actual IO. With this branch I can reduce IO to 30%, which effectively shaves off about 10% of total runtime.

On the DuckDB dataset, which contains many small row groups (i.e. the chances of producing holes are very large and we end up with many fragmented reads), this change without further tuning easily buys us 25%.

Q14

This one was pointed out to suffer from P2P and parquet cross-interference. The query is about 40% faster on this branch using the arrow filesystem. I suspect this is fsspec vs. arrow more than the caching, but I haven't investigated.

I'll post full results once available.

@fjetter (Member Author) commented Feb 16, 2024

The hole size of ~5MiB also matches the default block size we chose in plateau (4MiB there). The buffer in plateau works a little differently, but its block size correlates pretty well with the hole size in this implementation, and that buffer has worked great for me in the past.

@mrocklin (Member)

Note that there may be some value to concurrent reads to S3, even with appropriately sized reads. Anecdotally I've found value in about 3x the number of connections per core/task.

This is demonstrated in this notebook in the section "Test general bandwidth onto these machines". There I just read the whole files and see what bandwidth is like and find that multiple threads are valuable even there.

In my hacky version I did this by using a Python concurrent.futures threadpool around a few partitions at once (usually when we were grouping things together due to only downloading a few columns).
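Roughly along these lines (a sketch, not the code from that notebook; the reader function and the 3-thread default are illustrative):

from concurrent.futures import ThreadPoolExecutor

import pyarrow.parquet as pq


def read_partition(path, filesystem, columns):
    # Arrow releases the GIL during the read, so threads overlap the S3 latency.
    return pq.read_table(path, filesystem=filesystem, columns=columns)


def read_fused(paths, filesystem, columns, threads=3):
    # ~3 connections per core/task, as suggested above.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(lambda p: read_partition(p, filesystem, columns), paths))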

@fjetter (Member Author) commented Feb 19, 2024

Note that there may be some value to concurrent reads to S3, even with appropriately sized reads. Anecdotally I've found value in 3x number of connections per core/task.

Of course. The range_size_limit parameter is there to ensure ranges are not fused too aggressively. I think some experimentation is still necessary, but these parameters are already making a big difference.

@fjetter (Member Author) commented Feb 19, 2024

Preliminary results (I got hard failures for Q18 and Q20, therefore no data). This is an A/B comparison of main w/ fsspec vs. this branch:

[chart: preliminary TPC-H A/B results, main w/ fsspec vs. this branch]

@mrocklin (Member)

Nice.

col, op, val = filter
if op == "in" and not isinstance(val, (set, list, tuple)):
    raise TypeError("Value of 'in' filter must be a list, set or tuple.")
from pyarrow import fs as pa_fs
Member:

It seems to me that we could maybe even turn this on if various uncommon settings aren't set? I'm not in a huge rush here, but just pointing out that the rollout process could be iterative.

Member Author:

Yes, that's my intention. I don't really care about feature completeness here, but I still want to run a test on how the fragment metadata collection works. If no red flags pop up there, we can move forward (ETA ~ tomorrow).

return default_types_mapper


class ReadParquetPyarrowFS(PartitionsFiltered, BlockwiseIO):
Member Author:

I just noticed that this is not inheriting from ReadParquet. I intended it to inherit since there are many places in the code path where we check for ReadParquet. So... this didn't use any of the optimizations defined in ReadParquet.simplify_up. I guess those are not very relevant for TPC-H? 🤔

@fjetter (Member Author) commented Feb 21, 2024

I added filter pushdown to this based on #886 with a couple of fixes, as discussed with @phofl. Looks like some tests broke, so I may remove this again to merge this PR more quickly.

I have good and bad news

First the good news: pushing filters down appears to be universally good, even if not groundbreaking (in contrast, see earlier results in #305).
(This is this branch with filter pushdown compared against no filter pushdown, i.e. 717ad0b vs. 227557d.)

This makes sense since the filter pushdown can prune entire files; this is happening for Q1, for example. I will investigate in a follow-up whether it is worthwhile pushing filters down to the IO level. I assume it is on the DuckDB dataset with the granular row groups, but not on the pyarrow dataset. TBD.

[chart: filter pushdown vs. no filter pushdown, per query]
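For illustration, the kind of pruning this enables at the pyarrow level. The bucket path, region, and the l_shipdate predicate below are stand-ins, and the row-group part assumes the parquet footers are loaded:

import datetime

import pyarrow.compute as pc
import pyarrow.dataset as ds
from pyarrow import fs as pa_fs

filesystem = pa_fs.S3FileSystem(region="us-east-2")  # placeholder
dataset = ds.dataset("my-bucket/tpch/lineitem", format="parquet", filesystem=filesystem)

# Stand-in predicate; Q1 filters on l_shipdate.
predicate = pc.field("l_shipdate") <= datetime.date(1998, 9, 2)

# Partition-level pruning: fragments whose partition expression contradicts the
# predicate are never yielded at all.
fragments = list(dataset.get_fragments(filter=predicate))

# Row-group-level pruning: once the parquet footer is loaded, a fragment can
# drop row groups whose min/max statistics cannot match the predicate.
frag = fragments[0]
frag.ensure_complete_metadata()
pruned = frag.subset(filter=predicate)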

Bad news: rebasing seems to have broken something and I caught a regression. I'll need to investigate this (left: 227557d w/out filters; right: 717ad0b with filters).

[chart: regression between 227557d (no filters) and 717ad0b (filters)]

@fjetter (Member Author) commented Feb 22, 2024

Oh well, it turns out that some of the performance kick came from dropped data :( (dask/distributed#8520). I could track this down for query_2 and I suspect this is also the case for the other regressions we're seeing above. Now that this is settled, I can dig into why they are slower; that shouldn't be the case either way.

@fjetter fjetter changed the title Parquet Rewrite POC Parquet reader using Pyarrow FileSystem Feb 26, 2024
@fjetter fjetter marked this pull request as ready for review February 26, 2024 15:19
@fjetter (Member Author) commented Feb 26, 2024

I'd like to move forward with merging this. I have a couple more things to try and modify, but I would like to enter an ordinary review mode asap.

Current benchmarks against main (with latest pandas) look like this:

[chart: TPC-H benchmarks, this branch vs. main (latest pandas)]

The regressions we're seeing for query 2, for example, are P2P stragglers. Looking at the bar charts w/ error bars, this is a little more obvious:

[charts: per-query bar charts with error bars]

Particularly given #856, the pyarrow filesystem is horribly broken anyhow, so this PR is a very, very clear improvement.

@fjetter (Member Author) commented Feb 26, 2024

In case this is not abundantly obvious: this isn't feature complete. In particular, the parquet metadata scraping and all the goodies that come from it are not implemented yet. Unfortunately, the pyarrow Fragments lose their metadata upon serialization, so we'll have to implement a collection layer similar to the one in dask/dask (if we choose to scrape all metadata). This is something I'd like to enter a proper review mode for.
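A sketch of what such a collection layer could scrape before shipping fragments to workers; the exact shape of what we collect is TBD, and the helper below is purely illustrative:

import pickle


def scrape_row_group_stats(fragment):
    # Force the parquet footer to be read so row_groups/statistics are populated.
    fragment.ensure_complete_metadata()
    return [
        {"id": rg.id, "num_rows": rg.num_rows, "statistics": rg.statistics}
        for rg in fragment.row_groups
    ]


frag = fragments[0]                   # some ParquetFileFragment from the dataset
stats = scrape_row_group_stats(frag)  # plain dicts; these survive pickling just fine

# The fragment itself round-trips, but the parsed metadata does not come along,
# so the footer would have to be fetched again on the receiving worker.
frag2 = pickle.loads(pickle.dumps(frag))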

There are two other possibly major performance changes that I would also like to separate

  • Use the pyarrow dtype backend such that the pyarrow tables are only wrapped and never actually converted to pandas (see the sketch after this list). In my testing this gives us another solid 10% performance kick on top of the above (sometimes more). However, this will likely cause fallout in tests, which is why a separate PR seems sensible.
  • Implement another concurrency layer in FusedIO. This is something I would love to not have to do, particularly since pyarrow appears to implement something similar (that's the fragment readahead foo), but I haven't found out how to tap into that yet.
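The first item boils down to something like this (a sketch; the real wiring would go through the types_mapper machinery in this PR):

import pandas as pd
import pyarrow as pa

table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Default conversion copies the data into NumPy-backed columns.
df_numpy = table.to_pandas()

# With the pyarrow dtype backend the pandas columns merely wrap the Arrow
# buffers, so no conversion of the data is needed.
df_arrow = table.to_pandas(types_mapper=pd.ArrowDtype)
print(df_arrow.dtypes)  # a: int64[pyarrow], b: string[pyarrow]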

@fjetter fjetter requested a review from phofl February 26, 2024 15:32
Comment on lines +759 to +768
batch_size=10_000_000,
# batch_readahead=16,
# fragment_readahead=4,
fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
    pre_buffer=True,
    cache_options=pa.CacheOptions(
        hole_size_limit=parse_bytes("4 MiB"),
        range_size_limit=parse_bytes("32.00 MiB"),
    ),
),
Member Author:

I'm open to exposing all of this via kwargs or options, but I don't believe there are (m)any users who have the necessary know-how and determination to fine-tune this. I could be wrong, of course. I would prefer adding this once there is a need for it.

Comment on lines +439 to +441
def _determine_type_mapper(
    *, user_types_mapper=None, dtype_backend=None, convert_string=True
):
Member Author:

I'll address this in a follow-up PR. This is mostly copied over, and the defaults are such that dask/dask CI works. As already indicated, there is a sizable performance kick if we switch to pyarrow by default.

# fragment_readahead=4,
fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
    pre_buffer=True,
    cache_options=pa.CacheOptions(
Member:

It seems like CacheOptions requires pyarrow>=15, so dask-expr would need to pin pyarrow pretty aggressively to use this.

Member Author:

yes, indeed

Member Author:

is this a problem?

Member:

Well, I cannot build an environment with cudf and pyarrow>14 right now. I'm sure there are plenty of other packages that would like to use dask/dask-expr with a pyarrow version that is more than 4 weeks old.

Member:

All of this is already behind a big if check, right? Maybe we can add the PyArrow version to that condition so that this only gets used if Arrow is pretty new.

Member:

Right, I should clarify that I don't think it's a bad idea to "use" it (setting appropriate data sieving properties can be huge).

Member Author:

We can either lock out usage of the pyarrow FS for versions smaller than this, or I can write compat code to make it work for everything. I'm inclined to go with Matt's suggestion and make pyarrow>=15 a requirement for this path, forcing users with a smaller version onto the fsspec path.

Is that fine for everyone? This way we wouldn't have a lot of compat code in the actual implementation and we can increase the actual minimal version a little later when support is better.
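A minimal sketch of that gate; the constant and helper names below are made up, and where the check ends up living is part of the review:

import pyarrow as pa
from packaging.version import Version

PYARROW_FS_MIN_VERSION = Version("15.0.0")  # hypothetical constant; CacheOptions needs >=15


def can_use_pyarrow_fs_reader():
    # On older pyarrow, fall back to the fsspec-based reader instead of carrying
    # compat code inside the new implementation.
    return Version(pa.__version__) >= PYARROW_FS_MIN_VERSION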

Collaborator:

fine with me

Member:

That seems reasonable to me.

@phofl (Collaborator) left a comment:

A few comments.

from pyarrow import fs as pa_fs

if (
    isinstance(filesystem, pa_fs.FileSystem)
Collaborator:

Do you want to make those configs raise, or should we rather fall back to the old implementation?

Member Author:

The old implementation is currently pretty broken, so I don't want to fall back; see #856.

jorisvandenbossche added a commit to apache/arrow that referenced this pull request Feb 27, 2024
…nit (#40143)

### Rationale for this change

Closes #40142

I'm developing a new dask integration with pyarrow parquet reader (see dask/dask-expr#882) and want to rely on the pyarrow Filesystem more.

Right now, we are performing a list operation ourselves to get all touched files and I would like to pass the retrieved `FileInfo` objects directly to the dataset constructor. This API is already exposed in C++ and this PR is adding the necessary python bindings.

The benefit of this API is that it cuts the need to perform additional HEAD requests to remote storage.

This came up in #38389 (comment) and there's been related work already with #37857
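In Python terms, the gist is roughly the following; the listing part is existing pyarrow API, whereas handing the `FileInfo` objects straight to the dataset factory is what this PR adds, so the last line still goes through paths here:

import pyarrow.dataset as ds
from pyarrow import fs as pa_fs

filesystem = pa_fs.S3FileSystem(region="us-east-2")  # placeholder
selector = pa_fs.FileSelector("my-bucket/tpch/lineitem", recursive=True)

# One LIST request yields FileInfo objects that already carry path, size and mtime.
infos = [fi for fi in filesystem.get_file_info(selector) if fi.is_file]

# Passing only the paths forces the dataset factory to re-discover the sizes via
# one HEAD request per file; the new bindings allow handing over the FileInfo
# objects themselves instead.
dataset = ds.dataset([fi.path for fi in infos], format="parquet", filesystem=filesystem)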

### What changes are included in this PR?

Python bindings for the `DatasetFactory` constructor that accepts a list/vector of `FileInfo` objects.

### Are these changes tested?

~I slightly modified the minio test setup such that the prometheus endpoint is exposed. This can be used to assert that there hasn't been any HEAD requests.~ I ended up removing this again since parsing the response is a bit brittle.

### Are there any user-facing changes?

* Closes: #40142

Lead-authored-by: fjetter <fjetter@users.noreply.github.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
@fjetter (Member Author) commented Feb 27, 2024

Given the lack of objections, and the fact that this is a dedicated code path that doesn't touch the fsspec filesystem code while the pyarrow FS code is currently broken anyhow (#856), I'll move forward with merging.

@fjetter fjetter merged commit ed5d051 into dask:main Feb 27, 2024
@fjetter fjetter deleted the pyarrow_filesystem branch February 29, 2024 10:38
Successfully merging this pull request may close these issues:

  • Parquet checksum calculation horribly slow with arrow FileSystem wrapper