SuiteConsultance/app.py
2025-09-20 13:18:04 +02:00

1188 lines
48 KiB
Python

#!/usr/bin/env python3
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_from_directory
import os
import json
from werkzeug.utils import secure_filename
from datetime import datetime
from typing import List, Dict
# Import des modules existants
from core.form import Form
from core.generator import Generator
from core.data import Data
from modules.crm.handler import ClientHandler
from modules.crm.prospect_handler import ProspectHandler
from modules.proposition.fields import fields as proposition_fields
from modules.email.email_manager import EmailSender, EmailTemplate, EmailHistory
from modules.email.email_scraper import EmailScraper, EmailScrapingHistory
from datetime import datetime, date
from modules.projects.project_handler import ProjectHandler
from modules.projects.project import Project
from modules.projects.routes import projects_bp
# Création de l'application Flask
app = Flask(__name__)
app.register_blueprint(projects_bp)
app.secret_key = 'suite_consultance_secretkey' # Clé secrète pour Flask
# Ajouter un filtre personnalisé pour parser les dates ISO
@app.template_filter('parse_datetime')
def parse_datetime_filter(s):
"""Convertir une chaîne de date ISO en objet datetime"""
try:
return datetime.fromisoformat(s.replace('Z', '+00:00'))
except:
return datetime.now()
# Configuration des dossiers
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Data')
app.config['OUTPUT_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'output')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # Limite de taille pour les uploads (16MB)
# Initialisation des gestionnaires
client_handler = ClientHandler()
prospect_handler = ProspectHandler()
project_handler = ProjectHandler()
email_sender = EmailSender()
email_template = EmailTemplate()
email_history = EmailHistory()
email_scraper = EmailScraper()
email_scraping_history = EmailScrapingHistory()
# Filtres personnalisés pour Jinja2
@app.template_filter('datetime')
def format_datetime(value, format='%d/%m/%Y %H:%M'):
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return value
return value.strftime(format)
@app.template_filter('tojson')
def to_json(value):
import json
return json.dumps(value)
# Route principale - Page d'accueil
@app.route('/')
def index():
# Statistiques pour l'accueil
clients = client_handler.load_clients()
prospects = prospect_handler.load_prospects()
# Nombre de propositions commerciales
propositions_path = os.path.join(app.config['OUTPUT_FOLDER'], 'propositions')
propositions_count = 0
if os.path.exists(propositions_path):
propositions_count = len([f for f in os.listdir(propositions_path) if f.endswith('.pdf')])
# Nombre de devis
devis_path = os.path.join(app.config['OUTPUT_FOLDER'], 'devis')
devis_count = 0
if os.path.exists(devis_path):
devis_count = len([f for f in os.listdir(devis_path) if f.endswith('.pdf')])
# Répartition des prospects par statut
prospect_status = {}
for prospect in prospects:
status = prospect.status if hasattr(prospect, 'status') else 'Nouveau'
prospect_status[status] = prospect_status.get(status, 0) + 1
# Activités récentes (derniers prospects et clients)
recent_prospects = sorted(prospects, key=lambda p: p.last_contact if hasattr(p, 'last_contact') else "", reverse=True)[:5]
# Liste des derniers devis et propositions
recent_documents = []
if os.path.exists(propositions_path):
for filename in os.listdir(propositions_path):
if filename.endswith('.pdf'):
client_name = filename.split('_proposition')[0].replace('_', ' ').title()
date_created = datetime.fromtimestamp(os.path.getctime(os.path.join(propositions_path, filename)))
recent_documents.append({
'type': 'Proposition',
'client': client_name,
'date': date_created,
'filename': filename
})
if os.path.exists(devis_path):
for filename in os.listdir(devis_path):
if filename.endswith('.pdf'):
client_name = filename.split('_devis')[0].replace('_', ' ').title()
date_created = datetime.fromtimestamp(os.path.getctime(os.path.join(devis_path, filename)))
recent_documents.append({
'type': 'Devis',
'client': client_name,
'date': date_created,
'filename': filename
})
# Trier les documents par date (les plus récents d'abord)
recent_documents = sorted(recent_documents, key=lambda d: d['date'], reverse=True)[:5]
return render_template('index.html',
clients_count=len(clients),
prospects_count=len(prospects),
propositions_count=propositions_count,
devis_count=devis_count,
prospect_status=prospect_status,
recent_prospects=recent_prospects,
recent_documents=recent_documents)
# Routes pour le CRM
@app.route('/crm')
def crm():
clients = client_handler.load_clients()
prospects = prospect_handler.load_prospects()
# Statistiques pour le tableau de bord
dashboard_stats = {
'total_clients': len(clients),
'total_prospects': len(prospects),
'prospect_status': {}
}
# Compter les statuts des prospects
for prospect in prospects:
status = prospect.status if hasattr(prospect, 'status') else 'Nouveau'
dashboard_stats['prospect_status'][status] = dashboard_stats['prospect_status'].get(status, 0) + 1
return render_template('crm/crm.html', clients=clients, prospects=prospects, dashboard_stats=dashboard_stats)
@app.route('/client/<client_id>')
def client_details(client_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if os.path.exists(client_path):
data_manager = Data(client_path)
client_data = data_manager.load_data()
return render_template('crm/client_details.html', client=client_data)
else:
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
@app.route('/client/<client_id>/edit', methods=['GET', 'POST'])
def edit_client(client_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
data_manager = Data(client_path)
client_data = data_manager.load_data()
if request.method == 'POST':
# Récupérer les données du formulaire
client_data.update({
'client_name': request.form.get('client_name'),
'email': request.form.get('email'),
'telephone': request.form.get('telephone'),
'adresse': request.form.get('adresse'),
'project_name': request.form.get('project_name'),
'project_type': request.form.get('project_type'),
'deadline': request.form.get('deadline'),
'project_description': request.form.get('project_description'),
'budget': request.form.get('budget'),
'payment_terms': request.form.get('payment_terms'),
'contact_info': request.form.get('contact_info'),
'additional_info': request.form.get('additional_info')
})
# Traitement des fonctionnalités
features = request.form.get('features', '').split(',')
client_data['features'] = [{"description": feature.strip()} for feature in features if feature.strip()]
# Validation des champs requis
if not client_data['client_name']:
flash('Le nom du client est obligatoire', 'error')
return render_template('crm/edit_client.html', client=client_data)
# Sauvegarder les modifications
data_manager.save_data(client_data)
flash('Client mis à jour avec succès', 'success')
return redirect(url_for('client_details', client_id=client_id))
return render_template('crm/edit_client.html', client=client_data)
@app.route('/client/<client_id>/delete', methods=['POST', 'GET'])
def delete_client(client_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
try:
# Delete client file
os.remove(client_path)
# Delete associated projects
project_handler.delete_all_client_projects(client_id)
# Delete associated documents
client_propositions = os.path.join(app.config['OUTPUT_FOLDER'], 'propositions', f"{client_id}_*.pdf")
client_devis = os.path.join(app.config['OUTPUT_FOLDER'], 'devis', f"{client_id}_*.pdf")
for file_pattern in [client_propositions, client_devis]:
for file in glob.glob(file_pattern):
try:
os.remove(file)
except:
pass
flash('Client supprimé avec succès', 'success')
except Exception as e:
flash(f'Erreur lors de la suppression du client: {str(e)}', 'error')
return redirect(url_for('crm'))
# --------- Projets par client ---------
@app.route('/client/<client_id>/projects')
def list_client_projects(client_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
client_data = Data(client_path).load_data()
projects = project_handler.list_projects(client_id)
return render_template('projects/list_projects.html', client_id=client_id, client_name=client_data.get('client_name', client_id), projects=projects)
@app.route('/client/<client_id>/projects/add', methods=['GET', 'POST'])
def add_client_project(client_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
client_data = Data(client_path).load_data()
if request.method == 'POST':
name = request.form.get('name')
status = request.form.get('status', 'Nouveau')
start_date = request.form.get('start_date') or None
end_date = request.form.get('end_date') or None
description = request.form.get('description', '')
budget_raw = request.form.get('budget', '').strip()
budget = float(budget_raw.replace(',', '.')) if budget_raw else None
if not name:
flash('Le nom du projet est obligatoire', 'error')
return render_template('projects/edit_project.html', client_id=client_id, client_name=client_data.get('client_name', client_id), project=None)
project = Project(
client_id=client_id,
name=name,
status=status,
start_date=start_date,
end_date=end_date,
description=description,
budget=budget
)
project_handler.add_project(project)
flash('Projet créé avec succès', 'success')
return redirect(url_for('list_client_projects', client_id=client_id))
return render_template('projects/edit_project.html', client_id=client_id, client_name=client_data.get('client_name', client_id), project=None)
@app.route('/client/<client_id>/projects/<project_id>')
def project_details(client_id, project_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
project = project_handler.get_project(client_id, project_id)
if not project:
flash('Projet non trouvé', 'error')
return redirect(url_for('list_client_projects', client_id=client_id))
client_name = Data(client_path).load_data().get('client_name', client_id)
return render_template('projects/project_details.html', client_id=client_id, client_name=client_name, project=project)
@app.route('/client/<client_id>/projects/<project_id>/edit', methods=['GET', 'POST'])
def edit_client_project(client_id, project_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
project = project_handler.get_project(client_id, project_id)
if not project:
flash('Projet non trouvé', 'error')
return redirect(url_for('list_client_projects', client_id=client_id))
if request.method == 'POST':
project.name = request.form.get('name')
project.status = request.form.get('status', project.status)
project.start_date = request.form.get('start_date') or None
project.end_date = request.form.get('end_date') or None
project.description = request.form.get('description', '')
budget_raw = request.form.get('budget', '').strip()
project.budget = float(budget_raw.replace(',', '.')) if budget_raw else None
if not project.name:
flash('Le nom du projet est obligatoire', 'error')
client_name = Data(client_path).load_data().get('client_name', client_id)
return render_template('projects/edit_project.html', client_id=client_id, client_name=client_name, project=project)
project_handler.update_project(project)
flash('Projet mis à jour avec succès', 'success')
return redirect(url_for('project_details', client_id=client_id, project_id=project.id))
client_name = Data(client_path).load_data().get('client_name', client_id)
return render_template('projects/edit_project.html', client_id=client_id, client_name=client_name, project=project)
@app.route('/client/<client_id>/projects/<project_id>/delete', methods=['POST', 'GET'])
def delete_client_project(client_id, project_id):
client_path = os.path.join(app.config['UPLOAD_FOLDER'], 'clients', f"{client_id}.json")
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('crm'))
ok = project_handler.delete_project(client_id, project_id)
if ok:
flash('Projet supprimé avec succès', 'success')
else:
flash('Erreur lors de la suppression du projet', 'error')
return redirect(url_for('list_client_projects', client_id=client_id))
# --------- Fin projets ---------
@app.route('/client/add', methods=['GET', 'POST'])
def add_client():
if request.method == 'POST':
client_data = {
'client_name': request.form.get('client_name'),
'email': request.form.get('email'),
'telephone': request.form.get('telephone'),
'adresse': request.form.get('adresse'),
'project_name': request.form.get('project_name'),
'project_type': request.form.get('project_type'),
'deadline': request.form.get('deadline'),
'project_description': request.form.get('project_description'),
'budget': request.form.get('budget'),
'payment_terms': request.form.get('payment_terms'),
'contact_info': request.form.get('contact_info'),
'additional_info': request.form.get('additional_info')
}
# Traitement des fonctionnalités
features = request.form.get('features', '').split(',')
client_data['features'] = [{"description": feature.strip()} for feature in features if feature.strip()]
# Sauvegarde du client
client_name = client_data['client_name'].replace(' ', '_').lower()
data_manager = Data(f"Data/clients/{client_name}.json")
data_manager.save_data(client_data)
flash('Client ajouté avec succès', 'success')
return redirect(url_for('crm'))
return render_template('crm/add_client.html', fields=proposition_fields())
# Routes pour les prospects
@app.route('/prospect/add', methods=['GET', 'POST'])
def add_prospect():
from datetime import date
if request.method == 'POST':
# Récupérer les données du formulaire
name = request.form.get('name')
company = request.form.get('company', '')
email = request.form.get('email', '')
phone = request.form.get('phone', '')
source = request.form.get('source', '')
notes = request.form.get('notes', '')
status = request.form.get('status', 'Nouveau')
# Traitement des tags
tags = request.form.get('tags', '').split(',')
tags = [tag.strip() for tag in tags if tag.strip()]
last_contact = request.form.get('last_contact', str(date.today()))
next_action = request.form.get('next_action', '')
# Créer un objet Prospect
from modules.crm.prospect import Prospect
prospect = Prospect(
name=name,
company=company,
email=email,
phone=phone,
source=source,
notes=notes,
status=status,
tags=tags,
last_contact=last_contact,
next_action=next_action
)
# Ajouter le prospect
prospect_handler.add_prospect(prospect)
flash('Prospect ajouté avec succès', 'success')
return redirect(url_for('crm'))
# Pour l'affichage du formulaire, on passe la date du jour
today = date.today().strftime('%Y-%m-%d')
return render_template('crm/add_prospect.html', today=today)
@app.route('/prospect/<prospect_id>')
def prospect_details(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if prospect:
# Récupérer l'historique des emails
emails = email_history.get_prospect_email_history(prospect_id)
return render_template('crm/prospect_details.html', prospect=prospect, email_history=emails)
else:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
@app.route('/prospect/edit/<prospect_id>', methods=['GET', 'POST'])
def edit_prospect(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if not prospect:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
if request.method == 'POST':
# Mettre à jour les données du prospect
prospect.name = request.form.get('name')
prospect.company = request.form.get('company', '')
prospect.email = request.form.get('email', '')
prospect.phone = request.form.get('phone', '')
prospect.source = request.form.get('source', '')
prospect.notes = request.form.get('notes', '')
prospect.status = request.form.get('status', 'Nouveau')
# Traitement des tags
tags = request.form.get('tags', '').split(',')
prospect.tags = [tag.strip() for tag in tags if tag.strip()]
prospect.last_contact = request.form.get('last_contact', prospect.last_contact)
prospect.next_action = request.form.get('next_action', '')
# Mettre à jour le prospect
prospect_handler.update_prospect(prospect)
flash('Prospect mis à jour avec succès', 'success')
return redirect(url_for('prospect_details', prospect_id=prospect_id))
return render_template('crm/edit_prospect.html', prospect=prospect)
@app.route('/prospect/delete/<prospect_id>')
def delete_prospect(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if prospect:
prospect_handler.delete_prospect(prospect_id)
flash('Prospect supprimé avec succès', 'success')
else:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
@app.route('/prospect/convert/<prospect_id>')
def convert_prospect(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if prospect:
# Convertir le prospect en client
from modules.crm.client import Client
# Créer un dictionnaire de données client à partir du prospect
client_data = {
"client_name": prospect.name,
"email": prospect.email,
"telephone": prospect.phone,
"adresse": "",
"project_name": prospect.company,
"project_type": "",
"project_description": prospect.notes,
"additional_info": f"Converti depuis le prospect {prospect.id}. Tags: {', '.join(prospect.tags)}"
}
# Sauvegarder le client
client_name = client_data['client_name'].replace(' ', '_').lower()
data_manager = Data(f"Data/clients/{client_name}.json")
data_manager.save_data(client_data)
# Supprimer le prospect
prospect_handler.delete_prospect(prospect_id)
flash('Prospect converti en client avec succès', 'success')
else:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
# Routes pour les propositions commerciales
@app.route('/propositions')
def propositions():
# Liste des propositions existantes
propositions_path = os.path.join(app.config['OUTPUT_FOLDER'], 'propositions')
propositions = []
if os.path.exists(propositions_path):
for filename in os.listdir(propositions_path):
if filename.endswith('.pdf'):
client_name = filename.split('_proposition')[0].replace('_', ' ').title()
date_created = datetime.fromtimestamp(os.path.getctime(os.path.join(propositions_path, filename)))
propositions.append({
'filename': filename,
'client_name': client_name,
'date': date_created.strftime('%d/%m/%Y')
})
return render_template('propositions/propositions.html', propositions=propositions)
@app.route('/proposition/create', methods=['GET', 'POST'])
def create_proposition():
if request.method == 'POST':
# Récupérer les données du formulaire pour tous les champs
form_data = {
'client_name': request.form.get('client_name', ''),
'email': request.form.get('email', ''),
'telephone': request.form.get('telephone', ''),
'adresse': request.form.get('adresse', ''),
'project_name': request.form.get('project_name', ''),
'project_type': request.form.get('project_type', ''),
'deadline': request.form.get('deadline', ''),
'project_description': request.form.get('project_description', ''),
'budget': request.form.get('budget', ''),
'payment_terms': request.form.get('payment_terms', ''),
'contact_info': request.form.get('contact_info', ''),
'additional_info': request.form.get('additional_info', ''),
'features': request.form.get('features', '')
}
# Validation des champs obligatoires
if not form_data['client_name'] or not form_data['project_name']:
flash('Veuillez remplir tous les champs obligatoires', 'error')
return render_template('propositions/create_proposition.html', fields=proposition_fields(), form_data=form_data)
# Traitement des fonctionnalités
features = form_data.get('features', '').split(',')
form_data['features'] = [{"description": feature.strip()} for feature in features if feature.strip()]
# Sauvegarde du client
client_name = form_data['client_name'].replace(' ', '_').lower()
data_manager = Data(f"Data/clients/{client_name}.json")
data_manager.save_data(form_data)
# Génération de la proposition
generator = Generator(form_data)
generator.generate_pdf("propositions")
# Option: créer un projet associé si demandé
create_project_flag = request.form.get('create_project')
if create_project_flag:
# Utiliser les champs de la proposition pour initialiser le projet
proj_name = request.form.get('project_name') or f"Projet - {form_data.get('client_name', client_name)}"
description = form_data.get('project_description', '')
# Convertir le budget si fourni
budget_val = None
try:
if form_data.get('budget'):
budget_val = float(str(form_data['budget']).replace(',', '.'))
except Exception:
budget_val = None
try:
project = Project(
client_id=client_name,
name=proj_name,
status='En cours',
start_date=None,
end_date=form_data.get('deadline') or None,
description=description,
budget=budget_val
)
project_handler.add_project(project)
flash('Projet créé avec succès depuis la proposition', 'info')
except Exception as e:
flash(f'Création du projet échouée: {e}', 'warning')
flash('Proposition créée avec succès', 'success')
return redirect(url_for('propositions'))
return render_template('propositions/create_proposition.html', fields=proposition_fields())
# Routes pour les devis
@app.route('/devis')
def devis():
# Liste des devis existants
devis_path = os.path.join(app.config['OUTPUT_FOLDER'], 'devis')
devis_list = []
if os.path.exists(devis_path):
for filename in os.listdir(devis_path):
if filename.endswith('.pdf'):
client_name = filename.split('_devis')[0].replace('_', ' ').title()
date_created = datetime.fromtimestamp(os.path.getctime(os.path.join(devis_path, filename)))
devis_list.append({
'filename': filename,
'client_name': client_name,
'date': date_created.strftime('%d/%m/%Y')
})
return render_template('devis/devis.html', devis=devis_list)
@app.route('/devis/create', methods=['GET', 'POST'])
def create_devis():
if request.method == 'POST':
client_name = request.form.get('client_name', '').replace(' ', '_').lower()
client_path = f"Data/clients/{client_name}.json"
# Vérifier si le client existe
if not os.path.exists(client_path):
flash('Client non trouvé', 'error')
return redirect(url_for('create_devis'))
# Charger les données du client
data_manager = Data(client_path)
client_data = data_manager.load_data()
# Données du devis
devis_data = client_data.copy()
devis_data['numero'] = request.form.get('numero', '')
# Génération du devis
generator = Generator(devis_data)
content = generator.generate_pdf("devis")
# Option: créer un projet associé si demandé
create_project_flag = request.form.get('create_project')
if create_project_flag:
# Nom du projet: champ explicite sinon reprendre le nom de projet du client ou fallback
proj_name = request.form.get('project_name') or client_data.get('project_name') or f"Projet - {client_data.get('client_name', client_name)}"
# Description minimale liée au devis
description = f"Projet créé automatiquement via devis {devis_data.get('numero', '')}."
try:
project = Project(
client_id=client_name,
name=proj_name,
status='En cours',
description=description
)
project_handler.add_project(project)
flash('Projet créé avec succès depuis le devis', 'info')
except Exception as e:
flash(f'Création du projet échouée: {e}', 'warning')
flash('Devis créé avec succès', 'success')
return redirect(url_for('devis'))
# Liste des clients pour sélection
clients = []
client_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'clients')
if os.path.exists(client_folder):
for filename in os.listdir(client_folder):
if filename.endswith('.json'):
client_name = filename.split('.')[0].replace('_', ' ').title()
clients.append({
'id': filename.split('.')[0],
'name': client_name
})
return render_template('devis/create_devis.html', clients=clients)
# Route pour télécharger les fichiers PDF
@app.route('/download/<path:filename>')
def download_file(filename):
directory = os.path.dirname(filename)
file = os.path.basename(filename)
return send_from_directory(os.path.join(app.config['OUTPUT_FOLDER'], directory), file, as_attachment=True)
# Ajout de route pour la génération de données de test
@app.route('/generate-test-data')
def generate_test_data():
# Fonction pour générer des données factices
from main import fake_data
fake_data()
flash('Données de test générées avec succès', 'success')
return redirect(url_for('index'))
# Routes pour les emails
@app.route('/prospect/<prospect_id>/email', methods=['GET', 'POST'])
def send_prospect_email(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if not prospect:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
templates = email_template.get_all_templates()
if request.method == 'POST':
to_email = request.form.get('to_email')
subject = request.form.get('subject')
body = request.form.get('body')
template_id = request.form.get('template_id')
update_status = request.form.get('update_status') == 'on'
new_status = request.form.get('new_status')
# Vérifier les données obligatoires
if not to_email or not subject or not body:
flash('Veuillez remplir tous les champs obligatoires', 'error')
return render_template('email/send_email.html', prospect=prospect, templates=templates)
# Envoyer l'email
if template_id:
# Contexte pour le template
context = {
'name': prospect.name,
'company': prospect.company,
'email': prospect.email,
'phone': prospect.phone
}
result = email_sender.send_templated_email(to_email, template_id, context)
else:
result = email_sender.send_email(to_email, subject, body)
# Enregistrer l'historique
email_data = {
'to': to_email,
'subject': subject,
'content': body,
'success': result.get('success', False),
'error': result.get('error', None)
}
email_history.add_email_record(prospect_id, email_data)
# Mettre à jour le statut du prospect si demandé
if update_status and result.get('success', False):
prospect.status = new_status
prospect.last_contact = str(date.today())
prospect_handler.update_prospect(prospect)
if result.get('success', False):
flash('Email envoyé avec succès', 'success')
else:
flash(f'Erreur lors de l\'envoi de l\'email: {result.get("error", "Erreur inconnue")}', 'error')
return redirect(url_for('prospect_details', prospect_id=prospect_id))
return render_template('email/send_email.html', prospect=prospect, templates=templates)
@app.route('/prospect/<prospect_id>/email/history')
def prospect_email_history(prospect_id):
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if not prospect:
flash('Prospect non trouvé', 'error')
return redirect(url_for('crm'))
emails = email_history.get_prospect_email_history(prospect_id)
return render_template('email/email_history.html', prospect=prospect, emails=emails)
@app.route('/email/bulk', methods=['GET', 'POST'])
def send_bulk_email():
prospects = prospect_handler.load_prospects()
templates = email_template.get_all_templates()
if request.method == 'POST':
prospect_ids = request.form.getlist('prospect_ids[]')
subject = request.form.get('subject')
body = request.form.get('body')
template_id = request.form.get('template_id')
update_status = request.form.get('update_status') == 'on'
new_status = request.form.get('new_status')
# Vérifier les données obligatoires
if not prospect_ids or not subject or not body:
flash('Veuillez remplir tous les champs obligatoires et sélectionner au moins un prospect', 'error')
return render_template('email/bulk_email.html', prospects=prospects, templates=templates)
# Préparer les destinataires
success_count = 0
error_count = 0
for prospect_id in prospect_ids:
prospect = prospect_handler.get_prospect_by_id(prospect_id)
if not prospect or not prospect.email:
error_count += 1
continue
# Contexte pour personnaliser l'email
context = {
'name': prospect.name,
'company': prospect.company,
'email': prospect.email,
'phone': prospect.phone
}
# Remplacer les variables dans le sujet et le corps
personalized_subject = subject
personalized_body = body
for key, value in context.items():
placeholder = f"{{{{{key}}}}}"
personalized_subject = personalized_subject.replace(placeholder, str(value) if value else "")
personalized_body = personalized_body.replace(placeholder, str(value) if value else "")
# Envoyer l'email
if template_id:
result = email_sender.send_templated_email(prospect.email, template_id, context)
else:
result = email_sender.send_email(prospect.email, personalized_subject, personalized_body)
# Enregistrer l'historique
email_data = {
'to': prospect.email,
'subject': personalized_subject,
'content': personalized_body,
'success': result.get('success', False),
'error': result.get('error', None)
}
email_history.add_email_record(prospect_id, email_data)
# Mettre à jour le statut du prospect si demandé
if update_status and result.get('success', False):
prospect.status = new_status
prospect.last_contact = str(date.today())
prospect_handler.update_prospect(prospect)
if result.get('success', False):
success_count += 1
else:
error_count += 1
if success_count > 0:
flash(f'Emails envoyés avec succès à {success_count} prospect(s)', 'success')
if error_count > 0:
flash(f'Échec d\'envoi pour {error_count} prospect(s)', 'warning')
return redirect(url_for('crm'))
return render_template('email/bulk_email.html', prospects=prospects, templates=templates)
@app.route('/email/templates')
def email_templates():
templates = email_template.get_all_templates()
return render_template('email/templates.html', templates=templates)
@app.route('/email/template/create', methods=['GET', 'POST'])
def create_email_template():
if request.method == 'POST':
template_data = {
'name': request.form.get('name'),
'description': request.form.get('description', ''),
'subject': request.form.get('subject'),
'content': request.form.get('content')
}
# Vérifier les données obligatoires
if not template_data['name'] or not template_data['subject'] or not template_data['content']:
flash('Veuillez remplir tous les champs obligatoires', 'error')
return render_template('email/edit_template.html', template=template_data)
# Enregistrer le template
email_template.save_template(template_data)
flash('Template d\'email créé avec succès', 'success')
return redirect(url_for('email_templates'))
return render_template('email/edit_template.html', template=None)
@app.route('/email/template/edit/<template_id>', methods=['GET', 'POST'])
def edit_email_template(template_id):
template = email_template.get_template_by_id(template_id)
if not template:
flash('Template non trouvé', 'error')
return redirect(url_for('email_templates'))
if request.method == 'POST':
template_data = {
'id': template_id,
'name': request.form.get('name'),
'description': request.form.get('description', ''),
'subject': request.form.get('subject'),
'content': request.form.get('content')
}
# Vérifier les données obligatoires
if not template_data['name'] or not template_data['subject'] or not template_data['content']:
flash('Veuillez remplir tous les champs obligatoires', 'error')
return render_template('email/edit_template.html', template=template_data)
# Enregistrer le template
email_template.save_template(template_data)
flash('Template d\'email mis à jour avec succès', 'success')
return redirect(url_for('email_templates'))
return render_template('email/edit_template.html', template=template)
@app.route('/email/template/delete/<template_id>')
def delete_email_template(template_id):
if email_template.delete_template(template_id):
flash('Template d\'email supprimé avec succès', 'success')
else:
flash('Erreur lors de la suppression du template', 'error')
return redirect(url_for('email_templates'))
@app.route('/email/config', methods=['GET', 'POST'])
def email_config():
config = email_sender._load_config()
if request.method == 'POST':
config_data = {
'smtp_server': request.form.get('smtp_server'),
'smtp_port': int(request.form.get('smtp_port')),
'username': request.form.get('username'),
'password': request.form.get('password'),
'sender_name': request.form.get('sender_name'),
'sender_email': request.form.get('sender_email')
}
# Vérifier les données obligatoires
for key, value in config_data.items():
if not value and key != 'password':
flash('Veuillez remplir tous les champs obligatoires', 'error')
return render_template('email/config.html', config=config_data)
# Si le mot de passe est vide, conserver l'ancien
if not config_data['password'] and 'password' in config:
config_data['password'] = config['password']
# Enregistrer la configuration
email_sender.save_config(config_data)
flash('Configuration email enregistrée avec succès', 'success')
return redirect(url_for('email_config'))
return render_template('email/config.html', config=config)
# Routes pour le scrapping d'emails
@app.route('/email/scraper')
def email_scraper_page():
scrapings = email_scraping_history.get_all_scrapings()
return render_template('email/scraper.html', scrapings=scrapings)
@app.route('/email/scraper/new', methods=['GET', 'POST'])
def new_email_scraping():
if request.method == 'POST':
url = request.form.get('url')
max_pages = int(request.form.get('max_pages', 10))
auto_create_prospects = request.form.get('auto_create_prospects') == 'on'
# Validation
if not url:
flash('Veuillez entrer une URL', 'error')
return render_template('email/new_scraping.html')
try:
# Lancer le scrapping
results = email_scraper.scrape_page(url, max_pages)
# Sauvegarder les résultats
filename = email_scraper.save_results(results)
# Créer automatiquement des prospects si demandé
if auto_create_prospects and results['contacts']:
created_count, existing_count = _create_prospects_from_contacts(results['contacts'], url)
if created_count > 0:
flash(f'{created_count} prospect(s) créé(s) automatiquement', 'info')
if existing_count > 0:
flash(f'{existing_count} email(s) déjà existant(s) ignoré(s)', 'info')
flash(f'Scraping terminé: {len(results["contacts"])} contact(s) trouvé(s) sur {len(results["pages_scraped"])} page(s)', 'success')
return redirect(url_for('scraping_results', filename=os.path.basename(filename)))
except Exception as e:
flash(f'Erreur lors du scraping: {str(e)}', 'error')
return render_template('email/new_scraping.html')
return render_template('email/new_scraping.html')
@app.route('/email/scraper/results/<filename>')
def scraping_results(filename):
results = email_scraping_history.get_scraping_details(filename)
if not results:
flash('Résultats de scraping non trouvés', 'error')
return redirect(url_for('email_scraper_page'))
return render_template('email/scraping_results.html', results=results, filename=filename)
@app.route('/email/scraper/delete/<filename>')
def delete_scraping(filename):
if email_scraping_history.delete_scraping(filename):
flash('Résultats de scraping supprimés avec succès', 'success')
else:
flash('Erreur lors de la suppression', 'error')
return redirect(url_for('email_scraper_page'))
@app.route('/api/email_template/<template_id>', methods=['GET'])
def api_get_email_template(template_id):
"""API pour récupérer un template d'email par son ID"""
try:
template = email_template.get_template_by_id(template_id)
if template:
return jsonify({
'success': True,
'template': {
'id': template.get('id'),
'name': template.get('name'),
'subject': template.get('subject'),
'content': template.get('content'),
'description': template.get('description')
}
})
else:
return jsonify({
'success': False,
'error': 'Template non trouvé'
}), 404
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de la récupération du template: {str(e)}'
}), 500
@app.route('/api/scraping/create_prospects', methods=['POST'])
def api_create_prospects_from_scraping():
"""API pour créer des prospects à partir des contacts d'un scraping"""
data = request.json
filename = data.get('filename')
selected_contacts = data.get('contacts', [])
if not filename or not selected_contacts:
return jsonify({'success': False, 'error': 'Données manquantes'})
results = email_scraping_history.get_scraping_details(filename)
if not results:
return jsonify({'success': False, 'error': 'Résultats de scraping non trouvés'})
# Filtrer les contacts sélectionnés
contacts_to_create = []
for contact_email in selected_contacts:
contact = next((c for c in results['contacts'] if c['email'] == contact_email), None)
if contact:
contacts_to_create.append(contact)
created_count, existing_count = _create_prospects_from_contacts(contacts_to_create, results['url'])
return jsonify({
'success': True,
'created': created_count,
'existing': existing_count
})
# API pour tester la configuration email
@app.route('/api/test_email_config', methods=['POST'])
def api_test_email_config():
config_data = request.json
# Créer un EmailSender temporaire avec la configuration de test
temp_sender = EmailSender()
temp_sender.config = config_data
try:
# Envoyer un email de test
result = temp_sender.send_email(
config_data['username'],
'Test de configuration email',
'<p>Ceci est un email de test pour vérifier la configuration SMTP.</p>'
)
if result.get('success', False):
return jsonify({
'success': True
})
else:
return jsonify({
'success': False,
'error': result.get('error', 'Erreur inconnue')
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
def _create_prospects_from_contacts(contacts: List[Dict], source_url: str) -> tuple:
"""
Créer des prospects à partir des contacts scrapés
Returns: (created_count, existing_count)
"""
created_count = 0
existing_count = 0
existing_prospects = prospect_handler.load_prospects()
existing_emails = {p.email.lower() for p in existing_prospects if p.email}
for contact in contacts:
email = contact.get('email', '').lower()
if not email:
continue
# Vérifier si le prospect existe déjà
if email in existing_emails:
existing_count += 1
continue
# Préparer les données du prospect
name = contact.get('name', '')
if not name and contact.get('first_name'):
name = f"{contact.get('first_name', '')} {contact.get('last_name', '')}".strip()
if not name:
# Générer un nom à partir de l'email
local_part = email.split('@')[0]
name = local_part.replace('.', ' ').replace('_', ' ').title()
company = contact.get('company', '')
if not company:
# Générer le nom d'entreprise à partir du domaine
domain = email.split('@')[1]
company = domain.split('.')[0].title()
# Préparer les notes
notes_parts = [f"Contact trouvé automatiquement lors du scraping de {source_url}"]
if contact.get('location'):
notes_parts.append(f"Localisation: {contact['location']}")
if contact.get('source_url') and contact['source_url'] != source_url:
notes_parts.append(f"Page source: {contact['source_url']}")
if contact.get('notes'):
notes_parts.append(contact['notes'])
notes = ' | '.join(notes_parts)
# Créer le prospect
from modules.crm.prospect import Prospect
prospect = Prospect(
name=name,
company=company,
email=email,
phone=contact.get('phone', ''),
source=f"Scraping web",
status='Nouveau',
notes=notes,
tags=['scraping', 'web']
)
prospect_handler.add_prospect(prospect)
existing_emails.add(email) # Ajouter à la liste pour éviter les doublons dans le même batch
created_count += 1
return created_count, existing_count
if __name__ == '__main__':
# Création des dossiers nécessaires s'ils n'existent pas
os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'clients'), exist_ok=True)
os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'prospects'), exist_ok=True)
os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'projects'), exist_ok=True)
os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'projects'), exist_ok=True)
os.makedirs(os.path.join(app.config['OUTPUT_FOLDER'], 'devis'), exist_ok=True)
os.makedirs(os.path.join(app.config['OUTPUT_FOLDER'], 'propositions'), exist_ok=True)
# Dossiers pour les emails
os.makedirs('Data/email_templates', exist_ok=True)
os.makedirs('Data/email_history', exist_ok=True)
os.makedirs('Data/email_scraping', exist_ok=True)
os.makedirs('config', exist_ok=True)
# Lancement de l'application en mode debug
app.run(debug=True, host='0.0.0.0', port=8888)