SuiteConsultance/modules/email/email_scraper.py
2025-09-20 13:18:04 +02:00

968 lines
37 KiB
Python

import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
import time
from typing import List, Set, Dict
import json
import os
from datetime import datetime
class EmailScraper:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
self.email_pattern = re.compile(r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}')
self.phone_pattern = re.compile(r'(?:\+32|0)\s?[1-9](?:[\s\-\.\/]?\d){8}|\+32\s?[1-9](?:[\s\-\.\/]?\d){8}|(?:\+33|0)[1-9](?:[\s\-\.\/]?\d){8}')
self.visited_urls = set()
self.found_emails = set()
self.contact_info = {}
def scrape_page(self, url: str, max_pages: int = 10) -> Dict:
"""
Scrape une page avec pagination pour extraire les données d'entreprises
"""
results = {
'url': url,
'contacts': [], # Liste des contacts avec email, nom, téléphone, etc.
'pages_scraped': [],
'errors': [],
'start_time': datetime.now().isoformat(),
'end_time': None,
'domain_info': {}
}
try:
self._scrape_with_pagination(url, results, max_pages)
self._extract_domain_info(url, results)
except Exception as e:
results['errors'].append(f"Erreur générale: {str(e)}")
results['end_time'] = datetime.now().isoformat()
return results
def _scrape_with_pagination(self, base_url: str, results: Dict, max_pages: int):
"""
Scraper avec gestion de la pagination
"""
current_page = 1
current_url = base_url
while current_page <= max_pages:
if current_url in self.visited_urls:
break
try:
# Normaliser l'URL
parsed_url = urlparse(current_url)
if not parsed_url.scheme:
current_url = 'https://' + current_url
self.visited_urls.add(current_url)
print(f"Scraping page {current_page}: {current_url}")
# Faire la requête
response = self.session.get(current_url, timeout=15)
response.raise_for_status()
# Parser le HTML
soup = BeautifulSoup(response.content, 'html.parser')
# Extraire les entreprises/contacts de la page
page_contacts = self._extract_business_contacts(soup, response.text, current_url)
# Ajouter les contacts à la liste principale
for contact in page_contacts:
# Vérifier si ce contact existe déjà (par email)
existing_contact = next((c for c in results['contacts'] if c['email'] == contact['email']), None)
if existing_contact:
# Fusionner les informations si le contact existe
self._merge_contact_info(existing_contact, contact)
else:
results['contacts'].append(contact)
results['pages_scraped'].append({
'url': current_url,
'page_number': current_page,
'contacts_found': len(page_contacts),
'contacts': page_contacts,
'status': 'success',
'timestamp': datetime.now().isoformat()
})
print(f" - Page {current_page}: Trouvé {len(page_contacts)} contact(s)")
# Si aucun contact trouvé, peut-être qu'on a atteint la fin
if len(page_contacts) == 0:
print(f" - Aucun contact trouvé sur la page {current_page}, arrêt du scraping")
break
# Chercher le lien vers la page suivante
next_url = self._find_next_page_url(soup, current_url, current_page)
if not next_url:
print(f" - Pas de page suivante trouvée, arrêt du scraping")
break
current_url = next_url
current_page += 1
# Délai entre les pages pour éviter la surcharge
time.sleep(2)
except requests.exceptions.RequestException as e:
results['errors'].append(f"Erreur de requête pour la page {current_page} ({current_url}): {str(e)}")
results['pages_scraped'].append({
'url': current_url,
'page_number': current_page,
'contacts_found': 0,
'contacts': [],
'status': 'error',
'error': str(e),
'timestamp': datetime.now().isoformat()
})
break
except Exception as e:
results['errors'].append(f"Erreur lors du parsing de la page {current_page}: {str(e)}")
break
def _extract_business_contacts(self, soup: BeautifulSoup, text: str, page_url: str) -> List[Dict]:
"""
Extraire les informations d'entreprises d'une page (spécialisé pour les annuaires)
"""
contacts = []
# Chercher des conteneurs d'entreprises communs
business_containers = self._find_business_containers(soup)
if business_containers:
# Si on trouve des conteneurs structurés, les traiter
for container in business_containers:
contact = self._extract_contact_from_container(container, page_url)
if contact and contact.get('email'):
contacts.append(contact)
else:
# Fallback: extraction générale comme avant
contacts = self._extract_contact_info(soup, text, page_url)
return contacts
def _find_business_containers(self, soup: BeautifulSoup) -> List:
"""
Trouver les conteneurs qui contiennent probablement des informations d'entreprises
"""
containers = []
# Patterns communs pour les annuaires d'entreprises
business_selectors = [
# Classes/IDs communs
'[class*="business"]',
'[class*="company"]',
'[class*="enterprise"]',
'[class*="contact"]',
'[class*="listing"]',
'[class*="directory"]',
'[class*="card"]',
'[class*="item"]',
'[class*="entry"]',
'[class*="result"]',
# Balises sémantiques
'article',
'[itemtype*="Organization"]',
'[itemtype*="LocalBusiness"]',
# Structures de liste
'li[class*="business"]',
'li[class*="company"]',
'div[class*="row"]',
'div[class*="col"]'
]
for selector in business_selectors:
try:
elements = soup.select(selector)
for element in elements:
# Vérifier si l'élément contient des informations utiles
if self._container_has_business_info(element):
containers.append(element)
except:
continue
# Déduplication basée sur le contenu
unique_containers = []
for container in containers:
if not any(self._containers_are_similar(container, existing) for existing in unique_containers):
unique_containers.append(container)
return unique_containers[:50] # Limiter pour éviter la surcharge
def _container_has_business_info(self, container) -> bool:
"""
Vérifier si un conteneur a des informations d'entreprise
"""
text = container.get_text(strip=True).lower()
# Indicateurs d'informations d'entreprise
business_indicators = [
'@', 'email', 'mail', 'contact',
'tel', 'phone', 'telephone', 'gsm',
'rue', 'avenue', 'boulevard', 'place',
'www.', 'http', '.com', '.be', '.fr',
'sarl', 'sprl', 'sa', 'nv', 'bvba'
]
score = sum(1 for indicator in business_indicators if indicator in text)
return score >= 2 and len(text) > 20
def _containers_are_similar(self, container1, container2) -> bool:
"""
Vérifier si deux conteneurs sont similaires (pour éviter les doublons)
"""
text1 = container1.get_text(strip=True)
text2 = container2.get_text(strip=True)
# Si les textes sont identiques ou très similaires
if text1 == text2:
return True
# Si un conteneur est inclus dans l'autre
if len(text1) > len(text2):
return text2 in text1
else:
return text1 in text2
def _extract_contact_from_container(self, container, page_url: str) -> Dict:
"""
Extraire les informations de contact d'un conteneur spécifique
"""
contact = {
'email': '',
'name': '',
'first_name': '',
'last_name': '',
'company': '',
'phone': '',
'location': '',
'source_url': page_url,
'notes': ''
}
# Extraire l'email depuis les balises individuelles d'abord
email_found = False
# Chercher dans les liens mailto
mailto_links = container.find_all('a', href=re.compile(r'^mailto:', re.I))
if mailto_links:
href = mailto_links[0].get('href', '')
email_match = re.search(r'mailto:([^?&]+)', href, re.I)
if email_match and self._is_valid_email(email_match.group(1)):
contact['email'] = email_match.group(1).lower()
email_found = True
# Si pas trouvé dans mailto, chercher dans les balises individuelles
if not email_found:
for element in container.find_all(['p', 'div', 'span', 'td', 'li']):
element_text = element.get_text(strip=True)
# Ajouter des espaces autour des balises pour éviter la concaténation
element_text = ' ' + element_text + ' '
email_matches = self.email_pattern.findall(element_text)
if email_matches:
for email in email_matches:
email = email.strip()
if re.match(r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$', email) and self._is_valid_email(email):
contact['email'] = email.lower()
email_found = True
break
if email_found:
break
# Si toujours pas trouvé, chercher dans le texte global avec des patterns plus précis
if not email_found:
container_text = container.get_text(separator=' ', strip=True) # Utiliser un séparateur
# Patterns avec contexte pour éviter la capture parasite
context_patterns = [
r'(?:email|e-mail|mail|contact)\s*:?\s*([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})',
r'([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})(?=\s|$|[^\w.-])',
]
for pattern in context_patterns:
matches = re.findall(pattern, container_text, re.IGNORECASE)
if matches:
email = matches[0] if isinstance(matches[0], str) else matches[0][0] if matches[0] else ''
if email and self._is_valid_email(email):
contact['email'] = email.lower()
email_found = True
break
# Extraire le téléphone
container_text = container.get_text(separator=' ', strip=True)
phone_matches = self.phone_pattern.findall(container_text)
if phone_matches:
# Prendre le premier numéro et le nettoyer
phone = phone_matches[0]
# S'assurer qu'on n'a que des chiffres, espaces, tirets, points, slash et +
clean_phone = re.sub(r'[^0-9\s\-\.\/\+].*$', '', phone)
contact['phone'] = clean_phone.strip()
# Extraire le nom de l'entreprise
contact['company'] = self._extract_company_name(container, container_text)
# Extraire les noms de personnes
names = self._extract_person_names(container, container_text)
if names:
contact.update(names)
# Extraire la localisation
contact['location'] = self._extract_location_from_container(container, container_text)
# Enrichir avec des informations contextuelles
self._enhance_business_contact(contact, container, container_text)
return contact if contact['email'] or contact['company'] else None
def _extract_company_name(self, container, text: str) -> str:
"""
Extraire le nom de l'entreprise d'un conteneur
"""
# Chercher dans les balises title, h1-h6, strong, b
title_elements = container.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'strong', 'b', '[class*="title"]', '[class*="name"]', '[class*="company"]'])
for element in title_elements:
company_text = element.get_text(strip=True)
if len(company_text) > 2 and len(company_text) < 100:
# Éviter les textes trop génériques
if not any(generic in company_text.lower() for generic in ['accueil', 'contact', 'email', 'téléphone', 'adresse']):
return company_text
# Fallback: prendre la première ligne non-vide qui semble être un nom
lines = text.split('\n')
for line in lines[:3]: # Les 3 premières lignes
line = line.strip()
if len(line) > 2 and len(line) < 100 and not '@' in line and not any(char.isdigit() for char in line[:3]):
return line
return ''
def _extract_person_names(self, container, text: str) -> Dict:
"""
Extraire les noms de personnes
"""
names = {'name': '', 'first_name': '', 'last_name': ''}
# Patterns pour les noms de personnes
name_patterns = [
r'\b([A-Z][a-zÀ-ÿ]+)\s+([A-Z][a-zÀ-ÿ]+)\b', # Prénom Nom
r'\b([A-Z][A-Z]+)\s+([A-Z][a-zÀ-ÿ]+)\b', # NOM Prénom
]
# Chercher dans les balises spécifiques
name_elements = container.find_all(['[class*="name"]', '[class*="contact"]', '[class*="person"]'])
for element in name_elements:
element_text = element.get_text(strip=True)
for pattern in name_patterns:
match = re.search(pattern, element_text)
if match:
names['first_name'] = match.group(1)
names['last_name'] = match.group(2)
names['name'] = f"{names['first_name']} {names['last_name']}"
return names
# Si pas trouvé dans les balises, chercher dans le texte
for pattern in name_patterns:
match = re.search(pattern, text)
if match:
names['first_name'] = match.group(1)
names['last_name'] = match.group(2)
names['name'] = f"{names['first_name']} {names['last_name']}"
break
return names
def _extract_location_from_container(self, container, text: str) -> str:
"""
Extraire la localisation d'un conteneur
"""
# Chercher dans les balises d'adresse
address_elements = container.find_all(['address', '[class*="address"]', '[class*="location"]', '[class*="ville"]', '[class*="city"]'])
for element in address_elements:
location_text = element.get_text(strip=True)
if len(location_text) > 5:
return location_text
# Patterns pour les adresses belges/françaises
location_patterns = [
r'\b\d{4,5}\s+[A-Za-zÀ-ÿ\s\-]+\b', # Code postal + ville
r'\b[A-Za-zÀ-ÿ\s\-]+,\s*[A-Za-zÀ-ÿ\s\-]+\b', # Ville, Région/Pays
r'\b(?:rue|avenue|boulevard|place|chemin)\s+[A-Za-zÀ-ÿ\s\d\-,]+\b' # Adresse complète
]
for pattern in location_patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(0).strip()
return ''
def _enhance_business_contact(self, contact: Dict, container, text: str):
"""
Améliorer les informations de contact d'entreprise
"""
# Si pas de nom trouvé, essayer d'extraire depuis l'email
if not contact['name'] and contact['email']:
local_part = contact['email'].split('@')[0]
domain_part = contact['email'].split('@')[1]
if '.' in local_part:
parts = local_part.split('.')
contact['first_name'] = parts[0].title()
contact['last_name'] = parts[1].title() if len(parts) > 1 else ''
contact['name'] = f"{contact['first_name']} {contact['last_name']}".strip()
# Si pas d'entreprise, essayer de deviner depuis le domaine
if not contact['company']:
company_name = domain_part.split('.')[0]
contact['company'] = company_name.title()
# Enrichir les notes avec des informations contextuelles
notes_parts = []
# Chercher des informations sur l'activité
activity_patterns = [
r'(?i)\b(restaurant|café|boulangerie|pharmacie|garage|coiffeur|médecin|avocat|comptable|architecte|dentiste|vétérinaire|magasin|boutique|salon)\b',
r'(?i)\b(commerce|service|entreprise|société|bureau|cabinet|clinique|centre|institut)\b'
]
for pattern in activity_patterns:
matches = re.findall(pattern, text)
if matches:
notes_parts.append(f"Activité: {', '.join(set(matches))}")
break
# Chercher des horaires
horaires_pattern = r'(?i)(?:ouvert|fermé|horaires?)[:\s]*([^.!?\n]{10,50})'
horaires_match = re.search(horaires_pattern, text)
if horaires_match:
notes_parts.append(f"Horaires: {horaires_match.group(1).strip()}")
# Chercher un site web
website_pattern = r'\b(?:www\.)?[a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9]*\.(?:com|be|fr|org|net)\b'
website_match = re.search(website_pattern, text)
if website_match:
notes_parts.append(f"Site web: {website_match.group(0)}")
contact['notes'] = ' | '.join(notes_parts)
def _find_next_page_url(self, soup: BeautifulSoup, current_url: str, current_page: int) -> str:
"""
Trouver l'URL de la page suivante
"""
base_url = '/'.join(current_url.split('/')[:-1]) if '/' in current_url else current_url
# Patterns communs pour les liens de pagination
next_patterns = [
# Liens avec texte
'a[href]:contains("Suivant")',
'a[href]:contains("Next")',
'a[href]:contains(">")',
'a[href]:contains("Page suivante")',
# Liens avec classes
'a[class*="next"]',
'a[class*="suivant"]',
'a[class*="pagination"]',
# Numéros de page
f'a[href]:contains("{current_page + 1}")',
]
for pattern in next_patterns:
try:
links = soup.select(pattern)
for link in links:
href = link.get('href')
if href:
# Construire l'URL complète
if href.startswith('http'):
return href
elif href.startswith('/'):
parsed = urlparse(current_url)
return f"{parsed.scheme}://{parsed.netloc}{href}"
else:
return urljoin(current_url, href)
except:
continue
# Essayer de construire l'URL de la page suivante par pattern
# Pattern 1: ?page=X
if 'page=' in current_url:
return re.sub(r'page=\d+', f'page={current_page + 1}', current_url)
# Pattern 2: /pageX
if f'/page{current_page}' in current_url:
return current_url.replace(f'/page{current_page}', f'/page{current_page + 1}')
# Pattern 3: Ajouter ?page=2 si c'est la première page
if current_page == 1:
separator = '&' if '?' in current_url else '?'
return f"{current_url}{separator}page={current_page + 1}"
return None
def _extract_contact_info(self, soup: BeautifulSoup, text: str, page_url: str) -> List[Dict]:
"""
Extraire les informations de contact complètes d'une page
"""
contacts = []
# Extraire tous les emails
emails = set()
emails.update(self._extract_emails_from_text(text))
emails.update(self._extract_emails_from_links(soup))
# Extraire les numéros de téléphone
phones = self._extract_phone_numbers(text)
# Extraire les noms et entreprises depuis les balises structurées
structured_contacts = self._extract_structured_contacts(soup)
# Extraire l'adresse/localité
location = self._extract_location_info(soup, text)
# Créer des contacts pour chaque email trouvé
for email in emails:
if not self._is_valid_email(email):
continue
contact = {
'email': email.lower(),
'name': '',
'first_name': '',
'last_name': '',
'company': '',
'phone': '',
'location': location,
'source_url': page_url,
'notes': ''
}
# Essayer de trouver des informations complémentaires
self._enhance_contact_info(contact, soup, text, structured_contacts, phones)
contacts.append(contact)
return contacts
def _extract_phone_numbers(self, text: str) -> List[str]:
"""
Extraire les numéros de téléphone
"""
phones = []
matches = self.phone_pattern.findall(text)
for phone in matches:
# Nettoyer le numéro
clean_phone = re.sub(r'[\s\-\.\/]', '', phone)
if len(clean_phone) >= 9: # Numéro valide
phones.append(phone)
return phones
def _extract_structured_contacts(self, soup: BeautifulSoup) -> List[Dict]:
"""
Extraire les contacts depuis les données structurées (microdata, JSON-LD, etc.)
"""
contacts = []
# Chercher les données JSON-LD
json_scripts = soup.find_all('script', type='application/ld+json')
for script in json_scripts:
try:
data = json.loads(script.string)
if isinstance(data, dict):
contact = self._parse_json_ld_contact(data)
if contact:
contacts.append(contact)
elif isinstance(data, list):
for item in data:
contact = self._parse_json_ld_contact(item)
if contact:
contacts.append(contact)
except:
continue
# Chercher les microdata
contacts.extend(self._extract_microdata_contacts(soup))
return contacts
def _parse_json_ld_contact(self, data: Dict) -> Dict:
"""
Parser un contact depuis les données JSON-LD
"""
contact = {}
if data.get('@type') in ['Organization', 'LocalBusiness', 'Person']:
contact['name'] = data.get('name', '')
contact['company'] = data.get('name', '') if data.get('@type') != 'Person' else ''
# Email
email = data.get('email')
if email:
contact['email'] = email
# Téléphone
phone = data.get('telephone')
if phone:
contact['phone'] = phone
# Adresse
address = data.get('address')
if address:
if isinstance(address, dict):
location_parts = []
if address.get('addressLocality'):
location_parts.append(address['addressLocality'])
if address.get('addressRegion'):
location_parts.append(address['addressRegion'])
if address.get('addressCountry'):
location_parts.append(address['addressCountry'])
contact['location'] = ', '.join(location_parts)
elif isinstance(address, str):
contact['location'] = address
return contact if contact.get('email') or contact.get('name') else None
def _extract_microdata_contacts(self, soup: BeautifulSoup) -> List[Dict]:
"""
Extraire les contacts depuis les microdata
"""
contacts = []
# Chercher les éléments avec itemtype Person ou Organization
items = soup.find_all(attrs={'itemtype': re.compile(r'.*(Person|Organization|LocalBusiness).*')})
for item in items:
contact = {}
# Nom
name_elem = item.find(attrs={'itemprop': 'name'})
if name_elem:
contact['name'] = name_elem.get_text(strip=True)
# Email
email_elem = item.find(attrs={'itemprop': 'email'})
if email_elem:
contact['email'] = email_elem.get('href', '').replace('mailto:', '') or email_elem.get_text(strip=True)
# Téléphone
phone_elem = item.find(attrs={'itemprop': 'telephone'})
if phone_elem:
contact['phone'] = phone_elem.get_text(strip=True)
if contact.get('email') or contact.get('name'):
contacts.append(contact)
return contacts
def _extract_location_info(self, soup: BeautifulSoup, text: str) -> str:
"""
Extraire les informations de localisation
"""
location_indicators = [
r'\b\d{4,5}\s+[A-Za-zÀ-ÿ\s\-]+\b', # Code postal + ville
r'\b[A-Za-zÀ-ÿ\s\-]+,\s*[A-Za-zÀ-ÿ\s\-]+\b', # Ville, Pays
]
# Chercher dans les balises d'adresse
address_tags = soup.find_all(['address', 'div'], class_=re.compile(r'.*address.*|.*location.*|.*contact.*'))
for tag in address_tags:
address_text = tag.get_text(strip=True)
for pattern in location_indicators:
match = re.search(pattern, address_text, re.IGNORECASE)
if match:
return match.group(0)
# Chercher dans le texte global
for pattern in location_indicators:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(0)
return ''
def _enhance_contact_info(self, contact: Dict, soup: BeautifulSoup, text: str, structured_contacts: List[Dict], phones: List[str]):
"""
Améliorer les informations de contact en croisant les données
"""
email = contact['email']
# Chercher dans les contacts structurés
for struct_contact in structured_contacts:
if struct_contact.get('email') == email:
contact.update(struct_contact)
break
# Si pas de nom trouvé, essayer d'extraire depuis l'email
if not contact['name']:
local_part = email.split('@')[0]
domain_part = email.split('@')[1]
# Essayer de deviner le nom depuis la partie locale
if '.' in local_part:
parts = local_part.split('.')
contact['first_name'] = parts[0].title()
contact['last_name'] = parts[1].title() if len(parts) > 1 else ''
contact['name'] = f"{contact['first_name']} {contact['last_name']}".strip()
else:
contact['name'] = local_part.title()
# Essayer de deviner l'entreprise depuis le domaine
if not contact['company']:
company_name = domain_part.split('.')[0]
contact['company'] = company_name.title()
# Ajouter un numéro de téléphone si disponible
if not contact['phone'] and phones:
contact['phone'] = phones[0] # Prendre le premier numéro trouvé
# Enrichir les notes avec des informations contextuelles
notes_parts = []
if contact['location']:
notes_parts.append(f"Localisation: {contact['location']}")
# Chercher des informations sur la fonction/titre
title_patterns = [
r'(?i)(?:directeur|manager|responsable|chef|président|ceo|cto|cfo)\s+[a-zA-ZÀ-ÿ\s]+',
r'(?i)[a-zA-ZÀ-ÿ\s]+\s+(?:director|manager|head|chief|president)'
]
for pattern in title_patterns:
matches = re.findall(pattern, text)
if matches:
notes_parts.append(f"Fonction possible: {matches[0]}")
break
contact['notes'] = ' | '.join(notes_parts)
def _merge_contact_info(self, existing: Dict, new: Dict):
"""
Fusionner les informations de deux contacts
"""
for key, value in new.items():
if value and not existing.get(key):
existing[key] = value
# Fusionner les notes
if new.get('notes') and existing.get('notes'):
existing['notes'] = f"{existing['notes']} | {new['notes']}"
elif new.get('notes'):
existing['notes'] = new['notes']
def _extract_domain_info(self, url: str, results: Dict):
"""
Extraire les informations générales du domaine
"""
domain = urlparse(url).netloc
results['domain_info'] = {
'domain': domain,
'company_guess': domain.split('.')[0].title(),
'total_contacts': len(results['contacts']),
'total_pages_scraped': len(results['pages_scraped'])
}
def _extract_emails_from_links(self, soup: BeautifulSoup) -> Set[str]:
"""
Extraire les emails des liens mailto
"""
emails = set()
# Chercher les liens mailto
mailto_links = soup.find_all('a', href=re.compile(r'^mailto:', re.I))
for link in mailto_links:
href = link.get('href', '')
email_match = re.search(r'mailto:([^?&]+)', href, re.I)
if email_match:
email = email_match.group(1)
if self._is_valid_email(email):
emails.add(email.lower())
return emails
def _extract_emails_from_text(self, text: str) -> Set[str]:
"""
Extraire les emails du texte de la page
"""
emails = set()
matches = self.email_pattern.findall(text)
for email in matches:
# Filtrer les emails indésirables
if not self._is_valid_email(email):
continue
emails.add(email.lower())
return emails
def _extract_internal_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
"""
Extraire les liens internes de la page
"""
links = []
base_domain = urlparse(base_url).netloc
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(base_url, href)
parsed_link = urlparse(full_url)
# Vérifier que c'est un lien interne et pas déjà visité
if (parsed_link.netloc == base_domain and
full_url not in self.visited_urls and
not self._is_excluded_link(full_url)):
links.append(full_url)
return links
def _is_valid_email(self, email: str) -> bool:
"""
Vérifier si l'email est valide et non indésirable
"""
# Filtrer les extensions de fichiers communes
excluded_extensions = ['.jpg', '.png', '.gif', '.pdf', '.doc', '.css', '.js']
for ext in excluded_extensions:
if email.lower().endswith(ext):
return False
# Filtrer les emails génériques indésirables
excluded_patterns = [
'example.com',
'test.com',
'placeholder',
'your-email',
'youremail',
'email@',
'noreply',
'no-reply'
]
for pattern in excluded_patterns:
if pattern in email.lower():
return False
# Vérifier la longueur
if len(email) < 5 or len(email) > 254:
return False
return True
def _is_excluded_link(self, url: str) -> bool:
"""
Vérifier si le lien doit être exclu du scraping
"""
excluded_patterns = [
'#',
'javascript:',
'tel:',
'mailto:',
'.pdf',
'.doc',
'.zip',
'.jpg',
'.png',
'.gif'
]
url_lower = url.lower()
for pattern in excluded_patterns:
if pattern in url_lower:
return True
return False
def save_results(self, results: Dict, filename: str = None) -> str:
"""
Sauvegarder les résultats dans un fichier JSON
"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
domain = urlparse(results['url']).netloc.replace('.', '_')
filename = f"scraping_{domain}_{timestamp}.json"
# Créer le dossier s'il n'existe pas
scraping_folder = 'Data/email_scraping'
os.makedirs(scraping_folder, exist_ok=True)
filepath = os.path.join(scraping_folder, filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return filepath
class EmailScrapingHistory:
def __init__(self):
self.history_folder = 'Data/email_scraping'
os.makedirs(self.history_folder, exist_ok=True)
def get_all_scrapings(self) -> List[Dict]:
"""
Récupérer l'historique de tous les scrapings
"""
scrapings = []
for filename in os.listdir(self.history_folder):
if filename.endswith('.json'):
filepath = os.path.join(self.history_folder, filename)
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
scrapings.append({
'filename': filename,
'url': data.get('url', ''),
'emails_count': len(data.get('contacts', data.get('emails', []))), # Support pour ancienne et nouvelle structure
'pages_count': len(data.get('pages_scraped', [])),
'start_time': data.get('start_time', ''),
'errors_count': len(data.get('errors', []))
})
except Exception as e:
print(f"Erreur lors de la lecture de {filename}: {e}")
# Trier par date (plus récent d'abord)
scrapings.sort(key=lambda x: x.get('start_time', ''), reverse=True)
return scrapings
def get_scraping_details(self, filename: str) -> Dict:
"""
Récupérer les détails d'un scraping spécifique
"""
filepath = os.path.join(self.history_folder, filename)
if os.path.exists(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
return None
def delete_scraping(self, filename: str) -> bool:
"""
Supprimer un fichier de scraping
"""
filepath = os.path.join(self.history_folder, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
return True
except Exception as e:
print(f"Erreur lors de la suppression: {e}")
return False
return False