import os import json from datetime import datetime from typing import Optional, Dict, Any, List class TrackingStore: """ Stocke les enregistrements d'emails trackés et leurs événements. Répertoire: Data/email_tracking Fichier par tracking_id: Data/email_tracking/.json """ def __init__(self, base_dir: Optional[str] = None): base_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) self.base_dir = base_dir or os.path.join(base_root, "Data", "email_tracking") os.makedirs(self.base_dir, exist_ok=True) def _path(self, tracking_id: str) -> str: return os.path.join(self.base_dir, f"{tracking_id}.json") def create_record(self, tracking_id: str, record: Dict[str, Any]) -> None: record = dict(record) record.setdefault("created_at", datetime.utcnow().isoformat()) record.setdefault("events", []) # list of {"type": "...","ts": "...", ...} with open(self._path(tracking_id), "w", encoding="utf-8") as f: json.dump(record, f, ensure_ascii=False, indent=2) def get_record(self, tracking_id: str) -> Optional[Dict[str, Any]]: p = self._path(tracking_id) if not os.path.exists(p): return None with open(p, "r", encoding="utf-8") as f: return json.load(f) def update_record(self, tracking_id: str, updater) -> Optional[Dict[str, Any]]: rec = self.get_record(tracking_id) if rec is None: return None new_rec = updater(dict(rec)) or rec with open(self._path(tracking_id), "w", encoding="utf-8") as f: json.dump(new_rec, f, ensure_ascii=False, indent=2) return new_rec def add_event(self, tracking_id: str, event_type: str, meta: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]: meta = meta or {} def _upd(rec: Dict[str, Any]): events: List[Dict[str, Any]] = rec.get("events") or [] events.append({"type": event_type, "ts": datetime.utcnow().isoformat(), **meta}) rec["events"] = events # counters if event_type == "open": rec["opens"] = int(rec.get("opens") or 0) + 1 if event_type == "click": rec["clicks"] = int(rec.get("clicks") or 0) + 1 return rec return self.update_record(tracking_id, _upd)