diff --git a/deepmd/entrypoints/test.py b/deepmd/entrypoints/test.py index 65887d8f91..1a5b18969c 100644 --- a/deepmd/entrypoints/test.py +++ b/deepmd/entrypoints/test.py @@ -9,10 +9,11 @@ from deepmd.utils import random as dp_random from deepmd.utils.data import DeepmdData from deepmd.utils.weight_avg import weighted_average +from deepmd.utils.batch_size import AutoBatchSize if TYPE_CHECKING: from deepmd.infer import DeepDipole, DeepPolar, DeepPot, DeepWFC - from deepmd.infer.deep_eval import DeepTensor + from deepmd.infer.deep_tensor import DeepTensor __all__ = ["test"] @@ -69,6 +70,7 @@ def test( # init model dp = DeepPotential(model) + auto_batch_size = AutoBatchSize() for cc, system in enumerate(all_sys): log.info("# ---------------output of dp test--------------- ") @@ -82,6 +84,7 @@ def test( err = test_ener( dp, data, + auto_batch_size, system, numb_test, detail_file, @@ -159,6 +162,7 @@ def save_txt_file( def test_ener( dp: "DeepPot", data: DeepmdData, + auto_batch_size: AutoBatchSize, system: str, numb_test: int, detail_file: Optional[str], @@ -226,7 +230,10 @@ def test_ener( else: aparam = None - ret = dp.eval( + ret = auto_batch_size.execute_all( + dp.eval, + numb_test, + natoms, coord, box, atype, diff --git a/deepmd/infer/deep_pot.py b/deepmd/infer/deep_pot.py index 63625905e8..b8d557ae76 100644 --- a/deepmd/infer/deep_pot.py +++ b/deepmd/infer/deep_pot.py @@ -324,7 +324,7 @@ def _eval_inner( feed_dict_test[self.t_fparam] = np.reshape(fparam, [-1]) if self.has_aparam: feed_dict_test[self.t_aparam] = np.reshape(aparam, [-1]) - v_out = self.sess.run (t_out, feed_dict = feed_dict_test) + v_out = run_sess(self.sess, t_out, feed_dict = feed_dict_test) energy = v_out[0] force = v_out[1] virial = v_out[2] diff --git a/deepmd/utils/batch_size.py b/deepmd/utils/batch_size.py new file mode 100644 index 0000000000..981572ecd8 --- /dev/null +++ b/deepmd/utils/batch_size.py @@ -0,0 +1,129 @@ +import logging +from typing import Callable, Tuple + +import numpy as np + +from deepmd.utils.errors import OutOfMemoryError + +class AutoBatchSize: + """This class allows DeePMD-kit to automatically decide the maximum + batch size that will not cause an OOM error. + + Notes + ----- + We assume all OOM error will raise :metd:`OutOfMemoryError`. + + Parameters + ---------- + initial_batch_size : int, default: 1024 + initial batch size (number of total atoms) + factor : float, default: 2. + increased factor + + Attributes + ---------- + current_batch_size : int + current batch size (number of total atoms) + maximum_working_batch_size : int + maximum working batch size + minimal_not_working_batch_size : int + minimal not working batch size + """ + def __init__(self, initial_batch_size: int = 1024, factor: float = 2.) -> None: + # See also PyTorchLightning/pytorch-lightning#1638 + # TODO: discuss a proper initial batch size + self.current_batch_size = initial_batch_size + self.maximum_working_batch_size = 0 + self.minimal_not_working_batch_size = 2**31 + self.factor = factor + + def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]: + """Excuate a method with given batch size. + + Parameters + ---------- + callable : Callable + The method should accept the batch size and start_index as parameters, + and returns executed batch size and data. + start_index : int + start index + natoms : int + natoms + + Returns + ------- + int + executed batch size * number of atoms + tuple + result from callable, None if failing to execute + + Raises + ------ + OutOfMemoryError + OOM when batch size is 1 + """ + try: + n_batch, result = callable(max(self.current_batch_size // natoms, 1), start_index) + except OutOfMemoryError as e: + # TODO: it's very slow to catch OOM error; I don't know what TF is doing here + # but luckily we only need to catch once + self.minimal_not_working_batch_size = min(self.minimal_not_working_batch_size, self.current_batch_size) + if self.maximum_working_batch_size >= self.minimal_not_working_batch_size: + self.maximum_working_batch_size = int(self.minimal_not_working_batch_size / self.factor) + if self.minimal_not_working_batch_size <= natoms: + raise OutOfMemoryError("The callable still throws an out-of-memory (OOM) error even when batch size is 1!") from e + # adjust the next batch size + self._adjust_batch_size(1./self.factor) + return 0, None + else: + n_tot = n_batch * natoms + self.maximum_working_batch_size = max(self.maximum_working_batch_size, n_tot) + # adjust the next batch size + if n_tot >= self.current_batch_size and self.current_batch_size * self.factor < self.minimal_not_working_batch_size: + self._adjust_batch_size(self.factor) + return n_batch, result + + def _adjust_batch_size(self, factor: float): + old_batch_size = self.current_batch_size + self.current_batch_size = int(self.current_batch_size * factor) + logging.info("Adjust batch size from %d to %d" % (old_batch_size, self.current_batch_size)) + + def execute_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs) -> Tuple[np.ndarray]: + """Excuate a method with all given data. + + Parameters + ---------- + callable : Callable + The method should accept *args and **kwargs as input and return the similiar array. + total_size : int + Total size + natoms : int + The number of atoms + **kwargs + If 2D np.ndarray, assume the first axis is batch; otherwise do nothing. + """ + def execute_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tuple[np.ndarray]]: + end_index = start_index + batch_size + end_index = min(end_index, total_size) + return (end_index - start_index), callable( + *[(vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for vv in args], + **{kk: (vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for kk, vv in kwargs.items()}, + ) + + index = 0 + results = [] + while index < total_size: + n_batch, result = self.execute(execute_with_batch_size, index, natoms) + if not isinstance(result, tuple): + result = (result,) + index += n_batch + if n_batch: + for rr in result: + rr.reshape((n_batch, -1)) + results.append(result) + + r = tuple([np.concatenate(r, axis=0) for r in zip(*results)]) + if len(r) == 1: + # avoid returning tuple if callable doesn't return tuple + r = r[0] + return r diff --git a/deepmd/utils/errors.py b/deepmd/utils/errors.py index 4a6617c055..c4579c43a1 100644 --- a/deepmd/utils/errors.py +++ b/deepmd/utils/errors.py @@ -3,3 +3,6 @@ class GraphTooLargeError(Exception): class GraphWithoutTensorError(Exception): pass + +class OutOfMemoryError(Exception): + """This error is caused by out-of-memory (OOM).""" \ No newline at end of file diff --git a/deepmd/utils/sess.py b/deepmd/utils/sess.py index 21f1581d35..07723c13c4 100644 --- a/deepmd/utils/sess.py +++ b/deepmd/utils/sess.py @@ -1,6 +1,7 @@ import os from deepmd.env import tf +from deepmd.utils.errors import OutOfMemoryError def run_sess(sess: tf.Session, *args, **kwargs): @@ -35,4 +36,4 @@ def run_sess(sess: tf.Session, *args, **kwargs): "variable (current value: %s).\n" % ( os.getenv("CUDA_VISIBLE_DEVICES", None), )) - raise RuntimeError(MESSAGE) from e + raise OutOfMemoryError(MESSAGE) from e diff --git a/doc/troubleshooting/model-compatability.md b/doc/troubleshooting/model-compatability.md index fcc73f6cb3..bc1b464047 100644 --- a/doc/troubleshooting/model-compatability.md +++ b/doc/troubleshooting/model-compatability.md @@ -4,7 +4,7 @@ When the version of DeePMD-kit used to training model is different from the that DeePMD-kit guarantees that the codes with the same major and minor revisions are compatible. That is to say v0.12.5 is compatible to v0.12.0, but is not compatible to v0.11.0 nor v1.0.0. -One can execuate `dp convert-from` to convert an old model to a new one. +One can execute `dp convert-from` to convert an old model to a new one. | Model version | v0.12 | v1.0 | v1.1 | v1.2 | v1.3 | v2.0 | |:-:|:-----------:|:----------:|:----------:|:----------:|:----------:|:----------:| @@ -12,5 +12,5 @@ One can execuate `dp convert-from` to convert an old model to a new one. **Legend**: - 😄: The model is compatible with the DeePMD-kit package. -- 😊: The model is incompatible with the DeePMD-kit package, but one can execuate `dp convert-from` to convert an old model to v2.0. +- 😊: The model is incompatible with the DeePMD-kit package, but one can execute `dp convert-from` to convert an old model to v2.0. - 😢: The model is incompatible with the DeePMD-kit package, and there is no way to convert models. diff --git a/source/tests/test_auto_batch_size.py b/source/tests/test_auto_batch_size.py new file mode 100644 index 0000000000..f8aa0b8e60 --- /dev/null +++ b/source/tests/test_auto_batch_size.py @@ -0,0 +1,42 @@ +import unittest + +import numpy as np + +from deepmd.utils.batch_size import AutoBatchSize +from deepmd.utils.errors import OutOfMemoryError + +class TestAutoBatchSize(unittest.TestCase): + def oom(self, batch_size, start_index): + if batch_size >= 512: + raise OutOfMemoryError + return batch_size, np.zeros((batch_size, 2)) + + def test_execute_oom(self): + # initial batch size 256 = 128 * 2 + auto_batch_size = AutoBatchSize(256, 2.) + # no error - 128 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + # no error - 256 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # error - 512 return 0, None + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 0) + self.assertIsNone(result) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + + def test_execute_all(self): + dd1 = np.zeros((10000, 2, 1)) + auto_batch_size = AutoBatchSize(256, 2.) + dd2 = auto_batch_size.execute_all(np.array, 10000, 2, dd1) + np.testing.assert_equal(dd1, dd2)