From 46945de83f1ce843351bbeea4b9468a40a31ce1e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 13 Feb 2025 11:30:55 -0500 Subject: [PATCH 1/6] Change prediction in AnomalyResult to predictions which is now an iterable of AnomalyPrediction. --- sdks/python/apache_beam/ml/anomaly/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index 6a717cf5db16..b849268067b6 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -61,8 +61,9 @@ class AnomalyResult(): """A dataclass for the anomaly detection results""" #: The original input data. example: beam.Row - #: The `AnomalyPrediction` object containing the prediction. - prediction: AnomalyPrediction + #: The iterable of `AnomalyPrediction` objects containing the predictions. + #: Expect length 1 if an aggregation strategy is applied. + predictions: Iterable[AnomalyPrediction] class ThresholdFn(abc.ABC): From 510754e81c8ec896674118b22ff77642314dcba3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 13 Feb 2025 15:43:56 -0500 Subject: [PATCH 2/6] Add mean, stdev and quantile trackers with tests. --- sdks/python/apache_beam/ml/anomaly/base.py | 5 +- .../ml/anomaly/univariate/__init__.py | 16 ++ .../apache_beam/ml/anomaly/univariate/base.py | 50 ++++++ .../apache_beam/ml/anomaly/univariate/mean.py | 89 ++++++++++ .../ml/anomaly/univariate/mean_test.py | 164 ++++++++++++++++++ .../ml/anomaly/univariate/perf_test.py | 77 ++++++++ .../ml/anomaly/univariate/quantile.py | 95 ++++++++++ .../ml/anomaly/univariate/quantile_test.py | 162 +++++++++++++++++ .../ml/anomaly/univariate/stdev.py | 97 +++++++++++ .../ml/anomaly/univariate/stdev_test.py | 157 +++++++++++++++++ 10 files changed, 909 insertions(+), 3 deletions(-) create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/__init__.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/base.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/mean.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/quantile.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/stdev.py create mode 100644 sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py index b849268067b6..6a717cf5db16 100644 --- a/sdks/python/apache_beam/ml/anomaly/base.py +++ b/sdks/python/apache_beam/ml/anomaly/base.py @@ -61,9 +61,8 @@ class AnomalyResult(): """A dataclass for the anomaly detection results""" #: The original input data. example: beam.Row - #: The iterable of `AnomalyPrediction` objects containing the predictions. - #: Expect length 1 if an aggregation strategy is applied. - predictions: Iterable[AnomalyPrediction] + #: The `AnomalyPrediction` object containing the prediction. + prediction: AnomalyPrediction class ThresholdFn(abc.ABC): diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py new file mode 100644 index 000000000000..cce3acad34a4 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/base.py b/sdks/python/apache_beam/ml/anomaly/univariate/base.py new file mode 100644 index 000000000000..9bd6ee45d23e --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/base.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +from collections import deque +from enum import Enum + + +class BaseTracker(abc.ABC): + @abc.abstractmethod + def push(self, x): + raise NotImplementedError() + + @abc.abstractmethod + def get(self): + raise NotImplementedError() + + +class WindowMode(Enum): + LANDMARK = 1 + SLIDING = 2 + + +class WindowedTracker(BaseTracker): + def __init__(self, window_mode, **kwargs): + if window_mode == WindowMode.SLIDING: + self._window_size = kwargs.get("window_size", 100) + self._queue = deque(maxlen=self._window_size) + self._n = 0 + self._window_mode = window_mode + + def push(self, x): + self._queue.append(x) + + def pop(self): + return self._queue.popleft() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py new file mode 100644 index 000000000000..7c2ae0a76820 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import WindowMode +from apache_beam.ml.anomaly.univariate.base import WindowedTracker + +__all__ = [ + "SimpleSlidingMeanTracker", + "IncLandmarkMeanTracker", + "IncSlidingMeanTracker" +] + + +class MeanTracker(WindowedTracker): + pass + + +class SimpleSlidingMeanTracker(MeanTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + if len(self._queue) == 0: + return float('nan') + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanmean(self._queue) + + +class IncMeanTracker(MeanTracker): + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode=window_mode, **kwargs) + self._mean = 0 + + def push(self, x): + if not math.isnan(x): + self._n += 1 + delta = x - self._mean + else: + delta = 0 + + if self._window_mode == WindowMode.SLIDING: + if len(self._queue) >= self._window_size and \ + not math.isnan(old_x := self.pop()): + self._n -= 1 + delta += (self._mean - old_x) + + super().push(x) + + if self._n > 0: + self._mean += delta / self._n + else: + self._mean = 0 + + def get(self): + if self._n < 1: + # keep it consistent with numpy + return float("nan") + return self._mean + + +class IncLandmarkMeanTracker(IncMeanTracker): + def __init__(self): + super().__init__(window_mode=WindowMode.LANDMARK) + + +class IncSlidingMeanTracker(IncMeanTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py new file mode 100644 index 000000000000..1348f314a0ed --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py @@ -0,0 +1,164 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.mean import IncLandmarkMeanTracker +from apache_beam.ml.anomaly.univariate.mean import IncSlidingMeanTracker +from apache_beam.ml.anomaly.univariate.mean import SimpleSlidingMeanTracker + +FLOAT64_MAX = 1.79769313486231570814527423731704356798070e+308 + + +class LandmarkMeanTest(unittest.TestCase): + def test_without_nan(self): + t = IncLandmarkMeanTracker() + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 7.0) + t.push(-3) + self.assertEqual(t.get(), 5.0) + + def test_with_nan(self): + t = IncLandmarkMeanTracker() + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + + def test_with_float64_max(self): + t = IncLandmarkMeanTracker() + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = IncLandmarkMeanTracker() + t2 = SimpleSlidingMeanTracker(len(numbers)) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +class SlidingMeanTest(unittest.TestCase): + @parameterized.expand([ + # SimpleSlidingMeanTracker, + IncSlidingMeanTracker + ]) + def test_without_nan(self, tracker): + t = tracker(3) + self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty + + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(8) + self.assertEqual(t.get(), 4.0) + t.push(16) + self.assertEqual(t.get(), 9.0) + t.push(-3) + self.assertEqual(t.get(), 7.0) + + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) + def test_with_nan(self, tracker): + t = tracker(3) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + t.push(1) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # All values in the tracker are NaN + t.push(4) + self.assertEqual(t.get(), 4.0) + + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) + def test_with_float64_max(self, tracker): + t = tracker(2) + t.push(FLOAT64_MAX) + self.assertEqual(t.get(), FLOAT64_MAX) + t.push(FLOAT64_MAX) + if tracker is IncSlidingMeanTracker: + self.assertEqual(t.get(), FLOAT64_MAX) + self.assertFalse(math.isinf(t.get())) + else: + # SimpleSlidingMean (using Numpy) returns inf when it computes the + # average of [float64_max, float64_max]. + self.assertTrue(math.isinf(t.get())) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = IncSlidingMeanTracker(100) + t2 = SimpleSlidingMeanTracker(100) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py new file mode 100644 index 000000000000..114aa1889e83 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import random +import statistics +import time +import timeit +import unittest +import warnings + +from apache_beam.ml.anomaly.univariate.mean import * +from apache_beam.ml.anomaly.univariate.quantile import * +from apache_beam.ml.anomaly.univariate.stdev import * + +seed_value_time = int(time.time()) +random.seed(seed_value_time) +print(f"{'Seed value':30s}{seed_value_time}") + +numbers = [] +for _ in range(50000): + numbers.append(random.randint(0, 1000)) + + +def run_tracker(tracker, numbers): + for i in range(len(numbers)): + tracker.push(numbers[i]) + _ = tracker.get() + + +def print_result(tracker, number=10, repeat=5): + runtimes = timeit.repeat( + lambda: run_tracker(tracker, numbers), number=number, repeat=repeat) + mean = statistics.mean(runtimes) + sd = statistics.stdev(runtimes) + print(f"{tracker.__class__.__name__:30s}{mean:.6f} ± {sd:.6f}") + + +class PerfTest(unittest.TestCase): + def test_mean_perf(self): + print() + print_result(IncLandmarkMeanTracker()) + print_result(IncSlidingMeanTracker(100)) + print_result(SimpleSlidingMeanTracker(100), number=1) + + def test_stdev_perf(self): + print() + print_result(IncLandmarkStdevTracker()) + print_result(IncSlidingStdevTracker(100)) + print_result(SimpleSlidingStdevTracker(100), number=1) + + def test_quantile_perf(self): + print() + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + print_result(IncLandmarkQuantileTracker(0.5)) + print_result(IncSlidingQuantileTracker(100, 0.5)) + print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py new file mode 100644 index 000000000000..632fcc9f6f66 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -0,0 +1,95 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import typing +import warnings + +import numpy as np +from sortedcontainers import SortedList + +from apache_beam.ml.anomaly.univariate.base import WindowMode +from apache_beam.ml.anomaly.univariate.base import WindowedTracker + +__all__ = [ + "IncLandmarkQuantileTracker", + "SimpleSlidingQuantileTracker", + "IncSlidingQuantileTracker" +] + + +class QuantileTracker(WindowedTracker): + pass + + +class SimpleSlidingQuantileTracker(QuantileTracker): + def __init__(self, window_size, q): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q + + def get(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanquantile(self._queue, self._q) + + +class IncQuantileTracker(WindowedTracker): + def __init__(self, window_mode, q, **kwargs): + super().__init__(window_mode, **kwargs) + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q + self._sorted_items = SortedList() + + def push(self, x): + if not math.isnan(x): + self._sorted_items.add(x) + + if self._window_mode == WindowMode.SLIDING: + if (len(self._queue) >= self._window_size and + not math.isnan(old_x := self.pop())): + self._sorted_items.discard(old_x) + + super().push(x) + + def get(self): + n = len(self._sorted_items) + if n < 1: + return float("nan") + + pos = self._q * (n - 1) + lo = math.floor(pos) + lo_value = typing.cast(float, self._sorted_items[lo]) + + # Use linear interpolation to yield the requested quantile + hi = min(lo + 1, n - 1) + hi_value: float = typing.cast(float, self._sorted_items[hi]) + return lo_value + (hi_value - lo_value) * (pos - lo) + + +class IncLandmarkQuantileTracker(IncQuantileTracker): + def __init__(self, q): + warnings.warn( + "Quantile trackers should not be used in production due to " + "the unbounded memory consumption.") + super().__init__(window_mode=WindowMode.LANDMARK, q=q) + + +class IncSlidingQuantileTracker(IncQuantileTracker): + def __init__(self, window_size, q): + super().__init__( + window_mode=WindowMode.SLIDING, q=q, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py new file mode 100644 index 000000000000..ceed2483e4fa --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py @@ -0,0 +1,162 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest +import warnings + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.quantile import IncLandmarkQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import IncSlidingQuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long + + +class LandmarkQuantileTest(unittest.TestCase): + def test_without_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = IncLandmarkQuantileTracker(0.5) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.5) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.5) + + def test_with_nan(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t = IncLandmarkQuantileTracker(0.2) + + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.2) + t.push(float('nan')) + self.assertEqual(t.get(), 1.2) + t.push(0) + self.assertEqual(t.get(), 0.4) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + t1 = IncLandmarkQuantileTracker(0.5) + t2 = SimpleSlidingQuantileTracker(len(numbers), 0.5) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +class SlidingQuantileTest(unittest.TestCase): + @parameterized.expand( + [ #SimpleSlidingQuantileTracker, + IncSlidingQuantileTracker + ]) + def test_without_nan(self, tracker): + t = tracker(3, 0.5) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.5) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 2.0) + t.push(3) + self.assertEqual(t.get(), 2.0) + t.push(1) + self.assertEqual(t.get(), 1.0) + + @parameterized.expand( + [SimpleSlidingQuantileTracker, IncSlidingQuantileTracker]) + def test_with_nan(self, tracker): + t = tracker(3, 0.8) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 1.0) + t.push(2) + self.assertEqual(t.get(), 1.8) + t.push(float('nan')) + self.assertEqual(t.get(), 1.8) + t.push(2) + self.assertEqual(t.get(), 2.0) + t.push(0) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 1.6) + t.push(float('nan')) + self.assertEqual(t.get(), 0.0) + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + t.push(3) + self.assertEqual(t.get(), 3.0) + t.push(1) + self.assertEqual(t.get(), 2.6) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + def _accuracy_helper(): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = IncSlidingQuantileTracker(100, 0.1) + t2 = SimpleSlidingQuantileTracker(100, 0.1) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue(abs(t1.get() - t2.get()) < 1e-9) + + for _ in range(10): + _accuracy_helper() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py new file mode 100644 index 000000000000..1e895669dc85 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -0,0 +1,97 @@ +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import WindowMode +from apache_beam.ml.anomaly.univariate.base import WindowedTracker + +__all__ = [ + "SimpleSlidingStdevTracker", + "IncLandmarkStdevTracker", + "IncSlidingStdevTracker" +] + + +class StdevTracker(WindowedTracker): + pass + + +class SimpleSlidingStdevTracker(StdevTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + # We do not use nanstd, since nanstd([]) gives 0, which is incorrect. + # Use nanvar instead. + return math.sqrt(np.nanvar(self._queue, ddof=1)) + + +class IncStdevTracker(StdevTracker): + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode, **kwargs) + self._mean = 0 + self._m2 = 0 + + def push(self, x): + if not math.isnan(x): + self._n += 1 + delta1 = x - self._mean + else: + delta1 = 0 + + if self._window_mode == WindowMode.SLIDING: + if (len(self._queue) >= self._window_size and + not math.isnan(old_x := self.pop())): + self._n -= 1 + delta2 = self._mean - old_x + else: + delta2 = 0 + + super().push(x) + else: + delta2 = 0 + + if self._n > 0: + self._mean += (delta1 + delta2) / self._n + + if delta1 != 0: + self._m2 += delta1 * (x - self._mean) + if delta2 != 0: + self._m2 += delta2 * (old_x - self._mean) + else: + self._mean = 0 + self._m2 = 0 + + def get(self): + if self._n < 2: + # keep it consistent with numpy + return float("nan") + dof = self._n - 1 + return math.sqrt(self._m2 / dof) + + +class IncLandmarkStdevTracker(IncStdevTracker): + def __init__(self): + super().__init__(window_mode=WindowMode.LANDMARK) + + +class IncSlidingStdevTracker(IncStdevTracker): + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py new file mode 100644 index 000000000000..c26f16e3a1b7 --- /dev/null +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev_test.py @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import math +import random +import time +import unittest + +from parameterized import parameterized + +from apache_beam.ml.anomaly.univariate.stdev import IncLandmarkStdevTracker +from apache_beam.ml.anomaly.univariate.stdev import IncSlidingStdevTracker +from apache_beam.ml.anomaly.univariate.stdev import SimpleSlidingStdevTracker + + +class LandmarkStdevTest(unittest.TestCase): + def test_without_nan(self): + t = IncLandmarkStdevTracker() + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + t.push(10) + self.assertEqual(t.get(), 4.08248290463863) + + def test_with_nan(self): + t = IncLandmarkStdevTracker() + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(2) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 1.0) + + t.push(10) + self.assertEqual(t.get(), 4.08248290463863) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = IncLandmarkStdevTracker() + t2 = SimpleSlidingStdevTracker(len(numbers)) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue((math.isnan(t1.get()) and math.isnan(t2.get())) or + abs(t1.get() - t2.get()) < 1e-9) + + +class SlidingStdevTest(unittest.TestCase): + @parameterized.expand([SimpleSlidingStdevTracker, IncSlidingStdevTracker]) + def test_without_nan(self, tracker): + t = tracker(3) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertTrue(math.isnan(t.get())) + t.push(1) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + t.push(10) + self.assertEqual(t.get(), 4.725815626252609) + + @parameterized.expand([SimpleSlidingStdevTracker, IncSlidingStdevTracker]) + def test_stdev_with_nan(self, tracker): + t = tracker(3) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) # NaN is ignored + + t.push(1) + self.assertTrue(math.isnan(t.get())) + t.push(2) + self.assertEqual(t.get(), 0.7071067811865476) + t.push(3) + self.assertEqual(t.get(), 1.0) + + # flush the only number out + t.push(float('nan')) + self.assertEqual(t.get(), 0.7071067811865476) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + + t.push(float('nan')) + self.assertTrue(math.isnan(t.get())) + + if tracker is IncSlidingStdevTracker: + self.assertEqual(t._m2, 0) + self.assertEqual(t._mean, 0) + + t.push(4) + self.assertTrue(math.isnan(t.get())) + t.push(5) + self.assertEqual(t.get(), 0.7071067811865476) + + def test_accuracy_fuzz(self): + seed = int(time.time()) + random.seed(seed) + print("Random seed: %d" % seed) + + for _ in range(10): + numbers = [] + for _ in range(5000): + numbers.append(random.randint(0, 1000)) + + t1 = IncSlidingStdevTracker(100) + t2 = SimpleSlidingStdevTracker(100) + for v in numbers: + t1.push(v) + t2.push(v) + self.assertTrue((math.isnan(t1.get()) and math.isnan(t2.get())) or + abs(t1.get() - t2.get()) < 1e-9) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() From b2ffec86abcfa915e3ea5ea3a9cae61e8d5952bb Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 14 Feb 2025 22:18:03 -0500 Subject: [PATCH 3/6] Add docstrings --- .../apache_beam/ml/anomaly/univariate/base.py | 38 ++++++++ .../apache_beam/ml/anomaly/univariate/mean.py | 64 +++++++++++-- .../ml/anomaly/univariate/mean_test.py | 5 +- .../ml/anomaly/univariate/perf_test.py | 10 +- .../ml/anomaly/univariate/quantile.py | 94 +++++++++++++++++-- .../ml/anomaly/univariate/quantile_test.py | 16 ++-- .../ml/anomaly/univariate/stdev.py | 66 +++++++++++-- 7 files changed, 257 insertions(+), 36 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/base.py b/sdks/python/apache_beam/ml/anomaly/univariate/base.py index 9bd6ee45d23e..b0eb2aba1e69 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/base.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/base.py @@ -21,21 +21,49 @@ class BaseTracker(abc.ABC): + """Abstract base class for all univariate trackers.""" @abc.abstractmethod def push(self, x): + """Push a new value to the tracker. + + Args: + x: The value to be pushed. + """ raise NotImplementedError() @abc.abstractmethod def get(self): + """Get the current tracking value. + + Returns: + The current tracked value, the type of which depends on the specific + tracker implementation. + """ raise NotImplementedError() class WindowMode(Enum): + """Enum representing the window mode for windowed trackers.""" + #: operating on all data points from the beginning. LANDMARK = 1 + #: operating on a fixed-size sliding window of recent data points. SLIDING = 2 class WindowedTracker(BaseTracker): + """Abstract base class for trackers that operate on a data window. + + This class provides a foundation for trackers that maintain a window of data, + either as a landmark window or a sliding window. It provides basic push and + pop operations. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments. + For `SLIDING` window mode, `window_size` can be specified to set the + maximum size of the sliding window. Defaults to 100. + """ def __init__(self, window_mode, **kwargs): if window_mode == WindowMode.SLIDING: self._window_size = kwargs.get("window_size", 100) @@ -44,7 +72,17 @@ def __init__(self, window_mode, **kwargs): self._window_mode = window_mode def push(self, x): + """Adds a new value to the data window. + + Args: + x: The value to be added to the window. + """ self._queue.append(x) def pop(self): + """Removes and returns the oldest value from the data window (FIFO). + + Returns: + The oldest value from the window. + """ return self._queue.popleft() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py index 7c2ae0a76820..eeb806e5d378 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -15,6 +15,16 @@ # limitations under the License. # +"""Trackers for calculating mean in windowed fashion. + +This module defines different types of mean trackers that operate on windows +of data. It includes: + + * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window. + * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode. + * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode. +""" + import math import warnings @@ -23,22 +33,36 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "SimpleSlidingMeanTracker", - "IncLandmarkMeanTracker", - "IncSlidingMeanTracker" -] - class MeanTracker(WindowedTracker): + """Abstract base class for mean trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for mean trackers. + """ pass class SimpleSlidingMeanTracker(MeanTracker): + """Sliding window mean tracker that calculates mean using NumPy. + + This tracker uses NumPy's `nanmean` function to calculate the mean of the + values currently in the sliding window. It's a simple, non-incremental + approach. + + Args: + window_size: The size of the sliding window. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) def get(self): + """Calculates and returns the mean of the current sliding window. + + Returns: + float: The mean of the values in the current sliding window. + Returns NaN if the window is empty. + """ if len(self._queue) == 0: return float('nan') @@ -48,11 +72,27 @@ def get(self): class IncMeanTracker(MeanTracker): + """Base class for incremental mean trackers. + + This class implements incremental calculation of the mean, which is more + efficient for streaming data as it updates the mean with each new data point + instead of recalculating from scratch. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, **kwargs): super().__init__(window_mode=window_mode, **kwargs) self._mean = 0 def push(self, x): + """Pushes a new value and updates the incremental mean. + + Args: + x: The new value to be pushed. + """ if not math.isnan(x): self._n += 1 delta = x - self._mean @@ -73,6 +113,12 @@ def push(self, x): self._mean = 0 def get(self): + """Returns the current incremental mean. + + Returns: + float: The current incremental mean value. + Returns NaN if no valid (non-NaN) values have been pushed. + """ if self._n < 1: # keep it consistent with numpy return float("nan") @@ -80,10 +126,16 @@ def get(self): class IncLandmarkMeanTracker(IncMeanTracker): + """Landmark window mean tracker using incremental calculation.""" def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) class IncSlidingMeanTracker(IncMeanTracker): + """Sliding window mean tracker using incremental calculation. + + Args: + window_size: The size of the sliding window. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py index 1348f314a0ed..0c8888597ebe 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py @@ -89,10 +89,7 @@ def test_accuracy_fuzz(self): class SlidingMeanTest(unittest.TestCase): - @parameterized.expand([ - # SimpleSlidingMeanTracker, - IncSlidingMeanTracker - ]) + @parameterized.expand([SimpleSlidingMeanTracker, IncSlidingMeanTracker]) def test_without_nan(self, tracker): t = tracker(3) self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py index 114aa1889e83..b7324aa94f70 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -55,20 +55,24 @@ def test_mean_perf(self): print() print_result(IncLandmarkMeanTracker()) print_result(IncSlidingMeanTracker(100)) + # SimpleSlidingMeanTracker (numpy-based batch approach) is an order of + # magnitude slower than other methods. To prevent excessively long test + # runs, we reduce the number of repetitions. print_result(SimpleSlidingMeanTracker(100), number=1) def test_stdev_perf(self): print() print_result(IncLandmarkStdevTracker()) print_result(IncSlidingStdevTracker(100)) - print_result(SimpleSlidingStdevTracker(100), number=1) + # Same as test_mean_perf, we reduce the number of repetitions here. def test_quantile_perf(self): print() with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - print_result(IncLandmarkQuantileTracker(0.5)) - print_result(IncSlidingQuantileTracker(100, 0.5)) + print_result(BufferedLandmarkQuantileTracker(0.5)) + print_result(BufferedSlidingQuantileTracker(100, 0.5)) + # Same as test_mean_perf, we reduce the number of repetitions here. print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py index 632fcc9f6f66..cf7b94c320ad 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -15,6 +15,19 @@ # limitations under the License. # +"""Trackers for calculating quantiles in windowed fashion. + +This module defines different types of quantile trackers that operate on +windows of data. It includes: + + * `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding + window. + * `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in + landmark window mode. + * `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in + sliding window mode. +""" + import math import typing import warnings @@ -25,30 +38,59 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "IncLandmarkQuantileTracker", - "SimpleSlidingQuantileTracker", - "IncSlidingQuantileTracker" -] - class QuantileTracker(WindowedTracker): + """Abstract base class for quantile trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for quantile trackers. + """ pass class SimpleSlidingQuantileTracker(QuantileTracker): + """Sliding window quantile tracker using NumPy. + + This tracker uses NumPy's `nanquantile` function to calculate the specified + quantile of the values currently in the sliding window. It's a simple, + non-incremental approach. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ def __init__(self, window_size, q): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) assert 0 <= q <= 1, "quantile argument should be between 0 and 1" self._q = q def get(self): + """Calculates and returns the specified quantile of the current sliding + window. + + Returns: + float: The specified quantile of the values in the current sliding window. + Returns NaN if the window is empty. + """ with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") return np.nanquantile(self._queue, self._q) -class IncQuantileTracker(WindowedTracker): +class BufferedQuantileTracker(WindowedTracker): + """Abstract base class for buffered quantile trackers. + + Warning: + Buffered quantile trackers are NOT truly incremental in the sense that they + don't update the quantile in constant time per new data point. They maintain + a sorted list of all values in the window. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, q, **kwargs): super().__init__(window_mode, **kwargs) assert 0 <= q <= 1, "quantile argument should be between 0 and 1" @@ -56,6 +98,11 @@ def __init__(self, window_mode, q, **kwargs): self._sorted_items = SortedList() def push(self, x): + """Pushes a new value, maintains the sorted list, and manages the window. + + Args: + x: The new value to be pushed. + """ if not math.isnan(x): self._sorted_items.add(x) @@ -67,6 +114,13 @@ def push(self, x): super().push(x) def get(self): + """Returns the current quantile value using the sorted list. + + Calculates the quantile using linear interpolation on the sorted values. + + Returns: + float: The calculated quantile value. Returns NaN if the window is empty. + """ n = len(self._sorted_items) if n < 1: return float("nan") @@ -81,7 +135,17 @@ def get(self): return lo_value + (hi_value - lo_value) * (pos - lo) -class IncLandmarkQuantileTracker(IncQuantileTracker): +class BufferedLandmarkQuantileTracker(BufferedQuantileTracker): + """Landmark quantile tracker using a sorted list for quantile calculation. + + Warning: + Landmark quantile trackers have unbounded memory consumption as they store + all pushed values in a sorted list. Avoid using in production for + long-running streams. + + Args: + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ def __init__(self, q): warnings.warn( "Quantile trackers should not be used in production due to " @@ -89,7 +153,19 @@ def __init__(self, q): super().__init__(window_mode=WindowMode.LANDMARK, q=q) -class IncSlidingQuantileTracker(IncQuantileTracker): +class BufferedSlidingQuantileTracker(BufferedQuantileTracker): + """Sliding window quantile tracker using a sorted list for quantile + calculation. + + Warning: + Maintains a sorted list of values within the sliding window to calculate + the specified quantile. Memory consumption is bounded by the window size + but can still be significant for large windows. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ def __init__(self, window_size, q): super().__init__( window_mode=WindowMode.SLIDING, q=q, window_size=window_size) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py index ceed2483e4fa..9f9402505b70 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile_test.py @@ -24,8 +24,8 @@ from parameterized import parameterized -from apache_beam.ml.anomaly.univariate.quantile import IncLandmarkQuantileTracker # pylint: disable=line-too-long -from apache_beam.ml.anomaly.univariate.quantile import IncSlidingQuantileTracker +from apache_beam.ml.anomaly.univariate.quantile import BufferedLandmarkQuantileTracker # pylint: disable=line-too-long +from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker from apache_beam.ml.anomaly.univariate.quantile import SimpleSlidingQuantileTracker # pylint: disable=line-too-long @@ -33,7 +33,7 @@ class LandmarkQuantileTest(unittest.TestCase): def test_without_nan(self): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t = IncLandmarkQuantileTracker(0.5) + t = BufferedLandmarkQuantileTracker(0.5) self.assertTrue(math.isnan(t.get())) t.push(1) @@ -52,7 +52,7 @@ def test_without_nan(self): def test_with_nan(self): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t = IncLandmarkQuantileTracker(0.2) + t = BufferedLandmarkQuantileTracker(0.2) self.assertTrue(math.isnan(t.get())) t.push(1) @@ -78,7 +78,7 @@ def _accuracy_helper(): with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") - t1 = IncLandmarkQuantileTracker(0.5) + t1 = BufferedLandmarkQuantileTracker(0.5) t2 = SimpleSlidingQuantileTracker(len(numbers), 0.5) for v in numbers: t1.push(v) @@ -92,7 +92,7 @@ def _accuracy_helper(): class SlidingQuantileTest(unittest.TestCase): @parameterized.expand( [ #SimpleSlidingQuantileTracker, - IncSlidingQuantileTracker + BufferedSlidingQuantileTracker ]) def test_without_nan(self, tracker): t = tracker(3, 0.5) @@ -111,7 +111,7 @@ def test_without_nan(self, tracker): self.assertEqual(t.get(), 1.0) @parameterized.expand( - [SimpleSlidingQuantileTracker, IncSlidingQuantileTracker]) + [SimpleSlidingQuantileTracker, BufferedSlidingQuantileTracker]) def test_with_nan(self, tracker): t = tracker(3, 0.8) self.assertTrue(math.isnan(t.get())) @@ -146,7 +146,7 @@ def _accuracy_helper(): for _ in range(5000): numbers.append(random.randint(0, 1000)) - t1 = IncSlidingQuantileTracker(100, 0.1) + t1 = BufferedSlidingQuantileTracker(100, 0.1) t2 = SimpleSlidingQuantileTracker(100, 0.1) for v in numbers: t1.push(v) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py index 1e895669dc85..6868db07b3ac 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -12,6 +12,18 @@ # limitations under the License. # +"""Trackers for calculating standard deviation in windowed fashion. + +This module defines different types of standard deviation trackers that operate +on windows of data. It includes: + + * `SimpleSlidingStdevTracker`: Calculates stdev using numpy in a sliding + window. + * `IncLandmarkStdevTracker`: Incremental stdev tracker in landmark window + mode. + * `IncSlidingStdevTracker`: Incremental stdev tracker in sliding window mode. +""" + import math import warnings @@ -20,22 +32,33 @@ from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker -__all__ = [ - "SimpleSlidingStdevTracker", - "IncLandmarkStdevTracker", - "IncSlidingStdevTracker" -] - class StdevTracker(WindowedTracker): + """Abstract base class for standard deviation trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for stdev trackers. + """ pass class SimpleSlidingStdevTracker(StdevTracker): + """Sliding window standard deviation tracker using NumPy. + + This tracker uses NumPy's `nanvar` function to calculate the variance of the + values currently in the sliding window and then takes the square root to get + the standard deviation. It's a simple, non-incremental approach. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) def get(self): + """Calculates and returns the stdev of the current sliding window. + + Returns: + float: The standard deviation of the values in the current sliding window. + Returns NaN if the window contains fewer than 2 elements. + """ with warnings.catch_warnings(record=False): warnings.simplefilter("ignore") # We do not use nanstd, since nanstd([]) gives 0, which is incorrect. @@ -44,12 +67,30 @@ def get(self): class IncStdevTracker(StdevTracker): + """Abstract base class for incremental standard deviation trackers. + + This class implements an online algorithm for calculating standard deviation, + updating the standard deviation incrementally as new data points arrive. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ def __init__(self, window_mode, **kwargs): super().__init__(window_mode, **kwargs) self._mean = 0 self._m2 = 0 def push(self, x): + """Pushes a new value and updates the incremental standard deviation + calculation. + + Implements Welford's online algorithm for variance, then derives stdev. + + Args: + x: The new value to be pushed. + """ if not math.isnan(x): self._n += 1 delta1 = x - self._mean @@ -80,6 +121,12 @@ def push(self, x): self._m2 = 0 def get(self): + """Returns the current incremental standard deviation. + + Returns: + float: The current incremental standard deviation value. + Returns NaN if fewer than 2 valid (non-NaN) values have been pushed. + """ if self._n < 2: # keep it consistent with numpy return float("nan") @@ -88,10 +135,17 @@ def get(self): class IncLandmarkStdevTracker(IncStdevTracker): + """Landmark window standard deviation tracker using incremental calculation.""" # pylint: disable=line-too-long + def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) class IncSlidingStdevTracker(IncStdevTracker): + """Sliding window standard deviation tracker using incremental calculation. + + Args: + window_size: The size of the sliding window. + """ def __init__(self, window_size): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) From 09d4cdcf507669aa78961542636b5470470eb9bf Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 14 Feb 2025 23:46:57 -0500 Subject: [PATCH 4/6] Fix lints --- sdks/python/apache_beam/ml/anomaly/univariate/mean.py | 2 +- sdks/python/apache_beam/ml/anomaly/univariate/quantile.py | 2 +- sdks/python/apache_beam/ml/anomaly/univariate/stdev.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py index eeb806e5d378..9085a95c5922 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -30,8 +30,8 @@ import numpy as np -from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode class MeanTracker(WindowedTracker): diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py index cf7b94c320ad..67333db14379 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -35,8 +35,8 @@ import numpy as np from sortedcontainers import SortedList -from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode class QuantileTracker(WindowedTracker): diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py index 6868db07b3ac..f42302c8e37b 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -29,8 +29,8 @@ import numpy as np -from apache_beam.ml.anomaly.univariate.base import WindowMode from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode class StdevTracker(WindowedTracker): From de9416824770a29fb93ee2b9bc3d5066c9bde214 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 17 Feb 2025 22:16:21 -0500 Subject: [PATCH 5/6] Make trackers specifiable. Also includes minor fixes on Specifiable and univariate perf tests. --- sdks/python/apache_beam/ml/anomaly/specifiable.py | 5 +++-- sdks/python/apache_beam/ml/anomaly/univariate/mean.py | 4 ++++ sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py | 5 +++-- sdks/python/apache_beam/ml/anomaly/univariate/quantile.py | 4 ++++ sdks/python/apache_beam/ml/anomaly/univariate/stdev.py | 4 ++++ 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py b/sdks/python/apache_beam/ml/anomaly/specifiable.py index 1aedab2e8c21..e0122d41d9d5 100644 --- a/sdks/python/apache_beam/ml/anomaly/specifiable.py +++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py @@ -42,6 +42,7 @@ _ACCEPTED_SUBSPACES = [ "EnsembleAnomalyDetector", "AnomalyDetector", + "BaseTracker", "ThresholdFn", "AggregationFn", _FALLBACK_SUBSPACE, @@ -80,7 +81,7 @@ def _spec_type_to_subspace(spec_type: str) -> str: if spec_type in _KNOWN_SPECIFIABLE[subspace]: return subspace - raise ValueError(f"subspace for {str} not found.") + raise ValueError(f"subspace for {spec_type} not found.") @dataclasses.dataclass(frozen=True) @@ -309,7 +310,7 @@ def new_getattr(self, name): cls.run_original_init = run_original_init cls.to_spec = Specifiable.to_spec cls._to_spec_helper = staticmethod(Specifiable._to_spec_helper) - cls.from_spec = classmethod(Specifiable.from_spec) + cls.from_spec = Specifiable.from_spec cls._from_spec_helper = staticmethod(Specifiable._from_spec_helper) return cls # end of the function body of _wrapper diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py index 9085a95c5922..39a8b01f575a 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -30,6 +30,7 @@ import numpy as np +from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode @@ -43,6 +44,7 @@ class MeanTracker(WindowedTracker): pass +@specifiable class SimpleSlidingMeanTracker(MeanTracker): """Sliding window mean tracker that calculates mean using NumPy. @@ -125,12 +127,14 @@ def get(self): return self._mean +@specifiable class IncLandmarkMeanTracker(IncMeanTracker): """Landmark window mean tracker using incremental calculation.""" def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) +@specifiable class IncSlidingMeanTracker(IncMeanTracker): """Sliding window mean tracker using incremental calculation. diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py index b7324aa94f70..61067ef38dab 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py @@ -29,7 +29,7 @@ seed_value_time = int(time.time()) random.seed(seed_value_time) -print(f"{'Seed value':30s}{seed_value_time}") +print(f"{'Seed value':32s}{seed_value_time}") numbers = [] for _ in range(50000): @@ -47,7 +47,7 @@ def print_result(tracker, number=10, repeat=5): lambda: run_tracker(tracker, numbers), number=number, repeat=repeat) mean = statistics.mean(runtimes) sd = statistics.stdev(runtimes) - print(f"{tracker.__class__.__name__:30s}{mean:.6f} ± {sd:.6f}") + print(f"{tracker.__class__.__name__:32s}{mean:.6f} ± {sd:.6f}") class PerfTest(unittest.TestCase): @@ -65,6 +65,7 @@ def test_stdev_perf(self): print_result(IncLandmarkStdevTracker()) print_result(IncSlidingStdevTracker(100)) # Same as test_mean_perf, we reduce the number of repetitions here. + print_result(SimpleSlidingStdevTracker(100), number=1) def test_quantile_perf(self): print() diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py index 67333db14379..eaa122f3ca28 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -35,6 +35,7 @@ import numpy as np from sortedcontainers import SortedList +from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode @@ -48,6 +49,7 @@ class QuantileTracker(WindowedTracker): pass +@specifiable class SimpleSlidingQuantileTracker(QuantileTracker): """Sliding window quantile tracker using NumPy. @@ -135,6 +137,7 @@ def get(self): return lo_value + (hi_value - lo_value) * (pos - lo) +@specifiable class BufferedLandmarkQuantileTracker(BufferedQuantileTracker): """Landmark quantile tracker using a sorted list for quantile calculation. @@ -153,6 +156,7 @@ def __init__(self, q): super().__init__(window_mode=WindowMode.LANDMARK, q=q) +@specifiable class BufferedSlidingQuantileTracker(BufferedQuantileTracker): """Sliding window quantile tracker using a sorted list for quantile calculation. diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py index f42302c8e37b..f6dc8e4ecbf0 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -29,6 +29,7 @@ import numpy as np +from apache_beam.ml.anomaly.specifiable import specifiable from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode @@ -42,6 +43,7 @@ class StdevTracker(WindowedTracker): pass +@specifiable class SimpleSlidingStdevTracker(StdevTracker): """Sliding window standard deviation tracker using NumPy. @@ -134,6 +136,7 @@ def get(self): return math.sqrt(self._m2 / dof) +@specifiable class IncLandmarkStdevTracker(IncStdevTracker): """Landmark window standard deviation tracker using incremental calculation.""" # pylint: disable=line-too-long @@ -141,6 +144,7 @@ def __init__(self): super().__init__(window_mode=WindowMode.LANDMARK) +@specifiable class IncSlidingStdevTracker(IncStdevTracker): """Sliding window standard deviation tracker using incremental calculation. From 5b3e643d3faffc719b02e4138c9fd51b3ad87d86 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 18 Feb 2025 14:13:01 -0500 Subject: [PATCH 6/6] Adjust class structures in trackers. Minor fix per feedback. --- .../apache_beam/ml/anomaly/univariate/mean.py | 7 ++++--- .../ml/anomaly/univariate/quantile.py | 17 +++++++++-------- .../apache_beam/ml/anomaly/univariate/stdev.py | 7 ++++--- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py index 39a8b01f575a..9aec5098bfd4 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/mean.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/mean.py @@ -31,11 +31,12 @@ import numpy as np from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.univariate.base import BaseTracker from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode -class MeanTracker(WindowedTracker): +class MeanTracker(BaseTracker): """Abstract base class for mean trackers. Currently, it does not add any specific functionality but provides a type @@ -45,7 +46,7 @@ class MeanTracker(WindowedTracker): @specifiable -class SimpleSlidingMeanTracker(MeanTracker): +class SimpleSlidingMeanTracker(WindowedTracker, MeanTracker): """Sliding window mean tracker that calculates mean using NumPy. This tracker uses NumPy's `nanmean` function to calculate the mean of the @@ -73,7 +74,7 @@ def get(self): return np.nanmean(self._queue) -class IncMeanTracker(MeanTracker): +class IncMeanTracker(WindowedTracker, MeanTracker): """Base class for incremental mean trackers. This class implements incremental calculation of the mean, which is more diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py index eaa122f3ca28..44d2b1ab2448 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/quantile.py @@ -36,21 +36,24 @@ from sortedcontainers import SortedList from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.univariate.base import BaseTracker from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode -class QuantileTracker(WindowedTracker): +class QuantileTracker(BaseTracker): """Abstract base class for quantile trackers. Currently, it does not add any specific functionality but provides a type hierarchy for quantile trackers. """ - pass + def __init__(self, q): + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q @specifiable -class SimpleSlidingQuantileTracker(QuantileTracker): +class SimpleSlidingQuantileTracker(WindowedTracker, QuantileTracker): """Sliding window quantile tracker using NumPy. This tracker uses NumPy's `nanquantile` function to calculate the specified @@ -63,8 +66,7 @@ class SimpleSlidingQuantileTracker(QuantileTracker): """ def __init__(self, window_size, q): super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) - assert 0 <= q <= 1, "quantile argument should be between 0 and 1" - self._q = q + QuantileTracker.__init__(self, q) def get(self): """Calculates and returns the specified quantile of the current sliding @@ -79,7 +81,7 @@ def get(self): return np.nanquantile(self._queue, self._q) -class BufferedQuantileTracker(WindowedTracker): +class BufferedQuantileTracker(WindowedTracker, QuantileTracker): """Abstract base class for buffered quantile trackers. Warning: @@ -95,8 +97,7 @@ class BufferedQuantileTracker(WindowedTracker): """ def __init__(self, window_mode, q, **kwargs): super().__init__(window_mode, **kwargs) - assert 0 <= q <= 1, "quantile argument should be between 0 and 1" - self._q = q + QuantileTracker.__init__(self, q) self._sorted_items = SortedList() def push(self, x): diff --git a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py index f6dc8e4ecbf0..cbd5d9f26ebc 100644 --- a/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py +++ b/sdks/python/apache_beam/ml/anomaly/univariate/stdev.py @@ -30,11 +30,12 @@ import numpy as np from apache_beam.ml.anomaly.specifiable import specifiable +from apache_beam.ml.anomaly.univariate.base import BaseTracker from apache_beam.ml.anomaly.univariate.base import WindowedTracker from apache_beam.ml.anomaly.univariate.base import WindowMode -class StdevTracker(WindowedTracker): +class StdevTracker(BaseTracker): """Abstract base class for standard deviation trackers. Currently, it does not add any specific functionality but provides a type @@ -44,7 +45,7 @@ class StdevTracker(WindowedTracker): @specifiable -class SimpleSlidingStdevTracker(StdevTracker): +class SimpleSlidingStdevTracker(WindowedTracker, StdevTracker): """Sliding window standard deviation tracker using NumPy. This tracker uses NumPy's `nanvar` function to calculate the variance of the @@ -68,7 +69,7 @@ def get(self): return math.sqrt(np.nanvar(self._queue, ddof=1)) -class IncStdevTracker(StdevTracker): +class IncStdevTracker(WindowedTracker, StdevTracker): """Abstract base class for incremental standard deviation trackers. This class implements an online algorithm for calculating standard deviation,