From 253289033ff7647446c8978f71ac8d945d0ce687 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 8 Nov 2022 13:09:03 -0500
Subject: [PATCH 01/22] Initial type def and function signature

---
 .../ml/inference/pytorch_inference.py | 30 +++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 46938ad619d8..1d010c33fe65 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -38,6 +38,12 @@
     'PytorchModelHandlerKeyedTensor',
 ]

+TensorInferenceFn = Callable[
+    [Sequence[torch.Tensor], torch.nn.Module, Optional[Dict[str, Any]]], Iterable[PredictionResult]]
+
+KeyedTensorInferenceFn = Callable[
+    [Sequence[Dict[str, torch.Tensor]], torch.nn.Module, Optional[Dict[str, Any]]], Iterable[PredictionResult]]
+

 def _load_model(
     model_class: torch.nn.Module, state_dict_path, device, **model_params):
@@ -99,6 +105,8 @@ def _convert_to_result(
   ]
   return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

+def _default_tensor_inference_fn(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) -> Iterable[PredictionResult]:
+  return

 class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
                                              PredictionResult,
@@ -108,7 +116,9 @@ def __init__(
       self,
       state_dict_path: str,
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
-      device: str = 'CPU'):
+      device: str = 'CPU',
+      *,
+      inference_fn: TensorInferenceFn = _default_tensor_inference_fn):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::

       pipeline | RunInference(ModelHandler(state_dict_path="my_uri"))

     Args:
       state_dict_path: path to the saved dictionary of the model state.
       model_class: class of the Pytorch model that defines the model
         structure.
       model_params: A dictionary of arguments required to instantiate the
         model class.
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
+      inference_fn: the inference function to use.
+        default=_default_tensor_inference_fn.

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.
@@ -140,6 +152,7 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
+    self._inference_fn=inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""
@@ -205,6 +218,14 @@ def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
     pass


+def _default_keyed_tensor_inference_fn(
+    batch: Sequence[Dict[str, torch.Tensor]],
+    model: torch.nn.Module,
+    inference_args: Optional[Dict[str, Any]] = None
+    ) -> Iterable[PredictionResult]:
+  return
+
+
 @experimental(extra_message="No backwards-compatibility guarantees.")
 class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, torch.Tensor],
                                                   PredictionResult,
@@ -214,7 +235,9 @@ def __init__(
       self,
       state_dict_path: str,
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
-      device: str = 'CPU'):
+      device: str = 'CPU',
+      *,
+      inference_fn: KeyedTensorInferenceFn = _default_keyed_tensor_inference_fn):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::

       pipeline | RunInference(ModelHandler(state_dict_path="my_uri"))

     Args:
       state_dict_path: path to the saved dictionary of the model state.
       model_class: class of the Pytorch model that defines the model
         structure.
       model_params: A dictionary of arguments required to instantiate the
         model class.
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
+      inference_fn: the inference function to use.
+        default=_default_keyed_tensor_inference_fn

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.
@@ -250,6 +275,7 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
+    self._inference_fn=inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""

From 4ed9ca5c4f6ec3b2c217dc50547956bc4db1c487 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 11:02:52 -0500
Subject: [PATCH 02/22] [Draft] Add custom inference fn support to Pytorch
 Model Handler

---
 .../apache_beam/ml/inference/pytorch_inference.py | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 1d010c33fe65..cba2fcf62338 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -105,8 +105,8 @@ def _convert_to_result(
   ]
   return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

-def _default_tensor_inference_fn(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) -> Iterable[PredictionResult]:
-  return
+#def _default_tensor_inference_fn(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) -> Iterable[PredictionResult]:
+  # return

 class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
                                              PredictionResult,
@@ -116,9 +116,7 @@ def __init__(
       state_dict_path: str,
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
-      device: str = 'CPU',
-      *,
-      inference_fn: TensorInferenceFn = _default_tensor_inference_fn):
+      device: str = 'CPU'):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::

       pipeline | RunInference(ModelHandler(state_dict_path="my_uri"))

     Args:
       state_dict_path: path to the saved dictionary of the model state.
       model_class: class of the Pytorch model that defines the model
         structure.
       model_params: A dictionary of arguments required to instantiate the
         model class.
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
-      inference_fn: the inference function to use.
-        default=_default_tensor_inference_fn.

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.
@@ -152,7 +148,6 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
-    self._inference_fn=inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""
@@ -194,9 +189,9 @@ def run_inference(

     # torch.no_grad() mitigates GPU memory issues
     # https://github.com/apache/beam/issues/22811
+    converted_tensors = _convert_to_device(batch, self._device)
     with torch.no_grad():
-      batched_tensors = torch.stack(batch)
-      batched_tensors = _convert_to_device(batched_tensors, self._device)
+      batched_tensors = torch.stack(converted_tensors)
       predictions = model(batched_tensors, **inference_args)
       return _convert_to_result(batch, predictions)

From cbe4297d273291652f0868e5185b9701e88385ec Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 11:08:51 -0500
Subject: [PATCH 03/22] Formatting

---
 .../ml/inference/pytorch_inference.py | 28 ++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index cba2fcf62338..038422dd57a7 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -39,10 +39,15 @@
 ]

 TensorInferenceFn = Callable[
-    [Sequence[torch.Tensor], torch.nn.Module, Optional[Dict[str, Any]]], Iterable[PredictionResult]]
+    [Sequence[torch.Tensor], torch.nn.Module, Optional[Dict[str, Any]]],
+    Iterable[PredictionResult]]

-KeyedTensorInferenceFn = Callable[
-    [Sequence[Dict[str, torch.Tensor]], torch.nn.Module, Optional[Dict[str, Any]]], Iterable[PredictionResult]]
+KeyedTensorInferenceFn = Callable[[
+    Sequence[Dict[str, torch.Tensor]],
+    torch.nn.Module,
+    Optional[Dict[str, Any]]
+],
+                                  Iterable[PredictionResult]]

 def _load_model(
     model_class: torch.nn.Module, state_dict_path, device, **model_params):
@@ -105,8 +110,10 @@ def _convert_to_result(
   ]
   return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

+
 #def _default_tensor_inference_fn(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) -> Iterable[PredictionResult]:
-  # return
+# return
+

 class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
                                              PredictionResult,
@@ -214,10 +221,10 @@ def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):


 def _default_keyed_tensor_inference_fn(
-    batch: Sequence[Dict[str, torch.Tensor]],
-    model: torch.nn.Module,
-    inference_args: Optional[Dict[str, Any]] = None
-    ) -> Iterable[PredictionResult]:
+    batch: Sequence[Dict[str, torch.Tensor]],
+    model: torch.nn.Module,
+    inference_args: Optional[Dict[str,
+                                  Any]] = None) -> Iterable[PredictionResult]:
   return

@@ -232,7 +239,8 @@ def __init__(
       model_params: Dict[str, Any],
       device: str = 'CPU',
       *,
-      inference_fn: KeyedTensorInferenceFn = _default_keyed_tensor_inference_fn):
+      inference_fn: KeyedTensorInferenceFn = _default_keyed_tensor_inference_fn
+  ):
     """Implementation of the ModelHandler interface for PyTorch.
     Example Usage::
@@ -270,7 +278,7 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
-    self._inference_fn=inference_fn
+    self._inference_fn = inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""

From a8d51501b2b43ae530a8ec831737a61fe783439b Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 11:42:04 -0500
Subject: [PATCH 04/22] Split out default

---
 .../ml/inference/pytorch_inference.py | 27 ++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 038422dd57a7..e34e7f6a2b06 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -111,8 +111,17 @@ def _convert_to_result(
   return [PredictionResult(x, y) for x, y in zip(batch, predictions)]


-#def _default_tensor_inference_fn(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Optional[Dict[str, Any]] = None) -> Iterable[PredictionResult]:
-# return
+def _default_tensor_inference_fn(
+    batch: Sequence[torch.Tensor],
+    model: torch.nn.Module,
+    inference_args: Optional[Dict[str,
+                                  Any]] = None) -> Iterable[PredictionResult]:
+  # torch.no_grad() mitigates GPU memory issues
+  # https://github.com/apache/beam/issues/22811
+  with torch.no_grad():
+    batched_tensors = torch.stack(batch)
+    predictions = model(batched_tensors, **inference_args)
+  return _convert_to_result(batch, predictions)


 class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
@@ -123,7 +132,9 @@ def __init__(
       state_dict_path: str,
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
-      device: str = 'CPU'):
+      device: str = 'CPU',
+      *,
+      inference_fn: TensorInferenceFn = _default_tensor_inference_fn):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::

       pipeline | RunInference(ModelHandler(state_dict_path="my_uri"))

     Args:
       state_dict_path: path to the saved dictionary of the model state.
       model_class: class of the Pytorch model that defines the model
         structure.
       model_params: A dictionary of arguments required to instantiate the
         model class.
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
+      inference_fn: the inference function to use during RunInference.
+        default=_default_tensor_inference_fn

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.
@@ -155,6 +168,7 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
+    self._inference_fn = inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""
@@ -194,13 +208,8 @@ def run_inference(
     """
     inference_args = {} if not inference_args else inference_args

-    # torch.no_grad() mitigates GPU memory issues
-    # https://github.com/apache/beam/issues/22811
     converted_tensors = _convert_to_device(batch, self._device)
-    with torch.no_grad():
-      batched_tensors = torch.stack(converted_tensors)
-      predictions = model(batched_tensors, **inference_args)
-      return _convert_to_result(batch, predictions)
+    return self._inference_fn(converted_tensors, model, inference_args)

   def get_num_bytes(self, batch: Sequence[torch.Tensor]) -> int:
     """

From a495835be07365ce435b5a85afcc312e7f8ae4ae Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 11:52:01 -0500
Subject: [PATCH 05/22] Remove Keyed version for testing

---
 .../ml/inference/pytorch_inference.py | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index e34e7f6a2b06..618eb7e5e344 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -229,12 +229,12 @@ def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
     pass


-def _default_keyed_tensor_inference_fn(
-    batch: Sequence[Dict[str, torch.Tensor]],
-    model: torch.nn.Module,
-    inference_args: Optional[Dict[str,
-                                  Any]] = None) -> Iterable[PredictionResult]:
-  return
+#def _default_keyed_tensor_inference_fn(
+#    batch: Sequence[Dict[str, torch.Tensor]],
+#    model: torch.nn.Module,
+#    inference_args: Optional[Dict[str,
+#                                  Any]] = None) -> Iterable[PredictionResult]:
+#  return


 @experimental(extra_message="No backwards-compatibility guarantees.")
@@ -247,8 +247,6 @@ def __init__(
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
       device: str = 'CPU',
-      *,
-      inference_fn: KeyedTensorInferenceFn = _default_keyed_tensor_inference_fn
   ):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::

       pipeline | RunInference(ModelHandler(state_dict_path="my_uri"))

     Args:
       state_dict_path: path to the saved dictionary of the model state.
       model_class: class of the Pytorch model that defines the model
         structure.
       model_params: A dictionary of arguments required to instantiate the
         model class.
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
-      inference_fn: the inference function to use.
-        default=_default_keyed_tensor_inference_fn

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.
@@ -287,7 +283,6 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
-    self._inference_fn = inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""

From 4e35ebdec7013b9ceb3b655fa183c95d7d0d4ecf Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 14:57:14 -0500
Subject: [PATCH 06/22] Move device optimization

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 618eb7e5e344..421d7e5ea582 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -39,7 +39,7 @@
 ]

 TensorInferenceFn = Callable[
-    [Sequence[torch.Tensor], torch.nn.Module, Optional[Dict[str, Any]]],
+    [Sequence[torch.Tensor], torch.nn.Module, str, Optional[Dict[str, Any]]],
     Iterable[PredictionResult]]

 KeyedTensorInferenceFn = Callable[[
     Sequence[Dict[str, torch.Tensor]],
     torch.nn.Module,
     Optional[Dict[str, Any]]
 ],
                                   Iterable[PredictionResult]]
@@ -114,12 +114,14 @@ def _convert_to_result(
 def _default_tensor_inference_fn(
     batch: Sequence[torch.Tensor],
     model: torch.nn.Module,
+    device: str,
     inference_args: Optional[Dict[str,
                                   Any]] = None) -> Iterable[PredictionResult]:
   # torch.no_grad() mitigates GPU memory issues
   # https://github.com/apache/beam/issues/22811
   with torch.no_grad():
     batched_tensors = torch.stack(batch)
+    batched_tensors = _convert_to_device(batched_tensors, device)
     predictions = model(batched_tensors, **inference_args)
   return _convert_to_result(batch, predictions)
@@ -208,8 +210,7 @@ def run_inference(
     """
     inference_args = {} if not inference_args else inference_args

-    converted_tensors = _convert_to_device(batch, self._device)
-    return self._inference_fn(converted_tensors, model, inference_args)
+    return self._inference_fn(batch, model, self._device, inference_args)

   def get_num_bytes(self, batch: Sequence[torch.Tensor]) -> int:
     """

From 0c4e292765b9978efb68805b7c9496e21340d63a Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 16:26:21 -0500
Subject: [PATCH 07/22] Make default available for import, add to test classes

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py     | 4 ++--
 .../python/apache_beam/ml/inference/pytorch_inference_test.py | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 421d7e5ea582..dade8373dea6 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -111,7 +111,7 @@ def _convert_to_result(
   return [PredictionResult(x, y) for x, y in zip(batch, predictions)]


-def _default_tensor_inference_fn(
+def default_tensor_inference_fn(
     batch: Sequence[torch.Tensor],
     model: torch.nn.Module,
     device: str,
     inference_args: Optional[Dict[str,
                                   Any]] = None) -> Iterable[PredictionResult]:
@@ -136,7 +136,7 @@ def __init__(
       model_params: Dict[str, Any],
       device: str = 'CPU',
       *,
-      inference_fn: TensorInferenceFn = _default_tensor_inference_fn):
+      inference_fn: TensorInferenceFn = default_tensor_inference_fn):
     """Implementation of the ModelHandler interface for PyTorch.
     Example Usage::

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 32036f43de86..a2e7fd1979fb 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -37,6 +37,7 @@
   import torch
   from apache_beam.ml.inference.base import PredictionResult
   from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch_inference import default_tensor_inference_fn
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
 except ImportError:
@@ -108,12 +109,14 @@
 class TestPytorchModelHandlerForInferenceOnly(PytorchModelHandlerTensor):
   def __init__(self, device):
     self._device = device
+    self._inference_fn = default_tensor_inference_fn


 class TestPytorchModelHandlerKeyedTensorForInferenceOnly(
     PytorchModelHandlerKeyedTensor):
   def __init__(self, device):
     self._device = device
+    self._inference_fn = default_tensor_inference_fn


 def _compare_prediction_result(x, y):

From 5143a2368853172e2748e85bb7c5d8c41bb1cd8f Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Wed, 9 Nov 2022 16:26:52 -0500
Subject: [PATCH 08/22] Remove incorrect default from keyed test

---
 sdks/python/apache_beam/ml/inference/pytorch_inference_test.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index a2e7fd1979fb..249ecfd81ef1 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -116,7 +116,6 @@ class TestPytorchModelHandlerKeyedTensorForInferenceOnly(
     PytorchModelHandlerKeyedTensor):
   def __init__(self, device):
     self._device = device
-    self._inference_fn = default_tensor_inference_fn


 def _compare_prediction_result(x, y):

From 05b1fcc3744d99439ab329fd734d3ab5c91e77cd Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Thu, 10 Nov 2022 13:02:18 -0500
Subject: [PATCH 09/22] Keyed impl

---
 .../ml/inference/pytorch_inference.py      | 54 ++++++++++---------
 .../ml/inference/pytorch_inference_test.py |  2 +
 2 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index dade8373dea6..94faf0ba7d8b 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -45,6 +45,7 @@
 KeyedTensorInferenceFn = Callable[[
     Sequence[Dict[str, torch.Tensor]],
     torch.nn.Module,
+    str,
     Optional[Dict[str, Any]]
 ],
                                   Iterable[PredictionResult]]
@@ -230,12 +231,30 @@ def validate_inference_args(self, inference_args: Optional[Dict[str, Any]]):
     pass


-#def _default_keyed_tensor_inference_fn(
-#    batch: Sequence[Dict[str, torch.Tensor]],
-#    model: torch.nn.Module,
-#    inference_args: Optional[Dict[str,
-#                                  Any]] = None) -> Iterable[PredictionResult]:
-#  return
+def default_keyed_tensor_inference_fn(
+    batch: Sequence[Dict[str, torch.Tensor]],
+    model: torch.nn.Module,
+    device: str,
+    inference_args: Optional[Dict[str,
+                                  Any]] = None) -> Iterable[PredictionResult]:
+  # If elements in `batch` are provided as dictionaries from key to Tensors,
+  # then iterate through the batch list, and group Tensors to the same key
+  key_to_tensor_list = defaultdict(list)
+
+  # torch.no_grad() mitigates GPU memory issues
+  # https://github.com/apache/beam/issues/22811
+  with torch.no_grad():
+    for example in batch:
+      for key, tensor in example.items():
+        key_to_tensor_list[key].append(tensor)
+    key_to_batched_tensors = {}
+    for key in key_to_tensor_list:
+      batched_tensors = torch.stack(key_to_tensor_list[key])
+      batched_tensors = _convert_to_device(batched_tensors, self._device)
+      key_to_batched_tensors[key] = batched_tensors
+    predictions = model(**key_to_batched_tensors, **inference_args)
+
+  return _convert_to_result(batch, predictions)


 @experimental(extra_message="No backwards-compatibility guarantees.")
@@ -248,7 +267,8 @@ def __init__(
       model_class: Callable[..., torch.nn.Module],
       model_params: Dict[str, Any],
       device: str = 'CPU',
-  ):
+      *,
+      inference_fn: KeyedTensorInferenceFn = default_keyed_tensor_inference_fn):
     """Implementation of the ModelHandler interface for PyTorch.

     Example Usage::
@@ -284,6 +304,7 @@ def __init__(
     self._device = torch.device('cpu')
     self._model_class = model_class
     self._model_params = model_params
+    self._inference_fn = inference_fn

   def load_model(self) -> torch.nn.Module:
     """Loads and initializes a Pytorch model for processing."""
@@ -323,24 +344,7 @@ def run_inference(
     """
     inference_args = {} if not inference_args else inference_args

-    # If elements in `batch` are provided as a dictionaries from key to Tensors,
-    # then iterate through the batch list, and group Tensors to the same key
-    key_to_tensor_list = defaultdict(list)
-
-    # torch.no_grad() mitigates GPU memory issues
-    # https://github.com/apache/beam/issues/22811
-    with torch.no_grad():
-      for example in batch:
-        for key, tensor in example.items():
-          key_to_tensor_list[key].append(tensor)
-      key_to_batched_tensors = {}
-      for key in key_to_tensor_list:
-        batched_tensors = torch.stack(key_to_tensor_list[key])
-        batched_tensors = _convert_to_device(batched_tensors, self._device)
-        key_to_batched_tensors[key] = batched_tensors
-      predictions = model(**key_to_batched_tensors, **inference_args)
-
-    return _convert_to_result(batch, predictions)
+    return self._inference_fn(batch, model, self._device, inference_args)

   def get_num_bytes(self, batch: Sequence[torch.Tensor]) -> int:
     """

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 249ecfd81ef1..bbdb2b3f96b1 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -37,6 +37,7 @@
   import torch
   from apache_beam.ml.inference.base import PredictionResult
   from apache_beam.ml.inference.base import RunInference
+  from apache_beam.ml.inference.pytorch_inference import default_keyed_tensor_inference_fn
   from apache_beam.ml.inference.pytorch_inference import default_tensor_inference_fn
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
 except ImportError:
@@ -116,6 +117,7 @@ class TestPytorchModelHandlerKeyedTensorForInferenceOnly(
     PytorchModelHandlerKeyedTensor):
   def __init__(self, device):
     self._device = device
+    self._inference_fn = default_keyed_tensor_inference_fn


 def _compare_prediction_result(x, y):

From f63820b20170ff9363a734278636e9ffde5e16fd Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Thu, 10 Nov 2022 13:37:20 -0500
Subject: [PATCH 10/22] Fix device arg

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 94faf0ba7d8b..2b801976a2a0 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -250,7 +250,7 @@ def default_keyed_tensor_inference_fn(
     key_to_batched_tensors = {}
     for key in key_to_tensor_list:
       batched_tensors = torch.stack(key_to_tensor_list[key])
-      batched_tensors = _convert_to_device(batched_tensors, self._device)
+      batched_tensors = _convert_to_device(batched_tensors, device)
       key_to_batched_tensors[key] = batched_tensors
     predictions = model(**key_to_batched_tensors, **inference_args)

@@ -291,6 +291,8 @@ def __init__(
       device: the device on which you wish to run the model. If
         ``device = GPU`` then a GPU device will be used if it is available.
         Otherwise, it will be CPU.
+      inference_fn: the function to invoke on run_inference.
+        default = default_keyed_tensor_inference_fn

     **Supported Versions:** RunInference APIs in Apache Beam have been tested
     with PyTorch 1.9 and 1.10.

From a92672584f16688dc99c363bb37459fb5d67fbbc Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Thu, 10 Nov 2022 15:35:18 -0500
Subject: [PATCH 11/22] custom inference test

---
 .../ml/inference/pytorch_inference_test.py | 45 +++++++++++++++++--
 1 file changed, 41 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index bbdb2b3f96b1..555a8cf69733 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -108,16 +108,16 @@
 class TestPytorchModelHandlerForInferenceOnly(PytorchModelHandlerTensor):
-  def __init__(self, device):
+  def __init__(self, device, *, inference_fn = default_tensor_inference_fn):
     self._device = device
-    self._inference_fn = default_tensor_inference_fn
+    self._inference_fn = inference_fn


 class TestPytorchModelHandlerKeyedTensorForInferenceOnly(
     PytorchModelHandlerKeyedTensor):
-  def __init__(self, device):
+  def __init__(self, device, *, inference_fn = default_keyed_tensor_inference_fn):
     self._device = device
-    self._inference_fn = default_keyed_tensor_inference_fn
+    self._inference_fn = inference_fn


 def _compare_prediction_result(x, y):
@@ -138,6 +138,16 @@ def _compare_prediction_result(x, y):
   return torch.equal(x.inference, y.inference)


+def custom_tensor_inference_fn(batch, model, device, inference_args):
+  predictions = [
+      PredictionResult(ex, pred) for ex,
+      pred in zip(
+          batch,
+          torch.Tensor([item * 2.0 + 1.5 for item in batch]).reshape(-1, 1))
+  ]
+  return predictions
+
+
 class PytorchLinearRegression(torch.nn.Module):
   def __init__(self, input_dim, output_dim):
     super().__init__()
@@ -235,6 +245,33 @@ def test_run_inference_multiple_tensor_features_dict_output(self):
     for actual, expected in zip(predictions, TWO_FEATURES_DICT_OUT_PREDICTIONS):
       self.assertEqual(actual, expected)

+  def test_run_inference_custom(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 1.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = TestPytorchModelHandlerForInferenceOnly(
+        torch.device('cpu'), inference_fn = custom_tensor_inference_fn)
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
   def test_run_inference_keyed(self):
     """
     This tests for inputs that are passed as a dictionary from key to tensor

From 253d88761d01a421bc59a4040822e178d9940172 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Thu, 10 Nov 2022 15:36:10 -0500
Subject: [PATCH 12/22] formatting

---
 .../apache_beam/ml/inference/pytorch_inference_test.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 555a8cf69733..cb4c29e11035 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -108,14 +108,14 @@
 class TestPytorchModelHandlerForInferenceOnly(PytorchModelHandlerTensor):
-  def __init__(self, device, *, inference_fn = default_tensor_inference_fn):
+  def __init__(self, device, *, inference_fn=default_tensor_inference_fn):
     self._device = device
     self._inference_fn = inference_fn


 class TestPytorchModelHandlerKeyedTensorForInferenceOnly(
     PytorchModelHandlerKeyedTensor):
-  def __init__(self, device, *, inference_fn = default_keyed_tensor_inference_fn):
+  def __init__(self, device, *, inference_fn=default_keyed_tensor_inference_fn):
     self._device = device
     self._inference_fn = inference_fn

@@ -267,7 +267,7 @@ def test_run_inference_custom(self):
     model.eval()

     inference_runner = TestPytorchModelHandlerForInferenceOnly(
-        torch.device('cpu'), inference_fn = custom_tensor_inference_fn)
+        torch.device('cpu'), inference_fn=custom_tensor_inference_fn)
     predictions = inference_runner.run_inference(examples, model)
     for actual, expected in zip(predictions, expected_predictions):
       self.assertEqual(actual, expected)

From b9778637c3711bbd7feb70d9845a758d9ab8d948 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 10:52:50 -0500
Subject: [PATCH 13/22] Add helpers to define custom inference functions using
 model methods

---
 .../ml/inference/pytorch_inference.py | 62 +++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 2b801976a2a0..3ffe42a187d5 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -127,6 +127,31 @@ def default_tensor_inference_fn(
   return _convert_to_result(batch, predictions)


+def make_tensor_model_fn(model_fn: str) -> TensorInferenceFn:
+  """
+  Produces a TensorInferenceFn that uses a method of the model other than
+  the forward() method. 
+
+  Args:
+    model_fn: A string name of the method to be used. This is accessed through
+      model.get_attr()
+  """
+  def attr_fn(
+      batch: Sequence[torch.Tensor],
+      model: torch.nn.Module,
+      device: str,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    with torch.no_grad():
+      batched_tensors = torch.stack(batch)
+      batched_tensors = _convert_to_device(batched_tensors, device)
+      pred_fn = model.get_attr(model_fn)
+      predictions = pred_fn(batched_tensors, **inference_args)
+    return _convert_to_result(batch, predictions)
+
+  return attr_fn
+
+
 class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
                                              PredictionResult,
                                              torch.nn.Module]):
@@ -257,6 +282,43 @@ def default_keyed_tensor_inference_fn(
   return _convert_to_result(batch, predictions)


+def make_keyed_tensor_model_fn(model_fn: str) -> KeyedTensorInferenceFn:
+  """
+  Produces a KeyedTensorInferenceFn that uses a method of the model other than
+  the forward() method. 
+
+  Args:
+    model_fn: A string name of the method to be used. This is accessed through
+      model.get_attr()
+  """
+  def attr_fn(
+      batch: Sequence[torch.Tensor],
+      model: torch.nn.Module,
+      device: str,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:
+    # If elements in `batch` are provided as dictionaries from key to Tensors,
+    # then iterate through the batch list, and group Tensors to the same key
+    key_to_tensor_list = defaultdict(list)
+
+    # torch.no_grad() mitigates GPU memory issues
+    # https://github.com/apache/beam/issues/22811
+    with torch.no_grad():
+      for example in batch:
+        for key, tensor in example.items():
+          key_to_tensor_list[key].append(tensor)
+      key_to_batched_tensors = {}
+      for key in key_to_tensor_list:
+        batched_tensors = torch.stack(key_to_tensor_list[key])
+        batched_tensors = _convert_to_device(batched_tensors, device)
+        key_to_batched_tensors[key] = batched_tensors
+      pred_fn = model.get_attr(model_fn)
+        predictions = pred_fn(**key_to_batched_tensors, **inference_args)
+    return _convert_to_result(batch, predictions)
+
+  return attr_fn
+
+
 @experimental(extra_message="No backwards-compatibility guarantees.")
 class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, torch.Tensor],
                                                   PredictionResult,

From d9928739c1bc4ac6493e5d4af39b7b8147f2265a Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 12:33:42 -0500
Subject: [PATCH 14/22] Trailing whitespace

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 3ffe42a187d5..3145504f73ab 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -130,7 +130,7 @@ def make_tensor_model_fn(model_fn: str) -> TensorInferenceFn:
   """
   Produces a TensorInferenceFn that uses a method of the model other than
-  the forward() method. 
+  the forward() method.

   Args:
     model_fn: A string name of the method to be used. This is accessed through
       model.get_attr()
   """
@@ -285,7 +285,7 @@ def make_keyed_tensor_model_fn(model_fn: str) -> KeyedTensorInferenceFn:
   """
   Produces a KeyedTensorInferenceFn that uses a method of the model other than
-  the forward() method. 
+  the forward() method.

   Args:
     model_fn: A string name of the method to be used. This is accessed through
       model.get_attr()
   """

From f95042d8923bfef593bced417063d96d1c60cf00 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 13:25:10 -0500
Subject: [PATCH 15/22] Unit tests

---
 .../ml/inference/pytorch_inference_test.py | 69 +++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index cb4c29e11035..b244ce35ef29 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -39,6 +39,8 @@
   from apache_beam.ml.inference.base import RunInference
   from apache_beam.ml.inference.pytorch_inference import default_keyed_tensor_inference_fn
   from apache_beam.ml.inference.pytorch_inference import default_tensor_inference_fn
+  from apache_beam.ml.inference.pytorch_inference import make_keyed_tensor_model_fn
+  from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
   from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
 except ImportError:
@@ -356,6 +358,73 @@ def test_inference_runner_inference_args(self):
     for actual, expected in zip(predictions, KEYED_TORCH_PREDICTIONS):
       self.assertEqual(actual, expected)

+  def test_run_inference_helper(self):
+    examples = [
+        torch.from_numpy(np.array([1], dtype="float32")),
+        torch.from_numpy(np.array([5], dtype="float32")),
+        torch.from_numpy(np.array([-3], dtype="float32")),
+        torch.from_numpy(np.array([10.0], dtype="float32")),
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([example * 2.0 + 0.5
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    gen_fn = make_tensor_model_fn('generate')
+
+    model = PytorchLinearRegression(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = TestPytorchModelHandlerForInferenceOnly(
+        torch.device('cpu'), inference_fn=gen_fn)
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertEqual(actual, expected)
+
+  def test_run_inference_keyed_helper(self):
+    """
+    This tests for inputs that are passed as a dictionary from key to tensor
+    instead of a standard non-keyed tensor example.
+
+    Example:
+      Typical input format is
+        input = torch.tensor([1, 2, 3])
+
+      But Pytorch syntax allows inputs to have the form
+        input = {
+          'k1' : torch.tensor([1, 2, 3]),
+          'k2' : torch.tensor([4, 5, 6])
+        }
+    """
+    class PytorchLinearRegressionMultipleArgs(torch.nn.Module):
+      def __init__(self, input_dim, output_dim):
+        super().__init__()
+        self.linear = torch.nn.Linear(input_dim, output_dim)
+
+      def forward(self, k1, k2):
+        out = self.linear(k1) + self.linear(k2)
+        return out
+
+    model = PytorchLinearRegressionMultipleArgs(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    gen_fn = make_keyed_tensor_model_fn('generate')
+
+    inference_runner = TestPytorchModelHandlerKeyedTensorForInferenceOnly(
+        torch.device('cpu'), inference_fn=gen_fn)
+    predictions = inference_runner.run_inference(KEYED_TORCH_EXAMPLES, model)
+    for actual, expected in zip(predictions, KEYED_TORCH_PREDICTIONS):
+      self.assertTrue(_compare_prediction_result(actual, expected))
+
   def test_num_bytes(self):
     inference_runner = TestPytorchModelHandlerForInferenceOnly(
         torch.device('cpu'))

From fb9fb9a4edbfc1733d3b655ae49769b4eacc4ddd Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 15:37:14 -0500
Subject: [PATCH 16/22] Fix incorrect getattr syntax

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 3145504f73ab..fa8d242f63d9 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -145,7 +145,7 @@ def attr_fn(
     with torch.no_grad():
       batched_tensors = torch.stack(batch)
       batched_tensors = _convert_to_device(batched_tensors, device)
-      pred_fn = model.get_attr(model_fn)
+      pred_fn = getattr(model_fn, str)
       predictions = pred_fn(batched_tensors, **inference_args)
     return _convert_to_result(batch, predictions)
@@ -312,7 +312,7 @@ def attr_fn(
       batched_tensors = torch.stack(key_to_tensor_list[key])
       batched_tensors = _convert_to_device(batched_tensors, device)
       key_to_batched_tensors[key] = batched_tensors
-      pred_fn = model.get_attr(model_fn)
+      pred_fn = getattr(model, model_fn)
        predictions = pred_fn(**key_to_batched_tensors, **inference_args)
     return _convert_to_result(batch, predictions)

From 047e873d043b7f8367da7afd4760e59c1e52c2a3 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 16:05:45 -0500
Subject: [PATCH 17/22] Type typo

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index fa8d242f63d9..67bcf60eab86 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -145,7 +145,7 @@ def attr_fn(
     with torch.no_grad():
       batched_tensors = torch.stack(batch)
       batched_tensors = _convert_to_device(batched_tensors, device)
-      pred_fn = getattr(model_fn, str)
+      pred_fn = getattr(model, model_fn)
       predictions = pred_fn(batched_tensors, **inference_args)
     return _convert_to_result(batch, predictions)

From 2d178eddbc793e5125c7ef581def4bd8ea77ea78 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Mon, 14 Nov 2022 16:06:29 -0500
Subject: [PATCH 18/22] Fix docstring

---
 sdks/python/apache_beam/ml/inference/pytorch_inference.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 67bcf60eab86..a8b834c99a37 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -134,7 +134,7 @@ def make_tensor_model_fn(model_fn: str) -> TensorInferenceFn:

   Args:
     model_fn: A string name of the method to be used. This is accessed through
-      model.get_attr()
+      getattr(model, model_fn)
   """
   def attr_fn(
@@ -289,7 +289,7 @@ def make_keyed_tensor_model_fn(model_fn: str) -> KeyedTensorInferenceFn:

   Args:
     model_fn: A string name of the method to be used. This is accessed through
-      model.get_attr()
+      getattr(model, model_fn)
   """
   def attr_fn(

From be503814fd700bb353f843052750b89b46f5f300 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 15 Nov 2022 19:11:37 +0000
Subject: [PATCH 19/22] Fix keyed helper, add basic generate route

---
 .../apache_beam/ml/inference/pytorch_inference.py |  2 +-
 .../ml/inference/pytorch_inference_test.py        | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index a8b834c99a37..5428e8bf4cac 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -313,7 +313,7 @@ def attr_fn(
       batched_tensors = _convert_to_device(batched_tensors, device)
       key_to_batched_tensors[key] = batched_tensors
       pred_fn = getattr(model, model_fn)
-        predictions = pred_fn(**key_to_batched_tensors, **inference_args)
+      predictions = pred_fn(**key_to_batched_tensors, **inference_args)
     return _convert_to_result(batch, predictions)

   return attr_fn

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index b244ce35ef29..32ff05f3e91e 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -159,6 +159,10 @@ def forward(self, x):
     out = self.linear(x)
     return out

+  def generate(self, x):
+    out = self.linear(x)
+    return out
+

 class PytorchLinearRegressionDict(torch.nn.Module):
   def __init__(self, input_dim, output_dim):
@@ -169,6 +173,10 @@ def forward(self, x):
     out = self.linear(x)
     return {'output1': out, 'output2': out}

+  def generate(self, x):
+    out = self.linear(x)
+    return {'output1': out, 'output2': out}
+

 class PytorchLinearRegressionKeyedBatchAndExtraInferenceArgs(torch.nn.Module):
   """

From 84249ccd4bf850814248003b50c172856b41f75c Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 15 Nov 2022 14:21:25 -0500
Subject: [PATCH 20/22] Modify generate() to be different than forward()

---
 .../ml/inference/pytorch_inference_test.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 32ff05f3e91e..0e66497845c2 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -101,6 +101,14 @@
                  for example in KEYED_TORCH_EXAMPLES]).reshape(-1, 1))
 ]

+KEYED_TORCH_HELPER_PREDICTIONS = [
+    PredictionResult(ex, pred) for ex,
+    pred in zip(
+        KEYED_TORCH_EXAMPLES,
+        torch.Tensor([(example['k1'] * 2.0 + 0.5) + (example['k2'] * 2.0 + 0.5) + 0.5
+                      for example in KEYED_TORCH_EXAMPLES]).reshape(-1, 1))
+]
+
 KEYED_TORCH_DICT_OUT_PREDICTIONS = [
     PredictionResult(
         p.example, {
@@ -160,7 +168,7 @@ def forward(self, x):
     return out

   def generate(self, x):
-    out = self.linear(x)
+    out = self.linear(x) + 0.5
     return out

@@ -377,7 +385,7 @@ def test_run_inference_helper(self):
         PredictionResult(ex, pred) for ex,
         pred in zip(
             examples,
-            torch.Tensor([example * 2.0 + 0.5
+            torch.Tensor([example * 2.0 + 1.0
                          for example in examples]).reshape(-1, 1))
     ]

@@ -420,7 +428,7 @@ def forward(self, k1, k2):
        return out

       def generate(self, k1, k2):
-        out = self.linear(k1) + self.linear(k2)
+        out = self.linear(k1) + self.linear(k2) + 0.5
         return out

@@ -434,7 +442,7 @@ def generate(self, k1, k2):
     inference_runner = TestPytorchModelHandlerKeyedTensorForInferenceOnly(
         torch.device('cpu'), inference_fn=gen_fn)
     predictions = inference_runner.run_inference(KEYED_TORCH_EXAMPLES, model)
-    for actual, expected in zip(predictions, KEYED_TORCH_PREDICTIONS):
+    for actual, expected in zip(predictions, KEYED_TORCH_HELPER_PREDICTIONS):
       self.assertTrue(_compare_prediction_result(actual, expected))

   def test_num_bytes(self):

From f3ce521d3a9a3d20cbbcdfd9099e49c9b3b4171b Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 15 Nov 2022 14:29:13 -0500
Subject: [PATCH 21/22] formatting

---
 .../apache_beam/ml/inference/pytorch_inference_test.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 0e66497845c2..48659e148ded 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -105,7 +105,8 @@
     PredictionResult(ex, pred) for ex,
     pred in zip(
         KEYED_TORCH_EXAMPLES,
-        torch.Tensor([(example['k1'] * 2.0 + 0.5) + (example['k2'] * 2.0 + 0.5) + 0.5
+        torch.Tensor([(example['k1'] * 2.0 + 0.5) +
+                      (example['k2'] * 2.0 + 0.5) + 0.5
                      for example in KEYED_TORCH_EXAMPLES]).reshape(-1, 1))
 ]

@@ -385,7 +386,7 @@ def test_run_inference_helper(self):
         PredictionResult(ex, pred) for ex,
         pred in zip(
             examples,
-            torch.Tensor([example * 2.0 + 1.0 
+            torch.Tensor([example * 2.0 + 1.0
                          for example in examples]).reshape(-1, 1))
     ]

From dae57c562e1971621291836102f20949559b8fd6 Mon Sep 17 00:00:00 2001
From: Jack McCluskey
Date: Tue, 15 Nov 2022 14:37:28 -0500
Subject: [PATCH 22/22] Remove extra generate() def

---
 .../python/apache_beam/ml/inference/pytorch_inference_test.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 48659e148ded..d6d3a2934555 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -182,10 +182,6 @@ def forward(self, x):
     out = self.linear(x)
     return {'output1': out, 'output2': out}

-  def generate(self, x):
-    out = self.linear(x)
-    return {'output1': out, 'output2': out}
-

 class PytorchLinearRegressionKeyedBatchAndExtraInferenceArgs(torch.nn.Module):
   """
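
Taken together, the series gives both PyTorch model handlers a keyword-only
``inference_fn`` argument and adds the ``make_tensor_model_fn`` /
``make_keyed_tensor_model_fn`` helpers, which build an inference function
around a named model method such as ``generate()``. A minimal usage sketch of
the API as it stands after PATCH 22 (the ``LinearRegression`` class and the
state dict path below are illustrative placeholders, not code from the
series)::

    import apache_beam as beam
    import torch
    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
    from apache_beam.ml.inference.pytorch_inference import make_tensor_model_fn


    class LinearRegression(torch.nn.Module):
      """Mirrors the PytorchLinearRegression model used in the unit tests."""
      def __init__(self, input_dim=1, output_dim=1):
        super().__init__()
        self.linear = torch.nn.Linear(input_dim, output_dim)

      def forward(self, x):
        return self.linear(x)

      def generate(self, x):
        # Alternate inference route; differs from forward() so the two
        # paths can be told apart, as in the PATCH 20 tests.
        return self.linear(x) + 0.5


    # Builds a TensorInferenceFn that calls model.generate(...) via getattr
    # instead of model.forward(...).
    gen_fn = make_tensor_model_fn('generate')

    model_handler = PytorchModelHandlerTensor(
        state_dict_path='gs://my-bucket/linear_regression.pt',  # placeholder
        model_class=LinearRegression,
        model_params={'input_dim': 1, 'output_dim': 1},
        inference_fn=gen_fn)

    with beam.Pipeline() as pipeline:
      _ = (
          pipeline
          | beam.Create([torch.Tensor([1.0]), torch.Tensor([5.0])])
          | RunInference(model_handler))

A fully custom function with the ``TensorInferenceFn`` signature
(``batch``, ``model``, ``device``, ``inference_args``) can be passed through
the same ``inference_fn`` argument, as exercised by
``test_run_inference_custom`` in PATCH 11.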