5 changes: 5 additions & 0 deletions cluster_kwargs.yaml
@@ -69,6 +69,11 @@ snowflake:
  worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)


# For tests/workflows/test_imbalanced_join.py
imbalanced_join:
  n_workers: 50
  worker_memory: "64 GiB"

# Specific tests
test_work_stealing_on_scaling_up:
  n_workers: 1
37 changes: 37 additions & 0 deletions tests/workflows/test_imbalanced_join.py
@@ -0,0 +1,37 @@
"""
This data represents a skewed but realistic dataset that Dask has struggled with in the past.
Workflow based on https://github.com/coiled/imbalanced-join/blob/main/test_big_join_synthetic.ipynb
"""
import dask.dataframe as dd
import pytest


@pytest.mark.client("imbalanced_join")
def test_merge(client):
"""Merge large df and small df"""
large_df = dd.read_parquet("s3://test-imbalanced-join/df1/")
small_df = dd.read_parquet("s3://test-imbalanced-join/df2/")
Comment on lines +12 to +13

Member:
Just a thought (feel free to ignore), but these datasets seem opaque to me. I don't know how imbalanced they are, or really what this means. Probably the answer is to go to the repository and look at the notes.

An alternative would be to use dask.datasets.timeseries, take the random float columns, and put them through some transformation to control skew / cardinality / etc. This would put more control of the situation into this benchmark (see the sketch after this thread).

Again, just a thought, not a request.

Contributor:
I think this would indeed be a nice utility function. Maybe even core functionality of timeseries? I like the idea, but I suggest decoupling it from this PR.
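
For reference, a minimal sketch of that idea (not part of this PR; the `skew` exponent and the derived `key`/`value` columns are hypothetical):

```python
import dask

# timeseries() produces float columns x and y, roughly uniform in
# [-1, 1]. Raising |x| to a power concentrates most values near 0,
# so the derived integer join keys become heavily imbalanced.
df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-31")
skew = 4  # larger exponent -> a few buckets hold most of the rows
df["key"] = (df["x"].abs() ** skew * 1000).astype(int)
df["value"] = df["y"]
```

Tuning `skew` (and the bucket multiplier) would give the benchmark direct control over key cardinality and imbalance.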

    # The large dataframe has known divisions; use those
    # to ensure the data is partitioned as expected.
    divisions = list(range(1, 40002, 10))
    large_df = large_df.set_index("bucket", drop=False, divisions=divisions)
Contributor:
Actually, when using an ordinary groupby instead of map_partitions, this set_index is not required.


    group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"]
    res = large_df.merge(
        right=small_df, how="inner", on="key", suffixes=["_l", "_r"]
    )[group_cols + ["value"]]

    # Group and aggregate; use split_out so that the final data
    # chunks don't end up aggregating on a single worker.
    # TODO: workers are still getting killed, even with split_out
Contributor Author:
I wanted to get rid of map_partitions in this workflow. Unfortunately, without map_partitions, even with split_out, workers still use too much memory and get killed. I was able to get the same code to work on a Coiled cluster in a notebook with split_out=4000, but that may have been a lucky accident, because the workflow still errored.

Even when it worked in a notebook, the approach without map_partitions took 13 minutes to run, vs. 4 minutes with map_partitions.

Contributor:
split_out=4000 kind of makes sense; or rather, split_out=df.npartitions makes sense for an aggregation where we do not expect any significant data reduction (see the sketch at the end of this thread).

What was the error?

Contributor Author:
Yes, 4000 is the number of partitions. It was RuntimeError('shuffle_transfer failed during shuffle 3ae09c441cf66f1f37a5178dc6a4e5dc'). Cluster: https://cloud.coiled.io/clusters/230155/information?account=dask-benchmarks&computation=7b39851b-61cd-43e2-ac2d-cda7c01cbed9&sinceMs=1686864101945&untilMs=1686864482058
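
For reference, a minimal sketch of the suggested pattern, with `split_out` tied to the partition count rather than hard-coded (assuming `res` and `group_cols` from the diff above; not the code in this PR):

```python
# Little data reduction is expected, so keep roughly one output
# partition per input partition rather than funnelling the whole
# aggregation into a single chunk.
(
    res.groupby(group_cols, sort=False)
    .agg({"value": "sum"}, split_out=res.npartitions, shuffle="p2p")
    .value.sum()
    .compute()
)
```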

    # (
    #     res.groupby(group_cols, sort=False)
    #     .agg({"value": "sum"}, split_out=4000, shuffle=shuffle_method)
    #     .value.sum()
    #     .compute()
    # )

    def aggregate_partition(part):
        return part.groupby(group_cols, sort=False).agg({"value": "sum"})
Contributor Author (@j-bennet, Jun 16, 2023):
This groupby is not using shuffle. With shuffle="p2p", I kept getting errors:

RuntimeError('shuffle_barrier failed during shuffle 64ef1a2a338f4af61f6b9cb4ce441355')

Contributor:
This groupby is a pandas groupby (see the sketch after this thread). For actual P2P errors, I would like us to investigate.

cc @hendrikmakait

Contributor Author:
Yikes. Of course it is. I was getting the errors in the commented-out groupby.
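
To make the distinction in this thread concrete, a small sketch with toy data (hypothetical column names): the groupby inside `map_partitions` is a pandas groupby that runs independently per partition, while a dask-level groupby is the one that accepts `split_out`/`shuffle` and can trigger P2P.

```python
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"g": ["a", "a", "b", "b"], "value": [1, 2, 3, 4]})
ddf = dd.from_pandas(pdf, npartitions=2)

# Pandas groupby: runs inside each partition independently, so no
# inter-worker shuffle is involved (and no shuffle= keyword applies).
per_partition = ddf.map_partitions(
    lambda part: part.groupby("g").agg({"value": "sum"})
)

# Dask groupby: aggregates across partitions and may shuffle data
# between workers; this is where a p2p shuffle would come into play.
global_agg = ddf.groupby("g").agg({"value": "sum"}, split_out=ddf.npartitions)
```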


    res.map_partitions(aggregate_partition).value.sum().compute()