diff --git a/README.md b/README.md index a4d944a..79e3a0d 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,38 @@ class SegmentAssignmentLogger(AssignmentLogger): client_config = Config(api_key="", assignment_logger=SegmentAssignmentLogger()) ``` +### De-duplication of assignments + +The SDK may see many duplicate assignments in a short period of time, and if you have configured a logging function, they will be transmitted to your downstream event store. This increases the cost of storage as well as warehouse costs during experiment analysis. + +To mitigate this, a caching assignment logger is optionally available with configurable cache behavior. + +The caching can be configured individually for assignment logs and bandit action logs using `AssignmentCacheLogger`. + +`AssignmentCacheLogger` optionally accepts two caches. We recommend using [`cachetools`](https://pypi.org/project/cachetools/) but any subclass of `MutableMapping` works. + +```python +import cachetools +from eppo_client.assignment_logger import AssignmentLogger, AssignmentCacheLogger + + +class MyLogger(AssignmentLogger): + # implement your logger + pass + + +client_config = Config( + api_key="", + assignment_logger=AssignmentCacheLogger( + MyLogger(), + # cache 1024 least recently used assignments + assignment_cache=cachetools.LRUCache(maxsize=1024), + # cache bandit assignment for no longer than 10 minutes + bandit_cache=cachetools.TTLCache(maxsize=2048, ttl=600), + ), +) +``` + ## Export configuration To support the use-case of needing to bootstrap a front-end client, the Eppo SDK provides a function to export flag configurations to a JSON string. diff --git a/eppo_client/assignment_logger.py b/eppo_client/assignment_logger.py index cc17e76..de50152 100644 --- a/eppo_client/assignment_logger.py +++ b/eppo_client/assignment_logger.py @@ -1,13 +1,59 @@ -from typing import Dict -from eppo_client.base_model import BaseModel -from pydantic import ConfigDict +from typing import Dict, Optional, Tuple, MutableMapping -class AssignmentLogger(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True) - +class AssignmentLogger: def log_assignment(self, assignment_event: Dict): pass def log_bandit_action(self, bandit_event: Dict): pass + + +class AssignmentCacheLogger(AssignmentLogger): + def __init__( + self, + inner: AssignmentLogger, + *, + assignment_cache: Optional[MutableMapping] = None, + bandit_cache: Optional[MutableMapping] = None, + ): + self.__inner = inner + self.__assignment_cache = assignment_cache + self.__bandit_cache = bandit_cache + + def log_assignment(self, event: Dict): + _cache_or_call( + self.__assignment_cache, + *AssignmentCacheLogger.__assignment_cache_keyvalue(event), + lambda: self.__inner.log_assignment(event), + ) + + def log_bandit_action(self, event: Dict): + _cache_or_call( + self.__bandit_cache, + *AssignmentCacheLogger.__bandit_cache_keyvalue(event), + lambda: self.__inner.log_bandit_action(event), + ) + + @staticmethod + def __assignment_cache_keyvalue(event: Dict) -> Tuple[Tuple, Tuple]: + key = (event["featureFlag"], event["subject"]) + value = (event["allocation"], event["variation"]) + return key, value + + @staticmethod + def __bandit_cache_keyvalue(event: Dict) -> Tuple[Tuple, Tuple]: + key = (event["flagKey"], event["subject"]) + value = (event["banditKey"], event["action"]) + return key, value + + +def _cache_or_call(cache: Optional[MutableMapping], key, value, fn): + if cache is not None and (previous := cache.get(key)) and previous == value: + # ok, cached + return + + fn() + + if cache is not None: + cache[key] = value diff --git a/eppo_client/client.py b/eppo_client/client.py index 551d849..eb31898 100644 --- a/eppo_client/client.py +++ b/eppo_client/client.py @@ -364,12 +364,10 @@ def evaluate_bandit_action( "flagKey": flag_key, "banditKey": bandit_data.bandit_key, "subject": subject_key, - "action": evaluation.action_key if evaluation else None, - "actionProbability": evaluation.action_weight if evaluation else None, - "optimalityGap": evaluation.optimality_gap if evaluation else None, - "modelVersion": ( - bandit_data.bandit_model_version if evaluation else None - ), + "action": evaluation.action_key, + "actionProbability": evaluation.action_weight, + "optimalityGap": evaluation.optimality_gap, + "modelVersion": (bandit_data.bandit_model_version), "timestamp": _utcnow().isoformat(), "subjectNumericAttributes": ( subject_context_attributes.numeric_attributes diff --git a/eppo_client/config.py b/eppo_client/config.py index 62e8e7d..98ea73e 100644 --- a/eppo_client/config.py +++ b/eppo_client/config.py @@ -1,3 +1,5 @@ +from pydantic import Field, ConfigDict + from eppo_client.assignment_logger import AssignmentLogger from eppo_client.base_model import SdkBaseModel from eppo_client.validation import validate_not_blank @@ -8,9 +10,14 @@ class Config(SdkBaseModel): + model_config = ConfigDict( + # AssignmentLogger is not a pydantic model + arbitrary_types_allowed=True + ) + api_key: str base_url: str = "https://fscdn.eppo.cloud/api" - assignment_logger: AssignmentLogger + assignment_logger: AssignmentLogger = Field(exclude=True) is_graceful_mode: bool = True poll_interval_seconds: int = POLL_INTERVAL_SECONDS_DEFAULT poll_jitter_seconds: int = POLL_JITTER_SECONDS_DEFAULT diff --git a/eppo_client/version.py b/eppo_client/version.py index 6cbec38..415a809 100644 --- a/eppo_client/version.py +++ b/eppo_client/version.py @@ -1,4 +1,4 @@ # Note to developers: When ready to bump to 4.0, please change # the `POLL_INTERVAL_SECONDS` constant in `eppo_client/constants.py` # to 30 seconds to match the behavior of the other server SDKs. -__version__ = "3.5.4" +__version__ = "3.6.0" diff --git a/requirements-test.txt b/requirements-test.txt index c66caf9..91d0f27 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -3,3 +3,5 @@ pytest pytest-mock mypy httpretty +cachetools +types-cachetools diff --git a/requirements.txt b/requirements.txt index ea377c7..f69d1f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,5 @@ pydantic==2.4.* pydantic-settings==2.0.* requests==2.31.* -cachetools==5.3.* -types-cachetools==5.3.* types-requests==2.31.* semver==3.0.* diff --git a/setup.cfg b/setup.cfg index 966f600..2852ec9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,5 +22,4 @@ install_requires = pydantic pydantic-settings requests - cachetools semver diff --git a/test/cache_assignment_logger_test.py b/test/cache_assignment_logger_test.py new file mode 100644 index 0000000..b192dce --- /dev/null +++ b/test/cache_assignment_logger_test.py @@ -0,0 +1,112 @@ +from unittest.mock import Mock + +from cachetools import LRUCache + +from eppo_client.assignment_logger import AssignmentCacheLogger +from eppo_client.client import _utcnow +from eppo_client.version import __version__ + + +def test_non_caching(): + inner = Mock() + logger = AssignmentCacheLogger(inner) + + logger.log_assignment(make_assignment_event()) + logger.log_assignment(make_assignment_event()) + logger.log_bandit_action(make_bandit_event()) + logger.log_bandit_action(make_bandit_event()) + + assert inner.log_assignment.call_count == 2 + assert inner.log_bandit_action.call_count == 2 + + +def test_assignment_cache(): + inner = Mock() + logger = AssignmentCacheLogger(inner, assignment_cache=LRUCache(100)) + + logger.log_assignment(make_assignment_event()) + logger.log_assignment(make_assignment_event()) + + assert inner.log_assignment.call_count == 1 + + +def test_bandit_cache(): + inner = Mock() + logger = AssignmentCacheLogger(inner, bandit_cache=LRUCache(100)) + + logger.log_bandit_action(make_bandit_event()) + logger.log_bandit_action(make_bandit_event()) + + assert inner.log_bandit_action.call_count == 1 + + +def test_bandit_flip_flop(): + inner = Mock() + logger = AssignmentCacheLogger(inner, bandit_cache=LRUCache(100)) + + logger.log_bandit_action(make_bandit_event(action="action1")) + logger.log_bandit_action(make_bandit_event(action="action1")) + assert inner.log_bandit_action.call_count == 1 + + logger.log_bandit_action(make_bandit_event(action="action2")) + assert inner.log_bandit_action.call_count == 2 + + logger.log_bandit_action(make_bandit_event(action="action1")) + assert inner.log_bandit_action.call_count == 3 + + +def make_assignment_event( + *, + allocation="allocation", + experiment="experiment", + featureFlag="featureFlag", + variation="variation", + subject="subject", + timestamp=_utcnow().isoformat(), + subjectAttributes={}, + metaData={"sdkLanguage": "python", "sdkVersion": __version__}, + extra_logging={}, +): + return { + **extra_logging, + "allocation": allocation, + "experiment": experiment, + "featureFlag": featureFlag, + "variation": variation, + "subject": subject, + "timestamp": timestamp, + "subjectAttributes": subjectAttributes, + "metaData": metaData, + } + + +def make_bandit_event( + *, + flag_key="flagKey", + bandit_key="banditKey", + subject_key="subjectKey", + action="action", + action_probability=1.0, + optimality_gap=None, + evaluation=None, + bandit_data=None, + subject_context_attributes=None, + timestamp=_utcnow().isoformat(), + model_version="model_version", + meta_data={"sdkLanguage": "python", "sdkVersion": __version__}, +): + return { + "flagKey": flag_key, + "banditKey": bandit_key, + "subject": subject_key, + "action": action, + "actionProbability": action_probability, + "optimalityGap": optimality_gap, + "modelVersion": model_version, + "timestamp": timestamp, + "subjectNumericAttributes": {}, + "subjectCategoricalAttributes": {}, + "actionNumericAttributes": {}, + "actionCategoricalAttributes": {}, + "metaData": meta_data, + }