Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions deepmd/cluster/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
"""Get local GPU resources from `CUDA_VISIBLE_DEVICES` environment variable."""
"""Get local GPU resources."""

import os
import socket
import subprocess as sp
import sys

from deepmd.env import tf
from typing import List, Tuple, Optional

__all__ = ["get_resource"]

__all__ = ["get_gpus", "get_resource"]


def get_gpus() -> Optional[List[int]]:
    """Get available IDs of GPU cards at local.

    These IDs are valid when used as the TensorFlow device ID.

    Returns
    -------
    Optional[List[int]]
        List of available GPU IDs, or None if no GPU is available.

    Raises
    ------
    RuntimeError
        If the GPU-probing subprocess exits with a non-zero status.
    """
    # Probe devices in a fresh interpreter so that TensorFlow initialization
    # (and any GPU memory it grabs) happens in a throw-away process rather
    # than in the current one.
    test_cmd = (
        "from tensorflow.python.client import device_lib; "
        "devices = device_lib.list_local_devices(); "
        'gpus = [d.name for d in devices if d.device_type == "GPU"]; '
        "print(len(gpus))"
    )
    result = sp.run(
        [sys.executable, "-c", test_cmd], stdout=sp.PIPE, stderr=sp.PIPE
    )
    if result.returncode != 0:
        decoded = result.stderr.decode("UTF-8")
        # typo fixed: "availbe" -> "available"
        raise RuntimeError(f"Failed to detect available GPUs due to:\n{decoded}")
    num_gpus = int(result.stdout.decode("UTF-8").strip())
    return list(range(num_gpus)) if num_gpus > 0 else None


def get_resource() -> Tuple[str, List[str], Optional[List[int]]]:
Expand All @@ -17,10 +45,5 @@ def get_resource() -> Tuple[str, List[str], Optional[List[int]]]:
"""
nodename = socket.gethostname()
nodelist = [nodename]
gpus_env = os.getenv("CUDA_VISIBLE_DEVICES", None)
if not gpus_env:
gpus = None
else:
gpus = [gpu for gpu in gpus_env.split(",")]

gpus = get_gpus()
return nodename, nodelist, gpus
8 changes: 3 additions & 5 deletions deepmd/cluster/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import re
import os

from deepmd.cluster import local
from typing import List, Tuple, Optional, Iterable

__all__ = ["get_resource"]
Expand Down Expand Up @@ -45,11 +47,7 @@ def get_resource() -> Tuple[str, List[str], Optional[List[int]]]:
raise ValueError(
f"Nodename({nodename}) not in nodelist({nodelist}). This should not happen!"
)
gpus_env = os.getenv("CUDA_VISIBLE_DEVICES")
if not gpus_env:
gpus = None
else:
gpus = [int(gpu) for gpu in gpus_env.split(",")]
gpus = local.get_gpus()
return nodename, nodelist, gpus


Expand Down
80 changes: 17 additions & 63 deletions deepmd/train/run_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,57 +52,6 @@
)


def _is_distributed(HVD: "HVD") -> bool:
"""Check if there are more than one MPI processes.

Parameters
----------
HVD : HVD
Horovod object

Returns
-------
bool
True if we have more than 1 MPI process
"""
return HVD.size() > 1


def _distributed_task_config(
HVD: "HVD",
gpu_list: Optional[List[int]] = None
) -> Tuple[int, int, str]:
"""Create configuration for distributed tensorflow session.

Parameters
----------
HVD : horovod.tensorflow
Horovod TensorFlow module
gpu_list : Optional[List[int]], optional
the list of GPUs on each node, by default None

Returns
-------
Tuple[int, int, str]
task count, index of this task, the device for this task
"""
my_rank = HVD.rank()
world_size = HVD.size()

# setup gpu/cpu devices
if gpu_list is not None:
numb_gpu = len(gpu_list)
gpu_idx = HVD.local_rank()
if gpu_idx >= numb_gpu:
my_device = "cpu:0" # "cpu:%d" % node_task_idx
else:
my_device = f"gpu:{gpu_idx:d}"
else:
my_device = "cpu:0" # "cpu:%d" % node_task_idx

return world_size, my_rank, my_device


class RunOptions:
"""Class with info on how to run training (cluster, MPI and GPU config).

Expand Down Expand Up @@ -174,14 +123,14 @@ def print_resource_summary(self):
log.info("---Summary of the training---------------------------------------")
if self.is_distrib:
log.info("distributed")
log.info(f"world size: {self.world_size}")
log.info(f"world size: {self.world_size}")
log.info(f"my rank: {self.my_rank}")
log.info(f"node list: {self.nodelist}")
log.info(f"node list: {self.nodelist}")
log.info(f"running on: {self.nodename}")
if self.gpus is None:
log.info(f"CUDA_VISIBLE_DEVICES: unset")
else:
log.info(f"CUDA_VISIBLE_DEVICES: {self.gpus}")
log.info(f"computing device: {self.my_device}")
env_value = os.environ.get('CUDA_VISIBLE_DEVICES', 'unset')
log.info(f"CUDA_VISIBLE_DEVICES: {env_value}")
log.info(f"Count of visible GPU: {len(self.gpus or [])}")
intra, inter = get_tf_default_nthreads()
log.info(f"num_intra_threads: {intra:d}")
log.info(f"num_inter_threads: {inter:d}")
Expand Down Expand Up @@ -225,7 +174,7 @@ def _try_init_distrib(self):
try:
import horovod.tensorflow as HVD
HVD.init()
self.is_distrib = _is_distributed(HVD)
self.is_distrib = HVD.size() > 1
except ImportError:
log.warning("Switch to serial execution due to lack of horovod module.")
self.is_distrib = False
Expand All @@ -250,11 +199,16 @@ def _init_distributed(self, HVD: "HVD"):
self.nodename = nodename
self.nodelist = nodelist
self.gpus = gpus
(
self.world_size,
self.my_rank,
self.my_device,
) = _distributed_task_config(HVD, gpus)
self.my_rank = HVD.rank()
self.world_size = HVD.size()

if gpus is not None:
gpu_idx = HVD.local_rank()
if gpu_idx >= len(gpus):
raise RuntimeError('Count of local processes is larger than that of available GPUs!')
self.my_device = f"gpu:{gpu_idx:d}"
else:
self.my_device = "cpu:0"

def _init_serial(self):
"""Initialize setting for serial training."""
Expand Down