229 changes: 229 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/detectors/custom.py
@@ -0,0 +1,229 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import typing
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import SupportsFloat

import numpy
import pandas

import apache_beam as beam
from apache_beam.ml.anomaly.base import AnomalyDetector
from apache_beam.ml.anomaly.specifiable import specifiable
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import ModelHandler
from apache_beam.ml.inference.base import PredictionResult

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
import torch
except ImportError:
torch = None

try:
import tensorflow as tf
except ImportError:
tf = None


def _to_numpy_array(row: beam.Row):
"""Converts an Apache Beam Row to a NumPy array."""
return numpy.array(list(row))
Contributor:

I don't think this adapter really makes sense. For example, let's say you have a row like:

Row(a=1, b=2, c=3)

These are all different features, and it is unlikely that you actually want to treat them as a single numpy array. Even if you do, there's no real guarantee that the fields will end up in the right order.

Worse, this row could actually be:

Row(a=1, b=2, c='foo')

where we only want to run anomaly detection against the a field (we solve this elsewhere with

x = beam.Row(**{f: getattr(data, f) for f in self._underlying._features})
)
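A minimal sketch of the dtype/ordering hazard described above (the features list below is hypothetical, not from this PR):

import numpy

import apache_beam as beam

row = beam.Row(a=1, b=2, c='foo')

# Converting the whole row coerces every field to a common dtype, so the
# numeric features silently become strings.
numpy.array(list(row))  # -> array(['1', '2', 'foo'], dtype='<U21')

# Selecting only the intended features first avoids the problem, mirroring
# the snippet quoted above.
features = ['a', 'b']
x = numpy.array([getattr(row, f) for f in features])  # -> array([1, 2])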

Contributor:

I've expressed this less strongly before, but seeing this in practice I really think that we're better off just supporting ModelHandler[beam.Row, float] and making users handle the conversion from row to input/output types.

I think a bunch of these adapters either don't really make sense or are overly opinionated in unpredictable ways that will be hard for users to reason about, and there is a much easier path for them to define the exact behavior they want (with_preprocess_fn/with_postprocess_fn).
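
For example, under the ModelHandler[beam.Row, float] approach the user would spell out both conversions with the hooks this PR already exposes; a minimal sketch (my_model_handler and the field names are assumptions):

import numpy

detector = CustomDetector(
    # my_model_handler is assumed to be defined elsewhere.
    model_handler=my_model_handler,
    # Pick the features explicitly and control their order.
    input_convert_fn=lambda row: numpy.array([row.x, row.y]),
    # Reduce the model-specific PredictionResult to a float score.
    output_convert_fn=lambda result: float(result.inference[0]),
)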

Contributor:

This is similarly true of the postprocessing. There is no single way that models output an anomaly prediction, and it may often require some light postprocessing that can be pretty custom.

Collaborator Author:

I agree with you that it would be much simpler to only support ModelHandler[beam.Row, float], but I am also hesitant to put all the adapter burden on users, which could add friction when adopting the new transform.

With that said, I think instead of putting those functions in the SDK, maybe we can show them in examples later. WDYT?

Contributor:

Yeah, examples seem reasonable. I think taking the burden away from users would be great, but in practice I don't think their model preprocessing steps will be predictable enough to do that. Users will also be accustomed to having some simple preprocessing steps, and this fits in neatly with that.



def _to_pandas_dataframe(row: beam.Row):
"""Converts an Apache Beam Row to a Pandas DataFrame."""
return pandas.DataFrame.from_records([row._asdict()])


def _to_pytorch_tensor(row: beam.Row):
"""Converts an Apache Beam Row to a PyTorch Tensor."""
return torch.tensor(list(row))


def _to_pytorch_keyed_tensor(row: beam.Row):
"""Converts an Apache Beam Row to a dictionary of PyTorch Tensors."""
return {str(k): torch.tensor(v) for k, v in row._asdict().items()}


def _to_tensorflow_tensor(row: beam.Row):
"""Converts an Apache Beam Row to a TensorFlow Tensor."""
return tf.convert_to_tensor(list(row))


def _to_tensorflow_keyed_tensor(row: beam.Row):
"""Converts an Apache Beam Row to a dictionary of TensorFlow Tensors."""
return {str(k): tf.constant(v) for k, v in row._asdict().items()}


class InputConverter():
"""A utility class for converting Apache Beam Rows into different formats."""
_map: Dict[type, Callable[[beam.Row], Any]] = {}
_map[numpy.ndarray] = _to_numpy_array
_map[pandas.DataFrame] = _to_pandas_dataframe
if torch:
_map[torch.Tensor] = _to_pytorch_tensor
_map[Dict[str, torch.Tensor]] = _to_pytorch_keyed_tensor
if tf:
_map[tf.Tensor] = _to_tensorflow_tensor
_map[Dict[str, tf.Tensor]] = _to_tensorflow_keyed_tensor

@classmethod
def convert_to(cls, x, to_type):
"""Converts an input to a specified type.

Args:
x: The input to convert.
to_type: The target type for conversion.

Returns:
The converted input.

Raises:
ValueError: If the target type is unknown or conversion fails.
"""
if isinstance(to_type, type) and issubclass(to_type, beam.Row):
return x

if to_type in cls._map:
return cls._map[to_type](x)

raise ValueError(
f"Unknown input type {to_type} for value {x}. "
f"Please provide input_convert_fn to convert {to_type} to Beam Rows")


class OutputConverter():
"""A utility class for converting model prediction results to float values."""
@staticmethod
def convert_from(result: PredictionResult, from_type=None) -> float:
"""Converts RunInference's PredictionResult to a float value.

Args:
result: The PredictionResult object.
from_type: The original type of the inference result (optional).

Returns:
The converted float value.

Raises:
ValueError: If the output type is unknown or conversion fails.
"""
x = result.inference
from_type = from_type or type(x)

if isinstance(x, SupportsFloat):
# Handles int, float, and other numeric types
return float(x)
elif isinstance(x, numpy.number):
# Handles numpy numeric types
return float(x)
elif torch is not None and isinstance(x, torch.Tensor):
return float(x.item())
elif tf is not None and isinstance(x, tf.Tensor):
if x.ndim >= 1:
return float(x.numpy()[0])
else:
return float(x.numpy())
else:
raise ValueError(
f"Unknown output type {from_type} of value {x}. "
f"Please provide output_convert_fn to convert PredictionResult "
f"(with inference field of type {from_type}) to float.")


def get_input_type(model_handler: ModelHandler):
"""Extracts the input (example) type from a ModelHandler.

Args:
model_handler: The ModelHandler instance.

Returns:
The input type expected by the model handler.
"""
# TODO: Python 3.12 introduced types.get_original_bases() to access
# __orig_bases__, but prior to that we will need to access the special
# attribute directly.
# Here we get input_type from
# ModelHandler(Generic[ExampleT, PredictionT, ModelT])
input_type = typing.get_args(type(model_handler).__orig_bases__[0])[0]

is_keyed = typing.get_origin(input_type) is dict and \
typing.get_args(input_type)[0] is str

if is_keyed:
input_type = typing.get_args(input_type)[1]

if tf and torch:
if input_type == typing.Union[torch.Tensor, tf.Tensor]:
# check framework to tell if it is from pytorch or tensorflow
input_type = torch.Tensor if model_handler._framework == 'pt' \
else tf.Tensor

return Dict[str, input_type] if is_keyed else input_type
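
A minimal sketch of the __orig_bases__ introspection used above, with a toy handler class (these names are illustrative, not part of the Beam API):

import typing
from typing import Generic, TypeVar

import numpy

ExampleT = TypeVar('ExampleT')
PredictionT = TypeVar('PredictionT')
ModelT = TypeVar('ModelT')

class Base(Generic[ExampleT, PredictionT, ModelT]):
  pass

class ToyHandler(Base[numpy.ndarray, float, object]):
  pass

# __orig_bases__[0] is Base[numpy.ndarray, float, object]; its first type
# argument is the example (input) type, here numpy.ndarray.
typing.get_args(ToyHandler.__orig_bases__[0])[0]  # -> <class 'numpy.ndarray'>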


@specifiable
class CustomDetector(AnomalyDetector):
Contributor:

If we're using RunInference, this should probably be called OfflineDetector or something similar. CustomDetector could also include online detectors the user defines themselves.

Collaborator Author:

Sure. That makes sense.

"""A custom anomaly detector that uses a provided model handler for scoring.

Args:
model_handler: The ModelHandler to use for inference.
run_inference_args: Optional arguments to pass to RunInference.
input_convert_fn: Optional function to convert input Beam Rows to the
model's expected input type.
output_convert_fn: Optional function to convert model PredictionResults
to float scores.
**kwargs: Additional keyword arguments to pass to the base
AnomalyDetector class.
"""
def __init__(
self,
model_handler: ModelHandler,
run_inference_args: Optional[Dict[str, Any]] = None,
input_convert_fn: Optional[Callable[[beam.Row], Any]] = None,
output_convert_fn: Optional[Callable[[PredictionResult], float]] = None,
**kwargs):
super().__init__(**kwargs)

self._model_handler = model_handler
self._keyed_model_handler = KeyedModelHandler(model_handler)
self._input_type = get_input_type(self._model_handler)
self._run_inference_args = run_inference_args or {}
self.convert_input = input_convert_fn or self._default_convert_input
self.convert_output = output_convert_fn or self._default_convert_output

# always override model_identifier with model_id from the detector
self._run_inference_args["model_identifier"] = self._model_id

def _default_convert_input(self, x: beam.Row) -> Any:
return InputConverter.convert_to(x, self._input_type)

def _default_convert_output(self, x: PredictionResult) -> float:
return OutputConverter.convert_from(x)

def learn_one(self, x: beam.Row) -> None:
"""Not implemented since CustomDetector invokes RunInference directly."""
raise NotImplementedError

def score_one(self, x: beam.Row) -> Optional[float]:
"""Not implemented since CustomDetector invokes RunInference directly."""
raise NotImplementedError
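
A hedged usage sketch (the sklearn handler and model path are assumptions for illustration; they are not part of this PR):

from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# SklearnModelHandlerNumpy is a ModelHandler over numpy.ndarray examples,
# so the default InputConverter/OutputConverter paths apply and no custom
# convert fns are needed.
detector = CustomDetector(
    model_handler=SklearnModelHandlerNumpy(model_uri='/tmp/model.pickle'))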