from typing import List, Dict, Any, Union
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import getaddresses
from datetime import datetime
import os
import json
import re
from urllib.parse import quote_plus
from uuid import uuid4

from core.data import Data


class EmailTemplate:
    """Manage e-mail templates stored as one JSON file per template.

    A template with id ``X`` lives in ``<template_folder>/X.json`` and is read
    and written through the project's ``Data`` helper.
    """

    def __init__(self, template_folder="Data/email_templates"):
        """Remember the template folder and create it when it is missing."""
        self.template_folder = template_folder
        if not os.path.exists(self.template_folder):
            # exist_ok guards against another process creating the folder
            # between the check above and this call.
            os.makedirs(self.template_folder, exist_ok=True)

    def _template_path(self, template_id):
        """Return the path of the JSON file backing *template_id*."""
        return os.path.join(self.template_folder, f"{template_id}.json")

    def get_all_templates(self):
        """Return the data of every ``*.json`` template in the folder.

        Templates that fail to load are reported on stdout and skipped.
        File names are visited in sorted order so the result is deterministic.
        """
        templates = []
        if not os.path.exists(self.template_folder):
            return templates

        for filename in sorted(os.listdir(self.template_folder)):
            if not filename.endswith('.json'):
                continue
            template_path = os.path.join(self.template_folder, filename)
            try:
                templates.append(Data(template_path).load_data())
            except Exception as e:
                print(f"Erreur lors du chargement du template {filename}: {e}")
        return templates

    def get_template_by_id(self, template_id):
        """Return the template data for *template_id*, or ``None`` if absent."""
        template_path = self._template_path(template_id)
        if os.path.exists(template_path):
            return Data(template_path).load_data()
        return None

    def save_template(self, template_data):
        """Persist *template_data* and return it.

        When the dict has no (truthy) ``'id'``, a new id of the form
        ``tpl_<8 hex chars>`` is generated and stored into the dict.
        """
        template_id = template_data.get('id')
        if not template_id:
            template_id = f"tpl_{uuid4().hex[:8]}"
            template_data['id'] = template_id

        Data(self._template_path(template_id)).save_data(template_data)
        return template_data

    def delete_template(self, template_id):
        """Delete the template file; return ``True`` if a file was removed."""
        template_path = self._template_path(template_id)
        if os.path.exists(template_path):
            os.remove(template_path)
            return True
        return False

    def render_template(self, template_id, context=None):
        """Render a template by substituting ``{{key}}`` placeholders.

        Args:
            template_id: id of the template to load.
            context: optional mapping; each ``{{key}}`` occurrence in the
                subject and content is replaced by ``str(value)``.

        Returns:
            ``{"subject": ..., "content": ...}``, or ``None`` when the
            template does not exist.
        """
        template = self.get_template_by_id(template_id)
        if not template:
            return None

        # ``or ''`` also covers a key that is present but set to null.
        subject = template.get('subject', '') or ''
        content = template.get('content', '') or ''

        if context:
            for key, value in context.items():
                placeholder = f"{{{{{key}}}}}"
                replacement = str(value)
                subject = subject.replace(placeholder, replacement)
                content = content.replace(placeholder, replacement)

        return {
            "subject": subject,
            "content": content
        }


class EmailSender:
    """Send e-mails over SMTP using a JSON configuration file."""

    # Link schemes/forms that must never go through the click redirect.
    _UNTRACKED_LINK_PREFIXES = ("mailto:", "tel:", "sms:", "javascript:", "#")

    def __init__(self, config_file="config/email_config.json"):
        """Resolve the config path, load the config and the template manager.

        A relative *config_file* is resolved against the directory three
        levels above this source file.
        """
        if not os.path.isabs(config_file):
            base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
            self.config_file = os.path.join(base_dir, config_file)
        else:
            self.config_file = config_file

        self.config = self._load_config()
        self.template_manager = EmailTemplate()

    @staticmethod
    def _redact_config(config):
        """Return a printable copy of *config* with the password masked."""
        if isinstance(config, dict) and config.get("password"):
            return {**config, "password": "***"}
        return config

    @staticmethod
    def _split_addresses(addresses):
        """Normalise a cc/bcc argument into a list of bare e-mail addresses.

        Accepts ``None``, a single string (possibly comma-separated) or a
        list/tuple of strings. Entries without a parsable address are dropped.
        """
        if not addresses:
            return []
        raw = [addresses] if isinstance(addresses, str) else list(addresses)
        return [addr for _, addr in getaddresses(raw) if addr]

    def _ensure_config_dir(self):
        """Create the directory holding the config file when it is missing."""
        config_dir = os.path.dirname(self.config_file)
        if config_dir and not os.path.exists(config_dir):
            os.makedirs(config_dir, exist_ok=True)

    def _load_config(self):
        """Load the e-mail configuration from ``self.config_file``.

        Returns the parsed JSON when the file exists and is readable;
        otherwise returns a default configuration with empty credentials.
        """
        self._ensure_config_dir()

        print(f"Loading email config from: {self.config_file}")

        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding="utf-8") as f:
                    config = json.load(f)
                # Never write the SMTP password to stdout/logs.
                print(f"Loaded email config: {self._redact_config(config)}")
                return config
            except Exception as e:
                print(f"Erreur lors du chargement de la configuration email: {e}")

        # Default configuration
        default_config = {
            "smtp_server": "smtp.gmail.com",
            "smtp_port": 587,
            "username": "",
            "password": "",
            "sender_name": "Suite Consultance",
            "sender_email": ""
        }
        print(f"Using default email config: {default_config}")
        return default_config

    def save_config(self, config):
        """Write *config* to the config file and make it the active config."""
        self._ensure_config_dir()

        with open(self.config_file, 'w', encoding="utf-8") as f:
            json.dump(config, f, indent=4)

        self.config = config
        return True

    def send_email(self, to_email, subject, body, cc=None, bcc=None):
        """Send one HTML e-mail.

        Args:
            to_email: primary recipient address.
            subject: subject line.
            body: HTML body.
            cc: optional copy recipients (string or list/tuple of strings).
            bcc: optional blind-copy recipients (string or list/tuple).

        Returns:
            On success ``{"success": True, "timestamp", "to", "subject"}``;
            on any failure while talking to the server
            ``{"success": False, "error", "timestamp"}``.

        Raises:
            ValueError: if the configured username or password is empty.
        """
        if not self.config.get('username') or not self.config.get('password'):
            raise ValueError("La configuration email n'est pas complète")

        message = MIMEMultipart()
        message["From"] = f"{self.config.get('sender_name')} <{self.config.get('sender_email')}>"
        message["To"] = to_email
        message["Subject"] = subject

        if cc:
            message["Cc"] = ", ".join(cc) if isinstance(cc, (list, tuple)) else cc
        # Deliberately no "Bcc" header: sendmail() transmits the message text
        # verbatim, so a Bcc header would reveal the blind-copy recipients to
        # everyone. Bcc addresses are only added to the SMTP envelope below.

        message.attach(MIMEText(body, "html"))

        try:
            recipients = [to_email]
            recipients.extend(self._split_addresses(cc))
            recipients.extend(self._split_addresses(bcc))

            # The context manager issues QUIT / closes the socket even when
            # starttls, login or sendmail raises.
            with smtplib.SMTP(self.config.get('smtp_server'), self.config.get('smtp_port')) as server:
                server.starttls()
                server.login(self.config.get('username'), self.config.get('password'))
                server.sendmail(self.config.get('sender_email'), recipients, message.as_string())

            return {
                "success": True,
                "timestamp": datetime.now().isoformat(),
                "to": to_email,
                "subject": subject
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def send_templated_email(self, to_email, template_id, context=None, cc=None, bcc=None):
        """Render *template_id* with *context* and send it to *to_email*.

        Returns the ``send_email`` result, or a failure dict with error
        ``"Template not found"`` when the template cannot be rendered.
        """
        rendered = self.template_manager.render_template(template_id, context)
        if not rendered:
            return {
                "success": False,
                "error": "Template not found",
                "timestamp": datetime.now().isoformat()
            }

        return self.send_email(to_email, rendered['subject'], rendered['content'], cc, bcc)

    def send_bulk_email(self, emails, subject, body, cc=None, bcc=None):
        """Send the same e-mail to each address in *emails*.

        Returns one result dict per address: the ``send_email`` result with
        an added ``"email"`` key.
        """
        return [
            {"email": email, **self.send_email(email, subject, body, cc, bcc)}
            for email in emails
        ]

    def send_bulk_templated_email(self, recipients, template_id, cc=None, bcc=None):
        """Send a templated e-mail to several recipients.

        Args:
            recipients: list of dicts holding the address and its context::

                [{
                    "email": "example@example.com",
                    "context": {"name": "John Doe", "company": "ACME Inc."}
                }]

            template_id: id of the template to render for each recipient.
            cc, bcc: forwarded unchanged to every send.

        Returns:
            One result dict per recipient, each with an ``"email"`` key.
            Entries without an address yield a failure result and no send
            is attempted for them.
        """
        results = []
        for recipient in recipients:
            email = recipient.get('email')
            if not email:
                results.append({
                    "email": email,
                    "success": False,
                    "error": "Missing recipient email",
                    "timestamp": datetime.now().isoformat()
                })
                continue

            context = recipient.get('context', {})
            result = self.send_templated_email(email, template_id, context, cc, bcc)
            results.append({"email": email, **result})
        return results

    # ---------- Tracking helpers ----------
    def _embed_tracking(self, html_body: str, tracking_id: str, prospect_id: str) -> str:
        """Insert the open pixel and rewrite links for click tracking.

        Uses the ``APP_BASE_URL`` environment variable as URL base when set;
        otherwise the generated links are relative.

        Args:
            html_body: HTML of the message (``None`` is treated as empty).
            tracking_id: id embedded in the tracking URLs.
            prospect_id: currently unused by this helper.

        Returns:
            The HTML with every ``href="..."`` pointing at
            ``<base>/tasks/t/c/<tracking_id>?u=<url-encoded target>``, except
            links that are already tracked or that are not navigable web
            links (``mailto:``, ``tel:``, ``sms:``, ``javascript:``, ``#...``).
        """
        base = (os.environ.get("APP_BASE_URL") or "").rstrip("/")
        prefix = f"{base}/tasks/t"  # tracking routes mounted on the 'tasks' blueprint

        # NOTE(review): the open-pixel markup is an empty string here, so no
        # pixel is actually inserted and opens cannot be recorded. The
        # open-tracking route is not visible from this file; fill in the
        # <img> tag once the route is confirmed.
        pixel = ''

        body = html_body or ""

        # Insert the pixel just before the last closing body tag when there
        # is one. Search the original string case-insensitively instead of
        # indexing into body.lower(): lower() can change the string length
        # for some Unicode characters and shift the index.
        closing_tags = list(re.finditer(r"</body>", body, re.IGNORECASE))
        if closing_tags:
            idx = closing_tags[-1].start()
            body = body[:idx] + pixel + body[idx:]
        else:
            body = body + pixel

        skip_prefixes = self._UNTRACKED_LINK_PREFIXES

        def _rewrite(match):
            url = match.group(1)
            # Leave already-tracked links alone, as well as links that a
            # redirect endpoint cannot meaningfully follow.
            if "/tasks/t/c/" in url or url.strip().lower().startswith(skip_prefixes):
                return match.group(0)
            tracked = f'{prefix}/c/{tracking_id}?u={quote_plus(url)}'
            return f'href="{tracked}"'

        return re.sub(r'href="([^"]+)"', _rewrite, body)

    def send_tracked_email(self, to_email: str, subject: str, body: str, prospect_id: str,
                           template_id: str = None, cc=None, bcc=None) -> Dict[str, Any]:
        """Send an e-mail with open/click tracking.

        Creates a tracking record (best effort), embeds the tracking markup
        in the body, sends the message and returns the ``send_email`` result
        extended with a ``"tracking_id"`` key.
        """
        tracking_id = f"trk_{uuid4().hex[:16]}"

        # Tracking is best effort: a failing store must not prevent the send,
        # but the failure is reported instead of being silently dropped.
        try:
            from modules.tracking.store import TrackingStore
            store = TrackingStore()
            store.create_record(tracking_id, {
                "prospect_id": prospect_id,
                "to": to_email,
                "subject": subject,
                "template_id": template_id,
                "opens": 0,
                "clicks": 0,
            })
        except Exception as e:
            print(f"Erreur lors de la création de l'enregistrement de tracking {tracking_id}: {e}")

        tracked_body = self._embed_tracking(body, tracking_id, prospect_id)
        result = self.send_email(to_email, subject, tracked_body, cc, bcc)
        result["tracking_id"] = tracking_id
        return result


class EmailHistory:
    """Keep the history of sent e-mails, one JSON file per prospect."""

    def __init__(self, history_folder="Data/email_history"):
        """Remember the history folder and create it when it is missing."""
        self.history_folder = history_folder
        if not os.path.exists(self.history_folder):
            os.makedirs(self.history_folder, exist_ok=True)

    @staticmethod
    def _read_history(history_file):
        """Return the JSON content of *history_file*, or ``[]`` on failure.

        Unreadable files and invalid JSON/encoding are treated as an empty
        history; other exceptions propagate.
        """
        try:
            with open(history_file, 'r', encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            return []

    def add_email_record(self, prospect_id, email_data):
        """Append *email_data* to the prospect's history and return ``True``.

        The stored record carries a ``"timestamp"`` set to the current time,
        replacing any timestamp present in *email_data*.
        """
        history_file = os.path.join(self.history_folder, f"{prospect_id}.json")

        history = self._read_history(history_file) if os.path.exists(history_file) else []

        history.append({
            **email_data,
            "timestamp": datetime.now().isoformat()
        })

        with open(history_file, 'w', encoding="utf-8") as f:
            json.dump(history, f, indent=4)

        return True

    def get_prospect_email_history(self, prospect_id):
        """Return the stored history for *prospect_id* (``[]`` when none)."""
        history_file = os.path.join(self.history_folder, f"{prospect_id}.json")
        if os.path.exists(history_file):
            return self._read_history(history_file)
        return []

    def get_all_email_history(self):
        """Return ``{prospect_id: history}`` for every history file.

        The prospect id is the file name without its ``.json`` extension, so
        ids that themselves contain dots are preserved.
        """
        all_history = {}
        if not os.path.exists(self.history_folder):
            return all_history

        for filename in sorted(os.listdir(self.history_folder)):
            if not filename.endswith('.json'):
                continue
            # splitext strips only the final extension; split('.')[0] would
            # truncate an id such as "acme.fr" to "acme".
            prospect_id = os.path.splitext(filename)[0]
            history_file = os.path.join(self.history_folder, filename)
            all_history[prospect_id] = self._read_history(history_file)

        return all_history