From e49c805ecc6c22dc1866e3ae74a24d8a48369d5f Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 09:09:39 +0000 Subject: [PATCH 01/33] add gradient accumulator --- tensorflow_addons/optimizers/__init__.py | 1 + .../optimizers/gradient_accumulator.py | 179 ++++++++++++++++++ .../tests/gradient_accumulator_test.py | 134 +++++++++++++ 3 files changed, 314 insertions(+) create mode 100644 tensorflow_addons/optimizers/gradient_accumulator.py create mode 100644 tensorflow_addons/optimizers/tests/gradient_accumulator_test.py diff --git a/tensorflow_addons/optimizers/__init__.py b/tensorflow_addons/optimizers/__init__.py index b8bc0109da..3cf79856c5 100644 --- a/tensorflow_addons/optimizers/__init__.py +++ b/tensorflow_addons/optimizers/__init__.py @@ -32,6 +32,7 @@ from tensorflow_addons.optimizers.lamb import LAMB from tensorflow_addons.optimizers.lazy_adam import LazyAdam from tensorflow_addons.optimizers.lookahead import Lookahead +from tensorflow_addons.optimizers.gradient_accumulator import GradientAccumulator from tensorflow_addons.optimizers.moving_average import MovingAverage from tensorflow_addons.optimizers.novograd import NovoGrad from tensorflow_addons.optimizers.proximal_adagrad import ProximalAdagrad diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py new file mode 100644 index 0000000000..432aa48639 --- /dev/null +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -0,0 +1,179 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import tensorflow as tf +from tensorflow_addons.utils import types +from typeguard import typechecked + + +@tf.keras.utils.register_keras_serializable(package="Addons") +class GradientAccumulator(tf.keras.optimizers.Optimizer): + """Optimizer wrapper for gradient accumulation.""" + + @typechecked + def __init__( + self, + optimizer: types.Optimizer, + accum_steps: types.TensorLike = 4, + name: str = "GradientAccumulator", + **kwargs, + ): + r"""Construct a new GradientAccumulator optimizer. + + Args: + optimizer: str or `tf.keras.optimizers.Optimizer` that will be + used to compute and apply gradients. + accum_steps: int > 0. Update gradient in every accumulation steps. + name: Optional name for the operations created when applying + gradients. Defaults to "GradientAccumulator". + **kwargs: keyword arguments. Allowed to be {`clipnorm`, + `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by + norm; `clipvalue` is clip gradients by value, `decay` is + included for backward compatibility to allow time inverse + decay of learning rate. `lr` is included for backward + compatibility, recommended to use `learning_rate` instead. + """ + super().__init__(name, **kwargs) + self._optimizer = tf.keras.optimizers.get(optimizer) + self._gradients = [] + self._accum_steps = accum_steps + + def _create_slots(self, var_list): + self._optimizer._create_slots(var_list=var_list) + for var in var_list: + self.add_slot(var, "ga") + + self._gradients = [self.get_slot(var, "ga") for var in var_list] + + @property + def gradients(self): + """The accumulated gradients on the current replica.""" + if not self._gradients: + raise ValueError( + "The accumulator should be called first to initialize the gradients" + ) + return list( + gradient.read_value() if gradient is not None else gradient + for gradient in self._gradients + ) + + def apply_gradients(self, grads_and_vars, name=None, **kwargs): + self._optimizer._iterations = self.iterations + return super().apply_gradients(grads_and_vars, name, **kwargs) + + def _resource_apply_dense(self, grad, var, apply_state=None): + accum_gradient = self.get_slot(var, "ga") + if accum_gradient is not None and grad is not None: + accum_gradient.assign_add( + grad, use_locking=self._use_locking, read_value=False + ) + + def _apply(): + if "apply_state" in self._optimizer._dense_apply_args: + train_op = self._optimizer._resource_apply_dense( + accum_gradient.read_value(), var, apply_state=apply_state + ) + else: + train_op = self._optimizer._resource_apply_dense( + accum_gradient.read_value(), var + ) + reset_op = accum_gradient.assign( + tf.zeros_like(accum_gradient), + use_locking=self._use_locking, + read_value=False, + ) + return tf.group(train_op, reset_op) + + apply_op = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + ) + return apply_op + + def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): + accum_gradient = self.get_slot(var, "ga") + if accum_gradient is not None and grad is not None: + self._resource_scatter_add(accum_gradient, indices, grad) + + def _apply(): + if "apply_state" in self._optimizer._sparse_apply_args: + train_op = self._optimizer._resource_apply_sparse( + accum_gradient.sparse_read(indices), + var, + indices, + apply_state=apply_state, + ) + else: + train_op = self._optimizer._resource_apply_sparse( + accum_gradient.sparse_read(indices), var, indices + ) + reset_op = accum_gradient.assign( + tf.zeros_like(accum_gradient), + use_locking=self._use_locking, + read_value=False, + ) + return tf.group(train_op, reset_op) + + apply_op = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + ) + return apply_op + + def reset(self): + """Resets the accumulated gradients on the current replica.""" + assign_ops = [] + if not self._gradients: + return assign_ops + + for gradient in self._gradients: + if gradient is not None: + assign_ops.append( + gradient.assign( + tf.zeros_like(gradient), + use_locking=self._use_locking, + read_value=False, + ) + ) + + return tf.group(assign_ops) + + @property + def lr(self): + return self._optimizer._get_hyper("learning_rate") + + @lr.setter + def lr(self, lr): + self._optimizer._set_hyper("learning_rate", lr) # + + @property + def learning_rate(self): + return self._optimizer._get_hyper("learning_rate") + + @learning_rate.setter + def learning_rate(self, learning_rate): + self._optimizer._set_hyper("learning_rate", learning_rate) + + def get_config(self): + config = { + "accum_steps": self._accum_steps, + "optimizer": tf.keras.optimizers.serialize(self._optimizer), + } + base_config = super().get_config() + return {**base_config, **config} + + @classmethod + def from_config(cls, config, custom_objects=None): + optimizer = tf.keras.optimizers.deserialize( + config.pop("optimizer"), custom_objects=custom_objects + ) + return cls(optimizer, **config) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py new file mode 100644 index 0000000000..a4596880cb --- /dev/null +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -0,0 +1,134 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for GradientAccumulator optimizers.""" + +import numpy as np +import pytest +import tensorflow as tf + +from tensorflow_addons.optimizers import GradientAccumulator + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_run(): + var0 = tf.Variable([1.0, 2.0]) + var1 = tf.Variable([3.0, 4.0]) + accum_steps = 4 + + grads0 = tf.constant([0.1, 0.1]) + grads1 = tf.constant([0.01, 0.01]) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) + + for _ in range(accum_steps): + opt.apply_gradients(grads_and_vars) + + np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) + np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_opt_failure(): + base_opt = None + with pytest.raises(TypeError): + GradientAccumulator(base_opt, 0.5) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_model_weights_not_update(): + grad = tf.Variable([[0.1]]) + model = tf.keras.Sequential( + [ + tf.keras.layers.Dense( + 1, + kernel_initializer=tf.keras.initializers.Constant([[1.0]]), + use_bias=False, + ) + ] + ) + model.build(input_shape=[1, 1]) + + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) + _ = opt.apply_gradients(list(zip([grad], model.variables))) + np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_optimizer_string(): + _ = GradientAccumulator("adam") + + +def test_config(): + sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) + accum_steps = 4 + opt = GradientAccumulator(sgd_opt, accum_steps=accum_steps) + config = opt.get_config() + + assert config["accum_steps"] == accum_steps + + new_opt = GradientAccumulator.from_config(config) + old_sgd_config = opt._optimizer.get_config() + new_sgd_config = new_opt._optimizer.get_config() + + for k1, k2 in zip(old_sgd_config, new_sgd_config): + assert old_sgd_config[k1] == new_sgd_config[k2] + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_fit_simple_linear_model(): + seed = 0x2019 + np.random.seed(seed) + tf.random.set_seed(seed) + num_examples = 5000 + x = np.random.standard_normal((num_examples, 3)) + w = np.random.standard_normal((3, 1)) + y = np.dot(x, w) + np.random.standard_normal((num_examples, 1)) * 1e-4 + + model = tf.keras.models.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) + + opt = GradientAccumulator("sgd") + model.compile(opt, loss="mse") + + model.fit(x, y, epochs=5) + + x = np.random.standard_normal((100, 3)) + y = np.dot(x, w) + + predicted = model.predict(x) + + max_abs_diff = np.max(np.abs(predicted - y)) + assert max_abs_diff < 5e-3 + + +def test_serialization(): + sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) + optimizer = GradientAccumulator(sgd_opt) + config = tf.keras.optimizers.serialize(optimizer) + new_optimizer = tf.keras.optimizers.deserialize(config) + assert new_optimizer.get_config() == optimizer.get_config() + + +@pytest.mark.usefixtures("run_with_mixed_precision_policy") +def test_model_mixed_precision(): + x = np.random.standard_normal((10000, 3)) + w = np.random.standard_normal((3, 1)) + y = np.dot(x, w) + np.random.standard_normal((10000, 1)) * 1e-4 + model = tf.keras.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) + model.compile(GradientAccumulator("sgd"), loss="mse") + model.fit(x, y, epochs=3) From 2c0fbae8d73ba82a3fd0562aa4b18ef47c5269ef Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 09:33:03 +0000 Subject: [PATCH 02/33] add exceptions --- tensorflow_addons/optimizers/tests/standard_test.py | 1 + tools/testing/source_code_test.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tensorflow_addons/optimizers/tests/standard_test.py b/tensorflow_addons/optimizers/tests/standard_test.py index f1d284ad68..3366c4f9a4 100644 --- a/tensorflow_addons/optimizers/tests/standard_test.py +++ b/tensorflow_addons/optimizers/tests/standard_test.py @@ -29,6 +29,7 @@ "ConditionalGradient", # is wrapper "Lookahead", # is wrapper "MovingAverage", # is wrapper + "GradientAccumulator", # is wrapper ] diff --git a/tools/testing/source_code_test.py b/tools/testing/source_code_test.py index c54bf73ea2..299e612078 100644 --- a/tools/testing/source_code_test.py +++ b/tools/testing/source_code_test.py @@ -124,6 +124,7 @@ def test_no_tf_cond(): "tensorflow_addons/metrics/cohens_kappa.py", "tensorflow_addons/seq2seq/sampler.py", "tensorflow_addons/seq2seq/beam_search_decoder.py", + "tensorflow_addons/optimizers/gradient_accumulator.py", ] for file_path, line_idx, line in get_lines_of_source_code(allowlist): From 11e536dc5559dae3d383ef037ceff6e85fb4f88a Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 12:25:46 +0000 Subject: [PATCH 03/33] fix multi gpus bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 432aa48639..45ab758071 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -96,7 +96,7 @@ def _apply(): return tf.group(train_op, reset_op) apply_op = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() ) return apply_op @@ -125,7 +125,7 @@ def _apply(): return tf.group(train_op, reset_op) apply_op = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, _apply, lambda: tf.no_op() + self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() ) return apply_op From 1a4c0d495c388c9e6d62884276c9043fa2e17783 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 12:32:26 +0000 Subject: [PATCH 04/33] fix test bugs --- .../optimizers/tests/gradient_accumulator_test.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index a4596880cb..a541ebd05c 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -34,18 +34,11 @@ def test_run(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) - for _ in range(accum_steps): + for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) - np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) - - -@pytest.mark.usefixtures("maybe_run_functions_eagerly") -def test_opt_failure(): - base_opt = None - with pytest.raises(TypeError): - GradientAccumulator(base_opt, 0.5) + np.testing.assert_allclose(var0.read_value(), [0.5, 1.5]) + np.testing.assert_allclose(var1.read_value(), [2.95, 3.95]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -64,7 +57,7 @@ def test_model_weights_not_update(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) _ = opt.apply_gradients(list(zip([grad], model.variables))) - np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) + np.testing.assert_allclose(model.variables[0].read_value(), [[0.8]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") From eabed95fde398a3c64fb4e2ce6a93747a2824dc7 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 13:37:56 +0000 Subject: [PATCH 05/33] fix sparse optimizer --- .../optimizers/gradient_accumulator.py | 4 +-- .../tests/gradient_accumulator_test.py | 34 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 45ab758071..19096195a2 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -108,14 +108,14 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.sparse_read(indices), + accum_gradient.read_value(), var, indices, apply_state=apply_state, ) else: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.sparse_read(indices), var, indices + accum_gradient.read_value(), var, indices ) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index a541ebd05c..34d33eae74 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -17,6 +17,7 @@ import numpy as np import pytest import tensorflow as tf +from tensorflow_addons.utils import test_utils from tensorflow_addons.optimizers import GradientAccumulator @@ -42,7 +43,26 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") -def test_model_weights_not_update(): +def test_sparse(): + var0 = tf.Variable([1.0, 2.0]) + var1 = tf.Variable([3.0, 4.0]) + + grads0 = tf.constant([0.1, 0.1]) + grads1 = tf.constant([0.01, 0.01]) + grads0_np_indices = tf.constant([0, 1], dtype=tf.int32) + grads0 = tf.IndexedSlices(grads0, grads0_np_indices, tf.constant([2])) + grads1_np_indices = tf.constant([0, 1], dtype=tf.int32) + grads1 = tf.IndexedSlices(grads1, grads1_np_indices, tf.constant([2])) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + opt.apply_gradients(grads_and_vars) + np.testing.assert_allclose(var0.read_value(), [0.9, 1.9]) + np.testing.assert_allclose(var1.read_value(), [2.99, 3.99]) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_dense(): grad = tf.Variable([[0.1]]) model = tf.keras.Sequential( [ @@ -82,6 +102,7 @@ def test_config(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.needs_gpu def test_fit_simple_linear_model(): seed = 0x2019 np.random.seed(seed) @@ -90,12 +111,13 @@ def test_fit_simple_linear_model(): x = np.random.standard_normal((num_examples, 3)) w = np.random.standard_normal((3, 1)) y = np.dot(x, w) + np.random.standard_normal((num_examples, 1)) * 1e-4 + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + with strategy.scope(): + model = tf.keras.models.Sequential() + model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - model = tf.keras.models.Sequential() - model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - - opt = GradientAccumulator("sgd") - model.compile(opt, loss="mse") + opt = GradientAccumulator("sgd") + model.compile(opt, loss="mse") model.fit(x, y, epochs=5) From a6ff7c014f97dd51266d81094499aca0a40722eb Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 14:06:43 +0000 Subject: [PATCH 06/33] remove read_value --- tensorflow_addons/optimizers/gradient_accumulator.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 19096195a2..24bcabe018 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -82,12 +82,10 @@ def _resource_apply_dense(self, grad, var, apply_state=None): def _apply(): if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( - accum_gradient.read_value(), var, apply_state=apply_state + accum_gradient, var, apply_state=apply_state ) else: - train_op = self._optimizer._resource_apply_dense( - accum_gradient.read_value(), var - ) + train_op = self._optimizer._resource_apply_dense(accum_gradient, var) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), use_locking=self._use_locking, @@ -108,14 +106,14 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.read_value(), + accum_gradient, var, indices, apply_state=apply_state, ) else: train_op = self._optimizer._resource_apply_sparse( - accum_gradient.read_value(), var, indices + accum_gradient, var, indices ) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), From 24ae8a904effe7c65a8ac5ec5f0a2a9e019db472 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 15:30:06 +0000 Subject: [PATCH 07/33] fix sparse test --- .../tests/gradient_accumulator_test.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 34d33eae74..aff641fcb7 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -44,21 +44,25 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_sparse(): - var0 = tf.Variable([1.0, 2.0]) - var1 = tf.Variable([3.0, 4.0]) + var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var1 = tf.Variable([[3.0, 4.0, 0.0]]) - grads0 = tf.constant([0.1, 0.1]) - grads1 = tf.constant([0.01, 0.01]) - grads0_np_indices = tf.constant([0, 1], dtype=tf.int32) - grads0 = tf.IndexedSlices(grads0, grads0_np_indices, tf.constant([2])) - grads1_np_indices = tf.constant([0, 1], dtype=tf.int32) - grads1 = tf.IndexedSlices(grads1, grads1_np_indices, tf.constant([2])) + grads0 = tf.IndexedSlices( + tf.constant([[0.1, 0.1, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + grads1 = tf.IndexedSlices( + tf.constant([[0.01, 0.01, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.9, 1.9]) - np.testing.assert_allclose(var1.read_value(), [2.99, 3.99]) + np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") From 2760fadfbd33526932d126559878dedf98899824 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 16:15:19 +0000 Subject: [PATCH 08/33] fix sparse bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 24bcabe018..bdb3104846 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -105,16 +105,13 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply(): if "apply_state" in self._optimizer._sparse_apply_args: - train_op = self._optimizer._resource_apply_sparse( + train_op = self._optimizer._resource_apply_dense( accum_gradient, var, - indices, apply_state=apply_state, ) else: - train_op = self._optimizer._resource_apply_sparse( - accum_gradient, var, indices - ) + train_op = self._optimizer._resource_apply_dense(accum_gradient, var) reset_op = accum_gradient.assign( tf.zeros_like(accum_gradient), use_locking=self._use_locking, From 4ba7a559702f4850a7880a9dba39b032e1a7e9b5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Thu, 15 Jul 2021 16:22:09 +0000 Subject: [PATCH 09/33] refactor --- .../optimizers/gradient_accumulator.py | 24 ++++--------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index bdb3104846..203b7d4d00 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -79,32 +79,18 @@ def _resource_apply_dense(self, grad, var, apply_state=None): grad, use_locking=self._use_locking, read_value=False ) - def _apply(): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - accum_gradient, var, apply_state=apply_state - ) - else: - train_op = self._optimizer._resource_apply_dense(accum_gradient, var) - reset_op = accum_gradient.assign( - tf.zeros_like(accum_gradient), - use_locking=self._use_locking, - read_value=False, - ) - return tf.group(train_op, reset_op) - - apply_op = tf.cond( - self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() - ) - return apply_op + return self._apply_grad(accum_gradient, var, apply_state) def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): accum_gradient = self.get_slot(var, "ga") if accum_gradient is not None and grad is not None: self._resource_scatter_add(accum_gradient, indices, grad) + return self._apply_grad(accum_gradient, var, apply_state) + + def _apply_grad(self, accum_gradient, var, apply_state): def _apply(): - if "apply_state" in self._optimizer._sparse_apply_args: + if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( accum_gradient, var, From dc50184d03b262ee0afd1e327037b312bb8e65e5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Fri, 16 Jul 2021 00:31:19 +0000 Subject: [PATCH 10/33] add sparse multi gpu test --- .../tests/gradient_accumulator_test.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index aff641fcb7..7e46716a55 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -65,6 +65,32 @@ def test_sparse(): np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.needs_gpu +def test_sparse_multi_gpus(): + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + with strategy.scope(): + var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var1 = tf.Variable([[3.0, 4.0, 0.0]]) + + grads0 = tf.IndexedSlices( + tf.constant([[0.1, 0.1, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + grads1 = tf.IndexedSlices( + tf.constant([[0.01, 0.01, 0.0]]), + tf.constant([0]), + tf.constant([1, 3]), + ) + + grads_and_vars = list(zip([grads0, grads1], [var0, var1])) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + strategy.run(opt.apply_gradients, [grads_and_vars]) + np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + + @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_dense(): grad = tf.Variable([[0.1]]) From 8cd65ad6880d7da20a875d8a6619faa89735fc51 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Sun, 18 Jul 2021 03:22:45 +0000 Subject: [PATCH 11/33] fix rnn bug --- .../optimizers/gradient_accumulator.py | 50 +++++++++++-------- .../tests/gradient_accumulator_test.py | 15 +++--- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 203b7d4d00..68bd5dc07c 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -69,8 +69,15 @@ def gradients(self): ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): - self._optimizer._iterations = self.iterations - return super().apply_gradients(grads_and_vars, name, **kwargs) + train_op = super().apply_gradients(grads_and_vars, name, **kwargs) + with tf.control_dependencies([train_op]): + assign_op = self._optimizer.iterations.assign_add( + tf.cast( + tf.where(self.iterations % self._accum_steps == 0, 1, 0), tf.int64 + ), + read_value=False, + ) + return assign_op def _resource_apply_dense(self, grad, var, apply_state=None): accum_gradient = self.get_slot(var, "ga") @@ -89,26 +96,29 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta return self._apply_grad(accum_gradient, var, apply_state) def _apply_grad(self, accum_gradient, var, apply_state): - def _apply(): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - accum_gradient, - var, - apply_state=apply_state, - ) - else: - train_op = self._optimizer._resource_apply_dense(accum_gradient, var) - reset_op = accum_gradient.assign( - tf.zeros_like(accum_gradient), - use_locking=self._use_locking, - read_value=False, + grad = tf.where( + (self.iterations + 1) % self._accum_steps == 0, + accum_gradient, + tf.zeros_like(var), + ) + if "apply_state" in self._optimizer._dense_apply_args: + train_op = self._optimizer._resource_apply_dense( + grad, + var, + apply_state=apply_state, ) - return tf.group(train_op, reset_op) - - apply_op = tf.cond( - self.iterations % self._accum_steps == 0, _apply, lambda: tf.no_op() + else: + train_op = self._optimizer._resource_apply_dense(grad, var) + reset_val = tf.where( + grad == accum_gradient, tf.zeros_like(accum_gradient), accum_gradient ) - return apply_op + reset_op = accum_gradient.assign( + reset_val, + use_locking=self._use_locking, + read_value=False, + ) + + return tf.group(train_op, reset_op) def reset(self): """Resets the accumulated gradients on the current replica.""" diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 7e46716a55..fada07a22a 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -38,8 +38,8 @@ def test_run(): for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [0.5, 1.5]) - np.testing.assert_allclose(var1.read_value(), [2.95, 3.95]) + np.testing.assert_allclose(var0.read_value(), [0.6, 1.6]) + np.testing.assert_allclose(var1.read_value(), [2.96, 3.96]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -61,8 +61,8 @@ def test_sparse(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -87,8 +87,8 @@ def test_sparse_multi_gpus(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) strategy.run(opt.apply_gradients, [grads_and_vars]) - np.testing.assert_allclose(var0.read_value(), [[0.9, 1.9, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[2.99, 3.99, 0.0]]) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -107,7 +107,7 @@ def test_dense(): opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) _ = opt.apply_gradients(list(zip([grad], model.variables))) - np.testing.assert_allclose(model.variables[0].read_value(), [[0.8]]) + np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -168,6 +168,7 @@ def test_serialization(): assert new_optimizer.get_config() == optimizer.get_config() +@pytest.mark.usefixtures("maybe_run_functions_eagerly") @pytest.mark.usefixtures("run_with_mixed_precision_policy") def test_model_mixed_precision(): x = np.random.standard_normal((10000, 3)) From 7d40946e53ef4f3b9f99819b85023e28c95a070e Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 00:35:02 +0000 Subject: [PATCH 12/33] fix step bugs --- .../optimizers/gradient_accumulator.py | 69 +++++++++++++++---- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 68bd5dc07c..dbf96589cb 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -24,7 +24,7 @@ class GradientAccumulator(tf.keras.optimizers.Optimizer): @typechecked def __init__( self, - optimizer: types.Optimizer, + inner_optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, name: str = "GradientAccumulator", **kwargs, @@ -32,7 +32,7 @@ def __init__( r"""Construct a new GradientAccumulator optimizer. Args: - optimizer: str or `tf.keras.optimizers.Optimizer` that will be + inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be used to compute and apply gradients. accum_steps: int > 0. Update gradient in every accumulation steps. name: Optional name for the operations created when applying @@ -44,10 +44,12 @@ def __init__( decay of learning rate. `lr` is included for backward compatibility, recommended to use `learning_rate` instead. """ - super().__init__(name, **kwargs) - self._optimizer = tf.keras.optimizers.get(optimizer) + self._optimizer = tf.keras.optimizers.get(inner_optimizer) self._gradients = [] self._accum_steps = accum_steps + self._step = None + self._iteraions = self._optimizer.iterations + super().__init__(name, **kwargs) def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) @@ -56,6 +58,32 @@ def _create_slots(self, var_list): self._gradients = [self.get_slot(var, "ga") for var in var_list] + @property + def step(self): + """Variable. The number of training steps this Optimizer has run.""" + if self._step is None: + with self._distribution_strategy_scope(): + self._step = self.add_weight( + "iter", + shape=[], + initializer="ones", + dtype=tf.int64, + trainable=False, + aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA, + ) + self._weights.append(self._step) + return self._step + + @step.setter + def step(self, variable): + if self._step is not None: + raise RuntimeError( + "Cannot set `step` to a new Variable after " + "the Optimizer weights have been created" + ) + self._step = variable + self._weights.append(self._step) + @property def gradients(self): """The accumulated gradients on the current replica.""" @@ -71,13 +99,17 @@ def gradients(self): def apply_gradients(self, grads_and_vars, name=None, **kwargs): train_op = super().apply_gradients(grads_and_vars, name, **kwargs) with tf.control_dependencies([train_op]): - assign_op = self._optimizer.iterations.assign_add( - tf.cast( - tf.where(self.iterations % self._accum_steps == 0, 1, 0), tf.int64 - ), - read_value=False, - ) - return assign_op + with tf.control_dependencies( + [ + self._optimizer.iterations.assign_add( + tf.cast( + tf.where(self.step % self._accum_steps == 0, 1, 0), tf.int64 + ), + read_value=False, + ) + ] + ): + return self.step.assign_add(1, read_value=False) def _resource_apply_dense(self, grad, var, apply_state=None): accum_gradient = self.get_slot(var, "ga") @@ -97,7 +129,7 @@ def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_sta def _apply_grad(self, accum_gradient, var, apply_state): grad = tf.where( - (self.iterations + 1) % self._accum_steps == 0, + self.step % self._accum_steps == 0, accum_gradient, tf.zeros_like(var), ) @@ -138,6 +170,19 @@ def reset(self): return tf.group(assign_ops) + @property + def inner_optimizer(self): + """The optimizer that this LossScaleOptimizer is wrapping.""" + return self._optimizer + + @property + def iterations(self): + return self._optimizer.iterations + + @iterations.setter + def iterations(self, variable): + self._optimizer.iterations = variable + @property def lr(self): return self._optimizer._get_hyper("learning_rate") From 6949bd3c6cfa36e650207a77be62d2aaab495600 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 01:13:11 +0000 Subject: [PATCH 13/33] fix _iterations --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index dbf96589cb..32009aa4ae 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -44,12 +44,12 @@ def __init__( decay of learning rate. `lr` is included for backward compatibility, recommended to use `learning_rate` instead. """ + super().__init__(name, **kwargs) self._optimizer = tf.keras.optimizers.get(inner_optimizer) self._gradients = [] self._accum_steps = accum_steps self._step = None - self._iteraions = self._optimizer.iterations - super().__init__(name, **kwargs) + self._iterations = self._optimizer.iterations def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) From 9e423e57b0c239ca0314e64980b9ba487c919c2c Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:28:20 +0000 Subject: [PATCH 14/33] use gradient transformer --- .../optimizers/gradient_accumulator.py | 98 ++++++++++++------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 32009aa4ae..0be15e8b87 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -46,17 +46,60 @@ def __init__( """ super().__init__(name, **kwargs) self._optimizer = tf.keras.optimizers.get(inner_optimizer) - self._gradients = [] - self._accum_steps = accum_steps self._step = None + self._gradients = {} + self._accum_steps = accum_steps + + def _accum_grad(grads_and_vars): + with tf.init_scope(): + if not self._gradients: + for grad, var in grads_and_vars: + if tf.distribute.has_strategy(): + for v in var.values: + self._gradients[v.ref()] = tf.Variable( + tf.zeros_like(v), trainable=False + ) + else: + self._gradients[var.ref()] = tf.Variable( + tf.zeros_like(var), trainable=False + ) + new_grads_and_vars = [] + for grad, var in grads_and_vars: + if tf.distribute.has_strategy(): + replica_id = tf.get_static_value( + tf.distribute.get_replica_context().replica_id_in_sync_group + ) + handle = self._gradients[var.values[replica_id].ref()] + else: + handle = self._gradients[var.ref()] + + if isinstance(grad, tf.IndexedSlices): + handle.scatter_add(grad) + fake_grad = tf.IndexedSlices( + tf.zeros_like(grad.values), grad.indices, grad.dense_shape + ) + else: + handle.assign_add(grad) + fake_grad = tf.zeros_like(var) + + def _get_grad(): + new_grad = handle.read_value() + handle.assign(tf.zeros_like(handle), use_locking=self._use_locking) + return new_grad + + new_grad = tf.cond( + (self.iterations + 1) % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) + return new_grads_and_vars + + self.gradient_transformers.append(_accum_grad) self._iterations = self._optimizer.iterations def _create_slots(self, var_list): self._optimizer._create_slots(var_list=var_list) - for var in var_list: - self.add_slot(var, "ga") - - self._gradients = [self.get_slot(var, "ga") for var in var_list] @property def step(self): @@ -93,7 +136,7 @@ def gradients(self): ) return list( gradient.read_value() if gradient is not None else gradient - for gradient in self._gradients + for _, gradient in self._gradients ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): @@ -112,27 +155,6 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): return self.step.assign_add(1, read_value=False) def _resource_apply_dense(self, grad, var, apply_state=None): - accum_gradient = self.get_slot(var, "ga") - if accum_gradient is not None and grad is not None: - accum_gradient.assign_add( - grad, use_locking=self._use_locking, read_value=False - ) - - return self._apply_grad(accum_gradient, var, apply_state) - - def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): - accum_gradient = self.get_slot(var, "ga") - if accum_gradient is not None and grad is not None: - self._resource_scatter_add(accum_gradient, indices, grad) - - return self._apply_grad(accum_gradient, var, apply_state) - - def _apply_grad(self, accum_gradient, var, apply_state): - grad = tf.where( - self.step % self._accum_steps == 0, - accum_gradient, - tf.zeros_like(var), - ) if "apply_state" in self._optimizer._dense_apply_args: train_op = self._optimizer._resource_apply_dense( grad, @@ -141,16 +163,16 @@ def _apply_grad(self, accum_gradient, var, apply_state): ) else: train_op = self._optimizer._resource_apply_dense(grad, var) - reset_val = tf.where( - grad == accum_gradient, tf.zeros_like(accum_gradient), accum_gradient - ) - reset_op = accum_gradient.assign( - reset_val, - use_locking=self._use_locking, - read_value=False, - ) + return train_op - return tf.group(train_op, reset_op) + def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): + if "apply_state" in self._optimizer._sparse_apply_args: + train_op = self._optimizer._resource_apply_sparse( + grad, var, indices, apply_state=apply_state + ) + else: + train_op = self._optimizer._resource_apply_sparse(grad, var, indices) + return train_op def reset(self): """Resets the accumulated gradients on the current replica.""" @@ -158,7 +180,7 @@ def reset(self): if not self._gradients: return assign_ops - for gradient in self._gradients: + for _, gradient in self._gradients: if gradient is not None: assign_ops.append( gradient.assign( From 7f3b2e921cbe48e447f3feba470a7ac08198cfd3 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:34:51 +0000 Subject: [PATCH 15/33] fix bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 0be15e8b87..2847d0e2f4 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -88,7 +88,7 @@ def _get_grad(): return new_grad new_grad = tf.cond( - (self.iterations + 1) % self._accum_steps == 0, + (self.step + 1) % self._accum_steps == 0, _get_grad, lambda: fake_grad, ) From 99dcde51d1046cebea8c28f2848e2426b96e761e Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 06:40:49 +0000 Subject: [PATCH 16/33] fix step bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 2847d0e2f4..2b2b79938e 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -88,7 +88,7 @@ def _get_grad(): return new_grad new_grad = tf.cond( - (self.step + 1) % self._accum_steps == 0, + self.step % self._accum_steps == 0, _get_grad, lambda: fake_grad, ) From a1845810c514a387fc6e0621ca6cee5e1bcddd60 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 08:25:41 +0000 Subject: [PATCH 17/33] simpify code --- .../optimizers/gradient_accumulator.py | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 2b2b79938e..dd422e182f 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -95,7 +95,7 @@ def _get_grad(): new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars - self.gradient_transformers.append(_accum_grad) + self._optimizer.gradient_transformers.append(_accum_grad) self._iterations = self._optimizer.iterations def _create_slots(self, var_list): @@ -140,7 +140,7 @@ def gradients(self): ) def apply_gradients(self, grads_and_vars, name=None, **kwargs): - train_op = super().apply_gradients(grads_and_vars, name, **kwargs) + train_op = self._optimizer.apply_gradients(grads_and_vars, name, **kwargs) with tf.control_dependencies([train_op]): with tf.control_dependencies( [ @@ -154,26 +154,6 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): ): return self.step.assign_add(1, read_value=False) - def _resource_apply_dense(self, grad, var, apply_state=None): - if "apply_state" in self._optimizer._dense_apply_args: - train_op = self._optimizer._resource_apply_dense( - grad, - var, - apply_state=apply_state, - ) - else: - train_op = self._optimizer._resource_apply_dense(grad, var) - return train_op - - def _resource_apply_sparse(self, grad: types.TensorLike, var, indices, apply_state): - if "apply_state" in self._optimizer._sparse_apply_args: - train_op = self._optimizer._resource_apply_sparse( - grad, var, indices, apply_state=apply_state - ) - else: - train_op = self._optimizer._resource_apply_sparse(grad, var, indices) - return train_op - def reset(self): """Resets the accumulated gradients on the current replica.""" assign_ops = [] From d0718f851e65cbc06961d6407cf22d1faa86bd29 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 10:39:00 +0000 Subject: [PATCH 18/33] optimize --- .../optimizers/gradient_accumulator.py | 49 ++++++++++++++----- .../tests/gradient_accumulator_test.py | 15 +++--- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index dd422e182f..87b02492ff 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -78,21 +78,48 @@ def _accum_grad(grads_and_vars): fake_grad = tf.IndexedSlices( tf.zeros_like(grad.values), grad.indices, grad.dense_shape ) + + def _get_grad(): + new_grad = handle.read_value() + indices = tf.nest.flatten( + tf.where( + tf.reduce_sum( + new_grad, axis=list(range(len(new_grad.shape))[1:]) + ) + != 0 + )[0] + ) + values = tf.gather(new_grad, indices) + dense_shape = new_grad.shape + new_grad = tf.IndexedSlices(values, indices, dense_shape) + handle.assign( + tf.zeros_like(handle), use_locking=self._use_locking + ) + return new_grad + + new_grad = tf.cond( + self.step % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) fake_grad = tf.zeros_like(var) - def _get_grad(): - new_grad = handle.read_value() - handle.assign(tf.zeros_like(handle), use_locking=self._use_locking) - return new_grad - - new_grad = tf.cond( - self.step % self._accum_steps == 0, - _get_grad, - lambda: fake_grad, - ) - new_grads_and_vars.append((new_grad, var)) + def _get_grad(): + new_grad = handle.read_value() + handle.assign( + tf.zeros_like(handle), use_locking=self._use_locking + ) + return new_grad + + new_grad = tf.cond( + self.step % self._accum_steps == 0, + _get_grad, + lambda: fake_grad, + ) + new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars self._optimizer.gradient_transformers.append(_accum_grad) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index fada07a22a..7fe4171edd 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -44,12 +44,12 @@ def test_run(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_sparse(): - var0 = tf.Variable([[1.0, 2.0, 0.0]]) + var0 = tf.Variable([[1.0, 2.0, 0.0], [1.0, 2.0, 0.0]]) var1 = tf.Variable([[3.0, 4.0, 0.0]]) grads0 = tf.IndexedSlices( tf.constant([[0.1, 0.1, 0.0]]), - tf.constant([0]), + tf.constant([1]), tf.constant([1, 3]), ) grads1 = tf.IndexedSlices( @@ -59,10 +59,11 @@ def test_sparse(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) - opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) - np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) + for _ in range(8): + opt.apply_gradients(grads_and_vars) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]]) + np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -85,7 +86,7 @@ def test_sparse_multi_gpus(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0, momentum=0.1)) + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) From 2af54758a832ccb74c386916a8b9fd7db0bbf2a0 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 10:41:04 +0000 Subject: [PATCH 19/33] fix bug --- tensorflow_addons/optimizers/gradient_accumulator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 87b02492ff..b298ccf35a 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -87,7 +87,7 @@ def _get_grad(): new_grad, axis=list(range(len(new_grad.shape))[1:]) ) != 0 - )[0] + ) ) values = tf.gather(new_grad, indices) dense_shape = new_grad.shape From 42fccea6ffd77317d2ce072015438d60c141fa94 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 11:54:36 +0000 Subject: [PATCH 20/33] fix bug --- .../optimizers/gradient_accumulator.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index b298ccf35a..c392239039 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -75,33 +75,36 @@ def _accum_grad(grads_and_vars): if isinstance(grad, tf.IndexedSlices): handle.scatter_add(grad) - fake_grad = tf.IndexedSlices( - tf.zeros_like(grad.values), grad.indices, grad.dense_shape - ) def _get_grad(): new_grad = handle.read_value() - indices = tf.nest.flatten( + indices = tf.squeeze( tf.where( tf.reduce_sum( new_grad, axis=list(range(len(new_grad.shape))[1:]) ) != 0 - ) + ), + axis=-1, ) + values = tf.gather(new_grad, indices) - dense_shape = new_grad.shape - new_grad = tf.IndexedSlices(values, indices, dense_shape) + dense_shape = tf.constant(new_grad.shape.as_list()) handle.assign( tf.zeros_like(handle), use_locking=self._use_locking ) - return new_grad + return values, tf.cast(indices, tf.int32), dense_shape - new_grad = tf.cond( + values, indices, dense_shape = tf.cond( self.step % self._accum_steps == 0, _get_grad, - lambda: fake_grad, + lambda: ( + tf.zeros_like(grad.values), + grad.indices, + grad.dense_shape, + ), ) + new_grad = tf.IndexedSlices(values, indices, dense_shape) new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) From 93794ec1ddf53c132290d53f7c408c0e896d2bf4 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 12:12:05 +0000 Subject: [PATCH 21/33] simpify code --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index c392239039..0c302e4c0b 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -175,9 +175,7 @@ def apply_gradients(self, grads_and_vars, name=None, **kwargs): with tf.control_dependencies( [ self._optimizer.iterations.assign_add( - tf.cast( - tf.where(self.step % self._accum_steps == 0, 1, 0), tf.int64 - ), + tf.cast(self.step % self._accum_steps == 0, tf.int64), read_value=False, ) ] From e62cc95b5fd23e1a700b846e0384ce5b18750eb5 Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Mon, 19 Jul 2021 12:37:15 +0000 Subject: [PATCH 22/33] add mean reduction --- tensorflow_addons/optimizers/gradient_accumulator.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 0c302e4c0b..4bcbf8ccc7 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -26,6 +26,7 @@ def __init__( self, inner_optimizer: types.Optimizer, accum_steps: types.TensorLike = 4, + reduction: str = "SUM", name: str = "GradientAccumulator", **kwargs, ): @@ -35,6 +36,7 @@ def __init__( inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be used to compute and apply gradients. accum_steps: int > 0. Update gradient in every accumulation steps. + reduction: str, Reduction method ['SUM', 'MEAN'] name: Optional name for the operations created when applying gradients. Defaults to "GradientAccumulator". **kwargs: keyword arguments. Allowed to be {`clipnorm`, @@ -49,6 +51,7 @@ def __init__( self._step = None self._gradients = {} self._accum_steps = accum_steps + self._reduction = reduction def _accum_grad(grads_and_vars): with tf.init_scope(): @@ -78,6 +81,8 @@ def _accum_grad(grads_and_vars): def _get_grad(): new_grad = handle.read_value() + if self._reduction == "MEAN": + new_grad /= tf.cast(self._accum_steps, new_grad.dtype) indices = tf.squeeze( tf.where( tf.reduce_sum( @@ -108,10 +113,11 @@ def _get_grad(): new_grads_and_vars.append((new_grad, var)) else: handle.assign_add(grad) - fake_grad = tf.zeros_like(var) def _get_grad(): new_grad = handle.read_value() + if self._reduction == "MEAN": + new_grad /= tf.cast(self._accum_steps, new_grad.dtype) handle.assign( tf.zeros_like(handle), use_locking=self._use_locking ) @@ -120,7 +126,7 @@ def _get_grad(): new_grad = tf.cond( self.step % self._accum_steps == 0, _get_grad, - lambda: fake_grad, + lambda: tf.zeros_like(grad), ) new_grads_and_vars.append((new_grad, var)) return new_grads_and_vars From 64b70b48bbdd652fe5426230e319d6151d93d74a Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Tue, 20 Jul 2021 01:24:52 +0000 Subject: [PATCH 23/33] decrease memory usage --- .../optimizers/gradient_accumulator.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 4bcbf8ccc7..268e019454 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -57,24 +57,12 @@ def _accum_grad(grads_and_vars): with tf.init_scope(): if not self._gradients: for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - for v in var.values: - self._gradients[v.ref()] = tf.Variable( - tf.zeros_like(v), trainable=False - ) - else: - self._gradients[var.ref()] = tf.Variable( - tf.zeros_like(var), trainable=False - ) + self._gradients[var.ref()] = tf.Variable( + tf.zeros_like(var), trainable=False + ) new_grads_and_vars = [] for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - replica_id = tf.get_static_value( - tf.distribute.get_replica_context().replica_id_in_sync_group - ) - handle = self._gradients[var.values[replica_id].ref()] - else: - handle = self._gradients[var.ref()] + handle = self._gradients[var.ref()] if isinstance(grad, tf.IndexedSlices): handle.scatter_add(grad) From 4dbc208efc2c3b6b9d9c6bb7f75e32c85a66c01d Mon Sep 17 00:00:00 2001 From: fsx950223 Date: Tue, 20 Jul 2021 01:39:15 +0000 Subject: [PATCH 24/33] add name --- tensorflow_addons/optimizers/gradient_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 268e019454..9ae1084493 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -57,8 +57,8 @@ def _accum_grad(grads_and_vars): with tf.init_scope(): if not self._gradients: for grad, var in grads_and_vars: - self._gradients[var.ref()] = tf.Variable( - tf.zeros_like(var), trainable=False + self._gradients[var.ref()] = self.add_weight( + "ga", shape=var.shape, dtype=var.dtype, trainable=False ) new_grads_and_vars = [] for grad, var in grads_and_vars: From b314592c19c1f8fa081cbe9e797238a97716e491 Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 08:57:13 +0200 Subject: [PATCH 25/33] Implement GA alternative --- .../optimizers/gradient_accumulator.py | 531 ++++++++++-------- .../tests/gradient_accumulator_test.py | 51 +- 2 files changed, 330 insertions(+), 252 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 4bcbf8ccc7..a1e48b000e 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -12,240 +12,311 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +from typing import Union, Dict, Hashable, List + import tensorflow as tf + from tensorflow_addons.utils import types -from typeguard import typechecked - - -@tf.keras.utils.register_keras_serializable(package="Addons") -class GradientAccumulator(tf.keras.optimizers.Optimizer): - """Optimizer wrapper for gradient accumulation.""" - - @typechecked - def __init__( - self, - inner_optimizer: types.Optimizer, - accum_steps: types.TensorLike = 4, - reduction: str = "SUM", - name: str = "GradientAccumulator", - **kwargs, - ): - r"""Construct a new GradientAccumulator optimizer. - - Args: - inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be - used to compute and apply gradients. - accum_steps: int > 0. Update gradient in every accumulation steps. - reduction: str, Reduction method ['SUM', 'MEAN'] - name: Optional name for the operations created when applying - gradients. Defaults to "GradientAccumulator". - **kwargs: keyword arguments. Allowed to be {`clipnorm`, - `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by - norm; `clipvalue` is clip gradients by value, `decay` is - included for backward compatibility to allow time inverse - decay of learning rate. `lr` is included for backward - compatibility, recommended to use `learning_rate` instead. - """ - super().__init__(name, **kwargs) - self._optimizer = tf.keras.optimizers.get(inner_optimizer) - self._step = None - self._gradients = {} - self._accum_steps = accum_steps - self._reduction = reduction - - def _accum_grad(grads_and_vars): - with tf.init_scope(): - if not self._gradients: - for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - for v in var.values: - self._gradients[v.ref()] = tf.Variable( - tf.zeros_like(v), trainable=False - ) - else: - self._gradients[var.ref()] = tf.Variable( - tf.zeros_like(var), trainable=False - ) - new_grads_and_vars = [] - for grad, var in grads_and_vars: - if tf.distribute.has_strategy(): - replica_id = tf.get_static_value( - tf.distribute.get_replica_context().replica_id_in_sync_group - ) - handle = self._gradients[var.values[replica_id].ref()] - else: - handle = self._gradients[var.ref()] - - if isinstance(grad, tf.IndexedSlices): - handle.scatter_add(grad) - - def _get_grad(): - new_grad = handle.read_value() - if self._reduction == "MEAN": - new_grad /= tf.cast(self._accum_steps, new_grad.dtype) - indices = tf.squeeze( - tf.where( - tf.reduce_sum( - new_grad, axis=list(range(len(new_grad.shape))[1:]) - ) - != 0 - ), - axis=-1, - ) - - values = tf.gather(new_grad, indices) - dense_shape = tf.constant(new_grad.shape.as_list()) - handle.assign( - tf.zeros_like(handle), use_locking=self._use_locking - ) - return values, tf.cast(indices, tf.int32), dense_shape - - values, indices, dense_shape = tf.cond( - self.step % self._accum_steps == 0, - _get_grad, - lambda: ( - tf.zeros_like(grad.values), - grad.indices, - grad.dense_shape, - ), - ) - new_grad = tf.IndexedSlices(values, indices, dense_shape) - new_grads_and_vars.append((new_grad, var)) - else: - handle.assign_add(grad) - - def _get_grad(): - new_grad = handle.read_value() - if self._reduction == "MEAN": - new_grad /= tf.cast(self._accum_steps, new_grad.dtype) - handle.assign( - tf.zeros_like(handle), use_locking=self._use_locking - ) - return new_grad - - new_grad = tf.cond( - self.step % self._accum_steps == 0, - _get_grad, - lambda: tf.zeros_like(grad), - ) - new_grads_and_vars.append((new_grad, var)) - return new_grads_and_vars - - self._optimizer.gradient_transformers.append(_accum_grad) - self._iterations = self._optimizer.iterations - - def _create_slots(self, var_list): - self._optimizer._create_slots(var_list=var_list) - - @property - def step(self): - """Variable. The number of training steps this Optimizer has run.""" - if self._step is None: - with self._distribution_strategy_scope(): - self._step = self.add_weight( - "iter", - shape=[], - initializer="ones", - dtype=tf.int64, - trainable=False, - aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA, + + +class AccumulationGradientTransformer: + _accu_gradients: Union[Dict[Hashable, tf.Variable], None] = None + + def __init__(self, optimizer: types.Optimizer, accu_steps: types.TensorLike, trainable_variables): + self.optimizer = optimizer + self.accu_steps = accu_steps + self.step = tf.Variable(0, dtype=tf.int64, name='ga_step') + self._accu_gradients: Union[List[tf.Variable], None] = None + policy = tf.keras.mixed_precision.global_policy() + self.variable_dtype = policy.variable_dtype + self._accu_gradients = {var.ref(): self.optimizer.add_slot(var, "ga") for var in trainable_variables} + + def __call__(self, grads_and_vars, *args, **kwargs): + + variables = [var for (_, var) in grads_and_vars] + accu_gradients = self._accu_gradients + step_inc_op = self.step.assign_add(1, read_value=False) + + with tf.control_dependencies([step_inc_op]): + can_apply = tf.cast(self.step % self.accu_steps == 0, dtype=self.variable_dtype) + accumulate = tf.cast(self.step % (self.accu_steps + 1) != 0, dtype=self.variable_dtype) + + accum_ops = list() + for grad, var in grads_and_vars: + + # Get the accumulated gradient + grad_accum = accu_gradients[var.ref()] * accumulate + + if isinstance(grad, tf.IndexedSlices): + added = tf.IndexedSlices( + values=grad.values + tf.gather_nd(grad_accum, grad.indices), + indices=grad.indices, + dense_shape=grad.dense_shape ) - self._weights.append(self._step) - return self._step - - @step.setter - def step(self, variable): - if self._step is not None: - raise RuntimeError( - "Cannot set `step` to a new Variable after " - "the Optimizer weights have been created" - ) - self._step = variable - self._weights.append(self._step) - - @property - def gradients(self): - """The accumulated gradients on the current replica.""" - if not self._gradients: - raise ValueError( - "The accumulator should be called first to initialize the gradients" - ) - return list( - gradient.read_value() if gradient is not None else gradient - for _, gradient in self._gradients + accu_op = accu_gradients[var.ref()].scatter_update(added) + else: + accu_op = accu_gradients[var.ref()].assign(grad + grad_accum, read_value=False) + + accum_ops.append(accu_op) + + iter_dec_op = self.optimizer.iterations.assign_add( + -1 * tf.cast(can_apply, dtype=self.optimizer.iterations.dtype), read_value=False ) - def apply_gradients(self, grads_and_vars, name=None, **kwargs): - train_op = self._optimizer.apply_gradients(grads_and_vars, name, **kwargs) - with tf.control_dependencies([train_op]): - with tf.control_dependencies( - [ - self._optimizer.iterations.assign_add( - tf.cast(self.step % self._accum_steps == 0, tf.int64), - read_value=False, - ) - ] - ): - return self.step.assign_add(1, read_value=False) - - def reset(self): - """Resets the accumulated gradients on the current replica.""" - assign_ops = [] - if not self._gradients: - return assign_ops - - for _, gradient in self._gradients: - if gradient is not None: - assign_ops.append( - gradient.assign( - tf.zeros_like(gradient), - use_locking=self._use_locking, - read_value=False, - ) - ) + with tf.control_dependencies(accum_ops + [iter_dec_op]): + gradients = [accu_gradients[var.ref()] * can_apply for var in variables] + return list(zip(gradients, variables)) + - return tf.group(assign_ops) - - @property - def inner_optimizer(self): - """The optimizer that this LossScaleOptimizer is wrapping.""" - return self._optimizer - - @property - def iterations(self): - return self._optimizer.iterations - - @iterations.setter - def iterations(self, variable): - self._optimizer.iterations = variable - - @property - def lr(self): - return self._optimizer._get_hyper("learning_rate") - - @lr.setter - def lr(self, lr): - self._optimizer._set_hyper("learning_rate", lr) # - - @property - def learning_rate(self): - return self._optimizer._get_hyper("learning_rate") - - @learning_rate.setter - def learning_rate(self, learning_rate): - self._optimizer._set_hyper("learning_rate", learning_rate) - - def get_config(self): - config = { - "accum_steps": self._accum_steps, - "optimizer": tf.keras.optimizers.serialize(self._optimizer), - } - base_config = super().get_config() - return {**base_config, **config} - - @classmethod - def from_config(cls, config, custom_objects=None): - optimizer = tf.keras.optimizers.deserialize( - config.pop("optimizer"), custom_objects=custom_objects +def GradientAccumulator( + optimizer: types.Optimizer, + accu_steps: int = 2, + trainable_variables=None +) -> types.Optimizer: + if trainable_variables is None: + trainable_variables = list() + + if isinstance(optimizer, str): + optimizer = tf.keras.optimizers.get(optimizer) + + optimizer.gradient_transformers.append( + AccumulationGradientTransformer( + optimizer=optimizer, + accu_steps=accu_steps, + trainable_variables=trainable_variables ) - return cls(optimizer, **config) + ) + + return optimizer + +# class GradientAccumulator(tf.keras.optimizers.Optimizer): +# """Optimizer wrapper for gradient accumulation.""" +# +# @typechecked +# def __init__( +# self, +# inner_optimizer: types.Optimizer, +# accum_steps: types.TensorLike = 4, +# reduction: str = "SUM", +# name: str = "GradientAccumulator", +# **kwargs, +# ): +# r"""Construct a new GradientAccumulator optimizer. +# +# Args: +# inner_optimizer: str or `tf.keras.optimizers.Optimizer` that will be +# used to compute and apply gradients. +# accum_steps: int > 0. Update gradient in every accumulation steps. +# reduction: str, Reduction method ['SUM', 'MEAN'] +# name: Optional name for the operations created when applying +# gradients. Defaults to "GradientAccumulator". +# **kwargs: keyword arguments. Allowed to be {`clipnorm`, +# `clipvalue`, `lr`, `decay`}. `clipnorm` is clip gradients by +# norm; `clipvalue` is clip gradients by value, `decay` is +# included for backward compatibility to allow time inverse +# decay of learning rate. `lr` is included for backward +# compatibility, recommended to use `learning_rate` instead. +# """ +# super().__init__(name, **kwargs) +# self._optimizer = tf.keras.optimizers.get(inner_optimizer) +# self._step = None +# self._gradients = {} +# self._accum_steps = accum_steps +# self._reduction = reduction +# +# def _accum_grad(grads_and_vars): +# with tf.init_scope(): +# if not self._gradients: +# for grad, var in grads_and_vars: +# if tf.distribute.has_strategy(): +# for v in var.values: +# self._gradients[v.ref()] = tf.Variable( +# tf.zeros_like(v), trainable=False +# ) +# else: +# self._gradients[var.ref()] = tf.Variable( +# tf.zeros_like(var), trainable=False +# ) +# new_grads_and_vars = [] +# for grad, var in grads_and_vars: +# if tf.distribute.has_strategy(): +# replica_id = tf.get_static_value( +# tf.distribute.get_replica_context().replica_id_in_sync_group +# ) +# handle = self._gradients[var.values[replica_id].ref()] +# else: +# handle = self._gradients[var.ref()] +# +# if isinstance(grad, tf.IndexedSlices): +# handle.scatter_add(grad) +# +# def _get_grad(): +# new_grad = handle.read_value() +# if self._reduction == "MEAN": +# new_grad /= tf.cast(self._accum_steps, new_grad.dtype) +# indices = tf.squeeze( +# tf.where( +# tf.reduce_sum( +# new_grad, axis=list(range(len(new_grad.shape))[1:]) +# ) +# != 0 +# ), +# axis=-1, +# ) +# +# values = tf.gather(new_grad, indices) +# dense_shape = tf.constant(new_grad.shape.as_list()) +# handle.assign( +# tf.zeros_like(handle), use_locking=self._use_locking +# ) +# return values, tf.cast(indices, tf.int32), dense_shape +# +# values, indices, dense_shape = tf.cond( +# self.step % self._accum_steps == 0, +# _get_grad, +# lambda: ( +# tf.zeros_like(grad.values), +# grad.indices, +# grad.dense_shape, +# ), +# ) +# new_grad = tf.IndexedSlices(values, indices, dense_shape) +# new_grads_and_vars.append((new_grad, var)) +# else: +# handle.assign_add(grad) +# +# def _get_grad(): +# new_grad = handle.read_value() +# if self._reduction == "MEAN": +# new_grad /= tf.cast(self._accum_steps, new_grad.dtype) +# handle.assign( +# tf.zeros_like(handle), use_locking=self._use_locking +# ) +# return new_grad +# +# new_grad = tf.cond( +# self.step % self._accum_steps == 0, +# _get_grad, +# lambda: tf.zeros_like(grad), +# ) +# new_grads_and_vars.append((new_grad, var)) +# return new_grads_and_vars +# +# self._optimizer.gradient_transformers.append(_accum_grad) +# self._iterations = self._optimizer.iterations +# +# def _create_slots(self, var_list): +# self._optimizer._create_slots(var_list=var_list) +# +# @property +# def step(self): +# """Variable. The number of training steps this Optimizer has run.""" +# if self._step is None: +# with self._distribution_strategy_scope(): +# self._step = self.add_weight( +# "iter", +# shape=[], +# initializer="ones", +# dtype=tf.int64, +# trainable=False, +# aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA, +# ) +# self._weights.append(self._step) +# return self._step +# +# @step.setter +# def step(self, variable): +# if self._step is not None: +# raise RuntimeError( +# "Cannot set `step` to a new Variable after " +# "the Optimizer weights have been created" +# ) +# self._step = variable +# self._weights.append(self._step) +# +# @property +# def gradients(self): +# """The accumulated gradients on the current replica.""" +# if not self._gradients: +# raise ValueError( +# "The accumulator should be called first to initialize the gradients" +# ) +# return list( +# gradient.read_value() if gradient is not None else gradient +# for _, gradient in self._gradients +# ) +# +# def apply_gradients(self, grads_and_vars, name=None, **kwargs): +# train_op = self._optimizer.apply_gradients(grads_and_vars, name, **kwargs) +# with tf.control_dependencies([train_op]): +# with tf.control_dependencies( +# [ +# self._optimizer.iterations.assign_add( +# tf.cast(self.step % self._accum_steps == 0, tf.int64), +# read_value=False, +# ) +# ] +# ): +# return self.step.assign_add(1, read_value=False) +# +# def reset(self): +# """Resets the accumulated gradients on the current replica.""" +# assign_ops = [] +# if not self._gradients: +# return assign_ops +# +# for _, gradient in self._gradients: +# if gradient is not None: +# assign_ops.append( +# gradient.assign( +# tf.zeros_like(gradient), +# use_locking=self._use_locking, +# read_value=False, +# ) +# ) +# +# return tf.group(assign_ops) +# +# @property +# def inner_optimizer(self): +# """The optimizer that this LossScaleOptimizer is wrapping.""" +# return self._optimizer +# +# @property +# def iterations(self): +# return self._optimizer.iterations +# +# @iterations.setter +# def iterations(self, variable): +# self._optimizer.iterations = variable +# +# @property +# def lr(self): +# return self._optimizer._get_hyper("learning_rate") +# +# @lr.setter +# def lr(self, lr): +# self._optimizer._set_hyper("learning_rate", lr) # +# +# @property +# def learning_rate(self): +# return self._optimizer._get_hyper("learning_rate") +# +# @learning_rate.setter +# def learning_rate(self, learning_rate): +# self._optimizer._set_hyper("learning_rate", learning_rate) +# +# def get_config(self): +# config = { +# "accum_steps": self._accum_steps, +# "optimizer": tf.keras.optimizers.serialize(self._optimizer), +# } +# base_config = super().get_config() +# return {**base_config, **config} +# +# @classmethod +# def from_config(cls, config, custom_objects=None): +# optimizer = tf.keras.optimizers.deserialize( +# config.pop("optimizer"), custom_objects=custom_objects +# ) +# return cls(optimizer, **config) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 7fe4171edd..94dd1fd0b4 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -33,7 +33,9 @@ def test_run(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps) + variables = [var for _, var in grads_and_vars] + + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps, trainable_variables=variables) for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) @@ -59,10 +61,12 @@ def test_sparse(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) - for _ in range(8): + variables = [var for _, var in grads_and_vars] + accu_steps = 2 + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accu_steps=accu_steps, trainable_variables=variables) + for _ in range(accu_steps * 4): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]]) + np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]], rtol=1e-5) np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) @@ -86,7 +90,8 @@ def test_sparse_multi_gpus(): ) grads_and_vars = list(zip([grads0, grads1], [var0, var1])) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0)) + variables = [var for _, var in grads_and_vars] + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), trainable_variables=variables) strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @@ -106,7 +111,8 @@ def test_dense(): ) model.build(input_shape=[1, 1]) - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=2.0), accum_steps=2) + variables = model.trainable_variables + opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accu_steps=2, trainable_variables=variables) _ = opt.apply_gradients(list(zip([grad], model.variables))) np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) @@ -116,20 +122,20 @@ def test_optimizer_string(): _ = GradientAccumulator("adam") -def test_config(): - sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) - accum_steps = 4 - opt = GradientAccumulator(sgd_opt, accum_steps=accum_steps) - config = opt.get_config() - - assert config["accum_steps"] == accum_steps - - new_opt = GradientAccumulator.from_config(config) - old_sgd_config = opt._optimizer.get_config() - new_sgd_config = new_opt._optimizer.get_config() - - for k1, k2 in zip(old_sgd_config, new_sgd_config): - assert old_sgd_config[k1] == new_sgd_config[k2] +# def test_config(): +# sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) +# accum_steps = 4 +# opt = GradientAccumulator(sgd_opt, accum_steps=accum_steps) +# config = opt.get_config() +# +# assert config["accum_steps"] == accum_steps +# +# new_opt = GradientAccumulator.from_config(config) +# old_sgd_config = opt._optimizer.get_config() +# new_sgd_config = new_opt._optimizer.get_config() +# +# for k1, k2 in zip(old_sgd_config, new_sgd_config): +# assert old_sgd_config[k1] == new_sgd_config[k2] @pytest.mark.usefixtures("maybe_run_functions_eagerly") @@ -147,7 +153,7 @@ def test_fit_simple_linear_model(): model = tf.keras.models.Sequential() model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - opt = GradientAccumulator("sgd") + opt = GradientAccumulator("sgd", trainable_variables=model.trainable_variables) model.compile(opt, loss="mse") model.fit(x, y, epochs=5) @@ -177,5 +183,6 @@ def test_model_mixed_precision(): y = np.dot(x, w) + np.random.standard_normal((10000, 1)) * 1e-4 model = tf.keras.Sequential() model.add(tf.keras.layers.Dense(input_shape=(3,), units=1)) - model.compile(GradientAccumulator("sgd"), loss="mse") + opt = GradientAccumulator("sgd", trainable_variables=model.trainable_variables) + model.compile(opt, loss="mse") model.fit(x, y, epochs=3) From 222e7578fa8e93653e6aa0b9ab4cf32367fda635 Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 09:14:39 +0200 Subject: [PATCH 26/33] Run black formatter --- .../optimizers/gradient_accumulator.py | 37 +++++++++++++------ .../tests/gradient_accumulator_test.py | 22 ++++++++--- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index a1e48b000e..4be3c3e741 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -22,14 +22,21 @@ class AccumulationGradientTransformer: _accu_gradients: Union[Dict[Hashable, tf.Variable], None] = None - def __init__(self, optimizer: types.Optimizer, accu_steps: types.TensorLike, trainable_variables): + def __init__( + self, + optimizer: types.Optimizer, + accu_steps: types.TensorLike, + trainable_variables, + ): self.optimizer = optimizer self.accu_steps = accu_steps - self.step = tf.Variable(0, dtype=tf.int64, name='ga_step') + self.step = tf.Variable(0, dtype=tf.int64, name="ga_step") self._accu_gradients: Union[List[tf.Variable], None] = None policy = tf.keras.mixed_precision.global_policy() self.variable_dtype = policy.variable_dtype - self._accu_gradients = {var.ref(): self.optimizer.add_slot(var, "ga") for var in trainable_variables} + self._accu_gradients = { + var.ref(): self.optimizer.add_slot(var, "ga") for var in trainable_variables + } def __call__(self, grads_and_vars, *args, **kwargs): @@ -38,8 +45,12 @@ def __call__(self, grads_and_vars, *args, **kwargs): step_inc_op = self.step.assign_add(1, read_value=False) with tf.control_dependencies([step_inc_op]): - can_apply = tf.cast(self.step % self.accu_steps == 0, dtype=self.variable_dtype) - accumulate = tf.cast(self.step % (self.accu_steps + 1) != 0, dtype=self.variable_dtype) + can_apply = tf.cast( + self.step % self.accu_steps == 0, dtype=self.variable_dtype + ) + accumulate = tf.cast( + self.step % (self.accu_steps + 1) != 0, dtype=self.variable_dtype + ) accum_ops = list() for grad, var in grads_and_vars: @@ -51,16 +62,19 @@ def __call__(self, grads_and_vars, *args, **kwargs): added = tf.IndexedSlices( values=grad.values + tf.gather_nd(grad_accum, grad.indices), indices=grad.indices, - dense_shape=grad.dense_shape + dense_shape=grad.dense_shape, ) accu_op = accu_gradients[var.ref()].scatter_update(added) else: - accu_op = accu_gradients[var.ref()].assign(grad + grad_accum, read_value=False) + accu_op = accu_gradients[var.ref()].assign( + grad + grad_accum, read_value=False + ) accum_ops.append(accu_op) iter_dec_op = self.optimizer.iterations.assign_add( - -1 * tf.cast(can_apply, dtype=self.optimizer.iterations.dtype), read_value=False + -1 * tf.cast(can_apply, dtype=self.optimizer.iterations.dtype), + read_value=False, ) with tf.control_dependencies(accum_ops + [iter_dec_op]): @@ -69,9 +83,7 @@ def __call__(self, grads_and_vars, *args, **kwargs): def GradientAccumulator( - optimizer: types.Optimizer, - accu_steps: int = 2, - trainable_variables=None + optimizer: types.Optimizer, accu_steps: int = 2, trainable_variables=None ) -> types.Optimizer: if trainable_variables is None: trainable_variables = list() @@ -83,12 +95,13 @@ def GradientAccumulator( AccumulationGradientTransformer( optimizer=optimizer, accu_steps=accu_steps, - trainable_variables=trainable_variables + trainable_variables=trainable_variables, ) ) return optimizer + # class GradientAccumulator(tf.keras.optimizers.Optimizer): # """Optimizer wrapper for gradient accumulation.""" # diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 94dd1fd0b4..0f4ffbd52b 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -35,7 +35,9 @@ def test_run(): variables = [var for _, var in grads_and_vars] - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accum_steps, trainable_variables=variables) + opt = GradientAccumulator( + tf.keras.optimizers.SGD(lr=1.0), accum_steps, trainable_variables=variables + ) for _ in range(accum_steps + 1): opt.apply_gradients(grads_and_vars) @@ -63,10 +65,16 @@ def test_sparse(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) variables = [var for _, var in grads_and_vars] accu_steps = 2 - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accu_steps=accu_steps, trainable_variables=variables) + opt = GradientAccumulator( + tf.keras.optimizers.SGD(lr=1.0), + accu_steps=accu_steps, + trainable_variables=variables, + ) for _ in range(accu_steps * 4): opt.apply_gradients(grads_and_vars) - np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]], rtol=1e-5) + np.testing.assert_allclose( + var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]], rtol=1e-5 + ) np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) @@ -91,7 +99,9 @@ def test_sparse_multi_gpus(): grads_and_vars = list(zip([grads0, grads1], [var0, var1])) variables = [var for _, var in grads_and_vars] - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), trainable_variables=variables) + opt = GradientAccumulator( + tf.keras.optimizers.SGD(lr=1.0), trainable_variables=variables + ) strategy.run(opt.apply_gradients, [grads_and_vars]) np.testing.assert_allclose(var0.read_value(), [[1.0, 2.0, 0.0]]) np.testing.assert_allclose(var1.read_value(), [[3.0, 4.0, 0.0]]) @@ -112,7 +122,9 @@ def test_dense(): model.build(input_shape=[1, 1]) variables = model.trainable_variables - opt = GradientAccumulator(tf.keras.optimizers.SGD(lr=1.0), accu_steps=2, trainable_variables=variables) + opt = GradientAccumulator( + tf.keras.optimizers.SGD(lr=1.0), accu_steps=2, trainable_variables=variables + ) _ = opt.apply_gradients(list(zip([grad], model.variables))) np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) From 5e75bd7e69d739f2396f7409b1ec8880a65a51cb Mon Sep 17 00:00:00 2001 From: Stefan Falk <43335432+stefan-falk@users.noreply.github.com> Date: Tue, 20 Jul 2021 09:31:06 +0200 Subject: [PATCH 27/33] Update gradient_accumulator.py --- .../optimizers/gradient_accumulator.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index a13cf48a72..a292a76b18 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -1,3 +1,17 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== from typing import List, Union, Dict, Hashable import tensorflow as tf From b31c896152c08e52932ee3e1d5b71ae32186bc24 Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 09:34:44 +0200 Subject: [PATCH 28/33] Fixing code mess up --- .../optimizers/gradient_accumulator.py | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index a292a76b18..870fe2d290 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -12,25 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -from typing import List, Union, Dict, Hashable +from typing import Union, Dict, Hashable, List import tensorflow as tf + from tensorflow_addons.utils import types -from typeguard import typechecked class AccumulationGradientTransformer: - _accu_gradients: Union[Dict[Hashable, tf.Variable], None] = None - def __init__(self, optimizer: types.Optimizer, accu_steps: types.TensorLike, trainable_variables): + def __init__( + self, + optimizer: types.Optimizer, + accu_steps: types.TensorLike, + trainable_variables, + ): self.optimizer = optimizer self.accu_steps = accu_steps - self.step = tf.Variable(0, dtype=tf.int64, name='ga_step') + self.step = tf.Variable(0, dtype=tf.int64, name="ga_step") self._accu_gradients: Union[List[tf.Variable], None] = None policy = tf.keras.mixed_precision.global_policy() self.variable_dtype = policy.variable_dtype - self._accu_gradients = {var.ref(): self.optimizer.add_slot(var, "ga") for var in trainable_variables} + self._accu_gradients = { + var.ref(): self.optimizer.add_slot(var, "ga") for var in trainable_variables + } def __call__(self, grads_and_vars, *args, **kwargs): @@ -39,26 +45,36 @@ def __call__(self, grads_and_vars, *args, **kwargs): step_inc_op = self.step.assign_add(1, read_value=False) with tf.control_dependencies([step_inc_op]): - can_apply = tf.cast(self.step % self.accu_steps == 0, dtype=self.variable_dtype) - accumulate = tf.cast(self.step % tf.constant(self.accu_steps + 1, dtype=self.step.dtype) != 0, dtype=self.variable_dtype) + can_apply = tf.cast( + self.step % self.accu_steps == 0, dtype=self.variable_dtype + ) + accumulate = tf.cast( + self.step % (self.accu_steps + 1) != 0, dtype=self.variable_dtype + ) accum_ops = list() for grad, var in grads_and_vars: + + # Get the accumulated gradient grad_accum = accu_gradients[var.ref()] * accumulate + if isinstance(grad, tf.IndexedSlices): added = tf.IndexedSlices( - values=grad.values + tf.gather_nd(grad_accum, grad.indices[..., None]), + values=grad.values + tf.gather_nd(grad_accum, grad.indices), indices=grad.indices, - dense_shape=grad.dense_shape + dense_shape=grad.dense_shape, ) accu_op = accu_gradients[var.ref()].scatter_update(added) else: - accu_op = accu_gradients[var.ref()].assign(grad + grad_accum, read_value=False) + accu_op = accu_gradients[var.ref()].assign( + grad + grad_accum, read_value=False + ) accum_ops.append(accu_op) iter_dec_op = self.optimizer.iterations.assign_add( - -1 * tf.cast(can_apply, dtype=self.optimizer.iterations.dtype), read_value=False + -1 * tf.cast(can_apply, dtype=self.optimizer.iterations.dtype), + read_value=False, ) with tf.control_dependencies(accum_ops + [iter_dec_op]): @@ -67,11 +83,8 @@ def __call__(self, grads_and_vars, *args, **kwargs): def GradientAccumulator( - optimizer: types.Optimizer, - accum_steps: int = 2, - trainable_variables=None + optimizer: types.Optimizer, accu_steps: int = 2, trainable_variables=None ) -> types.Optimizer: - if trainable_variables is None: trainable_variables = list() @@ -81,8 +94,8 @@ def GradientAccumulator( optimizer.gradient_transformers.append( AccumulationGradientTransformer( optimizer=optimizer, - accu_steps=accum_steps, - trainable_variables=trainable_variables + accu_steps=accu_steps, + trainable_variables=trainable_variables, ) ) From a03643e2eb3fff6aa9ef38c9b18deeecb33fcd0a Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 09:58:11 +0200 Subject: [PATCH 29/33] Add embedding test --- .../optimizers/gradient_accumulator.py | 3 +- .../tests/gradient_accumulator_test.py | 41 ++++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 870fe2d290..2110e16c66 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -60,7 +60,8 @@ def __call__(self, grads_and_vars, *args, **kwargs): if isinstance(grad, tf.IndexedSlices): added = tf.IndexedSlices( - values=grad.values + tf.gather_nd(grad_accum, grad.indices), + values=grad.values + + tf.gather_nd(grad_accum, grad.indices[..., None]), indices=grad.indices, dense_shape=grad.dense_shape, ) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 0f4ffbd52b..2d1a423b4d 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -18,7 +18,8 @@ import pytest import tensorflow as tf from tensorflow_addons.utils import test_utils - +from tensorflow.keras import layers +from tensorflow import keras from tensorflow_addons.optimizers import GradientAccumulator @@ -198,3 +199,41 @@ def test_model_mixed_precision(): opt = GradientAccumulator("sgd", trainable_variables=model.trainable_variables) model.compile(opt, loss="mse") model.fit(x, y, epochs=3) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +@pytest.mark.needs_gpu +def test_embedding(): + def _get_dataset(vocab_size: int, batch_size: int = 10): + def _generator_fn(): + size = np.random.randint(5, 10) + x = np.random.randint(low=0, high=vocab_size, size=size) + y = np.asarray([np.random.rand()]) + yield x, y + + return tf.data.Dataset.from_generator( + generator=_generator_fn, + output_types=(tf.int32, tf.float32), + output_shapes=((None,), (1,)), + ).padded_batch(batch_size) + + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + + vocab_size = 10 + + with strategy.scope(): + inputs = layers.Input(shape=(None,), dtype=tf.int32) + x = inputs + x = layers.Embedding(input_dim=vocab_size, output_dim=8)(x) + x = layers.Dense(1)(x) + model = tf.keras.models.Model(inputs=inputs, outputs=x) + + optimizer = GradientAccumulator( + optimizer="adam", trainable_variables=model.trainable_variables + ) + + model.compile(optimizer=optimizer, loss="mse") + + data = _get_dataset(vocab_size).repeat() + + model.fit(data, epochs=1, steps_per_epoch=5) From 0a8e68610d6b388e8e2ae16913c71fcae2102b8c Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 10:26:00 +0200 Subject: [PATCH 30/33] Add currently failing LSTM-test --- .../optimizers/gradient_accumulator.py | 4 +- .../tests/gradient_accumulator_test.py | 51 ++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 2110e16c66..fe0477a097 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -59,9 +59,11 @@ def __call__(self, grads_and_vars, *args, **kwargs): grad_accum = accu_gradients[var.ref()] * accumulate if isinstance(grad, tf.IndexedSlices): + # Not sure why e.g. the Embedding layer requires an additional dimension here + grad_indices = grad.indices[..., None] if len(grad.indices.shape) < 2 else grad.indices added = tf.IndexedSlices( values=grad.values - + tf.gather_nd(grad_accum, grad.indices[..., None]), + + tf.gather_nd(grad_accum, grad_indices), indices=grad.indices, dense_shape=grad.dense_shape, ) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 2d1a423b4d..6297d62358 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -17,9 +17,10 @@ import numpy as np import pytest import tensorflow as tf +from tensorflow.python.data.experimental import AutoShardPolicy + from tensorflow_addons.utils import test_utils from tensorflow.keras import layers -from tensorflow import keras from tensorflow_addons.optimizers import GradientAccumulator @@ -211,11 +212,56 @@ def _generator_fn(): y = np.asarray([np.random.rand()]) yield x, y - return tf.data.Dataset.from_generator( + dataset = tf.data.Dataset.from_generator( + generator=_generator_fn, + output_types=(tf.int32, tf.float32), + output_shapes=((None,), (1,)), + ).padded_batch(batch_size) + options = tf.data.Options() + options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA + dataset.with_options(options) + return dataset + + strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + + vocab_size = 10 + + with strategy.scope(): + inputs = layers.Input(shape=(None,), dtype=tf.int32) + x = inputs + x = layers.Embedding(input_dim=vocab_size, output_dim=8)(x) + x = layers.Dense(1)(x) + model = tf.keras.models.Model(inputs=inputs, outputs=x) + + optimizer = GradientAccumulator( + optimizer="adam", trainable_variables=model.trainable_variables + ) + + model.compile(optimizer=optimizer, loss="mse") + + data = _get_dataset(vocab_size).repeat() + + model.fit(data, epochs=1, steps_per_epoch=5) + + +@pytest.mark.usefixtures("maybe_run_functions_eagerly") +def test_lstm(): + def _get_dataset(vocab_size: int, batch_size: int = 10): + def _generator_fn(): + size = np.random.randint(5, 10) + x = np.random.randint(low=0, high=vocab_size, size=size) + y = np.asarray([np.random.rand()]) + yield x, y + + dataset = tf.data.Dataset.from_generator( generator=_generator_fn, output_types=(tf.int32, tf.float32), output_shapes=((None,), (1,)), ).padded_batch(batch_size) + options = tf.data.Options() + options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA + dataset.with_options(options) + return dataset strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) @@ -225,6 +271,7 @@ def _generator_fn(): inputs = layers.Input(shape=(None,), dtype=tf.int32) x = inputs x = layers.Embedding(input_dim=vocab_size, output_dim=8)(x) + x = layers.LSTM(4)(x) x = layers.Dense(1)(x) model = tf.keras.models.Model(inputs=inputs, outputs=x) From 40b6e38875dcfffd220b03eaad6f3378adee5d86 Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 10:51:56 +0200 Subject: [PATCH 31/33] use default-strategy --- tensorflow_addons/optimizers/tests/gradient_accumulator_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 6297d62358..9969d5cdec 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -263,7 +263,7 @@ def _generator_fn(): dataset.with_options(options) return dataset - strategy = tf.distribute.MirroredStrategy(test_utils.gpus_for_testing()) + strategy = tf.distribute.get_strategy() vocab_size = 10 From 6db2187ed643974be79151ec6a380173091f85b3 Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 14:20:32 +0200 Subject: [PATCH 32/33] Use custom implementation for GA --- .../optimizers/gradient_accumulator.py | 85 ++++++++++++++++--- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index fe0477a097..1817306c7b 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -1,17 +1,3 @@ -# Copyright 2021 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== from typing import Union, Dict, Hashable, List import tensorflow as tf @@ -85,6 +71,77 @@ def __call__(self, grads_and_vars, *args, **kwargs): return list(zip(gradients, variables)) +# class _Override: +# +# def __init__(self, fn): +# self.fn = fn +# +# def __call__(self, *args, **kwargs): +# return self.fn(*args, **kwargs) +# +# +# def override(fn): +# return _Override(fn) +# +# +# class OptimizerWrapper(tf.keras.optimizers.Optimizer, abc.ABC): +# inner_optimizer: types.Optimizer = None +# +# def __init__(self, optimizer: types.Optimizer, name, **kwargs): +# if isinstance(optimizer, str): +# self.inner_optimizer = tf.keras.optimizers.get(optimizer) +# else: +# self.inner_optimizer = optimizer +# super().__init__(name, **kwargs) +# +# def __getattribute__(self, item): +# +# try: +# self_item = super(tf.keras.optimizers.Optimizer, self).__getattribute__(item) +# if item == "inner_optimizer": +# return self_item +# if isinstance(self_item, _Override): +# return self_item(self) +# except AttributeError: +# pass +# +# inner_optimizer = super(tf.keras.optimizers.Optimizer, self).__getattribute__("inner_optimizer") +# return getattr(inner_optimizer, item) +# +# def __getitem__(self, item): +# return self +# +# +# class GradientAccumulator(OptimizerWrapper): +# +# def __init__( +# self, +# optimizer: types.Optimizer, +# trainable_variables, +# accu_steps: types.TensorLike = 2, +# name: str = "GradientAccumulator", +# **kwargs +# ): +# super().__init__(optimizer, name, **kwargs) +# self.gradient_transformers.append( +# AccumulationGradientTransformer( +# optimizer=self.inner_optimizer, +# accu_steps=accu_steps, +# trainable_variables=trainable_variables, +# ) +# ) +# self.accu_steps = accu_steps +# +# @override +# def get_config(self): +# config = { +# "accu_steps": self.accu_steps, +# "optimizer": tf.keras.optimizers.serialize(self.inner_optimizer), +# } +# base_config = self.inner_optimizer.get_config() +# return {**base_config, **config} + + def GradientAccumulator( optimizer: types.Optimizer, accu_steps: int = 2, trainable_variables=None ) -> types.Optimizer: From 4142f378bf9b06dc9ffd6a332ed477e1177ad7fb Mon Sep 17 00:00:00 2001 From: Stefan Falk Date: Tue, 20 Jul 2021 15:02:14 +0200 Subject: [PATCH 33/33] Some cleaning up --- .../optimizers/gradient_accumulator.py | 89 ++++--------------- .../tests/gradient_accumulator_test.py | 17 ++-- 2 files changed, 25 insertions(+), 81 deletions(-) diff --git a/tensorflow_addons/optimizers/gradient_accumulator.py b/tensorflow_addons/optimizers/gradient_accumulator.py index 1817306c7b..e3930de4fe 100644 --- a/tensorflow_addons/optimizers/gradient_accumulator.py +++ b/tensorflow_addons/optimizers/gradient_accumulator.py @@ -1,3 +1,17 @@ +# Copyright 2021 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== from typing import Union, Dict, Hashable, List import tensorflow as tf @@ -49,7 +63,7 @@ def __call__(self, grads_and_vars, *args, **kwargs): grad_indices = grad.indices[..., None] if len(grad.indices.shape) < 2 else grad.indices added = tf.IndexedSlices( values=grad.values - + tf.gather_nd(grad_accum, grad_indices), + + tf.gather_nd(grad_accum, grad_indices), indices=grad.indices, dense_shape=grad.dense_shape, ) @@ -71,79 +85,8 @@ def __call__(self, grads_and_vars, *args, **kwargs): return list(zip(gradients, variables)) -# class _Override: -# -# def __init__(self, fn): -# self.fn = fn -# -# def __call__(self, *args, **kwargs): -# return self.fn(*args, **kwargs) -# -# -# def override(fn): -# return _Override(fn) -# -# -# class OptimizerWrapper(tf.keras.optimizers.Optimizer, abc.ABC): -# inner_optimizer: types.Optimizer = None -# -# def __init__(self, optimizer: types.Optimizer, name, **kwargs): -# if isinstance(optimizer, str): -# self.inner_optimizer = tf.keras.optimizers.get(optimizer) -# else: -# self.inner_optimizer = optimizer -# super().__init__(name, **kwargs) -# -# def __getattribute__(self, item): -# -# try: -# self_item = super(tf.keras.optimizers.Optimizer, self).__getattribute__(item) -# if item == "inner_optimizer": -# return self_item -# if isinstance(self_item, _Override): -# return self_item(self) -# except AttributeError: -# pass -# -# inner_optimizer = super(tf.keras.optimizers.Optimizer, self).__getattribute__("inner_optimizer") -# return getattr(inner_optimizer, item) -# -# def __getitem__(self, item): -# return self -# -# -# class GradientAccumulator(OptimizerWrapper): -# -# def __init__( -# self, -# optimizer: types.Optimizer, -# trainable_variables, -# accu_steps: types.TensorLike = 2, -# name: str = "GradientAccumulator", -# **kwargs -# ): -# super().__init__(optimizer, name, **kwargs) -# self.gradient_transformers.append( -# AccumulationGradientTransformer( -# optimizer=self.inner_optimizer, -# accu_steps=accu_steps, -# trainable_variables=trainable_variables, -# ) -# ) -# self.accu_steps = accu_steps -# -# @override -# def get_config(self): -# config = { -# "accu_steps": self.accu_steps, -# "optimizer": tf.keras.optimizers.serialize(self.inner_optimizer), -# } -# base_config = self.inner_optimizer.get_config() -# return {**base_config, **config} - - def GradientAccumulator( - optimizer: types.Optimizer, accu_steps: int = 2, trainable_variables=None + optimizer: types.Optimizer, trainable_variables, accu_steps: int = 2 ) -> types.Optimizer: if trainable_variables is None: trainable_variables = list() diff --git a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py index 9969d5cdec..9f06640e12 100644 --- a/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py +++ b/tensorflow_addons/optimizers/tests/gradient_accumulator_test.py @@ -38,7 +38,7 @@ def test_run(): variables = [var for _, var in grads_and_vars] opt = GradientAccumulator( - tf.keras.optimizers.SGD(lr=1.0), accum_steps, trainable_variables=variables + tf.keras.optimizers.SGD(lr=1.0), variables, accu_steps=accum_steps, ) for _ in range(accum_steps + 1): @@ -69,13 +69,13 @@ def test_sparse(): accu_steps = 2 opt = GradientAccumulator( tf.keras.optimizers.SGD(lr=1.0), - accu_steps=accu_steps, trainable_variables=variables, + accu_steps=accu_steps, ) for _ in range(accu_steps * 4): opt.apply_gradients(grads_and_vars) np.testing.assert_allclose( - var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]], rtol=1e-5 + var0.read_value(), [[1.0, 2.0, 0.0], [0.2, 1.2, 0.0]], rtol=1e-6 ) np.testing.assert_allclose(var1.read_value(), [[2.92, 3.92, 0.0]]) @@ -125,7 +125,7 @@ def test_dense(): variables = model.trainable_variables opt = GradientAccumulator( - tf.keras.optimizers.SGD(lr=1.0), accu_steps=2, trainable_variables=variables + tf.keras.optimizers.SGD(lr=1.0), trainable_variables=variables, accu_steps=2, ) _ = opt.apply_gradients(list(zip([grad], model.variables))) np.testing.assert_allclose(model.variables[0].read_value(), [[1.0]]) @@ -133,16 +133,17 @@ def test_dense(): @pytest.mark.usefixtures("maybe_run_functions_eagerly") def test_optimizer_string(): - _ = GradientAccumulator("adam") + _ = GradientAccumulator("adam", trainable_variables=[]) # def test_config(): # sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) # accum_steps = 4 -# opt = GradientAccumulator(sgd_opt, accum_steps=accum_steps) +# opt = GradientAccumulator(sgd_opt, trainable_variables=[], accu_steps=accum_steps) +# print(str(opt)) # config = opt.get_config() # -# assert config["accum_steps"] == accum_steps +# assert config["accu_steps"] == accum_steps # # new_opt = GradientAccumulator.from_config(config) # old_sgd_config = opt._optimizer.get_config() @@ -183,7 +184,7 @@ def test_fit_simple_linear_model(): def test_serialization(): sgd_opt = tf.keras.optimizers.SGD(lr=2.0, nesterov=True, momentum=0.3, decay=0.1) - optimizer = GradientAccumulator(sgd_opt) + optimizer = GradientAccumulator(sgd_opt, trainable_variables=[]) config = tf.keras.optimizers.serialize(optimizer) new_optimizer = tf.keras.optimizers.deserialize(config) assert new_optimizer.get_config() == optimizer.get_config()