From e74504b70a57abfee58281d0aee1cd2f7d9f6c07 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 11:12:17 -0700 Subject: [PATCH 01/16] Add imbalanced join. --- cluster_kwargs.yaml | 6 ++++ tests/workflows/test_imbalanced_join.py | 39 +++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 tests/workflows/test_imbalanced_join.py diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index df25870680..627458af97 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -69,6 +69,12 @@ snowflake: worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) +# For tests/workflows/test_imbalanced_join.py +imbalanced_join: + n_workers: 50 + worker_memory: "64 GiB" + scheduler_memory: "32 GiB" + # Specific tests test_work_stealing_on_scaling_up: n_workers: 1 diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py new file mode 100644 index 0000000000..0e93b4d43d --- /dev/null +++ b/tests/workflows/test_imbalanced_join.py @@ -0,0 +1,39 @@ +""" +This data represents a skewed but realistic dataset that dask has been struggling with in the past. +Workflow based on https://github.com/coiled/imbalanced-join/blob/main/test_big_join_synthetic.ipynb +""" +import dask.dataframe as dd +import pytest +from dask.distributed import wait + +LARGE_DF = "s3://test-imbalanced-join/df1/" +SMALL_DF = "s3://test-imbalanced-join/df2/" + + +@pytest.fixture(scope="module") +@pytest.mark.client("imbalanced_join") +def large_df(client): + """Set index on the large dataframe""" + df = dd.read_parquet(LARGE_DF) + divisions = list(range(1, 40002, 10)) + df = df.set_index("bucket", drop=False, divisions=divisions) + df = df.persist() + wait(df) + yield df + + +@pytest.mark.client("imbalanced_join") +def test_merge(client, large_df): + """Merge large df and small df""" + group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] + small_df = dd.read_parquet(SMALL_DF) + + output = large_df.merge( + right=small_df, how="inner", on=["key", "key"], suffixes=["_l", "_r"] + )[group_cols + ["value"]] + + def aggregate_partition(part): + return part.groupby(group_cols, sort=False).agg({"value": "sum"}) + + output = output.map_partitions(aggregate_partition) + output["value"].sum().compute() From a776239f412d5038ad0aa3b9b32c52c6e14499a7 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 11:16:01 -0700 Subject: [PATCH 02/16] Temporarily comment other tests. --- .github/workflows/tests.yml | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9b76cdbff5..d917711b64 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,7 +33,7 @@ jobs: os: [ubuntu-latest] python-version: ["3.9"] pytest_args: [tests] - include: +# include: # Run stability tests on the lowest and highest versions of Python only # These are temporarily redundant with the current global python-version # - pytest_args: tests/stability @@ -42,19 +42,19 @@ jobs: # - pytest_args: tests/stability # python-version: "3.9" # os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - os: macos-latest +# - pytest_args: tests/stability +# python-version: "3.9" +# os: windows-latest +# - pytest_args: tests/stability +# python-version: "3.9" +# os: macos-latest steps: - name: Checkout @@ -108,7 +108,8 @@ jobs: DB_NAME: ${{ matrix.os }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} + # run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} + run: python -m pytest --run-workflows tests/workflows/test_imbalanced_join.py - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml From 7c7f9da260dc2f0e591e49ce29099b1e2fb5c1d1 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 12:19:06 -0700 Subject: [PATCH 03/16] Remove persist and map_partitions. --- tests/workflows/test_imbalanced_join.py | 29 +++++++------------------ 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 0e93b4d43d..0a3e929e05 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -4,36 +4,23 @@ """ import dask.dataframe as dd import pytest -from dask.distributed import wait LARGE_DF = "s3://test-imbalanced-join/df1/" SMALL_DF = "s3://test-imbalanced-join/df2/" -@pytest.fixture(scope="module") @pytest.mark.client("imbalanced_join") -def large_df(client): - """Set index on the large dataframe""" - df = dd.read_parquet(LARGE_DF) - divisions = list(range(1, 40002, 10)) - df = df.set_index("bucket", drop=False, divisions=divisions) - df = df.persist() - wait(df) - yield df - - -@pytest.mark.client("imbalanced_join") -def test_merge(client, large_df): +def test_merge(client): """Merge large df and small df""" - group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] + large_df = dd.read_parquet(LARGE_DF) small_df = dd.read_parquet(SMALL_DF) - output = large_df.merge( + group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] + + res = large_df.merge( right=small_df, how="inner", on=["key", "key"], suffixes=["_l", "_r"] )[group_cols + ["value"]] - def aggregate_partition(part): - return part.groupby(group_cols, sort=False).agg({"value": "sum"}) - - output = output.map_partitions(aggregate_partition) - output["value"].sum().compute() + res = res.groupby(group_cols, sort=False).agg({"value": "sum"}) + # evaluate the whole merged dataframe + res["value"].sum().compute() From 4e8b15b04a8b98723fcb754fa76b4594ccb2947a Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 12:35:30 -0700 Subject: [PATCH 04/16] scheduler_vm_types conflicts with scheduler_memory. --- cluster_kwargs.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 627458af97..a23dff655b 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -73,6 +73,7 @@ snowflake: imbalanced_join: n_workers: 50 worker_memory: "64 GiB" + scheduler_vm_types: null scheduler_memory: "32 GiB" # Specific tests From b4efbbc3e40981f943f2b24eaf048b52f1e81a28 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 13:57:18 -0700 Subject: [PATCH 05/16] Even simpler. --- tests/workflows/test_imbalanced_join.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 0a3e929e05..b3c7c22d53 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -5,15 +5,12 @@ import dask.dataframe as dd import pytest -LARGE_DF = "s3://test-imbalanced-join/df1/" -SMALL_DF = "s3://test-imbalanced-join/df2/" - @pytest.mark.client("imbalanced_join") def test_merge(client): """Merge large df and small df""" - large_df = dd.read_parquet(LARGE_DF) - small_df = dd.read_parquet(SMALL_DF) + large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") + small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] From 60f79748d5042818ac0005406a639fa592d1e723 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 14:13:44 -0700 Subject: [PATCH 06/16] Use a small subset of data. --- tests/workflows/test_imbalanced_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index b3c7c22d53..02bd11d83d 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -9,7 +9,7 @@ @pytest.mark.client("imbalanced_join") def test_merge(client): """Merge large df and small df""" - large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") + large_df = dd.read_parquet("s3://test-imbalanced-join/df1/key_000*.parquet") small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] From 8221054b37c674df445512036193bfed4f3df849 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 14:56:47 -0700 Subject: [PATCH 07/16] Uncomment tests. --- .github/workflows/tests.yml | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d917711b64..9b76cdbff5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,7 +33,7 @@ jobs: os: [ubuntu-latest] python-version: ["3.9"] pytest_args: [tests] -# include: + include: # Run stability tests on the lowest and highest versions of Python only # These are temporarily redundant with the current global python-version # - pytest_args: tests/stability @@ -42,19 +42,19 @@ jobs: # - pytest_args: tests/stability # python-version: "3.9" # os: ubuntu-latest -# - pytest_args: tests/stability -# python-version: "3.11" -# os: ubuntu-latest -# - pytest_args: tests/stability -# python-version: "3.11" -# os: ubuntu-latest + - pytest_args: tests/stability + python-version: "3.11" + os: ubuntu-latest + - pytest_args: tests/stability + python-version: "3.11" + os: ubuntu-latest # Run stability tests on Python Windows and MacOS (latest py39 only) -# - pytest_args: tests/stability -# python-version: "3.9" -# os: windows-latest -# - pytest_args: tests/stability -# python-version: "3.9" -# os: macos-latest + - pytest_args: tests/stability + python-version: "3.9" + os: windows-latest + - pytest_args: tests/stability + python-version: "3.9" + os: macos-latest steps: - name: Checkout @@ -108,8 +108,7 @@ jobs: DB_NAME: ${{ matrix.os }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - # run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - run: python -m pytest --run-workflows tests/workflows/test_imbalanced_join.py + run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml From 53625bbefd6c08662a37bf11f4501a37123fec1c Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Wed, 14 Jun 2023 17:50:40 -0700 Subject: [PATCH 08/16] With the full dataset. --- tests/workflows/test_imbalanced_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 02bd11d83d..b3c7c22d53 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -9,7 +9,7 @@ @pytest.mark.client("imbalanced_join") def test_merge(client): """Merge large df and small df""" - large_df = dd.read_parquet("s3://test-imbalanced-join/df1/key_000*.parquet") + large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] From 3f9bfb1615bfd23b306a982574baf58e5422f6ee Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 14:00:31 -0700 Subject: [PATCH 09/16] Add split_out. --- tests/workflows/test_imbalanced_join.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index b3c7c22d53..ba0277f09e 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -7,17 +7,27 @@ @pytest.mark.client("imbalanced_join") -def test_merge(client): +def test_merge(client, shuffle_method): """Merge large df and small df""" large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") - group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] + # this dataframe has known divisions, use those + # to ensure the data is partitioned as expected + divisions = list(range(1, 40002, 10)) + large_df = large_df.set_index("bucket", drop=False, divisions=divisions) + split_out = 4000 # same as number of partitions + group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] res = large_df.merge( right=small_df, how="inner", on=["key", "key"], suffixes=["_l", "_r"] )[group_cols + ["value"]] - res = res.groupby(group_cols, sort=False).agg({"value": "sum"}) - # evaluate the whole merged dataframe - res["value"].sum().compute() + # group and aggregate, use split_out so that the final data + # chunks don't end up aggregating on a single worker + ( + res.groupby(group_cols, sort=False) + .agg({"value": "sum"}, split_out=split_out, shuffle=shuffle_method) + .value.sum() + .compute() + ) From 356c66b77981bcbad628fae2eed21b8d3b619ae0 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 14:00:56 -0700 Subject: [PATCH 10/16] Comment other tests. --- .github/workflows/tests.yml | 47 +++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9b76cdbff5..c05857a8e0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,28 +33,28 @@ jobs: os: [ubuntu-latest] python-version: ["3.9"] pytest_args: [tests] - include: - # Run stability tests on the lowest and highest versions of Python only - # These are temporarily redundant with the current global python-version - # - pytest_args: tests/stability - # python-version: "3.9" - # os: ubuntu-latest - # - pytest_args: tests/stability - # python-version: "3.9" - # os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.11" - os: ubuntu-latest - # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - os: macos-latest +# include: +# # Run stability tests on the lowest and highest versions of Python only +# # These are temporarily redundant with the current global python-version +# # - pytest_args: tests/stability +# # python-version: "3.9" +# # os: ubuntu-latest +# # - pytest_args: tests/stability +# # python-version: "3.9" +# # os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest +# - pytest_args: tests/stability +# python-version: "3.11" +# os: ubuntu-latest +# # Run stability tests on Python Windows and MacOS (latest py39 only) +# - pytest_args: tests/stability +# python-version: "3.9" +# os: windows-latest +# - pytest_args: tests/stability +# python-version: "3.9" +# os: macos-latest steps: - name: Checkout @@ -108,7 +108,8 @@ jobs: DB_NAME: ${{ matrix.os }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} + # run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} + run: python -m pytest --run-workflows tests/workflows/test_imbalanced_join.py - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml From 1f134a50b2f1cdd60a876f713c1d6bd242cf2ce8 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 15:55:44 -0700 Subject: [PATCH 11/16] Move dataframes into fixture. --- tests/workflows/test_imbalanced_join.py | 45 ++++++++++++++++++------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index ba0277f09e..2730965522 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -2,21 +2,31 @@ This data represents a skewed but realistic dataset that dask has been struggling with in the past. Workflow based on https://github.com/coiled/imbalanced-join/blob/main/test_big_join_synthetic.ipynb """ +from functools import partial + import dask.dataframe as dd import pytest +from distributed import wait @pytest.mark.client("imbalanced_join") -def test_merge(client, shuffle_method): - """Merge large df and small df""" +@pytest.fixture +def dataframes(client): large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") - small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") - - # this dataframe has known divisions, use those + small_df = dd.read_parquet("s3://test-imbalanced-join/df2/").persist() + # large dataframe has known divisions, use those # to ensure the data is partitioned as expected divisions = list(range(1, 40002, 10)) - large_df = large_df.set_index("bucket", drop=False, divisions=divisions) - split_out = 4000 # same as number of partitions + large_df = large_df.set_index("bucket", drop=False, divisions=divisions).persist() + wait(large_df, client, 10 * 60) + wait(small_df, client, 10 * 60) + yield large_df, small_df + + +@pytest.mark.client("imbalanced_join") +def test_merge(dataframes, shuffle_method): + """Merge large df and small df""" + large_df, small_df = dataframes group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] res = large_df.merge( @@ -25,9 +35,18 @@ def test_merge(client, shuffle_method): # group and aggregate, use split_out so that the final data # chunks don't end up aggregating on a single worker - ( - res.groupby(group_cols, sort=False) - .agg({"value": "sum"}, split_out=split_out, shuffle=shuffle_method) - .value.sum() - .compute() - ) + # TODO: workers are still getting killed, even with split_out + # ( + # res.groupby(group_cols, sort=False) + # .agg({"value": "sum"}, split_out=4000, shuffle=shuffle_method) + # .value.sum() + # .compute() + # ) + + def aggregate_partition(part, *, shuffle=None): + return part.groupby(group_cols, sort=False).agg( + {"value": "sum"}, shuffle=shuffle, split_out=100 + ) + + shuffle_aggregate = partial(aggregate_partition, shuffle=shuffle_method) + res.map_partitions(shuffle_aggregate).value.sum().compute() From 76e24b7d4df0d687c82cd18768d6fff517cbb3bc Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 16:19:21 -0700 Subject: [PATCH 12/16] Wrong wait. --- tests/workflows/test_imbalanced_join.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 2730965522..0098a921e5 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -6,18 +6,19 @@ import dask.dataframe as dd import pytest -from distributed import wait + +from ..utils_test import wait @pytest.mark.client("imbalanced_join") @pytest.fixture def dataframes(client): large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") - small_df = dd.read_parquet("s3://test-imbalanced-join/df2/").persist() + small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") # large dataframe has known divisions, use those # to ensure the data is partitioned as expected divisions = list(range(1, 40002, 10)) - large_df = large_df.set_index("bucket", drop=False, divisions=divisions).persist() + large_df = large_df.set_index("bucket", drop=False, divisions=divisions) wait(large_df, client, 10 * 60) wait(small_df, client, 10 * 60) yield large_df, small_df From 5769487d95b803baf7fbd682ed4cc11cba6063da Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 16:46:35 -0700 Subject: [PATCH 13/16] Let's make it simple. --- tests/workflows/test_imbalanced_join.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 0098a921e5..0ee261077c 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -7,27 +7,16 @@ import dask.dataframe as dd import pytest -from ..utils_test import wait - @pytest.mark.client("imbalanced_join") -@pytest.fixture -def dataframes(client): +def test_merge(client, shuffle_method): + """Merge large df and small df""" large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") # large dataframe has known divisions, use those # to ensure the data is partitioned as expected divisions = list(range(1, 40002, 10)) large_df = large_df.set_index("bucket", drop=False, divisions=divisions) - wait(large_df, client, 10 * 60) - wait(small_df, client, 10 * 60) - yield large_df, small_df - - -@pytest.mark.client("imbalanced_join") -def test_merge(dataframes, shuffle_method): - """Merge large df and small df""" - large_df, small_df = dataframes group_cols = ["df2_group", "bucket", "group1", "group2", "group3", "group4"] res = large_df.merge( From 31fdb5f5025f676a3870f8dde078c91c5c9d5edb Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 17:06:51 -0700 Subject: [PATCH 14/16] Remove shuffle. --- tests/workflows/test_imbalanced_join.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/workflows/test_imbalanced_join.py b/tests/workflows/test_imbalanced_join.py index 0ee261077c..0d6abd0e75 100644 --- a/tests/workflows/test_imbalanced_join.py +++ b/tests/workflows/test_imbalanced_join.py @@ -2,14 +2,12 @@ This data represents a skewed but realistic dataset that dask has been struggling with in the past. Workflow based on https://github.com/coiled/imbalanced-join/blob/main/test_big_join_synthetic.ipynb """ -from functools import partial - import dask.dataframe as dd import pytest @pytest.mark.client("imbalanced_join") -def test_merge(client, shuffle_method): +def test_merge(client): """Merge large df and small df""" large_df = dd.read_parquet("s3://test-imbalanced-join/df1/") small_df = dd.read_parquet("s3://test-imbalanced-join/df2/") @@ -33,10 +31,7 @@ def test_merge(client, shuffle_method): # .compute() # ) - def aggregate_partition(part, *, shuffle=None): - return part.groupby(group_cols, sort=False).agg( - {"value": "sum"}, shuffle=shuffle, split_out=100 - ) + def aggregate_partition(part): + return part.groupby(group_cols, sort=False).agg({"value": "sum"}) - shuffle_aggregate = partial(aggregate_partition, shuffle=shuffle_method) - res.map_partitions(shuffle_aggregate).value.sum().compute() + res.map_partitions(aggregate_partition).value.sum().compute() From 310059286770b81a96eacd882ae2649c89293a65 Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Thu, 15 Jun 2023 17:20:13 -0700 Subject: [PATCH 15/16] Uncomment tests. --- .github/workflows/tests.yml | 47 ++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c05857a8e0..9b76cdbff5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,28 +33,28 @@ jobs: os: [ubuntu-latest] python-version: ["3.9"] pytest_args: [tests] -# include: -# # Run stability tests on the lowest and highest versions of Python only -# # These are temporarily redundant with the current global python-version -# # - pytest_args: tests/stability -# # python-version: "3.9" -# # os: ubuntu-latest -# # - pytest_args: tests/stability -# # python-version: "3.9" -# # os: ubuntu-latest -# - pytest_args: tests/stability -# python-version: "3.11" -# os: ubuntu-latest -# - pytest_args: tests/stability -# python-version: "3.11" -# os: ubuntu-latest -# # Run stability tests on Python Windows and MacOS (latest py39 only) -# - pytest_args: tests/stability -# python-version: "3.9" -# os: windows-latest -# - pytest_args: tests/stability -# python-version: "3.9" -# os: macos-latest + include: + # Run stability tests on the lowest and highest versions of Python only + # These are temporarily redundant with the current global python-version + # - pytest_args: tests/stability + # python-version: "3.9" + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.9" + # os: ubuntu-latest + - pytest_args: tests/stability + python-version: "3.11" + os: ubuntu-latest + - pytest_args: tests/stability + python-version: "3.11" + os: ubuntu-latest + # Run stability tests on Python Windows and MacOS (latest py39 only) + - pytest_args: tests/stability + python-version: "3.9" + os: windows-latest + - pytest_args: tests/stability + python-version: "3.9" + os: macos-latest steps: - name: Checkout @@ -108,8 +108,7 @@ jobs: DB_NAME: ${{ matrix.os }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - # run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - run: python -m pytest --run-workflows tests/workflows/test_imbalanced_join.py + run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml From b933bfe6370abe37093c5efc0fbf150933ccdb7b Mon Sep 17 00:00:00 2001 From: Irina Truong Date: Fri, 16 Jun 2023 17:58:07 +0200 Subject: [PATCH 16/16] Scheduler memory is unnecessary Co-authored-by: Florian Jetter --- cluster_kwargs.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index a23dff655b..eb2b85908b 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -73,8 +73,6 @@ snowflake: imbalanced_join: n_workers: 50 worker_memory: "64 GiB" - scheduler_vm_types: null - scheduler_memory: "32 GiB" # Specific tests test_work_stealing_on_scaling_up: