From e07e824eca268d85469031a4a961673379f43103 Mon Sep 17 00:00:00 2001 From: rohanrog Date: Wed, 4 Mar 2026 16:09:06 +0530 Subject: [PATCH] test: expand gds-sim test coverage for error paths and edge cases Add 42 new tests covering _positional_count edge cases, adapt_policy/ adapt_suf cadCAD wrapping, model validation errors, and single-block models. Coverage up from 44 to 86 tests (97%). Also fix lint/format issues from merged PRs. Closes #87 --- .../gds_business/supplychain/model.py | 6 +- .../gds/verification/generic_checks.py | 5 +- .../gds-sim/tests/test_compat_edge_cases.py | 239 +++++++++++++++ .../gds-sim/tests/test_model_edge_cases.py | 285 ++++++++++++++++++ 4 files changed, 527 insertions(+), 8 deletions(-) create mode 100644 packages/gds-sim/tests/test_compat_edge_cases.py create mode 100644 packages/gds-sim/tests/test_model_edge_cases.py diff --git a/packages/gds-business/gds_business/supplychain/model.py b/packages/gds-business/gds_business/supplychain/model.py index 5614da6..5e1329c 100644 --- a/packages/gds-business/gds_business/supplychain/model.py +++ b/packages/gds-business/gds_business/supplychain/model.py @@ -64,13 +64,11 @@ def _validate_structure(self) -> Self: for s in self.shipments: if s.source not in node_names: errors.append( - f"Shipment {s.name!r} source {s.source!r} " - f"is not a declared node" + f"Shipment {s.name!r} source {s.source!r} is not a declared node" ) if s.target not in node_names: errors.append( - f"Shipment {s.name!r} target {s.target!r} " - f"is not a declared node" + f"Shipment {s.name!r} target {s.target!r} is not a declared node" ) # 4. Demand target references a declared node diff --git a/packages/gds-framework/gds/verification/generic_checks.py b/packages/gds-framework/gds/verification/generic_checks.py index 08ad623..f616a0c 100644 --- a/packages/gds-framework/gds/verification/generic_checks.py +++ b/packages/gds-framework/gds/verification/generic_checks.py @@ -78,10 +78,7 @@ def check_g002_signature_completeness(system: SystemIR) -> list[Finding]: # BoundaryAction blocks have no inputs by design — only check outputs is_boundary = block.block_type == "boundary" - if is_boundary: - has_required = has_output - else: - has_required = has_input and has_output + has_required = has_output if is_boundary else has_input and has_output missing = [] if not has_input: diff --git a/packages/gds-sim/tests/test_compat_edge_cases.py b/packages/gds-sim/tests/test_compat_edge_cases.py new file mode 100644 index 0000000..c233d13 --- /dev/null +++ b/packages/gds-sim/tests/test_compat_edge_cases.py @@ -0,0 +1,239 @@ +"""Tests for _positional_count edge cases and compat adaptation edge cases.""" + +from __future__ import annotations + +import functools +from typing import Any + +from gds_sim.compat import _positional_count, adapt_policy, adapt_suf + +# -- _positional_count edge cases ------------------------------------------ + + +class TestPositionalCountEdgeCases: + def test_lambda_no_args(self) -> None: + assert _positional_count(lambda: None) == 0 + + def test_lambda_one_arg(self) -> None: + assert _positional_count(lambda x: x) == 1 + + def test_lambda_two_args(self) -> None: + assert _positional_count(lambda x, y: x + y) == 2 + + def test_lambda_with_kwargs_not_counted(self) -> None: + """**kw should not count as positional.""" + assert _positional_count(lambda x, y, **kw: x) == 2 + + def test_lambda_with_default_still_counted(self) -> None: + """Args with defaults are still POSITIONAL_OR_KEYWORD.""" + assert _positional_count(lambda x, y=1: x) == 2 + + def test_builtin_returns_zero(self) -> None: + """Built-in functions cannot be inspected -- should return 0, not crash.""" + assert _positional_count(len) >= 0 # builtins may or may not be inspectable + + def test_partial_reduces_count(self) -> None: + """functools.partial with one positional arg bound.""" + + def f(a: int, b: int, c: int) -> int: + return a + b + c + + p = functools.partial(f, 1) + assert _positional_count(p) == 2 # b, c remain + + def test_partial_all_bound(self) -> None: + def f(a: int, b: int) -> int: + return a + b + + p = functools.partial(f, 1, 2) + assert _positional_count(p) == 0 + + def test_class_callable(self) -> None: + """A class with __call__ -- inspect.signature strips self for instances.""" + + class Adder: + def __call__(self, x: int, y: int) -> int: + return x + y + + assert _positional_count(Adder()) == 2 + + def test_keyword_only_not_counted(self) -> None: + """Keyword-only params (after *) should not be counted.""" + + def f(a: int, *, b: int, c: int) -> int: + return a + b + c + + assert _positional_count(f) == 1 + + def test_none_returns_zero(self) -> None: + """Non-callable should return 0, not crash.""" + assert _positional_count(None) == 0 # type: ignore[arg-type] + + def test_string_returns_zero(self) -> None: + """Non-callable should return 0, not crash.""" + assert _positional_count("not a function") == 0 # type: ignore[arg-type] + + def test_var_positional_not_counted(self) -> None: + """*args should not add to the positional count.""" + + def f(a: int, *args: int) -> int: + return a + sum(args) + + assert _positional_count(f) == 1 + + +# -- adapt_policy edge cases ----------------------------------------------- + + +class TestAdaptPolicyEdgeCases: + def test_three_arg_passes_through(self) -> None: + """Non-4-arg functions should pass through unchanged.""" + + def three_arg(a: Any, b: Any, c: Any) -> dict[str, Any]: + return {"x": 1} + + adapted = adapt_policy(three_arg) + assert adapted is three_arg + + def test_one_arg_passes_through(self) -> None: + def one_arg(state: Any) -> dict[str, Any]: + return {} + + adapted = adapt_policy(one_arg) + assert adapted is one_arg + + def test_zero_arg_passes_through(self) -> None: + def zero_arg() -> dict[str, Any]: + return {} + + adapted = adapt_policy(zero_arg) + assert adapted is zero_arg + + def test_cadcad_policy_forwards_substep_kwarg(self) -> None: + """Wrapped cadCAD policy should forward substep from **kw.""" + received: dict[str, Any] = {} + + def cadcad_policy( + params: Any, substep: int, history: list[Any], state: Any + ) -> dict[str, Any]: + received["substep"] = substep + received["state"] = state + received["params"] = params + return {} + + adapted = adapt_policy(cadcad_policy) + adapted({"x": 1}, {"rate": 2}, substep=7, timestep=3) + assert received["substep"] == 7 + assert received["state"] == {"x": 1} + assert received["params"] == {"rate": 2} + + def test_cadcad_policy_default_substep_zero(self) -> None: + """If substep not in kw, defaults to 0.""" + received: dict[str, Any] = {} + + def cadcad_policy( + params: Any, substep: int, history: list[Any], state: Any + ) -> dict[str, Any]: + received["substep"] = substep + return {} + + adapted = adapt_policy(cadcad_policy) + adapted({"x": 1}, {}, timestep=1) + assert received["substep"] == 0 + + def test_cadcad_policy_receives_empty_history(self) -> None: + """Wrapped cadCAD policy always gets [] for state_history.""" + received_history: list[Any] = [None] # sentinel + + def cadcad_policy( + params: Any, substep: int, history: list[Any], state: Any + ) -> dict[str, Any]: + received_history[0] = history + return {} + + adapted = adapt_policy(cadcad_policy) + adapted({}, {}) + assert received_history[0] == [] + + +# -- adapt_suf edge cases -------------------------------------------------- + + +class TestAdaptSufEdgeCases: + def test_three_arg_passes_through(self) -> None: + def three_arg(a: Any, b: Any, c: Any) -> tuple[str, Any]: + return "x", 1 + + adapted = adapt_suf(three_arg) + assert adapted is three_arg + + def test_cadcad_suf_forwards_substep_kwarg(self) -> None: + received: dict[str, Any] = {} + + def cadcad_suf( + params: Any, + substep: int, + history: list[Any], + state: Any, + policy_input: Any, + ) -> tuple[str, Any]: + received["substep"] = substep + received["policy_input"] = policy_input + return "x", 1 + + adapted = adapt_suf(cadcad_suf) + adapted({"x": 0}, {}, signal={"delta": 5}, substep=3) + assert received["substep"] == 3 + assert received["policy_input"] == {"delta": 5} + + def test_cadcad_suf_none_signal_becomes_empty_dict(self) -> None: + """When signal is None, cadCAD wrapper should pass {} as policy_input.""" + received: dict[str, Any] = {} + + def cadcad_suf( + params: Any, + substep: int, + history: list[Any], + state: Any, + policy_input: Any, + ) -> tuple[str, Any]: + received["policy_input"] = policy_input + return "x", 1 + + adapted = adapt_suf(cadcad_suf) + adapted({"x": 0}, {}, signal=None) + assert received["policy_input"] == {} + + def test_cadcad_suf_default_substep_zero(self) -> None: + received: dict[str, Any] = {} + + def cadcad_suf( + params: Any, + substep: int, + history: list[Any], + state: Any, + policy_input: Any, + ) -> tuple[str, Any]: + received["substep"] = substep + return "x", 1 + + adapted = adapt_suf(cadcad_suf) + adapted({"x": 0}, {}) + assert received["substep"] == 0 + + def test_cadcad_suf_receives_empty_history(self) -> None: + received_history: list[Any] = [None] + + def cadcad_suf( + params: Any, + substep: int, + history: list[Any], + state: Any, + policy_input: Any, + ) -> tuple[str, Any]: + received_history[0] = history + return "x", 1 + + adapted = adapt_suf(cadcad_suf) + adapted({"x": 0}, {}) + assert received_history[0] == [] diff --git a/packages/gds-sim/tests/test_model_edge_cases.py b/packages/gds-sim/tests/test_model_edge_cases.py new file mode 100644 index 0000000..c133c40 --- /dev/null +++ b/packages/gds-sim/tests/test_model_edge_cases.py @@ -0,0 +1,285 @@ +"""Tests for Model/Simulation/Experiment error paths and edge cases.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +import gds_sim + + +def _noop_suf( + state: dict[str, Any], params: dict[str, Any], **kw: Any +) -> tuple[str, Any]: + return "x", state["x"] + + +def _increment_suf( + state: dict[str, Any], params: dict[str, Any], **kw: Any +) -> tuple[str, Any]: + return "x", state["x"] + 1 + + +class TestModelErrorPaths: + def test_suf_references_nonexistent_key(self) -> None: + """SUF referencing a key not in initial_state should raise ValueError.""" + with pytest.raises(ValueError, match="not found in initial_state"): + gds_sim.Model( + initial_state={"x": 1}, + state_update_blocks=[ + {"policies": {}, "variables": {"nonexistent": _noop_suf}} + ], + ) + + def test_suf_references_nonexistent_key_in_second_block(self) -> None: + """Error message should identify block index.""" + with pytest.raises(ValueError, match="State update block 1"): + gds_sim.Model( + initial_state={"x": 1}, + state_update_blocks=[ + {"policies": {}, "variables": {"x": _noop_suf}}, + {"policies": {}, "variables": {"missing": _noop_suf}}, + ], + ) + + def test_error_message_includes_available_keys(self) -> None: + """Error message should list available keys.""" + with pytest.raises(ValueError, match=r"Available keys.*x"): + gds_sim.Model( + initial_state={"x": 1}, + state_update_blocks=[{"policies": {}, "variables": {"y": _noop_suf}}], + ) + + +class TestModelEdgeCases: + def test_single_block_model(self) -> None: + """Model with a single block and single variable.""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + sim = gds_sim.Simulation(model=model, timesteps=3) + rows = sim.run().to_list() + # 1 block = 1 substep per timestep, plus initial row = 1 + 3 = 4 + assert len(rows) == 4 + assert rows[-1]["x"] == 3 + + def test_empty_params_gives_single_empty_subset(self) -> None: + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + params={}, + ) + assert model._param_subsets == [{}] + + def test_single_param_value_gives_single_subset(self) -> None: + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + params={"alpha": [0.5]}, + ) + assert len(model._param_subsets) == 1 + assert model._param_subsets[0] == {"alpha": 0.5} + + def test_three_way_param_sweep(self) -> None: + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + params={"a": [1, 2], "b": [10, 20], "c": [100]}, + ) + # 2 x 2 x 1 = 4 subsets + assert len(model._param_subsets) == 4 + + def test_dict_blocks_coerced_to_state_update_block(self) -> None: + """Plain dicts should be coerced to StateUpdateBlock instances.""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + ) + assert isinstance(model.state_update_blocks[0], gds_sim.StateUpdateBlock) + + def test_state_update_block_already_typed(self) -> None: + """Pre-constructed StateUpdateBlock instances should pass through.""" + block = gds_sim.StateUpdateBlock(variables={"x": _noop_suf}) + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[block], + ) + assert len(model.state_update_blocks) == 1 + + def test_multiple_state_variables_single_block(self) -> None: + """Block updating multiple state variables.""" + + def suf_y( + state: dict[str, Any], params: dict[str, Any], **kw: Any + ) -> tuple[str, Any]: + return "y", state["y"] * 2 + + model = gds_sim.Model( + initial_state={"x": 0, "y": 1.0}, + state_update_blocks=[ + {"policies": {}, "variables": {"x": _increment_suf, "y": suf_y}} + ], + ) + sim = gds_sim.Simulation(model=model, timesteps=3) + rows = sim.run().to_list() + final = rows[-1] + assert final["x"] == 3 + assert final["y"] == 8.0 # 1 * 2^3 + + +class TestSimulationEdgeCases: + def test_zero_timesteps(self) -> None: + """Zero timesteps should produce only the initial state row.""" + model = gds_sim.Model( + initial_state={"x": 42}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + ) + sim = gds_sim.Simulation(model=model, timesteps=0) + rows = sim.run().to_list() + assert len(rows) == 1 + assert rows[0]["x"] == 42 + assert rows[0]["timestep"] == 0 + + def test_one_timestep(self) -> None: + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + sim = gds_sim.Simulation(model=model, timesteps=1) + rows = sim.run().to_list() + # initial + 1 timestep * 1 block = 2 + assert len(rows) == 2 + assert rows[-1]["x"] == 1 + + +class TestHooksEdgeCases: + def test_hooks_called_per_run_in_multi_run(self) -> None: + """before_run and after_run should be called once per run.""" + before_count: list[int] = [] + after_count: list[int] = [] + + def before_run(state: dict[str, Any], params: dict[str, Any]) -> None: + before_count.append(1) + + def after_run(state: dict[str, Any], params: dict[str, Any]) -> None: + after_count.append(1) + + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + ) + hooks = gds_sim.Hooks(before_run=before_run, after_run=after_run) + sim = gds_sim.Simulation(model=model, timesteps=3, runs=3, hooks=hooks) + sim.run() + assert len(before_count) == 3 + assert len(after_count) == 3 + + def test_hooks_called_per_subset(self) -> None: + """Hooks fire once per (subset, run) pair.""" + call_count: list[int] = [] + + def before_run(state: dict[str, Any], params: dict[str, Any]) -> None: + call_count.append(1) + + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _noop_suf}}], + params={"a": [1, 2, 3]}, + ) + hooks = gds_sim.Hooks(before_run=before_run) + sim = gds_sim.Simulation(model=model, timesteps=2, runs=2, hooks=hooks) + sim.run() + # 3 subsets * 2 runs = 6 + assert len(call_count) == 6 + + def test_early_exit_respects_break(self) -> None: + """after_step returning False at timestep 1 should stop immediately.""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + + def stop_immediately(state: dict[str, Any], t: int) -> bool: + return False + + hooks = gds_sim.Hooks(after_step=stop_immediately) + sim = gds_sim.Simulation(model=model, timesteps=100, hooks=hooks) + rows = sim.run().to_list() + # initial + 1 timestep (then stopped) + assert len(rows) == 2 + assert rows[-1]["x"] == 1 + + def test_after_step_returning_none_continues(self) -> None: + """after_step returning None should NOT stop.""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + + def do_nothing(state: dict[str, Any], t: int) -> None: + pass + + hooks = gds_sim.Hooks(after_step=do_nothing) + sim = gds_sim.Simulation(model=model, timesteps=5, hooks=hooks) + rows = sim.run().to_list() + assert rows[-1]["x"] == 5 + + def test_after_step_returning_true_continues(self) -> None: + """after_step returning True should NOT stop (only False stops).""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + + def keep_going(state: dict[str, Any], t: int) -> bool: + return True + + hooks = gds_sim.Hooks(after_step=keep_going) + sim = gds_sim.Simulation(model=model, timesteps=5, hooks=hooks) + rows = sim.run().to_list() + assert rows[-1]["x"] == 5 + + +class TestExperimentEdgeCases: + def test_single_sim_single_job_sequential(self) -> None: + """Single sim with 1 subset and 1 run should use sequential path.""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + sim = gds_sim.Simulation(model=model, timesteps=5, runs=1) + exp = gds_sim.Experiment(simulations=[sim]) + rows = exp.run().to_list() + assert rows[-1]["x"] == 5 + + def test_experiment_merges_multiple_sims(self) -> None: + """Experiment with two sims should merge results.""" + model1 = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + model2 = gds_sim.Model( + initial_state={"x": 100}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + ) + sim1 = gds_sim.Simulation(model=model1, timesteps=3) + sim2 = gds_sim.Simulation(model=model2, timesteps=3) + exp = gds_sim.Experiment(simulations=[sim1, sim2], processes=1) + results = exp.run() + # Each sim: 1 + 3 = 4 rows, total 8 + assert len(results) == 8 + + def test_experiment_processes_none_auto(self) -> None: + """processes=None should still work (auto-detect).""" + model = gds_sim.Model( + initial_state={"x": 0}, + state_update_blocks=[{"policies": {}, "variables": {"x": _increment_suf}}], + params={"a": [1, 2]}, + ) + sim = gds_sim.Simulation(model=model, timesteps=3, runs=2) + exp = gds_sim.Experiment(simulations=[sim]) + results = exp.run() + # 2 subsets * 2 runs * (1 + 3) = 16 + assert len(results) == 16