From fb2557b7d743030eb6310abbcf54aa94ad6d0ee1 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 27 Apr 2022 16:25:33 -0400 Subject: [PATCH 01/13] Fix lint and types --- sdks/python/apache_beam/ml/inference/api.py | 2 +- sdks/python/apache_beam/ml/inference/base.py | 10 +- .../ml/inference/sklearn_inference.py | 6 +- .../ml/inference/sklearn_loader_test.py | 177 ++++++++++++++++++ 4 files changed, 186 insertions(+), 9 deletions(-) create mode 100644 sdks/python/apache_beam/ml/inference/sklearn_loader_test.py diff --git a/sdks/python/apache_beam/ml/inference/api.py b/sdks/python/apache_beam/ml/inference/api.py index 73dd87bb4e95..df149214ef9f 100644 --- a/sdks/python/apache_beam/ml/inference/api.py +++ b/sdks/python/apache_beam/ml/inference/api.py @@ -51,7 +51,7 @@ class RunInference(beam.PTransform): TODO(BEAM-14046): Add and link to help documentation """ - def __init__(self, model_loader: base.ModelLoader) -> beam.pvalue.PCollection: + def __init__(self, model_loader: base.ModelLoader): self._model_loader = model_loader def expand(self, pcoll: beam.PCollection) -> beam.PCollection: diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index 80490285edd3..8362416ee93a 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -44,7 +44,7 @@ from apache_beam.utils import shared try: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-position import resource except ImportError: resource = None @@ -59,7 +59,8 @@ class InferenceRunner(): """Implements running inferences for a framework.""" def run_inference(self, batch: List[Any], model: Any) -> Iterable[Any]: - """Runs inferences on a batch of examples and returns an Iterable of Predictions.""" + """Runs inferences on a batch of examples and + returns an Iterable of Predictions.""" raise NotImplementedError(type(self)) def get_num_bytes(self, batch: Any) -> int: @@ -67,7 +68,7 @@ def get_num_bytes(self, batch: Any) -> int: return len(pickle.dumps(batch)) def get_metrics_namespace(self) -> str: - """Returns a namespace for metrics collected by the RunInference transform.""" + """Returns a namespace for metrics collected by RunInference transform.""" return 'RunInference' @@ -249,8 +250,7 @@ def get_current_time_in_microseconds(self) -> int: class _FineGrainedClock(_Clock): def get_current_time_in_microseconds(self) -> int: return int( - time.clock_gettime_ns(time.CLOCK_REALTIME) / # pytype: disable=module-attr - _NANOSECOND_TO_MICROSECOND) + time.clock_gettime_ns(time.CLOCK_REALTIME) / _NANOSECOND_TO_MICROSECOND) #TODO(BEAM-14255): Research simplifying the internal clock and just using time. 
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py index e0890a725c59..b9127a75ee3f 100644 --- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py +++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py @@ -42,14 +42,14 @@ class ModelFileType(enum.Enum): class SklearnInferenceRunner(InferenceRunner): - def run_inference(self, batch: List[numpy.array], - model: Any) -> Iterable[numpy.array]: + def run_inference(self, batch: List[numpy.ndarray], + model: Any) -> Iterable[PredictionResult]: # vectorize data for better performance vectorized_batch = numpy.stack(batch, axis=0) predictions = model.predict(vectorized_batch) return [PredictionResult(x, y) for x, y in zip(batch, predictions)] - def get_num_bytes(self, batch: List[numpy.array]) -> int: + def get_num_bytes(self, batch: List[numpy.ndarray]) -> int: """Returns the number of bytes of data for a batch.""" return sum(sys.getsizeof(element) for element in batch) diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py new file mode 100644 index 000000000000..e318d0e64b32 --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +import os +import pickle +import platform +import shutil +import sys +import tempfile +import unittest + +import joblib +import numpy +from sklearn import svm + +import apache_beam as beam +from apache_beam.ml.inference import api +from apache_beam.ml.inference import base +from apache_beam.ml.inference.sklearn_loader import ModelFileType +from apache_beam.ml.inference.sklearn_loader import SklearnInferenceRunner +from apache_beam.ml.inference.sklearn_loader import SklearnModelLoader +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + + +def _compare_prediction_result(a, b): + example_equal = numpy.array_equal(a.example, b.example) + return a.inference == b.inference and example_equal + + +class FakeModel: + def __init__(self): + self.total_predict_calls = 0 + + def predict(self, input_vector: numpy.ndarray): + self.total_predict_calls += 1 + return numpy.sum(input_vector, axis=1) + + +def build_model(): + x = [[0, 0], [1, 1]] + y = [0, 1] + model = svm.SVC() + model.fit(x, y) + return model + + +class SkLearnRunInferenceTest(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def test_predict_output(self): + fake_model = FakeModel() + inference_runner = SklearnInferenceRunner() + batched_examples = [ + numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) + ] + expected_predictions = [ + api.PredictionResult(numpy.array([1, 2, 3]), 6), + api.PredictionResult(numpy.array([4, 5, 6]), 15), + api.PredictionResult(numpy.array([7, 8, 9]), 24) + ] + inferences = inference_runner.run_inference(batched_examples, fake_model) + for actual, expected in zip(inferences, expected_predictions): + self.assertTrue(_compare_prediction_result(actual, expected)) + + def test_data_vectorized(self): + fake_model = FakeModel() + inference_runner = SklearnInferenceRunner() + batched_examples = [ + numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) + ] + # even though there are 3 examples, the data should + # be vectorized and only 1 call should happen. + inference_runner.run_inference(batched_examples, fake_model) + self.assertEqual(1, fake_model.total_predict_calls) + + def test_num_bytes(self): + inference_runner = SklearnInferenceRunner() + batched_examples_int = [ + numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) + ] + self.assertEqual( + sys.getsizeof(batched_examples_int[0]) * 3, + inference_runner.get_num_bytes(batched_examples_int)) + + batched_examples_float = [ + numpy.array([1.0, 2.0, 3.0]), + numpy.array([4.1, 5.2, 6.3]), + numpy.array([7.7, 8.8, 9.9]) + ] + self.assertEqual( + sys.getsizeof(batched_examples_float[0]) * 3, + inference_runner.get_num_bytes(batched_examples_float)) + + @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359') + def test_pipeline_pickled(self): + temp_file_name = self.tmpdir + os.sep + 'pickled_file' + with open(temp_file_name, 'wb') as file: + pickle.dump(build_model(), file) + with TestPipeline() as pipeline: + examples = [numpy.array([0, 0]), numpy.array([1, 1])] + + pcoll = pipeline | 'start' >> beam.Create(examples) + #TODO(BEAM-14305) Test against the public API. 
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(model_uri=temp_file_name))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_joblib(self):
+    temp_file_name = self.tmpdir + os.sep + 'joblib_file'
+    with open(temp_file_name, 'wb') as file:
+      joblib.dump(build_model(), file)
+    with TestPipeline() as pipeline:
+      examples = [numpy.array([0, 0]), numpy.array([1, 1])]
+
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      #TODO(BEAM-14305) Test against the public API.
+
+      actual = pcoll | base.RunInference(
+          SklearnModelLoader(
+              model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
+      expected = [
+          api.PredictionResult(numpy.array([0, 0]), 0),
+          api.PredictionResult(numpy.array([1, 1]), 1)
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_prediction_result))
+
+  def test_bad_file_raises(self):
+    with self.assertRaises(RuntimeError):
+      with TestPipeline() as pipeline:
+        examples = [numpy.array([0, 0])]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        # TODO(BEAM-14305) Test against the public API.
+        _ = pcoll | base.RunInference(
+            SklearnModelLoader(model_uri='/var/bad_file_name'))
+        pipeline.run()
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_bad_input_type_raises(self):
+    with self.assertRaisesRegex(AssertionError,
+                                'Unsupported serialization type'):
+      with tempfile.NamedTemporaryFile() as file:
+        model_loader = SklearnModelLoader(
+            model_uri=file.name, model_file_type=None)
+        model_loader.load_model()
+
+
+if __name__ == '__main__':
+  unittest.main()

From 7ae1ab13ac9920928ac8958fa8e6c7ad041b5a60 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Wed, 27 Apr 2022 17:11:26 -0400
Subject: [PATCH 02/13] Add __init__ file to make inference a package

---
 .../apache_beam/ml/inference/__init__.py | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)
 create mode 100644 sdks/python/apache_beam/ml/inference/__init__.py

diff --git a/sdks/python/apache_beam/ml/inference/__init__.py b/sdks/python/apache_beam/ml/inference/__init__.py
new file mode 100644
index 000000000000..9f588901efc3
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/__init__.py
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +"""Beam RunInference API +- For high-level documentation see + TODO(BEAM-14046): Add and link to help documentation +- :mod:`apache_beam.ml.inference.api`: Main RunInference interface +- :mod:`apache_beam.ml.inference.base`: RunInference base class +- :mod:`apache_beam.ml.inference.pytorch_inference`: Pytorch RunInference + implementation +- :mod:`apache_beam.ml.inference.sklearn_inference`: Scikit-learn RunInference + implementation +""" From 4ad8a737f97fc7999693480c6d0b2b0be25f5e75 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 27 Apr 2022 20:43:28 -0400 Subject: [PATCH 03/13] Fixup mypy errors --- sdks/python/apache_beam/ml/inference/api.py | 1 + sdks/python/apache_beam/ml/inference/base.py | 4 ++-- sdks/python/apache_beam/ml/inference/base_test.py | 2 +- sdks/python/apache_beam/ml/inference/pytorch.py | 8 ++++---- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/api.py b/sdks/python/apache_beam/ml/inference/api.py index df149214ef9f..10a5a306704a 100644 --- a/sdks/python/apache_beam/ml/inference/api.py +++ b/sdks/python/apache_beam/ml/inference/api.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# mypy: ignore-errors from dataclasses import dataclass from typing import Tuple diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index 8362416ee93a..68971bcf70dd 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -44,10 +44,10 @@ from apache_beam.utils import shared try: - # pylint: disable=wrong-import-position + # pylint: disable=wrong-import-order, wrong-import-position import resource except ImportError: - resource = None + resource = None # type: ignore[assignment] _MICROSECOND_TO_MILLISECOND = 1000 _NANOSECOND_TO_MICROSECOND = 1000 diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py index ab7bb16383ec..49500e87a9ae 100644 --- a/sdks/python/apache_beam/ml/inference/base_test.py +++ b/sdks/python/apache_beam/ml/inference/base_test.py @@ -23,7 +23,7 @@ from typing import Iterable import apache_beam as beam -import apache_beam.ml.inference.base as base +from apache_beam.ml.inference import base from apache_beam.metrics.metric import MetricsFilter from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that diff --git a/sdks/python/apache_beam/ml/inference/pytorch.py b/sdks/python/apache_beam/ml/inference/pytorch.py index cb0935e3a3a7..b95f4afe1d0d 100644 --- a/sdks/python/apache_beam/ml/inference/pytorch.py +++ b/sdks/python/apache_beam/ml/inference/pytorch.py @@ -49,10 +49,10 @@ def run_inference(self, batch: List[torch.Tensor], the inference call. 
""" - batch = torch.stack(batch) - if batch.device != self._device: - batch = batch.to(self._device) - predictions = model(batch) + torch_batch = torch.stack(batch) + if torch_batch.device != self._device: + torch_batch = torch_batch.to(self._device) + predictions = model(torch_batch) return [PredictionResult(x, y) for x, y in zip(batch, predictions)] def get_num_bytes(self, batch: List[torch.Tensor]) -> int: From 108dbd0932e32383ed17b6ea0c5f47e4dd627c11 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 28 Apr 2022 00:58:03 -0400 Subject: [PATCH 04/13] remove content from __init__ --- sdks/python/apache_beam/ml/inference/__init__.py | 11 ----------- sdks/python/apache_beam/ml/inference/base.py | 3 ++- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/__init__.py b/sdks/python/apache_beam/ml/inference/__init__.py index 9f588901efc3..cce3acad34a4 100644 --- a/sdks/python/apache_beam/ml/inference/__init__.py +++ b/sdks/python/apache_beam/ml/inference/__init__.py @@ -14,14 +14,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -"""Beam RunInference API -- For high-level documentation see - TODO(BEAM-14046): Add and link to help documentation -- :mod:`apache_beam.ml.inference.api`: Main RunInference interface -- :mod:`apache_beam.ml.inference.base`: RunInference base class -- :mod:`apache_beam.ml.inference.pytorch_inference`: Pytorch RunInference - implementation -- :mod:`apache_beam.ml.inference.sklearn_inference`: Scikit-learn RunInference - implementation -""" diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py index 68971bcf70dd..3c7e6fe0e3de 100644 --- a/sdks/python/apache_beam/ml/inference/base.py +++ b/sdks/python/apache_beam/ml/inference/base.py @@ -250,7 +250,8 @@ def get_current_time_in_microseconds(self) -> int: class _FineGrainedClock(_Clock): def get_current_time_in_microseconds(self) -> int: return int( - time.clock_gettime_ns(time.CLOCK_REALTIME) / _NANOSECOND_TO_MICROSECOND) + time.clock_gettime_ns(time.CLOCK_REALTIME) / # type: ignore[attr-defined] + _NANOSECOND_TO_MICROSECOND) #TODO(BEAM-14255): Research simplifying the internal clock and just using time. 
From 5c410afeaa4c4cfd856be6bdaf877b3aeb2ac996 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 28 Apr 2022 01:54:13 -0400
Subject: [PATCH 05/13] Fixup lint for a newly added change

---
 sdks/python/apache_beam/ml/inference/base_test.py | 2 +-
 sdks/python/apache_beam/ml/inference/sklearn_inference.py | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index 49500e87a9ae..55936f63ed4f 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -23,8 +23,8 @@
 from typing import Iterable
 
 import apache_beam as beam
-from apache_beam.ml.inference import base
 from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.ml.inference import base
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index b9127a75ee3f..3b56eada881e 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -71,8 +71,9 @@ def load_model(self):
     elif self._model_file_type == ModelFileType.JOBLIB:
       if not joblib:
         raise ImportError(
-            'Could not import joblib in this execution'
-            ' environment. For help with managing dependencies on Python workers see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'
+            'Could not import joblib in this execution environment. '
+            'For help with managing dependencies on Python workers, see '
+            'https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
         )
       return joblib.load(file)
     raise AssertionError('Unsupported serialization type.')
From 0cf7316fe4307a7941854c68f30f78fbe932a55e Mon Sep 17 00:00:00 2001
From: Andy Ye
Date: Thu, 28 Apr 2022 14:09:57 -0500
Subject: [PATCH 06/13] Add torch to docs test

---
 sdks/python/tox.ini | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 858db8590a89..4d8b40bff090 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -138,7 +138,7 @@ commands =
   python setup.py mypy
 
 [testenv:py38-docs]
-extras = test,gcp,docs,interactive,dataframe
+extras = test,gcp,docs,interactive,dataframe,['torch==1.9.0']
 deps =
   Sphinx==1.8.5
   sphinx_rtd_theme==0.4.3
From b338fb5b59e40496c2211b1ae17cfb80ebcbfb9a Mon Sep 17 00:00:00 2001
From: Andy Ye
Date: Thu, 28 Apr 2022 14:17:07 -0500
Subject: [PATCH 07/13] update torch location

---
 sdks/python/tox.ini | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 4d8b40bff090..0c2dcaaa9266 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -138,12 +138,13 @@ commands =
   python setup.py mypy
 
 [testenv:py38-docs]
-extras = test,gcp,docs,interactive,dataframe,['torch==1.9.0']
+extras = test,gcp,docs,interactive,dataframe
 deps =
   Sphinx==1.8.5
   sphinx_rtd_theme==0.4.3
   docutils<0.18
   Jinja2==3.0.3 # TODO(BEAM-14172): Sphinx version is too old.
+ torch==1.9.0 commands = time {toxinidir}/scripts/generate_pydoc.sh From 8a0bb0c30da208cc605ee42b15bb2362bd0f2b6c Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Thu, 28 Apr 2022 17:21:01 -0500 Subject: [PATCH 08/13] Remove monkey patch from generate_pydoc --- sdks/python/scripts/generate_pydoc.sh | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index 7890ecd9665d..cb86043b34a8 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -235,17 +235,6 @@ nitpick_ignore = [] nitpick_ignore += [('py:class', iden) for iden in ignore_identifiers] nitpick_ignore += [('py:obj', iden) for iden in ignore_identifiers] nitpick_ignore += [('py:exc', iden) for iden in ignore_references] - -# Monkey patch functools.wraps to retain original function argument signature -# for documentation. -# https://github.com/sphinx-doc/sphinx/issues/1711 -import functools -def fake_wraps(wrapped): - def wrapper(decorator): - return wrapped - return wrapper - -functools.wraps = fake_wraps EOF #=== index.rst ===# From cbf07af8c84c93be04d7fd93a8299739a02677f6 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Fri, 29 Apr 2022 16:20:48 -0500 Subject: [PATCH 09/13] Fix docstring for Pytorch --- sdks/python/apache_beam/ml/inference/pytorch.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/pytorch.py b/sdks/python/apache_beam/ml/inference/pytorch.py index b95f4afe1d0d..eddb4b634224 100644 --- a/sdks/python/apache_beam/ml/inference/pytorch.py +++ b/sdks/python/apache_beam/ml/inference/pytorch.py @@ -75,10 +75,13 @@ def __init__( model_params: Dict[str, Any], device: str = 'CPU'): """ - state_dict_path: path to the saved dictionary of the model state. - model_class: class of the Pytorch model that defines the model structure. - device: the device on which you wish to run the model. If ``device = GPU`` - then device will be cuda if it is available. Otherwise, it will be cpu. + Initializes a PytorchModelLoader + :param state_dict_path: path to the saved dictionary of the model state. + :param model_class: class of the Pytorch model that defines the model + structure. + :param device: the device on which you wish to run the model. If + ``device = GPU`` then device will be cuda if it is avaiable. Otherwise, + it will be cpu. 
See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details From 6d8a9cffcefb544ef673fdb5756a22e63317473a Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 29 Apr 2022 20:17:02 -0400 Subject: [PATCH 10/13] fixup: lint --- .../apache_beam/ml/inference/sklearn_inference_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py index a35a6922957f..f1efe73c96cd 100644 --- a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py @@ -30,8 +30,8 @@ from sklearn import svm import apache_beam as beam -import apache_beam.ml.inference.api as api -import apache_beam.ml.inference.base as base +from apache_beam.ml.inference import api +from apache_beam.ml.inference import base from apache_beam.ml.inference.sklearn_inference import ModelFileType from apache_beam.ml.inference.sklearn_inference import SklearnInferenceRunner from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader @@ -49,7 +49,7 @@ class FakeModel: def __init__(self): self.total_predict_calls = 0 - def predict(self, input_vector: numpy.array): + def predict(self, input_vector: numpy.ndarray): self.total_predict_calls += 1 return numpy.sum(input_vector, axis=1) From 76d443db6774dec51c3595e14d10002882060be0 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Fri, 29 Apr 2022 20:25:44 -0400 Subject: [PATCH 11/13] cleanup from rebasing onto master --- .../ml/inference/sklearn_loader_test.py | 177 ------------------ 1 file changed, 177 deletions(-) delete mode 100644 sdks/python/apache_beam/ml/inference/sklearn_loader_test.py diff --git a/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py b/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py deleted file mode 100644 index e318d0e64b32..000000000000 --- a/sdks/python/apache_beam/ml/inference/sklearn_loader_test.py +++ /dev/null @@ -1,177 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# pytype: skip-file - -import os -import pickle -import platform -import shutil -import sys -import tempfile -import unittest - -import joblib -import numpy -from sklearn import svm - -import apache_beam as beam -from apache_beam.ml.inference import api -from apache_beam.ml.inference import base -from apache_beam.ml.inference.sklearn_loader import ModelFileType -from apache_beam.ml.inference.sklearn_loader import SklearnInferenceRunner -from apache_beam.ml.inference.sklearn_loader import SklearnModelLoader -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to - - -def _compare_prediction_result(a, b): - example_equal = numpy.array_equal(a.example, b.example) - return a.inference == b.inference and example_equal - - -class FakeModel: - def __init__(self): - self.total_predict_calls = 0 - - def predict(self, input_vector: numpy.ndarray): - self.total_predict_calls += 1 - return numpy.sum(input_vector, axis=1) - - -def build_model(): - x = [[0, 0], [1, 1]] - y = [0, 1] - model = svm.SVC() - model.fit(x, y) - return model - - -class SkLearnRunInferenceTest(unittest.TestCase): - def setUp(self): - self.tmpdir = tempfile.mkdtemp() - - def tearDown(self): - shutil.rmtree(self.tmpdir) - - def test_predict_output(self): - fake_model = FakeModel() - inference_runner = SklearnInferenceRunner() - batched_examples = [ - numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) - ] - expected_predictions = [ - api.PredictionResult(numpy.array([1, 2, 3]), 6), - api.PredictionResult(numpy.array([4, 5, 6]), 15), - api.PredictionResult(numpy.array([7, 8, 9]), 24) - ] - inferences = inference_runner.run_inference(batched_examples, fake_model) - for actual, expected in zip(inferences, expected_predictions): - self.assertTrue(_compare_prediction_result(actual, expected)) - - def test_data_vectorized(self): - fake_model = FakeModel() - inference_runner = SklearnInferenceRunner() - batched_examples = [ - numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) - ] - # even though there are 3 examples, the data should - # be vectorized and only 1 call should happen. - inference_runner.run_inference(batched_examples, fake_model) - self.assertEqual(1, fake_model.total_predict_calls) - - def test_num_bytes(self): - inference_runner = SklearnInferenceRunner() - batched_examples_int = [ - numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9]) - ] - self.assertEqual( - sys.getsizeof(batched_examples_int[0]) * 3, - inference_runner.get_num_bytes(batched_examples_int)) - - batched_examples_float = [ - numpy.array([1.0, 2.0, 3.0]), - numpy.array([4.1, 5.2, 6.3]), - numpy.array([7.7, 8.8, 9.9]) - ] - self.assertEqual( - sys.getsizeof(batched_examples_float[0]) * 3, - inference_runner.get_num_bytes(batched_examples_float)) - - @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359') - def test_pipeline_pickled(self): - temp_file_name = self.tmpdir + os.sep + 'pickled_file' - with open(temp_file_name, 'wb') as file: - pickle.dump(build_model(), file) - with TestPipeline() as pipeline: - examples = [numpy.array([0, 0]), numpy.array([1, 1])] - - pcoll = pipeline | 'start' >> beam.Create(examples) - #TODO(BEAM-14305) Test against the public API. 
- actual = pcoll | base.RunInference( - SklearnModelLoader(model_uri=temp_file_name)) - expected = [ - api.PredictionResult(numpy.array([0, 0]), 0), - api.PredictionResult(numpy.array([1, 1]), 1) - ] - assert_that( - actual, equal_to(expected, equals_fn=_compare_prediction_result)) - - @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359') - def test_pipeline_joblib(self): - temp_file_name = self.tmpdir + os.sep + 'joblib_file' - with open(temp_file_name, 'wb') as file: - joblib.dump(build_model(), file) - with TestPipeline() as pipeline: - examples = [numpy.array([0, 0]), numpy.array([1, 1])] - - pcoll = pipeline | 'start' >> beam.Create(examples) - #TODO(BEAM-14305) Test against the public API. - - actual = pcoll | base.RunInference( - SklearnModelLoader( - model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB)) - expected = [ - api.PredictionResult(numpy.array([0, 0]), 0), - api.PredictionResult(numpy.array([1, 1]), 1) - ] - assert_that( - actual, equal_to(expected, equals_fn=_compare_prediction_result)) - - def test_bad_file_raises(self): - with self.assertRaises(RuntimeError): - with TestPipeline() as pipeline: - examples = [numpy.array([0, 0])] - pcoll = pipeline | 'start' >> beam.Create(examples) - # TODO(BEAM-14305) Test against the public API. - _ = pcoll | base.RunInference( - SklearnModelLoader(model_uri='/var/bad_file_name')) - pipeline.run() - - @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359') - def test_bad_input_type_raises(self): - with self.assertRaisesRegex(AssertionError, - 'Unsupported serialization type'): - with tempfile.NamedTemporaryFile() as file: - model_loader = SklearnModelLoader( - model_uri=file.name, model_file_type=None) - model_loader.load_model() - - -if __name__ == '__main__': - unittest.main() From 936eba78083cdee985a9cffaf0db68d65fc86f3e Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 3 May 2022 11:43:58 -0400 Subject: [PATCH 12/13] remove torch from tox, and update docstring --- sdks/python/apache_beam/ml/inference/pytorch.py | 4 ++-- sdks/python/tox.ini | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/pytorch.py b/sdks/python/apache_beam/ml/inference/pytorch.py index eddb4b634224..1eac09af3a7a 100644 --- a/sdks/python/apache_beam/ml/inference/pytorch.py +++ b/sdks/python/apache_beam/ml/inference/pytorch.py @@ -80,8 +80,8 @@ def __init__( :param model_class: class of the Pytorch model that defines the model structure. :param device: the device on which you wish to run the model. If - ``device = GPU`` then device will be cuda if it is avaiable. Otherwise, - it will be cpu. + ``device = GPU`` then a GPU device will be used if it is available. + Otherwise, it will be CPU. See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 0c2dcaaa9266..858db8590a89 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -144,7 +144,6 @@ deps = sphinx_rtd_theme==0.4.3 docutils<0.18 Jinja2==3.0.3 # TODO(BEAM-14172): Sphinx version is too old. 
- torch==1.9.0 commands = time {toxinidir}/scripts/generate_pydoc.sh From c7c1520369ae103a272d811bf659314ed50496e5 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 3 May 2022 12:18:04 -0400 Subject: [PATCH 13/13] Remove restriction on torch dependency --- sdks/python/tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 858db8590a89..14b94291cfaf 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -144,6 +144,7 @@ deps = sphinx_rtd_theme==0.4.3 docutils<0.18 Jinja2==3.0.3 # TODO(BEAM-14172): Sphinx version is too old. + torch commands = time {toxinidir}/scripts/generate_pydoc.sh
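
The tests in this series double as usage documentation for the sklearn path.
A minimal end-to-end sketch, mirroring test_pipeline_pickled above; the path
/tmp/model is a hypothetical location for a model saved with pickle.dump:

    import pickle

    import numpy
    from sklearn import svm

    import apache_beam as beam
    from apache_beam.ml.inference import base
    from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader

    # Train and pickle a toy model, as build_model() does in the tests.
    model = svm.SVC()
    model.fit([[0, 0], [1, 1]], [0, 1])
    with open('/tmp/model', 'wb') as f:
      pickle.dump(model, f)

    # RunInference loads the pickled model and emits
    # PredictionResult(example, inference) pairs.
    with beam.Pipeline() as pipeline:
      examples = pipeline | beam.Create(
          [numpy.array([0, 0]), numpy.array([1, 1])])
      predictions = examples | base.RunInference(
          SklearnModelLoader(model_uri='/tmp/model'))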