From 52581a6ffbecd6f8a6729d63f1af123cb96983ab Mon Sep 17 00:00:00 2001
From: Andy Ye
Date: Fri, 10 Jun 2022 12:01:57 -0400
Subject: [PATCH 1/2] Refactor api into base

---
 .../inference/pytorch_image_classification.py |  4 +-
 .../apache_beam/ml/inference/__init__.py      |  1 +
 sdks/python/apache_beam/ml/inference/api.py   | 62 -------------------
 sdks/python/apache_beam/ml/inference/base.py  | 27 ++++++++
 .../ml/inference/pytorch_inference.py         |  2 +-
 .../ml/inference/pytorch_inference_test.py    |  2 +-
 .../ml/inference/sklearn_inference.py         |  2 +-
 .../ml/inference/sklearn_inference_test.py    | 15 +++--
 8 files changed, 40 insertions(+), 75 deletions(-)
 delete mode 100644 sdks/python/apache_beam/ml/inference/api.py

diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py b/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py
index a3ea84ad01b5..47da88ba72d2 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_classification.py
@@ -27,8 +27,8 @@
 import apache_beam as beam
 import torch
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.ml.inference.api import PredictionResult
-from apache_beam.ml.inference.api import RunInference
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
 from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
diff --git a/sdks/python/apache_beam/ml/inference/__init__.py b/sdks/python/apache_beam/ml/inference/__init__.py
index cce3acad34a4..d3b4ff354067 100644
--- a/sdks/python/apache_beam/ml/inference/__init__.py
+++ b/sdks/python/apache_beam/ml/inference/__init__.py
@@ -14,3 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from apache_beam.ml.inference.base import RunInference
diff --git a/sdks/python/apache_beam/ml/inference/api.py b/sdks/python/apache_beam/ml/inference/api.py
deleted file mode 100644
index 68bd20fbb2dc..000000000000
--- a/sdks/python/apache_beam/ml/inference/api.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# mypy: ignore-errors
-
-from dataclasses import dataclass
-from typing import Tuple
-from typing import TypeVar
-from typing import Union
-
-import apache_beam as beam
-from apache_beam.ml.inference import base
-
-_K = TypeVar('_K')
-_INPUT_TYPE = TypeVar('_INPUT_TYPE')
-_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
-
-
-@dataclass
-class PredictionResult:
-  example: _INPUT_TYPE
-  inference: _OUTPUT_TYPE
-
-
-@beam.typehints.with_input_types(Union[_INPUT_TYPE, Tuple[_K, _INPUT_TYPE]])
-@beam.typehints.with_output_types(Union[PredictionResult, Tuple[_K, PredictionResult]])  # pylint: disable=line-too-long
-class RunInference(beam.PTransform):
-  """
-  NOTE: This API and its implementation are under development and
-  do not provide backward compatibility guarantees.
-
-  A transform that takes a PCollection of examples (or features) to be used on
-  an ML model. It will then output inferences (or predictions) for those
-  examples in a PCollection of PredictionResults, containing the input examples
-  and output inferences.
-
-  If examples are paired with keys, it will output a tuple
-  (key, PredictionResult) for each (key, example) input.
-
-  Models for supported frameworks can be loaded via a URI. Supported services
-  can also be used.
-
-  TODO(BEAM-14046): Add and link to help documentation
-  """
-  def __init__(self, model_loader: base.ModelLoader):
-    self._model_loader = model_loader
-
-  def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
-    return pcoll | base.RunInference(self._model_loader)
diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 2d10cf7a1561..031d58b47e2e 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -32,12 +32,15 @@
 import pickle
 import sys
 import time
+from dataclasses import dataclass
 from typing import Any
 from typing import Generic
 from typing import Iterable
 from typing import List
 from typing import Mapping
+from typing import Tuple
 from typing import TypeVar
+from typing import Union
 
 import apache_beam as beam
 from apache_beam.utils import shared
@@ -54,6 +57,15 @@
 ModelT = TypeVar('ModelT')
 ExampleT = TypeVar('ExampleT')
 PredictionT = TypeVar('PredictionT')
+_K = TypeVar('_K')
+_INPUT_TYPE = TypeVar('_INPUT_TYPE')
+_OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')
+
+
+@dataclass
+class PredictionResult:
+  example: _INPUT_TYPE
+  inference: _OUTPUT_TYPE
 
 
 def _to_milliseconds(time_ns: int) -> int:
@@ -101,12 +113,27 @@ def batch_elements_kwargs(self) -> Mapping[str, Any]:
     return {}
 
 
+@beam.typehints.with_input_types(Union[_INPUT_TYPE, Tuple[_K, _INPUT_TYPE]])
+@beam.typehints.with_output_types(Union[PredictionResult, Tuple[_K, PredictionResult]])  # pylint: disable=line-too-long
 class RunInference(beam.PTransform[beam.PCollection[ExampleT],
                                    beam.PCollection[PredictionT]]):
   """An extensible transform for running inferences.
+
+  This transform takes a PCollection of examples (or features) to be run
+  through an ML model and outputs inferences (or predictions) for those
+  examples in a PCollection of PredictionResults, each containing an input
+  example and the corresponding output inference.
+
+  If examples are paired with keys, it will output a tuple
+  (key, PredictionResult) for each (key, example) input.
+
+  Models for supported frameworks can be loaded via a URI. Supported services
+  can also be used.
+
+  TODO(BEAM-14046): Add and link to help documentation
 
   Args:
     model_loader: An implementation of ModelLoader.
     clock: A clock implementing get_current_time_in_microseconds.
   """
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index d2a47d530635..1e0fd523c8e2 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -27,7 +27,7 @@
 import torch
 
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.ml.inference.api import PredictionResult
 from apache_beam.ml.inference.base import InferenceRunner
 from apache_beam.ml.inference.base import ModelLoader
+from apache_beam.ml.inference.base import PredictionResult
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
index 0011ba58244d..089aa2631055 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_test.py
@@ -35,7 +35,7 @@
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
   import torch
-  from apache_beam.ml.inference.api import PredictionResult
+  from apache_beam.ml.inference.base import PredictionResult
   from apache_beam.ml.inference.base import RunInference
   from apache_beam.ml.inference.pytorch_inference import PytorchInferenceRunner
   from apache_beam.ml.inference.pytorch_inference import PytorchModelLoader
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index 00d63f9fb6c1..d19c994eb8a3 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -25,7 +25,7 @@
 from sklearn.base import BaseEstimator
 
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.ml.inference.api import PredictionResult
 from apache_beam.ml.inference.base import InferenceRunner
 from apache_beam.ml.inference.base import ModelLoader
+from apache_beam.ml.inference.base import PredictionResult
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
index f1efe73c96cd..b2d00e3be5aa 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py
@@ -30,7 +30,6 @@
 from sklearn import svm
 
 import apache_beam as beam
-from apache_beam.ml.inference import api
 from apache_beam.ml.inference import base
 from apache_beam.ml.inference.sklearn_inference import ModelFileType
 from apache_beam.ml.inference.sklearn_inference import SklearnInferenceRunner
@@ -76,9 +75,9 @@ def test_predict_output(self):
       numpy.array([1, 2, 3]), numpy.array([4, 5, 6]), numpy.array([7, 8, 9])
     ]
     expected_predictions = [
-      api.PredictionResult(numpy.array([1, 2, 3]), 6),
-      api.PredictionResult(numpy.array([4, 5, 6]), 15),
-      api.PredictionResult(numpy.array([7, 8, 9]), 24)
+      base.PredictionResult(numpy.array([1, 2, 3]), 6),
+      base.PredictionResult(numpy.array([4, 5, 6]), 15),
+      base.PredictionResult(numpy.array([7, 8, 9]), 24)
     ]
     inferences = inference_runner.run_inference(batched_examples, fake_model)
     for actual, expected in zip(inferences, expected_predictions):
@@ -126,8 +125,8 @@ def test_pipeline_pickled(self):
       actual = pcoll | base.RunInference(
           SklearnModelLoader(model_uri=temp_file_name))
       expected = [
-        api.PredictionResult(numpy.array([0, 0]), 0),
-        api.PredictionResult(numpy.array([1, 1]), 1)
+        base.PredictionResult(numpy.array([0, 0]), 0),
+        base.PredictionResult(numpy.array([1, 1]), 1)
       ]
       assert_that(
           actual, equal_to(expected, equals_fn=_compare_prediction_result))
@@ -147,8 +146,8 @@ def test_pipeline_joblib(self):
           SklearnModelLoader(
               model_uri=temp_file_name, model_file_type=ModelFileType.JOBLIB))
       expected = [
-        api.PredictionResult(numpy.array([0, 0]), 0),
-        api.PredictionResult(numpy.array([1, 1]), 1)
+        base.PredictionResult(numpy.array([0, 0]), 0),
+        base.PredictionResult(numpy.array([1, 1]), 1)
       ]
       assert_that(
          actual, equal_to(expected, equals_fn=_compare_prediction_result))

From a273b052c759e81df8a06b59bc17e15123293933 Mon Sep 17 00:00:00 2001
From: Andy Ye
Date: Fri, 10 Jun 2022 12:34:41 -0400
Subject: [PATCH 2/2] Add Generic to PredictionResult

---
 sdks/python/apache_beam/ml/inference/base.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base.py b/sdks/python/apache_beam/ml/inference/base.py
index 031d58b47e2e..4eb2ee05d483 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -64,8 +64,8 @@
 
 @dataclass
-class PredictionResult:
-  example: _INPUT_TYPE
-  inference: _OUTPUT_TYPE
+class PredictionResult(Generic[_INPUT_TYPE, _OUTPUT_TYPE]):
+  example: _INPUT_TYPE
+  inference: _OUTPUT_TYPE
 
 
 def _to_milliseconds(time_ns: int) -> int:
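
A minimal end-to-end sketch of the refactored import path, mirroring
test_pipeline_pickled above. This is a sketch, not part of either patch: the
temp-file model and the toy LinearRegression are illustrative assumptions;
any pickled sklearn estimator works the same way through SklearnModelLoader.

    import pickle
    import tempfile

    import apache_beam as beam
    import numpy
    from sklearn import linear_model

    from apache_beam.ml.inference.base import RunInference
    from apache_beam.ml.inference.sklearn_inference import SklearnModelLoader

    # Train and pickle a tiny model up front so the sketch is self-contained.
    model = linear_model.LinearRegression()
    model.fit(numpy.array([[0, 0], [1, 1], [2, 2]]), numpy.array([0, 1, 2]))
    with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as f:
      pickle.dump(model, f)
      model_path = f.name

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([numpy.array([0, 0]), numpy.array([1, 1])])
          | RunInference(SklearnModelLoader(model_uri=model_path))
          # Each output element is a base.PredictionResult pairing an input
          # example with the model's inference for it.
          | beam.Map(print))

Plain numpy arrays are the element type here because, as exercised by
test_predict_output above, the sklearn inference runner batches the arrays
before handing them to the model's predict method.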
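A standalone sketch of the generic PredictionResult introduced in patch 2,
showing how the dataclass can be parameterized per framework. The
ndarray-in/int-out parameterization below is only an example, echoing the
sklearn tests:

    from dataclasses import dataclass
    from typing import Generic, TypeVar

    import numpy

    _INPUT_TYPE = TypeVar('_INPUT_TYPE')
    _OUTPUT_TYPE = TypeVar('_OUTPUT_TYPE')

    @dataclass
    class PredictionResult(Generic[_INPUT_TYPE, _OUTPUT_TYPE]):
      example: _INPUT_TYPE
      inference: _OUTPUT_TYPE

    # A sklearn-style result: ndarray example in, scalar inference out.
    result: PredictionResult[numpy.ndarray, int] = PredictionResult(
        numpy.array([1, 2, 3]), 6)
    print(result.example, result.inference)

Subclassing typing.Generic is what lets type checkers track the example and
inference types of a PredictionResult through a pipeline.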