diff --git a/packages/gds-psuu/gds_psuu/__init__.py b/packages/gds-psuu/gds_psuu/__init__.py
index 820ce45..f93f9c7 100644
--- a/packages/gds-psuu/gds_psuu/__init__.py
+++ b/packages/gds-psuu/gds_psuu/__init__.py
@@ -11,6 +11,12 @@
 from gds_psuu.optimizers.grid import GridSearchOptimizer
 from gds_psuu.optimizers.random import RandomSearchOptimizer
 from gds_psuu.results import EvaluationSummary, SweepResults
+from gds_psuu.sensitivity import (
+    Analyzer,
+    MorrisAnalyzer,
+    OATAnalyzer,
+    SensitivityResult,
+)
 from gds_psuu.space import (
     Constraint,
     Continuous,
@@ -24,6 +30,7 @@
 from gds_psuu.types import KPIFn, KPIScores, ParamPoint
 
 __all__ = [
+    "Analyzer",
     "BayesianOptimizer",
     "Constraint",
     "Continuous",
@@ -38,6 +45,8 @@
     "KPIFn",
     "KPIScores",
     "LinearConstraint",
+    "MorrisAnalyzer",
+    "OATAnalyzer",
     "Objective",
     "Optimizer",
     "ParamPoint",
@@ -46,6 +55,7 @@
     "PsuuSearchError",
     "PsuuValidationError",
     "RandomSearchOptimizer",
+    "SensitivityResult",
     "SingleKPI",
     "Sweep",
     "SweepResults",
diff --git a/packages/gds-psuu/gds_psuu/sensitivity/__init__.py b/packages/gds-psuu/gds_psuu/sensitivity/__init__.py
new file mode 100644
index 0000000..e4d2149
--- /dev/null
+++ b/packages/gds-psuu/gds_psuu/sensitivity/__init__.py
@@ -0,0 +1,12 @@
+"""Sensitivity analysis framework for gds-psuu."""
+
+from gds_psuu.sensitivity.base import Analyzer, SensitivityResult
+from gds_psuu.sensitivity.morris import MorrisAnalyzer
+from gds_psuu.sensitivity.oat import OATAnalyzer
+
+__all__ = [
+    "Analyzer",
+    "MorrisAnalyzer",
+    "OATAnalyzer",
+    "SensitivityResult",
+]
diff --git a/packages/gds-psuu/gds_psuu/sensitivity/base.py b/packages/gds-psuu/gds_psuu/sensitivity/base.py
new file mode 100644
index 0000000..501ea50
--- /dev/null
+++ b/packages/gds-psuu/gds_psuu/sensitivity/base.py
@@ -0,0 +1,75 @@
+"""Analyzer ABC and SensitivityResult."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any
+
+from pydantic import BaseModel, ConfigDict
+
+if TYPE_CHECKING:
+    from gds_psuu.evaluation import Evaluator
+    from gds_psuu.space import ParameterSpace
+
+
+class SensitivityResult(BaseModel):
+    """Per-KPI, per-parameter sensitivity indices.
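+
+    Example (an illustrative sketch: the KPI and parameter names and the
+    metric values here are made up, and metric names depend on the analyzer
+    that produced the result):
+
+        result = SensitivityResult(
+            indices={"final_pop": {"growth_rate": {"mean_effect": 12.5}}},
+            method="OAT",
+        )
+        result.ranking("final_pop")  # -> ["growth_rate"]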
" + "Install with: uv add gds-psuu[pandas]" + ) from exc + + rows: list[dict[str, Any]] = [] + for kpi, params in self.indices.items(): + for param, metrics in params.items(): + row: dict[str, Any] = {"kpi": kpi, "param": param} + row.update(metrics) + rows.append(row) + return pd.DataFrame(rows) + + +class Analyzer(ABC): + """Base class for sensitivity analyzers.""" + + @abstractmethod + def analyze(self, evaluator: Evaluator, space: ParameterSpace) -> SensitivityResult: + """Run sensitivity analysis and return results.""" diff --git a/packages/gds-psuu/gds_psuu/sensitivity/morris.py b/packages/gds-psuu/gds_psuu/sensitivity/morris.py new file mode 100644 index 0000000..b6f8ccc --- /dev/null +++ b/packages/gds-psuu/gds_psuu/sensitivity/morris.py @@ -0,0 +1,121 @@ +"""Morris method (elementary effects) sensitivity analyzer.""" + +from __future__ import annotations + +import random +from typing import TYPE_CHECKING + +from gds_psuu.sensitivity.base import Analyzer, SensitivityResult +from gds_psuu.space import Continuous, Discrete, Integer + +if TYPE_CHECKING: + from gds_psuu.evaluation import Evaluator + from gds_psuu.space import ParameterSpace + from gds_psuu.types import ParamPoint + + +class MorrisAnalyzer(Analyzer): + """Morris screening method for sensitivity analysis. + + Generates ``r`` random trajectories through the parameter space, + each with ``k+1`` points (k = number of parameters). Computes + elementary effects per parameter: + + - ``mu_star``: mean of absolute elementary effects (influence) + - ``sigma``: std of elementary effects (nonlinearity / interactions) + """ + + def __init__(self, r: int = 10, n_levels: int = 4, seed: int | None = None) -> None: + self._r = r + self._n_levels = n_levels + self._rng = random.Random(seed) + + def analyze(self, evaluator: Evaluator, space: ParameterSpace) -> SensitivityResult: + param_names = space.dimension_names + k = len(param_names) + + # Precompute level values for each parameter + level_values: dict[str, list[object]] = {} + for name, dim in space.params.items(): + level_values[name] = _get_levels(dim, self._n_levels) + + # Collect elementary effects per parameter per KPI + kpi_names: list[str] | None = None + effects: dict[str, dict[str, list[float]]] = {} + + for _ in range(self._r): + # Generate random starting point from levels + base_point: ParamPoint = { + name: self._rng.choice(level_values[name]) for name in param_names + } + base_result = evaluator.evaluate(base_point) + + if kpi_names is None: + kpi_names = list(base_result.scores.keys()) + effects = {kpi: {p: [] for p in param_names} for kpi in kpi_names} + + # Permute parameter order for this trajectory + order = list(range(k)) + self._rng.shuffle(order) + + current_point: ParamPoint = dict(base_point) + current_scores = dict(base_result.scores) + + for idx in order: + name = param_names[idx] + levels = level_values[name] + old_val = current_point[name] + + # Pick a different level + candidates = [v for v in levels if v != old_val] + if not candidates: + continue + new_val = self._rng.choice(candidates) + + next_point: ParamPoint = dict(current_point) + next_point[name] = new_val + next_result = evaluator.evaluate(next_point) + + for kpi in kpi_names: + ee = next_result.scores[kpi] - current_scores[kpi] + effects[kpi][name].append(ee) + + current_point = next_point + current_scores = dict(next_result.scores) + + assert kpi_names is not None + indices: dict[str, dict[str, dict[str, float]]] = {} + for kpi in kpi_names: + indices[kpi] = {} + for param in param_names: + 
+    """
+
+    def __init__(self, r: int = 10, n_levels: int = 4, seed: int | None = None) -> None:
+        self._r = r
+        self._n_levels = n_levels
+        self._rng = random.Random(seed)
+
+    def analyze(self, evaluator: Evaluator, space: ParameterSpace) -> SensitivityResult:
+        param_names = space.dimension_names
+        k = len(param_names)
+
+        # Precompute level values for each parameter
+        level_values: dict[str, list[object]] = {}
+        for name, dim in space.params.items():
+            level_values[name] = _get_levels(dim, self._n_levels)
+
+        # Collect elementary effects per parameter per KPI
+        kpi_names: list[str] | None = None
+        effects: dict[str, dict[str, list[float]]] = {}
+
+        for _ in range(self._r):
+            # Generate random starting point from levels
+            base_point: ParamPoint = {
+                name: self._rng.choice(level_values[name]) for name in param_names
+            }
+            base_result = evaluator.evaluate(base_point)
+
+            if kpi_names is None:
+                kpi_names = list(base_result.scores.keys())
+                effects = {kpi: {p: [] for p in param_names} for kpi in kpi_names}
+
+            # Permute parameter order for this trajectory
+            order = list(range(k))
+            self._rng.shuffle(order)
+
+            current_point: ParamPoint = dict(base_point)
+            current_scores = dict(base_result.scores)
+
+            for idx in order:
+                name = param_names[idx]
+                levels = level_values[name]
+                old_val = current_point[name]
+
+                # Pick a different level
+                candidates = [v for v in levels if v != old_val]
+                if not candidates:
+                    continue
+                new_val = self._rng.choice(candidates)
+
+                next_point: ParamPoint = dict(current_point)
+                next_point[name] = new_val
+                next_result = evaluator.evaluate(next_point)
+
+                for kpi in kpi_names:
+                    ee = next_result.scores[kpi] - current_scores[kpi]
+                    effects[kpi][name].append(ee)
+
+                current_point = next_point
+                current_scores = dict(next_result.scores)
+
+        assert kpi_names is not None
+        indices: dict[str, dict[str, dict[str, float]]] = {}
+        for kpi in kpi_names:
+            indices[kpi] = {}
+            for param in param_names:
+                effs = effects[kpi][param]
+                n = len(effs)
+                if n == 0:
+                    indices[kpi][param] = {"mu_star": 0.0, "sigma": 0.0}
+                    continue
+                mu_star = sum(abs(e) for e in effs) / n
+                mean = sum(effs) / n
+                variance = (
+                    sum((e - mean) ** 2 for e in effs) / (n - 1) if n > 1 else 0.0
+                )
+                sigma = variance**0.5
+                indices[kpi][param] = {
+                    "mu_star": mu_star,
+                    "sigma": sigma,
+                }
+
+        return SensitivityResult(indices=indices, method="Morris")
+
+
+def _get_levels(dim: Continuous | Integer | Discrete, n_levels: int) -> list[object]:
+    """Generate candidate levels for a dimension.
+
+    Continuous dimensions get ``n_levels`` evenly spaced values; Integer and
+    Discrete dimensions use every value in their range, ignoring ``n_levels``.
+    """
+    if isinstance(dim, Continuous):
+        if n_levels < 2:
+            return [dim.min_val]
+        step = (dim.max_val - dim.min_val) / (n_levels - 1)
+        return [dim.min_val + i * step for i in range(n_levels)]
+    elif isinstance(dim, Integer):
+        return list(range(dim.min_val, dim.max_val + 1))
+    elif isinstance(dim, Discrete):
+        return list(dim.values)
+    return []  # pragma: no cover
diff --git a/packages/gds-psuu/gds_psuu/sensitivity/oat.py b/packages/gds-psuu/gds_psuu/sensitivity/oat.py
new file mode 100644
index 0000000..c19f880
--- /dev/null
+++ b/packages/gds-psuu/gds_psuu/sensitivity/oat.py
@@ -0,0 +1,102 @@
+"""One-at-a-time (OAT) sensitivity analyzer."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from gds_psuu.sensitivity.base import Analyzer, SensitivityResult
+from gds_psuu.space import Continuous, Discrete, Integer
+
+if TYPE_CHECKING:
+    from gds_psuu.evaluation import Evaluator
+    from gds_psuu.space import ParameterSpace
+    from gds_psuu.types import ParamPoint
+
+
+class OATAnalyzer(Analyzer):
+    """One-at-a-time sensitivity analysis.
+
+    Varies each parameter independently while holding others at baseline
+    (midpoint for Continuous/Integer, first value for Discrete).
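+
+    Example (an illustrative sketch: ``evaluator`` and ``space`` stand for an
+    ``Evaluator`` and a ``ParameterSpace`` configured as in the package tests,
+    with a KPI named ``final_pop``):
+
+        analyzer = OATAnalyzer(n_levels=4)
+        result = analyzer.analyze(evaluator, space)
+        result.ranking("final_pop")  # "mean_effect" is the default metric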
+ """ + + def __init__(self, n_levels: int = 4) -> None: + self._n_levels = n_levels + + def analyze(self, evaluator: Evaluator, space: ParameterSpace) -> SensitivityResult: + baseline = _compute_baseline(space) + baseline_result = evaluator.evaluate(baseline) + baseline_scores = baseline_result.scores + + kpi_names = list(baseline_scores.keys()) + indices: dict[str, dict[str, dict[str, float]]] = {kpi: {} for kpi in kpi_names} + + for param_name, dim in space.params.items(): + test_values = _get_test_values(dim, self._n_levels) + effects: dict[str, list[float]] = {kpi: [] for kpi in kpi_names} + + for val in test_values: + point: ParamPoint = dict(baseline) + point[param_name] = val + result = evaluator.evaluate(point) + for kpi in kpi_names: + effects[kpi].append(result.scores[kpi] - baseline_scores[kpi]) + + for kpi in kpi_names: + effs = effects[kpi] + n = len(effs) + mean_effect = sum(abs(e) for e in effs) / n if n else 0.0 + base_val = baseline_scores[kpi] + relative = mean_effect / abs(base_val) if base_val != 0 else 0.0 + indices[kpi][param_name] = { + "mean_effect": mean_effect, + "relative_effect": relative, + } + + return SensitivityResult(indices=indices, method="OAT") + + +def _compute_baseline(space: ParameterSpace) -> ParamPoint: + """Compute baseline point: midpoints for numeric, first for discrete.""" + baseline: ParamPoint = {} + for name, dim in space.params.items(): + if isinstance(dim, Continuous): + baseline[name] = (dim.min_val + dim.max_val) / 2.0 + elif isinstance(dim, Integer): + baseline[name] = (dim.min_val + dim.max_val) // 2 + elif isinstance(dim, Discrete): + baseline[name] = dim.values[0] + return baseline + + +def _get_test_values( + dim: Continuous | Integer | Discrete, n_levels: int +) -> list[object]: + """Generate test values for a dimension.""" + if isinstance(dim, Continuous): + if n_levels < 2: + return [dim.min_val] + step = (dim.max_val - dim.min_val) / (n_levels - 1) + return [dim.min_val + i * step for i in range(n_levels)] + elif isinstance(dim, Integer): + return list(range(dim.min_val, dim.max_val + 1)) + elif isinstance(dim, Discrete): + return list(dim.values) + return [] # pragma: no cover diff --git a/packages/gds-psuu/tests/test_sensitivity.py b/packages/gds-psuu/tests/test_sensitivity.py new file mode 100644 index 0000000..0f8fd69 --- /dev/null +++ b/packages/gds-psuu/tests/test_sensitivity.py @@ -0,0 +1,257 @@ +"""Tests for sensitivity analysis framework.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from gds_psuu import ( + KPI, + Analyzer, + Continuous, + Discrete, + Evaluator, + Integer, + MorrisAnalyzer, + OATAnalyzer, + ParameterSpace, + SensitivityResult, + final_state_mean, +) + +if TYPE_CHECKING: + from gds_sim import Model + + +class TestAnalyzerABC: + def test_is_abstract(self) -> None: + with pytest.raises(TypeError): + Analyzer() # type: ignore[abstract] + + +class TestSensitivityResult: + def test_ranking(self) -> None: + result = SensitivityResult( + indices={ + "kpi_a": { + "x": {"mean_effect": 10.0}, + "y": {"mean_effect": 50.0}, + "z": {"mean_effect": 30.0}, + } + }, + method="test", + ) + ranking = result.ranking("kpi_a") + assert ranking == ["y", "z", "x"] + + def test_ranking_custom_metric(self) -> None: + result = SensitivityResult( + indices={ + "kpi": { + "a": {"mu_star": 5.0, "sigma": 20.0}, + "b": {"mu_star": 15.0, "sigma": 2.0}, + } + }, + method="test", + ) + ranking = result.ranking("kpi", metric="sigma") + assert ranking == ["a", "b"] + + def 
+    """
+    baseline: ParamPoint = {}
+    for name, dim in space.params.items():
+        if isinstance(dim, Continuous):
+            baseline[name] = (dim.min_val + dim.max_val) / 2.0
+        elif isinstance(dim, Integer):
+            baseline[name] = (dim.min_val + dim.max_val) // 2
+        elif isinstance(dim, Discrete):
+            baseline[name] = dim.values[0]
+    return baseline
+
+
+def _get_test_values(
+    dim: Continuous | Integer | Discrete, n_levels: int
+) -> list[object]:
+    """Generate test values for a dimension.
+
+    Continuous dimensions get ``n_levels`` evenly spaced values; Integer and
+    Discrete dimensions use every value in their range, ignoring ``n_levels``.
+    """
+    if isinstance(dim, Continuous):
+        if n_levels < 2:
+            return [dim.min_val]
+        step = (dim.max_val - dim.min_val) / (n_levels - 1)
+        return [dim.min_val + i * step for i in range(n_levels)]
+    elif isinstance(dim, Integer):
+        return list(range(dim.min_val, dim.max_val + 1))
+    elif isinstance(dim, Discrete):
+        return list(dim.values)
+    return []  # pragma: no cover
diff --git a/packages/gds-psuu/tests/test_sensitivity.py b/packages/gds-psuu/tests/test_sensitivity.py
new file mode 100644
index 0000000..0f8fd69
--- /dev/null
+++ b/packages/gds-psuu/tests/test_sensitivity.py
@@ -0,0 +1,257 @@
+"""Tests for sensitivity analysis framework."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from gds_psuu import (
+    KPI,
+    Analyzer,
+    Continuous,
+    Discrete,
+    Evaluator,
+    Integer,
+    MorrisAnalyzer,
+    OATAnalyzer,
+    ParameterSpace,
+    SensitivityResult,
+    final_state_mean,
+)
+
+if TYPE_CHECKING:
+    from gds_sim import Model
+
+
+class TestAnalyzerABC:
+    def test_is_abstract(self) -> None:
+        with pytest.raises(TypeError):
+            Analyzer()  # type: ignore[abstract]
+
+
+class TestSensitivityResult:
+    def test_ranking(self) -> None:
+        result = SensitivityResult(
+            indices={
+                "kpi_a": {
+                    "x": {"mean_effect": 10.0},
+                    "y": {"mean_effect": 50.0},
+                    "z": {"mean_effect": 30.0},
+                }
+            },
+            method="test",
+        )
+        ranking = result.ranking("kpi_a")
+        assert ranking == ["y", "z", "x"]
+
+    def test_ranking_custom_metric(self) -> None:
+        result = SensitivityResult(
+            indices={
+                "kpi": {
+                    "a": {"mu_star": 5.0, "sigma": 20.0},
+                    "b": {"mu_star": 15.0, "sigma": 2.0},
+                }
+            },
+            method="test",
+        )
+        ranking = result.ranking("kpi", metric="sigma")
+        assert ranking == ["a", "b"]
+
+    def test_to_dataframe(self) -> None:
+        pytest.importorskip("pandas")
+        result = SensitivityResult(
+            indices={
+                "kpi": {
+                    "x": {"mean_effect": 10.0, "relative_effect": 0.5},
+                    "y": {"mean_effect": 20.0, "relative_effect": 1.0},
+                }
+            },
+            method="OAT",
+        )
+        df = result.to_dataframe()
+        assert len(df) == 2
+        assert "kpi" in df.columns
+        assert "param" in df.columns
+        assert "mean_effect" in df.columns
+
+
+class TestOATAnalyzer:
+    def test_oat_basic(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={"growth_rate": Continuous(min_val=0.01, max_val=0.1)}
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=10,
+            runs=1,
+        )
+        analyzer = OATAnalyzer(n_levels=4)
+        result = analyzer.analyze(evaluator, space)
+
+        assert result.method == "OAT"
+        assert "final_pop" in result.indices
+        assert "growth_rate" in result.indices["final_pop"]
+        metrics = result.indices["final_pop"]["growth_rate"]
+        assert "mean_effect" in metrics
+        assert "relative_effect" in metrics
+        assert metrics["mean_effect"] > 0  # growth rate matters
+
+    def test_oat_multi_param(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={
+                "growth_rate": Continuous(min_val=0.01, max_val=0.1),
+                "label": Discrete(values=("A", "B")),
+            }
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=10,
+            runs=1,
+        )
+        analyzer = OATAnalyzer(n_levels=3)
+        result = analyzer.analyze(evaluator, space)
+
+        assert "growth_rate" in result.indices["final_pop"]
+        assert "label" in result.indices["final_pop"]
+        # growth_rate should be more influential than label
+        ranking = result.ranking("final_pop")
+        assert ranking[0] == "growth_rate"
+
+    def test_oat_integer_dim(self, simple_model: Model) -> None:
+        space = ParameterSpace(params={"growth_rate": Integer(min_val=1, max_val=3)})
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=5,
+            runs=1,
+        )
+        analyzer = OATAnalyzer(n_levels=3)
+        result = analyzer.analyze(evaluator, space)
+        assert "growth_rate" in result.indices["final_pop"]
+
+    def test_oat_single_level(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={"growth_rate": Continuous(min_val=0.01, max_val=0.1)}
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=5,
+            runs=1,
+        )
+        analyzer = OATAnalyzer(n_levels=1)
+        result = analyzer.analyze(evaluator, space)
+        assert "growth_rate" in result.indices["final_pop"]
+
+
+class TestMorrisAnalyzer:
+    def test_morris_basic(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={"growth_rate": Continuous(min_val=0.01, max_val=0.1)}
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=10,
+            runs=1,
+        )
+        analyzer = MorrisAnalyzer(r=5, n_levels=4, seed=42)
+        result = analyzer.analyze(evaluator, space)
+
+        assert result.method == "Morris"
+        assert "final_pop" in result.indices
+        metrics = result.indices["final_pop"]["growth_rate"]
+        assert "mu_star" in metrics
+        assert "sigma" in metrics
+        assert metrics["mu_star"] > 0
+
+    def test_morris_multi_param(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={
+                "growth_rate": Continuous(min_val=0.01, max_val=0.1),
+                "label": Discrete(values=("A", "B")),
+            }
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=10,
+            runs=1,
+        )
+        analyzer = MorrisAnalyzer(r=8, n_levels=4, seed=42)
+        result = analyzer.analyze(evaluator, space)
+
+        ranking = result.ranking("final_pop", metric="mu_star")
+        assert ranking[0] == "growth_rate"
+
+    def test_morris_reproducible(self, simple_model: Model) -> None:
+        space = ParameterSpace(
+            params={"growth_rate": Continuous(min_val=0.01, max_val=0.1)}
+        )
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=5,
+            runs=1,
+        )
+        r1 = MorrisAnalyzer(r=5, seed=99).analyze(evaluator, space)
+        r2 = MorrisAnalyzer(r=5, seed=99).analyze(evaluator, space)
+        assert (
+            r1.indices["final_pop"]["growth_rate"]["mu_star"]
+            == r2.indices["final_pop"]["growth_rate"]["mu_star"]
+        )
+
+    def test_morris_discrete_only(self, simple_model: Model) -> None:
+        space = ParameterSpace(params={"strategy": Discrete(values=("A", "B", "C"))})
+        evaluator = Evaluator(
+            base_model=simple_model,
+            kpis=[
+                KPI(
+                    name="final_pop",
+                    fn=lambda r: final_state_mean(r, "population"),
+                )
+            ],
+            timesteps=5,
+            runs=1,
+        )
+        analyzer = MorrisAnalyzer(r=5, n_levels=3, seed=42)
+        result = analyzer.analyze(evaluator, space)
+        assert "strategy" in result.indices["final_pop"]