From cd098c28bfbfd177cd298335b9ec90c67a03e4e5 Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Fri, 10 Jun 2022 15:02:48 -0400
Subject: [PATCH 1/6] separated pandas and numpy implementations

---
 .../ml/inference/sklearn_inference.py      | 94 ++++++++++---------
 .../ml/inference/sklearn_inference_test.py | 57 ++++++-----
 2 files changed, 88 insertions(+), 63 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 3c8eddfd7d3a..8df03d5b5d47 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -18,10 +18,8 @@
 import enum
 import pickle
 import sys
-from typing import Any
 from typing import Iterable
 from typing import List
-from typing import Union

 import numpy
 import pandas
@@ -43,10 +41,26 @@ class ModelFileType(enum.Enum):
   JOBLIB = 2


-class SklearnModelHandler(ModelHandler[Union[numpy.ndarray, pandas.DataFrame],
-                                       PredictionResult,
-                                       BaseEstimator]):
-  """ Implementation of the ModelHandler interface for scikit-learn.
+def load_model(model_uri, file_type):
+  file = FileSystems.open(model_uri, 'rb')
+  if file_type == ModelFileType.PICKLE:
+    return pickle.load(file)
+  elif file_type == ModelFileType.JOBLIB:
+    if not joblib:
+      raise ImportError(
+          'Could not import joblib in this execution environment. '
+          'For help with managing dependencies on Python workers, '
+          'see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
+      )
+    return joblib.load(file)
+  raise AssertionError('Unsupported serialization type.')
+
+
+class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
+                                            PredictionResult,
+                                            BaseEstimator]):
+  """ Implementation of the ModelHandler interface for scikit-learn
+  using numpy arrays as input.

   NOTE: This API and its implementation are under development and
   do not provide backward compatibility guarantees.
@@ -60,42 +74,44 @@ def __init__(

   def load_model(self) -> BaseEstimator:
     """Loads and initializes a model for processing."""
-    file = FileSystems.open(self._model_uri, 'rb')
-    if self._model_file_type == ModelFileType.PICKLE:
-      return pickle.load(file)
-    elif self._model_file_type == ModelFileType.JOBLIB:
-      if not joblib:
-        raise ImportError(
-            'Could not import joblib in this execution environment. '
-            'For help with managing dependencies on Python workers.'
-            'see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
-        )
-      return joblib.load(file)
-    raise AssertionError('Unsupported serialization type.')
+    return load_model(self._model_uri, self._model_file_type)

   def run_inference(
-      self,
-      batch: List[Union[numpy.ndarray, pandas.DataFrame]],
-      model: BaseEstimator,
+      self, batch: List[numpy.ndarray], model: BaseEstimator,
       **kwargs) -> Iterable[PredictionResult]:
-    # TODO(github.com/apache/beam/issues/21769): Use supplied input type hint.
-    if isinstance(batch[0], numpy.ndarray):
-      return SklearnModelHandler._predict_np_array(batch, model)
-    elif isinstance(batch[0], pandas.DataFrame):
-      return SklearnModelHandler._predict_pandas_dataframe(batch, model)
-    raise ValueError('Unsupported data type.')
-
-  @staticmethod
-  def _predict_np_array(batch: List[numpy.ndarray],
-                        model: Any) -> Iterable[PredictionResult]:
     # vectorize data for better performance
     vectorized_batch = numpy.stack(batch, axis=0)
    predictions = model.predict(vectorized_batch)
     return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

-  @staticmethod
-  def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
-                                model: Any) -> Iterable[PredictionResult]:
+  def get_num_bytes(self, batch: List[numpy.ndarray]) -> int:
+    """Returns the number of bytes of data for a batch."""
+    return sum(sys.getsizeof(element) for element in batch)
+
+
+class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
+                                             PredictionResult,
+                                             BaseEstimator]):
+  """ Implementation of the ModelHandler interface for scikit-learn that
+  supports pandas dataframes.
+
+  NOTE: This API and its implementation are under development and
+  do not provide backward compatibility guarantees.
+  """
+  def __init__(
+      self,
+      model_uri: str,
+      model_file_type: ModelFileType = ModelFileType.PICKLE):
+    self._model_uri = model_uri
+    self._model_file_type = model_file_type
+
+  def load_model(self) -> BaseEstimator:
+    """Loads and initializes a model for processing."""
+    return load_model(self._model_uri, self._model_file_type)
+
+  def run_inference(
+      self, batch: List[pandas.DataFrame], model: BaseEstimator,
+      **kwargs) -> Iterable[PredictionResult]:
     # sklearn_inference currently only supports single-row dataframes.
     for dataframe in batch:
       if dataframe.shape[0] != 1:
@@ -112,12 +128,6 @@ def _predict_pandas_dataframe(batch: List[pandas.DataFrame],
         inference in zip(splits, predictions)
     ]

-  def get_num_bytes(
-      self, batch: List[Union[numpy.ndarray, pandas.DataFrame]]) -> int:
+  def get_num_bytes(self, batch: List[pandas.DataFrame]) -> int:
     """Returns the number of bytes of data for a batch."""
-    if isinstance(batch[0], numpy.ndarray):
-      return sum(sys.getsizeof(element) for element in batch)
-    elif isinstance(batch[0], pandas.DataFrame):
-      data_frames: List[pandas.DataFrame] = batch
-      return sum(df.memory_usage(deep=True).sum() for df in data_frames)
-    raise ValueError('Unsupported data type.')
+    return sum(df.memory_usage(deep=True).sum() for df in batch)
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
index e23acf4ab413..f4bf49276fcb 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
@@ -40,7 +40,8 @@
 from apache_beam.ml.inference import api
 from apache_beam.ml.inference import base
 from apache_beam.ml.inference.sklearn_inference import ModelFileType
-from apache_beam.ml.inference.sklearn_inference import SklearnModelHandler
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -129,7 +130,7 @@ def tearDown(self):

   def test_predict_output(self):
     fake_model = FakeModel()
-    inference_runner = SklearnModelHandler(model_uri='unused')
+    inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
     batched_examples = [
         numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
     ]
@@ -144,7 +145,7 @@ def test_predict_output(self):

   def test_data_vectorized(self):
     fake_model = FakeModel()
-    inference_runner = SklearnModelHandler(model_uri='unused')
+    inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
     batched_examples = [
         numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
     ]
@@ -153,8 +154,8 @@ def test_data_vectorized(self):
     inference_runner.run_inference(batched_examples, fake_model)
     self.assertEqual(1, fake_model.total_predict_calls)

-  def test_num_bytes(self):
-    inference_runner = SklearnModelHandler(model_uri='unused')
+  def test_num_bytes_numpy(self):
+    inference_runner = SklearnModelHandlerNumpy(model_uri='unused')
     batched_examples_int = [
         numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
     ]
@@ -180,9 +181,8 @@ def test_pipeline_pickled(self):
     examples = [numpy.array([0, 0]), numpy.array([1, 1])]

     pcoll = pipeline | 'start' >> beam.Create(examples)
-    #TODO(BEAM-14305) Test against the public API.
     actual = pcoll | base.RunInference(
-        SklearnModelHandler(model_uri=temp_file_name))
+        SklearnModelHandlerNumpy(model_uri=temp_file_name))
     expected = [
         api.PredictionResult(numpy.array([0, 0]), 0),
         api.PredictionResult(numpy.array([1, 1]), 1)
     ]
@@ -199,10 +199,9 @@ def test_pipeline_joblib(self):
     examples = [numpy.array([0, 0]), numpy.array([1, 1])]

     pcoll = pipeline | 'start' >> beam.Create(examples)
-    #TODO(BEAM-14305) Test against the public API.
     actual = pcoll | base.RunInference(
-        SklearnModelHandler(
+        SklearnModelHandlerNumpy(
             model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
     expected = [
         api.PredictionResult(numpy.array([0, 0]), 0),
@@ -218,7 +217,7 @@ def test_bad_file_raises(self):
       pcoll = pipeline | 'start' >> beam.Create(examples)
       # TODO(BEAM-14305) Test against the public API.
       _ = pcoll | base.RunInference(
-          SklearnModelHandler(model_uri='/var/bad_file_name'))
+          SklearnModelHandlerNumpy(model_uri='/var/bad_file_name'))
       pipeline.run()

   @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
   def test_bad_input_type_raises(self):
     with self.assertRaisesRegex(AssertionError,
                                 'Unsupported serialization type'):
       with tempfile.NamedTemporaryFile() as file:
-        model_loader = SklearnModelHandler(
+        model_loader = SklearnModelHandlerNumpy(
             model_uri=file.name, model_file_type=None)
         model_loader.load_model()
@@ -240,7 +239,30 @@ def test_pipeline_pandas(self):
       splits = [dataframe.loc[[i]] for i in dataframe.index]
       pcoll = pipeline | 'start' >> beam.Create(splits)
       actual = pcoll | api.RunInference(
-          SklearnModelLoader(model_uri=temp_file_name))
+          SklearnModelHandlerPandas(model_uri=temp_file_name))
+
+      expected = [
+          api.PredictionResult(splits[0], 5),
+          api.PredictionResult(splits[1], 8),
+          api.PredictionResult(splits[2], 1),
+          api.PredictionResult(splits[3], 1),
+          api.PredictionResult(splits[4], 2),
+      ]
+      assert_that(
+          actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
+
+  @unittest.skipIf(platform.system() == 'Windows', 'BEAM-14359')
+  def test_pipeline_pandas_joblib(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      joblib.dump(build_pandas_pipeline(), file)
+    with TestPipeline() as pipeline:
+      dataframe = pandas_dataframe()
+      splits = [dataframe.loc[[i]] for i in dataframe.index]
+      pcoll = pipeline | 'start' >> beam.Create(splits)
+      actual = pcoll | api.RunInference(
+          SklearnModelHandlerPandas(
+              model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))

       expected = [
           api.PredictionResult(splits[0], 5),
           api.PredictionResult(splits[1], 8),
           api.PredictionResult(splits[2], 1),
           api.PredictionResult(splits[3], 1),
           api.PredictionResult(splits[4], 2),
       ]
       assert_that(
           actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))
@@ -265,7 +287,7 @@ def test_pipeline_pandas_with_keys(self):
       pcoll = pipeline | 'start' >> beam.Create(keyed_rows)
       actual = pcoll | api.RunInference(
-          SklearnModelLoader(model_uri=temp_file_name))
+          SklearnModelHandlerPandas(model_uri=temp_file_name))
     expected = [
         ('0', api.PredictionResult(splits[0], 5)),
         ('1', api.PredictionResult(splits[1], 8)),
     ]
     assert_that(
         actual, equal_to(expected, equals_fn=_compare_dataframe_predictions))

-  def test_infer_invalid_data_type(self):
-    with self.assertRaises(ValueError):
-      unexpected_input_type = [[1, 2, 3, 4], [5, 6, 7, 8]]
-      inference_runner = SklearnModelLoader(model_uri=unused)
-      fake_model = FakeModel()
-      inference_runner.run_inference(unexpected_input_type, fake_model)
-
   def test_infer_too_many_rows_in_dataframe(self):
     with self.assertRaises(ValueError):
       data_frame_too_many_rows = pandas_dataframe()
-      inference_runner = SklearnModelLoader(model_uri=unused)
+      inference_runner = SklearnModelHandlerPandas(model_uri='unused')
       fake_model = FakeModel()
       inference_runner.run_inference([data_frame_too_many_rows], fake_model)

From aec5d2d3b9e490dbc21e566aade0406f59b8ce8a Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Fri, 10 Jun 2022 15:56:34 -0400
Subject: [PATCH 2/6] separated pandas and numpy implementations

---
 sdks/python/apache_beam/ml/inference/sklearn_inference.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 8df03d5b5d47..cbe38f6a6b0f 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -61,9 +61,6 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
                                             BaseEstimator]):
   """ Implementation of the ModelHandler interface for scikit-learn
   using numpy arrays as input.
-
-  NOTE: This API and its implementation are under development and
-  do not provide backward compatibility guarantees.
   """
   def __init__(
       self,
@@ -94,9 +91,6 @@ class SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
                                              BaseEstimator]):
   """ Implementation of the ModelHandler interface for scikit-learn that
   supports pandas dataframes.
-
-  NOTE: This API and its implementation are under development and
-  do not provide backward compatibility guarantees.
   """
   def __init__(
       self,

From aff4d40c96e68f6b3aa9b4fd573cac27185b25c4 Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Mon, 13 Jun 2022 09:49:25 -0400
Subject: [PATCH 3/6] Update
 sdks/python/apache_beam/ml/inference/sklearn_inference.py

Co-authored-by: Brian Hulette
---
 sdks/python/apache_beam/ml/inference/sklearn_inference.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 6adecdef25b7..e13235807c95 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -50,7 +50,8 @@ def _load_model(model_uri, file_type):
       raise ImportError(
           'Could not import joblib in this execution environment. '
           'For help with managing dependencies on Python workers, '
-          'see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
+          'see '
+          'https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'
       )
     return joblib.load(file)
   raise AssertionError('Unsupported serialization type.')

From 64a55fb75431cdcd6f6af8238d3f3b56f98cea08 Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Mon, 13 Jun 2022 11:17:20 -0400
Subject: [PATCH 4/6] fixed unit test

---
 sdks/python/apache_beam/ml/inference/sklearn_inference_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
index 576b90d4869c..044f2b849c09 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
@@ -304,7 +304,7 @@ def test_infer_too_many_rows_in_dataframe(self):
       data_frame_too_many_rows = pandas_dataframe()
       fake_model = FakeModel()
       inference_runner = SklearnModelHandlerPandas(model_uri='unused')
-      inference_runner.run_inference(data_frame_too_many_rows, fake_model)
+      inference_runner.run_inference([data_frame_too_many_rows], fake_model)

 if __name__ == '__main__':

From 4e4501506e6887f68df9c12435121b88c4dd77a5 Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Mon, 13 Jun 2022 13:50:25 -0400
Subject: [PATCH 5/6] merged to fix conflicts

---
 sdks/python/apache_beam/ml/inference/sklearn_inference.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 221fcc2e6008..19c6f5b5046f 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -50,8 +50,7 @@ def _load_model(model_uri, file_type):
       raise ImportError(
           'Could not import joblib in this execution environment. '
           'For help with managing dependencies on Python workers, '
-          'see '
-          'https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'
+          'see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/'  # pylint: disable=line-too-long
       )
     return joblib.load(file)
   raise AssertionError('Unsupported serialization type.')

From 23650ba26f5aa799bd580a86cbd2fc3e06e3dfce Mon Sep 17 00:00:00 2001
From: Ryan Thompson
Date: Mon, 13 Jun 2022 15:09:02 -0400
Subject: [PATCH 6/6] fixed import order

---
 sdks/python/apache_beam/ml/inference/sklearn_inference_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
index d41c31bd5928..2c63de25f992 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
@@ -41,8 +41,8 @@
 from apache_beam.ml.inference.base import PredictionResult
 from apache_beam.ml.inference.base import RunInference
 from apache_beam.ml.inference.sklearn_inference import ModelFileType
-from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
 from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
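Applied in order, the six patches replace the Union-typed SklearnModelHandler with two single-purpose handlers, SklearnModelHandlerNumpy and SklearnModelHandlerPandas. A minimal usage sketch of the resulting API, mirroring the calls exercised in the tests above; the model path is hypothetical, and the pickled estimator is assumed to accept two-element numpy rows:

    import apache_beam as beam
    import numpy

    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.sklearn_inference import ModelFileType
    from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

    # Hypothetical URI: any path readable through apache_beam.io.filesystems,
    # pointing at a pickled scikit-learn estimator.
    MODEL_URI = '/tmp/sklearn_model'

    with beam.Pipeline() as pipeline:
      _ = (
          pipeline
          | 'Create' >> beam.Create([numpy.array([0, 0]), numpy.array([1, 1])])
          # Emits one PredictionResult(example, inference) per input example.
          | 'RunInference' >> RunInference(
              SklearnModelHandlerNumpy(
                  model_uri=MODEL_URI, model_file_type=ModelFileType.PICKLE))
          | beam.Map(print))

Pandas input works the same way through SklearnModelHandlerPandas, except that each pipeline element must be a single-row DataFrame, as enforced by its run_inference.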