diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml
index df25870680..eb2b85908b 100644
--- a/cluster_kwargs.yaml
+++ b/cluster_kwargs.yaml
@@ -69,6 +69,11 @@ snowflake:
   worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
 
+# For tests/workflows/test_imbalanced_join.py
+imbalanced_join:
+  n_workers: 50
+  worker_memory: "64 GiB"
+
 # Specific tests
 test_work_stealing_on_scaling_up:
   n_workers: 1
diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py
new file mode 100644
index 0000000000..0d6abd0e75
--- /dev/null
+++ b/tests/workflows/test_imbalanced_join.py
@@ -0,0 +1,37 @@
+"""
+This data represents a skewed but realistic dataset that dask has been struggling with in the past.
+Workflow based on https://github.com/coiled/imbalanced-join/blob/main/test_big_join_synthetic.ipynb
+"""
+import dask.dataframe as dd
+import pytest
+
+
+@pytest.mark.client("imbalanced_join")
+def test_merge(client):
+    """Merge large df and small df"""
+    large_df = dd.read_parquet("s3://test-imbalanced-join/df1/")
+    small_df = dd.read_parquet("s3://test-imbalanced-join/df2/")
+    # large dataframe has known divisions, use those
+    # to ensure the data is partitioned as expected
+    divisions = list(range(1, 40002, 10))
+    large_df = large_df.set_index("bucket", drop=False, divisions=divisions)
+
+    group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"]
+    res = large_df.merge(
+        right=small_df, how="inner", on=["key", "key"], suffixes=["_l", "_r"]  # NOTE(review): "key" is duplicated in on= — on="key" looks intended; confirm
+    )[group_cols + ["value"]]
+
+    # group and aggregate, use split_out so that the final data
+    # chunks don't end up aggregating on a single worker
+    # TODO: workers are still getting killed, even with split_out
+    # (
+    #     res.groupby(group_cols, sort=False)
+    #     .agg({"value": "sum"}, split_out=4000, shuffle=shuffle_method)
+    #     .value.sum()
+    #     .compute()
+    # )
+
+    def aggregate_partition(part):
+        return part.groupby(group_cols, sort=False).agg({"value": "sum"})
+
+    res.map_partitions(aggregate_partition).value.sum().compute()