From 1117c549ab1a36e356fe5dfbf0e6760bc746eca6 Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Wed, 22 Sep 2021 19:42:44 -0400 Subject: [PATCH 1/3] automatic batch size for `dp test` Resolves #1149. We start nbatch * natoms from 1024 (or we can set a different number), and iteratively multiply it by 2 until catching the OOM error. A small issue is that it's a bit slow to catch the TF OOM error. It's a problem of TF and I don't know how to resolve it. Luckily we only need to catch once. --- deepmd/entrypoints/test.py | 11 +++- deepmd/infer/deep_pot.py | 2 +- deepmd/utils/batch_size.py | 117 +++++++++++++++++++++++++++++++++++++ deepmd/utils/errors.py | 3 + deepmd/utils/sess.py | 3 +- 5 files changed, 132 insertions(+), 4 deletions(-) create mode 100644 deepmd/utils/batch_size.py diff --git a/deepmd/entrypoints/test.py b/deepmd/entrypoints/test.py index 65887d8f91..cb7644e525 100644 --- a/deepmd/entrypoints/test.py +++ b/deepmd/entrypoints/test.py @@ -9,10 +9,11 @@ from deepmd.utils import random as dp_random from deepmd.utils.data import DeepmdData from deepmd.utils.weight_avg import weighted_average +from deepmd.utils.batch_size import AutoBatchSize if TYPE_CHECKING: from deepmd.infer import DeepDipole, DeepPolar, DeepPot, DeepWFC - from deepmd.infer.deep_eval import DeepTensor + from deepmd.infer.deep_tensor import DeepTensor __all__ = ["test"] @@ -69,6 +70,7 @@ def test( # init model dp = DeepPotential(model) + auto_batch_size = AutoBatchSize() for cc, system in enumerate(all_sys): log.info("# ---------------output of dp test--------------- ") @@ -82,6 +84,7 @@ def test( err = test_ener( dp, data, + auto_batch_size, system, numb_test, detail_file, @@ -159,6 +162,7 @@ def save_txt_file( def test_ener( dp: "DeepPot", data: DeepmdData, + auto_batch_size: AutoBatchSize, system: str, numb_test: int, detail_file: Optional[str], @@ -226,7 +230,10 @@ def test_ener( else: aparam = None - ret = dp.eval( + ret = auto_batch_size.execuate_all( + dp.eval, + numb_test, + natoms, coord, box, atype, diff --git a/deepmd/infer/deep_pot.py b/deepmd/infer/deep_pot.py index 63625905e8..b8d557ae76 100644 --- a/deepmd/infer/deep_pot.py +++ b/deepmd/infer/deep_pot.py @@ -324,7 +324,7 @@ def _eval_inner( feed_dict_test[self.t_fparam] = np.reshape(fparam, [-1]) if self.has_aparam: feed_dict_test[self.t_aparam] = np.reshape(aparam, [-1]) - v_out = self.sess.run (t_out, feed_dict = feed_dict_test) + v_out = run_sess(self.sess, t_out, feed_dict = feed_dict_test) energy = v_out[0] force = v_out[1] virial = v_out[2] diff --git a/deepmd/utils/batch_size.py b/deepmd/utils/batch_size.py new file mode 100644 index 0000000000..020fb9419e --- /dev/null +++ b/deepmd/utils/batch_size.py @@ -0,0 +1,117 @@ +import logging +from typing import Callable, Tuple + +import numpy as np + +from deepmd.utils.errors import OutOfMemoryError + +class AutoBatchSize: + """This class allows DeePMD-kit to automatically decide the maximum + batch size that will not cause an OOM error. + + Notes + ----- + We assume all OOM error will raise :metd:`OutOfMemoryError`. + + Parameters + ---------- + initial_batch_size : int, default: 1024 + initial batch size (number of total atoms) + + Attributes + ---------- + current_batch_size : int + current batch size (number of total atoms) + maximum_working_batch_size : int + maximum working batch size + minimal_not_working_batch_size : int + minimal not working batch size + """ + def __init__(self, initial_batch_size: int = 1024) -> None: + # See also PyTorchLightning/pytorch-lightning#1638 + # TODO: discuss a proper initial batch size + self.current_batch_size = initial_batch_size + self.maximum_working_batch_size = 0 + self.minimal_not_working_batch_size = 2**31 + + def execuate(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]: + """Excuate a method with given batch size. + + Parameters + ---------- + callable : Callable + The method should accept the batch size and start_index as parameters, + and returns execuated batch size and data. + start_index : int + start index + natoms : int + natoms + + Returns + ------- + int + execuated batch size * number of atoms + tuple + result from callable, None if failing to execuate + """ + try: + n_batch, result = callable(max(self.current_batch_size // natoms, 1), start_index) + except OutOfMemoryError as e: + # TODO: it's very slow to catch OOM error; I don't know what TF is doing here + # but luckily we only need to catch once + self.minimal_not_working_batch_size = min(self.minimal_not_working_batch_size, self.current_batch_size) + if self.maximum_working_batch_size >= self.minimal_not_working_batch_size: + self.maximum_working_batch_size = self.minimal_not_working_batch_size // 2 + if self.minimal_not_working_batch_size <= natoms: + raise OutOfMemoryError("The callable still throws an out-of-memory (OOM) error even when batch size is 1!") from e + # adjust the next batch size + self._adjust_batch_size(0.5) + return 0, None + else: + n_tot = n_batch * natoms + self.maximum_working_batch_size = max(self.maximum_working_batch_size, n_tot) + # adjust the next batch size + if n_tot >= self.current_batch_size and self.current_batch_size * 2 < self.minimal_not_working_batch_size: + self._adjust_batch_size(2) + return n_batch, result + + def _adjust_batch_size(self, factor: float): + old_batch_size = self.current_batch_size + self.current_batch_size = int(self.current_batch_size * factor) + logging.info("Adjust batch size from %d to %d" % (old_batch_size, self.current_batch_size)) + + def execuate_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs) -> Tuple[np.ndarray]: + """Excuate a method with all given data. + + Parameters + ---------- + callable : Callable + The method should accept *args and **kwargs as input and return the similiar array. + total_size : int + Total size + natoms : int + The number of atoms + **kwargs + If 2D np.ndarray, assume the first axis is batch; otherwise do nothing. + """ + def execuate_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tuple[np.ndarray]]: + end_index = start_index + batch_size + end_index = min(end_index, total_size) + return (end_index - start_index), callable( + *[(vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for vv in args], + **{kk: (vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for kk, vv in kwargs.items()}, + ) + + index = 0 + results = [] + while index < total_size: + n_batch, result = self.execuate(execuate_with_batch_size, index, natoms) + if not isinstance(result, tuple): + result = (result,) + index += n_batch + if n_batch: + for rr in result: + rr.reshape((n_batch, -1)) + results.append(result) + + return tuple([np.concatenate(r, axis=0) for r in zip(*results)]) diff --git a/deepmd/utils/errors.py b/deepmd/utils/errors.py index 4a6617c055..c4579c43a1 100644 --- a/deepmd/utils/errors.py +++ b/deepmd/utils/errors.py @@ -3,3 +3,6 @@ class GraphTooLargeError(Exception): class GraphWithoutTensorError(Exception): pass + +class OutOfMemoryError(Exception): + """This error is caused by out-of-memory (OOM).""" \ No newline at end of file diff --git a/deepmd/utils/sess.py b/deepmd/utils/sess.py index 21f1581d35..07723c13c4 100644 --- a/deepmd/utils/sess.py +++ b/deepmd/utils/sess.py @@ -1,6 +1,7 @@ import os from deepmd.env import tf +from deepmd.utils.errors import OutOfMemoryError def run_sess(sess: tf.Session, *args, **kwargs): @@ -35,4 +36,4 @@ def run_sess(sess: tf.Session, *args, **kwargs): "variable (current value: %s).\n" % ( os.getenv("CUDA_VISIBLE_DEVICES", None), )) - raise RuntimeError(MESSAGE) from e + raise OutOfMemoryError(MESSAGE) from e From 152264792fd3934fd4664afd2fdbc1fc67e37b0c Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Wed, 22 Sep 2021 20:45:49 -0400 Subject: [PATCH 2/3] replace `execuate` with `execute` --- deepmd/entrypoints/test.py | 2 +- deepmd/utils/batch_size.py | 14 +++++++------- doc/troubleshooting/model-compatability.md | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/deepmd/entrypoints/test.py b/deepmd/entrypoints/test.py index cb7644e525..1a5b18969c 100644 --- a/deepmd/entrypoints/test.py +++ b/deepmd/entrypoints/test.py @@ -230,7 +230,7 @@ def test_ener( else: aparam = None - ret = auto_batch_size.execuate_all( + ret = auto_batch_size.execute_all( dp.eval, numb_test, natoms, diff --git a/deepmd/utils/batch_size.py b/deepmd/utils/batch_size.py index 020fb9419e..f2ed1c9b7f 100644 --- a/deepmd/utils/batch_size.py +++ b/deepmd/utils/batch_size.py @@ -34,14 +34,14 @@ def __init__(self, initial_batch_size: int = 1024) -> None: self.maximum_working_batch_size = 0 self.minimal_not_working_batch_size = 2**31 - def execuate(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]: + def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]: """Excuate a method with given batch size. Parameters ---------- callable : Callable The method should accept the batch size and start_index as parameters, - and returns execuated batch size and data. + and returns executed batch size and data. start_index : int start index natoms : int @@ -50,9 +50,9 @@ def execuate(self, callable: Callable, start_index: int, natoms: int) -> Tuple[i Returns ------- int - execuated batch size * number of atoms + executed batch size * number of atoms tuple - result from callable, None if failing to execuate + result from callable, None if failing to execute """ try: n_batch, result = callable(max(self.current_batch_size // natoms, 1), start_index) @@ -80,7 +80,7 @@ def _adjust_batch_size(self, factor: float): self.current_batch_size = int(self.current_batch_size * factor) logging.info("Adjust batch size from %d to %d" % (old_batch_size, self.current_batch_size)) - def execuate_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs) -> Tuple[np.ndarray]: + def execute_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs) -> Tuple[np.ndarray]: """Excuate a method with all given data. Parameters @@ -94,7 +94,7 @@ def execuate_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs If 2D np.ndarray, assume the first axis is batch; otherwise do nothing. """ - def execuate_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tuple[np.ndarray]]: + def execute_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tuple[np.ndarray]]: end_index = start_index + batch_size end_index = min(end_index, total_size) return (end_index - start_index), callable( @@ -105,7 +105,7 @@ def execuate_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tu index = 0 results = [] while index < total_size: - n_batch, result = self.execuate(execuate_with_batch_size, index, natoms) + n_batch, result = self.execute(execute_with_batch_size, index, natoms) if not isinstance(result, tuple): result = (result,) index += n_batch diff --git a/doc/troubleshooting/model-compatability.md b/doc/troubleshooting/model-compatability.md index fcc73f6cb3..bc1b464047 100644 --- a/doc/troubleshooting/model-compatability.md +++ b/doc/troubleshooting/model-compatability.md @@ -4,7 +4,7 @@ When the version of DeePMD-kit used to training model is different from the that DeePMD-kit guarantees that the codes with the same major and minor revisions are compatible. That is to say v0.12.5 is compatible to v0.12.0, but is not compatible to v0.11.0 nor v1.0.0. -One can execuate `dp convert-from` to convert an old model to a new one. +One can execute `dp convert-from` to convert an old model to a new one. | Model version | v0.12 | v1.0 | v1.1 | v1.2 | v1.3 | v2.0 | |:-:|:-----------:|:----------:|:----------:|:----------:|:----------:|:----------:| @@ -12,5 +12,5 @@ One can execuate `dp convert-from` to convert an old model to a new one. **Legend**: - 😄: The model is compatible with the DeePMD-kit package. -- 😊: The model is incompatible with the DeePMD-kit package, but one can execuate `dp convert-from` to convert an old model to v2.0. +- 😊: The model is incompatible with the DeePMD-kit package, but one can execute `dp convert-from` to convert an old model to v2.0. - 😢: The model is incompatible with the DeePMD-kit package, and there is no way to convert models. From 57c4d3c194711c5360bf840dacb3374e89a0b449 Mon Sep 17 00:00:00 2001 From: Jinzhe Zeng Date: Wed, 22 Sep 2021 23:23:34 -0400 Subject: [PATCH 3/3] add unittest; bugfix --- deepmd/utils/batch_size.py | 24 ++++++++++++---- source/tests/test_auto_batch_size.py | 42 ++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 6 deletions(-) create mode 100644 source/tests/test_auto_batch_size.py diff --git a/deepmd/utils/batch_size.py b/deepmd/utils/batch_size.py index f2ed1c9b7f..981572ecd8 100644 --- a/deepmd/utils/batch_size.py +++ b/deepmd/utils/batch_size.py @@ -17,6 +17,8 @@ class AutoBatchSize: ---------- initial_batch_size : int, default: 1024 initial batch size (number of total atoms) + factor : float, default: 2. + increased factor Attributes ---------- @@ -27,12 +29,13 @@ class AutoBatchSize: minimal_not_working_batch_size : int minimal not working batch size """ - def __init__(self, initial_batch_size: int = 1024) -> None: + def __init__(self, initial_batch_size: int = 1024, factor: float = 2.) -> None: # See also PyTorchLightning/pytorch-lightning#1638 # TODO: discuss a proper initial batch size self.current_batch_size = initial_batch_size self.maximum_working_batch_size = 0 self.minimal_not_working_batch_size = 2**31 + self.factor = factor def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]: """Excuate a method with given batch size. @@ -53,6 +56,11 @@ def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[in executed batch size * number of atoms tuple result from callable, None if failing to execute + + Raises + ------ + OutOfMemoryError + OOM when batch size is 1 """ try: n_batch, result = callable(max(self.current_batch_size // natoms, 1), start_index) @@ -61,18 +69,18 @@ def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[in # but luckily we only need to catch once self.minimal_not_working_batch_size = min(self.minimal_not_working_batch_size, self.current_batch_size) if self.maximum_working_batch_size >= self.minimal_not_working_batch_size: - self.maximum_working_batch_size = self.minimal_not_working_batch_size // 2 + self.maximum_working_batch_size = int(self.minimal_not_working_batch_size / self.factor) if self.minimal_not_working_batch_size <= natoms: raise OutOfMemoryError("The callable still throws an out-of-memory (OOM) error even when batch size is 1!") from e # adjust the next batch size - self._adjust_batch_size(0.5) + self._adjust_batch_size(1./self.factor) return 0, None else: n_tot = n_batch * natoms self.maximum_working_batch_size = max(self.maximum_working_batch_size, n_tot) # adjust the next batch size - if n_tot >= self.current_batch_size and self.current_batch_size * 2 < self.minimal_not_working_batch_size: - self._adjust_batch_size(2) + if n_tot >= self.current_batch_size and self.current_batch_size * self.factor < self.minimal_not_working_batch_size: + self._adjust_batch_size(self.factor) return n_batch, result def _adjust_batch_size(self, factor: float): @@ -114,4 +122,8 @@ def execute_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tup rr.reshape((n_batch, -1)) results.append(result) - return tuple([np.concatenate(r, axis=0) for r in zip(*results)]) + r = tuple([np.concatenate(r, axis=0) for r in zip(*results)]) + if len(r) == 1: + # avoid returning tuple if callable doesn't return tuple + r = r[0] + return r diff --git a/source/tests/test_auto_batch_size.py b/source/tests/test_auto_batch_size.py new file mode 100644 index 0000000000..f8aa0b8e60 --- /dev/null +++ b/source/tests/test_auto_batch_size.py @@ -0,0 +1,42 @@ +import unittest + +import numpy as np + +from deepmd.utils.batch_size import AutoBatchSize +from deepmd.utils.errors import OutOfMemoryError + +class TestAutoBatchSize(unittest.TestCase): + def oom(self, batch_size, start_index): + if batch_size >= 512: + raise OutOfMemoryError + return batch_size, np.zeros((batch_size, 2)) + + def test_execute_oom(self): + # initial batch size 256 = 128 * 2 + auto_batch_size = AutoBatchSize(256, 2.) + # no error - 128 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 128) + self.assertEqual(result.shape, (128, 2)) + # no error - 256 + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # error - 512 return 0, None + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 0) + self.assertIsNone(result) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + # 256 again + nb, result = auto_batch_size.execute(self.oom, 1, 2) + self.assertEqual(nb, 256) + self.assertEqual(result.shape, (256, 2)) + + def test_execute_all(self): + dd1 = np.zeros((10000, 2, 1)) + auto_batch_size = AutoBatchSize(256, 2.) + dd2 = auto_batch_size.execute_all(np.array, 10000, 2, dd1) + np.testing.assert_equal(dd1, dd2)