I'm running the following experiment with partition frequencies of both '7d' and '30d':

```python
from time import time

from dask.distributed import Client, wait, progress
import dask.dataframe as dd

c = Client('localhost:8786')
c  # display client info (when run in a notebook)

df = dd.demo.make_timeseries('2000', '2001',
                             {'x': float, 'y': float, 'id': int},
                             freq='1s', partition_freq='7d', seed=2)
df = df.persist()
wait(df)

start = time()
df2 = df.set_index('id').persist()
wait(df2)
end = time()
print(end - start)  # trying to optimize this
```

Profiles
Analysis
Bandwidths in the small-partition ('7d') case are poor, close to 15 MB/s per chunk. There is a lot of overlap though, so for the machine as a whole we tend to see somewhere between 50 MB/s and 70 MB/s. In the larger-partition ('30d') case bandwidths are better, around 120 MB/s on average and peaking at 250 MB/s. This is all on localhost though, so we should expect better.
Instantiating a new connection can take a surprising amount of time (100 ms or so) with a wide spread, which might contribute a bit to the poor bandwidths. This should be at or below a millisecond (a rough TCP-connect timing sketch is included below).
My first guess on all of this is that our network is less responsive due to GIL-holding computations in Pandas (cc @jreback), but this is just a guess.
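As a rough sanity check on the connection-setup claim above, here is a minimal sketch that times raw TCP connects to a listening service. The scheduler address is an assumption (adjust as needed), and this measures only the socket connect, not distributed's full comms handshake, so it is a lower bound on the real cost.

```python
# Minimal sketch: time raw TCP connection setup to a listening service.
# The host/port here are assumptions (pointing at a local scheduler);
# this measures only socket.connect, not the full comms handshake.
import socket
import time

def time_connect(host='localhost', port=8786, n=100):
    samples = []
    for _ in range(n):
        start = time.perf_counter()
        sock = socket.create_connection((host, port))
        samples.append(time.perf_counter() - start)
        sock.close()
    samples.sort()
    return samples[n // 2], samples[int(n * 0.9)]  # median, 90th percentile

median, p90 = time_connect()
print('connect median: %.2f ms, p90: %.2f ms' % (median * 1e3, p90 * 1e3))
```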
Some things we can do
- Reduce serialization cost of pandas dataframes (see Pandas serialization #931 and https://issues.apache.org/jira/browse/ARROW-376) (cc @wesm); a baseline pickle timing is sketched after this list
- Pool inter-worker connections (will try this next)
- Investigate GIL-free pandas operations (these computations get poor speedups in a thread pool, so this is presumably still a live issue (cc @jreback)); a quick thread-speedup check is sketched after this list
- Byte-copy-free tornado IOStreams? (cc @pitrou)
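For the serialization item, a quick way to get a baseline number is to time plain pickle on a frame shaped roughly like one '7d' partition (~605k rows of the three columns). This is only a sketch with random data; distributed's actual serialization path differs, so treat the result as a rough reference point.

```python
# Sketch: baseline pickle cost for a frame roughly the size of one '7d'
# partition at 1s frequency (~604800 rows). Real distributed serialization
# differs; treat the result as a rough reference point only.
import pickle
import time

import numpy as np
import pandas as pd

n = 7 * 24 * 3600  # one week of 1-second samples
df = pd.DataFrame({'x': np.random.random(n),
                   'y': np.random.random(n),
                   'id': np.random.randint(0, 1000, size=n)})

start = time.perf_counter()
data = pickle.dumps(df, protocol=pickle.HIGHEST_PROTOCOL)
elapsed = time.perf_counter() - start
print('%.1f MB in %.3f s -> %.0f MB/s'
      % (len(data) / 1e6, elapsed, len(data) / 1e6 / elapsed))
```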
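For the GIL item, a crude way to check whether an operation parallelizes in threads is to compare serial wall time against a thread pool running the same work on independent frames; a speedup near 1x suggests the GIL is held for most of the computation. The operation and sizes below are arbitrary placeholders, not the exact computations from the experiment.

```python
# Sketch: compare serial vs. 4-thread wall time for a pandas operation on
# independent frames. A speedup near 1x suggests the GIL is held for most
# of the computation. Operation and sizes are arbitrary placeholders.
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

def make_frame(n=1000000):
    return pd.DataFrame({'x': np.random.random(n),
                         'id': np.random.randint(0, 1000, size=n)})

frames = [make_frame() for _ in range(4)]

def work(df):
    return df.set_index('id')

start = time.perf_counter()
for df in frames:
    work(df)
serial = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(4) as pool:
    list(pool.map(work, frames))
threaded = time.perf_counter() - start

print('serial: %.2f s  threaded: %.2f s  speedup: %.1fx'
      % (serial, threaded, serial / threaded))
```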
The odds of any individual issue here resolving the problem are low, so individually these are all probably pretty low priority.