diff --git a/.github/labeler.yml b/.github/labeler.yml new file mode 100644 index 00000000000..621d0fde833 --- /dev/null +++ b/.github/labeler.yml @@ -0,0 +1,30 @@ +# https://github.com/actions/labeler#common-examples +# Adapted from https://github.com/rapidsai/cugraph/blob/main/.github/CODEOWNERS +# Labels culled from https://github.com/rapidsai/cugraph/labels + +python: + - 'python/**' + - 'notebooks/**' + - 'benchmarks/**' + +doc: + - 'docs/**' + - '**/*.md' + +datasets: + - 'datasets/**' + +cuGraph: + - 'cpp/**' + +CMake: + - '**/CMakeLists.txt' + - '**/cmake/**' + +Ops: + - '.github/**' + - 'ci/**' + - 'conda/**' + - '**/Dockerfile' + - '**/.dockerignore' + - 'docker/**' diff --git a/CHANGELOG.md b/CHANGELOG.md index 42286c54df4..b22d92c902f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ## Improvements ## Bug Fixes +- PR #1321 Fix benchmark script trap setup to come after the PATH variable update # cuGraph 0.17.0 (10 Dec 2020) ## New Features diff --git a/ci/benchmark/build.sh b/ci/benchmark/build.sh index 5f74dca4044..921e96dbbb9 100644 --- a/ci/benchmark/build.sh +++ b/ci/benchmark/build.sh @@ -20,18 +20,18 @@ function cleanup { rm -f testoutput.txt } -# Set cleanup trap for Jenkins -if [ ! -z "$JENKINS_HOME" ] ; then - gpuci_logger "Jenkins environment detected, setting cleanup trap" - trap cleanup EXIT -fi - # Set path, build parallel level, and CUDA version cd $WORKSPACE export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4} export CUDA_REL=${CUDA_VERSION%.*} +# Set cleanup trap for Jenkins +if [ ! -z "$JENKINS_HOME" ] ; then + gpuci_logger "Jenkins environment detected, setting cleanup trap" + trap cleanup EXIT +fi + # Set home export HOME=$WORKSPACE diff --git a/ci/checks/style.sh b/ci/checks/style.sh index 978ac03d85b..e590e4aafa7 100755 --- a/ci/checks/style.sh +++ b/ci/checks/style.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2018-2020, NVIDIA CORPORATION. 
+# Copyright (c) 2018-2021, NVIDIA CORPORATION. ######################## # cuGraph Style Tester # ######################## @@ -52,13 +52,14 @@ COPYRIGHT=`env PYTHONPATH=ci/utils python ci/checks/copyright.py --git-modified- CR_RETVAL=$? ERRORCODE=$((ERRORCODE | ${CR_RETVAL})) -# Output results if failure otherwise show pass if [ "$CR_RETVAL" != "0" ]; then echo -e "\n\n>>>> FAILED: copyright check; begin output\n\n" echo -e "$COPYRIGHT" echo -e "\n\n>>>> FAILED: copyright check; end output\n\n" else - echo -e "\n\n>>>> PASSED: copyright check\n\n" + echo -e "\n\n>>>> PASSED: copyright check; begin debug output\n\n" + echo -e "$COPYRIGHT" + echo -e "\n\n>>>> PASSED: copyright check; end debug output\n\n" fi exit ${ERRORCODE} diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 9dd6e14181e..019d03e21da 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -91,7 +91,7 @@ conda list --show-channel-urls if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then gpuci_logger "Build from source" - $WORKSPACE/build.sh -v clean libcugraph cugraph --allgpuarch + $WORKSPACE/build.sh -v clean libcugraph cugraph fi ################################################################################ diff --git a/ci/utils/git_helpers.py b/ci/utils/git_helpers.py index 83ad73fe283..a0c413b75f4 100644 --- a/ci/utils/git_helpers.py +++ b/ci/utils/git_helpers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# Copyright (c) 2019-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -59,14 +59,24 @@ def uncommittedFiles(): return ret -def changedFilesBetween(b1, b2): - """Returns a list of files changed between branches b1 and b2""" +def changedFilesBetween(baseName, branchName, commitHash): + """ + Returns a list of files changed between branches baseName and latest commit + of branchName. 
+ """ current = branch() - __git("checkout", "--quiet", b1) - __git("checkout", "--quiet", b2) - files = __gitdiff("--name-only", "--ignore-submodules", "%s...%s" % - (b1, b2)) - __git("checkout", "--quiet", current) + # checkout "base" branch + __git("checkout", "--force", baseName) + # checkout branch for comparing + __git("checkout", "--force", branchName) + # checkout latest commit from branch + __git("checkout", "-fq", commitHash) + + files = __gitdiff("--name-only", "--ignore-submodules", + f"{baseName}..{branchName}") + + # restore the original branch + __git("checkout", "--force", current) return files.splitlines() @@ -87,10 +97,10 @@ def changesInFileBetween(file, b1, b2, pathFilter=None): def modifiedFiles(pathFilter=None): """ - If inside a CI-env (ie. currentBranch=current-pr-branch and the env-var - PR_TARGET_BRANCH is defined), then lists out all files modified between - these 2 branches. Else, lists out all the uncommitted files in the current - branch. + If inside a CI-env (ie. TARGET_BRANCH and COMMIT_HASH are defined, and + current branch is "current-pr-branch"), then lists out all files modified + between these 2 branches. Else, lists out all the uncommitted files in the + current branch. Such utility function is helpful while putting checker scripts as part of cmake, as well as CI process. This way, during development, only the files @@ -98,15 +108,26 @@ def modifiedFiles(pathFilter=None): process ALL files modified by the dev, as submiited in the PR, will be checked. This happens, all the while using the same script. 
""" - if "PR_TARGET_BRANCH" in os.environ and branch() == "current-pr-branch": - allFiles = changedFilesBetween(os.environ["PR_TARGET_BRANCH"], - branch()) + targetBranch = os.environ.get("TARGET_BRANCH") + commitHash = os.environ.get("COMMIT_HASH") + currentBranch = branch() + print(f" [DEBUG] TARGET_BRANCH={targetBranch}, COMMIT_HASH={commitHash}, " + f"currentBranch={currentBranch}") + + if targetBranch and commitHash and (currentBranch == "current-pr-branch"): + print(" [DEBUG] Assuming a CI environment.") + allFiles = changedFilesBetween(targetBranch, currentBranch, commitHash) else: + print(" [DEBUG] Did not detect CI environment.") allFiles = uncommittedFiles() + files = [] for f in allFiles: if pathFilter is None or pathFilter(f): files.append(f) + + filesToCheckString = "\n\t".join(files) if files else "" + print(f" [DEBUG] Found files to check:\n\t{filesToCheckString}\n") return files diff --git a/cpp/src/community/louvain.cu b/cpp/src/community/louvain.cu index 1044211a0ce..81a68a31663 100644 --- a/cpp/src/community/louvain.cu +++ b/cpp/src/community/louvain.cu @@ -15,19 +15,8 @@ */ #include - -// "FIXME": remove the guards after support for Pascal will be dropped; -// -// Disable louvain(experimenta::graph_view_t,...) -// versions for GPU architectures < 700 -//(this is because cuco/static_map.cuh would not -// compile on those) -// -#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 #include -#else #include -#endif namespace cugraph { @@ -58,13 +47,22 @@ std::pair louvain( { CUGRAPH_EXPECTS(clustering != nullptr, "Invalid input argument: clustering is null"); -#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 - CUGRAPH_FAIL("Louvain not supported on Pascal and older architectures"); -#else - experimental::Louvain> - runner(handle, graph_view); - return runner(clustering, max_level, resolution); -#endif + // "FIXME": remove this check and the guards below + // + // Disable louvain(experimental::graph_view_t,...) 
+  // versions for GPU architectures < 700 + // (cuco/static_map.cuh depends on features not supported on or before Pascal) + // + cudaDeviceProp device_prop; + CUDA_CHECK(cudaGetDeviceProperties(&device_prop, 0)); + + if (device_prop.major < 7) { + CUGRAPH_FAIL("Louvain not supported on Pascal and older architectures"); + } else { + experimental::Louvain> + runner(handle, graph_view); + return runner(clustering, max_level, resolution); + } } } // namespace detail @@ -78,11 +76,7 @@ std::pair louvain(raft::handle_t const &h { CUGRAPH_EXPECTS(clustering != nullptr, "Invalid input argument: clustering is null"); -#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 - CUGRAPH_FAIL("Louvain not supported on Pascal and older architectures"); -#else return detail::louvain(handle, graph, clustering, max_level, resolution); -#endif } // Explicit template instantations diff --git a/cpp/src/experimental/louvain.cuh b/cpp/src/experimental/louvain.cuh index 1f6f8633bcd..08e52092362 100644 --- a/cpp/src/experimental/louvain.cuh +++ b/cpp/src/experimental/louvain.cuh @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -33,6 +32,23 @@ #include #include +// "FIXME": remove the guards below and references to CUCO_STATIC_MAP_DEFINED +// +// cuco/static_map.cuh depends on features not supported on or before Pascal. +// +// If we build for sm_60 or before, the inclusion of cuco/static_map.cuh will +// result in compilation errors. +// +// If we're Pascal or before we do nothing here and will suppress including +// some code below. If we are later than Pascal we define CUCO_STATIC_MAP_DEFINED +// which will result in the full implementation being pulled in. 
+// +#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 +#else +#define CUCO_STATIC_MAP_DEFINED +#include +#endif + //#define TIMING #ifdef TIMING @@ -44,6 +60,7 @@ namespace experimental { namespace detail { +#ifdef CUCO_STATIC_MAP_DEFINED template struct create_cuco_pair_t { cuco::pair_type __device__ operator()(data_t data) @@ -54,6 +71,7 @@ struct create_cuco_pair_t { return tmp; } }; +#endif // // These classes should allow cuco::static_map to generate hash tables of @@ -443,7 +461,9 @@ class Louvain { weight_t resolution) { size_t num_level{0}; + weight_t best_modularity = weight_t{-1}; +#ifdef CUCO_STATIC_MAP_DEFINED weight_t total_edge_weight; total_edge_weight = experimental::transform_reduce_e( handle_, @@ -453,8 +473,6 @@ class Louvain { [] __device__(auto, auto, weight_t wt, auto, auto) { return wt; }, weight_t{0}); - weight_t best_modularity = weight_t{-1}; - // // Initialize every cluster to reference each vertex to itself // @@ -480,6 +498,7 @@ class Louvain { } timer_display(std::cout); +#endif return std::make_pair(num_level, best_modularity); } @@ -593,6 +612,7 @@ class Louvain { } } +#ifdef CUCO_STATIC_MAP_DEFINED virtual weight_t update_clustering(weight_t total_edge_weight, weight_t resolution) { timer_start("update_clustering"); @@ -1616,6 +1636,7 @@ class Louvain { cugraph::detail::offsets_to_indices( current_graph_view_.offsets(), local_num_rows_, src_indices_v_.data().get()); } +#endif std:: tuple, rmm::device_vector, rmm::device_vector> diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 593c36359e2..9b57ad4557c 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -302,12 +302,11 @@ ConfigureTest(EXPERIMENTAL_PAGERANK_TEST "${EXPERIMENTAL_PAGERANK_TEST_SRCS}" "" ################################################################################################### # - Experimental LOUVAIN tests ------------------------------------------------------------------- -# FIXME: Re-enable once failures are fixed 
-#set(EXPERIMENTAL_LOUVAIN_TEST_SRCS -# "${CMAKE_SOURCE_DIR}/../thirdparty/mmio/mmio.c" -# "${CMAKE_CURRENT_SOURCE_DIR}/experimental/louvain_test.cu") -# -#ConfigureTest(EXPERIMENTAL_LOUVAIN_TEST "${EXPERIMENTAL_LOUVAIN_TEST_SRCS}" "") +set(EXPERIMENTAL_LOUVAIN_TEST_SRCS + "${CMAKE_SOURCE_DIR}/../thirdparty/mmio/mmio.c" + "${CMAKE_CURRENT_SOURCE_DIR}/experimental/louvain_test.cu") + +ConfigureTest(EXPERIMENTAL_LOUVAIN_TEST "${EXPERIMENTAL_LOUVAIN_TEST_SRCS}" "") ################################################################################################### # - Experimental KATZ_CENTRALITY tests ------------------------------------------------------------ diff --git a/cpp/tests/experimental/bfs_test.cpp b/cpp/tests/experimental/bfs_test.cpp index 2498ca4f3f5..82286b1e2fa 100644 --- a/cpp/tests/experimental/bfs_test.cpp +++ b/cpp/tests/experimental/bfs_test.cpp @@ -33,8 +33,8 @@ #include template -void bfs_reference(edge_t* offsets, - vertex_t* indices, +void bfs_reference(edge_t const* offsets, + vertex_t const* indices, vertex_t* distances, vertex_t* predecessors, vertex_t num_vertices, diff --git a/cpp/tests/experimental/katz_centrality_test.cpp b/cpp/tests/experimental/katz_centrality_test.cpp index c2ac4340319..cdbe3688248 100644 --- a/cpp/tests/experimental/katz_centrality_test.cpp +++ b/cpp/tests/experimental/katz_centrality_test.cpp @@ -35,10 +35,10 @@ #include template -void katz_centrality_reference(edge_t* offsets, - vertex_t* indices, - weight_t* weights, - result_t* betas, +void katz_centrality_reference(edge_t const* offsets, + vertex_t const* indices, + weight_t const* weights, + result_t const* betas, result_t* katz_centralities, vertex_t num_vertices, result_t alpha, @@ -195,7 +195,8 @@ class Tests_KatzCentrality : public ::testing::TestWithParam(graph_view.get_number_of_vertices())) * threshold_ratio; + (1.0 / static_cast(graph_view.get_number_of_vertices())) * + threshold_ratio; // skip comparison for low Katz Centrality vertices (lowly ranked 
vertices) auto nearly_equal = [threshold_ratio, threshold_magnitude](auto lhs, auto rhs) { auto diff = std::abs(lhs - rhs); return (diff < std::max(lhs, rhs) * threshold_ratio) || (diff < threshold_magnitude); diff --git a/cpp/tests/experimental/louvain_test.cu b/cpp/tests/experimental/louvain_test.cu index ce8fb55b1d8..4a47b1a1aca 100644 --- a/cpp/tests/experimental/louvain_test.cu +++ b/cpp/tests/experimental/louvain_test.cu @@ -17,11 +17,12 @@ #include #include -#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 +#include +#include +#include + #include -#else #include -#endif #include @@ -64,9 +65,6 @@ class Tests_Louvain : public ::testing::TestWithParam { template void run_current_test(Louvain_Usecase const& configuration) { -#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ < 700 - CUGRAPH_FAIL("Louvain not supported on Pascal and older architectures"); -#else raft::handle_t handle{}; std::cout << "read graph file: " << configuration.graph_file_full_path << std::endl; @@ -77,8 +75,19 @@ class Tests_Louvain : public ::testing::TestWithParam { auto graph_view = graph.view(); - louvain(graph_view); -#endif + // "FIXME": remove this check once we drop support for Pascal + // + // Calling louvain on Pascal will throw an exception, we'll check that + // this is the behavior while we still support Pascal (device_prop.major < 7) + // + cudaDeviceProp device_prop; + CUDA_CHECK(cudaGetDeviceProperties(&device_prop, 0)); + + if (device_prop.major < 7) { + EXPECT_THROW(louvain(graph_view), cugraph::logic_error); + } else { + louvain(graph_view); + } } template diff --git a/cpp/tests/experimental/pagerank_test.cpp b/cpp/tests/experimental/pagerank_test.cpp index 4763249aa9e..70c83ef8192 100644 --- a/cpp/tests/experimental/pagerank_test.cpp +++ b/cpp/tests/experimental/pagerank_test.cpp @@ -36,11 +36,11 @@ #include template -void pagerank_reference(edge_t* offsets, - vertex_t* indices, - weight_t* weights, - vertex_t* personalization_vertices, - result_t* personalization_values, 
+void pagerank_reference(edge_t const* offsets, + vertex_t const* indices, + weight_t const* weights, + vertex_t const* personalization_vertices, + result_t const* personalization_values, result_t* pageranks, vertex_t num_vertices, vertex_t personalization_vector_size, @@ -52,7 +52,11 @@ void pagerank_reference(edge_t* offsets, if (num_vertices == 0) { return; } if (has_initial_guess) { - auto sum = std::accumulate(pageranks, pageranks + num_vertices, result_t{0.0}); + // use a double type counter (instead of result_t) to accumulate as std::accumulate is + // inaccurate in adding a large number of comparably sized numbers. In C++17 or later, + // std::reduce may be a better option. + auto sum = + static_cast(std::accumulate(pageranks, pageranks + num_vertices, double{0.0})); ASSERT_TRUE(sum > 0.0); std::for_each(pageranks, pageranks + num_vertices, [sum](auto& val) { val /= sum; }); } else { @@ -61,13 +65,14 @@ void pagerank_reference(edge_t* offsets, }); } + result_t personalization_sum{0.0}; if (personalization_vertices != nullptr) { - auto sum = std::accumulate( - personalization_values, personalization_values + personalization_vector_size, result_t{0.0}); - ASSERT_TRUE(sum > 0.0); - std::for_each(personalization_values, - personalization_values + personalization_vector_size, - [sum](auto& val) { val /= sum; }); + // use a double type counter (instead of result_t) to accumulate as std::accumulate is + // inaccurate in adding a large number of comparably sized numbers. In C++17 or later, + // std::reduce may be a better option. 
+ personalization_sum = static_cast(std::accumulate( + personalization_values, personalization_values + personalization_vector_size, double{0.0})); + ASSERT_TRUE(personalization_sum > 0.0); } std::vector out_weight_sums(num_vertices, result_t{0.0}); @@ -102,7 +107,8 @@ void pagerank_reference(edge_t* offsets, if (personalization_vertices != nullptr) { for (vertex_t i = 0; i < personalization_vector_size; ++i) { auto v = personalization_vertices[i]; - pageranks[v] += (dangling_sum * alpha + (1.0 - alpha)) * personalization_values[i]; + pageranks[v] += (dangling_sum * alpha + (1.0 - alpha)) * + (personalization_values[i] / personalization_sum); } } result_t diff_sum{0.0}; @@ -177,8 +183,7 @@ class Tests_PageRank : public ::testing::TestWithParam { std::vector h_personalization_vertices{}; std::vector h_personalization_values{}; if (configuration.personalization_ratio > 0.0) { - std::random_device r{}; - std::default_random_engine generator{r()}; + std::default_random_engine generator{}; std::uniform_real_distribution distribution{0.0, 1.0}; h_personalization_vertices.resize(graph_view.get_number_of_local_vertices()); std::iota(h_personalization_vertices.begin(), @@ -195,8 +200,11 @@ class Tests_PageRank : public ::testing::TestWithParam { std::for_each(h_personalization_values.begin(), h_personalization_values.end(), [&distribution, &generator](auto& val) { val = distribution(generator); }); - auto sum = std::accumulate( - h_personalization_values.begin(), h_personalization_values.end(), result_t{0.0}); + // use a double type counter (instead of result_t) to accumulate as std::accumulate is + // inaccurate in adding a large number of comparably sized numbers. In C++17 or later, + // std::reduce may be a better option. 
+ auto sum = static_cast(std::accumulate( + h_personalization_values.begin(), h_personalization_values.end(), double{0.0})); std::for_each(h_personalization_values.begin(), h_personalization_values.end(), [sum](auto& val) { val /= sum; }); @@ -263,7 +271,8 @@ class Tests_PageRank : public ::testing::TestWithParam { auto threshold_ratio = 1e-3; auto threshold_magnitude = - (epsilon / static_cast(graph_view.get_number_of_vertices())) * threshold_ratio; + (1.0 / static_cast(graph_view.get_number_of_vertices())) * + threshold_ratio; // skip comparison for low PageRank vertices (lowly ranked vertices) auto nearly_equal = [threshold_ratio, threshold_magnitude](auto lhs, auto rhs) { auto diff = std::abs(lhs - rhs); return (diff < std::max(lhs, rhs) * threshold_ratio) || (diff < threshold_magnitude); @@ -299,8 +308,7 @@ INSTANTIATE_TEST_CASE_P( PageRank_Usecase("test/datasets/ljournal-2008.mtx", 0.0, true), PageRank_Usecase("test/datasets/ljournal-2008.mtx", 0.5, true), PageRank_Usecase("test/datasets/webbase-1M.mtx", 0.0, false), - // FIXME: Re-enable test after failures are addressed - // PageRank_Usecase("test/datasets/webbase-1M.mtx", 0.5, false), + PageRank_Usecase("test/datasets/webbase-1M.mtx", 0.5, false), PageRank_Usecase("test/datasets/webbase-1M.mtx", 0.0, true), PageRank_Usecase("test/datasets/webbase-1M.mtx", 0.5, true))); diff --git a/cpp/tests/experimental/sssp_test.cpp b/cpp/tests/experimental/sssp_test.cpp index 49eaca56f56..2f7cc499d35 100644 --- a/cpp/tests/experimental/sssp_test.cpp +++ b/cpp/tests/experimental/sssp_test.cpp @@ -36,9 +36,9 @@ // Dijkstra's algorithm template -void sssp_reference(edge_t* offsets, - vertex_t* indices, - weight_t* weights, +void sssp_reference(edge_t const* offsets, + vertex_t const* indices, + weight_t const* weights, weight_t* distances, vertex_t* predecessors, vertex_t num_vertices, diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml new file mode 100644 index 00000000000..23956a02fbd --- /dev/null 
+++ b/.github/workflows/labeler.yml @@ -0,0 +1,11 @@ +name: "Pull Request Labeler" +on: +- pull_request_target + +jobs: + triage: + runs-on: ubuntu-latest + steps: + - uses: actions/labeler@main + with: + repo-token: "${{ secrets.GITHUB_TOKEN }}" diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index 0140c9f06f9..bbc914da502 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -21,8 +21,7 @@ import cugraph.comms.comms as Comms from cugraph.raft.dask.common.utils import get_client -from cugraph.dask.common.part_utils import (_extract_partitions, - load_balance_func) +from cugraph.dask.common.part_utils import _extract_partitions from dask.distributed import default_client from toolz import first from functools import reduce @@ -174,6 +173,14 @@ def calculate_local_data(self, comms, by): self.max_vertex_id = max_vid +def _get_local_data(df, by): + df = df[0] + num_local_edges = len(df) + local_by_max = df[by].iloc[-1] + local_max = df[['src', 'dst']].max().max() + return num_local_edges, local_by_max, local_max + + """ Internal methods, API subject to change """ @@ -198,28 +205,6 @@ def get_obj(x): return x[0] if multiple else x return total, reduce(lambda a, b: a + b, total) -def _get_local_data(df, by): - df = df[0] - num_local_edges = len(df) - local_by_max = df[by].iloc[-1] - local_max = df[['src', 'dst']].max().max() - return num_local_edges, local_by_max, local_max - - -def get_local_data(input_graph, by, load_balance=True): - input_graph.compute_renumber_edge_list(transposed=(by == 'dst')) - _ddf = input_graph.edgelist.edgelist_df - ddf = _ddf.sort_values(by=by, ignore_index=True) - - if load_balance: - ddf = load_balance_func(ddf, by=by) - - comms = Comms.get_comms() - data = DistributedDataHandler.create(data=ddf) - data.calculate_local_data(comms, by) - return data - - def get_mg_batch_data(dask_cudf_data): data = 
DistributedDataHandler.create(data=dask_cudf_data) return data diff --git a/python/cugraph/dask/common/mg_utils.py b/python/cugraph/dask/common/mg_utils.py index 7556afb122a..1651a9e800c 100644 --- a/python/cugraph/dask/common/mg_utils.py +++ b/python/cugraph/dask/common/mg_utils.py @@ -10,9 +10,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from cugraph.raft.dask.common.utils import default_client + +import os + import numba.cuda +from dask_cuda import LocalCUDACluster +from dask.distributed import Client + +from cugraph.raft.dask.common.utils import default_client +# FIXME: cugraph/__init__.py also imports the comms module, but +# depending on the import environment, cugraph/comms/__init__.py +# may be imported instead. The following imports the comms.py +# module directly +from cugraph.comms import comms as Comms + # FIXME: We currently look for the default client from dask, as such is the # if there is a dask client running without any GPU we will still try @@ -41,3 +53,36 @@ def is_single_gpu(): return False else: return True + + +def get_visible_devices(): + _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES") + if _visible_devices is None: + # FIXME: We assume that if the variable is unset there is only one GPU + visible_devices = ["0"] + else: + visible_devices = _visible_devices.strip().split(",") + return visible_devices + + +def setup_local_dask_cluster(p2p=True): + """ + Performs steps to setup a Dask cluster using LocalCUDACluster and returns + the LocalCUDACluster and corresponding client instance. + """ + cluster = LocalCUDACluster() + client = Client(cluster) + client.wait_for_workers(len(get_visible_devices())) + Comms.initialize(p2p=p2p) + + return (cluster, client) + + +def teardown_local_dask_cluster(cluster, client): + """ + Performs steps to destroy a Dask cluster and a corresponding client + instance. 
+ """ + Comms.destroy() + client.close() + cluster.close() diff --git a/python/cugraph/structure/graph.py b/python/cugraph/structure/graph.py index 53c3a4e656c..acc0ad8f066 100644 --- a/python/cugraph/structure/graph.py +++ b/python/cugraph/structure/graph.py @@ -14,7 +14,6 @@ from cugraph.structure import graph_primtypes_wrapper from cugraph.structure.symmetrize import symmetrize from cugraph.structure.number_map import NumberMap -from cugraph.dask.common.input_utils import get_local_data import cugraph.dask.common.mg_utils as mg_utils import cudf import dask_cudf @@ -653,32 +652,6 @@ def from_dask_cudf_edgelist( self.destination_columns = destination self.store_tranposed = None - def compute_local_data(self, by, load_balance=True): - """ - Compute the local edges, vertices and offsets for a distributed - graph stored as a dask-cudf dataframe and initialize the - communicator. Performs global sorting and load_balancing. - - Parameters - ---------- - by : str - by argument is the column by which we want to sort and - partition. It should be the source column name for generating - CSR format and destination column name for generating CSC - format. - load_balance : bool - Set as True to perform load_balancing after global sorting of - dask-cudf DataFrame. This ensures that the data is uniformly - distributed among multiple GPUs to avoid over-loading. - """ - if self.distributed: - data = get_local_data(self, by, load_balance) - self.local_data = {} - self.local_data["data"] = data - self.local_data["by"] = by - else: - raise Exception("Graph should be a distributed graph") - def view_edge_list(self): """ Display the edge list. Compute it if needed. 
@@ -1373,6 +1346,8 @@ def nodes(self): return self.renumber_map.implementation.df["0"] else: return cudf.concat([df["src"], df["dst"]]).unique() + if self.adjlist is not None: + return cudf.Series(np.arange(0, self.number_of_nodes())) if "all_nodes" in self._nodes.keys(): return self._nodes["all_nodes"] else: diff --git a/python/cugraph/structure/number_map.py b/python/cugraph/structure/number_map.py index b9ed8eb2e58..f1b8949eb5d 100644 --- a/python/cugraph/structure/number_map.py +++ b/python/cugraph/structure/number_map.py @@ -88,7 +88,7 @@ def __init__(self, df, src_col_names, dst_col_names, id_type, self.df[newname] = tmp[newname].append(tmp_dst[oldname]) self.df['count'] = tmp['count'].append(tmp_dst['count']) else: - for newname, oldname in zip(self.col_names, dst_col_names): + for newname in self.col_names: self.df[newname] = tmp[newname] self.df['count'] = tmp['count'] @@ -340,7 +340,7 @@ def compute(self): numbering_map, cudf.Series(base_addresses), val_types ) - self.ddf = numbering_map.persist() + self.ddf = numbering_map self.numbered = True def to_internal_vertex_id(self, ddf, col_names): diff --git a/python/cugraph/tests/dask/mg_context.py b/python/cugraph/tests/dask/mg_context.py index a72cf1c4b04..45dc75767fa 100644 --- a/python/cugraph/tests/dask/mg_context.py +++ b/python/cugraph/tests/dask/mg_context.py @@ -12,12 +12,15 @@ # limitations under the License. 
import time -import os + +import pytest from dask.distributed import Client + +from cugraph.dask.common.mg_utils import get_visible_devices from dask_cuda import LocalCUDACluster as CUDACluster import cugraph.comms as Comms -import pytest + # Maximal number of verifications of the number of workers DEFAULT_MAX_ATTEMPT = 100 @@ -26,22 +29,13 @@ DEFAULT_WAIT_TIME = 0.5 -def get_visible_devices(): - _visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES") - if _visible_devices is None: - # FIXME: We assume that if the variable is unset there is only one GPU - visible_devices = ["0"] - else: - visible_devices = _visible_devices.strip().split(",") - return visible_devices - - def skip_if_not_enough_devices(required_devices): - visible_devices = get_visible_devices() - number_of_visible_devices = len(visible_devices) - if required_devices > number_of_visible_devices: - pytest.skip("Not enough devices available to " - "test MG({})".format(required_devices)) + if required_devices is not None: + visible_devices = get_visible_devices() + number_of_visible_devices = len(visible_devices) + if required_devices > number_of_visible_devices: + pytest.skip("Not enough devices available to " + "test MG({})".format(required_devices)) class MGContext: diff --git a/python/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py b/python/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py index 4d04bf6df85..4b0f6629bc3 100644 --- a/python/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py +++ b/python/cugraph/tests/dask/test_mg_batch_betweenness_centrality.py @@ -37,7 +37,11 @@ # Parameters # ============================================================================= DATASETS = ["../datasets/karate.csv"] -MG_DEVICE_COUNT_OPTIONS = [1, 2, 3, 4] +MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count), + pytest.param(2, marks=pytest.mark.preset_gpu_count), + pytest.param(3, marks=pytest.mark.preset_gpu_count), + pytest.param(4, 
marks=pytest.mark.preset_gpu_count), + None] RESULT_DTYPE_OPTIONS = [np.float64] diff --git a/python/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py b/python/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py index 1e4a1950c53..54b58c340aa 100644 --- a/python/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py +++ b/python/cugraph/tests/dask/test_mg_batch_edge_betweenness_centrality.py @@ -37,7 +37,11 @@ # Parameters # ============================================================================= DATASETS = ["../datasets/karate.csv"] -MG_DEVICE_COUNT_OPTIONS = [1, 2, 4] +MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count), + pytest.param(2, marks=pytest.mark.preset_gpu_count), + pytest.param(3, marks=pytest.mark.preset_gpu_count), + pytest.param(4, marks=pytest.mark.preset_gpu_count), + None] RESULT_DTYPE_OPTIONS = [np.float64] diff --git a/python/cugraph/tests/dask/test_mg_bfs.py b/python/cugraph/tests/dask/test_mg_bfs.py index 553bbc698ff..63580461b17 100644 --- a/python/cugraph/tests/dask/test_mg_bfs.py +++ b/python/cugraph/tests/dask/test_mg_bfs.py @@ -12,28 +12,21 @@ # limitations under the License. 
import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import gc import pytest import cugraph import dask_cudf import cudf -from dask_cuda import LocalCUDACluster -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_comms.py b/python/cugraph/tests/dask/test_mg_comms.py index 29789461018..61a4944b5f1 100644 --- a/python/cugraph/tests/dask/test_mg_comms.py +++ b/python/cugraph/tests/dask/test_mg_comms.py @@ -12,28 +12,21 @@ # limitations under the License. 
import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import gc import pytest import cugraph import dask_cudf import cudf -from dask_cuda import LocalCUDACluster -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_degree.py b/python/cugraph/tests/dask/test_mg_degree.py index a6600104bc8..9f4c0d94319 100644 --- a/python/cugraph/tests/dask/test_mg_degree.py +++ b/python/cugraph/tests/dask/test_mg_degree.py @@ -11,30 +11,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from dask.distributed import Client import gc import pytest import cudf -import cugraph.comms as Comms import cugraph import dask_cudf -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -# Move to conftest -from dask_cuda import LocalCUDACluster - -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_katz_centrality.py b/python/cugraph/tests/dask/test_mg_katz_centrality.py index 43d63f2fd5d..631457f7558 100644 --- a/python/cugraph/tests/dask/test_mg_katz_centrality.py +++ b/python/cugraph/tests/dask/test_mg_katz_centrality.py @@ -14,30 +14,20 @@ # import numpy as np import pytest import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import gc import cugraph import dask_cudf import cudf -from dask_cuda import LocalCUDACluster -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -# The function selects personalization_perc% of accessible vertices in graph M -# and randomly assigns them personalization values - -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_louvain.py 
b/python/cugraph/tests/dask/test_mg_louvain.py index 56401e338a4..a07eede8cb9 100644 --- a/python/cugraph/tests/dask/test_mg_louvain.py +++ b/python/cugraph/tests/dask/test_mg_louvain.py @@ -14,13 +14,13 @@ import pytest import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import cugraph import dask_cudf -from dask_cuda import LocalCUDACluster from cugraph.tests import utils -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.utilities.utils import is_device_version_less_than +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) try: from rapids_pytest_benchmark import setFixtureParamNames @@ -44,17 +44,9 @@ def setFixtureParamNames(*args, **kwargs): # Fixtures @pytest.fixture(scope="module") def client_connection(): - # setup - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - # teardown - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( @@ -93,11 +85,15 @@ def test_mg_louvain_with_edgevals(daskGraphFromDataset): # FIXME: daskGraphFromDataset returns a DiGraph, which Louvain is currently # accepting. In the future, an MNMG symmeterize will need to be called to # create a Graph for Louvain. - parts, mod = dcg.louvain(daskGraphFromDataset) - - # FIXME: either call Nx with the same dataset and compare results, or - # hadcode golden results to compare to. - print() - print(parts.compute()) - print(mod) - print() + if is_device_version_less_than((7, 0)): + with pytest.raises(RuntimeError): + parts, mod = dcg.louvain(daskGraphFromDataset) + else: + parts, mod = dcg.louvain(daskGraphFromDataset) + + # FIXME: either call Nx with the same dataset and compare results, or + # hardcode golden results to compare to. 
+ print() + print(parts.compute()) + print(mod) + print() diff --git a/python/cugraph/tests/dask/test_mg_pagerank.py b/python/cugraph/tests/dask/test_mg_pagerank.py index f6416903b89..4f0b45242dd 100644 --- a/python/cugraph/tests/dask/test_mg_pagerank.py +++ b/python/cugraph/tests/dask/test_mg_pagerank.py @@ -13,19 +13,18 @@ import numpy as np import pytest import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import gc import cugraph import dask_cudf import cudf -from dask_cuda import LocalCUDACluster -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) + # The function selects personalization_perc% of accessible vertices in graph M # and randomly assigns them personalization values - def personalize(vertices, personalization_perc): personalization = None if personalization_perc != 0: @@ -52,17 +51,11 @@ def personalize(vertices, personalization_perc): PERSONALIZATION_PERC = [0, 10, 50] -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_renumber.py b/python/cugraph/tests/dask/test_mg_renumber.py index 8456241ff26..7f5cf6f08bc 100644 --- a/python/cugraph/tests/dask/test_mg_renumber.py +++ b/python/cugraph/tests/dask/test_mg_renumber.py @@ -20,29 +20,22 @@ import numpy as np import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import cugraph import dask_cudf import dask import cudf -from dask_cuda import LocalCUDACluster from cugraph.tests import utils from cugraph.structure.number_map import NumberMap -from 
cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) # Test all combinations of default/managed and pooled/non-pooled allocation @@ -195,9 +188,6 @@ def test_dask_pagerank(client_connection): dg = cugraph.DiGraph() dg.from_dask_cudf_edgelist(ddf, "src", "dst") - # Pre compute local data - # dg.compute_local_data(by='dst') - expected_pr = cugraph.pagerank(g) result_pr = dcg.pagerank(dg).compute() diff --git a/python/cugraph/tests/dask/test_mg_replication.py b/python/cugraph/tests/dask/test_mg_replication.py index 2b8510cd9ff..bb43d6c0f7a 100644 --- a/python/cugraph/tests/dask/test_mg_replication.py +++ b/python/cugraph/tests/dask/test_mg_replication.py @@ -24,8 +24,11 @@ DATASETS_OPTIONS = utils.DATASETS_SMALL DIRECTED_GRAPH_OPTIONS = [False, True] -# MG_DEVICE_COUNT_OPTIONS = [1, 2, 3, 4] -MG_DEVICE_COUNT_OPTIONS = [1] +MG_DEVICE_COUNT_OPTIONS = [pytest.param(1, marks=pytest.mark.preset_gpu_count), + pytest.param(2, marks=pytest.mark.preset_gpu_count), + pytest.param(3, marks=pytest.mark.preset_gpu_count), + pytest.param(4, marks=pytest.mark.preset_gpu_count), + None] @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_sssp.py b/python/cugraph/tests/dask/test_mg_sssp.py index ac4a60f1bdc..d75d76d7fd4 100644 --- a/python/cugraph/tests/dask/test_mg_sssp.py +++ b/python/cugraph/tests/dask/test_mg_sssp.py @@ -12,28 +12,21 @@ # limitations under the License. 
import cugraph.dask as dcg -import cugraph.comms as Comms -from dask.distributed import Client import gc import pytest import cugraph import dask_cudf import cudf -from dask_cuda import LocalCUDACluster -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/tests/dask/test_mg_utility.py b/python/cugraph/tests/dask/test_mg_utility.py index 808f1bcfa70..3217c1bef1a 100644 --- a/python/cugraph/tests/dask/test_mg_utility.py +++ b/python/cugraph/tests/dask/test_mg_utility.py @@ -12,16 +12,16 @@ # limitations under the License. 
import cugraph.dask as dcg -from dask.distributed import Client, default_client, futures_of, wait +from dask.distributed import default_client, futures_of, wait import gc import cugraph import dask_cudf -import cugraph.comms as Comms -from dask_cuda import LocalCUDACluster import pytest from cugraph.dask.common.part_utils import concat_within_workers from cugraph.dask.common.read_utils import get_n_workers -from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) import os import time import numpy as np @@ -35,17 +35,11 @@ def setup_function(): gc.collect() -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( @@ -74,40 +68,6 @@ def test_from_edgelist(client_connection): assert dg1.EdgeList == dg2.EdgeList -@pytest.mark.skipif( - is_single_gpu(), reason="skipping MG testing on Single GPU system" -) -def test_compute_local_data(client_connection): - - input_data_path = r"../datasets/karate.csv" - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value" - ) - - # Compute_local_data - dg.compute_local_data(by="dst") - data = dg.local_data["data"] - by = dg.local_data["by"] - - assert by == "dst" - assert Comms.is_initialized() - - global_num_edges = data.local_data["edges"].sum() - assert global_num_edges == dg.number_of_edges() - global_num_verts = data.local_data["verts"].sum() - assert 
global_num_verts == dg.number_of_nodes() - - @pytest.mark.skipif( is_single_gpu(), reason="skipping MG testing on Single GPU system" ) diff --git a/python/cugraph/tests/test_renumber.py b/python/cugraph/tests/test_renumber.py index 91416942429..6f88d5f85c4 100644 --- a/python/cugraph/tests/test_renumber.py +++ b/python/cugraph/tests/test_renumber.py @@ -163,6 +163,40 @@ def test_renumber_negative_col(): # Test all combinations of default/managed and pooled/non-pooled allocation +@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_renumber_series(graph_file): + gc.collect() + + M = utils.read_csv_for_nx(graph_file) + sources = cudf.Series(M["0"]) + destinations = cudf.Series(M["1"]) + + translate = 1000 + + df = cudf.DataFrame() + df["src"] = cudf.Series([x + translate for x in sources. + values_host]) + df["dst"] = cudf.Series([x + translate for x in destinations. + values_host]) + + numbering_series_1 = NumberMap() + numbering_series_1.from_series(df["src"]) + + numbering_series_2 = NumberMap() + numbering_series_2.from_series(df["dst"]) + + renumbered_src = numbering_series_1.add_internal_vertex_id( + df["src"], "src_id") + renumbered_dst = numbering_series_2.add_internal_vertex_id( + df["dst"], "dst_id") + + check_src = numbering_series_1.from_internal_vertex_id(renumbered_src, + "src_id") + check_dst = numbering_series_2.from_internal_vertex_id(renumbered_dst, + "dst_id") + + assert check_src["0_y"].equals(check_src["0_x"]) + assert check_dst["0_y"].equals(check_dst["0_x"]) @pytest.mark.parametrize("graph_file", utils.DATASETS) diff --git a/python/cugraph/tests/test_symmetrize.py b/python/cugraph/tests/test_symmetrize.py index 7ef8b33e97f..4080362ddfa 100644 --- a/python/cugraph/tests/test_symmetrize.py +++ b/python/cugraph/tests/test_symmetrize.py @@ -19,10 +19,9 @@ import cudf import cugraph from cugraph.tests import utils -import cugraph.comms as Comms -from dask.distributed import Client -from dask_cuda import LocalCUDACluster -from 
cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.dask.common.mg_utils import (is_single_gpu, + setup_local_dask_cluster, + teardown_local_dask_cluster) def test_version(): @@ -188,17 +187,11 @@ def test_symmetrize_weighted(graph_file): compare(cu_M["0"], cu_M["1"], cu_M["2"], sym_src, sym_dst, sym_w) -@pytest.fixture +@pytest.fixture(scope="module") def client_connection(): - cluster = LocalCUDACluster() - client = Client(cluster) - Comms.initialize(p2p=True) - + (cluster, client) = setup_local_dask_cluster(p2p=True) yield client - - Comms.destroy() - client.close() - cluster.close() + teardown_local_dask_cluster(cluster, client) @pytest.mark.skipif( diff --git a/python/cugraph/utilities/utils.py b/python/cugraph/utilities/utils.py index f1a320cd1ef..b77f6789abe 100644 --- a/python/cugraph/utilities/utils.py +++ b/python/cugraph/utilities/utils.py @@ -14,6 +14,11 @@ from numba import cuda import cudf +from rmm._cuda.gpu import ( + getDeviceAttribute, + cudaDeviceAttr, +) + # optional dependencies try: @@ -182,6 +187,25 @@ def is_cuda_version_less_than(min_version=(10, 2)): return False +def is_device_version_less_than(min_version=(7, 0)): + """ + Returns True if the device's CUDA compute capability is less than min_version + """ + major_version = getDeviceAttribute( + cudaDeviceAttr.cudaDevAttrComputeCapabilityMajor, 0 + ) + minor_version = getDeviceAttribute( + cudaDeviceAttr.cudaDevAttrComputeCapabilityMinor, 0 + ) + if major_version > min_version[0]: + return False + if major_version < min_version[0]: + return True + if minor_version < min_version[1]: + return True + return False + + # FIXME: if G is a Nx type, the weight attribute is assumed to be "weight", if # set. An additional optional parameter for the weight attr name when accepting # Nx graphs may be needed. 
From the Nx docs: diff --git a/python/pytest.ini b/python/pytest.ini index 33c82fe48f7..fb8c6ea0948 100644 --- a/python/pytest.ini +++ b/python/pytest.ini @@ -11,6 +11,7 @@ markers = managedmem_off: RMM managed memory disabled poolallocator_on: RMM pool allocator enabled poolallocator_off: RMM pool allocator disabled + preset_gpu_count: Use a hard-coded number of GPUs for specific MG tests ETL: benchmarks for ETL steps small: small datasets tiny: tiny datasets diff --git a/python/setup.py b/python/setup.py index d99ff12cfa1..59292f32032 100644 --- a/python/setup.py +++ b/python/setup.py @@ -105,11 +105,11 @@ def run(self): "../thirdparty/cub", raft_include_dir, os.path.join( - conda_include_dir, "libcudf", "libcudacxx"), + conda_include_dir, "libcudacxx"), cuda_include_dir], library_dirs=[get_python_lib()], runtime_library_dirs=[conda_lib_dir], - libraries=['cugraph', 'cudf', 'nccl'], + libraries=['cugraph', 'nccl'], language='c++', extra_compile_args=['-std=c++14']) ]