import requests from bs4 import BeautifulSoup import re from urllib.parse import urljoin, urlparse import time from typing import List, Set, Dict import json import os from datetime import datetime class EmailScraper: def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) self.email_pattern = re.compile(r'[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}') self.phone_pattern = re.compile(r'(?:\+32|0)\s?[1-9](?:[\s\-\.\/]?\d){8}|\+32\s?[1-9](?:[\s\-\.\/]?\d){8}|(?:\+33|0)[1-9](?:[\s\-\.\/]?\d){8}') self.visited_urls = set() self.found_emails = set() self.contact_info = {} def scrape_page(self, url: str, max_pages: int = 10) -> Dict: """ Scrape une page avec pagination pour extraire les données d'entreprises """ results = { 'url': url, 'contacts': [], # Liste des contacts avec email, nom, téléphone, etc. 'pages_scraped': [], 'errors': [], 'start_time': datetime.now().isoformat(), 'end_time': None, 'domain_info': {} } try: self._scrape_with_pagination(url, results, max_pages) self._extract_domain_info(url, results) except Exception as e: results['errors'].append(f"Erreur générale: {str(e)}") results['end_time'] = datetime.now().isoformat() return results def _scrape_with_pagination(self, base_url: str, results: Dict, max_pages: int): """ Scraper avec gestion de la pagination """ current_page = 1 current_url = base_url while current_page <= max_pages: if current_url in self.visited_urls: break try: # Normaliser l'URL parsed_url = urlparse(current_url) if not parsed_url.scheme: current_url = 'https://' + current_url self.visited_urls.add(current_url) print(f"Scraping page {current_page}: {current_url}") # Faire la requête response = self.session.get(current_url, timeout=15) response.raise_for_status() # Parser le HTML soup = BeautifulSoup(response.content, 'html.parser') # Extraire les entreprises/contacts de la page page_contacts = self._extract_business_contacts(soup, response.text, current_url) # Ajouter les contacts à la liste principale for contact in page_contacts: # Vérifier si ce contact existe déjà (par email) existing_contact = next((c for c in results['contacts'] if c['email'] == contact['email']), None) if existing_contact: # Fusionner les informations si le contact existe self._merge_contact_info(existing_contact, contact) else: results['contacts'].append(contact) results['pages_scraped'].append({ 'url': current_url, 'page_number': current_page, 'contacts_found': len(page_contacts), 'contacts': page_contacts, 'status': 'success', 'timestamp': datetime.now().isoformat() }) print(f" - Page {current_page}: Trouvé {len(page_contacts)} contact(s)") # Si aucun contact trouvé, peut-être qu'on a atteint la fin if len(page_contacts) == 0: print(f" - Aucun contact trouvé sur la page {current_page}, arrêt du scraping") break # Chercher le lien vers la page suivante next_url = self._find_next_page_url(soup, current_url, current_page) if not next_url: print(f" - Pas de page suivante trouvée, arrêt du scraping") break current_url = next_url current_page += 1 # Délai entre les pages pour éviter la surcharge time.sleep(2) except requests.exceptions.RequestException as e: results['errors'].append(f"Erreur de requête pour la page {current_page} ({current_url}): {str(e)}") results['pages_scraped'].append({ 'url': current_url, 'page_number': current_page, 'contacts_found': 0, 'contacts': [], 'status': 'error', 'error': str(e), 'timestamp': datetime.now().isoformat() }) break except Exception as e: results['errors'].append(f"Erreur lors du parsing de la page {current_page}: {str(e)}") break def _extract_business_contacts(self, soup: BeautifulSoup, text: str, page_url: str) -> List[Dict]: """ Extraire les informations d'entreprises d'une page (spécialisé pour les annuaires) """ contacts = [] # Chercher des conteneurs d'entreprises communs business_containers = self._find_business_containers(soup) if business_containers: # Si on trouve des conteneurs structurés, les traiter for container in business_containers: contact = self._extract_contact_from_container(container, page_url) if contact and contact.get('email'): contacts.append(contact) else: # Fallback: extraction générale comme avant contacts = self._extract_contact_info(soup, text, page_url) return contacts def _find_business_containers(self, soup: BeautifulSoup) -> List: """ Trouver les conteneurs qui contiennent probablement des informations d'entreprises """ containers = [] # Patterns communs pour les annuaires d'entreprises business_selectors = [ # Classes/IDs communs '[class*="business"]', '[class*="company"]', '[class*="enterprise"]', '[class*="contact"]', '[class*="listing"]', '[class*="directory"]', '[class*="card"]', '[class*="item"]', '[class*="entry"]', '[class*="result"]', # Balises sémantiques 'article', '[itemtype*="Organization"]', '[itemtype*="LocalBusiness"]', # Structures de liste 'li[class*="business"]', 'li[class*="company"]', 'div[class*="row"]', 'div[class*="col"]' ] for selector in business_selectors: try: elements = soup.select(selector) for element in elements: # Vérifier si l'élément contient des informations utiles if self._container_has_business_info(element): containers.append(element) except: continue # Déduplication basée sur le contenu unique_containers = [] for container in containers: if not any(self._containers_are_similar(container, existing) for existing in unique_containers): unique_containers.append(container) return unique_containers[:50] # Limiter pour éviter la surcharge def _container_has_business_info(self, container) -> bool: """ Vérifier si un conteneur a des informations d'entreprise """ text = container.get_text(strip=True).lower() # Indicateurs d'informations d'entreprise business_indicators = [ '@', 'email', 'mail', 'contact', 'tel', 'phone', 'telephone', 'gsm', 'rue', 'avenue', 'boulevard', 'place', 'www.', 'http', '.com', '.be', '.fr', 'sarl', 'sprl', 'sa', 'nv', 'bvba' ] score = sum(1 for indicator in business_indicators if indicator in text) return score >= 2 and len(text) > 20 def _containers_are_similar(self, container1, container2) -> bool: """ Vérifier si deux conteneurs sont similaires (pour éviter les doublons) """ text1 = container1.get_text(strip=True) text2 = container2.get_text(strip=True) # Si les textes sont identiques ou très similaires if text1 == text2: return True # Si un conteneur est inclus dans l'autre if len(text1) > len(text2): return text2 in text1 else: return text1 in text2 def _extract_contact_from_container(self, container, page_url: str) -> Dict: """ Extraire les informations de contact d'un conteneur spécifique """ contact = { 'email': '', 'name': '', 'first_name': '', 'last_name': '', 'company': '', 'phone': '', 'location': '', 'source_url': page_url, 'notes': '' } # Extraire l'email depuis les balises individuelles d'abord email_found = False # Chercher dans les liens mailto mailto_links = container.find_all('a', href=re.compile(r'^mailto:', re.I)) if mailto_links: href = mailto_links[0].get('href', '') email_match = re.search(r'mailto:([^?&]+)', href, re.I) if email_match and self._is_valid_email(email_match.group(1)): contact['email'] = email_match.group(1).lower() email_found = True # Si pas trouvé dans mailto, chercher dans les balises individuelles if not email_found: for element in container.find_all(['p', 'div', 'span', 'td', 'li']): element_text = element.get_text(strip=True) # Ajouter des espaces autour des balises pour éviter la concaténation element_text = ' ' + element_text + ' ' email_matches = self.email_pattern.findall(element_text) if email_matches: for email in email_matches: email = email.strip() if re.match(r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$', email) and self._is_valid_email(email): contact['email'] = email.lower() email_found = True break if email_found: break # Si toujours pas trouvé, chercher dans le texte global avec des patterns plus précis if not email_found: container_text = container.get_text(separator=' ', strip=True) # Utiliser un séparateur # Patterns avec contexte pour éviter la capture parasite context_patterns = [ r'(?:email|e-mail|mail|contact)\s*:?\s*([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})', r'([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})(?=\s|$|[^\w.-])', ] for pattern in context_patterns: matches = re.findall(pattern, container_text, re.IGNORECASE) if matches: email = matches[0] if isinstance(matches[0], str) else matches[0][0] if matches[0] else '' if email and self._is_valid_email(email): contact['email'] = email.lower() email_found = True break # Extraire le téléphone container_text = container.get_text(separator=' ', strip=True) phone_matches = self.phone_pattern.findall(container_text) if phone_matches: # Prendre le premier numéro et le nettoyer phone = phone_matches[0] # S'assurer qu'on n'a que des chiffres, espaces, tirets, points, slash et + clean_phone = re.sub(r'[^0-9\s\-\.\/\+].*$', '', phone) contact['phone'] = clean_phone.strip() # Extraire le nom de l'entreprise contact['company'] = self._extract_company_name(container, container_text) # Extraire les noms de personnes names = self._extract_person_names(container, container_text) if names: contact.update(names) # Extraire la localisation contact['location'] = self._extract_location_from_container(container, container_text) # Enrichir avec des informations contextuelles self._enhance_business_contact(contact, container, container_text) return contact if contact['email'] or contact['company'] else None def _extract_company_name(self, container, text: str) -> str: """ Extraire le nom de l'entreprise d'un conteneur """ # Chercher dans les balises title, h1-h6, strong, b title_elements = container.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'strong', 'b', '[class*="title"]', '[class*="name"]', '[class*="company"]']) for element in title_elements: company_text = element.get_text(strip=True) if len(company_text) > 2 and len(company_text) < 100: # Éviter les textes trop génériques if not any(generic in company_text.lower() for generic in ['accueil', 'contact', 'email', 'téléphone', 'adresse']): return company_text # Fallback: prendre la première ligne non-vide qui semble être un nom lines = text.split('\n') for line in lines[:3]: # Les 3 premières lignes line = line.strip() if len(line) > 2 and len(line) < 100 and not '@' in line and not any(char.isdigit() for char in line[:3]): return line return '' def _extract_person_names(self, container, text: str) -> Dict: """ Extraire les noms de personnes """ names = {'name': '', 'first_name': '', 'last_name': ''} # Patterns pour les noms de personnes name_patterns = [ r'\b([A-Z][a-zÀ-ÿ]+)\s+([A-Z][a-zÀ-ÿ]+)\b', # Prénom Nom r'\b([A-Z][A-Z]+)\s+([A-Z][a-zÀ-ÿ]+)\b', # NOM Prénom ] # Chercher dans les balises spécifiques name_elements = container.find_all(['[class*="name"]', '[class*="contact"]', '[class*="person"]']) for element in name_elements: element_text = element.get_text(strip=True) for pattern in name_patterns: match = re.search(pattern, element_text) if match: names['first_name'] = match.group(1) names['last_name'] = match.group(2) names['name'] = f"{names['first_name']} {names['last_name']}" return names # Si pas trouvé dans les balises, chercher dans le texte for pattern in name_patterns: match = re.search(pattern, text) if match: names['first_name'] = match.group(1) names['last_name'] = match.group(2) names['name'] = f"{names['first_name']} {names['last_name']}" break return names def _extract_location_from_container(self, container, text: str) -> str: """ Extraire la localisation d'un conteneur """ # Chercher dans les balises d'adresse address_elements = container.find_all(['address', '[class*="address"]', '[class*="location"]', '[class*="ville"]', '[class*="city"]']) for element in address_elements: location_text = element.get_text(strip=True) if len(location_text) > 5: return location_text # Patterns pour les adresses belges/françaises location_patterns = [ r'\b\d{4,5}\s+[A-Za-zÀ-ÿ\s\-]+\b', # Code postal + ville r'\b[A-Za-zÀ-ÿ\s\-]+,\s*[A-Za-zÀ-ÿ\s\-]+\b', # Ville, Région/Pays r'\b(?:rue|avenue|boulevard|place|chemin)\s+[A-Za-zÀ-ÿ\s\d\-,]+\b' # Adresse complète ] for pattern in location_patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(0).strip() return '' def _enhance_business_contact(self, contact: Dict, container, text: str): """ Améliorer les informations de contact d'entreprise """ # Si pas de nom trouvé, essayer d'extraire depuis l'email if not contact['name'] and contact['email']: local_part = contact['email'].split('@')[0] domain_part = contact['email'].split('@')[1] if '.' in local_part: parts = local_part.split('.') contact['first_name'] = parts[0].title() contact['last_name'] = parts[1].title() if len(parts) > 1 else '' contact['name'] = f"{contact['first_name']} {contact['last_name']}".strip() # Si pas d'entreprise, essayer de deviner depuis le domaine if not contact['company']: company_name = domain_part.split('.')[0] contact['company'] = company_name.title() # Enrichir les notes avec des informations contextuelles notes_parts = [] # Chercher des informations sur l'activité activity_patterns = [ r'(?i)\b(restaurant|café|boulangerie|pharmacie|garage|coiffeur|médecin|avocat|comptable|architecte|dentiste|vétérinaire|magasin|boutique|salon)\b', r'(?i)\b(commerce|service|entreprise|société|bureau|cabinet|clinique|centre|institut)\b' ] for pattern in activity_patterns: matches = re.findall(pattern, text) if matches: notes_parts.append(f"Activité: {', '.join(set(matches))}") break # Chercher des horaires horaires_pattern = r'(?i)(?:ouvert|fermé|horaires?)[:\s]*([^.!?\n]{10,50})' horaires_match = re.search(horaires_pattern, text) if horaires_match: notes_parts.append(f"Horaires: {horaires_match.group(1).strip()}") # Chercher un site web website_pattern = r'\b(?:www\.)?[a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9]*\.(?:com|be|fr|org|net)\b' website_match = re.search(website_pattern, text) if website_match: notes_parts.append(f"Site web: {website_match.group(0)}") contact['notes'] = ' | '.join(notes_parts) def _find_next_page_url(self, soup: BeautifulSoup, current_url: str, current_page: int) -> str: """ Trouver l'URL de la page suivante """ base_url = '/'.join(current_url.split('/')[:-1]) if '/' in current_url else current_url # Patterns communs pour les liens de pagination next_patterns = [ # Liens avec texte 'a[href]:contains("Suivant")', 'a[href]:contains("Next")', 'a[href]:contains(">")', 'a[href]:contains("Page suivante")', # Liens avec classes 'a[class*="next"]', 'a[class*="suivant"]', 'a[class*="pagination"]', # Numéros de page f'a[href]:contains("{current_page + 1}")', ] for pattern in next_patterns: try: links = soup.select(pattern) for link in links: href = link.get('href') if href: # Construire l'URL complète if href.startswith('http'): return href elif href.startswith('/'): parsed = urlparse(current_url) return f"{parsed.scheme}://{parsed.netloc}{href}" else: return urljoin(current_url, href) except: continue # Essayer de construire l'URL de la page suivante par pattern # Pattern 1: ?page=X if 'page=' in current_url: return re.sub(r'page=\d+', f'page={current_page + 1}', current_url) # Pattern 2: /pageX if f'/page{current_page}' in current_url: return current_url.replace(f'/page{current_page}', f'/page{current_page + 1}') # Pattern 3: Ajouter ?page=2 si c'est la première page if current_page == 1: separator = '&' if '?' in current_url else '?' return f"{current_url}{separator}page={current_page + 1}" return None def _extract_contact_info(self, soup: BeautifulSoup, text: str, page_url: str) -> List[Dict]: """ Extraire les informations de contact complètes d'une page """ contacts = [] # Extraire tous les emails emails = set() emails.update(self._extract_emails_from_text(text)) emails.update(self._extract_emails_from_links(soup)) # Extraire les numéros de téléphone phones = self._extract_phone_numbers(text) # Extraire les noms et entreprises depuis les balises structurées structured_contacts = self._extract_structured_contacts(soup) # Extraire l'adresse/localité location = self._extract_location_info(soup, text) # Créer des contacts pour chaque email trouvé for email in emails: if not self._is_valid_email(email): continue contact = { 'email': email.lower(), 'name': '', 'first_name': '', 'last_name': '', 'company': '', 'phone': '', 'location': location, 'source_url': page_url, 'notes': '' } # Essayer de trouver des informations complémentaires self._enhance_contact_info(contact, soup, text, structured_contacts, phones) contacts.append(contact) return contacts def _extract_phone_numbers(self, text: str) -> List[str]: """ Extraire les numéros de téléphone """ phones = [] matches = self.phone_pattern.findall(text) for phone in matches: # Nettoyer le numéro clean_phone = re.sub(r'[\s\-\.\/]', '', phone) if len(clean_phone) >= 9: # Numéro valide phones.append(phone) return phones def _extract_structured_contacts(self, soup: BeautifulSoup) -> List[Dict]: """ Extraire les contacts depuis les données structurées (microdata, JSON-LD, etc.) """ contacts = [] # Chercher les données JSON-LD json_scripts = soup.find_all('script', type='application/ld+json') for script in json_scripts: try: data = json.loads(script.string) if isinstance(data, dict): contact = self._parse_json_ld_contact(data) if contact: contacts.append(contact) elif isinstance(data, list): for item in data: contact = self._parse_json_ld_contact(item) if contact: contacts.append(contact) except: continue # Chercher les microdata contacts.extend(self._extract_microdata_contacts(soup)) return contacts def _parse_json_ld_contact(self, data: Dict) -> Dict: """ Parser un contact depuis les données JSON-LD """ contact = {} if data.get('@type') in ['Organization', 'LocalBusiness', 'Person']: contact['name'] = data.get('name', '') contact['company'] = data.get('name', '') if data.get('@type') != 'Person' else '' # Email email = data.get('email') if email: contact['email'] = email # Téléphone phone = data.get('telephone') if phone: contact['phone'] = phone # Adresse address = data.get('address') if address: if isinstance(address, dict): location_parts = [] if address.get('addressLocality'): location_parts.append(address['addressLocality']) if address.get('addressRegion'): location_parts.append(address['addressRegion']) if address.get('addressCountry'): location_parts.append(address['addressCountry']) contact['location'] = ', '.join(location_parts) elif isinstance(address, str): contact['location'] = address return contact if contact.get('email') or contact.get('name') else None def _extract_microdata_contacts(self, soup: BeautifulSoup) -> List[Dict]: """ Extraire les contacts depuis les microdata """ contacts = [] # Chercher les éléments avec itemtype Person ou Organization items = soup.find_all(attrs={'itemtype': re.compile(r'.*(Person|Organization|LocalBusiness).*')}) for item in items: contact = {} # Nom name_elem = item.find(attrs={'itemprop': 'name'}) if name_elem: contact['name'] = name_elem.get_text(strip=True) # Email email_elem = item.find(attrs={'itemprop': 'email'}) if email_elem: contact['email'] = email_elem.get('href', '').replace('mailto:', '') or email_elem.get_text(strip=True) # Téléphone phone_elem = item.find(attrs={'itemprop': 'telephone'}) if phone_elem: contact['phone'] = phone_elem.get_text(strip=True) if contact.get('email') or contact.get('name'): contacts.append(contact) return contacts def _extract_location_info(self, soup: BeautifulSoup, text: str) -> str: """ Extraire les informations de localisation """ location_indicators = [ r'\b\d{4,5}\s+[A-Za-zÀ-ÿ\s\-]+\b', # Code postal + ville r'\b[A-Za-zÀ-ÿ\s\-]+,\s*[A-Za-zÀ-ÿ\s\-]+\b', # Ville, Pays ] # Chercher dans les balises d'adresse address_tags = soup.find_all(['address', 'div'], class_=re.compile(r'.*address.*|.*location.*|.*contact.*')) for tag in address_tags: address_text = tag.get_text(strip=True) for pattern in location_indicators: match = re.search(pattern, address_text, re.IGNORECASE) if match: return match.group(0) # Chercher dans le texte global for pattern in location_indicators: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(0) return '' def _enhance_contact_info(self, contact: Dict, soup: BeautifulSoup, text: str, structured_contacts: List[Dict], phones: List[str]): """ Améliorer les informations de contact en croisant les données """ email = contact['email'] # Chercher dans les contacts structurés for struct_contact in structured_contacts: if struct_contact.get('email') == email: contact.update(struct_contact) break # Si pas de nom trouvé, essayer d'extraire depuis l'email if not contact['name']: local_part = email.split('@')[0] domain_part = email.split('@')[1] # Essayer de deviner le nom depuis la partie locale if '.' in local_part: parts = local_part.split('.') contact['first_name'] = parts[0].title() contact['last_name'] = parts[1].title() if len(parts) > 1 else '' contact['name'] = f"{contact['first_name']} {contact['last_name']}".strip() else: contact['name'] = local_part.title() # Essayer de deviner l'entreprise depuis le domaine if not contact['company']: company_name = domain_part.split('.')[0] contact['company'] = company_name.title() # Ajouter un numéro de téléphone si disponible if not contact['phone'] and phones: contact['phone'] = phones[0] # Prendre le premier numéro trouvé # Enrichir les notes avec des informations contextuelles notes_parts = [] if contact['location']: notes_parts.append(f"Localisation: {contact['location']}") # Chercher des informations sur la fonction/titre title_patterns = [ r'(?i)(?:directeur|manager|responsable|chef|président|ceo|cto|cfo)\s+[a-zA-ZÀ-ÿ\s]+', r'(?i)[a-zA-ZÀ-ÿ\s]+\s+(?:director|manager|head|chief|president)' ] for pattern in title_patterns: matches = re.findall(pattern, text) if matches: notes_parts.append(f"Fonction possible: {matches[0]}") break contact['notes'] = ' | '.join(notes_parts) def _merge_contact_info(self, existing: Dict, new: Dict): """ Fusionner les informations de deux contacts """ for key, value in new.items(): if value and not existing.get(key): existing[key] = value # Fusionner les notes if new.get('notes') and existing.get('notes'): existing['notes'] = f"{existing['notes']} | {new['notes']}" elif new.get('notes'): existing['notes'] = new['notes'] def _extract_domain_info(self, url: str, results: Dict): """ Extraire les informations générales du domaine """ domain = urlparse(url).netloc results['domain_info'] = { 'domain': domain, 'company_guess': domain.split('.')[0].title(), 'total_contacts': len(results['contacts']), 'total_pages_scraped': len(results['pages_scraped']) } def _extract_emails_from_links(self, soup: BeautifulSoup) -> Set[str]: """ Extraire les emails des liens mailto """ emails = set() # Chercher les liens mailto mailto_links = soup.find_all('a', href=re.compile(r'^mailto:', re.I)) for link in mailto_links: href = link.get('href', '') email_match = re.search(r'mailto:([^?&]+)', href, re.I) if email_match: email = email_match.group(1) if self._is_valid_email(email): emails.add(email.lower()) return emails def _extract_emails_from_text(self, text: str) -> Set[str]: """ Extraire les emails du texte de la page """ emails = set() matches = self.email_pattern.findall(text) for email in matches: # Filtrer les emails indésirables if not self._is_valid_email(email): continue emails.add(email.lower()) return emails def _extract_internal_links(self, soup: BeautifulSoup, base_url: str) -> List[str]: """ Extraire les liens internes de la page """ links = [] base_domain = urlparse(base_url).netloc for link in soup.find_all('a', href=True): href = link['href'] full_url = urljoin(base_url, href) parsed_link = urlparse(full_url) # Vérifier que c'est un lien interne et pas déjà visité if (parsed_link.netloc == base_domain and full_url not in self.visited_urls and not self._is_excluded_link(full_url)): links.append(full_url) return links def _is_valid_email(self, email: str) -> bool: """ Vérifier si l'email est valide et non indésirable """ # Filtrer les extensions de fichiers communes excluded_extensions = ['.jpg', '.png', '.gif', '.pdf', '.doc', '.css', '.js'] for ext in excluded_extensions: if email.lower().endswith(ext): return False # Filtrer les emails génériques indésirables excluded_patterns = [ 'example.com', 'test.com', 'placeholder', 'your-email', 'youremail', 'email@', 'noreply', 'no-reply' ] for pattern in excluded_patterns: if pattern in email.lower(): return False # Vérifier la longueur if len(email) < 5 or len(email) > 254: return False return True def _is_excluded_link(self, url: str) -> bool: """ Vérifier si le lien doit être exclu du scraping """ excluded_patterns = [ '#', 'javascript:', 'tel:', 'mailto:', '.pdf', '.doc', '.zip', '.jpg', '.png', '.gif' ] url_lower = url.lower() for pattern in excluded_patterns: if pattern in url_lower: return True return False def save_results(self, results: Dict, filename: str = None) -> str: """ Sauvegarder les résultats dans un fichier JSON """ if not filename: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') domain = urlparse(results['url']).netloc.replace('.', '_') filename = f"scraping_{domain}_{timestamp}.json" # Créer le dossier s'il n'existe pas scraping_folder = 'Data/email_scraping' os.makedirs(scraping_folder, exist_ok=True) filepath = os.path.join(scraping_folder, filename) with open(filepath, 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=2) return filepath class EmailScrapingHistory: def __init__(self): self.history_folder = 'Data/email_scraping' os.makedirs(self.history_folder, exist_ok=True) def get_all_scrapings(self) -> List[Dict]: """ Récupérer l'historique de tous les scrapings """ scrapings = [] for filename in os.listdir(self.history_folder): if filename.endswith('.json'): filepath = os.path.join(self.history_folder, filename) try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) scrapings.append({ 'filename': filename, 'url': data.get('url', ''), 'emails_count': len(data.get('contacts', data.get('emails', []))), # Support pour ancienne et nouvelle structure 'pages_count': len(data.get('pages_scraped', [])), 'start_time': data.get('start_time', ''), 'errors_count': len(data.get('errors', [])) }) except Exception as e: print(f"Erreur lors de la lecture de {filename}: {e}") # Trier par date (plus récent d'abord) scrapings.sort(key=lambda x: x.get('start_time', ''), reverse=True) return scrapings def get_scraping_details(self, filename: str) -> Dict: """ Récupérer les détails d'un scraping spécifique """ filepath = os.path.join(self.history_folder, filename) if os.path.exists(filepath): with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) return None def delete_scraping(self, filename: str) -> bool: """ Supprimer un fichier de scraping """ filepath = os.path.join(self.history_folder, filename) if os.path.exists(filepath): try: os.remove(filepath) return True except Exception as e: print(f"Erreur lors de la suppression: {e}") return False return False