
Conversation

@j-bennet
Contributor

@j-bennet j-bennet commented Jun 14, 2023

Closes #882.

Benchmark for merging two dataframes, one of which is a lot bigger than the other.

Based on https://github.com/coiled/imbalanced-join/.

To clarify "imbalanced":

  • "left" dataframe has a lot more records
  • some partitions are huge and some are tiny
  • dataframes are joined by key column. It's a many-to-many join, with a lot more records on the left side
  • data is further grouped by group_cols, and then aggregated. The size of groups is very uneven, with some groups containing hundreds of records, and some containing millions.
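
A minimal sketch of the overall shape of this workflow (for orientation only; the join key name "key" and the group_cols list here are placeholders, not necessarily the names used in the benchmark):

import dask.dataframe as dd

# left side has far more records than the right side
large_df = dd.read_parquet("s3://test-imbalanced-join/df1/")
small_df = dd.read_parquet("s3://test-imbalanced-join/df2/")

# many-to-many join on the shared key column
joined = large_df.merge(small_df, on="key")

# group and aggregate; group sizes are very uneven
group_cols = ["key"]  # placeholder for the benchmark's grouping columns
result = joined.groupby(group_cols, sort=False).agg({"value": "sum"})
result.compute()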

@j-bennet j-bennet force-pushed the j-bennet/882-imbalanced-join-workflow branch 2 times, most recently from 13eec01 to 7ab74f8 Compare June 14, 2023 18:24
@j-bennet j-bennet force-pushed the j-bennet/882-imbalanced-join-workflow branch from 7ab74f8 to a776239 Compare June 14, 2023 18:44
@j-bennet j-bennet added the workflows Related to representative Dask user workflows label Jun 14, 2023
@j-bennet j-bennet closed this Jun 14, 2023
@j-bennet j-bennet reopened this Jun 14, 2023
@j-bennet j-bennet closed this Jun 14, 2023
@j-bennet j-bennet reopened this Jun 14, 2023
Comment on lines +12 to +13
large_df = dd.read_parquet("s3://test-imbalanced-join/df1/")
small_df = dd.read_parquet("s3://test-imbalanced-join/df2/")
Member

Just a thought (feel free to ignore) but these datasets seem opaque to me. I don't know how imbalanced they are, or really what this means. Probably the answer is to go to the repository and look at notes.

An alternative would be to use dask.datasets.timeseries, take the random float columns, and put them through some transformation to control skew / cardinality / etc. This would give this benchmark more control over the situation.

Again, just a thought, not a request.
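
A rough sketch of that alternative (not part of this PR; x is one of the random float columns produced by dask.datasets.timeseries, and the power transform is just one arbitrary way to induce skew):

import dask.datasets

df = dask.datasets.timeseries(start="2023-01-01", end="2023-02-01")

# Squash the uniform float column through a power transform to create a
# heavy-tailed join key; the exponent controls skew, the multiplier cardinality.
df["key"] = (((df["x"] + 1) / 2) ** 8 * 1_000).astype("int64")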

Contributor

I think this would indeed be a nice utility function, maybe even core functionality of timeseries? I like the idea, but I suggest decoupling it from this PR.

@hendrikmakait
Contributor

Benchmark for merging two dataframes, one of which is a lot bigger than the other.

When I think of an imbalanced join, it is a join where the individual result partitions are imbalanced (e.g., very few customers account for most of the orders in an online shop). Is this dataset imbalanced because one side is larger than the other or is there actual imbalance on the partitions/join predicates?

@ncclementi
Contributor

For those curious about the dataset, here is the repo and the notebook describing what this data contains: https://github.com/coiled/imbalanced-join/blob/main/simulate_imbalanced_data.ipynb

@j-bennet j-bennet marked this pull request as draft June 15, 2023 21:02
@j-bennet j-bennet force-pushed the j-bennet/882-imbalanced-join-workflow branch from e8bee56 to 1f134a5 Compare June 15, 2023 23:04

# group and aggregate, use split_out so that the final data
# chunks don't end up aggregating on a single worker
# TODO: workers are still getting killed, even with split_out
Contributor Author

I wanted to get rid of map_partitions in this workflow. Unfortunately, without map_partitions, even with split_out, I still have workers using too much memory and being killed. Somehow, I was able to get the same code to work on a Coiled cluster in a notebook with split_out=4000, but it might have been a lucky accident, because the workflow still errored.

Even when it worked in a notebook, the approach without map_partitions took 13 minutes to run, vs 4 minutes with map_partitions.
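
For reference, the split_out variant discussed here looks roughly like this (a sketch, assuming joined is the merged dataframe and group_cols the benchmark's grouping columns):

# spread the aggregation output across many partitions instead of funnelling
# everything onto a single worker
result = joined.groupby(group_cols, sort=False).agg(
    {"value": "sum"}, split_out=4000
)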

Contributor

split_out=4000 kind of makes sense; or rather, split_out=df.npartitions makes sense for an aggregation where we do not expect any significant data reduction.

What was the error?

Contributor Author

Yes, 4000 is the number of partitions. It was RuntimeError('shuffle_transfer failed during shuffle 3ae09c441cf66f1f37a5178dc6a4e5dc'). Cluster: https://cloud.coiled.io/clusters/230155/information?account=dask-benchmarks&computation=7b39851b-61cd-43e2-ac2d-cda7c01cbed9&sinceMs=1686864101945&untilMs=1686864482058

# )

def aggregate_partition(part):
    return part.groupby(group_cols, sort=False).agg({"value": "sum"})
Contributor Author

@j-bennet j-bennet Jun 16, 2023

This groupby is not using shuffle. With shuffle="p2p", I kept getting errors:

RuntimeError('shuffle_barrier failed during shuffle 64ef1a2a338f4af61f6b9cb4ce441355')

Contributor

this groupby is a pandas groupby. For actual P2P errors I would like us to investigate.

cc @hendrikmakait

Contributor Author

Yikes. Of course it is. I was getting errors in the commented groupby.

@j-bennet j-bennet marked this pull request as ready for review June 16, 2023 02:46
@hendrikmakait
Contributor

Thanks for the clarification on the imbalance, @j-bennet!

Contributor

@fjetter fjetter left a comment

Before we merge this I would like to see

  • this running with p2p
  • this running with an ordinary groupby, without the set_index
  • the P2P failure investigated

In the end I would also like to see a comparison between

tasks + map_partitions vs P2P + groupby

since this is basically "what dask did last year" vs "what dask would do right now"
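
A rough sketch of the two variants to compare (hypothetical code; aggregate_partition, group_cols, and divisions are the names from this PR's diff, the join key column is a placeholder, and the shuffle= keywords assume a dask version that exposes them):

# "what dask did last year": task-based shuffle via set_index, then a
# partition-wise pandas groupby
large_df = large_df.set_index("bucket", drop=False, divisions=divisions, shuffle="tasks")
joined = large_df.merge(small_df, on="key")
result = joined.map_partitions(aggregate_partition)

# "what dask would do right now": plain dask groupby backed by a P2P shuffle
result = joined.groupby(group_cols, sort=False).agg(
    {"value": "sum"}, split_out=joined.npartitions, shuffle="p2p"
)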


# large dataframe has known divisions, use those
# to ensure the data is partitioned as expected
divisions = list(range(1, 40002, 10))
large_df = large_df.set_index("bucket", drop=False, divisions=divisions)
Contributor

actually, when using an ordinary groupby instead of the map_partitions, this set_index is not required.

@fjetter
Contributor

fjetter commented Jun 16, 2023

data is further grouped by group_cols, and then aggregated. The size of groups is very uneven, with some groups containing hundreds of records, and some containing millions.

I actually poked around here for a while and this is not the entire truth.

What is imbalanced here is the initial partition size. When we calculate the distribution of partition sizes, we see a very strong imbalance, with partition sizes ranging from a few hundred KiB up to ~3GiB (as measured by dask.sizeof).

[image: distribution of input partition sizes]

However, once the groupby is done, we get an extremely homogeneous partition size of ~17MM rows each. I'm currently running another job to see if and how the group sizes actually vary.

What I can confirm is that P2P doesn't seem to be able to cope with this. I do see OOM failures because some buffers are overflowing. This is something we should look into.
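
For anyone who wants to reproduce that measurement, a quick sketch (one dask.sizeof value per input partition; plotting a histogram of the result shows the imbalance):

import pandas as pd
from dask.sizeof import sizeof

partition_sizes = large_df.map_partitions(
    lambda part: pd.Series([sizeof(part)]), meta=(None, "int64")
).compute()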

@j-bennet
Contributor Author

@fjetter

What is imbalanced here is the initial partition size. When we calculate the distribution of partition sizes we can see a very strong imbalance with partitions with partition sizes ranging from a few hundred KiB up to ~3GiB (as measured by dask.sizeof)

I think that's why we need that set_index. This data was simulated with a similar number of records in each bucket, so using those buckets as divisions ensures pretty even partitions.

@fjetter
Contributor

fjetter commented Jun 20, 2023

A couple of things are wrong here

  • The input dataset df1 is imbalanced as described above such that some partitions are super large while others are tiny. There is also an ordering to this, namely the first ~6.5k partitions are tiny while the last couple hundred are large. Therefore, if we work through the dataset in priority order, we will process all large partitions together. Particularly on the default machines you are using here, with 16 cores, this means that we'll be processing 16 large partitions at the same time.
  • The way this join is constructed effectively creates a worst-case situation. The dataset is constructed in a way that significantly expands the size of the joined partitions. The intermediate output for the joined dataset has partitions that range from 1MM up to a couple hundred million rows, which corresponds to ~50MB up to 15GB
  • This heavy imbalance is avoided by the set_index call because it not only redistributes the data (which appears to be spread rather homogeneously along the bucket axis) but also slices the partitions to a default size of about 128MB, which effectively negates the severe imbalance of the dataset
  • I tried reproducing the same or similar rebalancing by just calling repartition(partition_size=...), which helped somewhat but ultimately failed with memory issues because the repartition-split tasks were not computed fast enough. This is basically "root task overproduction": the splits are not processed fast enough, so the very large to-be-split frames stay in memory. This effect does not happen with set_index since we have a P2P shuffle in there, which does not exhibit this behavior (it's not that simple, see also RFC Set priorities for p2p shuffle tasks dask/distributed#7926)
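
The repartition attempt mentioned in the last bullet would look something like this (a sketch; the exact target size is a guess based on the ~128MB default mentioned above):

# split over-sized partitions down to a uniform target size; unlike set_index
# this keeps the existing ordering and relies on repartition-split tasks
large_df = large_df.repartition(partition_size="128MB")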

Apart from "things are not working / scheduled the way they are supposed to", there is also the question of why set_index + groupby is not successful.

  • The merge operation after set_index is simply a broadcast join (there are two broadcast joins implemented(?); this chooses the single_partition_join route), but no indexing information is preserved, i.e. the groupby afterwards needs to perform a shuffle operation as well, even though the data is already properly sorted by bucket. This is very closely related to Track whether DataFrame partitions are sorted, even if divisions are unknown? dask/dask#9425
  • This knowledge is then (ab)used to make the map_partitions work. Ideally, we'd be able to detect this ourselves.

TLDR

Side note: Many / most of the problems I am discussing above can be played with on much smaller data. I ended up reducing the cluster to five workers while also reducing the data sizes by a factor of ten, e.g.

import dask.dataframe as dd

# this effectively reduces the size to 10% while keeping most of the imbalance of the dataset still intact
large_df = dd.read_parquet("s3://test-imbalanced-join/df1/").partitions[::10]

from coiled import Cluster
from distributed import Client

cluster_kwargs = {
    "n_workers": 5,
    # Memory-optimized VMs are a bit better suited for this workload. We are bottlenecked by network, not CPU load.
    "worker_vm_types": "r6i.2xlarge",
    "compute_purchase_option": "spot_with_fallback",
}
cluster = Cluster(
    **cluster_kwargs,
)
client = Client(cluster)

I suggest not merging this PR, since I do not consider it valuable to have the map_partitions workaround running regularly. Once either P2P is more robust (cc @hendrikmakait) OR dask/dask#9425 is closed, we should revisit this, because dask should then be able to just run a groupby.

@j-bennet j-bennet closed this Jun 20, 2023
@j-bennet j-bennet deleted the j-bennet/882-imbalanced-join-workflow branch June 20, 2023 15:51

Labels

workflows Related to representative Dask user workflows

Development

Successfully merging this pull request may close these issues.

Imbalance join workflow

6 participants