Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flagsmith/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flagsmith import webhooks
from flagsmith.analytics import PipelineAnalyticsConfig
from flagsmith.analytics import EventProcessorConfig
from flagsmith.flagsmith import Flagsmith
from flagsmith.version import __version__

__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__")
__all__ = ("Flagsmith", "EventProcessorConfig", "webhooks", "__version__")
110 changes: 54 additions & 56 deletions flagsmith/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
import threading
import time
import typing
from dataclasses import dataclass
from datetime import datetime
Expand All @@ -18,6 +17,10 @@
# Used to control how often we send data(in seconds)
ANALYTICS_TIMER: typing.Final[int] = 10

FLAG_EXPOSURE_EVENT: typing.Final[str] = "$flag_exposure"

DEFAULT_EVENT_API_URL: typing.Final[str] = "https://events.api.flagsmith.com/"

session = FuturesSession(max_workers=4)


Expand Down Expand Up @@ -72,92 +75,90 @@ def track_feature(self, feature_name: str) -> None:


@dataclass
class PipelineAnalyticsConfig:
analytics_server_url: str
class EventProcessorConfig:
Comment thread
Zaimwa9 marked this conversation as resolved.
events_api_url: str = DEFAULT_EVENT_API_URL
max_buffer_items: int = 1000
flush_interval_seconds: float = 10.0


class PipelineAnalyticsProcessor:
class EventProcessor:
"""
Buffered analytics processor that sends per-evaluation and custom events
to the Flagsmith pipeline analytics endpoint in batches.

Evaluation events are deduplicated within each flush window. Events are
flushed periodically via a background timer or when the buffer is full.
Buffered event processor that batches custom events and POSTs them to the
Flagsmith event endpoint. Flushes on a background timer or when the buffer
fills.
"""

def __init__(
self,
config: PipelineAnalyticsConfig,
config: EventProcessorConfig,
environment_key: str,
) -> None:
url = config.analytics_server_url
url = config.events_api_url
if not url.endswith("/"):
url = f"{url}/"
self._batch_endpoint = f"{url}v1/analytics/batch"
self._batch_endpoint = f"{url}v1/events"
self._environment_key = environment_key
self._max_buffer = config.max_buffer_items
self._flush_interval_seconds = config.flush_interval_seconds

self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
self._dedup_keys: typing.Dict[str, str] = {}
self._lock = threading.Lock()
self._timer: typing.Optional[threading.Timer] = None

def record_evaluation_event(
def track_event(
self,
flag_key: str,
enabled: bool,
value: typing.Any,
identity_identifier: typing.Optional[str] = None,
event: str,
identifier: typing.Optional[str] = None,
value: typing.Optional[typing.Union[str, int, float, bool]] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
should_flush = False

with self._lock:
if self._dedup_keys.get(flag_key) == fingerprint:
return
self._dedup_keys[flag_key] = fingerprint
self._buffer.append(
{
"event_id": flag_key,
"event_type": "flag_evaluation",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": enabled,
"value": value,
"traits": dict(traits) if traits else None,
"metadata": {"sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()
self._buffer_event(
event=event,
feature_name=None,
identifier=identifier,
value=value,
traits=traits,
metadata=metadata,
)

def record_custom_event(
def track_exposure_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
feature_name: str,
identifier: typing.Optional[str] = None,
value: typing.Optional[typing.Union[str, int, float, bool]] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
should_flush = False
self._buffer_event(
event=FLAG_EXPOSURE_EVENT,
feature_name=feature_name,
identifier=identifier,
value=value,
traits=traits,
metadata=metadata,
)

def _buffer_event(
self,
event: str,
feature_name: typing.Optional[str],
identifier: typing.Optional[str],
value: typing.Optional[typing.Union[str, int, float, bool]],
traits: typing.Optional[typing.Dict[str, typing.Any]],
metadata: typing.Optional[typing.Dict[str, typing.Any]],
) -> None:
should_flush = False
with self._lock:
self._buffer.append(
{
"event_id": event_name,
"event_type": "custom_event",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": None,
"value": None,
"event": event,
"feature_name": feature_name,
"identifier": identifier,
"value": str(value) if value is not None else None,
"traits": dict(traits) if traits else None,
"metadata": {**(metadata or {}), "sdk_version": __version__},
"timestamp": int(datetime.now().timestamp() * 1000),
}
)
if len(self._buffer) >= self._max_buffer:
Expand All @@ -172,11 +173,8 @@ def flush(self) -> None:
return
events = self._buffer
self._buffer = []
self._dedup_keys.clear()

payload = json.dumps(
{"events": events, "environment_key": self._environment_key}
)
payload = json.dumps({"events": events})
try:
future = session.post(
self._batch_endpoint,
Expand Down
Loading
Loading