SuiteConsultance/modules/email/email_manager.py
2025-09-20 13:18:04 +02:00

353 lines
13 KiB
Python

from typing import List, Dict, Any, Union
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import os
import json
import re
from urllib.parse import quote_plus
from uuid import uuid4
from core.data import Data
class EmailTemplate:
"""Classe gérant les templates d'emails"""
def __init__(self, template_folder="Data/email_templates"):
self.template_folder = template_folder
# Créer le dossier de templates s'il n'existe pas
if not os.path.exists(self.template_folder):
os.makedirs(self.template_folder)
def get_all_templates(self):
"""Retourne tous les templates disponibles"""
templates = []
if os.path.exists(self.template_folder):
for filename in os.listdir(self.template_folder):
if filename.endswith('.json'):
template_path = os.path.join(self.template_folder, filename)
try:
data_manager = Data(template_path)
template_data = data_manager.load_data()
templates.append(template_data)
except Exception as e:
print(f"Erreur lors du chargement du template {filename}: {e}")
return templates
def get_template_by_id(self, template_id):
"""Récupère un template par son ID"""
template_path = os.path.join(self.template_folder, f"{template_id}.json")
if os.path.exists(template_path):
data_manager = Data(template_path)
return data_manager.load_data()
return None
def save_template(self, template_data):
"""Sauvegarde un template d'email"""
template_id = template_data.get('id')
if not template_id:
# Générer un ID s'il n'existe pas
import uuid
template_id = f"tpl_{uuid.uuid4().hex[:8]}"
template_data['id'] = template_id
template_path = os.path.join(self.template_folder, f"{template_id}.json")
data_manager = Data(template_path)
data_manager.save_data(template_data)
return template_data
def delete_template(self, template_id):
"""Supprime un template d'email"""
template_path = os.path.join(self.template_folder, f"{template_id}.json")
if os.path.exists(template_path):
os.remove(template_path)
return True
return False
def render_template(self, template_id, context=None):
"""Rend un template avec les variables spécifiées dans le contexte"""
template = self.get_template_by_id(template_id)
if not template:
return None
subject = template.get('subject', '')
content = template.get('content', '')
# Remplacer les variables dans le sujet et le contenu
if context:
for key, value in context.items():
placeholder = f"{{{{{key}}}}}"
subject = subject.replace(placeholder, str(value))
content = content.replace(placeholder, str(value))
return {
"subject": subject,
"content": content
}
class EmailSender:
"""Classe gérant l'envoi d'emails"""
def __init__(self, config_file="config/email_config.json"):
# Ensure config_file is an absolute path
if not os.path.isabs(config_file):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
self.config_file = os.path.join(base_dir, config_file)
else:
self.config_file = config_file
self.config = self._load_config()
self.template_manager = EmailTemplate()
def _load_config(self):
"""Charge la configuration email depuis le fichier de configuration"""
config_dir = os.path.dirname(self.config_file)
if not os.path.exists(config_dir):
os.makedirs(config_dir)
print(f"Loading email config from: {self.config_file}")
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r') as f:
config = json.load(f)
print(f"Loaded email config: {config}")
return config
except Exception as e:
print(f"Erreur lors du chargement de la configuration email: {e}")
# Configuration par défaut
default_config = {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "",
"password": "",
"sender_name": "Suite Consultance",
"sender_email": ""
}
print(f"Using default email config: {default_config}")
return default_config
def save_config(self, config):
"""Sauvegarde la configuration email"""
config_dir = os.path.dirname(self.config_file)
if not os.path.exists(config_dir):
os.makedirs(config_dir)
with open(self.config_file, 'w') as f:
json.dump(config, f, indent=4)
self.config = config
return True
def send_email(self, to_email, subject, body, cc=None, bcc=None):
"""Envoie un email à un destinataire"""
if not self.config.get('username') or not self.config.get('password'):
raise ValueError("La configuration email n'est pas complète")
message = MIMEMultipart()
message["From"] = f"{self.config.get('sender_name')} <{self.config.get('sender_email')}>"
message["To"] = to_email
message["Subject"] = subject
if cc:
message["Cc"] = ", ".join(cc) if isinstance(cc, list) else cc
if bcc:
message["Bcc"] = ", ".join(bcc) if isinstance(bcc, list) else bcc
message.attach(MIMEText(body, "html"))
try:
server = smtplib.SMTP(self.config.get('smtp_server'), self.config.get('smtp_port'))
server.starttls()
server.login(self.config.get('username'), self.config.get('password'))
recipients = [to_email]
if cc:
recipients.extend(cc if isinstance(cc, list) else [cc])
if bcc:
recipients.extend(bcc if isinstance(bcc, list) else [bcc])
server.sendmail(self.config.get('sender_email'), recipients, message.as_string())
server.quit()
return {
"success": True,
"timestamp": datetime.now().isoformat(),
"to": to_email,
"subject": subject
}
except Exception as e:
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def send_templated_email(self, to_email, template_id, context=None, cc=None, bcc=None):
"""Envoie un email basé sur un template à un destinataire"""
rendered = self.template_manager.render_template(template_id, context)
if not rendered:
return {
"success": False,
"error": "Template not found",
"timestamp": datetime.now().isoformat()
}
return self.send_email(to_email, rendered['subject'], rendered['content'], cc, bcc)
def send_bulk_email(self, emails, subject, body, cc=None, bcc=None):
"""Envoie le même email à plusieurs destinataires"""
results = []
for email in emails:
result = self.send_email(email, subject, body, cc, bcc)
results.append({
"email": email,
**result
})
return results
def send_bulk_templated_email(self, recipients, template_id, cc=None, bcc=None):
"""
Envoie un email basé sur un template à plusieurs destinataires
recipients: liste de dictionnaires contenant l'email du destinataire et le contexte
[{
"email": "example@example.com",
"context": {"name": "John Doe", "company": "ACME Inc."}
}]
"""
results = []
for recipient in recipients:
email = recipient.get('email')
context = recipient.get('context', {})
result = self.send_templated_email(email, template_id, context, cc, bcc)
results.append({
"email": email,
**result
})
return results
# ---------- Tracking helpers ----------
def _embed_tracking(self, html_body: str, tracking_id: str, prospect_id: str) -> str:
"""
Ajoute un pixel d'ouverture et réécrit les liens pour le click tracking.
Utilise APP_BASE_URL si définie, sinon génère des liens relatifs.
"""
base = (os.environ.get("APP_BASE_URL") or "").rstrip("/")
prefix = f"{base}/tasks/t" # routes de tracking montées sur le blueprint 'tasks'
# Pixel d'ouverture (1x1 PNG)
pixel = f'<img src="{prefix}/o/{tracking_id}.png?pid={quote_plus(prospect_id)}" alt="" width="1" height="1" style="display:none;" />'
body = html_body or ""
# Injection du pixel avant la fermeture de body si possible
if "</body>" in body.lower():
# trouver la vraie balise en conservant la casse
idx = body.lower().rfind("</body>")
body = body[:idx] + pixel + body[idx:]
else:
body = body + pixel
# Réécriture des liens <a href="...">
def _rewrite(match):
url = match.group(1)
# ignore si déjà tracké
if "/tasks/t/c/" in url:
return f'href="{url}"'
tracked = f'{prefix}/c/{tracking_id}?u={quote_plus(url)}'
return f'href="{tracked}"'
body = re.sub(r'href="([^"]+)"', _rewrite, body)
return body
def send_tracked_email(self, to_email: str, subject: str, body: str, prospect_id: str, template_id: str = None, cc=None, bcc=None) -> Dict[str, Any]:
"""
Envoie un email avec tracking (open/click).
Crée un enregistrement de tracking et insère un pixel + réécriture des liens.
"""
tracking_id = f"trk_{uuid4().hex[:16]}"
# Créer l'enregistrement de tracking
try:
from modules.tracking.store import TrackingStore
store = TrackingStore()
store.create_record(tracking_id, {
"prospect_id": prospect_id,
"to": to_email,
"subject": subject,
"template_id": template_id,
"opens": 0,
"clicks": 0,
})
except Exception:
# même si le tracking store échoue, on tente d'envoyer l'email
pass
tracked_body = self._embed_tracking(body, tracking_id, prospect_id)
result = self.send_email(to_email, subject, tracked_body, cc, bcc)
result["tracking_id"] = tracking_id
return result
class EmailHistory:
"""Classe gérant l'historique des emails envoyés"""
def __init__(self, history_folder="Data/email_history"):
self.history_folder = history_folder
# Créer le dossier d'historique s'il n'existe pas
if not os.path.exists(self.history_folder):
os.makedirs(self.history_folder)
def add_email_record(self, prospect_id, email_data):
"""Ajoute un email à l'historique d'un prospect"""
history_file = os.path.join(self.history_folder, f"{prospect_id}.json")
# Charger l'historique existant
history = []
if os.path.exists(history_file):
try:
with open(history_file, 'r') as f:
history = json.load(f)
except:
history = []
# Ajouter le nouvel email à l'historique
history.append({
**email_data,
"timestamp": datetime.now().isoformat()
})
# Sauvegarder l'historique
with open(history_file, 'w') as f:
json.dump(history, f, indent=4)
return True
def get_prospect_email_history(self, prospect_id):
"""Récupère l'historique des emails pour un prospect"""
history_file = os.path.join(self.history_folder, f"{prospect_id}.json")
if os.path.exists(history_file):
try:
with open(history_file, 'r') as f:
return json.load(f)
except:
return []
return []
def get_all_email_history(self):
"""Récupère l'historique de tous les emails envoyés"""
all_history = {}
if os.path.exists(self.history_folder):
for filename in os.listdir(self.history_folder):
if filename.endswith('.json'):
prospect_id = filename.split('.')[0]
history_file = os.path.join(self.history_folder, filename)
try:
with open(history_file, 'r') as f:
all_history[prospect_id] = json.load(f)
except:
all_history[prospect_id] = []
return all_history