From 638a27180c05cd9f67d05d294928e96c6cb8c132 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 18 Feb 2025 21:08:13 -0500 Subject: [PATCH 1/5] Add threshold and aggregation functions. Also include the following changes: * change prediction to predictions (iterable) in AnomalyResult. * fix some tests that contaminate _KNOWN_SPECIFIABLE --- .../apache_beam/ml/anomaly/aggregations.py | 263 +++++++++++++++ .../ml/anomaly/aggregations_test.py | 120 +++++++ sdks/python/apache_beam/ml/anomaly/base.py | 6 +- .../apache_beam/ml/anomaly/base_test.py | 11 +- .../ml/anomaly/specifiable_test.py | 20 +- .../apache_beam/ml/anomaly/thresholds.py | 319 ++++++++++++++++++ .../apache_beam/ml/anomaly/thresholds_test.py | 228 +++++++++++++ 7 files changed, 962 insertions(+), 5 deletions(-) create mode 100644 sdks/python/apache_beam/ml/anomaly/aggregations.py create mode 100644 sdks/python/apache_beam/ml/anomaly/aggregations_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/thresholds.py create mode 100644 sdks/python/apache_beam/ml/anomaly/thresholds_test.py diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations.py b/sdks/python/apache_beam/ml/anomaly/aggregations.py new file mode 100644 index 000000000000..ba80d7075bda --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/aggregations.py @@ -0,0 +1,263 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import collections +import math +import statistics +from typing import Callable +from typing import Iterable + +from apache_beam.ml.anomaly.base import AnomalyPrediction +from apache_beam.ml.anomaly.base import AggregationFn +from apache_beam.ml.anomaly.specifiable import specifiable + + +class LabelAggregation(AggregationFn): + """Aggregates anomaly predictions based on their labels. + + This is an abstract base class for `AggregationFn`s that combine multiple + `AnomalyPrediction` objects into a single `AnomalyPrediction` based on + the labels of the input predictions. + + Args: + agg_func (Callable[[Iterable[int]], int]): A function that aggregates + a collection of anomaly labels (integers) into a single label. + include_history (bool): If True, include the input predictions in the + `agg_history` of the output. Defaults to False. + """ + def __init__( + self, + agg_func: Callable[[Iterable[int]], int], + include_history: bool = False): + self._agg = agg_func + self._include_history = include_history + self._agg_model_id = None + + def apply( + self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: + """Applies the label aggregation function to a list of predictions. + + Args: + predictions (Iterable[AnomalyPrediction]): A collection of + `AnomalyPrediction` objects to be aggregated. + + Returns: + AnomalyPrediction: A single `AnomalyPrediction` object with the + aggregated label. 
+ """ + labels = [ + prediction.label for prediction in predictions + if prediction.label is not None + ] + + if len(labels) == 0: + return AnomalyPrediction(model_id=self._agg_model_id) + + label = self._agg(labels) + + history = list(predictions) if self._include_history else None + + return AnomalyPrediction( + model_id=self._agg_model_id, label=label, agg_history=history) + + +class ScoreAggregation(AggregationFn): + """Aggregates anomaly predictions based on their scores. + + This is an abstract base class for `AggregationFn`s that combine multiple + `AnomalyPrediction` objects into a single `AnomalyPrediction` based on + the scores of the input predictions. + + Args: + agg_func (Callable[[Iterable[float]], float]): A function that aggregates + a collection of anomaly scores (floats) into a single score. + include_history (bool): If True, include the input predictions in the + `agg_history` of the output. Defaults to False. + """ + def __init__( + self, + agg_func: Callable[[Iterable[float]], float], + include_history: bool = False): + self._agg = agg_func + self._include_history = include_history + self._agg_model_id = None + + def apply( + self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: + """Applies the score aggregation function to a list of predictions. + + Args: + predictions (Iterable[AnomalyPrediction]): A collection of + `AnomalyPrediction` objects to be aggregated. + + Returns: + AnomalyPrediction: A single `AnomalyPrediction` object with the + aggregated score. 
+ """ + scores = [ + prediction.score for prediction in predictions + if prediction.score is not None and not math.isnan(prediction.score) + ] + if len(scores) == 0: + return AnomalyPrediction(model_id=self._agg_model_id) + + score = self._agg(scores) + + history = list(predictions) if self._include_history else None + + return AnomalyPrediction( + model_id=self._agg_model_id, score=score, agg_history=history) + + +@specifiable +class MajorityVote(LabelAggregation): + """Aggregates anomaly labels using majority voting. + + This `AggregationFn` implements a majority voting strategy to combine + anomaly labels from multiple `AnomalyPrediction` objects. It counts the + occurrences of normal and outlier labels and selects the label with the + higher count as the aggregated label. In case of a tie, a tie-breaker + label is used. + + Example: + If input labels are [normal, outlier, outlier, normal, outlier], and + normal_label=0, outlier_label=1, then the aggregated label will be + outlier (1) because outliers have a majority (3 vs 2). + + Args: + normal_label (int): The integer label for normal predictions. Defaults to 0. + outlier_label (int): The integer label for outlier predictions. Defaults to + 1. + tie_breaker (int): The label to return if there is a tie in votes. + Defaults to 0 (normal_label). + **kwargs: Additional keyword arguments to pass to the base + `LabelAggregation` class. 
+ """ + def __init__(self, normal_label=0, outlier_label=1, tie_breaker=0, **kwargs): + self._tie_breaker = tie_breaker + self._normal_label = normal_label + self._outlier_label = outlier_label + + def inner(predictions: Iterable[int]) -> int: + counters = collections.Counter(predictions) + if counters[self._normal_label] < counters[self._outlier_label]: + vote = self._outlier_label + elif counters[self._normal_label] > counters[self._outlier_label]: + vote = self._normal_label + else: + vote = self._tie_breaker + return vote + + super().__init__(agg_func=inner, **kwargs) + + +# And scheme +@specifiable +class AllVote(LabelAggregation): + """Aggregates anomaly labels using an "all vote" (AND) scheme. + + This `AggregationFn` implements an "all vote" strategy. It aggregates + anomaly labels such that the result is considered an outlier only if all + input `AnomalyPrediction` objects are labeled as outliers. + + Example: + If input labels are [outlier, outlier, outlier], and outlier_label=1, + then the aggregated label will be outlier (1). + If input labels are [outlier, normal, outlier], and outlier_label=1, + then the aggregated label will be normal (0). + + Args: + normal_label (int): The integer label for normal predictions. Defaults to 0. + outlier_label (int): The integer label for outlier predictions. Defaults to + 1. + **kwargs: Additional keyword arguments to pass to the base + `LabelAggregation` class. + """ + def __init__(self, normal_label=0, outlier_label=1, **kwargs): + self._normal_label = normal_label + self._outlier_label = outlier_label + + def inner(predictions: Iterable[int]) -> int: + return self._outlier_label if all( + map(lambda p: p == self._outlier_label, + predictions)) else self._normal_label + + super().__init__(agg_func=inner, **kwargs) + + +# Or scheme +@specifiable +class AnyVote(LabelAggregation): + """Aggregates anomaly labels using an "any vote" (OR) scheme. + + This `AggregationFn` implements an "any vote" strategy. 
It aggregates + anomaly labels such that the result is considered an outlier if at least + one of the input `AnomalyPrediction` objects is labeled as an outlier. + + Example: + If input labels are [normal, normal, outlier], and outlier_label=1, + then the aggregated label will be outlier (1). + If input labels are [normal, normal, normal], and outlier_label=1, + then the aggregated label will be normal (0). + + Args: + normal_label (int): The integer label for normal predictions. Defaults to 0. + outlier_label (int): The integer label for outlier predictions. Defaults to + 1. + **kwargs: Additional keyword arguments to pass to the base + `LabelAggregation` class. + """ + def __init__(self, normal_label=0, outlier_label=1, **kwargs): + self._normal_label = normal_label + self._outlier_label = outlier_label + + def inner(predictions: Iterable[int]) -> int: + return self._outlier_label if any( + map(lambda p: p == self._outlier_label, + predictions)) else self._normal_label + + super().__init__(agg_func=inner, **kwargs) + + +@specifiable +class AverageScore(ScoreAggregation): + """Aggregates anomaly scores by calculating their average. + + This `AggregationFn` computes the average of the anomaly scores from a + collection of `AnomalyPrediction` objects. + + Args: + **kwargs: Additional keyword arguments to pass to the base + `ScoreAggregation` class. + """ + def __init__(self, **kwargs): + super().__init__(agg_func=statistics.mean, **kwargs) + + +@specifiable +class MaxScore(ScoreAggregation): + """Aggregates anomaly scores by selecting the maximum score. + + This `AggregationFn` selects the highest anomaly score from a collection + of `AnomalyPrediction` objects as the aggregated score. + + Args: + **kwargs: Additional keyword arguments to pass to the base + `ScoreAggregation` class. 
+ """ + def __init__(self, **kwargs): + super().__init__(agg_func=max, **kwargs) diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py new file mode 100644 index 000000000000..ec178793398f --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import unittest + +from apache_beam.ml.anomaly.base import AnomalyPrediction +from apache_beam.ml.anomaly import aggregations + + +class MajorityVoteTest(unittest.TestCase): + def test_default(self): + normal = AnomalyPrediction(label=0) + outlier = AnomalyPrediction(label=1) + vote = aggregations.MajorityVote(_run_init=True).apply + + self.assertEqual(vote([]), AnomalyPrediction()) + + self.assertEqual(vote([normal]), normal) + + self.assertEqual(vote([outlier]), outlier) + + self.assertEqual(vote([outlier, normal, normal]), normal) + + self.assertEqual(vote([outlier, normal, outlier]), outlier) + + # use normal to break ties by default + self.assertEqual(vote([outlier, normal]), normal) + + def test_tie_breaker(self): + normal = AnomalyPrediction(label=0) + outlier = AnomalyPrediction(label=1) + vote = aggregations.MajorityVote(tie_breaker=1, _run_init=True).apply + + self.assertEqual(vote([outlier, normal]), outlier) + + +class AllVoteTest(unittest.TestCase): + def test_default(self): + normal = AnomalyPrediction(label=0) + outlier = AnomalyPrediction(label=1) + vote = aggregations.AllVote(_run_init=True).apply + + self.assertEqual(vote([]), AnomalyPrediction()) + + self.assertEqual(vote([normal]), normal) + + self.assertEqual(vote([outlier]), outlier) + + # outlier is only labeled when everyone is outlier + self.assertEqual(vote([normal, normal, normal]), normal) + self.assertEqual(vote([outlier, normal, normal]), normal) + self.assertEqual(vote([outlier, normal, outlier]), normal) + self.assertEqual(vote([outlier, outlier, outlier]), outlier) + + +class AnyVoteTest(unittest.TestCase): + def test_default(self): + normal = AnomalyPrediction(label=0) + outlier = AnomalyPrediction(label=1) + vote = aggregations.AnyVote(_run_init=True).apply + + self.assertEqual(vote([]), AnomalyPrediction()) + + self.assertEqual(vote([normal]), normal) + + self.assertEqual(vote([outlier]), outlier) + + # outlier is labeled when at least one is outlier + 
self.assertEqual(vote([normal, normal, normal]), normal) + self.assertEqual(vote([outlier, normal, normal]), outlier) + self.assertEqual(vote([outlier, normal, outlier]), outlier) + self.assertEqual(vote([outlier, outlier, outlier]), outlier) + + +class AverageScoreTest(unittest.TestCase): + def test_default(self): + avg = aggregations.AverageScore(_run_init=True).apply + + self.assertEqual(avg([]), AnomalyPrediction()) + + self.assertEqual( + avg([AnomalyPrediction(score=1)]), AnomalyPrediction(score=1)) + + self.assertEqual( + avg([AnomalyPrediction(score=1), AnomalyPrediction(score=2)]), + AnomalyPrediction(score=1.5)) + + +class MaxScoreTest(unittest.TestCase): + def test_default(self): + avg = aggregations.MaxScore(_run_init=True).apply + + self.assertEqual(avg([]), AnomalyPrediction()) + + self.assertEqual( + avg([AnomalyPrediction(score=1)]), AnomalyPrediction(score=1)) + + self.assertEqual( + avg([AnomalyPrediction(score=1), AnomalyPrediction(score=2)]), + AnomalyPrediction(score=2)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index 6a717cf5db16..2dd53707a582 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -61,8 +61,10 @@ class AnomalyResult(): """A dataclass for the anomaly detection results""" #: The original input data. example: beam.Row - #: The `AnomalyPrediction` object containing the prediction. - prediction: AnomalyPrediction + #: The iterable of `AnomalyPrediction` objects containing the predictions. + #: Expect length 1 if it is a result for a non-ensemble detector or an + #: ensemble detector with an aggregation strategy applied. 
+ predictions: Iterable[AnomalyPrediction] class ThresholdFn(abc.ABC): diff --git a/sdks/python/apache_beam/ml/anomaly/base_test.py b/sdks/python/apache_beam/ml/anomaly/base_test.py index e58674d8c1e9..8ccc28f01932 100644 --- a/sdks/python/apache_beam/ml/anomaly/base_test.py +++ b/sdks/python/apache_beam/ml/anomaly/base_test.py @@ -17,6 +17,7 @@ from __future__ import annotations +import copy import logging import unittest @@ -34,8 +35,11 @@ class TestAnomalyDetector(unittest.TestCase): def setUp(self) -> None: - # Remove all registered specifiable classes and reset. + self.saved_specifiable = copy.deepcopy(_KNOWN_SPECIFIABLE) + + def tearDown(self) -> None: _KNOWN_SPECIFIABLE.clear() + _KNOWN_SPECIFIABLE.update(self.saved_specifiable) @parameterized.expand([(False, False), (True, False), (False, True), (True, True)]) @@ -142,8 +146,11 @@ def __eq__(self, value) -> bool: class TestEnsembleAnomalyDetector(unittest.TestCase): def setUp(self) -> None: - # Remove all registered specifiable classes and reset. + self.saved_specifiable = copy.deepcopy(_KNOWN_SPECIFIABLE) + + def tearDown(self) -> None: _KNOWN_SPECIFIABLE.clear() + _KNOWN_SPECIFIABLE.update(self.saved_specifiable) @parameterized.expand([(False, False), (True, False), (False, True), (True, True)]) diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py index 19b9d81c3d53..a3133f32e996 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py @@ -15,6 +15,7 @@ # limitations under the License. # +import copy import dataclasses import logging import unittest @@ -32,8 +33,11 @@ class TestSpecifiable(unittest.TestCase): def setUp(self) -> None: - # Remove all registered specifiable classes and reset. 
+ self.saved_specifiable = copy.deepcopy(_KNOWN_SPECIFIABLE) + + def tearDown(self) -> None: _KNOWN_SPECIFIABLE.clear() + _KNOWN_SPECIFIABLE.update(self.saved_specifiable) def test_decorator_in_function_form(self): class A(): @@ -217,6 +221,13 @@ class ShoppingCart(): class TestInitCallCount(unittest.TestCase): + def setUp(self) -> None: + self.saved_specifiable = copy.deepcopy(_KNOWN_SPECIFIABLE) + + def tearDown(self) -> None: + _KNOWN_SPECIFIABLE.clear() + _KNOWN_SPECIFIABLE.update(self.saved_specifiable) + def test_on_demand_init(self): @specifiable(on_demand_init=True, just_in_time_init=False) class FooOnDemand(): @@ -411,6 +422,13 @@ def __init__(self, c): class TestNestedSpecifiable(unittest.TestCase): + def setUp(self) -> None: + self.saved_specifiable = copy.deepcopy(_KNOWN_SPECIFIABLE) + + def tearDown(self) -> None: + _KNOWN_SPECIFIABLE.clear() + _KNOWN_SPECIFIABLE.update(self.saved_specifiable) + @parameterized.expand([[Child_1, 0], [Child_2, 0], [Child_1, 1], [Child_2, 1], [Child_1, 2], [Child_2, 2]]) def test_nested_specifiable(self, Child, mode): diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds.py b/sdks/python/apache_beam/ml/anomaly/thresholds.py new file mode 100644 index 000000000000..e3d7b9d0ab3d --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/thresholds.py @@ -0,0 +1,319 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import annotations + +import dataclasses +from typing import Any +from typing import cast +from typing import Iterable +from typing import Optional +from typing import Tuple +from typing import Union + +import apache_beam as beam +from apache_beam.coders import DillCoder +from apache_beam.ml.anomaly.base import AnomalyResult +from apache_beam.ml.anomaly.base import ThresholdFn +from apache_beam.ml.anomaly.specifiable import Spec +from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.specifiable import Specifiable +from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.transforms.userstate import ReadModifyWriteStateSpec +from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState + + +class BaseThresholdDoFn(beam.DoFn): + """Applies a ThresholdFn to anomaly detection results. + + This abstract base class defines the structure for DoFns that use a + `ThresholdFn` to convert anomaly scores into anomaly labels (e.g., normal + or outlier). It handles the core logic of applying the threshold function + and updating the prediction labels within `AnomalyResult` objects. + + Args: + threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be + used. 
+ """ + def __init__(self, threshold_fn_spec: Spec): + self._threshold_fn_spec = threshold_fn_spec + self._threshold_fn: ThresholdFn + + def _apply_threshold_to_predictions( + self, result: AnomalyResult) -> AnomalyResult: + """Updates the prediction labels in an AnomalyResult using the ThresholdFn. + + Args: + result (AnomalyResult): The input `AnomalyResult` containing anomaly + scores. + + Returns: + AnomalyResult: A new `AnomalyResult` with updated prediction labels + and threshold values. + """ + predictions = [ + dataclasses.replace( + p, + label=self._threshold_fn.apply(p.score), + threshold=self._threshold_fn.threshold) for p in result.predictions + ] + return dataclasses.replace(result, predictions=predictions) + + +class StatelessThresholdDoFn(BaseThresholdDoFn): + """Applies a stateless ThresholdFn to anomaly detection results. + + This DoFn is designed for stateless `ThresholdFn` implementations. It + initializes the `ThresholdFn` once during setup and applies it to each + incoming element without maintaining any state across elements. + + Args: + threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be + used. + + Raises: + AssertionError: If the provided `threshold_fn_spec` leads to the + creation of a stateful `ThresholdFn`. + """ + def __init__(self, threshold_fn_spec: Spec): + threshold_fn_spec.config["_run_init"] = True + self._threshold_fn = cast( + ThresholdFn, Specifiable.from_spec(threshold_fn_spec)) + assert not self._threshold_fn.is_stateful, \ + "This DoFn can only take stateless function as threshold_fn" + + def process(self, element: Tuple[Any, Tuple[Any, AnomalyResult]], + **kwargs) -> Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: + """Processes a batch of anomaly results using a stateless ThresholdFn. + + Args: + element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing + an element in the Beam pipeline. 
It is expected to be in the format + `(key1, (key2, AnomalyResult))`, where key1 is the original input key, + and key2 is a disambiguating key for distinct data points. + **kwargs: Additional keyword arguments passed to the `process` method + in Beam DoFns. + + Yields: + Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing + a single output element with the same structure as the input, but with + the `AnomalyResult` having updated prediction labels based on the + stateless `ThresholdFn`. + """ + k1, (k2, result) = element + yield k1, (k2, self._apply_threshold_to_predictions(result)) + + +class StatefulThresholdDoFn(BaseThresholdDoFn): + """Applies a stateful ThresholdFn to anomaly detection results. + + This DoFn is designed for stateful `ThresholdFn` implementations. It leverages + Beam's state management to persist and update the state of the `ThresholdFn` + across multiple elements. This is necessary for `ThresholdFn`s that need to + accumulate information or adapt over time, such as quantile-based thresholds. + + Args: + threshold_fn_spec (Spec): Specification defining the `ThresholdFn` to be + used. + + Raises: + AssertionError: If the provided `threshold_fn_spec` leads to the + creation of a stateless `ThresholdFn`. 
+ """ + THRESHOLD_STATE_INDEX = ReadModifyWriteStateSpec('saved_tracker', DillCoder()) + + def __init__(self, threshold_fn_spec: Spec): + threshold_fn_spec.config["_run_init"] = True + threshold_fn: ThresholdFn = cast( + ThresholdFn, Specifiable.from_spec(threshold_fn_spec)) + assert threshold_fn.is_stateful, \ + "This DoFn can only take stateful function as threshold_fn" + self._threshold_fn_spec = threshold_fn_spec + + def process( + self, + element: Tuple[Any, Tuple[Any, AnomalyResult]], + threshold_state: Union[ReadModifyWriteRuntimeState, + Any] = beam.DoFn.StateParam(THRESHOLD_STATE_INDEX), + **kwargs) -> Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: + """Processes a batch of anomaly results using a stateful ThresholdFn. + + For each input element, this DoFn retrieves the stateful `ThresholdFn` from + Beam state, initializes it if it's the first time, applies it to update + the prediction labels in the `AnomalyResult`, and then updates the state in + Beam for future elements. + + Args: + element (Tuple[Any, Tuple[Any, AnomalyResult]]): A tuple representing + an element in the Beam pipeline. It is expected to be in the format + `(key1, (key2, AnomalyResult))`, where key1 is the original input key, + and key2 is a disambiguating key for distinct data points. + threshold_state (Union[ReadModifyWriteRuntimeState, Any]): A Beam state + parameter that provides access to the persisted state of the + `ThresholdFn`. It is automatically managed by Beam. + **kwargs: Additional keyword arguments passed to the `process` method + in Beam DoFns. + + Yields: + Iterable[Tuple[Any, Tuple[Any, AnomalyResult]]]: An iterable containing + a single output element with the same structure as the input, but + with the `AnomalyResult` having updated prediction labels based on + the stateful `ThresholdFn`. 
+ """ + k1, (k2, result) = element + + self._threshold_fn = threshold_state.read() + if self._threshold_fn is None: + self._threshold_fn: Specifiable = Specifiable.from_spec( + self._threshold_fn_spec) + + yield k1, (k2, self._apply_threshold_to_predictions(result)) + + threshold_state.write(self._threshold_fn) + + +@specifiable +class FixedThreshold(ThresholdFn): + """Applies a fixed cutoff value to anomaly scores. + + This `ThresholdFn` is stateless and uses a pre-defined cutoff value to + classify anomaly scores. Scores below the cutoff are considered normal, while + scores at or above the cutoff are classified as outliers. + + Args: + cutoff (float): The fixed threshold value. Anomaly scores at or above this + value will be labeled as outliers. + **kwargs: Additional keyword arguments to be passed to the base + `ThresholdFn` constructor. + """ + def __init__(self, cutoff: float, **kwargs): + super().__init__(**kwargs) + self._cutoff = cutoff + + @property + def is_stateful(self) -> bool: + """Indicates whether this ThresholdFn is stateful. + + Returns: + bool: Always False for `FixedThreshold` as it is stateless. + """ + return False + + @property + def threshold(self) -> float: + """Returns the fixed cutoff threshold value. + + Returns: + float: The fixed threshold value. + """ + return self._cutoff + + def apply(self, score: Optional[float]) -> int: + """Applies the fixed threshold to an anomaly score. + + Classifies the given anomaly score as normal or outlier based on the + predefined cutoff. + + Args: + score (Optional[float]): The input anomaly score. + + Returns: + int: The anomaly label (normal or outlier). Returns the normal label + if the score is None or less than the threshold, otherwise returns + the outlier label. + """ + if score is None or score < self.threshold: + return self._normal_label + + return self._outlier_label + + +@specifiable +class QuantileThreshold(ThresholdFn): + """Applies a quantile-based dynamic threshold to anomaly scores. 
+ + This `ThresholdFn` is stateful and uses a quantile tracker to dynamically + determine the threshold for anomaly detection. It estimates the specified + quantile of the incoming anomaly scores and uses this quantile value as the + threshold. + + The threshold adapts over time as more data is processed, making it suitable + for scenarios where the distribution of anomaly scores may change. + + Args: + quantile (Optional[float]): The quantile to be tracked (e.g., 0.95 for the + 95th percentile). This value determines the dynamic threshold. Defaults to + 0.95. Ignored when `quantile_tracker` is provided. + quantile_tracker (Optional[QuantileTracker]): An optional + pre-initialized quantile tracker. If provided, this tracker will be used; + otherwise, a `BufferedSlidingQuantileTracker` will be created with a + default window size of 100. + **kwargs: Additional keyword arguments to be passed to the base + `ThresholdFn` constructor. + """ + def __init__( + self, + quantile: Optional[float] = 0.95, + quantile_tracker: Optional[QuantileTracker] = None, + **kwargs): + super().__init__(**kwargs) + if quantile_tracker is not None: + self._tracker = quantile_tracker + else: + self._tracker = BufferedSlidingQuantileTracker( + window_size=100, q=quantile) + + @property + def is_stateful(self) -> bool: + """Indicates whether this ThresholdFn is stateful. + + Returns: + bool: Always True for `QuantileThreshold` as it is stateful. + """ + return True + + @property + def threshold(self) -> float: + """Returns the current quantile-based threshold value. + + Returns: + float: The dynamically calculated threshold value based on the quantile + tracker. + """ + return self._tracker.get() + + def apply(self, score: Optional[float]) -> int: + """Applies the quantile-based threshold to an anomaly score. + + Updates the quantile tracker with the given score and classifies the score + as normal or outlier based on the current quantile threshold. + + Args: + score (Optional[float]): The input anomaly score.
+ + Returns: + int: The anomaly label (normal or outlier). Returns the normal label if + the score is None or less than the dynamic quantile threshold, otherwise + returns the outlier label. + """ + self._tracker.push(score) + + if score is None or score < self.threshold: + return self._normal_label + + return self._outlier_label diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py new file mode 100644 index 000000000000..cad88ac4f1a2 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py @@ -0,0 +1,228 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import unittest + +import apache_beam as beam +from apache_beam.ml.anomaly import thresholds +from apache_beam.ml.anomaly.base import AnomalyPrediction +from apache_beam.ml.anomaly.base import AnomalyResult +from apache_beam.ml.anomaly.specifiable import Spec +from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + + +class TestFixedThreshold(unittest.TestCase): + def test_threshold(self): + input = [ + (1, (2, AnomalyResult(beam.Row(x=10), [AnomalyPrediction(score=1)]))), + (1, (3, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=2)]))), + (1, (4, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=3)]))), + ] + expected = [ + ( + 1, + ( + 2, + AnomalyResult( + beam.Row(x=10), + [AnomalyPrediction(score=1, label=0, threshold=2)]))), + ( + 1, + ( + 3, + AnomalyResult( + beam.Row(x=20), + [AnomalyPrediction(score=2, label=1, threshold=2)]))), + ( + 1, + ( + 4, + AnomalyResult( + beam.Row(x=20), + [AnomalyPrediction(score=3, label=1, threshold=2)]))), + ] + with TestPipeline() as p: + result = ( + p + | beam.Create(input) + | beam.ParDo( + thresholds.StatelessThresholdDoFn( + thresholds.FixedThreshold(2, normal_label=0, + outlier_label=1).to_spec()))) # type: ignore + + assert_that(result, equal_to(expected)) + + def test_multiple_predictions(self): + input = [ + ( + 1, + ( + 2, + AnomalyResult( + beam.Row(x=10), + [AnomalyPrediction(score=1), AnomalyPrediction(score=4)]))), + ( + 1, + ( + 3, + AnomalyResult( + beam.Row(x=20), + [AnomalyPrediction(score=2), AnomalyPrediction(score=0.5) + ]))), + ] + expected = [ + ( + 1, + ( + 2, + AnomalyResult( + beam.Row(x=10), + [ + AnomalyPrediction(score=1, label=0, 
threshold=2), + AnomalyPrediction(score=4, label=1, threshold=2) + ]))), + ( + 1, + ( + 3, + AnomalyResult( + beam.Row(x=20), + [ + AnomalyPrediction(score=2, label=1, threshold=2), + AnomalyPrediction(score=0.5, label=0, threshold=2) + ]))), + ] + with TestPipeline() as p: + result = ( + p + | beam.Create(input) + | beam.ParDo( + thresholds.StatelessThresholdDoFn( + thresholds.FixedThreshold(2, normal_label=0, + outlier_label=1).to_spec()))) # type: ignore + + assert_that(result, equal_to(expected)) + + +class TestQuantileThreshold(unittest.TestCase): + def test_threshold(self): + # use the input data with two keys to test stateful threshold function + input = [ + (1, (2, AnomalyResult(beam.Row(x=10), [AnomalyPrediction(score=1)]))), + (1, (3, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=2)]))), + (1, (4, AnomalyResult(beam.Row(x=30), [AnomalyPrediction(score=3)]))), + (2, (2, AnomalyResult(beam.Row(x=40), [AnomalyPrediction(score=10)]))), + (2, (3, AnomalyResult(beam.Row(x=50), [AnomalyPrediction(score=20)]))), + (2, (4, AnomalyResult(beam.Row(x=60), [AnomalyPrediction(score=30)]))), + ] + expected = [ + ( + 1, + ( + 2, + AnomalyResult( + beam.Row(x=10), + [AnomalyPrediction(score=1, label=1, threshold=1)]))), + ( + 1, + ( + 3, + AnomalyResult( + beam.Row(x=20), + [AnomalyPrediction(score=2, label=1, threshold=1.5)]))), + ( + 2, + ( + 2, + AnomalyResult( + beam.Row(x=40), + [AnomalyPrediction(score=10, label=1, threshold=10)]))), + ( + 2, + ( + 3, + AnomalyResult( + beam.Row(x=50), + [AnomalyPrediction(score=20, label=1, threshold=15)]))), + ( + 1, + ( + 4, + AnomalyResult( + beam.Row(x=30), + [AnomalyPrediction(score=3, label=1, threshold=2)]))), + ( + 2, + ( + 4, + AnomalyResult( + beam.Row(x=60), + [AnomalyPrediction(score=30, label=1, threshold=20)]))), + ] + with TestPipeline() as p: + result = ( + p + | beam.Create(input) + # use median just for test convenience + | beam.ParDo( + thresholds.StatefulThresholdDoFn( + thresholds.QuantileThreshold( + 
quantile=0.5, normal_label=0, + outlier_label=1).to_spec()))) # type: ignore + + assert_that(result, equal_to(expected)) + + def test_quantile_tracker(self): + t1 = thresholds.QuantileThreshold() + self.assertTrue(isinstance(t1._tracker, BufferedSlidingQuantileTracker)) + self.assertEqual(t1._tracker._q, 0.95) + self.assertEqual(t1.to_spec(), Spec("QuantileThreshold", config={})) + + t2 = thresholds.QuantileThreshold(quantile=0.99) + self.assertTrue(isinstance(t2._tracker, BufferedSlidingQuantileTracker)) + self.assertEqual(t2._tracker._q, 0.99) + self.assertEqual( + t2.to_spec(), Spec("QuantileThreshold", config={"quantile": 0.99})) + + # argument quantile=0.9 is not used because quantile_tracker is set + t3 = thresholds.QuantileThreshold( + quantile=0.9, quantile_tracker=SimpleSlidingQuantileTracker(50, 0.975)) + self.assertTrue(isinstance(t3._tracker, SimpleSlidingQuantileTracker)) + self.assertEqual(t3._tracker._q, 0.975) + print(t3.to_spec()) + self.assertEqual( + t3.to_spec(), + Spec( + "QuantileThreshold", + config={ + 'quantile': 0.9, + 'quantile_tracker': Spec( + type='SimpleSlidingQuantileTracker', + config={ + 'window_size': 50, 'q': 0.975 + }) + })) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() From e384169b269bad4643b79a0a164654d35351f24f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 18 Feb 2025 23:54:48 -0500 Subject: [PATCH 2/5] Fix lints. 
--- sdks/python/apache_beam/ml/anomaly/aggregations.py | 2 +- sdks/python/apache_beam/ml/anomaly/aggregations_test.py | 2 +- sdks/python/apache_beam/ml/anomaly/thresholds.py | 8 ++++---- sdks/python/apache_beam/ml/anomaly/thresholds_test.py | 8 ++++---- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations.py b/sdks/python/apache_beam/ml/anomaly/aggregations.py index ba80d7075bda..43632070b88d 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations.py @@ -21,8 +21,8 @@ from typing import Callable from typing import Iterable -from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly.base import AggregationFn +from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly.specifiable import specifiable diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py index ec178793398f..1e1d868fef33 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py @@ -18,8 +18,8 @@ import logging import unittest -from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly import aggregations +from apache_beam.ml.anomaly.base import AnomalyPrediction class MajorityVoteTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds.py b/sdks/python/apache_beam/ml/anomaly/thresholds.py index e3d7b9d0ab3d..5f91b286dd84 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds.py @@ -19,23 +19,23 @@ import dataclasses from typing import Any -from typing import cast from typing import Iterable from typing import Optional from typing import Tuple from typing import Union +from typing import cast import apache_beam as beam from apache_beam.coders import DillCoder from 
apache_beam.ml.anomaly.base import AnomalyResult from apache_beam.ml.anomaly.base import ThresholdFn from apache_beam.ml.anomaly.specifiable import Spec -from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.specifiable import Specifiable -from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker +from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long -from apache_beam.transforms.userstate import ReadModifyWriteStateSpec +from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker from apache_beam.transforms.userstate import ReadModifyWriteRuntimeState +from apache_beam.transforms.userstate import ReadModifyWriteStateSpec class BaseThresholdDoFn(beam.DoFn): diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py index cad88ac4f1a2..8819b9152c86 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py @@ -23,8 +23,8 @@ from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly.base import AnomalyResult from apache_beam.ml.anomaly.specifiable import Spec -from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -67,7 +67,7 @@ def test_threshold(self): | beam.ParDo( thresholds.StatelessThresholdDoFn( thresholds.FixedThreshold(2, normal_label=0, - outlier_label=1).to_spec()))) # type: 
ignore + outlier_label=1).to_spec()))) assert_that(result, equal_to(expected)) @@ -118,7 +118,7 @@ def test_multiple_predictions(self): | beam.ParDo( thresholds.StatelessThresholdDoFn( thresholds.FixedThreshold(2, normal_label=0, - outlier_label=1).to_spec()))) # type: ignore + outlier_label=1).to_spec()))) assert_that(result, equal_to(expected)) @@ -187,7 +187,7 @@ def test_threshold(self): thresholds.StatefulThresholdDoFn( thresholds.QuantileThreshold( quantile=0.5, normal_label=0, - outlier_label=1).to_spec()))) # type: ignore + outlier_label=1).to_spec()))) assert_that(result, equal_to(expected)) From 7ede19cde81ab5ed889b92e04a1feb92fc239c1d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 19 Feb 2025 00:00:18 -0500 Subject: [PATCH 3/5] Get rid of _run_init. The init will be called just-in-time. --- .../apache_beam/ml/anomaly/aggregations_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py index 1e1d868fef33..eda63bd5ebc6 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py @@ -26,7 +26,7 @@ class MajorityVoteTest(unittest.TestCase): def test_default(self): normal = AnomalyPrediction(label=0) outlier = AnomalyPrediction(label=1) - vote = aggregations.MajorityVote(_run_init=True).apply + vote = aggregations.MajorityVote().apply self.assertEqual(vote([]), AnomalyPrediction()) @@ -44,7 +44,7 @@ def test_default(self): def test_tie_breaker(self): normal = AnomalyPrediction(label=0) outlier = AnomalyPrediction(label=1) - vote = aggregations.MajorityVote(tie_breaker=1, _run_init=True).apply + vote = aggregations.MajorityVote(tie_breaker=1).apply self.assertEqual(vote([outlier, normal]), outlier) @@ -53,7 +53,7 @@ class AllVoteTest(unittest.TestCase): def test_default(self): normal = AnomalyPrediction(label=0) outlier = 
AnomalyPrediction(label=1) - vote = aggregations.AllVote(_run_init=True).apply + vote = aggregations.AllVote().apply self.assertEqual(vote([]), AnomalyPrediction()) @@ -72,7 +72,7 @@ class AnyVoteTest(unittest.TestCase): def test_default(self): normal = AnomalyPrediction(label=0) outlier = AnomalyPrediction(label=1) - vote = aggregations.AnyVote(_run_init=True).apply + vote = aggregations.AnyVote().apply self.assertEqual(vote([]), AnomalyPrediction()) @@ -89,7 +89,7 @@ def test_default(self): class AverageScoreTest(unittest.TestCase): def test_default(self): - avg = aggregations.AverageScore(_run_init=True).apply + avg = aggregations.AverageScore().apply self.assertEqual(avg([]), AnomalyPrediction()) @@ -103,7 +103,7 @@ def test_default(self): class MaxScoreTest(unittest.TestCase): def test_default(self): - avg = aggregations.MaxScore(_run_init=True).apply + avg = aggregations.MaxScore().apply self.assertEqual(avg([]), AnomalyPrediction()) From cd86a36f86fc7cd28d09d593fe88e2eeddfd9491 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 20 Feb 2025 22:21:37 -0500 Subject: [PATCH 4/5] Add logic to handle missing and error labels/scores. Also includes the following adjustments per reviewer's feedback - Rename include_history to include_source_predictions. 
- Get rid of the unnecessary cast --- .../apache_beam/ml/anomaly/aggregations.py | 122 +++++++++++++----- .../ml/anomaly/aggregations_test.py | 40 ++++++ sdks/python/apache_beam/ml/anomaly/base.py | 11 +- .../apache_beam/ml/anomaly/thresholds.py | 54 +++++--- .../apache_beam/ml/anomaly/thresholds_test.py | 77 ++++++----- 5 files changed, 220 insertions(+), 84 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations.py b/sdks/python/apache_beam/ml/anomaly/aggregations.py index 43632070b88d..f8057e090bb5 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations.py @@ -20,13 +20,35 @@ import statistics from typing import Callable from typing import Iterable +from typing import Optional from apache_beam.ml.anomaly.base import AggregationFn from apache_beam.ml.anomaly.base import AnomalyPrediction from apache_beam.ml.anomaly.specifiable import specifiable -class LabelAggregation(AggregationFn): +class _AggModelIdMixin: + def __init__(self, agg_model_id: Optional[str] = None): + self._agg_model_id = agg_model_id + + def _set_agg_model_id_if_unset(self, agg_model_id: str) -> None: + if self._agg_model_id is None: + self._agg_model_id = agg_model_id + + def apply(self, result_dict): + result_dict["model_id"] = self._agg_model_id + + +class _SourcePredictionMixin: + def __init__(self, include_source_predictions): + self._include_source_predictions = include_source_predictions + + def apply(self, result_dict, source_predictions): + if self._include_source_predictions: + result_dict["source_predictions"] = list(source_predictions) + + +class LabelAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin): """Aggregates anomaly predictions based on their labels. 
This is an abstract base class for `AggregationFn`s that combine multiple @@ -36,16 +58,22 @@ class LabelAggregation(AggregationFn): Args: agg_func (Callable[[Iterable[int]], int]): A function that aggregates a collection of anomaly labels (integers) into a single label. - include_history (bool): If True, include the input predictions in the - `agg_history` of the output. Defaults to False. + agg_model_id (Optional[str]): The model id used in aggregated predictions. + Defaults to None. + include_source_predictions (bool): If True, include the input predictions in + the `source_predictions` of the output. Defaults to False. """ def __init__( self, agg_func: Callable[[Iterable[int]], int], - include_history: bool = False): + agg_model_id: Optional[str] = None, + include_source_predictions: bool = False, + missing_label: int = -2, + ): self._agg = agg_func - self._include_history = include_history - self._agg_model_id = None + self._missing_label = missing_label + _AggModelIdMixin.__init__(self, agg_model_id) + _SourcePredictionMixin.__init__(self, include_source_predictions) def apply( self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: @@ -57,25 +85,38 @@ def apply( Returns: AnomalyPrediction: A single `AnomalyPrediction` object with the - aggregated label. + aggregated label. The aggregated label is determined as follows: + + - If there are any non-missing and non-error labels, the `agg_func` is + applied to aggregate them. + - If all labels are error labels (`None`), the aggregated label is also + `None`. + - If there are a mix of missing and error labels, the aggregated label + is the `missing_label`. 
""" + result_dict = {} + _AggModelIdMixin.apply(self, result_dict) + _SourcePredictionMixin.apply(self, result_dict, predictions) + labels = [ - prediction.label for prediction in predictions - if prediction.label is not None + prediction.label for prediction in predictions if + prediction.label is not None and prediction.label != self._missing_label ] - if len(labels) == 0: - return AnomalyPrediction(model_id=self._agg_model_id) - - label = self._agg(labels) - - history = list(predictions) if self._include_history else None + if len(labels) > 0: + # apply aggregation_fn if there is any non-None and non-missing label + result_dict["label"] = self._agg(labels) + elif all(map(lambda x: x.label is None, predictions)): + # all are error labels (None) -- all scores are error + result_dict["label"] = None + else: + # some missing labels with some error labels (None) + result_dict["label"] = self._missing_label - return AnomalyPrediction( - model_id=self._agg_model_id, label=label, agg_history=history) + return AnomalyPrediction(**result_dict) -class ScoreAggregation(AggregationFn): +class ScoreAggregation(AggregationFn, _AggModelIdMixin, _SourcePredictionMixin): """Aggregates anomaly predictions based on their scores. This is an abstract base class for `AggregationFn`s that combine multiple @@ -85,16 +126,19 @@ class ScoreAggregation(AggregationFn): Args: agg_func (Callable[[Iterable[float]], float]): A function that aggregates a collection of anomaly scores (floats) into a single score. - include_history (bool): If True, include the input predictions in the - `agg_history` of the output. Defaults to False. + agg_model_id (Optional[str]): The model id used in aggregated predictions. + Defaults to None. + include_source_predictions (bool): If True, include the input predictions in + the `source_predictions` of the output. Defaults to False. 
""" def __init__( self, agg_func: Callable[[Iterable[float]], float], - include_history: bool = False): + agg_model_id: Optional[str] = None, + include_source_predictions: bool = False): self._agg = agg_func - self._include_history = include_history - self._agg_model_id = None + _AggModelIdMixin.__init__(self, agg_model_id) + _SourcePredictionMixin.__init__(self, include_source_predictions) def apply( self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction: @@ -106,21 +150,35 @@ def apply( Returns: AnomalyPrediction: A single `AnomalyPrediction` object with the - aggregated score. + aggregated score. The aggregated score is determined as follows: + + - If there are any non-missing and non-error scores, the `agg_func` is + applied to aggregate them. + - If all scores are error scores (`None`), the aggregated score is also + `None`. + - If there are a mix of missing (`NaN`) and error scores (`None`), the + aggregated score is `NaN`. """ + result_dict = {} + _AggModelIdMixin.apply(self, result_dict) + _SourcePredictionMixin.apply(self, result_dict, predictions) + scores = [ prediction.score for prediction in predictions if prediction.score is not None and not math.isnan(prediction.score) ] - if len(scores) == 0: - return AnomalyPrediction(model_id=self._agg_model_id) - - score = self._agg(scores) - - history = list(predictions) if self._include_history else None - return AnomalyPrediction( - model_id=self._agg_model_id, score=score, agg_history=history) + if len(scores) > 0: + # apply aggregation_fn if there is any non-None and non-NaN score + result_dict["score"] = self._agg(scores) + elif all(map(lambda x: x.score is None, predictions)): + # all are error scores (None) + result_dict["score"] = None + else: + # some missing scores (NaN) with some error scores (None) + result_dict["score"] = float("NaN") + + return AnomalyPrediction(**result_dict) @specifiable diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py 
b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py index eda63bd5ebc6..dee3fd377d74 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py @@ -16,12 +16,33 @@ # import logging +import math import unittest from apache_beam.ml.anomaly import aggregations from apache_beam.ml.anomaly.base import AnomalyPrediction +class LabelAggTestWithMissingOrError(unittest.TestCase): + def test_default(self): + normal = AnomalyPrediction(label=0) + outlier = AnomalyPrediction(label=1) + missing = AnomalyPrediction(label=-2) + error = AnomalyPrediction(label=None) + + vote = aggregations.MajorityVote().apply + + # missing and error labels are ignored if there is any normal/outlier + self.assertEqual(vote([normal, missing, error]), normal) + self.assertEqual(vote([outlier, missing, error]), outlier) + + # if there is any missing among errors, return missing + self.assertEqual(vote([error, missing, error]), missing) + + # return error only when all are errors + self.assertEqual(vote([error, error, error]), error) + + class MajorityVoteTest(unittest.TestCase): def test_default(self): normal = AnomalyPrediction(label=0) @@ -87,6 +108,25 @@ def test_default(self): self.assertEqual(vote([outlier, outlier, outlier]), outlier) +class ScoreAggTestWithMissingOrError(unittest.TestCase): + def test_default(self): + normal = AnomalyPrediction(score=1.0) + missing = AnomalyPrediction(score=float("NaN")) + error = AnomalyPrediction(score=None) + + avg = aggregations.AverageScore().apply + + # missing and error scores are ignored if there is any normal/outlier + self.assertEqual(avg([normal, missing, error]), normal) + + # if there is any missing among errors, return missing. + # note that NaN != NaN, so we cannot use `assertEqual` here. 
+ self.assertTrue(avg([error, missing, error]).score) + + # return error only when all are errors + self.assertEqual(avg([error, error, error]), error) + + class AverageScoreTest(unittest.TestCase): def test_default(self): avg = aggregations.AverageScore().apply diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index 2dd53707a582..c19b4a53d567 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -53,7 +53,7 @@ class AnomalyPrediction(): info: str = "" #: If enabled, a list of `AnomalyPrediction` objects used to derive the #: aggregated prediction. - agg_history: Optional[Iterable[AnomalyPrediction]] = None + source_predictions: Optional[Iterable[AnomalyPrediction]] = None @dataclass(frozen=True) @@ -74,10 +74,17 @@ class ThresholdFn(abc.ABC): normal_label: The integer label used to identify normal data. Defaults to 0. outlier_label: The integer label used to identify outlier data. Defaults to 1. + missing_label: The integer label used when a score is missing because the + model is not ready to score. 
""" - def __init__(self, normal_label: int = 0, outlier_label: int = 1): + def __init__( + self, + normal_label: int = 0, + outlier_label: int = 1, + missing_label: int = -2): self._normal_label = normal_label self._outlier_label = outlier_label + self._missing_label = missing_label @property @abc.abstractmethod diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds.py b/sdks/python/apache_beam/ml/anomaly/thresholds.py index 5f91b286dd84..636733ab052e 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds.py @@ -18,12 +18,12 @@ from __future__ import annotations import dataclasses +import math from typing import Any from typing import Iterable from typing import Optional from typing import Tuple from typing import Union -from typing import cast import apache_beam as beam from apache_beam.coders import DillCoder @@ -92,8 +92,7 @@ class StatelessThresholdDoFn(BaseThresholdDoFn): """ def __init__(self, threshold_fn_spec: Spec): threshold_fn_spec.config["_run_init"] = True - self._threshold_fn = cast( - ThresholdFn, Specifiable.from_spec(threshold_fn_spec)) + self._threshold_fn = Specifiable.from_spec(threshold_fn_spec) assert not self._threshold_fn.is_stateful, \ "This DoFn can only take stateless function as threshold_fn" @@ -139,8 +138,7 @@ class StatefulThresholdDoFn(BaseThresholdDoFn): def __init__(self, threshold_fn_spec: Spec): threshold_fn_spec.config["_run_init"] = True - threshold_fn: ThresholdFn = cast( - ThresholdFn, Specifiable.from_spec(threshold_fn_spec)) + threshold_fn: ThresholdFn = Specifiable.from_spec(threshold_fn_spec) assert threshold_fn.is_stateful, \ "This DoFn can only take stateful function as threshold_fn" self._threshold_fn_spec = threshold_fn_spec @@ -223,7 +221,7 @@ def threshold(self) -> float: """ return self._cutoff - def apply(self, score: Optional[float]) -> int: + def apply(self, score: Optional[float]) -> Optional[int]: """Applies the fixed threshold to an anomaly score. 
Classifies the given anomaly score as normal or outlier based on the @@ -233,11 +231,23 @@ def apply(self, score: Optional[float]) -> int: score (Optional[float]): The input anomaly score. Returns: - int: The anomaly label (normal or outlier). Returns the normal label - if the score is None or less than the threshold, otherwise returns - the outlier label. + Optional[int]: The anomaly label: + - `normal_label` if the score is less than the threshold. + - `outlier_label` if the score is at or above the threshold. + - `missing_label` if the score is `NaN` (detector not ready). + - `None` if the score is `None` (detector ready, but unable to produce + score). """ - if score is None or score < self.threshold: + # score error: detector is ready but is unable to produce the score due to + # errors such as ill-formatted input. + if score is None: + return None + + # score missing: detector is not yet ready + if math.isnan(score): + return self._missing_label + + if score < self.threshold: return self._normal_label return self._outlier_label @@ -297,7 +307,7 @@ def threshold(self) -> float: """ return self._tracker.get() - def apply(self, score: Optional[float]) -> int: + def apply(self, score: Optional[float]) -> Optional[int]: """Applies the quantile-based threshold to an anomaly score. Updates the quantile tracker with the given score and classifies the score @@ -307,13 +317,27 @@ def apply(self, score: Optional[float]) -> int: score (Optional[float]): The input anomaly score. Returns: - int: The anomaly label (normal or outlier). Returns the normal label if - the score is None or less than the dynamic quantile threshold, otherwise - returns the outlier label. + Optional[int]: The anomaly label: + - `normal_label` if the score is less than the threshold. + - `outlier_label` if the score is at or above the threshold. + - `missing_label` if the score is `NaN` (detector not ready). + - `None` if the score is `None` (detector ready, but unable to produce + score). 
""" + # score error: detector is ready but is unable to produce the score due to + # errors such as ill-formatted input. + if score is None: + # store NaN instead of None in quantile tracker to simplify tracker logic. + # After all, it is only a place holder and will not be used in calculation + self._tracker.push(float("NaN")) + return None + self._tracker.push(score) + # score missing: detector is not yet ready + if math.isnan(score): + return self._missing_label - if score is None or score < self.threshold: + if score < self.threshold: return self._normal_label return self._outlier_label diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py index 8819b9152c86..413c0e52c6b0 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds_test.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds_test.py @@ -29,13 +29,22 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +R = beam.Row(x=10, y=20) + class TestFixedThreshold(unittest.TestCase): - def test_threshold(self): + def test_apply_only(self): + threshold_fn = thresholds.FixedThreshold(2) + self.assertEqual(threshold_fn.apply(1.0), 0) + self.assertEqual(threshold_fn.apply(2.0), 1) + self.assertEqual(threshold_fn.apply(None), None) + self.assertEqual(threshold_fn.apply(float('NaN')), -2) + + def test_dofn_on_single_prediction(self): input = [ - (1, (2, AnomalyResult(beam.Row(x=10), [AnomalyPrediction(score=1)]))), - (1, (3, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=2)]))), - (1, (4, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=3)]))), + (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))), + (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))), + (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))), ] expected = [ ( @@ -43,22 +52,19 @@ def test_threshold(self): ( 2, AnomalyResult( - beam.Row(x=10), - [AnomalyPrediction(score=1, label=0, threshold=2)]))), + R, 
[AnomalyPrediction(score=1, label=0, threshold=2)]))), ( 1, ( 3, AnomalyResult( - beam.Row(x=20), - [AnomalyPrediction(score=2, label=1, threshold=2)]))), + R, [AnomalyPrediction(score=2, label=1, threshold=2)]))), ( 1, ( 4, AnomalyResult( - beam.Row(x=20), - [AnomalyPrediction(score=3, label=1, threshold=2)]))), + R, [AnomalyPrediction(score=3, label=1, threshold=2)]))), ] with TestPipeline() as p: result = ( @@ -68,24 +74,23 @@ def test_threshold(self): thresholds.StatelessThresholdDoFn( thresholds.FixedThreshold(2, normal_label=0, outlier_label=1).to_spec()))) - assert_that(result, equal_to(expected)) - def test_multiple_predictions(self): + def test_dofn_on_multiple_predictions(self): input = [ ( 1, ( 2, AnomalyResult( - beam.Row(x=10), + R, [AnomalyPrediction(score=1), AnomalyPrediction(score=4)]))), ( 1, ( 3, AnomalyResult( - beam.Row(x=20), + R, [AnomalyPrediction(score=2), AnomalyPrediction(score=0.5) ]))), ] @@ -95,7 +100,7 @@ def test_multiple_predictions(self): ( 2, AnomalyResult( - beam.Row(x=10), + R, [ AnomalyPrediction(score=1, label=0, threshold=2), AnomalyPrediction(score=4, label=1, threshold=2) @@ -105,7 +110,7 @@ def test_multiple_predictions(self): ( 3, AnomalyResult( - beam.Row(x=20), + R, [ AnomalyPrediction(score=2, label=1, threshold=2), AnomalyPrediction(score=0.5, label=0, threshold=2) @@ -124,15 +129,23 @@ def test_multiple_predictions(self): class TestQuantileThreshold(unittest.TestCase): - def test_threshold(self): + def test_apply_only(self): + threshold_fn = thresholds.QuantileThreshold(0.9) + self.assertEqual(threshold_fn.apply(1.0), 1) + self.assertEqual(threshold_fn.apply(2.0), 1) + self.assertEqual(threshold_fn.apply(1.2), 0) + self.assertEqual(threshold_fn.apply(None), None) + self.assertEqual(threshold_fn.apply(float('NaN')), -2) + + def test_dofn_on_single_prediction(self): # use the input data with two keys to test stateful threshold function input = [ - (1, (2, AnomalyResult(beam.Row(x=10), [AnomalyPrediction(score=1)]))), - 
(1, (3, AnomalyResult(beam.Row(x=20), [AnomalyPrediction(score=2)]))), - (1, (4, AnomalyResult(beam.Row(x=30), [AnomalyPrediction(score=3)]))), - (2, (2, AnomalyResult(beam.Row(x=40), [AnomalyPrediction(score=10)]))), - (2, (3, AnomalyResult(beam.Row(x=50), [AnomalyPrediction(score=20)]))), - (2, (4, AnomalyResult(beam.Row(x=60), [AnomalyPrediction(score=30)]))), + (1, (2, AnomalyResult(R, [AnomalyPrediction(score=1)]))), + (1, (3, AnomalyResult(R, [AnomalyPrediction(score=2)]))), + (1, (4, AnomalyResult(R, [AnomalyPrediction(score=3)]))), + (2, (2, AnomalyResult(R, [AnomalyPrediction(score=10)]))), + (2, (3, AnomalyResult(R, [AnomalyPrediction(score=20)]))), + (2, (4, AnomalyResult(R, [AnomalyPrediction(score=30)]))), ] expected = [ ( @@ -140,43 +153,37 @@ def test_threshold(self): ( 2, AnomalyResult( - beam.Row(x=10), - [AnomalyPrediction(score=1, label=1, threshold=1)]))), + R, [AnomalyPrediction(score=1, label=1, threshold=1)]))), ( 1, ( 3, AnomalyResult( - beam.Row(x=20), - [AnomalyPrediction(score=2, label=1, threshold=1.5)]))), + R, [AnomalyPrediction(score=2, label=1, threshold=1.5)]))), ( 2, ( 2, AnomalyResult( - beam.Row(x=40), - [AnomalyPrediction(score=10, label=1, threshold=10)]))), + R, [AnomalyPrediction(score=10, label=1, threshold=10)]))), ( 2, ( 3, AnomalyResult( - beam.Row(x=50), - [AnomalyPrediction(score=20, label=1, threshold=15)]))), + R, [AnomalyPrediction(score=20, label=1, threshold=15)]))), ( 1, ( 4, AnomalyResult( - beam.Row(x=30), - [AnomalyPrediction(score=3, label=1, threshold=2)]))), + R, [AnomalyPrediction(score=3, label=1, threshold=2)]))), ( 2, ( 4, AnomalyResult( - beam.Row(x=60), - [AnomalyPrediction(score=30, label=1, threshold=20)]))), + R, [AnomalyPrediction(score=30, label=1, threshold=20)]))), ] with TestPipeline() as p: result = ( From 317017753bc17edeaf1a46b36ff011ccd09c5cd1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 21 Feb 2025 16:01:54 -0500 Subject: [PATCH 5/5] Fix lints. 
--- .../apache_beam/ml/anomaly/aggregations.py | 19 +++++++++++-------- .../ml/anomaly/aggregations_test.py | 1 - sdks/python/apache_beam/ml/anomaly/base.py | 2 +- .../apache_beam/ml/anomaly/thresholds.py | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations.py b/sdks/python/apache_beam/ml/anomaly/aggregations.py index f8057e090bb5..832f28316502 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations.py @@ -18,6 +18,7 @@ import collections import math import statistics +from typing import Any from typing import Callable from typing import Iterable from typing import Optional @@ -35,7 +36,7 @@ def _set_agg_model_id_if_unset(self, agg_model_id: str) -> None: if self._agg_model_id is None: self._agg_model_id = agg_model_id - def apply(self, result_dict): + def add_model_id(self, result_dict): result_dict["model_id"] = self._agg_model_id @@ -43,7 +44,7 @@ class _SourcePredictionMixin: def __init__(self, include_source_predictions): self._include_source_predictions = include_source_predictions - def apply(self, result_dict, source_predictions): + def add_source_predictions(self, result_dict, source_predictions): if self._include_source_predictions: result_dict["source_predictions"] = list(source_predictions) @@ -94,9 +95,10 @@ def apply( - If there are a mix of missing and error labels, the aggregated label is the `missing_label`. """ - result_dict = {} - _AggModelIdMixin.apply(self, result_dict) - _SourcePredictionMixin.apply(self, result_dict, predictions) + result_dict: dict[str, Any] = {} + _AggModelIdMixin.add_model_id(self, result_dict) + _SourcePredictionMixin.add_source_predictions( + self, result_dict, predictions) labels = [ prediction.label for prediction in predictions if @@ -159,9 +161,10 @@ def apply( - If there are a mix of missing (`NaN`) and error scores (`None`), the aggregated score is `NaN`. 
""" - result_dict = {} - _AggModelIdMixin.apply(self, result_dict) - _SourcePredictionMixin.apply(self, result_dict, predictions) + result_dict: dict[str, Any] = {} + _AggModelIdMixin.add_model_id(self, result_dict) + _SourcePredictionMixin.add_source_predictions( + self, result_dict, predictions) scores = [ prediction.score for prediction in predictions diff --git a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py index dee3fd377d74..e9efd69b48db 100644 --- a/sdks/python/apache_beam/ml/anomaly/aggregations_test.py +++ b/sdks/python/apache_beam/ml/anomaly/aggregations_test.py @@ -16,7 +16,6 @@ # import logging -import math import unittest from apache_beam.ml.anomaly import aggregations diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index c19b4a53d567..4242decf97ef 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -99,7 +99,7 @@ def threshold(self) -> Optional[float]: raise NotImplementedError @abc.abstractmethod - def apply(self, score: Optional[float]) -> int: + def apply(self, score: Optional[float]) -> Optional[int]: """Applies the threshold function to a given score to classify it as normal or outlier. 
diff --git a/sdks/python/apache_beam/ml/anomaly/thresholds.py b/sdks/python/apache_beam/ml/anomaly/thresholds.py index 636733ab052e..d777aa5cde00 100644 --- a/sdks/python/apache_beam/ml/anomaly/thresholds.py +++ b/sdks/python/apache_beam/ml/anomaly/thresholds.py @@ -52,7 +52,7 @@ class BaseThresholdDoFn(beam.DoFn): """ def __init__(self, threshold_fn_spec: Spec): self._threshold_fn_spec = threshold_fn_spec - self._threshold_fn: ThresholdFn + self._threshold_fn = None def _apply_threshold_to_predictions( self, result: AnomalyResult) -> AnomalyResult: @@ -92,7 +92,7 @@ class StatelessThresholdDoFn(BaseThresholdDoFn): """ def __init__(self, threshold_fn_spec: Spec): threshold_fn_spec.config["_run_init"] = True - self._threshold_fn = Specifiable.from_spec(threshold_fn_spec) + self._threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec) assert not self._threshold_fn.is_stateful, \ "This DoFn can only take stateless function as threshold_fn" @@ -138,7 +138,7 @@ class StatefulThresholdDoFn(BaseThresholdDoFn): def __init__(self, threshold_fn_spec: Spec): threshold_fn_spec.config["_run_init"] = True - threshold_fn: ThresholdFn = Specifiable.from_spec(threshold_fn_spec) + threshold_fn: Any = Specifiable.from_spec(threshold_fn_spec) assert threshold_fn.is_stateful, \ "This DoFn can only take stateful function as threshold_fn" self._threshold_fn_spec = threshold_fn_spec