3 changes: 2 additions & 1 deletion flagsmith/__init__.py
@@ -1,5 +1,6 @@
from flagsmith import webhooks
from flagsmith.analytics import PipelineAnalyticsConfig
from flagsmith.flagsmith import Flagsmith
from flagsmith.version import __version__

__all__ = ("Flagsmith", "webhooks", "__version__")
__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__")
167 changes: 167 additions & 0 deletions flagsmith/analytics.py
@@ -1,9 +1,18 @@
import atexit
import json
import logging
import threading
import time
import typing
from dataclasses import dataclass
from datetime import datetime

from requests_futures.sessions import FuturesSession # type: ignore

from flagsmith.version import __version__

logger = logging.getLogger(__name__)

ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"

# Used to control how often we send data (in seconds)
@@ -60,3 +69,161 @@ def track_feature(self, feature_name: str) -> None:
self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
self.flush()


@dataclass
class PipelineAnalyticsConfig:
analytics_server_url: str
max_buffer_items: int = 1000
flush_interval_seconds: float = 10.0


class PipelineAnalyticsProcessor:
"""
Buffered analytics processor that sends per-evaluation and custom events
to the Flagsmith pipeline analytics endpoint in batches.

Evaluation events are deduplicated within each flush window. Events are
flushed periodically via a background timer or when the buffer is full.
"""

def __init__(
self,
config: PipelineAnalyticsConfig,
environment_key: str,
) -> None:
url = config.analytics_server_url
if not url.endswith("/"):
url = f"{url}/"
self._batch_endpoint = f"{url}v1/analytics/batch"
self._environment_key = environment_key
self._max_buffer = config.max_buffer_items
self._flush_interval_seconds = config.flush_interval_seconds

self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
self._dedup_keys: typing.Dict[str, str] = {}
self._lock = threading.Lock()
self._timer: typing.Optional[threading.Timer] = None

def record_evaluation_event(
self,
flag_key: str,
enabled: bool,
value: typing.Any,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
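        # Fingerprint the result so identical repeat evaluations within a flush
        # window are dropped; any change in identity, enabled state, or value
        # is recorded as a fresh event.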
fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
should_flush = False

with self._lock:
if self._dedup_keys.get(flag_key) == fingerprint:
return
self._dedup_keys[flag_key] = fingerprint
self._buffer.append(
{
"event_id": flag_key,
"event_type": "flag_evaluation",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": enabled,
"value": value,
"traits": dict(traits) if traits else None,
"metadata": {"sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()

def record_custom_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
should_flush = False

with self._lock:
self._buffer.append(
{
"event_id": event_name,
"event_type": "custom_event",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": None,
"value": None,
"traits": dict(traits) if traits else None,
"metadata": {**(metadata or {}), "sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()

def flush(self) -> None:
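        # Swap the buffer out under the lock so the HTTP request below runs
        # without blocking threads that are still recording events.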
with self._lock:
if not self._buffer:
return
events = self._buffer
self._buffer = []
self._dedup_keys.clear()

payload = json.dumps(
{"events": events, "environment_key": self._environment_key}
)
try:
future = session.post(
self._batch_endpoint,
data=payload,
timeout=3,
headers={
"Content-Type": "application/json; charset=utf-8",
"X-Environment-Key": self._environment_key,
"Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}",
},
)
except RuntimeError:
logger.debug("Skipping flush: thread pool already shut down")
return
future.add_done_callback(lambda f: self._handle_flush_result(f, events))

def _handle_flush_result(
self,
future: typing.Any,
events: typing.List[typing.Dict[str, typing.Any]],
) -> None:
try:
response = future.result()
response.raise_for_status()
except Exception:
logger.warning(
"Failed to flush pipeline analytics, re-queuing events", exc_info=True
)
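            # Re-queue the failed batch ahead of newer events, then trim to the
            # buffer cap so repeated failures cannot grow memory without bound.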
with self._lock:
self._buffer = events + self._buffer
self._buffer = self._buffer[: self._max_buffer]

def start(self) -> None:
self._schedule_flush()
atexit.register(self.stop)

def stop(self) -> None:
atexit.unregister(self.stop)
if self._timer is not None:
self._timer.cancel()
self.flush()

def _schedule_flush(self) -> None:
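        # Use a daemon timer so a pending flush never keeps the process alive;
        # the final flush at exit is handled by stop() via atexit.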
self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush)
self._timer.daemon = True
self._timer.start()

def _timer_flush(self) -> None:
self.flush()
self._schedule_flush()
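Taken together, the new processor is driven through `start()`/`stop()` plus the two `record_*` methods. A minimal usage sketch follows; the server URL, flag name, and identifier are illustrative placeholders, not values from this PR:

```python
from flagsmith.analytics import PipelineAnalyticsConfig, PipelineAnalyticsProcessor

# All concrete values below are illustrative placeholders.
config = PipelineAnalyticsConfig(
    analytics_server_url="https://analytics.example.com",
    max_buffer_items=500,
    flush_interval_seconds=5.0,
)
processor = PipelineAnalyticsProcessor(config=config, environment_key="ser.abc123")
processor.start()  # schedules the periodic background flush and registers an atexit hook

# Identical evaluations are deduplicated within a flush window;
# a changed result (enabled flips here) is recorded again.
processor.record_evaluation_event(
    "show_banner", enabled=True, value=None, identity_identifier="user-1"
)
processor.record_evaluation_event(
    "show_banner", enabled=False, value=None, identity_identifier="user-1"
)

processor.record_custom_event("signup_clicked", identity_identifier="user-1")

processor.stop()  # cancels the timer and performs a final flush
```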
67 changes: 62 additions & 5 deletions flagsmith/flagsmith.py
@@ -8,13 +8,18 @@
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from flagsmith.analytics import AnalyticsProcessor
from flagsmith.analytics import (
AnalyticsProcessor,
PipelineAnalyticsConfig,
PipelineAnalyticsProcessor,
)
from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError
from flagsmith.mappers import (
map_context_and_identity_data_to_context,
map_environment_document_to_context,
map_environment_document_to_environment_updated_at,
map_segment_results_to_identity_segments,
resolve_trait_values,
)
from flagsmith.models import DefaultFlag, Flags, Segment
from flagsmith.offline_handlers import OfflineHandler
@@ -63,6 +68,7 @@ def __init__(
environment_refresh_interval_seconds: typing.Union[int, float] = 60,
retries: typing.Optional[Retry] = None,
enable_analytics: bool = False,
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None,
default_flag_handler: typing.Optional[
typing.Callable[[str], DefaultFlag]
] = None,
@@ -108,6 +114,9 @@ def __init__(
self.default_flag_handler = default_flag_handler
self.enable_realtime_updates = enable_realtime_updates
self._analytics_processor: typing.Optional[AnalyticsProcessor] = None
self._pipeline_analytics_processor: typing.Optional[
PipelineAnalyticsProcessor
] = None
self._evaluation_context: typing.Optional[SDKEvaluationContext] = None
self._environment_updated_at: typing.Optional[datetime] = None

@@ -170,10 +179,28 @@ def __init__(

self._initialise_local_evaluation()

if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)
self._initialise_analytics(
environment_key=environment_key,
enable_analytics=enable_analytics,
pipeline_analytics_config=pipeline_analytics_config,
)

def _initialise_analytics(
self,
environment_key: str,
enable_analytics: bool,
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig],
) -> None:
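        # The classic AnalyticsProcessor and the pipeline processor are
        # independent; both may be enabled at once.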
if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)
if pipeline_analytics_config:
self._pipeline_analytics_processor = PipelineAnalyticsProcessor(
config=pipeline_analytics_config,
environment_key=environment_key,
)
self._pipeline_analytics_processor.start()

def _initialise_local_evaluation(self) -> None:
# To ensure that the environment is set before allowing subsequent
@@ -290,6 +317,25 @@ def get_identity_segments(

return map_segment_results_to_identity_segments(evaluation_result["segments"])

def track_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[TraitMapping] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
if not self._pipeline_analytics_processor:
raise ValueError(
"Pipeline analytics is not configured. "
"Provide pipeline_analytics_config to use track_event."
)
self._pipeline_analytics_processor.record_custom_event(
event_name=event_name,
identity_identifier=identity_identifier,
traits=resolve_trait_values(traits),
metadata=metadata,
)

def update_environment(self) -> None:
try:
environment_data = self._get_json_response(
@@ -345,6 +391,7 @@ def _get_environment_flags_from_document(self) -> Flags:
evaluation_result=evaluation_result,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
)

def _get_identity_flags_from_document(
@@ -368,6 +415,9 @@ def _get_identity_flags_from_document(
evaluation_result=evaluation_result,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
identity_identifier=identifier,
traits=resolve_trait_values(traits),
)

def _get_environment_flags_from_api(self) -> Flags:
@@ -379,6 +429,7 @@ def _get_environment_flags_from_api(self) -> Flags:
api_flags=json_response,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
)
except FlagsmithAPIError:
if self.offline_handler:
@@ -411,6 +462,9 @@ def _get_identity_flags_from_api(
api_flags=json_response["flags"],
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
identity_identifier=identifier,
traits=resolve_trait_values(traits),
)
except FlagsmithAPIError:
if self.offline_handler:
@@ -443,3 +497,6 @@ def __del__(self) -> None:

if hasattr(self, "event_stream_thread"):
self.event_stream_thread.stop()

        if getattr(self, "_pipeline_analytics_processor", None):
self._pipeline_analytics_processor.stop()
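At the client level the feature is opt-in: passing `pipeline_analytics_config` wires the processor into every flag lookup and unlocks `track_event`. A hedged sketch, assuming the client's existing public lookup methods; the keys and URL are placeholders:

```python
from flagsmith import Flagsmith, PipelineAnalyticsConfig

client = Flagsmith(
    environment_key="ser.abc123",  # placeholder server-side key
    pipeline_analytics_config=PipelineAnalyticsConfig(
        analytics_server_url="https://analytics.example.com",  # placeholder URL
    ),
)

# Evaluations made through the client are reported via the pipeline processor.
flags = client.get_environment_flags()
flags.is_feature_enabled("show_banner")

# Custom events need the pipeline config; without it track_event raises ValueError.
client.track_event(
    "checkout_completed",
    identity_identifier="user-1",
    metadata={"plan": "pro"},
)
```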
33 changes: 14 additions & 19 deletions flagsmith/mappers.py
@@ -12,7 +12,6 @@
StrValueSegmentCondition,
)
from flag_engine.result.types import SegmentResult
from flag_engine.segments.types import ContextValue

from flagsmith.api.types import (
EnvironmentModel,
@@ -26,7 +25,7 @@
SDKEvaluationContext,
SegmentMetadata,
StreamEvent,
TraitConfig,
TraitMapping,
)
from flagsmith.utils.datetime import fromisoformat

@@ -75,31 +74,27 @@ def map_environment_document_to_environment_updated_at(
return updated_at.astimezone(tz=timezone.utc)


def resolve_trait_values(
traits: typing.Optional[TraitMapping],
) -> typing.Optional[typing.Dict[str, typing.Any]]:
if not traits:
return None
return {
key: (val["value"] if isinstance(val, dict) else val)
for key, val in traits.items()
}


def map_context_and_identity_data_to_context(
context: SDKEvaluationContext,
identifier: str,
traits: typing.Optional[
typing.Mapping[
str,
typing.Union[
ContextValue,
TraitConfig,
],
]
],
traits: typing.Optional[TraitMapping] = None,
) -> SDKEvaluationContext:
return {
**context,
"identity": {
"identifier": identifier,
"traits": {
trait_key: (
trait_value_or_config["value"]
if isinstance(trait_value_or_config, dict)
else trait_value_or_config
)
for trait_key, trait_value_or_config in (traits or {}).items()
},
"traits": resolve_trait_values(traits) or {},
},
}

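The extracted `resolve_trait_values` helper centralises the unwrapping that was previously inlined in the mapper. A quick behavioural sketch; trait names are illustrative, and `transient` is shown as a typical `TraitConfig` field:

```python
from flagsmith.mappers import resolve_trait_values

# Plain values pass through; TraitConfig-style dicts are unwrapped to their "value".
traits = {"age": 42, "nickname": {"value": "kay", "transient": True}}
assert resolve_trait_values(traits) == {"age": 42, "nickname": "kay"}

# Empty or missing traits resolve to None.
assert resolve_trait_values(None) is None
assert resolve_trait_values({}) is None
```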