From f38a6eb5593eee9a6ac591969fa1daa35d3f79e3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 13 Mar 2025 15:49:07 -0400 Subject: [PATCH 1/2] Add custom detector to support model handlers from RunInference. --- .../ml/anomaly/detectors/custom.py | 229 ++++++++++++ .../ml/anomaly/detectors/custom_test.py | 348 ++++++++++++++++++ 2 files changed, 577 insertions(+) create mode 100644 sdks/python/apache_beam/ml/anomaly/detectors/custom.py create mode 100644 sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/custom.py b/sdks/python/apache_beam/ml/anomaly/detectors/custom.py new file mode 100644 index 000000000000..e73f3b206b9f --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/custom.py @@ -0,0 +1,229 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import typing +from typing import Any +from typing import Callable +from typing import Dict +from typing import Optional +from typing import SupportsFloat + +import numpy +import pandas + +import apache_beam as beam +from apache_beam.ml.anomaly.base import AnomalyDetector +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + import torch +except ImportError: + torch = None + +try: + import tensorflow as tf +except ImportError: + tf = None + + +def _to_numpy_array(row: beam.Row): + """Converts an Apache Beam Row to a NumPy array.""" + return numpy.array(list(row)) + + +def _to_pandas_dataframe(row: beam.Row): + """Converts an Apache Beam Row to a Pandas DataFrame.""" + return pandas.DataFrame.from_records([row._asdict()]) + + +def _to_pytorch_tensor(row: beam.Row): + """Converts an Apache Beam Row to a PyTorch Tensor.""" + return torch.Tensor(list(row)) + + +def _to_pytorch_keyed_tensor(row: beam.Row): + """Converts an Apache Beam Row to a dictionary of PyTorch Tensors.""" + return {str(k): torch.Tensor(v) for k, v in row._asdict().items()} + + +def _to_tensorflow_tensor(row: beam.Row): + """Converts an Apache Beam Row to a TensorFlow Tensor.""" + return tf.convert_to_tensor(list(row)) + + +def _to_tensorflow_keyed_tensor(row: beam.Row): + """Converts an Apache Beam Row to a dictionary of TensorFlow Tensors.""" + return {str(k): tf.constant(v) for k, v in row._asdict().items()} + + +class InputConverter(): + """A utility class for converting Apache Beam Rows into different formats.""" + _map: Dict[type, Callable[[beam.Row], Any]] = {} + _map[numpy.ndarray] = _to_numpy_array + _map[pandas.DataFrame] = _to_pandas_dataframe + if torch: + _map[torch.Tensor] = 
_to_pytorch_tensor + _map[Dict[str, torch.Tensor]] = _to_pytorch_keyed_tensor + if tf: + _map[tf.Tensor] = _to_tensorflow_tensor + _map[Dict[str, tf.Tensor]] = _to_tensorflow_keyed_tensor + + @classmethod + def convert_to(cls, x, to_type): + """Converts an input to a specified type. + + Args: + x: The input to convert. + to_type: The target type for conversion. + + Returns: + The converted input. + + Raises: + ValueError: If the target type is unknown or conversion fails. + """ + if isinstance(to_type, type) and issubclass(to_type, beam.Row): + return x + + if to_type in cls._map: + return cls._map[to_type](x) + + raise ValueError( + f"Unknown input type {to_type} for value {x}. " + f"Please provide input_convert_fn to convert {to_type} to Beam Rows") + + +class OutputConverter(): + """A utility class for converting model prediction results to float values.""" + @staticmethod + def convert_from(result: PredictionResult, from_type=None) -> float: + """Converts RunInference's PredictionResult to a float value. + + Args: + result: The PredictionResult object. + from_type: The original type of the inference result (optional). + + Returns: + The converted float value. + + Raises: + ValueError: If the output type is unknown or conversion fails. + """ + x = result.inference + from_type = from_type or type(x) + + if isinstance(x, SupportsFloat): + # Handles int, float, and other numeric types + return float(x) + elif isinstance(x, numpy.number): + # Handles numpy numeric types + return float(x) + elif torch is not None and isinstance(x, torch.Tensor): + return float(x.item()) + elif tf is not None and isinstance(x, tf.Tensor): + if x.ndim >= 1: + return float(x.numpy()[0]) + else: + return float(x.numpy()) + else: + raise ValueError( + f"Unknown output type {from_type} of value {x}. " + f"Please provide output_convert_fn to convert PredictionResult " + f"(with inference field of type {from_type}) to float.") + + +def get_input_type(model_handler: ModelHandler): + """Extracts the input (example) type from a ModelHandler. + + Args: + model_handler: The ModelHandler instance. + + Returns: + The input type expected by the model handler. + """ + # TODO: Python 3.12 introduced types.get_original_bases() to access + # __orig_bases__, but prior to that we will need to access the special + # attribute directly. + # Here we get input_type from + # ModelHandler(Generic[ExampleT, PredictionT, ModelT]) + input_type = typing.get_args(type(model_handler).__orig_bases__[0])[0] + + is_keyed = typing.get_origin(input_type) is dict and \ + typing.get_args(input_type)[0] is str + + if is_keyed: + input_type = typing.get_args(input_type)[1] + + if tf and torch: + if input_type == typing.Union[torch.Tensor, tf.Tensor]: + # check framework to tell if it is from pytorch or tensorflow + input_type = torch.Tensor if model_handler._framework == 'pt' \ + else tf.Tensor + + return Dict[str, input_type] if is_keyed else input_type + + +@specifiable +class CustomDetector(AnomalyDetector): + """A custom anomaly detector that uses a provided model handler for scoring. + + Args: + model_handler: The ModelHandler to use for inference. + run_inference_args: Optional arguments to pass to RunInference + input_convert_fn: Optional function to convert input Beam Rows to the + model's expected input type. + output_convert_fn: Optional function to convert model PredictionResults + to float scores. + **kwargs: Additional keyword arguments to pass to the base + AnomalyDetector class. 
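+
+  A minimal construction sketch (the model path below is a placeholder,
+  and the base-class defaults are assumed to suffice)::
+
+    from apache_beam.ml.inference.sklearn_inference import (
+        SklearnModelHandlerNumpy)
+
+    detector = CustomDetector(
+        SklearnModelHandlerNumpy(model_uri="/path/to/model.pkl"))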
+ """ + def __init__( + self, + model_handler: ModelHandler, + run_inference_args: Optional[Dict[str, Any]] = None, + input_convert_fn: Optional[Callable[[beam.Row], Any]] = None, + output_convert_fn: Optional[Callable[[PredictionResult], float]] = None, + **kwargs): + super().__init__(**kwargs) + + self._model_handler = model_handler + self._keyed_model_handler = KeyedModelHandler(model_handler) + self._input_type = get_input_type(self._model_handler) + self._run_inference_args = run_inference_args or {} + self.convert_input = input_convert_fn or self._default_convert_input + self.convert_output = output_convert_fn or self._default_convert_output + + # always override model_identifier with model_id from the detector + self._run_inference_args["model_identifier"] = self._model_id + + def _default_convert_input(self, x: beam.Row) -> Any: + return InputConverter.convert_to(x, self._input_type) + + def _default_convert_output(self, x: PredictionResult) -> float: + return OutputConverter.convert_from(x) + + def learn_one(self, x: beam.Row) -> None: + """Not implemented since CustomDetector invokes RunInference directly.""" + raise NotImplementedError + + def score_one(self, x: beam.Row) -> Optional[float]: + """Not implemented since CustomDetector invokes RunInference directly.""" + raise NotImplementedError diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py new file mode 100644 index 000000000000..2e2fa4c25869 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py @@ -0,0 +1,348 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import unittest +from typing import Dict + +import numpy +import pandas + +import apache_beam as beam +from apache_beam.ml.anomaly.detectors.custom import CustomDetector +from apache_beam.ml.anomaly.detectors.custom import InputConverter +from apache_beam.ml.anomaly.detectors.custom import OutputConverter +from apache_beam.ml.anomaly.detectors.custom import get_input_type +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy # pylint: disable=line-too-long +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas # pylint: disable=line-too-long + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + import torch + from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor # pylint: disable=line-too-long + from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor # pylint: disable=line-too-long +except ImportError: + torch = None + +try: + import tensorflow as tf + from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy + from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor +except ImportError: + tf = None + +try: + import onnx + from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy +except ImportError: + onnx = None + +try: + from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerTensor # pylint: disable=line-too-long + from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor # pylint: disable=line-too-long + from transformers import AutoModel + hugging_face = True +except ImportError: + hugging_face = False + +try: + from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy # pylint: disable=line-too-long + tensorrt = True +except ImportError: + tensorrt = False + +try: + import xgboost + from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerNumpy + from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerPandas # pylint: disable=line-too-long +except ImportError: + xgboost = None + +FAKE_URI = "fake-model-uri" + + +class ExampleT: + pass + + +class PredictionT: + pass + + +class ModelT: + pass + + +class InputConverterTest(unittest.TestCase): + def setUp(self): + self.input_data = beam.Row(a=1, b=2, c=3) + + def test_convert_to_beam_row(self): + result = InputConverter.convert_to(self.input_data, beam.Row) + self.assertTrue(isinstance(result, beam.Row)) + self.assertEqual(self.input_data, result) + + def test_convert_to_numpy(self): + result = InputConverter.convert_to(self.input_data, numpy.ndarray) + self.assertTrue(isinstance(result, numpy.ndarray)) + self.assertTrue(numpy.array_equal(result, numpy.array([1, 2, 3]))) + + def test_convert_to_pandas(self): + result = InputConverter.convert_to(self.input_data, pandas.DataFrame) + self.assertTrue(isinstance(result, pandas.DataFrame)) + self.assertTrue( + result.equals(pandas.DataFrame({ + "a": [1], "b": [2], "c": [3] + }))) + + def test_convert_to_unknown_type(self): + with self.assertRaises(ValueError): + InputConverter.convert_to(self.input_data, list) + + @unittest.skipIf(torch is None, "torch is not installed") + def test_convert_to_torch(self): + result = InputConverter.convert_to(self.input_data, torch.Tensor) + self.assertTrue(isinstance(result, torch.Tensor)) + 
self.assertTrue(torch.equal(result, torch.Tensor([1, 2, 3]))) + + @unittest.skipIf(torch is None, "torch is not installed") + def test_convert_to_keyed_torch(self): + result = InputConverter.convert_to(self.input_data, Dict[str, torch.Tensor]) + self.assertTrue(isinstance(result, dict)) + expected = { + 'a': torch.Tensor(1), 'b': torch.Tensor(2), 'c': torch.Tensor(3) + } + for key, value in result.items(): + self.assertTrue(torch.equal(value, expected[key])) + + @unittest.skipIf(tf is None, "tensorflow is not installed") + def test_convert_to_tensorflow(self): + result = InputConverter.convert_to(self.input_data, tf.Tensor) + self.assertTrue(isinstance(result, tf.Tensor)) + self.assertTrue(tf.reduce_all(tf.equal(result, tf.constant([1, 2, 3])))) + + @unittest.skipIf(tf is None, "tensorflow is not installed") + def test_convert_to_keyed_tensorflow(self): + result = InputConverter.convert_to(self.input_data, Dict[str, tf.Tensor]) + self.assertTrue(isinstance(result, dict)) + expected = {'a': tf.constant(1), 'b': tf.constant(2), 'c': tf.constant(3)} + for key, value in result.items(): + self.assertTrue( + tf.reduce_all(tf.equal(value, tf.constant(expected[key])))) + + +class OutputConverterTest(unittest.TestCase): + def test_convert_from_int(self): + result = PredictionResult(example=None, inference=10) + self.assertAlmostEqual(OutputConverter.convert_from(result), 10.0) + + def test_convert_from_float(self): + result = PredictionResult(example=None, inference=3.14) + self.assertAlmostEqual(OutputConverter.convert_from(result), 3.14) + + def test_convert_from_numpy_int32(self): + result = PredictionResult(example=None, inference=numpy.int32(100)) + self.assertAlmostEqual(OutputConverter.convert_from(result), 100.0) + + def test_convert_from_numpy_int64(self): + result = PredictionResult(example=None, inference=numpy.int64(5)) + self.assertAlmostEqual(OutputConverter.convert_from(result), 5.0) + + def test_convert_from_numpy_float32(self): + result = PredictionResult(example=None, inference=numpy.float32(2.71)) + self.assertAlmostEqual(OutputConverter.convert_from(result), 2.71) + + @unittest.skipIf(torch is None, "torch is not installed") + def test_convert_from_torch_tensor(self): + result = PredictionResult(example=None, inference=torch.tensor(2.5)) + self.assertAlmostEqual(OutputConverter.convert_from(result), 2.5) + + @unittest.skipIf(tf is None, "tensorflow is not installed") + def test_convert_from_tensorflow_tensor_scalar(self): + result = PredictionResult(example=None, inference=tf.constant(7)) + self.assertAlmostEqual(OutputConverter.convert_from(result), 7.0) + + @unittest.skipIf(tf is None, "tensorflow is not installed") + def test_convert_from_tensorflow_tensor_array(self): + result = PredictionResult(example=None, inference=tf.constant([4.2])) + self.assertAlmostEqual(OutputConverter.convert_from(result), 4.2, places=6) + + def test_convert_from_unknown_type(self): + result = PredictionResult(example=None, inference="hello") + with self.assertRaises(ValueError): + OutputConverter.convert_from(result) + + +class CustomDetectorConvertInputTest(unittest.TestCase): + def setUp(self): + self.input_data = beam.Row(a=1, b=2, c=3) + + def test_default_convert_input(self): + class NumpyInputHandler(ModelHandler[numpy.ndarray, PredictionT, ModelT]): + pass + + detector = CustomDetector(NumpyInputHandler()) + result = detector.convert_input(self.input_data) + self.assertTrue(numpy.array_equal(result, numpy.array([1, 2, 3]))) + + def test_custom_convert_input(self): + class MyInputData(): + 
def __init__(self, x, y, z): + self._x = x + self._y = y + self._z = z + + def __eq__(self, other): + if isinstance(other, MyInputData): + return self._x == other._x and self._y == other._y and \ + self._z == other._z + return False + + def beam_row_to_my_data(row: beam.Row) -> MyInputData: + return MyInputData(x=row.a, y=row.b, z=row.c) + + class MyDataInputHandler(ModelHandler[MyInputData, PredictionT, ModelT]): + pass + + # without the right input convert fn, it will raise an exception + detector = CustomDetector(MyDataInputHandler()) + self.assertRaises(ValueError, detector.convert_input, self.input_data) + + detector = CustomDetector( + MyDataInputHandler(), input_convert_fn=beam_row_to_my_data) + result = detector.convert_input(self.input_data) + self.assertEqual(result, MyInputData(1, 2, 3)) + + +class CustomDetectorConvertOutputTest(unittest.TestCase): + def test_default_convert_output(self): + class MyHandler(ModelHandler[ExampleT, PredictionT, ModelT]): + pass + + detector = CustomDetector(MyHandler()) + result = detector.convert_output( + PredictionResult(example=None, inference=int(10.0))) + self.assertAlmostEqual(result, 10.0) + + def test_custom_convert_output(self): + class MyHandler(ModelHandler[ExampleT, PredictionT, ModelT]): + pass + + class MyOutputData(): + def __init__(self, x, y): + self._x = x + self._y = y + + def prediction_to_float(pred: PredictionResult) -> float: + return pred.inference._y + + detector = CustomDetector(MyHandler()) + model_output = PredictionResult( + example=None, inference=MyOutputData(12, 34)) + # without the right output convert fn, it will raise an exception + self.assertRaises(ValueError, detector.convert_output, model_output) + + detector = CustomDetector( + MyHandler(), output_convert_fn=prediction_to_float) + result = detector.convert_output(model_output) + self.assertAlmostEqual(result, 34) + + +class GetInputTypeTest(unittest.TestCase): + def test_get_input_type(self): + class MyHandler(ModelHandler[ExampleT, PredictionT, ModelT]): + pass + + self.assertEqual(get_input_type(MyHandler()), ExampleT) + + def test_numpy_ndarray_input_model_handlers(self): + self.assertEqual( + get_input_type(SklearnModelHandlerNumpy(model_uri=FAKE_URI)), + numpy.ndarray) + if tf: + self.assertEqual( + get_input_type(TFModelHandlerNumpy(model_uri=FAKE_URI)), + numpy.ndarray) + + if onnx: + self.assertEqual( + get_input_type(OnnxModelHandlerNumpy(model_uri=FAKE_URI)), + numpy.ndarray) + + if tensorrt: + self.assertEqual( + get_input_type( + TensorRTEngineHandlerNumPy(min_batch_size=1, max_batch_size=1)), + numpy.ndarray) + + if xgboost: + self.assertEqual( + get_input_type(XGBoostModelHandlerNumpy(xgboost.XGBClassifier, "")), + list) + + def test_panda_dataframe_input_model_handlers(self): + self.assertEqual( + get_input_type(SklearnModelHandlerPandas(model_uri=FAKE_URI)), + pandas.DataFrame) + + if xgboost: + self.assertEqual( + get_input_type(XGBoostModelHandlerPandas(xgboost.XGBClassifier, "")), + pandas.DataFrame) + + def test_tensorflow_tensor_input_model_handlers(self): + if tf: + self.assertEqual( + get_input_type(TFModelHandlerTensor(model_uri=FAKE_URI)), tf.Tensor) + + if hugging_face: + # TODO: Currently there is no way to tell whether the input of + # HuggingFaceModelHandlerTensor is tf.Tensor or torch.Tensor before + # the inference takes place. Use tensorflow tensor by default. 
+      self.assertEqual(
+          get_input_type(
+              HuggingFaceModelHandlerTensor(
+                  model_uri=FAKE_URI, model_class=AutoModel)),
+          tf.Tensor)
+      self.assertEqual(
+          get_input_type(
+              HuggingFaceModelHandlerKeyedTensor(
+                  model_uri=FAKE_URI, model_class=AutoModel, framework='tf')),
+          Dict[str, tf.Tensor])
+
+  def test_torch_tensor_input_model_handlers(self):
+    if torch:
+      self.assertEqual(
+          get_input_type(PytorchModelHandlerTensor(model_uri=FAKE_URI)),
+          torch.Tensor)
+      self.assertEqual(
+          get_input_type(PytorchModelHandlerKeyedTensor(model_uri=FAKE_URI)),
+          Dict[str, torch.Tensor])
+
+    if hugging_face:
+      self.assertEqual(
+          get_input_type(
+              HuggingFaceModelHandlerKeyedTensor(
+                  model_uri=FAKE_URI, model_class=AutoModel, framework='pt')),
+          Dict[str, torch.Tensor])
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

From ce275a6a02d70721a09cab841caf9d77d5fb48ea Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Thu, 13 Mar 2025 20:11:25 -0400
Subject: [PATCH 2/2] Fix tests on pytorch.

- torch.Tensor(n) treats an integer n as a size: it returns an
  uninitialized 1-D tensor of length n, which typically prints as
  tensor([0., 0., ..., 0.]).
- torch.tensor(n) treats n as data: it returns a 0-dimensional (scalar)
  tensor holding the value n, i.e. tensor(n).

---
 sdks/python/apache_beam/ml/anomaly/detectors/custom.py      | 4 ++--
 sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/custom.py b/sdks/python/apache_beam/ml/anomaly/detectors/custom.py
index e73f3b206b9f..ebd472be195c 100644
--- a/sdks/python/apache_beam/ml/anomaly/detectors/custom.py
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/custom.py
@@ -56,12 +56,12 @@ def _to_pandas_dataframe(row: beam.Row):
 
 def _to_pytorch_tensor(row: beam.Row):
   """Converts an Apache Beam Row to a PyTorch Tensor."""
-  return torch.Tensor(list(row))
+  return torch.tensor(list(row))
 
 
 def _to_pytorch_keyed_tensor(row: beam.Row):
   """Converts an Apache Beam Row to a dictionary of PyTorch Tensors."""
-  return {str(k): torch.Tensor(v) for k, v in row._asdict().items()}
+  return {str(k): torch.tensor(v) for k, v in row._asdict().items()}
 
 
 def _to_tensorflow_tensor(row: beam.Row):
diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py b/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py
index 2e2fa4c25869..1316e4945cc5 100644
--- a/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/custom_test.py
@@ -119,14 +119,14 @@ def test_convert_to_unknown_type(self):
   def test_convert_to_torch(self):
     result = InputConverter.convert_to(self.input_data, torch.Tensor)
     self.assertTrue(isinstance(result, torch.Tensor))
-    self.assertTrue(torch.equal(result, torch.Tensor([1, 2, 3])))
+    self.assertTrue(torch.equal(result, torch.tensor([1, 2, 3])))
 
   @unittest.skipIf(torch is None, "torch is not installed")
   def test_convert_to_keyed_torch(self):
     result = InputConverter.convert_to(self.input_data, Dict[str, torch.Tensor])
     self.assertTrue(isinstance(result, dict))
     expected = {
-        'a': torch.Tensor(1), 'b': torch.Tensor(2), 'c': torch.Tensor(3)
+        'a': torch.tensor(1), 'b': torch.tensor(2), 'c': torch.tensor(3)
     }
     for key, value in result.items():
       self.assertTrue(torch.equal(value, expected[key]))
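
Note (not part of the commits above): the snippet below sketches what the
default conversion hooks, exercised in custom_test.py, do end to end. It
is a sketch under stated assumptions: the model URI is a placeholder
(nothing is loaded at construction time), and creating the detector with
no further AnomalyDetector kwargs assumes the base-class defaults suffice.

    import apache_beam as beam
    import numpy

    from apache_beam.ml.anomaly.detectors.custom import CustomDetector
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.sklearn_inference import (
        SklearnModelHandlerNumpy)

    # get_input_type() reads numpy.ndarray off the handler's generic base,
    # so the default input converter is _to_numpy_array.
    detector = CustomDetector(
        SklearnModelHandlerNumpy(model_uri="/path/to/model.pkl"))

    # Input side: Beam Row -> numpy array of the row's values.
    example = detector.convert_input(beam.Row(a=1, b=2, c=3))
    assert numpy.array_equal(example, numpy.array([1, 2, 3]))

    # Output side: PredictionResult -> float anomaly score.
    score = detector.convert_output(
        PredictionResult(example=example, inference=numpy.float32(0.75)))
    assert abs(score - 0.75) < 1e-6

In a pipeline, a driving transform (not included in this patch) is
expected to apply convert_input before RunInference and convert_output
after it, using the detector's _keyed_model_handler and
_run_inference_args.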