diff --git a/CHANGELOG.md b/CHANGELOG.md index bd5b313e550..4170e9c4bc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,70 @@ Please see https://github.com/rapidsai/cugraph/releases/tag/v0.20.0a for the latest changes to this development branch. -# cuGraph 0.19.0 (Date TBD) - -Please see https://github.com/rapidsai/cugraph/releases/tag/v0.19.0a for the latest changes to this development branch. +# cuGraph 0.19.0 (21 Apr 2021) + +## 🐛 Bug Fixes + +- Fixed copyright date and format ([#1526](https://github.com//rapidsai/cugraph/pull/1526)) [@rlratzel](https://github.com/rlratzel) +- fix mg_renumber non-deterministic errors ([#1523](https://github.com//rapidsai/cugraph/pull/1523)) [@Iroy30](https://github.com/Iroy30) +- Updated NetworkX version to 2.5.1 ([#1510](https://github.com//rapidsai/cugraph/pull/1510)) [@rlratzel](https://github.com/rlratzel) +- pascal renumbering fix ([#1505](https://github.com//rapidsai/cugraph/pull/1505)) [@Iroy30](https://github.com/Iroy30) +- Fix MNMG test failures and skip tests that are not supported on Pascal ([#1498](https://github.com//rapidsai/cugraph/pull/1498)) [@jnke2016](https://github.com/jnke2016) +- Revert "Update conda recipes pinning of repo dependencies" ([#1493](https://github.com//rapidsai/cugraph/pull/1493)) [@raydouglass](https://github.com/raydouglass) +- Update conda recipes pinning of repo dependencies ([#1485](https://github.com//rapidsai/cugraph/pull/1485)) [@mike-wendt](https://github.com/mike-wendt) +- Update to make notebook_list.py compatible with numba 0.53 ([#1455](https://github.com//rapidsai/cugraph/pull/1455)) [@rlratzel](https://github.com/rlratzel) +- Fix bugs in copy_v_transform_reduce_key_aggregated_out_nbr & groupby_gpuid_and_shuffle ([#1434](https://github.com//rapidsai/cugraph/pull/1434)) [@seunghwak](https://github.com/seunghwak) +- update default path of setup to use the new directory paths in build … ([#1425](https://github.com//rapidsai/cugraph/pull/1425)) [@ChuckHastings](https://github.com/ChuckHastings) + +## 📖 Documentation + +- Create C++ documentation ([#1489](https://github.com//rapidsai/cugraph/pull/1489)) [@ChuckHastings](https://github.com/ChuckHastings) +- Create cuGraph developers guide ([#1431](https://github.com//rapidsai/cugraph/pull/1431)) [@ChuckHastings](https://github.com/ChuckHastings) +- Add boost 1.0 license file. ([#1401](https://github.com//rapidsai/cugraph/pull/1401)) [@seunghwak](https://github.com/seunghwak) + +## 🚀 New Features + +- Implement C/CUDA RandomWalks functionality ([#1439](https://github.com//rapidsai/cugraph/pull/1439)) [@aschaffer](https://github.com/aschaffer) +- Add R-mat generator ([#1411](https://github.com//rapidsai/cugraph/pull/1411)) [@seunghwak](https://github.com/seunghwak) + +## 🛠️ Improvements + +- Random Walks - Python Bindings ([#1516](https://github.com//rapidsai/cugraph/pull/1516)) [@jnke2016](https://github.com/jnke2016) +- Updating RAFT tag ([#1509](https://github.com//rapidsai/cugraph/pull/1509)) [@afender](https://github.com/afender) +- Clean up nullptr cuda_stream_view arguments ([#1504](https://github.com//rapidsai/cugraph/pull/1504)) [@hlinsen](https://github.com/hlinsen) +- Reduce the size of the cugraph libraries ([#1503](https://github.com//rapidsai/cugraph/pull/1503)) [@robertmaynard](https://github.com/robertmaynard) +- Add indirection and replace algorithms with new renumbering ([#1484](https://github.com//rapidsai/cugraph/pull/1484)) [@Iroy30](https://github.com/Iroy30) +- Multiple graph generator with power law distribution on sizes ([#1483](https://github.com//rapidsai/cugraph/pull/1483)) [@afender](https://github.com/afender) +- TSP solver bug fix ([#1480](https://github.com//rapidsai/cugraph/pull/1480)) [@hlinsen](https://github.com/hlinsen) +- Added cmake function and .hpp template for generating version_config.hpp file. ([#1476](https://github.com//rapidsai/cugraph/pull/1476)) [@rlratzel](https://github.com/rlratzel) +- Fix for bug in SCC on self-loops ([#1475](https://github.com//rapidsai/cugraph/pull/1475)) [@aschaffer](https://github.com/aschaffer) +- MS BFS python APIs + EgoNet updates ([#1469](https://github.com//rapidsai/cugraph/pull/1469)) [@afender](https://github.com/afender) +- Removed unused dependencies from libcugraph recipe, moved non-test script code from test script to gpu build script ([#1468](https://github.com//rapidsai/cugraph/pull/1468)) [@rlratzel](https://github.com/rlratzel) +- Remove literals passed to `device_uvector::set_element_async` ([#1453](https://github.com//rapidsai/cugraph/pull/1453)) [@harrism](https://github.com/harrism) +- ENH Change conda build directories to work with ccache ([#1452](https://github.com//rapidsai/cugraph/pull/1452)) [@dillon-cullinan](https://github.com/dillon-cullinan) +- Updating docs ([#1448](https://github.com//rapidsai/cugraph/pull/1448)) [@BradReesWork](https://github.com/BradReesWork) +- Improve graph primitives performance on graphs with widely varying vertex degrees ([#1447](https://github.com//rapidsai/cugraph/pull/1447)) [@seunghwak](https://github.com/seunghwak) +- Update Changelog Link ([#1446](https://github.com//rapidsai/cugraph/pull/1446)) [@ajschmidt8](https://github.com/ajschmidt8) +- Updated NCCL to version 2.8.4 ([#1445](https://github.com//rapidsai/cugraph/pull/1445)) [@BradReesWork](https://github.com/BradReesWork) +- Update FAISS to 1.7.0 ([#1444](https://github.com//rapidsai/cugraph/pull/1444)) [@BradReesWork](https://github.com/BradReesWork) +- Update graph partitioning scheme ([#1443](https://github.com//rapidsai/cugraph/pull/1443)) [@seunghwak](https://github.com/seunghwak) +- Add additional datasets to improve coverage ([#1441](https://github.com//rapidsai/cugraph/pull/1441)) [@jnke2016](https://github.com/jnke2016) +- Update C++ MG PageRank and SG PageRank, Katz Centrality, BFS, and SSSP to use the new R-mat graph generator ([#1438](https://github.com//rapidsai/cugraph/pull/1438)) [@seunghwak](https://github.com/seunghwak) +- Remove raft handle duplication ([#1436](https://github.com//rapidsai/cugraph/pull/1436)) [@Iroy30](https://github.com/Iroy30) +- Streams infra + support in egonet ([#1435](https://github.com//rapidsai/cugraph/pull/1435)) [@afender](https://github.com/afender) +- Prepare Changelog for Automation ([#1433](https://github.com//rapidsai/cugraph/pull/1433)) [@ajschmidt8](https://github.com/ajschmidt8) +- Update 0.18 changelog entry ([#1429](https://github.com//rapidsai/cugraph/pull/1429)) [@ajschmidt8](https://github.com/ajschmidt8) +- Update and Test Renumber bindings ([#1427](https://github.com//rapidsai/cugraph/pull/1427)) [@Iroy30](https://github.com/Iroy30) +- Update Louvain to use new graph primitives and pattern accelerators ([#1423](https://github.com//rapidsai/cugraph/pull/1423)) [@ChuckHastings](https://github.com/ChuckHastings) +- Replace rmm::device_vector & thrust::host_vector with rmm::device_uvector & std::vector, respectively. ([#1421](https://github.com//rapidsai/cugraph/pull/1421)) [@seunghwak](https://github.com/seunghwak) +- Update C++ MG PageRank test ([#1419](https://github.com//rapidsai/cugraph/pull/1419)) [@seunghwak](https://github.com/seunghwak) +- ENH Build with `cmake --build` & Pass ccache variables to conda recipe & use Ninja in CI ([#1415](https://github.com//rapidsai/cugraph/pull/1415)) [@Ethyling](https://github.com/Ethyling) +- Adding new primitives: copy_v_transform_reduce_key_aggregated_out_nbr & transform_reduce_by_adj_matrix_row|col_key_e bug fixes ([#1399](https://github.com//rapidsai/cugraph/pull/1399)) [@seunghwak](https://github.com/seunghwak) +- Add new primitives: compute_in|out_degrees, compute_in|out_weight_sums to graph_view_t ([#1394](https://github.com//rapidsai/cugraph/pull/1394)) [@seunghwak](https://github.com/seunghwak) +- Rename sort_and_shuffle to groupby_gpuid_and_shuffle ([#1392](https://github.com//rapidsai/cugraph/pull/1392)) [@seunghwak](https://github.com/seunghwak) +- Matching updates for RAFT comms updates (device_sendrecv, device_multicast_sendrecv, gather, gatherv) ([#1391](https://github.com//rapidsai/cugraph/pull/1391)) [@seunghwak](https://github.com/seunghwak) +- Fix forward-merge conflicts for #1370 ([#1377](https://github.com//rapidsai/cugraph/pull/1377)) [@ajschmidt8](https://github.com/ajschmidt8) +- Add utility function for computing a secondary cost for BFS and SSSP output ([#1376](https://github.com//rapidsai/cugraph/pull/1376)) [@hlinsen](https://github.com/hlinsen) # cuGraph 0.18.0 (24 Feb 2021) diff --git a/README.md b/README.md index c5785e6cb08..2e94bd87d34 100644 --- a/README.md +++ b/README.md @@ -152,13 +152,13 @@ Install and update cuGraph using the conda command: ```bash # CUDA 11.0 -conda install -c nvidia -c rapidsai -c numba -c conda-forge -c defaults cugraph cudatoolkit=11.0 +conda install -c nvidia -c rapidsai -c numba -c conda-forge cugraph cudatoolkit=11.0 # CUDA 11.1 -conda install -c nvidia -c rapidsai -c numba -c conda-forge -c defaults cugraph cudatoolkit=11.1 +conda install -c nvidia -c rapidsai -c numba -c conda-forge cugraph cudatoolkit=11.1 # CUDA 11.2 -conda install -c nvidia -c rapidsai -c numba -c conda-forge -c defaults cugraph cudatoolkit=11.2 +conda install -c nvidia -c rapidsai -c numba -c conda-forge cugraph cudatoolkit=11.2 ``` Note: This conda installation only applies to Linux and Python versions 3.7/3.8. diff --git a/ci/benchmark/build.sh b/ci/benchmark/build.sh index 921e96dbbb9..d48f475f2eb 100644 --- a/ci/benchmark/build.sh +++ b/ci/benchmark/build.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Copyright (c) 2018-2020, NVIDIA CORPORATION. +# Copyright (c) 2018-2021, NVIDIA CORPORATION. ########################################## # cuGraph Benchmark test script for CI # ########################################## @@ -68,7 +68,7 @@ CUGRAPH_DEPS=(cudf rmm) LIBCUGRAPH_DEPS=(cudf rmm) gpuci_logger "Install required packages" -gpuci_conda_retry install -c nvidia -c rapidsai -c rapidsai-nightly -c conda-forge -c defaults \ +gpuci_conda_retry install -c nvidia -c rapidsai -c rapidsai-nightly -c conda-forge \ "cudf=${MINOR_VERSION}" \ "rmm=${MINOR_VERSION}" \ "cudatoolkit=$CUDA_REL" \ diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh index 8d12b10a640..4f46938ee49 100755 --- a/ci/cpu/build.sh +++ b/ci/cpu/build.sh @@ -39,6 +39,11 @@ gpuci_logger "Activate conda env" . /opt/conda/etc/profile.d/conda.sh conda activate rapids +# Remove rapidsai-nightly channel if we are building main branch +if [ "$SOURCE_BRANCH" = "main" ]; then + conda config --system --remove channels rapidsai-nightly +fi + gpuci_logger "Check versions" python --version $CC --version diff --git a/ci/cpu/upload.sh b/ci/cpu/upload.sh index 50e4c25b90b..a333d8828d8 100644 --- a/ci/cpu/upload.sh +++ b/ci/cpu/upload.sh @@ -43,13 +43,13 @@ if [[ "$BUILD_LIBCUGRAPH" == "1" && "$UPLOAD_LIBCUGRAPH" == "1" ]]; then test -e ${LIBCUGRAPH_FILE} echo "Upload libcugraph" echo ${LIBCUGRAPH_FILE} - gpuci_retry anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --skip-existing ${LIBCUGRAPH_FILE} + gpuci_retry anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --skip-existing ${LIBCUGRAPH_FILE} --no-progress fi if [[ "$BUILD_CUGRAPH" == "1" && "$UPLOAD_CUGRAPH" == "1" ]]; then test -e ${CUGRAPH_FILE} echo "Upload cugraph" echo ${CUGRAPH_FILE} - gpuci_retry anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --skip-existing ${CUGRAPH_FILE} + gpuci_retry anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --skip-existing ${CUGRAPH_FILE} --no-progress fi diff --git a/conda/recipes/cugraph/meta.yaml b/conda/recipes/cugraph/meta.yaml index 1ef64ddbe72..c687e57b74f 100644 --- a/conda/recipes/cugraph/meta.yaml +++ b/conda/recipes/cugraph/meta.yaml @@ -1,7 +1,7 @@ # Copyright (c) 2018-2021, NVIDIA CORPORATION. # Usage: -# conda build -c nvidia -c rapidsai -c conda-forge -c defaults . +# conda build -c nvidia -c rapidsai -c conda-forge . {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} {% set py_version=environ.get('CONDA_PY', 36) %} diff --git a/conda/recipes/libcugraph/meta.yaml b/conda/recipes/libcugraph/meta.yaml index 2602b2d8608..71b22c8cf1b 100644 --- a/conda/recipes/libcugraph/meta.yaml +++ b/conda/recipes/libcugraph/meta.yaml @@ -1,7 +1,7 @@ # Copyright (c) 2018-2021, NVIDIA CORPORATION. # Usage: -# conda build -c nvidia -c rapidsai -c conda-forge -c defaults . +# conda build -c nvidia -c rapidsai -c conda-forge . {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} {% set cuda_version='.'.join(environ.get('CUDA', '9.2').split('.')[:2]) %} diff --git a/conda_build.sh b/conda_build.sh index 4643e302f5c..1254b7d8d5a 100755 --- a/conda_build.sh +++ b/conda_build.sh @@ -1,13 +1,15 @@ #!/usr/bin/env bash +# Copyright (c) 2021, NVIDIA CORPORATION + set -xe CUDA_REL=${CUDA_VERSION%.*} conda install conda-build anaconda-client conda-verify -y -conda build -c nvidia -c rapidsai -c rapidsai-nightly/label/cuda${CUDA_REL} -c conda-forge -c defaults --python=${PYTHON} conda/recipes/cugraph +conda build -c nvidia -c rapidsai -c rapidsai-nightly/label/cuda${CUDA_REL} -c conda-forge --python=${PYTHON} conda/recipes/cugraph if [ "$UPLOAD_PACKAGE" == '1' ]; then - export UPLOADFILE=`conda build -c nvidia -c rapidsai -c conda-forge -c defaults --python=${PYTHON} conda/recipes/cugraph --output` + export UPLOADFILE=`conda build -c nvidia -c rapidsai -c conda-forge --python=${PYTHON} conda/recipes/cugraph --output` SOURCE_BRANCH=main test -e ${UPLOADFILE} @@ -26,7 +28,7 @@ if [ "$UPLOAD_PACKAGE" == '1' ]; then echo "Upload" echo ${UPLOADFILE} - anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --force ${UPLOADFILE} + anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME:-rapidsai} ${LABEL_OPTION} --force ${UPLOADFILE} --no-progress else echo "Skipping upload" fi diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3f421da5e19..6b638441a5b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -276,7 +276,6 @@ FetchContent_Declare( cuhornet GIT_REPOSITORY https://github.com/rapidsai/cuhornet.git GIT_TAG 6d2fc894cc56dd2ca8fc9d1523a18a6ec444b663 - GIT_SHALLOW true SOURCE_SUBDIR hornet ) @@ -439,6 +438,7 @@ add_library(cugraph SHARED src/experimental/pagerank.cu src/experimental/katz_centrality.cu src/tree/mst.cu + src/utilities/host_barrier.cpp ) target_link_directories(cugraph diff --git a/cpp/include/algorithms.hpp b/cpp/include/algorithms.hpp index 7a7a0219d74..9f1cb02df0c 100644 --- a/cpp/include/algorithms.hpp +++ b/cpp/include/algorithms.hpp @@ -1265,11 +1265,19 @@ extract_ego(raft::handle_t const &handle, * @param ptr_d_start Device pointer to set of starting vertex indices for the RW. * @param num_paths = number(paths). * @param max_depth maximum length of RWs. - * @return std::tuple, device_vec_t, - * device_vec_t> Triplet of coalesced RW paths, with corresponding edge weights for - * each, and corresponding path sizes. This is meant to minimize the number of DF's to be passed to - * the Python layer. The meaning of "coalesced" here is that a 2D array of paths of different sizes - * is represented as a 1D array. + * @param use_padding (optional) specifies if return uses padded format (true), or coalesced + * (compressed) format; when padding is used the output is a matrix of vertex paths and a matrix of + * edges paths (weights); in this case the matrices are stored in row major order; the vertex path + * matrix is padded with `num_vertices` values and the weight matrix is padded with `0` values; + * @return std::tuple, rmm::device_uvector, + * rmm::device_uvector> Triplet of either padded or coalesced RW paths; in the coalesced + * case (default), the return consists of corresponding vertex and edge weights for each, and + * corresponding path sizes. This is meant to minimize the number of DF's to be passed to the Python + * layer. The meaning of "coalesced" here is that a 2D array of paths of different sizes is + * represented as a 1D contiguous array. In the padded case the return is a matrix of num_paths x + * max_depth vertex paths; and num_paths x (max_depth-1) edge (weight) paths, with an empty array of + * sizes. Note: if the graph is un-weighted the edge (weight) paths consists of `weight_t{1}` + * entries; */ template std::tuple, @@ -1279,7 +1287,8 @@ random_walks(raft::handle_t const &handle, graph_t const &graph, typename graph_t::vertex_type const *ptr_d_start, index_t num_paths, - index_t max_depth); + index_t max_depth, + bool use_padding = false); } // namespace experimental } // namespace cugraph diff --git a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh index ca20b9a1285..26a4eed4213 100644 --- a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh +++ b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -60,6 +61,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + std::vector rx_counts(col_comm_size, size_t{0}); std::vector displacements(col_comm_size, size_t{0}); for (int i = 0; i < col_comm_size; ++i) { @@ -72,6 +84,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, rx_counts, displacements, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? graph_view.get_number_of_local_adj_matrix_partition_cols() @@ -106,6 +129,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto rx_counts = host_scalar_allgather(col_comm, static_cast(thrust::distance(vertex_first, vertex_last)), @@ -171,6 +205,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, matrix_major_value_output_first + matrix_partition.get_major_value_start_offset()); } } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? graph_view.get_number_of_local_adj_matrix_partition_cols() @@ -202,6 +247,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + std::vector rx_counts(row_comm_size, size_t{0}); std::vector displacements(row_comm_size, size_t{0}); for (int i = 0; i < row_comm_size; ++i) { @@ -214,6 +270,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, rx_counts, displacements, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? graph_view.get_number_of_local_adj_matrix_partition_rows() @@ -248,6 +315,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto rx_counts = host_scalar_allgather(row_comm, static_cast(thrust::distance(vertex_first, vertex_last)), @@ -310,6 +388,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, matrix_minor_value_output_first); } } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == graph_view.get_number_of_local_adj_matrix_partition_rows()); diff --git a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh index 6d828dab513..6aded0eccf0 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -496,6 +497,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, } if (GraphViewType::is_multi_gpu && update_major) { + auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); @@ -503,6 +505,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + device_reduce(col_comm, major_buffer_first, vertex_value_output_first, @@ -510,6 +523,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, raft::comms::op_t::SUM, i, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } } @@ -523,6 +547,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + for (int i = 0; i < row_comm_size; ++i) { auto offset = (graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size + i) - graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size)); @@ -535,6 +570,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, i, handle.get_stream()); } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } } diff --git a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index f6eac67e4e7..9a1d9fea24c 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -211,10 +212,22 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( auto kv_map_ptr = std::make_unique>( size_t{0}, invalid_vertex_id::value, invalid_vertex_id::value); if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto map_counts = host_scalar_allgather(row_comm, static_cast(thrust::distance(map_key_first, map_key_last)), @@ -292,6 +305,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( // 2. aggregate each vertex out-going edges based on keys and transform-reduce. + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector major_vertices(0, handle.get_stream()); auto e_op_result_buffer = allocate_dataframe_buffer(0, handle.get_stream()); for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { @@ -436,6 +464,9 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( // FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op // can be mapped to ncclRedOp_t). + // FIXME: a temporary workaround for a NCCL (2.9.6) bug that causes a hang on DGX1 (due to + // remote memory allocation), this barrier is unnecessary otherwise. + col_comm.barrier(); auto rx_sizes = host_scalar_gather(col_comm, tmp_major_vertices.size(), i, handle.get_stream()); std::vector rx_displs{}; @@ -475,6 +506,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( } } + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), vertex_value_output_first, vertex_value_output_first + graph_view.get_number_of_local_vertices(), diff --git a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh index 3d87f19969e..4f3925f7d4c 100644 --- a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -403,6 +404,21 @@ void update_frontier_v_push_if_out_nbr( // 1. fill the buffer + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector keys(size_t{0}, handle.get_stream()); auto payload_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); rmm::device_scalar buffer_idx(size_t{0}, handle.get_stream()); @@ -585,6 +601,21 @@ void update_frontier_v_push_if_out_nbr( } } + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + // 2. reduce the buffer auto num_buffer_elements = @@ -596,13 +627,21 @@ void update_frontier_v_push_if_out_nbr( if (GraphViewType::is_multi_gpu) { // FIXME: this step is unnecessary if row_comm_size== 1 auto& comm = handle.get_comms(); - auto const comm_rank = comm.get_rank(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); - auto const col_comm_size = col_comm.get_size(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif std::vector h_vertex_lasts(row_comm_size); for (size_t i = 0; i < h_vertex_lasts.size(); ++i) { @@ -649,6 +688,17 @@ void update_frontier_v_push_if_out_nbr( get_dataframe_buffer_begin(payload_buffer), keys.size(), reduce_op); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } // 3. update vertex properties @@ -753,7 +803,7 @@ void update_frontier_v_push_if_out_nbr( } } } -} +} // namespace experimental } // namespace experimental } // namespace cugraph diff --git a/cpp/include/utilities/host_barrier.hpp b/cpp/include/utilities/host_barrier.hpp new file mode 100644 index 00000000000..11803a7bde4 --- /dev/null +++ b/cpp/include/utilities/host_barrier.hpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace cugraph { +namespace experimental { + +// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and +// MPI barrier for MPI) +void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view); + +} // namespace experimental +} // namespace cugraph diff --git a/cpp/include/utilities/path_retrieval.hpp b/cpp/include/utilities/path_retrieval.hpp index fd0d36b67d6..4d1b6a1b4d2 100644 --- a/cpp/include/utilities/path_retrieval.hpp +++ b/cpp/include/utilities/path_retrieval.hpp @@ -67,5 +67,21 @@ std:: index_t num_paths, rmm::device_buffer &&d_coalesced_v, rmm::device_buffer &&d_sizes); + +/** + * @brief returns additional RW information on vertex paths offsets and weight path sizes and + * offsets, for the coalesced case (the padded case does not need or provide this information) + * + * @tparam index_t Type used to store indexing and sizes. + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param num_paths number of paths. + * @param ptr_d_sizes sizes of vertex paths. + * @return tuple of (vertex_path_offsets, weight_path_sizes, weight_path_offsets), where offsets are + * exclusive scan of corresponding sizes. + */ +template +std::tuple, rmm::device_uvector, rmm::device_uvector> +query_rw_sizes_offsets(raft::handle_t const &handle, index_t num_paths, index_t const *ptr_d_sizes); } // namespace experimental } // namespace cugraph diff --git a/cpp/include/utilities/shuffle_comm.cuh b/cpp/include/utilities/shuffle_comm.cuh index b318009d9bf..b42b9ad06bb 100644 --- a/cpp/include/utilities/shuffle_comm.cuh +++ b/cpp/include/utilities/shuffle_comm.cuh @@ -73,6 +73,10 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const &comm, rx_offsets, rx_src_ranks, stream); + // FIXME: temporary unverified work-around for a NCCL (2.9.6) bug that causes a hang on DGX1 (due + // to remote memory allocation), this synchronization is unnecessary otherwise but seems like + // suppress the hange issue. Need to be revisited once NCCL 2.10 is released. + CUDA_TRY(cudaDeviceSynchronize()); raft::update_host(tx_counts.data(), d_tx_value_counts.data(), comm_size, stream); raft::update_host(rx_counts.data(), d_rx_value_counts.data(), comm_size, stream); @@ -201,8 +205,6 @@ auto shuffle_values(raft::comms::comms_t const &comm, rmm::device_uvector d_tx_value_counts(comm_size, stream); raft::update_device(d_tx_value_counts.data(), tx_value_counts.data(), comm_size, stream); - CUDA_TRY(cudaStreamSynchronize(stream)); // tx_value_counts should be up-to-date - std::vector tx_counts{}; std::vector tx_offsets{}; std::vector tx_dst_ranks{}; diff --git a/cpp/src/experimental/coarsen_graph.cu b/cpp/src/experimental/coarsen_graph.cu index 1eccbd23584..6397f92e336 100644 --- a/cpp/src/experimental/coarsen_graph.cu +++ b/cpp/src/experimental/coarsen_graph.cu @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -269,6 +270,16 @@ coarsen_graph( for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { // 1-1. locally construct coarsened edge list + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif rmm::device_uvector major_labels( store_transposed ? graph_view.get_number_of_local_adj_matrix_partition_cols(i) : graph_view.get_number_of_local_adj_matrix_partition_rows(i), @@ -285,6 +296,16 @@ coarsen_graph( major_labels.size(), static_cast(i), handle.get_stream()); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif rmm::device_uvector edgelist_major_vertices(0, handle.get_stream()); rmm::device_uvector edgelist_minor_vertices(0, handle.get_stream()); diff --git a/cpp/src/experimental/generate_rmat_edgelist.cu b/cpp/src/experimental/generate_rmat_edgelist.cu index d75a4654a15..f00443a0596 100644 --- a/cpp/src/experimental/generate_rmat_edgelist.cu +++ b/cpp/src/experimental/generate_rmat_edgelist.cu @@ -137,8 +137,9 @@ generate_rmat_edgelists(raft::handle_t const& handle, bool scramble_vertex_ids) { CUGRAPH_EXPECTS(min_scale > 0, "minimum graph scale is 1."); - CUGRAPH_EXPECTS(size_t{1} << max_scale <= std::numeric_limits::max(), - "Invalid input argument: scale too large for vertex_t."); + CUGRAPH_EXPECTS( + size_t{1} << max_scale <= static_cast(std::numeric_limits::max()), + "Invalid input argument: scale too large for vertex_t."); std::vector, rmm::device_uvector>> output{}; output.reserve(n_edgelists); diff --git a/cpp/src/experimental/renumber_edgelist.cu b/cpp/src/experimental/renumber_edgelist.cu index dbf0250b88a..01022e8fa6d 100644 --- a/cpp/src/experimental/renumber_edgelist.cu +++ b/cpp/src/experimental/renumber_edgelist.cu @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,22 @@ rmm::device_uvector compute_renumber_map( // 1. acquire (unique major label, count) pairs + if (multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + ; + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector major_labels(0, handle.get_stream()); rmm::device_uvector major_counts(0, handle.get_stream()); for (size_t i = 0; i < edgelist_major_vertices.size(); ++i) { @@ -71,6 +88,7 @@ rmm::device_uvector compute_renumber_map( edgelist_major_vertices[i], edgelist_major_vertices[i] + edgelist_edge_counts[i], sorted_major_labels.begin()); + // FIXME: better refactor this sort-count_if-reduce_by_key routine for reuse thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), sorted_major_labels.begin(), sorted_major_labels.end()); @@ -98,6 +116,9 @@ rmm::device_uvector compute_renumber_map( rmm::device_uvector rx_major_labels(0, handle.get_stream()); rmm::device_uvector rx_major_counts(0, handle.get_stream()); + // FIXME: a temporary workaround for a NCCL (2.9.6) bug that causes a hang on DGX1 (due to + // remote memory allocation), this barrier is unnecessary otherwise. + col_comm.barrier(); auto rx_sizes = host_scalar_gather( col_comm, tmp_major_labels.size(), static_cast(i), handle.get_stream()); std::vector rx_displs{}; @@ -118,32 +139,39 @@ rmm::device_uvector compute_renumber_map( static_cast(i), handle.get_stream()); if (static_cast(i) == col_comm_rank) { - thrust::sort_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_major_labels.begin(), - rx_major_labels.end(), - rx_major_counts.begin()); - major_labels.resize(rx_major_labels.size(), handle.get_stream()); - major_counts.resize(major_labels.size(), handle.get_stream()); - auto pair_it = - thrust::reduce_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_major_labels.begin(), - rx_major_labels.end(), - rx_major_counts.begin(), - major_labels.begin(), - major_counts.begin()); - major_labels.resize(thrust::distance(major_labels.begin(), thrust::get<0>(pair_it)), - handle.get_stream()); - major_counts.resize(major_labels.size(), handle.get_stream()); - major_labels.shrink_to_fit(handle.get_stream()); - major_counts.shrink_to_fit(handle.get_stream()); + major_labels = std::move(rx_major_labels); + major_counts = std::move(rx_major_counts); } } else { - tmp_major_labels.shrink_to_fit(handle.get_stream()); - tmp_major_counts.shrink_to_fit(handle.get_stream()); + assert(i == 0); major_labels = std::move(tmp_major_labels); major_counts = std::move(tmp_major_counts); } } + if (multi_gpu) { + // FIXME: better refactor this sort-count_if-reduce_by_key routine for reuse + thrust::sort_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + major_labels.begin(), + major_labels.end(), + major_counts.begin()); + auto num_unique_labels = + thrust::count_if(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(major_labels.size()), + [labels = major_labels.data()] __device__(auto i) { + return (i == 0) || (labels[i - 1] != labels[i]); + }); + rmm::device_uvector tmp_major_labels(num_unique_labels, handle.get_stream()); + rmm::device_uvector tmp_major_counts(tmp_major_labels.size(), handle.get_stream()); + thrust::reduce_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + major_labels.begin(), + major_labels.end(), + major_counts.begin(), + tmp_major_labels.begin(), + tmp_major_counts.begin()); + major_labels = std::move(tmp_major_labels); + major_counts = std::move(tmp_major_counts); + } // 2. acquire unique minor labels @@ -168,28 +196,54 @@ rmm::device_uvector compute_renumber_map( minor_labels.end())), handle.get_stream()); if (multi_gpu) { + auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_size = row_comm.get_size(); - rmm::device_uvector rx_minor_labels(0, handle.get_stream()); - std::tie(rx_minor_labels, std::ignore) = groupby_gpuid_and_shuffle_values( - row_comm, - minor_labels.begin(), - minor_labels.end(), - [key_func = detail::compute_gpu_id_from_vertex_t{row_comm_size}] __device__( - auto val) { return key_func(val); }, - handle.get_stream()); - thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_minor_labels.begin(), - rx_minor_labels.end()); - rx_minor_labels.resize( - thrust::distance( - rx_minor_labels.begin(), - thrust::unique(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_minor_labels.begin(), - rx_minor_labels.end())), - handle.get_stream()); - minor_labels = std::move(rx_minor_labels); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + + if (row_comm_size > 1) { + rmm::device_uvector rx_minor_labels(0, handle.get_stream()); + std::tie(rx_minor_labels, std::ignore) = groupby_gpuid_and_shuffle_values( + row_comm, + minor_labels.begin(), + minor_labels.end(), + [key_func = detail::compute_gpu_id_from_vertex_t{row_comm_size}] __device__( + auto val) { return key_func(val); }, + handle.get_stream()); + thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + rx_minor_labels.begin(), + rx_minor_labels.end()); + rx_minor_labels.resize( + thrust::distance( + rx_minor_labels.begin(), + thrust::unique(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + rx_minor_labels.begin(), + rx_minor_labels.end())), + handle.get_stream()); + minor_labels = std::move(rx_minor_labels); + } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + // + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } minor_labels.shrink_to_fit(handle.get_stream()); @@ -366,6 +420,19 @@ void expensive_check_edgelist( auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + // FIXME: this barrier is unnecessary if the above host_scalar_allreduce is a true host + // operation (as it serves as a barrier) barrier is necessary here to avoid potential + // overlap (which can leads to deadlock) between two different communicators (beginning of + // col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + rmm::device_uvector sorted_major_vertices(0, handle.get_stream()); { auto recvcounts = @@ -385,6 +452,17 @@ void expensive_check_edgelist( sorted_major_vertices.end()); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) + // between two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + rmm::device_uvector sorted_minor_vertices(0, handle.get_stream()); { auto recvcounts = @@ -404,6 +482,17 @@ void expensive_check_edgelist( sorted_minor_vertices.end()); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) + // between two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto edge_first = thrust::make_zip_iterator( thrust::make_tuple(edgelist_major_vertices[i], edgelist_minor_vertices[i])); CUGRAPH_EXPECTS( @@ -509,7 +598,6 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_const_major_vertices, edgelist_const_minor_vertices, edgelist_edge_counts); - // 2. initialize partition_t object, number_of_vertices, and number_of_edges for the coarsened // graph @@ -535,6 +623,18 @@ renumber_edgelist(raft::handle_t const& handle, // FIXME: compare this hash based approach with a binary search based approach in both memory // footprint and execution time + // FIXME: this barrier is unnecessary if the above host_scalar_allgather is a true host operation + // (as it serves as a barrier) barrier is necessary here to avoid potential overlap (which can + // leads to deadlock) between two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + for (size_t i = 0; i < edgelist_major_vertices.size(); ++i) { rmm::device_uvector renumber_map_major_labels( col_comm_rank == static_cast(i) ? vertex_t{0} @@ -571,6 +671,16 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_major_vertices[i]); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between two + // different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif { rmm::device_uvector renumber_map_minor_labels( partition.get_matrix_partition_minor_size(), handle.get_stream()); @@ -611,6 +721,16 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_minor_vertices[i]); } } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between two + // different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif return std::make_tuple( std::move(renumber_map_labels), partition, number_of_vertices, number_of_edges); diff --git a/cpp/src/sampling/random_walks.cu b/cpp/src/sampling/random_walks.cu index d1d0382d46f..a5410d0e65e 100644 --- a/cpp/src/sampling/random_walks.cu +++ b/cpp/src/sampling/random_walks.cu @@ -30,7 +30,8 @@ template std:: graph_view_t const& gview, int32_t const* ptr_d_start, int32_t num_paths, - int32_t max_depth); + int32_t max_depth, + bool use_padding); template std:: tuple, rmm::device_uvector, rmm::device_uvector> @@ -38,7 +39,8 @@ template std:: graph_view_t const& gview, int32_t const* ptr_d_start, int64_t num_paths, - int64_t max_depth); + int64_t max_depth, + bool use_padding); template std:: tuple, rmm::device_uvector, rmm::device_uvector> @@ -46,7 +48,8 @@ template std:: graph_view_t const& gview, int64_t const* ptr_d_start, int64_t num_paths, - int64_t max_depth); + int64_t max_depth, + bool use_padding); //} // // SG FP64{ @@ -56,7 +59,8 @@ template std:: graph_view_t const& gview, int32_t const* ptr_d_start, int32_t num_paths, - int32_t max_depth); + int32_t max_depth, + bool use_padding); template std:: tuple, rmm::device_uvector, rmm::device_uvector> @@ -64,7 +68,8 @@ template std:: graph_view_t const& gview, int32_t const* ptr_d_start, int64_t num_paths, - int64_t max_depth); + int64_t max_depth, + bool use_padding); template std:: tuple, rmm::device_uvector, rmm::device_uvector> @@ -72,7 +77,9 @@ template std:: graph_view_t const& gview, int64_t const* ptr_d_start, int64_t num_paths, - int64_t max_depth); + int64_t max_depth, + bool use_padding); +//} template std:: tuple, rmm::device_uvector, rmm::device_uvector> @@ -97,6 +104,16 @@ template std:: int64_t num_paths, rmm::device_buffer&& d_coalesced_v, rmm::device_buffer&& d_sizes); -//} + +template std::tuple, + rmm::device_uvector, + rmm::device_uvector> +query_rw_sizes_offsets(raft::handle_t const& handle, int32_t num_paths, int32_t const* ptr_d_sizes); + +template std::tuple, + rmm::device_uvector, + rmm::device_uvector> +query_rw_sizes_offsets(raft::handle_t const& handle, int64_t num_paths, int64_t const* ptr_d_sizes); + } // namespace experimental } // namespace cugraph diff --git a/cpp/src/sampling/random_walks.cuh b/cpp/src/sampling/random_walks.cuh index 82665003769..10a47318bcb 100644 --- a/cpp/src/sampling/random_walks.cuh +++ b/cpp/src/sampling/random_walks.cuh @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -221,44 +223,6 @@ struct col_indx_extract_t const& d_indices, - device_vec_t const& d_offsets, - device_vec_t const& d_values, - device_vec_t const& d_crt_out_degs, - device_vec_t const& d_sizes, - index_t num_paths, - index_t max_depth) - : handle_(handle), - col_indices_(raw_const_ptr(d_indices)), - row_offsets_(raw_const_ptr(d_offsets)), - values_(raw_const_ptr(d_values)), - out_degs_(raw_const_ptr(d_crt_out_degs)), - sizes_(raw_const_ptr(d_sizes)), - num_paths_(num_paths), - max_depth_(max_depth) - { - } - - col_indx_extract_t(raft::handle_t const& handle, - vertex_t const* p_d_indices, - edge_t const* p_d_offsets, - weight_t const* p_d_values, - edge_t const* p_d_crt_out_degs, - index_t const* p_d_sizes, - index_t num_paths, - index_t max_depth) - : handle_(handle), - col_indices_(p_d_indices), - row_offsets_(p_d_offsets), - values_(p_d_values), - out_degs_(p_d_crt_out_degs), - sizes_(p_d_sizes), - num_paths_(num_paths), - max_depth_(max_depth) - { - } - col_indx_extract_t(raft::handle_t const& handle, graph_t const& graph, edge_t const* p_d_crt_out_degs, @@ -316,7 +280,11 @@ struct col_indx_extract_t 0; }); } @@ -386,11 +354,15 @@ struct random_walker_t { random_walker_t(raft::handle_t const& handle, graph_t const& graph, index_t num_paths, - index_t max_depth) + index_t max_depth, + vertex_t v_padding_val = 0, + weight_t w_padding_val = 0) : handle_(handle), num_paths_(num_paths), max_depth_(max_depth), - d_cached_out_degs_(graph.compute_out_degrees(handle_)) + d_cached_out_degs_(graph.compute_out_degrees(handle_)), + vertex_padding_value_(v_padding_val != 0 ? v_padding_val : graph.get_number_of_vertices()), + weight_padding_value_(w_padding_val) { } @@ -559,7 +531,7 @@ struct random_walker_t { thrust::make_counting_iterator(0), predicate_w); - CUDA_TRY(cudaStreamSynchronize(handle_.get_stream())); + handle_.get_stream_view().synchronize(); d_coalesced_v.resize(thrust::distance(d_coalesced_v.begin(), new_end_v), handle_.get_stream()); d_coalesced_w.resize(thrust::distance(d_coalesced_w.begin(), new_end_w), handle_.get_stream()); @@ -690,11 +662,31 @@ struct random_walker_t { device_vec_t const& get_out_degs(void) const { return d_cached_out_degs_; } + vertex_t get_vertex_padding_value(void) const { return vertex_padding_value_; } + + weight_t get_weight_padding_value(void) const { return weight_padding_value_; } + + void init_padding(device_vec_t& d_coalesced_v, + device_vec_t& d_coalesced_w) const + { + thrust::fill(rmm::exec_policy(handle_.get_stream())->on(handle_.get_stream()), + d_coalesced_v.begin(), + d_coalesced_v.end(), + vertex_padding_value_); + + thrust::fill(rmm::exec_policy(handle_.get_stream())->on(handle_.get_stream()), + d_coalesced_w.begin(), + d_coalesced_w.end(), + weight_padding_value_); + } + private: raft::handle_t const& handle_; index_t num_paths_; index_t max_depth_; device_vec_t d_cached_out_degs_; + vertex_t const vertex_padding_value_; + weight_t const weight_padding_value_; }; /** @@ -709,11 +701,21 @@ struct random_walker_t { * @param d_v_start Device (view) set of starting vertex indices for the RW. * number(paths) == d_v_start.size(). * @param max_depth maximum length of RWs. + * @param use_padding (optional) specifies if return uses padded format (true), or coalesced + * (compressed) format; when padding is used the output is a matrix of vertex paths and a matrix of + * edges paths (weights); in this case the matrices are stored in row major order; the vertex path + * matrix is padded with `num_vertices` values and the weight matrix is padded with `0` values; + * @param seeder (optional) is object providing the random seeding mechanism. Defaults to local + * clock time as initial seed. * @return std::tuple, device_vec_t, - * device_vec_t, seed> Quadruplet of coalesced RW paths, with corresponding edge weights - * for each, and corresponding path sizes. This is meant to minimize the number of DF's to be passed - * to the Python layer. Also returning seed for testing / debugging repro. The meaning of - * "coalesced" here is that a 2D array of paths of different sizes is represented as a 1D array. + * device_vec_t> Triplet of either padded or coalesced RW paths; in the coalesced case + * (default), the return consists of corresponding vertex and edge weights for each, and + * corresponding path sizes. This is meant to minimize the number of DF's to be passed to the Python + * layer. The meaning of "coalesced" here is that a 2D array of paths of different sizes is + * represented as a 1D contiguous array. In the padded case the return is a matrix of num_paths x + * max_depth vertex paths; and num_paths x (max_depth-1) edge (weight) paths, with an empty array of + * sizes. Note: if the graph is un-weighted the edge (weight) paths consists of `weight_t{1}` + * entries; */ template & d_v_start, index_t max_depth, + bool use_padding = false, seeding_policy_t seeder = clock_seeding_t{}) { using vertex_t = typename graph_t::vertex_type; @@ -772,6 +775,10 @@ random_walks_impl(raft::handle_t const& handle, // seed_t seed0 = static_cast(seeder()); + // if padding used, initialize padding values: + // + if (use_padding) rand_walker.init_padding(d_coalesced_v, d_coalesced_w); + // very first vertex, for each path: // rand_walker.start(d_v_start, d_coalesced_v, d_paths_sz); @@ -799,15 +806,25 @@ random_walks_impl(raft::handle_t const& handle, // wrap-up, post-process: // truncate v_set, w_set to actual space used + // unless padding is used // - rand_walker.stop(d_coalesced_v, d_coalesced_w, d_paths_sz); + if (!use_padding) { rand_walker.stop(d_coalesced_v, d_coalesced_w, d_paths_sz); } // because device_uvector is not copy-cnstr-able: // - return std::make_tuple(std::move(d_coalesced_v), - std::move(d_coalesced_w), - std::move(d_paths_sz), - seed0); // also return seed for repro + if (!use_padding) { + return std::make_tuple(std::move(d_coalesced_v), + std::move(d_coalesced_w), + std::move(d_paths_sz), + seed0); // also return seed for repro + } else { + return std::make_tuple( + std::move(d_coalesced_v), + std::move(d_coalesced_w), + device_vec_t(0, stream), // purposely empty size array for the padded case, to avoid + // unnecessary allocations + seed0); // also return seed for repro + } } /** @@ -822,11 +839,21 @@ random_walks_impl(raft::handle_t const& handle, * @param d_v_start Device (view) set of starting vertex indices for the RW. number(RW) == * d_v_start.size(). * @param max_depth maximum length of RWs. + * @param use_padding (optional) specifies if return uses padded format (true), or coalesced + * (compressed) format; when padding is used the output is a matrix of vertex paths and a matrix of + * edges paths (weights); in this case the matrices are stored in row major order; the vertex path + * matrix is padded with `num_vertices` values and the weight matrix is padded with `0` values; + * @param seeder (optional) is object providing the random seeding mechanism. Defaults to local + * clock time as initial seed. * @return std::tuple, device_vec_t, - * device_vec_t, seed> Quadruplet of coalesced RW paths, with corresponding edge weights - * for each, and coresponding path sizes. This is meant to minimize the number of DF's to be passed - * to the Python layer. Also returning seed for testing / debugging repro. The meaning of - * "coalesced" here is that a 2D array of paths of different sizes is represented as a 1D array. + * device_vec_t> Triplet of either padded or coalesced RW paths; in the coalesced case + * (default), the return consists of corresponding vertex and edge weights for each, and + * corresponding path sizes. This is meant to minimize the number of DF's to be passed to the Python + * layer. The meaning of "coalesced" here is that a 2D array of paths of different sizes is + * represented as a 1D contiguous array. In the padded case the return is a matrix of num_paths x + * max_depth vertex paths; and num_paths x (max_depth-1) edge (weight) paths, with an empty array of + * sizes. Note: if the graph is un-weighted the edge (weight) paths consists of `weight_t{1}` + * entries; */ template & d_v_start, index_t max_depth, + bool use_padding = false, seeding_policy_t seeder = clock_seeding_t{}) { CUGRAPH_FAIL("Not implemented yet."); @@ -1003,18 +1031,27 @@ struct coo_convertor_t { * @brief returns random walks (RW) from starting sources, where each path is of given maximum * length. Uniform distribution is assumed for the random engine. * - * @tparam graph_t Type of graph (view). + * @tparam graph_t Type of graph/view (typically, graph_view_t). * @tparam index_t Type used to store indexing and sizes. * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and * handles to various CUDA libraries) to run graph algorithms. - * @param graph Graph object to generate RW on. + * @param graph Graph (view )object to generate RW on. * @param ptr_d_start Device pointer to set of starting vertex indices for the RW. * @param num_paths = number(paths). * @param max_depth maximum length of RWs. - * @return std::tuple, device_vec_t, - * device_vec_t> Triplet of coalesced RW paths, with corresponding edge weights for - * each, and coresponding path sizes. This is meant to minimize the number of DF's to be passed to - * the Python layer. + * @param use_padding (optional) specifies if return uses padded format (true), or coalesced + * (compressed) format; when padding is used the output is a matrix of vertex paths and a matrix of + * edges paths (weights); in this case the matrices are stored in row major order; the vertex path + * matrix is padded with `num_vertices` values and the weight matrix is padded with `0` values; + * @return std::tuple, rmm::device_uvector, + * rmm::device_uvector> Triplet of either padded or coalesced RW paths; in the coalesced + * case (default), the return consists of corresponding vertex and edge weights for each, and + * corresponding path sizes. This is meant to minimize the number of DF's to be passed to the Python + * layer. The meaning of "coalesced" here is that a 2D array of paths of different sizes is + * represented as a 1D contiguous array. In the padded case the return is a matrix of num_paths x + * max_depth vertex paths; and num_paths x (max_depth-1) edge (weight) paths, with an empty array of + * sizes. Note: if the graph is un-weighted the edge (weight) paths consists of `weight_t{1}` + * entries; */ template std::tuple, @@ -1024,7 +1061,8 @@ random_walks(raft::handle_t const& handle, graph_t const& graph, typename graph_t::vertex_type const* ptr_d_start, index_t num_paths, - index_t max_depth) + index_t max_depth, + bool use_padding) { using vertex_t = typename graph_t::vertex_type; @@ -1032,7 +1070,7 @@ random_walks(raft::handle_t const& handle, // detail::device_const_vector_view d_v_start{ptr_d_start, num_paths}; - auto quad_tuple = detail::random_walks_impl(handle, graph, d_v_start, max_depth); + auto quad_tuple = detail::random_walks_impl(handle, graph, d_v_start, max_depth, use_padding); // ignore last element of the quad, seed, // since it's meant for testing / debugging, only: // @@ -1076,5 +1114,47 @@ std:: return to_coo(d_v_view, d_sz_view); } +/** + * @brief returns additional RW information on vertex paths offsets and weight path sizes and + * offsets, for the coalesced case (the padded case does not need or provide this information) + * + * @tparam index_t Type used to store indexing and sizes. + * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and + * handles to various CUDA libraries) to run graph algorithms. + * @param num_paths number of paths. + * @param ptr_d_sizes sizes of vertex paths. + * @return tuple of (vertex_path_offsets, weight_path_sizes, weight_path_offsets), where offsets are + * exclusive scan of corresponding sizes. + */ +template +std::tuple, rmm::device_uvector, rmm::device_uvector> +query_rw_sizes_offsets(raft::handle_t const& handle, index_t num_paths, index_t const* ptr_d_sizes) +{ + rmm::device_uvector d_vertex_offsets(num_paths, handle.get_stream()); + rmm::device_uvector d_weight_sizes(num_paths, handle.get_stream()); + rmm::device_uvector d_weight_offsets(num_paths, handle.get_stream()); + + thrust::exclusive_scan(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + ptr_d_sizes, + ptr_d_sizes + num_paths, + d_vertex_offsets.begin()); + + thrust::transform(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + ptr_d_sizes, + ptr_d_sizes + num_paths, + d_weight_sizes.begin(), + [] __device__(auto vertex_path_sz) { return vertex_path_sz - 1; }); + + handle.get_stream_view().synchronize(); + + thrust::exclusive_scan(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + d_weight_sizes.begin(), + d_weight_sizes.end(), + d_weight_offsets.begin()); + + return std::make_tuple( + std::move(d_vertex_offsets), std::move(d_weight_sizes), std::move(d_weight_offsets)); +} + } // namespace experimental } // namespace cugraph diff --git a/cpp/src/utilities/host_barrier.cpp b/cpp/src/utilities/host_barrier.cpp new file mode 100644 index 00000000000..1c018d624ed --- /dev/null +++ b/cpp/src/utilities/host_barrier.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include + +namespace cugraph { +namespace experimental { + +// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and +// MPI barrier for MPI) +void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view) +{ + stream_view.synchronize(); + + auto const comm_size = comm.get_size(); + auto const comm_rank = comm.get_rank(); + + // k-tree barrier + + int constexpr k = 2; + static_assert(k >= 2); + std::vector requests(k - 1); + std::vector dummies(k - 1); + + // up + + int mod = 1; + while (mod < comm_size) { + if (comm_rank % mod == 0) { + auto level_rank = comm_rank / mod; + if (level_rank % k == 0) { + auto num_irecvs = 0; + ; + for (int i = 1; i < k; ++i) { + auto src_rank = (level_rank + i) * mod; + if (src_rank < comm_size) { + comm.irecv(dummies.data() + (i - 1), + sizeof(std::byte), + src_rank, + int{0} /* tag */, + requests.data() + (i - 1)); + ++num_irecvs; + } + } + comm.waitall(num_irecvs, requests.data()); + } else { + comm.isend(dummies.data(), + sizeof(std::byte), + (level_rank - (level_rank % k)) * mod, + int{0} /* tag */, + requests.data()); + comm.waitall(1, requests.data()); + } + } + mod *= k; + } + + // down + + mod /= k; + while (mod >= 1) { + if (comm_rank % mod == 0) { + auto level_rank = comm_rank / mod; + if (level_rank % k == 0) { + auto num_isends = 0; + for (int i = 1; i < k; ++i) { + auto dst_rank = (level_rank + i) * mod; + if (dst_rank < comm_size) { + comm.isend(dummies.data() + (i - 1), + sizeof(std::byte), + dst_rank, + int{0} /* tag */, + requests.data() + (i - 1)); + ++num_isends; + } + } + comm.waitall(num_isends, requests.data()); + } else { + comm.irecv(dummies.data(), + sizeof(std::byte), + (level_rank - (level_rank % k)) * mod, + int{0} /* tag */, + requests.data()); + comm.waitall(1, requests.data()); + } + } + mod /= k; + } +} + +} // namespace experimental +} // namespace cugraph diff --git a/cpp/tests/community/mg_louvain_helper.cu b/cpp/tests/community/mg_louvain_helper.cu index 661065ca65b..d62eaa1ec55 100644 --- a/cpp/tests/community/mg_louvain_helper.cu +++ b/cpp/tests/community/mg_louvain_helper.cu @@ -71,7 +71,7 @@ bool compare_renumbered_vectors(raft::handle_t const &handle, v1.end(), vertex_t{0}); - rmm::device_uvector map(max, size_t{0}); + rmm::device_uvector map(max, handle.get_stream()); auto iter = thrust::make_zip_iterator(thrust::make_tuple(v1.begin(), v2.begin())); diff --git a/cpp/tests/experimental/bfs_test.cpp b/cpp/tests/experimental/bfs_test.cpp index 1de439e1430..44b664c5b92 100644 --- a/cpp/tests/experimental/bfs_test.cpp +++ b/cpp/tests/experimental/bfs_test.cpp @@ -83,7 +83,7 @@ void bfs_reference(edge_t const* offsets, struct BFS_Usecase { size_t source{0}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/katz_centrality_test.cpp b/cpp/tests/experimental/katz_centrality_test.cpp index af70b90dd02..232d82a1c91 100644 --- a/cpp/tests/experimental/katz_centrality_test.cpp +++ b/cpp/tests/experimental/katz_centrality_test.cpp @@ -98,7 +98,7 @@ void katz_centrality_reference(edge_t const* offsets, struct KatzCentrality_Usecase { bool test_weighted{false}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/mg_bfs_test.cpp b/cpp/tests/experimental/mg_bfs_test.cpp index ebb2824fb87..f6e0a57e602 100644 --- a/cpp/tests/experimental/mg_bfs_test.cpp +++ b/cpp/tests/experimental/mg_bfs_test.cpp @@ -42,7 +42,7 @@ static int PERF = 0; struct BFS_Usecase { size_t source{0}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/mg_katz_centrality_test.cpp b/cpp/tests/experimental/mg_katz_centrality_test.cpp index b4a7968e955..864b68caf33 100644 --- a/cpp/tests/experimental/mg_katz_centrality_test.cpp +++ b/cpp/tests/experimental/mg_katz_centrality_test.cpp @@ -39,7 +39,7 @@ static int PERF = 0; struct KatzCentrality_Usecase { bool test_weighted{false}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/mg_sssp_test.cpp b/cpp/tests/experimental/mg_sssp_test.cpp index c49efefacd5..70f1a95e1f4 100644 --- a/cpp/tests/experimental/mg_sssp_test.cpp +++ b/cpp/tests/experimental/mg_sssp_test.cpp @@ -42,7 +42,7 @@ static int PERF = 0; struct SSSP_Usecase { size_t source{0}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/pagerank_test.cpp b/cpp/tests/experimental/pagerank_test.cpp index 27739cee01b..1e26245b74c 100644 --- a/cpp/tests/experimental/pagerank_test.cpp +++ b/cpp/tests/experimental/pagerank_test.cpp @@ -134,7 +134,7 @@ void pagerank_reference(edge_t const* offsets, struct PageRank_Usecase { double personalization_ratio{0.0}; bool test_weighted{false}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/experimental/sssp_test.cpp b/cpp/tests/experimental/sssp_test.cpp index a9c12043a7f..d84c1c2fc6c 100644 --- a/cpp/tests/experimental/sssp_test.cpp +++ b/cpp/tests/experimental/sssp_test.cpp @@ -54,14 +54,13 @@ void sssp_reference(edge_t const* offsets, vertex_t source, weight_t cutoff = std::numeric_limits::max()) { - using queue_iterm_t = std::tuple; + using queue_item_t = std::tuple; std::fill(distances, distances + num_vertices, std::numeric_limits::max()); std::fill(predecessors, predecessors + num_vertices, cugraph::invalid_vertex_id::value); *(distances + source) = weight_t{0.0}; - std::priority_queue, std::greater> - queue{}; + std::priority_queue, std::greater> queue{}; queue.push(std::make_tuple(weight_t{0.0}, source)); while (queue.size() > 0) { @@ -89,7 +88,7 @@ void sssp_reference(edge_t const* offsets, struct SSSP_Usecase { size_t source{0}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/pagerank/mg_pagerank_test.cpp b/cpp/tests/pagerank/mg_pagerank_test.cpp index 0eae6a62f31..659a62a727c 100644 --- a/cpp/tests/pagerank/mg_pagerank_test.cpp +++ b/cpp/tests/pagerank/mg_pagerank_test.cpp @@ -43,7 +43,7 @@ static int PERF = 0; struct PageRank_Usecase { double personalization_ratio{0.0}; bool test_weighted{false}; - bool check_correctness{false}; + bool check_correctness{true}; }; template diff --git a/cpp/tests/sampling/random_walks_utils.cuh b/cpp/tests/sampling/random_walks_utils.cuh index b0b06e7f65a..44a6f8d561b 100644 --- a/cpp/tests/sampling/random_walks_utils.cuh +++ b/cpp/tests/sampling/random_walks_utils.cuh @@ -54,7 +54,7 @@ bool host_check_path(std::vector const& row_offsets, bool assert3 = (nnz == static_cast(col_inds.size())); if (assert1 == false || assert2 == false || assert3 == false) { - std::cout << "CSR inconsistency\n"; + std::cerr << "CSR inconsistency\n"; return false; } @@ -68,16 +68,16 @@ bool host_check_path(std::vector const& row_offsets, auto found_next = std::find_if( begin, end, [next_vertex](auto dst_vertex) { return dst_vertex == next_vertex; }); if (found_next == end) { - std::cout << "vertex not found: " << next_vertex << " as neighbor of " << crt_vertex << '\n'; + std::cerr << "vertex not found: " << next_vertex << " as neighbor of " << crt_vertex << '\n'; return false; } auto delta = row_offsets[crt_vertex] + std::distance(begin, found_next); - // std::cout << "delta in ci: " << delta << '\n'; + // std::cerr << "delta in ci: " << delta << '\n'; auto found_edge = values.begin() + delta; if (*found_edge != *it_w) { - std::cout << "weight not found: " << *found_edge << " between " << crt_vertex << " and " + std::cerr << "weight not found: " << *found_edge << " between " << crt_vertex << " and " << next_vertex << '\n'; return false; } @@ -91,7 +91,8 @@ bool host_check_rw_paths( cugraph::experimental::graph_view_t const& graph_view, vector_test_t const& d_coalesced_v, vector_test_t const& d_coalesced_w, - vector_test_t const& d_sizes) + vector_test_t const& d_sizes, + index_t num_paths = 0) // only relevant for the padded case (in which case it must be non-zero) { edge_t num_edges = graph_view.get_number_of_edges(); vertex_t num_vertices = graph_view.get_number_of_vertices(); @@ -102,11 +103,15 @@ bool host_check_rw_paths( std::vector v_ro(num_vertices + 1); std::vector v_ci(num_edges); - std::vector v_vals(num_edges); + std::vector v_vals( + num_edges, 1); // account for unweighted graph, for which RW provides default weights{1} raft::update_host(v_ro.data(), offsets, v_ro.size(), handle.get_stream()); raft::update_host(v_ci.data(), indices, v_ci.size(), handle.get_stream()); - raft::update_host(v_vals.data(), values, v_vals.size(), handle.get_stream()); + + if (graph_view.is_weighted()) { + raft::update_host(v_vals.data(), values, v_vals.size(), handle.get_stream()); + } std::vector v_coalesced(d_coalesced_v.size()); std::vector w_coalesced(d_coalesced_w.size()); @@ -120,10 +125,39 @@ bool host_check_rw_paths( cugraph::experimental::detail::raw_const_ptr(d_coalesced_w), d_coalesced_w.size(), handle.get_stream()); - raft::update_host(v_sizes.data(), - cugraph::experimental::detail::raw_const_ptr(d_sizes), - d_sizes.size(), - handle.get_stream()); + + if (v_sizes.size() > 0) { // coalesced case + raft::update_host(v_sizes.data(), + cugraph::experimental::detail::raw_const_ptr(d_sizes), + d_sizes.size(), + handle.get_stream()); + } else { // padded case + if (num_paths == 0) { + std::cerr << "ERROR: padded case requires `num_paths` info.\n"; + return false; + } + + // extract sizes from v_coalesced (which now contains padded info) + // + auto max_depth = v_coalesced.size() / num_paths; + auto it_start_path = v_coalesced.begin(); + for (index_t row_index = 0; row_index < num_paths; ++row_index) { + auto it_end_path = it_start_path + max_depth; + auto it_padding_found = std::find(it_start_path, it_end_path, num_vertices); + + v_sizes.push_back(std::distance(it_start_path, it_padding_found)); + + it_start_path = it_end_path; + } + + // truncate padded vectors v_coalesced, w_coalesced: + // + v_coalesced.erase(std::remove(v_coalesced.begin(), v_coalesced.end(), num_vertices), + v_coalesced.end()); + + w_coalesced.erase(std::remove(w_coalesced.begin(), w_coalesced.end(), weight_t{0}), + w_coalesced.end()); + } auto it_v_begin = v_coalesced.begin(); auto it_w_begin = w_coalesced.begin(); @@ -136,11 +170,11 @@ bool host_check_rw_paths( it_w_begin += crt_sz - 1; if (!test_path) { // something went wrong; print to debug (since it's random) - raft::print_host_vector("sizes", v_sizes.data(), v_sizes.size(), std::cout); + raft::print_host_vector("sizes", v_sizes.data(), v_sizes.size(), std::cerr); - raft::print_host_vector("coalesced v", v_coalesced.data(), v_coalesced.size(), std::cout); + raft::print_host_vector("coalesced v", v_coalesced.data(), v_coalesced.size(), std::cerr); - raft::print_host_vector("coalesced w", w_coalesced.data(), w_coalesced.size(), std::cout); + raft::print_host_vector("coalesced w", w_coalesced.data(), w_coalesced.size(), std::cerr); return false; } @@ -148,5 +182,96 @@ bool host_check_rw_paths( return true; } +template +bool host_check_query_rw(raft::handle_t const& handle, + vector_test_t const& d_v_sizes, + vector_test_t const& d_v_offsets, + vector_test_t const& d_w_sizes, + vector_test_t const& d_w_offsets) +{ + index_t num_paths = d_v_sizes.size(); + + if (num_paths == 0) return false; + + std::vector v_sizes(num_paths); + std::vector v_offsets(num_paths); + std::vector w_sizes(num_paths); + std::vector w_offsets(num_paths); + + raft::update_host(v_sizes.data(), + cugraph::experimental::detail::raw_const_ptr(d_v_sizes), + num_paths, + handle.get_stream()); + + raft::update_host(v_offsets.data(), + cugraph::experimental::detail::raw_const_ptr(d_v_offsets), + num_paths, + handle.get_stream()); + + raft::update_host(w_sizes.data(), + cugraph::experimental::detail::raw_const_ptr(d_w_sizes), + num_paths, + handle.get_stream()); + + raft::update_host(w_offsets.data(), + cugraph::experimental::detail::raw_const_ptr(d_w_offsets), + num_paths, + handle.get_stream()); + + index_t crt_v_offset = 0; + index_t crt_w_offset = 0; + auto it_v_sz = v_sizes.begin(); + auto it_w_sz = w_sizes.begin(); + auto it_v_offset = v_offsets.begin(); + auto it_w_offset = w_offsets.begin(); + + bool flag_passed{true}; + + for (; it_v_sz != v_sizes.end(); ++it_v_sz, ++it_w_sz, ++it_v_offset, ++it_w_offset) { + if (*it_w_sz != (*it_v_sz) - 1) { + std::cerr << "ERROR: Incorrect weight path size: " << *it_w_sz << ", " << *it_v_sz << '\n'; + flag_passed = false; + break; + } + + if (*it_v_offset != crt_v_offset) { + std::cerr << "ERROR: Incorrect vertex path offset: " << *it_v_offset << ", " << crt_v_offset + << '\n'; + flag_passed = false; + break; + } + + if (*it_w_offset != crt_w_offset) { + std::cerr << "ERROR: Incorrect weight path offset: " << *it_w_offset << ", " << crt_w_offset + << '\n'; + flag_passed = false; + break; + } + + crt_v_offset += *it_v_sz; + crt_w_offset += *it_w_sz; + } + + if (!flag_passed) { + std::cerr << "v sizes:"; + std::copy(v_sizes.begin(), v_sizes.end(), std::ostream_iterator(std::cerr, ", ")); + std::cerr << '\n'; + + std::cerr << "v offsets:"; + std::copy(v_offsets.begin(), v_offsets.end(), std::ostream_iterator(std::cerr, ", ")); + std::cerr << '\n'; + + std::cerr << "w sizes:"; + std::copy(w_sizes.begin(), w_sizes.end(), std::ostream_iterator(std::cerr, ", ")); + std::cerr << '\n'; + + std::cerr << "w offsets:"; + std::copy(w_offsets.begin(), w_offsets.end(), std::ostream_iterator(std::cerr, ", ")); + std::cerr << '\n'; + } + + return flag_passed; +} + } // namespace test } // namespace cugraph diff --git a/cpp/tests/sampling/rw_low_level_test.cu b/cpp/tests/sampling/rw_low_level_test.cu index dd7fd14b3a2..29fd01fc7e0 100644 --- a/cpp/tests/sampling/rw_low_level_test.cu +++ b/cpp/tests/sampling/rw_low_level_test.cu @@ -62,10 +62,16 @@ graph_t make_graph(raft::handle_t cons raft::update_device(d_src.data(), v_src.data(), d_src.size(), handle.get_stream()); raft::update_device(d_dst.data(), v_dst.data(), d_dst.size(), handle.get_stream()); - raft::update_device(d_weights.data(), v_w.data(), d_weights.size(), handle.get_stream()); + + weight_t* ptr_d_weights{nullptr}; + if (is_weighted) { + raft::update_device(d_weights.data(), v_w.data(), d_weights.size(), handle.get_stream()); + + ptr_d_weights = d_weights.data(); + } edgelist_t edgelist{ - d_src.data(), d_dst.data(), d_weights.data(), num_edges}; + d_src.data(), d_dst.data(), ptr_d_weights, num_edges}; graph_t graph( handle, edgelist, num_vertices, graph_properties_t{false, false, is_weighted}, false); @@ -783,6 +789,67 @@ TEST_F(RandomWalksPrimsTest, SimpleGraphRandomWalk) ASSERT_TRUE(test_all_paths); } +TEST(RandomWalksQuery, GraphRWQueryOffsets) +{ + using vertex_t = int32_t; + using edge_t = vertex_t; + using weight_t = float; + using index_t = vertex_t; + + raft::handle_t handle{}; + + edge_t num_edges = 8; + vertex_t num_vertices = 6; + + std::vector v_src{0, 1, 1, 2, 2, 2, 3, 4}; + std::vector v_dst{1, 3, 4, 0, 1, 3, 5, 5}; + std::vector v_w{0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1}; + + auto graph = make_graph(handle, v_src, v_dst, v_w, num_vertices, num_edges, true); + + auto graph_view = graph.view(); + + edge_t const* offsets = graph_view.offsets(); + vertex_t const* indices = graph_view.indices(); + weight_t const* values = graph_view.weights(); + + std::vector v_ro(num_vertices + 1); + std::vector v_ci(num_edges); + std::vector v_vals(num_edges); + + raft::update_host(v_ro.data(), offsets, v_ro.size(), handle.get_stream()); + raft::update_host(v_ci.data(), indices, v_ci.size(), handle.get_stream()); + raft::update_host(v_vals.data(), values, v_vals.size(), handle.get_stream()); + + std::vector v_start{1, 0, 4, 2}; + vector_test_t d_v_start(v_start.size(), handle.get_stream()); + raft::update_device(d_v_start.data(), v_start.data(), d_v_start.size(), handle.get_stream()); + + index_t num_paths = v_start.size(); + index_t max_depth = 5; + + // 0-copy const device view: + // + detail::device_const_vector_view d_start_view{d_v_start.data(), num_paths}; + auto quad = detail::random_walks_impl(handle, graph_view, d_start_view, max_depth); + + auto& d_v_sizes = std::get<2>(quad); + auto seed0 = std::get<3>(quad); + + auto triplet = query_rw_sizes_offsets(handle, num_paths, detail::raw_const_ptr(d_v_sizes)); + + auto& d_v_offsets = std::get<0>(triplet); + auto& d_w_sizes = std::get<1>(triplet); + auto& d_w_offsets = std::get<2>(triplet); + + bool test_paths_sz = + cugraph::test::host_check_query_rw(handle, d_v_sizes, d_v_offsets, d_w_sizes, d_w_offsets); + + if (!test_paths_sz) std::cout << "starting seed on failure: " << seed0 << '\n'; + + ASSERT_TRUE(test_paths_sz); +} + TEST(RandomWalksSpecialCase, SingleRandomWalk) { using vertex_t = int32_t; @@ -840,6 +907,124 @@ TEST(RandomWalksSpecialCase, SingleRandomWalk) ASSERT_TRUE(test_all_paths); } +TEST(RandomWalksSpecialCase, UnweightedGraph) +{ + using vertex_t = int32_t; + using edge_t = vertex_t; + using weight_t = float; + using index_t = vertex_t; + + raft::handle_t handle{}; + + edge_t num_edges = 8; + vertex_t num_vertices = 6; + + std::vector v_src{0, 1, 1, 2, 2, 2, 3, 4}; + std::vector v_dst{1, 3, 4, 0, 1, 3, 5, 5}; + std::vector v_w; + + auto graph = + make_graph(handle, v_src, v_dst, v_w, num_vertices, num_edges, false); // un-weighted + + auto graph_view = graph.view(); + + edge_t const* offsets = graph_view.offsets(); + vertex_t const* indices = graph_view.indices(); + weight_t const* values = graph_view.weights(); + + ASSERT_TRUE(values == nullptr); + + std::vector v_ro(num_vertices + 1); + std::vector v_ci(num_edges); + + raft::update_host(v_ro.data(), offsets, v_ro.size(), handle.get_stream()); + raft::update_host(v_ci.data(), indices, v_ci.size(), handle.get_stream()); + + std::vector v_start{2}; + vector_test_t d_v_start(v_start.size(), handle.get_stream()); + raft::update_device(d_v_start.data(), v_start.data(), d_v_start.size(), handle.get_stream()); + + index_t num_paths = v_start.size(); + index_t max_depth = 5; + + // 0-copy const device view: + // + detail::device_const_vector_view d_start_view{d_v_start.data(), num_paths}; + auto quad = detail::random_walks_impl(handle, graph_view, d_start_view, max_depth); + + auto& d_coalesced_v = std::get<0>(quad); + auto& d_coalesced_w = std::get<1>(quad); + auto& d_sizes = std::get<2>(quad); + auto seed0 = std::get<3>(quad); + + bool test_all_paths = + cugraph::test::host_check_rw_paths(handle, graph_view, d_coalesced_v, d_coalesced_w, d_sizes); + + if (!test_all_paths) std::cout << "starting seed on failure: " << seed0 << '\n'; + + ASSERT_TRUE(test_all_paths); +} + +TEST(RandomWalksPadded, SimpleGraph) +{ + using vertex_t = int32_t; + using edge_t = vertex_t; + using weight_t = float; + using index_t = vertex_t; + + raft::handle_t handle{}; + + edge_t num_edges = 8; + vertex_t num_vertices = 6; + + std::vector v_src{0, 1, 1, 2, 2, 2, 3, 4}; + std::vector v_dst{1, 3, 4, 0, 1, 3, 5, 5}; + std::vector v_w{0.1, 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1}; + + auto graph = make_graph(handle, v_src, v_dst, v_w, num_vertices, num_edges, true); + + auto graph_view = graph.view(); + + edge_t const* offsets = graph_view.offsets(); + vertex_t const* indices = graph_view.indices(); + weight_t const* values = graph_view.weights(); + + std::vector v_ro(num_vertices + 1); + std::vector v_ci(num_edges); + std::vector v_vals(num_edges); + + raft::update_host(v_ro.data(), offsets, v_ro.size(), handle.get_stream()); + raft::update_host(v_ci.data(), indices, v_ci.size(), handle.get_stream()); + raft::update_host(v_vals.data(), values, v_vals.size(), handle.get_stream()); + + std::vector v_start{2}; + vector_test_t d_v_start(v_start.size(), handle.get_stream()); + raft::update_device(d_v_start.data(), v_start.data(), d_v_start.size(), handle.get_stream()); + + index_t num_paths = v_start.size(); + index_t max_depth = 5; + + // 0-copy const device view: + // + detail::device_const_vector_view d_start_view{d_v_start.data(), num_paths}; + bool use_padding{true}; + auto quad = detail::random_walks_impl(handle, graph_view, d_start_view, max_depth, use_padding); + + auto& d_coalesced_v = std::get<0>(quad); + auto& d_coalesced_w = std::get<1>(quad); + auto& d_sizes = std::get<2>(quad); + auto seed0 = std::get<3>(quad); + + ASSERT_TRUE(d_sizes.size() == 0); + + bool test_all_paths = cugraph::test::host_check_rw_paths( + handle, graph_view, d_coalesced_v, d_coalesced_w, d_sizes, num_paths); + + if (!test_all_paths) std::cout << "starting seed on failure: " << seed0 << '\n'; + + ASSERT_TRUE(test_all_paths); +} + TEST(RandomWalksUtility, PathsToCOO) { using namespace cugraph::experimental::detail; diff --git a/python/cugraph/centrality/katz_centrality.py b/python/cugraph/centrality/katz_centrality.py index 4a2b41cfe59..a1e7c1b2349 100644 --- a/python/cugraph/centrality/katz_centrality.py +++ b/python/cugraph/centrality/katz_centrality.py @@ -106,7 +106,11 @@ def katz_centrality( if nstart is not None: if G.renumbered is True: - nstart = G.add_internal_vertex_id(nstart, 'vertex', 'vertex') + if len(G.renumber_map.implementation.col_names) > 1: + cols = nstart.columns[:-1].to_list() + else: + cols = 'vertex' + nstart = G.add_internal_vertex_id(nstart, 'vertex', cols) df = katz_centrality_wrapper.katz_centrality( G, alpha, max_iter, tol, nstart, normalized diff --git a/python/cugraph/centrality/katz_centrality_wrapper.pyx b/python/cugraph/centrality/katz_centrality_wrapper.pyx index d38a0b82824..038723ad9bf 100644 --- a/python/cugraph/centrality/katz_centrality_wrapper.pyx +++ b/python/cugraph/centrality/katz_centrality_wrapper.pyx @@ -34,7 +34,7 @@ def get_output_df(input_graph, nstart): if len(nstart) != num_verts: raise ValueError('nstart must have initial guess for all vertices') - nstart['values'] = graph_primtypes_wrapper.datatype_cast([nstart['values']], [np.float64]) + nstart['values'] = graph_primtypes_wrapper.datatype_cast([nstart['values']], [np.float64])[0] df['katz_centrality'][nstart['vertex']] = nstart['values'] return df diff --git a/python/cugraph/community/egonet.py b/python/cugraph/community/egonet.py index ca3c6149ece..5ae025f1203 100644 --- a/python/cugraph/community/egonet.py +++ b/python/cugraph/community/egonet.py @@ -58,8 +58,10 @@ def ego_graph(G, n, radius=1, center=True, undirected=False, distance=None): Graph or matrix object, which should contain the connectivity information. Edge weights, if present, should be single or double precision floating point values. - n : integer - A single node + n : integer or cudf.DataFrame + A single node as integer or a cudf.DataFrame if nodes are + represented with multiple columns. If a cudf.DataFrame is provided, + only the first row is taken as the node input. radius: integer, optional Include all neighbors of distance<=radius from n. center: bool, optional @@ -91,20 +93,25 @@ def ego_graph(G, n, radius=1, center=True, undirected=False, distance=None): result_graph = type(G)() if G.renumbered is True: - n = G.lookup_internal_vertex_id(cudf.Series([n])) + if isinstance(n, cudf.DataFrame): + n = G.lookup_internal_vertex_id(n, n.columns) + else: + n = G.lookup_internal_vertex_id(cudf.Series([n])) df, offsets = egonet_wrapper.egonet(G, n, radius) if G.renumbered: - df = G.unrenumber(df, "src") - df = G.unrenumber(df, "dst") + df, src_names = G.unrenumber(df, "src", get_column_names=True) + df, dst_names = G.unrenumber(df, "dst", get_column_names=True) if G.edgelist.weights: result_graph.from_cudf_edgelist( - df, source="src", destination="dst", edge_attr="weight" + df, source=src_names, destination=dst_names, + edge_attr="weight" ) else: - result_graph.from_cudf_edgelist(df, source="src", destination="dst") + result_graph.from_cudf_edgelist(df, source=src_names, + destination=dst_names) return _convert_graph_to_output_type(result_graph, input_type) @@ -121,8 +128,8 @@ def batched_ego_graphs( Graph or matrix object, which should contain the connectivity information. Edge weights, if present, should be single or double precision floating point values. - seeds : cudf.Series or list - Specifies the seeds of the induced egonet subgraphs + seeds : cudf.Series or list or cudf.DataFrame + Specifies the seeds of the induced egonet subgraphs. radius: integer, optional Include all neighbors of distance<=radius from n. center: bool, optional @@ -145,7 +152,10 @@ def batched_ego_graphs( (G, input_type) = ensure_cugraph_obj(G, nx_weight_attr="weight") if G.renumbered is True: - seeds = G.lookup_internal_vertex_id(cudf.Series(seeds)) + if isinstance(seeds, cudf.DataFrame): + seeds = G.lookup_internal_vertex_id(seeds, seeds.columns) + else: + seeds = G.lookup_internal_vertex_id(cudf.Series(seeds)) df, offsets = egonet_wrapper.egonet(G, seeds, radius) diff --git a/python/cugraph/community/subgraph_extraction.py b/python/cugraph/community/subgraph_extraction.py index 7815851d465..2df6e037d71 100644 --- a/python/cugraph/community/subgraph_extraction.py +++ b/python/cugraph/community/subgraph_extraction.py @@ -12,8 +12,8 @@ # limitations under the License. from cugraph.community import subgraph_extraction_wrapper -from cugraph.structure.graph_classes import null_check from cugraph.utilities import check_nx_graph +import cudf from cugraph.utilities import cugraph_to_nx @@ -28,8 +28,9 @@ def subgraph(G, vertices): ---------- G : cugraph.Graph cuGraph graph descriptor - vertices : cudf.Series - Specifies the vertices of the induced subgraph + vertices : cudf.Series or cudf.DataFrame + Specifies the vertices of the induced subgraph. For multi-column + vertices, vertices should be provided as a cudf.DataFrame Returns ------- @@ -52,27 +53,30 @@ def subgraph(G, vertices): >>> Sg = cugraph.subgraph(G, sverts) """ - null_check(vertices) - G, isNx = check_nx_graph(G) if G.renumbered: - vertices = G.lookup_internal_vertex_id(vertices) + if isinstance(vertices, cudf.DataFrame): + vertices = G.lookup_internal_vertex_id(vertices, vertices.columns) + else: + vertices = G.lookup_internal_vertex_id(vertices) result_graph = type(G)() df = subgraph_extraction_wrapper.subgraph(G, vertices) if G.renumbered: - df = G.unrenumber(df, "src") - df = G.unrenumber(df, "dst") + df, src_names = G.unrenumber(df, "src", get_column_names=True) + df, dst_names = G.unrenumber(df, "dst", get_column_names=True) if G.edgelist.weights: result_graph.from_cudf_edgelist( - df, source="src", destination="dst", edge_attr="weight" + df, source=src_names, destination=dst_names, + edge_attr="weight" ) else: - result_graph.from_cudf_edgelist(df, source="src", destination="dst") + result_graph.from_cudf_edgelist(df, source=src_names, + destination=dst_names) if isNx is True: result_graph = cugraph_to_nx(result_graph) diff --git a/python/cugraph/cores/k_core.py b/python/cugraph/cores/k_core.py index ca17bdd5c81..17a3baf9c4c 100644 --- a/python/cugraph/cores/k_core.py +++ b/python/cugraph/cores/k_core.py @@ -69,31 +69,38 @@ def k_core(G, k=None, core_number=None): if core_number is not None: if G.renumbered is True: - core_number = G.add_internal_vertex_id( - core_number, "vertex", "vertex", drop=True - ) + if len(G.renumber_map.implementation.col_names) > 1: + cols = core_number.columns[:-1].to_list() + else: + cols = 'vertex' + core_number = G.add_internal_vertex_id(core_number, 'vertex', + cols) + else: core_number = core_number_wrapper.core_number(G) core_number = core_number.rename( columns={"core_number": "values"}, copy=False ) - print(core_number) + if k is None: k = core_number["values"].max() k_core_df = k_core_wrapper.k_core(G, k, core_number) if G.renumbered: - k_core_df = G.unrenumber(k_core_df, "src") - k_core_df = G.unrenumber(k_core_df, "dst") + k_core_df, src_names = G.unrenumber(k_core_df, "src", + get_column_names=True) + k_core_df, dst_names = G.unrenumber(k_core_df, "dst", + get_column_names=True) if G.edgelist.weights: KCoreGraph.from_cudf_edgelist( - k_core_df, source="src", destination="dst", edge_attr="weight" + k_core_df, source=src_names, destination=dst_names, + edge_attr="weight" ) else: KCoreGraph.from_cudf_edgelist( - k_core_df, source="src", destination="dst" + k_core_df, source=src_names, destination=dst_names, ) if isNx is True: diff --git a/python/cugraph/linear_assignment/lap.py b/python/cugraph/linear_assignment/lap.py index c634d9aceb4..d6f02efe77e 100644 --- a/python/cugraph/linear_assignment/lap.py +++ b/python/cugraph/linear_assignment/lap.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import cudf from cugraph.linear_assignment import lap_wrapper @@ -39,9 +40,10 @@ def hungarian(G, workers): as an an edge list. Edge weights are required. If an edge list is not provided then it will be computed. - workers : cudf.Series + workers : cudf.Series or cudf.DataFrame A series or column that identifies the vertex ids of the vertices - in the workers set. All vertices in G that are not in the workers + in the workers set. In case of multi-column vertices, it should be a + cudf.DataFrame. All vertices in G that are not in the workers set are implicitly assigned to the jobs set. Returns @@ -67,16 +69,20 @@ def hungarian(G, workers): """ if G.renumbered: - local_workers = G.lookup_internal_vertex_id(workers) + if isinstance(workers, cudf.DataFrame): + local_workers = G.lookup_internal_vertex_id(workers, + workers.columns) + else: + local_workers = G.lookup_internal_vertex_id(workers) else: local_workers = workers - df = lap_wrapper.sparse_hungarian(G, local_workers) + cost, df = lap_wrapper.sparse_hungarian(G, local_workers) if G.renumbered: df = G.unrenumber(df, 'vertex') - return df + return cost, df def dense_hungarian(costs, num_rows, num_columns): diff --git a/python/cugraph/sampling/random_walks.py b/python/cugraph/sampling/random_walks.py index 7ab3191a07c..84fde262010 100644 --- a/python/cugraph/sampling/random_walks.py +++ b/python/cugraph/sampling/random_walks.py @@ -35,9 +35,10 @@ def random_walks( Use weight parameter if weights need to be considered (currently not supported) - start_vertices : int or list or cudf.Series + start_vertices : int or list or cudf.Series or cudf.DataFrame A single node or a list or a cudf.Series of nodes from which to run - the random walks + the random walks. In case of multi-column vertices it should be + a cudf.DataFrame max_depth : int The maximum depth of the random walks @@ -61,11 +62,17 @@ def random_walks( if start_vertices is int: start_vertices = [start_vertices] - if not isinstance(start_vertices, cudf.Series): + if isinstance(start_vertices, list): start_vertices = cudf.Series(start_vertices) if G.renumbered is True: - start_vertices = G.lookup_internal_vertex_id(start_vertices) + if isinstance(start_vertices, cudf.DataFrame): + start_vertices = G.lookup_internal_vertex_id( + start_vertices, + start_vertices.columns) + else: + start_vertices = G.lookup_internal_vertex_id(start_vertices) + vertex_set, edge_set, sizes = random_walks_wrapper.random_walks( G, start_vertices, max_depth) diff --git a/python/cugraph/structure/graph_classes.py b/python/cugraph/structure/graph_classes.py index 3cd1863a054..52fcb2ffba4 100644 --- a/python/cugraph/structure/graph_classes.py +++ b/python/cugraph/structure/graph_classes.py @@ -293,7 +293,8 @@ def from_numpy_matrix(self, np_matrix): np_array = np.asarray(np_matrix) self.from_numpy_array(np_array) - def unrenumber(self, df, column_name, preserve_order=False): + def unrenumber(self, df, column_name, preserve_order=False, + get_column_names=False): """ Given a DataFrame containing internal vertex ids in the identified column, replace this with external vertex ids. If the renumbering @@ -322,7 +323,8 @@ def unrenumber(self, df, column_name, preserve_order=False): vertex identifiers are added to the DataFrame, the internal vertex identifier column is removed from the dataframe. """ - return self.renumber_map.unrenumber(df, column_name, preserve_order) + return self.renumber_map.unrenumber(df, column_name, preserve_order, + get_column_names) def lookup_internal_vertex_id(self, df, column_name=None): """ diff --git a/python/cugraph/structure/number_map.py b/python/cugraph/structure/number_map.py index 2b7c2b2f296..d90d7a1fda9 100644 --- a/python/cugraph/structure/number_map.py +++ b/python/cugraph/structure/number_map.py @@ -591,7 +591,8 @@ def get_renumbered_df(data): renumber_map.implementation.numbered = True return renumbered_df, renumber_map - def unrenumber(self, df, column_name, preserve_order=False): + def unrenumber(self, df, column_name, preserve_order=False, + get_column_names=False): """ Given a DataFrame containing internal vertex ids in the identified column, replace this with external vertex ids. If the renumbering @@ -611,12 +612,17 @@ def unrenumber(self, df, column_name, preserve_order=False): preserve_order: (optional) bool If True, preserve the ourder of the rows in the output DataFrame to match the input DataFrame + get_column_names: (optional) bool + If True, the unrenumbered column names are returned. Returns --------- df : cudf.DataFrame or dask_cudf.DataFrame The original DataFrame columns exist unmodified. The external vertex identifiers are added to the DataFrame, the internal vertex identifier column is removed from the dataframe. + column_names: string or list of strings + If get_column_names is True, the unrenumbered column names are + returned. Examples -------- >>> M = cudf.read_csv('datasets/karate.csv', delimiter=' ', @@ -636,11 +642,13 @@ def unrenumber(self, df, column_name, preserve_order=False): if len(self.implementation.col_names) == 1: # Output will be renamed to match input mapping = {"0": column_name} + col_names = column_name else: # Output will be renamed to ${i}_${column_name} mapping = {} for nm in self.implementation.col_names: mapping[nm] = nm + "_" + column_name + col_names = list(mapping.values()) if preserve_order: index_name = NumberMap.generate_unused_column_name(df) @@ -654,8 +662,12 @@ def unrenumber(self, df, column_name, preserve_order=False): ).drop(columns=index_name).reset_index(drop=True) if type(df) is dask_cudf.DataFrame: - return df.map_partitions( + df = df.map_partitions( lambda df: df.rename(columns=mapping, copy=False) ) else: - return df.rename(columns=mapping, copy=False) + df = df.rename(columns=mapping, copy=False) + if get_column_names: + return df, col_names + else: + return df diff --git a/python/cugraph/tests/test_egonet.py b/python/cugraph/tests/test_egonet.py index b259c2567dc..fc0ce38eb9c 100644 --- a/python/cugraph/tests/test_egonet.py +++ b/python/cugraph/tests/test_egonet.py @@ -15,6 +15,7 @@ import pytest +import cudf import cugraph from cugraph.tests import utils @@ -75,3 +76,42 @@ def test_batched_ego_graphs(graph_file, seeds, radius): ego_df, source="src", target="dst", edge_attr="weight" ) assert nx.is_isomorphic(ego_nx, ego_cugraph) + + +@pytest.mark.parametrize("graph_file", utils.DATASETS) +@pytest.mark.parametrize("seed", SEEDS) +@pytest.mark.parametrize("radius", RADIUS) +def test_multi_column_ego_graph(graph_file, seed, radius): + gc.collect() + + df = utils.read_csv_file(graph_file, read_weights_in_sp=True) + df.rename(columns={'0': 'src_0', '1': 'dst_0'}, inplace=True) + df['src_1'] = df['src_0'] + 1000 + df['dst_1'] = df['dst_0'] + 1000 + + G1 = cugraph.Graph() + G1.from_cudf_edgelist( + df, source=["src_0", "src_1"], destination=["dst_0", "dst_1"], + edge_attr="2" + ) + + seed_df = cudf.DataFrame() + seed_df['v_0'] = [seed] + seed_df['v_1'] = [seed + 1000] + + ego_cugraph_res = cugraph.ego_graph(G1, seed_df, radius=radius) + + G2 = cugraph.Graph() + G2.from_cudf_edgelist( + df, source="src_0", destination="dst_0", + edge_attr="2" + ) + ego_cugraph_exp = cugraph.ego_graph(G2, seed, radius=radius) + + # FIXME: Replace with multi-column view_edge_list() + edgelist_df = ego_cugraph_res.edgelist.edgelist_df + edgelist_df_res = ego_cugraph_res.unrenumber(edgelist_df, "src") + edgelist_df_res = ego_cugraph_res.unrenumber(edgelist_df_res, "dst") + for i in range(len(edgelist_df_res)): + assert ego_cugraph_exp.has_edge(edgelist_df_res["0_src"].iloc[i], + edgelist_df_res["0_dst"].iloc[i]) diff --git a/python/cugraph/tests/test_k_core.py b/python/cugraph/tests/test_k_core.py index 33d403ee27b..d09b719ab79 100644 --- a/python/cugraph/tests/test_k_core.py +++ b/python/cugraph/tests/test_k_core.py @@ -57,7 +57,6 @@ def calc_k_cores(graph_file, directed=True): def compare_edges(cg, nxg): edgelist_df = cg.view_edge_list() src, dest = edgelist_df["src"], edgelist_df["dst"] - assert cg.edgelist.weights is False assert len(src) == nxg.size() for i in range(len(src)): @@ -66,7 +65,7 @@ def compare_edges(cg, nxg): @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) -def test_core_number_Graph(graph_file): +def test_k_core_Graph(graph_file): gc.collect() cu_kcore, nx_kcore = calc_k_cores(graph_file, False) @@ -75,7 +74,7 @@ def test_core_number_Graph(graph_file): @pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) -def test_core_number_Graph_nx(graph_file): +def test_k_core_Graph_nx(graph_file): gc.collect() NM = utils.read_csv_for_nx(graph_file) @@ -86,3 +85,35 @@ def test_core_number_Graph_nx(graph_file): cc = cugraph.k_core(Gnx) assert nx.is_isomorphic(nc, cc) + + +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) +def test_k_core_corenumber_multicolumn(graph_file): + gc.collect() + + cu_M = utils.read_csv_file(graph_file) + cu_M.rename(columns={'0': 'src_0', '1': 'dst_0'}, inplace=True) + cu_M['src_1'] = cu_M['src_0'] + 1000 + cu_M['dst_1'] = cu_M['dst_0'] + 1000 + + G1 = cugraph.Graph() + G1.from_cudf_edgelist(cu_M, source=["src_0", "src_1"], + destination=["dst_0", "dst_1"]) + + corenumber_G1 = cugraph.core_number(G1) + corenumber_G1.rename(columns={'core_number': 'values'}, inplace=True) + corenumber_G1 = corenumber_G1[['0_vertex', '1_vertex', 'values']] + + ck_res = cugraph.k_core(G1, core_number=corenumber_G1) + G2 = cugraph.Graph() + G2.from_cudf_edgelist(cu_M, source="src_0", + destination="dst_0") + ck_exp = cugraph.k_core(G2) + + # FIXME: Replace with multi-column view_edge_list() + edgelist_df = ck_res.edgelist.edgelist_df + edgelist_df_res = ck_res.unrenumber(edgelist_df, "src") + edgelist_df_res = ck_res.unrenumber(edgelist_df_res, "dst") + for i in range(len(edgelist_df_res)): + assert ck_exp.has_edge(edgelist_df_res["0_src"].iloc[i], + edgelist_df_res["0_dst"].iloc[i]) diff --git a/python/cugraph/tests/test_katz_centrality.py b/python/cugraph/tests/test_katz_centrality.py index 1fef6b05d59..ef2f45c08a4 100644 --- a/python/cugraph/tests/test_katz_centrality.py +++ b/python/cugraph/tests/test_katz_centrality.py @@ -1,3 +1,4 @@ + # Copyright (c) 2019-2021, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ import pytest +import cudf import cugraph from cugraph.tests import utils @@ -112,3 +114,38 @@ def test_katz_centrality_nx(graph_file): err = err + 1 print("Mismatches:", err) assert err < (0.1 * len(ck)) + + +@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED) +def test_katz_centrality_multi_column(graph_file): + gc.collect() + + cu_M = utils.read_csv_file(graph_file) + cu_M.rename(columns={'0': 'src_0', '1': 'dst_0'}, inplace=True) + cu_M['src_1'] = cu_M['src_0'] + 1000 + cu_M['dst_1'] = cu_M['dst_0'] + 1000 + + G1 = cugraph.DiGraph() + G1.from_cudf_edgelist(cu_M, source=["src_0", "src_1"], + destination=["dst_0", "dst_1"]) + + G2 = cugraph.DiGraph() + G2.from_cudf_edgelist(cu_M, source="src_0", destination="dst_0") + + k_df_exp = cugraph.katz_centrality(G2, alpha=None, max_iter=1000) + k_df_exp = k_df_exp.sort_values("vertex").reset_index(drop=True) + + nstart = cudf.DataFrame() + nstart['vertex_0'] = k_df_exp['vertex'] + nstart['vertex_1'] = nstart['vertex_0'] + 1000 + nstart['values'] = k_df_exp['katz_centrality'] + + k_df_res = cugraph.katz_centrality(G1, nstart=nstart, + alpha=None, max_iter=1000) + k_df_res = k_df_res.sort_values("0_vertex").reset_index(drop=True) + k_df_res.rename(columns={'0_vertex': 'vertex'}, inplace=True) + + top_res = topKVertices(k_df_res, "katz_centrality", 10) + top_exp = topKVertices(k_df_exp, "katz_centrality", 10) + + assert top_res.equals(top_exp) diff --git a/python/cugraph/tests/test_random_walks.py b/python/cugraph/tests/test_random_walks.py index 9767e81ba1f..ba0cd6eadc9 100644 --- a/python/cugraph/tests/test_random_walks.py +++ b/python/cugraph/tests/test_random_walks.py @@ -126,7 +126,6 @@ def test_random_walks_invalid_max_dept( directed, max_depth ): - """Test calls random_walks an invalid type""" prepare_test() with pytest.raises(TypeError): df, offsets, seeds = calc_random_walks( @@ -152,3 +151,37 @@ def test_random_walks( max_depth=max_depth ) check_random_walks(df, offsets, seeds, df_G) + + +"""@pytest.mark.parametrize("graph_file", utils.DATASETS_SMALL) +@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS) +def test_random_walks( + graph_file, + directed +): + max_depth = random.randint(2, 10) + df_G = utils.read_csv_file(graph_file) + df_G.rename( + columns={"0": "src", "1": "dst", "2": "weight"}, inplace=True) + df_G['src_0'] = df_G['src'] + 1000 + df_G['dst_0'] = df_G['dst'] + 1000 + + if directed: + G = cugraph.DiGraph() + else: + G = cugraph.Graph() + G.from_cudf_edgelist(df_G, source=['src', 'src_0'], + destination=['dst', 'dst_0'], + edge_attr="weight") + + k = random.randint(1, 10) + start_vertices = random.sample(G.nodes().to_array().tolist(), k) + + seeds = cudf.DataFrame() + seeds['v'] = start_vertices + seeds['v_0'] = seeds['v'] + 1000 + + df, offsets = cugraph.random_walks(G, seeds, max_depth) + + check_random_walks(df, offsets, seeds, df_G) +""" diff --git a/python/cugraph/tests/test_subgraph_extraction.py b/python/cugraph/tests/test_subgraph_extraction.py index 56c1c23e0ea..389a7716e48 100644 --- a/python/cugraph/tests/test_subgraph_extraction.py +++ b/python/cugraph/tests/test_subgraph_extraction.py @@ -126,3 +126,39 @@ def test_subgraph_extraction_Graph_nx(graph_file): for (u, v) in cu_sub.edges(): assert nx_sub.has_edge(u, v) + + +@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_subgraph_extraction_multi_column(graph_file): + gc.collect() + + M = utils.read_csv_for_nx(graph_file) + + cu_M = cudf.DataFrame() + cu_M["src_0"] = cudf.Series(M["0"]) + cu_M["dst_0"] = cudf.Series(M["1"]) + cu_M["src_1"] = cu_M["src_0"] + 1000 + cu_M["dst_1"] = cu_M["dst_0"] + 1000 + G1 = cugraph.Graph() + G1.from_cudf_edgelist(cu_M, source=["src_0", "src_1"], + destination=["dst_0", "dst_1"]) + + verts = cudf.Series([0, 1, 17]) + verts_G1 = cudf.DataFrame() + verts_G1['v_0'] = verts + verts_G1['v_1'] = verts + 1000 + + sG1 = cugraph.subgraph(G1, verts_G1) + + G2 = cugraph.Graph() + G2.from_cudf_edgelist(cu_M, source="src_0", destination="dst_0") + + sG2 = cugraph.subgraph(G2, verts) + + # FIXME: Replace with multi-column view_edge_list() + edgelist_df = sG1.edgelist.edgelist_df + edgelist_df_res = sG1.unrenumber(edgelist_df, "src") + edgelist_df_res = sG1.unrenumber(edgelist_df_res, "dst") + for i in range(len(edgelist_df_res)): + assert sG2.has_edge(edgelist_df_res["0_src"].iloc[i], + edgelist_df_res["0_dst"].iloc[i]) diff --git a/python/cugraph/traversal/sssp.py b/python/cugraph/traversal/sssp.py index 8d77e6e9312..f3aebaf43bf 100644 --- a/python/cugraph/traversal/sssp.py +++ b/python/cugraph/traversal/sssp.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2020, NVIDIA CORPORATION. +# Copyright (c) 2019-2021, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -212,7 +212,11 @@ def sssp(G, matrix_graph_type=DiGraph if directed else Graph) if G.renumbered: - source = G.lookup_internal_vertex_id(cudf.Series([source]))[0] + if isinstance(source, cudf.DataFrame): + source = G.lookup_internal_vertex_id( + source, source.columns).iloc[0] + else: + source = G.lookup_internal_vertex_id(cudf.Series([source]))[0] if source is cudf.NA: raise ValueError( @@ -223,7 +227,7 @@ def sssp(G, if G.renumbered: df = G.unrenumber(df, "vertex") df = G.unrenumber(df, "predecessor") - df["predecessor"].fillna(-1, inplace=True) + df.fillna(-1, inplace=True) return _convert_df_to_output_type(df, input_type, return_predecessors)