From 99089fa741504d41d5a4e2263c409c444646a48d Mon Sep 17 00:00:00 2001 From: Ichsan Haryadi Date: Thu, 19 Feb 2026 02:56:28 +0800 Subject: [PATCH] feat: add custom actions --- README.md | 31 +++- examples/custom_actions.py | 16 +++ selv.py | 115 +++++++++++---- tests/test_custom_actions.py | 271 +++++++++++++++++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 405 insertions(+), 30 deletions(-) create mode 100644 examples/custom_actions.py create mode 100644 tests/test_custom_actions.py diff --git a/README.md b/README.md index 1742f82..d726d06 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,13 @@ Tracking attribute changes helps me understand how a complex class behaves and s ## Features - **Automatic attribute change tracking**: Decorated class automatically log all attribute changes -- **Container support**: Tracks value modifications inside container +- **Container support**: Tracks value modifications inside built-in containers (dict, list, set, tuple) - **Flexible logging**: Use good ol `print` statement or any custom logger function - **View changelog**: Query complete change history for any attribute - **Exclude specific attributes**: Optionally exclude specific attributes from tracking +- **Custom actions**: Execute custom functions when specific attributes change + ## Installation ```bash @@ -87,6 +89,29 @@ print(grouped) # } ``` +### Set custom action + +```python +from selv import selv + + +def log_inventory_change(inventory): + total = sum(inventory.values()) + print(f"Total items in inventory: {total}") + +@selv(actions={"inventory": log_inventory_change}) +class Store: + def __init__(self): + self.inventory = {"apples": 10, "bananas": 5} + +store = Store() +store.inventory["oranges"] = 8 +# [Store] inventory = {'apples': 10, 'bananas': 5} (initialized) +# Total items in inventory: 15 +# [Store] inventory: {'apples': 10, 'bananas': 5} -> {'apples': 10, 'bananas': 5, 'oranges': 8} +# Total items in inventory: 23 +``` + ### Parameters The `@selv` decorator has a few parameters to customize its behavior. @@ -103,6 +128,10 @@ The `@selv` decorator has a few parameters to customize its behavior. - List of attribute names to exclude from tracking - Useful for exclude sensitive data or unimportant attributes +4. **`actions`** (`Dict[str, Callable[[Any], None]]`, default: `None`) + - Dictionary mapping attribute names to functions that are called when the attribute changes + - Each function receives the new value of the attribute as its argument + ## License MIT License - see [LICENSE](LICENSE) file for details. diff --git a/examples/custom_actions.py b/examples/custom_actions.py new file mode 100644 index 0000000..f98311f --- /dev/null +++ b/examples/custom_actions.py @@ -0,0 +1,16 @@ +from selv import selv + + +def log_inventory_change(inventory): + total = sum(inventory.values()) + print(f"Total items in inventory: {total}") + + +@selv(actions={"inventory": log_inventory_change}) +class Store: + def __init__(self): + self.inventory = {"apples": 10, "bananas": 5} + + +store = Store() +store.inventory["oranges"] = 8 diff --git a/selv.py b/selv.py index 6a684a2..37cb3e6 100644 --- a/selv.py +++ b/selv.py @@ -28,11 +28,11 @@ def __init__(self, *args, parent=None, attr_name=None, **kwargs): self._attr_name = attr_name self._initializing = True super().__init__() - # Process initial items and wrap them + if len(args) > 0: if len(args) > 1: raise TypeError("expected at most 1 argument, got %d" % len(args)) - # Extract first argument safely + first_arg = args[0] for key, value in dict(first_arg).items(): self[key] = value @@ -52,6 +52,7 @@ def __setitem__(self, key, value): def _wrap_value(self, value): """Wrap container values for observation.""" + # Recursively wrap nested containers to maintain observation chain if isinstance(value, dict): return ObservableDict(value, parent=self._parent, attr_name=self._attr_name) elif isinstance(value, list): @@ -78,9 +79,8 @@ def __init__(self, *args, parent=None, attr_name=None, **kwargs): self._attr_name = attr_name self._initializing = True super().__init__() - # Process initial items and wrap them + if len(args) > 0: - # Extract first argument safely first_arg = args[0] for value in first_arg: self.append(value) @@ -98,6 +98,7 @@ def __setitem__(self, key, value): def _wrap_value(self, value): """Wrap container values for observation.""" + # Recursively wrap nested containers to maintain observation chain if isinstance(value, dict): return ObservableDict(value, parent=self._parent, attr_name=self._attr_name) elif isinstance(value, list): @@ -144,9 +145,8 @@ def __init__(self, *args, parent=None, attr_name=None, **kwargs): self._attr_name = attr_name self._initializing = True super().__init__() - # Process initial items and wrap them + if len(args) > 0: - # Extract first argument safely first_arg = args[0] for value in first_arg: self.add(value) @@ -164,6 +164,7 @@ def add(self, element): def _wrap_value(self, value): """Wrap container values for observation.""" + # Recursively wrap nested containers to maintain observation chain if isinstance(value, dict): return ObservableDict(value, parent=self._parent, attr_name=self._attr_name) elif isinstance(value, list): @@ -211,7 +212,7 @@ def clear(self): def update(self, *others): old_container_state = self.copy() if self._parent else None - # Update with original values, wrapping happens in add() method + super().update(*others) if self._parent and self._attr_name and not self._initializing: new_container_state = self.copy() @@ -323,11 +324,10 @@ def _format_tuple(self, value: tuple) -> str: def _format_set(self, value: set) -> str: """Format set value.""" + # Try to sort for consistent output, fall back to natural order if sorting fails try: - # Try to sort for consistent output sorted_items = sorted(value) except TypeError: - # If sorting fails (mixed types), use natural order sorted_items = list(value) items = [self._format_value(item) for item in sorted_items] return "{" + ", ".join(items) + "}" @@ -341,13 +341,16 @@ def __init__( track_private: bool = True, logger: Optional[Callable[[str], None]] = None, exclude: Optional[List[str]] = None, + actions: Optional[Dict[str, Callable[[Any], None]]] = None, ): self.track_private = track_private self.logger = logger self.exclude = exclude or [] + self.actions = actions or {} def wrap_container(self, self_obj: Any, name: str, value: Any) -> Any: """Wrap dict/list/set values.""" + # Convert regular containers to observable versions for tracking if isinstance(value, dict): return ObservableDict(value, parent=self_obj, attr_name=name) elif isinstance(value, list): @@ -356,6 +359,53 @@ def wrap_container(self, self_obj: Any, name: str, value: Any) -> Any: return ObservableSet(value, parent=self_obj, attr_name=name) return value + def _safe_copy(self, value: Any) -> Any: + """Safely copy a value, handling observable containers specially.""" + # Observable containers need special handling to avoid triggering notifications + if value is None: + return None + + if isinstance(value, (ObservableDict, ObservableList, ObservableSet)): + if isinstance(value, ObservableDict): + return dict(value.items()) + elif isinstance(value, ObservableList): + return list(value) + elif isinstance(value, ObservableSet): + return set(value) + + return copy.deepcopy(value) + + def _ensure_history_initialized(self, self_obj: Any, name: str) -> None: + """Ensure change history is initialized for the object and attribute.""" + if not hasattr(self_obj, "_selv_change_history"): + self_obj._selv_change_history = {} + + if name not in self_obj._selv_change_history: + self_obj._selv_change_history[name] = [] + + def _log_change_message( + self, + cls_name: str, + attr_str: str, + old_value: Any, + new_value: Any, + is_initial: bool, + log_func: Callable, + record: _ChangeRecord, + ) -> None: + """Log the actual change message.""" + # Handle three cases: initialization, deletion, and regular update + if is_initial: + formatted_val = record._format_value(new_value) + log_func(f"[{cls_name}] {attr_str} = {formatted_val} (initialized)") + elif new_value is None: + formatted_old = record._format_value(old_value) + log_func(f"[{cls_name}] {attr_str}: {formatted_old} -> deleted") + else: + formatted_old = record._format_value(old_value) + formatted_new = record._format_value(new_value) + log_func(f"[{cls_name}] {attr_str}: {formatted_old} -> {formatted_new}") + def log_change( self, self_obj: Any, @@ -367,14 +417,11 @@ def log_change( is_initial: bool = False, ) -> None: """Log attribute change.""" - if not hasattr(self_obj, "_selv_change_history"): - self_obj._selv_change_history = {} - if name not in self_obj._selv_change_history: - self_obj._selv_change_history[name] = [] + self._ensure_history_initialized(self_obj, name) - old_value_copy = copy.deepcopy(old_value) if old_value is not None else None - new_value_copy = copy.deepcopy(new_value) if new_value is not None else None + old_value_copy = self._safe_copy(old_value) + new_value_copy = self._safe_copy(new_value) record = _ChangeRecord( timestamp=datetime.now(), @@ -388,16 +435,19 @@ def log_change( attr_str = self._get_attribute_string(name, container_key) log_func = self.logger if self.logger is not None else print - if is_initial: - formatted_val = record._format_value(new_value) - log_func(f"[{cls_name}] {attr_str} = {formatted_val} (initialized)") - elif new_value is None: - formatted_old = record._format_value(old_value) - log_func(f"[{cls_name}] {attr_str}: {formatted_old} -> deleted") - else: - formatted_old = record._format_value(old_value) - formatted_new = record._format_value(new_value) - log_func(f"[{cls_name}] {attr_str}: {formatted_old} -> {formatted_new}") + self._log_change_message( + cls_name, attr_str, old_value, new_value, is_initial, log_func, record + ) + + # Call custom action if defined for this attribute + if name in self.actions: + try: + self.actions[name](new_value) + except Exception as e: + if self.logger is not None: + self.logger(f"Error in custom action for {name}: {e}") + + log_func(f"[{cls_name}] Error in action for {attr_str}: {e}") def _get_attribute_string( self, name: str, container_key: Optional[Union[str, int]] @@ -419,6 +469,8 @@ def log_container_change( new_container_state: Any, ) -> None: """Log container change.""" + # Use deepcopy to capture container state at time of change + # This prevents mutations from affecting historical records old_copy = ( copy.deepcopy(old_container_state) if old_container_state is not None @@ -437,7 +489,7 @@ def create_setattr( """Create the __setattr__ method for the decorated class.""" def new_setattr(self_obj: Any, name: str, value: Any) -> None: - # Handle attributes that should not be tracked + # Skip tracking for private attributes or excluded names if self._should_skip_tracking(name): self._set_attribute(self_obj, name, value, original_setattr) return @@ -445,6 +497,7 @@ def new_setattr(self_obj: Any, name: str, value: Any) -> None: has_old_value = hasattr(self_obj, name) old_value = getattr(self_obj, name, None) if has_old_value else None + # Wrap containers to enable nested tracking wrapped_value = self.wrap_container(self_obj, name, value) self._set_attribute(self_obj, name, wrapped_value, original_setattr) @@ -462,10 +515,13 @@ def new_setattr(self_obj: Any, name: str, value: Any) -> None: def _should_skip_tracking(self, name: str) -> bool: """Check if an attribute should be skipped for tracking.""" + # Skip private attributes unless tracking is enabled if name.startswith("_") and not self.track_private: return True + # Skip internal selv attributes if name.startswith("_selv_"): return True + # Skip explicitly excluded attributes if name in self.exclude: return True return False @@ -478,6 +534,7 @@ def _set_attribute( original_setattr: Optional[Callable], ) -> None: """Set an attribute using the appropriate setattr method.""" + # Use the class's original __setattr__ if available, otherwise use object's if original_setattr: original_setattr(self_obj, name, value) else: @@ -492,6 +549,7 @@ def view_changelog( format: Literal["flat", "attr"] = "flat", ) -> Union[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: """View changelog for attributes.""" + # Validate format parameter if format not in ("flat", "attr"): raise ValueError(f"format must be 'flat' or 'attr', got {format!r}") @@ -562,11 +620,12 @@ def _selv( track_private: bool = True, logger: Optional[Callable[[str], None]] = None, exclude: Optional[List[str]] = None, + actions: Optional[Dict[str, Callable[[Any], None]]] = None, ) -> Any: """Class decorator for logging attribute changes.""" def decorator(cls: Type[T]) -> Any: - decorator_instance = _SelvDecorator(track_private, logger, exclude) + decorator_instance = _SelvDecorator(track_private, logger, exclude, actions) original_setattr = getattr(cls, "__setattr__", None) # Create bound methods for the class @@ -605,7 +664,7 @@ def log_container_change( new_container_state, ) - # Set methods on the class + # Attach methods to the decorated class cls.__setattr__ = decorator_instance.create_setattr(cls, original_setattr) cls.view_changelog = decorator_instance.create_view_changelog() cls._selv_wrap_container = wrap_container diff --git a/tests/test_custom_actions.py b/tests/test_custom_actions.py new file mode 100644 index 0000000..da542c4 --- /dev/null +++ b/tests/test_custom_actions.py @@ -0,0 +1,271 @@ +"""Tests for custom actions functionality in selv decorator.""" + +from selv import selv + + +def test_basic_custom_action(): + """Test basic custom action on attribute change.""" + action_called = [] + action_value = None + + def my_action(value): + nonlocal action_called, action_value + action_called.append(True) + action_value = value + + @selv(actions={"x": my_action}) + class TestClass: + def __init__(self): + self.x = 1 + + obj = TestClass() + # Action should be called on initialization + assert len(action_called) == 1 + assert action_value == 1 + + # Reset tracking + action_called.clear() + action_value = None + + # Change the attribute + obj.x = 2 + # Action should be called on change + assert len(action_called) == 1 + assert action_value == 2 + + +def test_multiple_attributes_with_actions(): + """Test multiple attributes with different actions.""" + x_called = [] + y_called = [] + + def x_action(value): + x_called.append(value) + + def y_action(value): + y_called.append(value) + + @selv(actions={"x": x_action, "y": y_action}) + class TestClass: + def __init__(self): + self.x = 1 + self.y = 2 + + obj = TestClass() + assert len(x_called) == 1 + assert x_called[0] == 1 + assert len(y_called) == 1 + assert y_called[0] == 2 + + # Change attributes + obj.x = 10 + obj.y = 20 + + assert len(x_called) == 2 + assert x_called[1] == 10 + assert len(y_called) == 2 + assert y_called[1] == 20 + + +def test_action_not_called_for_other_attributes(): + """Test that actions are only called for specified attributes.""" + action_called = [] + + def my_action(value): + action_called.append(value) + + @selv(actions={"x": my_action}) + class TestClass: + def __init__(self): + self.x = 1 + self.y = 2 # No action for y + + obj = TestClass() + assert len(action_called) == 1 # Only for x initialization + assert action_called[0] == 1 + + # Change y - action should not be called + obj.y = 3 + assert len(action_called) == 1 # Still only 1 call + + +def test_action_with_container_attributes(): + """Test actions work with container attributes (dict, list, set).""" + dict_action_called = [] + list_action_called = [] + set_action_called = [] + + def dict_action(value): + dict_action_called.append(value) + + def list_action(value): + list_action_called.append(value) + + def set_action(value): + set_action_called.append(value) + + @selv( + actions={"my_dict": dict_action, "my_list": list_action, "my_set": set_action} + ) + class TestClass: + def __init__(self): + self.my_dict = {"a": 1} + self.my_list = [1, 2, 3] + self.my_set = {1, 2, 3} + + obj = TestClass() + assert len(dict_action_called) == 1 + assert dict_action_called[0] == {"a": 1} + assert len(list_action_called) == 1 + assert list_action_called[0] == [1, 2, 3] + assert len(set_action_called) == 1 + assert set_action_called[0] == {1, 2, 3} + + # Modify containers + obj.my_dict["b"] = 2 + obj.my_list.append(4) + obj.my_set.add(5) + + # Actions should be called for container modifications + assert len(dict_action_called) == 2 + assert dict_action_called[1] == {"a": 1, "b": 2} + assert len(list_action_called) == 2 + assert list_action_called[1] == [1, 2, 3, 4] + assert len(set_action_called) == 2 + assert set_action_called[1] == {1, 2, 3, 5} + + +def test_action_receives_correct_value(): + """Test that action receives the actual new value.""" + received_values = [] + + def my_action(value): + received_values.append(value) + + @selv(actions={"x": my_action}) + class TestClass: + def __init__(self): + self.x = "initial" + + obj = TestClass() + assert received_values == ["initial"] + + obj.x = "changed" + assert received_values == ["initial", "changed"] + + obj.x = 123 + assert received_values == ["initial", "changed", 123] + + obj.x = None + assert received_values == ["initial", "changed", 123, None] + + +def test_action_with_excluded_attributes(): + """Test that actions work with excluded attributes.""" + action_called = [] + + def my_action(value): + action_called.append(value) + + @selv(actions={"x": my_action}, exclude=["y"]) + class TestClass: + def __init__(self): + self.x = 1 # Has action + self.y = 2 # Excluded, no tracking + + obj = TestClass() + assert len(action_called) == 1 + assert action_called[0] == 1 + + # Change x - action should be called + obj.x = 3 + assert len(action_called) == 2 + assert action_called[1] == 3 + + # Change y - no action since it's excluded + obj.y = 4 + assert len(action_called) == 2 # No change + + +def test_action_with_private_attributes(): + """Test actions with private attributes when track_private=True.""" + action_called = [] + + def my_action(value): + action_called.append(value) + + @selv(actions={"_private": my_action}, track_private=True) + class TestClass: + def __init__(self): + self._private = "secret" + + obj = TestClass() + assert len(action_called) == 1 + assert action_called[0] == "secret" + + obj._private = "new_secret" + assert len(action_called) == 2 + assert action_called[1] == "new_secret" + + +def test_no_action_for_private_when_track_private_false(): + """Test that actions are not called for untracked private attributes""" + action_called = [] + + def my_action(value): + action_called.append(value) + + @selv(actions={"_private": my_action}, track_private=False) + class TestClass: + def __init__(self): + self._private = "secret" + + obj = TestClass() + # Action should not be called because private attributes aren't tracked + assert len(action_called) == 0 + + obj._private = "new_secret" + assert len(action_called) == 0 + + +def test_action_with_custom_logger(): + """Test that actions work with custom logger.""" + action_called = [] + log_messages = [] + + def my_action(value): + action_called.append(value) + + def my_logger(message): + log_messages.append(message) + + @selv(actions={"x": my_action}, logger=my_logger) + class TestClass: + def __init__(self): + self.x = 1 + + obj = TestClass() + assert len(action_called) == 1 + assert action_called[0] == 1 + assert len(log_messages) > 0 # Logger should have been called + + # Change attribute + obj.x = 2 + assert len(action_called) == 2 + assert action_called[1] == 2 + + +def test_action_with_lambda(): + """Test that lambda functions can be used as actions.""" + results = [] + + @selv(actions={"x": lambda v: results.append(f"x={v}")}) + class TestClass: + def __init__(self): + self.x = 1 + + obj = TestClass() + assert results == ["x=1"] + + obj.x = 2 + assert results == ["x=1", "x=2"] diff --git a/uv.lock b/uv.lock index 8651e17..ef76ce6 100644 --- a/uv.lock +++ b/uv.lock @@ -186,7 +186,7 @@ wheels = [ [[package]] name = "selv" -version = "1.0.0" +version = "1.1.0" source = { editable = "." } [package.dev-dependencies]