Le renseignement en sources ouvertes (OSINT — Open Source Intelligence) est la discipline qui consiste à collecter, analyser et synthétiser des informations à partir de sources publiquement accessibles pour produire du renseignement actionnable. Contrairement à l'espionnage ou au hacking offensif.
Le renseignement en sources ouvertes (OSINT — Open Source Intelligence) est la discipline qui consiste à collecter, analyser et synthétiser des informations à partir de sources publiquement accessibles pour produire du renseignement actionnable. Contrairement à l'espionnage ou au hacking offensif, l'OSINT opère exclusivement sur des données légalement accessibles — moteurs de recherche, réseaux sociaux, registres DNS, bases de données whois, archives web, publications académiques, documents gouvernementaux, forums publics et dark web monitoring. Pour un analyste en cybersécurité, l'OSINT est indispensable à plusieurs niveaux : la reconnaissance pré-engagement dans un test de pénétration autorisé pour cartographier la surface d'attaque d'une organisation cible, le renseignement sur les menaces (CTI — Cyber Threat Intelligence) pour identifier les acteurs qui ciblent une industrie spécifique, la réponse aux incidents pour retracer l'infrastructure d'un attaquant, et la due diligence sécuritaire pour évaluer le niveau d'exposition d'un partenaire ou fournisseur. La richesse et la complexité des techniques OSINT disponibles — de la Google dork simple aux frameworks d'automatisation multi-sources comme Maltego ou Recon-ng — nécessitent une approche méthodique et une rigueur légale absolue pour rester dans le cadre de la légalité et de l'éthique professionnelle.
Cadre légal et éthique de l'OSINT en France
Avant d'aborder les techniques, le cadre légal est indispensable. En France, l'OSINT est régi par plusieurs textes qui délimitent précisément ce qui est autorisé.
Cadre légal français de l'OSINT
| Texte | Portée | Implication OSINT |
|---|---|---|
| RGPD + Loi Informatique et Libertés | Protection des données personnelles | Collecte de données personnelles soumise à finalité légitime |
| Art. 226-1 Code pénal | Atteinte à l'intimité de la vie privée | Surveillance de personnes privées sans consentement = illégal |
| Art. 323-1 Code pénal (LCEN) | Accès frauduleux aux systèmes informatiques | Toute tentative d'accès non-public = illégal même via OSINT |
| Loi du 23 juillet 2019 (renseignement) | Activités de renseignement étatique | Seuls les services autorisés peuvent conduire OSINT sur la sécurité nationale |
| Loi Avia (2020, partiellement censurée) | Haine en ligne | La collecte de propos haineux à des fins probatoires est encadrée |
Les règles fondamentales pour un investigateur OSINT en France :
- Ne collecter que des données publiquement accessibles sans contourner un mécanisme de protection (même un simple login constitue une barrière légale)
- Documenter et archiver les sources avec horodatage pour la traçabilité
- Respecter la finalité — les données collectées pour un audit ne peuvent pas être réutilisées pour d'autres fins
- Ne jamais recouper des données pour reconstituer le profil détaillé d'un individu privé sans base légale (RGPD)
Méthodologie OSINT : du besoin en renseignement à l'analyse
L'OSINT professionnel ne commence pas par "lancer Maltego". Il commence par la définition du Besoin en Renseignement (BdR) — la question précise à laquelle l'investigation doit répondre.
Cycle du renseignement appliqué à l'OSINT
# Cycle OSINT structuré
cycle_renseignement:
phase_1_planification:
objectif: "Définir le BdR (Besoin en Renseignement)"
questions_clés:
- "Quelle est la cible de l'investigation ?"
- "Quel est l'objectif final (pentest, CTI, due diligence) ?"
- "Quel est le cadre légal applicable ?"
- "Quelles sont les limites de temps et de scope ?"
livrables:
- "BdR formalisé et validé par le donneur d'ordre"
- "Autorisation écrite si investigation pour tiers"
phase_2_collecte:
objectif: "Identifier et collecter les informations pertinentes"
sources:
passive: ["DNS", "WHOIS", "Shodan", "Google", "LinkedIn", "Archive.org"]
semi_active: ["Certificats TLS", "BGP routing", "Pastebin monitoring"]
active: ["Port scanning AUTORISÉ uniquement sur cibles propres"]
outils:
- "theHarvester, Maltego, Recon-ng, Shodan, SpiderFoot"
phase_3_traitement:
objectif: "Normaliser, déduplicer, structurer les données brutes"
actions:
- "Déduplication des données collectées depuis plusieurs sources"
- "Vérification croisée (une info unique = incertaine)"
- "Horodatage de toutes les données collectées"
- "Évaluation de la fiabilité de chaque source"
phase_4_analyse:
objectif: "Transformer les données en renseignement actionnable"
techniques:
- "Graphe de relations (Maltego)"
- "Timeline d'événements"
- "Analyse des patterns comportementaux"
- "Clustering d'attributs (IPs, domaines, personnes)"
phase_5_diffusion:
objectif: "Produire le rapport final"
format:
- "Rapport exécutif (résumé pour décideurs)"
- "Rapport technique (détail pour équipes sécurité)"
- "Graphe de relations"
- "IOCs exploitables (si CTI)"
Google Dorking : la recherche avancée comme outil de reconnaissance
Le Google Dorking (ou Google Hacking) consiste à utiliser les opérateurs de recherche avancée de Google pour trouver des informations spécifiques non indexées via une recherche classique. C'est l'outil OSINT le plus accessible et souvent le plus efficace.
Opérateurs Google essentiels
# === RECONNAISSANCE D'UNE ORGANISATION ===
# Tous les sous-domaines indexés
site:example.com
# Pages d'administration exposées
site:example.com inurl:(admin|login|dashboard|panel|manage)
# Fichiers sensibles exposés
site:example.com filetype:(pdf|xls|xlsx|doc|docx|csv) confidential
site:example.com filetype:sql
site:example.com filetype:env
site:example.com filetype:log "password"
# Répertoires ouverts
site:example.com intitle:"Index of /"
# Fichiers de configuration exposés
site:example.com inurl:"/wp-config.php" OR inurl:"/.env"
site:example.com ext:xml inurl:sitemap
# Emails d'employés
site:linkedin.com "@example.com"
"@example.com" filetype:pdf
# Mentions dans des documents gouvernementaux ou académiques
"example.com" site:gouv.fr OR site:ac-paris.fr
# === RECHERCHE DE VULNÉRABILITÉS EXPOSÉES ===
# Panneaux d'administration communs
intitle:"phpMyAdmin" inurl:"/phpmyadmin/"
intitle:"GitLab" inurl:"/users/sign_in"
intitle:"Grafana" inurl:":3000"
intitle:"Kibana" inurl:":5601"
intitle:"Jenkins" inurl:":8080/login"
# Fichiers de configuration Git exposés
inurl:"/.git/config" "branch"
# Credentials dans du code public
site:github.com "example.com" password OR secret OR api_key
# Erreurs d'application révélatrices
site:example.com "Fatal error" OR "stack trace" OR "SQL syntax"
# Caméras et équipements exposés (à des fins légitimes uniquement)
intitle:"webcam 7" inurl:"/viewer/live/"
Google Dorks Base (GHDB) et automatisation
#!/usr/bin/env python3
"""
google_dork_scanner.py — Automatisation de Google Dorking via SerpAPI
(utilise l'API légale de Google, pas de scraping)
"""
import time
import json
from dataclasses import dataclass
from typing import List, Optional
import requests
@dataclass
class DorkResult:
query: str
url: str
title: str
snippet: str
risk_level: str
class GoogleDorkScanner:
"""
Scanner Google Dork via SerpAPI (API légale)
Respecte les ToS Google en passant par l'API officielle
"""
SENSITIVE_DORKS = [
{
"name": "Fichiers de configuration exposés",
"template": 'site:{domain} filetype:env OR filetype:cfg OR filetype:conf',
"risk": "CRITICAL"
},
{
"name": "Répertoires ouverts",
"template": 'site:{domain} intitle:"Index of /"',
"risk": "HIGH"
},
{
"name": "Pages d'admin exposées",
"template": 'site:{domain} inurl:(admin|administrator|panel|manage|console)',
"risk": "HIGH"
},
{
"name": "Fichiers SQL exposés",
"template": 'site:{domain} filetype:sql',
"risk": "CRITICAL"
},
{
"name": "Logs applicatifs",
"template": 'site:{domain} filetype:log',
"risk": "HIGH"
},
{
"name": "Fichiers Excel/CSV avec données",
"template": 'site:{domain} filetype:(xls OR xlsx OR csv) (password OR email OR phone)',
"risk": "HIGH"
},
{
"name": "Erreurs d'application",
"template": 'site:{domain} ("SQL syntax" OR "Fatal error" OR "stack trace")',
"risk": "MEDIUM"
},
{
"name": "Git config exposé",
"template": 'site:{domain} inurl:"/.git/config"',
"risk": "CRITICAL"
},
]
def __init__(self, serp_api_key: str):
self.api_key = serp_api_key
self.base_url = "https://serpapi.com/search"
def scan_domain(
self,
domain: str,
delay_seconds: float = 2.0
) -> List[DorkResult]:
"""
Scan OSINT d'un domaine via Google Dorks
delay_seconds: respecter les rate limits
"""
all_results = []
for dork_config in self.SENSITIVE_DORKS:
query = dork_config["template"].format(domain=domain)
params = {
"api_key": self.api_key,
"engine": "google",
"q": query,
"num": 10,
"gl": "fr", # Google France
"hl": "fr"
}
try:
response = requests.get(self.base_url, params=params, timeout=15)
response.raise_for_status()
data = response.json()
for result in data.get("organic_results", []):
all_results.append(DorkResult(
query=query,
url=result.get("link", ""),
title=result.get("title", ""),
snippet=result.get("snippet", ""),
risk_level=dork_config["risk"]
))
except requests.RequestException as e:
print(f"[WARN] Erreur pour '{query}': {e}")
time.sleep(delay_seconds) # Rate limiting respectueux
return all_results
def generate_report(
self,
domain: str,
results: List[DorkResult]
) -> dict:
"""Génère un rapport structuré des findings"""
by_risk = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
for r in results:
by_risk[r.risk_level].append({
"url": r.url,
"title": r.title,
"dork": r.query,
"snippet": r.snippet[:200]
})
return {
"target": domain,
"scan_date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"total_findings": len(results),
"findings_by_risk": by_risk,
"executive_summary": (
f"L'analyse OSINT de {domain} a révélé "
f"{len(by_risk['CRITICAL'])} findings critiques, "
f"{len(by_risk['HIGH'])} élevés, "
f"{len(by_risk['MEDIUM'])} moyens."
)
}
Shodan : le moteur de recherche des objets connectés
Shodan est un moteur de recherche spécialisé qui indexe en permanence les services exposés sur Internet — serveurs web, caméras IP, systèmes SCADA, routeurs, bases de données — avec leurs bannières, certificats TLS, et métadonnées. C'est l'outil OSINT le plus puissant pour la reconnaissance réseau passive.
Reconnaissance avec Shodan
# === RECHERCHES SHODAN DE BASE ===
# Services d'une organisation (par ASN ou CIDR)
# Identifier l'ASN d'une organisation:
whois -h whois.radb.net '!gAS-GOOGLE' | head -5
# Recherche par ASN dans Shodan
shodan search "org:\"Example Corporation\""
# Recherche par CIDR
shodan search "net:203.0.113.0/24"
# Services exposés par type
shodan search "org:\"Example Corporation\" product:nginx"
shodan search "org:\"Example Corporation\" port:3389" # RDP exposé
# Certificats TLS — trouver les sous-domaines via les certificats
shodan search "ssl.cert.subject.cn:*.example.com"
# Services vulnérables connus
shodan search "product:\"Apache Struts\" vuln:CVE-2017-5638"
shodan search "product:\"Confluence\" vuln:CVE-2021-26084"
# Recherche de technologies spécifiques
shodan search "http.title:\"GitLab\" org:\"Example Corporation\""
shodan search "http.component:\"WordPress\" org:\"Example Corporation\""
# === API SHODAN PYTHON ===
#!/usr/bin/env python3
"""
shodan_recon.py — Reconnaissance OSINT avec l'API Shodan
"""
import shodan
import json
from dataclasses import dataclass
from typing import List, Dict
@dataclass
class ShodanHost:
ip: str
port: int
product: str
version: str
cve_list: List[str]
banners: List[str]
hostnames: List[str]
country: str
asn: str
last_seen: str
class ShodanRecon:
def __init__(self, api_key: str):
self.api = shodan.Shodan(api_key)
def recon_organization(self, org_name: str) -> dict:
"""Reconnaissance complète d'une organisation via Shodan"""
results = {
"organization": org_name,
"hosts": [],
"exposed_services": {},
"vulnerabilities": [],
"interesting_findings": []
}
try:
# Recherche de tous les hôtes de l'organisation
query = f'org:"{org_name}"'
search_results = self.api.search(query, limit=200)
for match in search_results.get("matches", []):
host = ShodanHost(
ip=match.get("ip_str", ""),
port=match.get("port", 0),
product=match.get("product", ""),
version=match.get("version", ""),
cve_list=[v for v in match.get("vulns", {}).keys()],
banners=[match.get("data", "")[:200]],
hostnames=match.get("hostnames", []),
country=match.get("location", {}).get("country_name", ""),
asn=match.get("asn", ""),
last_seen=match.get("timestamp", "")
)
results["hosts"].append(host)
# Comptage des services exposés
service_key = f"{host.product}:{host.port}"
results["exposed_services"][service_key] = \
results["exposed_services"].get(service_key, 0) + 1
# Vulnérabilités CVE
for cve in host.cve_list:
results["vulnerabilities"].append({
"host": host.ip,
"port": host.port,
"cve": cve,
"product": host.product,
"version": host.version
})
# Findings intéressants
self._check_interesting(host, results["interesting_findings"])
except shodan.APIError as e:
results["error"] = str(e)
return results
def _check_interesting(self, host: ShodanHost, findings: list) -> None:
"""Identifie les services particulièrement intéressants pour un auditeur"""
# Protocoles de gestion exposés
if host.port in [23, 22, 3389, 5900]:
findings.append({
"type": "management_protocol_exposed",
"ip": host.ip,
"port": host.port,
"severity": "HIGH",
"description": f"Protocole de gestion {host.product} exposé sur Internet"
})
# Bases de données exposées sans authentification (Shodan indique souvent)
if host.port in [3306, 5432, 27017, 6379, 9200, 5984]:
if "authentication" not in " ".join(host.banners).lower():
findings.append({
"type": "database_possibly_unauthenticated",
"ip": host.ip,
"port": host.port,
"severity": "CRITICAL",
"description": f"Base de données {host.product} potentiellement non authentifiée"
})
# CVE critiques
critical_cves = [c for c in host.cve_list if c in KNOWN_CRITICAL_CVE]
if critical_cves:
findings.append({
"type": "critical_cve",
"ip": host.ip,
"cves": critical_cves,
"severity": "CRITICAL",
"description": f"CVE critiques détectées sur {host.product}"
})
def find_subdomains_via_ssl(self, domain: str) -> List[str]:
"""
Découverte de sous-domaines via les certificats TLS indexés par Shodan
Technique très efficace — les certificats wildcard révèlent souvent des sous-domaines
"""
subdomains = set()
try:
results = self.api.search(
f'ssl.cert.subject.cn:"*.{domain}" OR ssl.cert.subject.cn:"{domain}"',
limit=100
)
for match in results.get("matches", []):
# Extraire les SANs du certificat
ssl_info = match.get("ssl", {})
cert = ssl_info.get("cert", {})
subject_alt_names = cert.get("extensions", {}).get("subjectAltName", "")
# Parser les SANs
for san in subject_alt_names.split(","):
san = san.strip().replace("DNS:", "")
if domain in san and not san.startswith("*"):
subdomains.add(san.strip())
# Ajouter le CN
cn = cert.get("subject", {}).get("CN", "")
if cn and domain in cn and not cn.startswith("*"):
subdomains.add(cn)
# Ajouter les hostnames Shodan
for hostname in match.get("hostnames", []):
if domain in hostname:
subdomains.add(hostname)
except shodan.APIError as e:
print(f"[WARN] Erreur Shodan: {e}")
return sorted(subdomains)
KNOWN_CRITICAL_CVE = {
"CVE-2021-44228", # Log4Shell
"CVE-2021-26084", # Confluence RCE
"CVE-2022-22965", # Spring4Shell
"CVE-2023-44487", # HTTP/2 Rapid Reset
"CVE-2024-21893", # Ivanti SSRF
}
theHarvester : collecte d'emails et de sous-domaines
theHarvester est un outil Python de reconnaissance passive qui agrège les informations provenant de multiples sources (moteurs de recherche, Shodan, LinkedIn, etc.) pour cartographier rapidement la surface d'attaque d'une organisation.
# Installation
pip3 install theHarvester
# Collecte d'emails depuis Google, Bing, LinkedIn et Hunter.io
theHarvester \
-d example.com \
-b google, bing, linkedin, hunter \
-l 500 \
-f results_example.html
# Collecte via Shodan (nécessite une clé API)
theHarvester \
-d example.com \
-b shodan \
-s # Active la recherche Shodan sur les IPs trouvées
# Sources disponibles:
# - baidu, bing, brave, google, yahoo (moteurs)
# - hunter, emailformat, intelx (email hunters)
# - linkedin (LinkedIn sans auth)
# - shodan, censys, binaryedge (scan engines)
# - github, gitlab (code repositories)
# - dnsdumpster, hackertarget (DNS)
# - sublist3r (sous-domaines)
# Scan passif complet avec toutes les sources disponibles
theHarvester \
-d example.com \
-b all \
-l 200 \
-f full_recon.html \
--dns-brute # Brute-force DNS (semi-actif)
#!/usr/bin/env python3
"""
email_harvester.py — Collecte et validation d'emails d'organisation
via OSINT multi-sources
"""
import re
import requests
import dns.resolver
from typing import Set, List
from dataclasses import dataclass
@dataclass
class EmailFinding:
email: str
source: str
valid_mx: bool
smtp_verify: bool = False # Vérification SMTP (risqué — peut déclencher alertes)
class EmailHarvester:
EMAIL_PATTERN = re.compile(
r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'
)
def __init__(self, target_domain: str):
self.domain = target_domain.lower()
self.collected_emails: Set[str] = set()
def harvest_from_google_cse(self, api_key: str, cx: str) -> List[str]:
"""Collecte d'emails via Google Custom Search Engine (API légale)"""
queries = [
f'site:linkedin.com "@{self.domain}"',
f'"@{self.domain}" filetype:pdf',
f'"@{self.domain}" contact OR team OR about',
]
emails = []
for query in queries:
params = {
"key": api_key,
"cx": cx,
"q": query,
"num": 10
}
try:
resp = requests.get(
"https://www.googleapis.com/customsearch/v1",
params=params, timeout=10
)
data = resp.json()
for item in data.get("items", []):
text = item.get("title", "") + " " + item.get("snippet", "")
found = self.EMAIL_PATTERN.findall(text)
domain_emails = [
e.lower() for e in found
if e.lower().endswith(f"@{self.domain}")
]
emails.extend(domain_emails)
except requests.RequestException:
pass
return list(set(emails))
def validate_mx(self) -> bool:
"""Vérifie que le domaine a des enregistrements MX valides"""
try:
mx_records = dns.resolver.resolve(self.domain, 'MX')
return len(mx_records) > 0
except Exception:
return False
def infer_email_patterns(
self,
known_emails: List[str]
) -> List[str]:
"""
Déduit le pattern d'email de l'organisation depuis les emails connus
Ex: 'john.doe@company.com' → pattern 'firstname.lastname@company.com'
"""
patterns = []
for email in known_emails:
local_part = email.split("@")[0]
# Pattern prénom.nom
if "." in local_part:
parts = local_part.split(".")
if len(parts) == 2:
patterns.append("firstname.lastname")
# Pattern première initiale + nom
elif len(local_part) > 3 and local_part[0].isalpha():
patterns.append("flastname")
# Pattern prénom seul
else:
patterns.append("firstname")
# Pattern le plus fréquent
if patterns:
most_common = max(set(patterns), key=patterns.count)
return [most_common]
return []
Recon-ng : framework de reconnaissance modulaire
Recon-ng est un framework Python de reconnaissance OSINT inspiré de Metasploit. Son architecture modulaire avec une base de données intégrée permet d'enchaîner les modules et de construire progressivement une cartographie de la cible.
# Installation
pip3 install recon-ng
# Lancement et création d'un workspace
recon-ng
[recon-ng] > workspaces create example_corp
[recon-ng] > modules search
# Modules clés pour la reconnaissance
# 1. Enumération DNS
[recon-ng][example_corp] > modules load recon/domains-hosts/hackertarget
[recon-ng][example_corp][hackertarget] > options set SOURCE example.com
[recon-ng][example_corp][hackertarget] > run
# 2. Résolution des IPs
[recon-ng][example_corp] > modules load recon/hosts-hosts/resolve
[recon-ng][example_corp][resolve] > run
# 3. Recherche de sous-domaines via certificats
[recon-ng][example_corp] > modules load recon/domains-hosts/certificate_transparency
[recon-ng][example_corp][certificate_transparency] > options set SOURCE example.com
[recon-ng][example_corp][certificate_transparency] > run
# 4. Collecte d'emails via Hunter.io
[recon-ng][example_corp] > keys add hunter_api YOUR_HUNTER_API_KEY
[recon-ng][example_corp] > modules load recon/domains-contacts/hunter_io
[recon-ng][example_corp][hunter_io] > options set SOURCE example.com
[recon-ng][example_corp][hunter_io] > run
# 5. Géolocalisation des IPs via Shodan
[recon-ng][example_corp] > keys add shodan_api YOUR_SHODAN_API_KEY
[recon-ng][example_corp] > modules load recon/hosts-hosts/shodan_ip
[recon-ng][example_corp][shodan_ip] > run
# 6. Rapport final
[recon-ng][example_corp] > modules load reporting/html
[recon-ng][example_corp][html] > options set FILENAME /tmp/recon_report.html
[recon-ng][example_corp][html] > run
# Visualisation de l'état de la base de données
[recon-ng][example_corp] > show hosts
[recon-ng][example_corp] > show contacts
[recon-ng][example_corp] > show credentials
#!/usr/bin/env python3
"""
recon_automation.py — Automatisation Recon-ng via son API Python
"""
import sqlite3
import subprocess
import json
from pathlib import Path
class ReconNGAutomation:
"""Automatisation de Recon-ng pour la reconnaissance OSINT"""
def __init__(self, workspace: str, target_domain: str):
self.workspace = workspace
self.domain = target_domain
self.recon_path = Path.home() / ".recon-ng"
self.db_path = self.recon_path / "workspaces" / workspace / "data.db"
def run_reconnaissance(self) -> None:
"""Lance une séquence de reconnaissance complète"""
commands = f"""
workspaces create {self.workspace}
modules load recon/domains-hosts/hackertarget
options set SOURCE {self.domain}
run
modules load recon/domains-hosts/certificate_transparency
options set SOURCE {self.domain}
run
modules load recon/hosts-hosts/resolve
run
modules load recon/domains-contacts/whois_pocs
options set SOURCE {self.domain}
run
exit
"""
# Exécuter Recon-ng avec les commandes
proc = subprocess.run(
["recon-ng", "-w", self.workspace],
input=commands.encode(),
capture_output=True,
timeout=300
)
if proc.returncode != 0:
print(f"[WARN] Recon-ng output: {proc.stderr.decode()[:500]}")
def extract_results(self) -> dict:
"""Extrait les résultats depuis la base SQLite de Recon-ng"""
if not self.db_path.exists():
return {}
results = {}
with sqlite3.connect(str(self.db_path)) as conn:
conn.row_factory = sqlite3.Row
# Hôtes découverts
hosts = conn.execute(
"SELECT host, ip_address, region, country FROM hosts"
).fetchall()
results["hosts"] = [dict(h) for h in hosts]
# Contacts (emails, personnes)
contacts = conn.execute(
"SELECT first_name, last_name, email, title FROM contacts"
).fetchall()
results["contacts"] = [dict(c) for c in contacts]
# Credentials découverts (si module de leak search)
try:
creds = conn.execute(
"SELECT username, password, hash, type FROM credentials"
).fetchall()
results["credentials"] = [dict(c) for c in creds]
except sqlite3.OperationalError:
results["credentials"] = []
return results
Maltego : analyse de graphes relationnels
Maltego est le standard de l'industrie pour la visualisation des relations entre entités OSINT. Son modèle de données basé sur des entités (personnes, domaines, IPs, organisations) et des relations permet de construire des graphes de corrélation complexes.
Entités et transformations Maltego clés
| Entité source | Transformation | Entités cibles | Source de données |
|---|---|---|---|
| Domain | To DNS Name - MX | DNS Name (mail servers) | DNS public |
| Domain | To Email Addresses [Hunter] | Email Address | Hunter.io |
| Domain | To Netblocks [Whois] | Netblock | WHOIS |
| Netblock | To IPs [Shodan] | IP Address + Services | Shodan |
| IP Address | To Certificates | SSL Certificate → Domains | Shodan/Censys |
| Person | To Social Networks | LinkedIn, Twitter, GitHub | Social APIs |
| Email Address | To Breached Accounts | Breach records | HaveIBeenPwned |
Automatisation Maltego via Python (iTDS API)
#!/usr/bin/env python3
"""
maltego_transform.py — Création d'une transformation Maltego personnalisée
pour enrichir les entités avec des données OSINT internes
"""
from maltego_trx.maltego import MaltegoMsg, MaltegoTransform
from maltego_trx.decorator_registry import TransformSetting
def domain_to_subdomains_passive(request: MaltegoMsg, response: MaltegoTransform) -> None:
"""
Transformation Maltego: Domain → Subdomains via Certificate Transparency
Source: crt.sh (données publiques)
"""
domain = request.Value.strip()
if not domain or '.' not in domain:
response.addUIMessage("Domaine invalide", messageType="Partial")
return
try:
import requests
# Requête à crt.sh pour les certificats
crt_url = f"https://crt.sh/?q=%.{domain}&output=json"
resp = requests.get(crt_url, timeout=15)
resp.raise_for_status()
certificates = resp.json()
subdomains = set()
for cert in certificates:
name = cert.get("name_value", "")
for subdomain in name.split("\n"):
subdomain = subdomain.strip()
if (subdomain.endswith(f".{domain}") and
not subdomain.startswith("*") and
len(subdomain) > len(domain) + 1):
subdomains.add(subdomain)
# Créer les entités Maltego pour chaque sous-domaine
for subdomain in sorted(subdomains)[:50]: # Limiter à 50
entity = response.addEntity("maltego.DNSName", subdomain)
entity.addProperty("fqdn", "FQDN", "strict", subdomain)
entity.addProperty(
"source",
"Source",
"strict",
"Certificate Transparency (crt.sh)"
)
response.addUIMessage(
f"{len(subdomains)} sous-domaines découverts via Certificate Transparency"
)
except Exception as e:
response.addUIMessage(f"Erreur: {str(e)}", messageType="Partial")
OSINT sur les réseaux sociaux
Les réseaux sociaux sont une source OSINT majeure pour l'investigation sur les individus (dans un cadre légal) et les organisations. LinkedIn est particulièrement riche pour la reconnaissance corporate.
LinkedIn OSINT : cartographie organisationnelle
#!/usr/bin/env python3
"""
linkedin_osint.py — OSINT LinkedIn via la recherche Google et l'API Proxycurl
(respecte les ToS LinkedIn en n'utilisant pas de scraping direct)
"""
import requests
import time
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class LinkedInEmployee:
name: str
title: str
department: str
location: str
linkedin_url: str
email_guess: Optional[str] = None
class LinkedInOSINT:
"""
Reconnaissance LinkedIn via:
1. Google Custom Search (index LinkedIn publiquement)
2. Proxycurl API (accès légal aux profils publics)
"""
def __init__(self, google_api_key: str, google_cx: str,
proxycurl_key: Optional[str] = None):
self.google_api = google_api_key
self.google_cx = google_cx
self.proxycurl_key = proxycurl_key
def find_employees(
self,
company_name: str,
target_titles: List[str] = None
) -> List[LinkedInEmployee]:
"""
Trouve les employés d'une organisation via Google/LinkedIn
"""
employees = []
default_titles = [
"security", "CISO", "CTO", "CIO", "IT", "network",
"developer", "engineer", "architect", "DevOps",
"infrastructure", "cloud", "system administrator"
]
titles_to_search = target_titles or default_titles
for title in titles_to_search[:5]: # Limiter les requêtes
query = (
f'site:linkedin.com/in/ "{company_name}" "{title}"'
)
params = {
"key": self.google_api,
"cx": self.google_cx,
"q": query,
"num": 10
}
try:
resp = requests.get(
"https://www.googleapis.com/customsearch/v1",
params=params, timeout=10
)
data = resp.json()
for item in data.get("items", []):
# Parser le titre LinkedIn depuis le résultat Google
# Format: "Name - Title at Company | LinkedIn"
page_title = item.get("title", "")
snippet = item.get("snippet", "")
url = item.get("link", "")
if "linkedin.com/in/" in url:
employee = self._parse_linkedin_result(
page_title, snippet, url
)
if employee:
employees.append(employee)
except requests.RequestException:
pass
time.sleep(1.5) # Rate limiting
# Déduplication
seen_urls = set()
unique_employees = []
for emp in employees:
if emp.linkedin_url not in seen_urls:
seen_urls.add(emp.linkedin_url)
unique_employees.append(emp)
return unique_employees
def _parse_linkedin_result(
self,
title: str,
snippet: str,
url: str
) -> Optional[LinkedInEmployee]:
"""Parse un résultat Google de profil LinkedIn"""
# Format typique: "John Doe - Security Engineer at Example Corp | LinkedIn"
if " - " in title:
parts = title.split(" - ", 1)
name = parts[0].strip()
rest = parts[1].replace(" | LinkedIn", "").strip()
# Extraire le titre et l'entreprise
if " at " in rest:
job_title = rest.split(" at ")[0].strip()
department = self._guess_department(job_title)
else:
job_title = rest
department = "Inconnu"
return LinkedInEmployee(
name=name,
title=job_title,
department=department,
location=self._extract_location(snippet),
linkedin_url=url
)
return None
def _guess_department(self, title: str) -> str:
"""Déduit le département depuis le titre"""
title_lower = title.lower()
if any(k in title_lower for k in ["security", "ciso", "soc", "siem"]):
return "Sécurité"
elif any(k in title_lower for k in ["developer", "engineer", "software"]):
return "Développement"
elif any(k in title_lower for k in ["network", "system", "infrastructure"]):
return "Infrastructure"
elif any(k in title_lower for k in ["cto", "cio", "vp", "director"]):
return "Direction"
return "IT"
def _extract_location(self, snippet: str) -> str:
"""Extrait la localisation depuis le snippet Google"""
import re
# LinkedIn inclut souvent "Paris, Île-de-France" dans les snippets
loc_pattern = re.compile(r'([A-Z][a-z]+(?:,\s*[A-Z][a-zÀ-ÿ\-]+)*)\s*·')
match = loc_pattern.search(snippet)
return match.group(1) if match else ""
def guess_email(
self,
employee: LinkedInEmployee,
domain: str,
pattern: str = "firstname.lastname"
) -> str:
"""Génère une hypothèse d'email selon le pattern de l'organisation"""
name_parts = employee.name.lower().split()
if len(name_parts) < 2:
return ""
firstname = name_parts[0]
lastname = name_parts[-1]
# Normalisation (accents, tirets)
import unicodedata
def normalize(s: str) -> str:
return ''.join(
c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn'
).replace("-", "").replace("'", "")
fn = normalize(firstname)
ln = normalize(lastname)
patterns = {
"firstname.lastname": f"{fn}.{ln}@{domain}",
"flastname": f"{fn[0]}{ln}@{domain}",
"firstnamelastname": f"{fn}{ln}@{domain}",
"firstname": f"{fn}@{domain}",
"lastname.firstname": f"{ln}.{fn}@{domain}",
}
return patterns.get(pattern, f"{fn}.{ln}@{domain}")
OSINT réseaux sociaux : limites légales
- L'OSINT LinkedIn est légale sur les profils publics, mais construire un profil complet d'un individu privé peut constituer une atteinte à la vie privée (Art. 226-1 Cp)
- La génération d'hypothèses d'emails est légale ; l'envoi d'emails non sollicités est du spam (RGPD, Art. L34-5 CPCE)
- Les données LinkedIn ne doivent pas être stockées durablement sans base légale — traitement pour un audit spécifique = durée limitée à l'audit
- Les investigations sur des personnes physiques dans un contexte CTI doivent être limitées aux acteurs de menace identifiés comme tels, pas aux individus ordinaires
Reconnaissance DNS et certificats TLS
La reconnaissance DNS est l'une des techniques OSINT les plus productives — les enregistrements DNS sont publics par conception et révèlent une quantité considérable d'information sur l'infrastructure d'une organisation.
# === RECONNAISSANCE DNS PASSIVE ===
# Enregistrements DNS complets d'un domaine
dig example.com ANY
dig +short example.com MX
dig +short example.com TXT # SPF, DMARC, DKIM, vérification Google
dig +short example.com NS # Serveurs DNS autoritaires
dig +short example.com SOA # Start of Authority (infos admin)
dig +short example.com AAAA # IPv6
# Reverse DNS (PTR records)
dig -x 203.0.113.42
# Zone transfer (si mal configuré — souvent bloqué)
dig @ns1.example.com example.com AXFR
# DMARC — révèle l'email gateway
dig +short _dmarc.example.com TXT
# SPF — révèle les IPs et services d'envoi d'emails
dig +short example.com TXT | grep "v=spf"
# DKIM — révèle les sélecteurs (google/selector1/k1/etc = services tiers)
dig +short google._domainkey.example.com TXT
dig +short selector1._domainkey.example.com TXT
# Enumération de sous-domaines via Certificate Transparency
curl -s "https://crt.sh/?q=%.example.com&output=json" |
jq -r '.[].name_value' |
sort -u |
grep -v "^\*"
# DNSRecon — outil de reconnaissance DNS complet
dnsrecon -d example.com -t std # Standard DNS recon
dnsrecon -d example.com -t brt -D /usr/share/wordlists/subdomains.txt # Brute
# Amass — reconnaissance passive de sous-domaines
amass enum -passive -d example.com -o amass_results.txt
# Subfinder — multisource subdomain discovery
subfinder -d example.com -o subfinder_results.txt -all
#!/usr/bin/env python3
"""
dns_recon.py — Reconnaissance DNS passive multi-technique
"""
import dns.resolver
import requests
import json
import re
from typing import List, Set, Dict
from dataclasses import dataclass
@dataclass
class DNSReconResult:
domain: str
subdomains: Set[str]
ip_addresses: Dict[str, str] # subdomain -> IP
mx_records: List[str]
spf_includes: List[str] # Services cloud révélés via SPF
cloud_providers: List[str] # AWS, Azure, GCP détectés
certificate_sans: List[str]
class PassiveDNSRecon:
"""Reconnaissance DNS entièrement passive — aucun paquet envoyé à la cible"""
# Services cloud révélés par les SPF includes
SPF_CLOUD_MAPPING = {
"include:spf.protection.outlook.com": "Microsoft 365",
"include:_spf.google.com": "Google Workspace",
"include:amazonses.com": "Amazon SES",
"include:sendgrid.net": "SendGrid",
"include:mailgun.org": "Mailgun",
"include:spf.mandrillapp.com": "Mandrill/Mailchimp",
"include:servers.mcsv.net": "Mailchimp",
"include:spf.sparkpostmail.com": "SparkPost",
}
def __init__(self, domain: str):
self.domain = domain.lower()
self.resolver = dns.resolver.Resolver()
self.resolver.nameservers = ["8.8.8.8", "1.1.1.1"] # DNS publics
def full_recon(self) -> DNSReconResult:
"""Lance une reconnaissance DNS passive complète"""
result = DNSReconResult(
domain=self.domain,
subdomains=set(),
ip_addresses={},
mx_records=[],
spf_includes=[],
cloud_providers=[],
certificate_sans=[]
)
# 1. Enregistrements de base
result.mx_records = self._get_mx()
# 2. Analyse SPF pour découvrir les services cloud
result.spf_includes, result.cloud_providers = self._analyze_spf()
# 3. Certificate Transparency (crt.sh)
ct_subdomains = self._query_certificate_transparency()
result.subdomains.update(ct_subdomains)
result.certificate_sans = sorted(ct_subdomains)
# 4. Résolution IP des sous-domaines
for subdomain in list(result.subdomains)[:50]:
ip = self._resolve_to_ip(subdomain)
if ip:
result.ip_addresses[subdomain] = ip
# Détection cloud via CNAME/IP
provider = self._detect_cloud_from_ip(subdomain, ip)
if provider and provider not in result.cloud_providers:
result.cloud_providers.append(provider)
return result
def _get_mx(self) -> List[str]:
"""Récupère les enregistrements MX"""
try:
mx_answers = self.resolver.resolve(self.domain, 'MX')
return sorted(
[str(r.exchange).rstrip('.') for r in mx_answers],
key=lambda x: x.split('.')[0]
)
except Exception:
return []
def _analyze_spf(self) -> tuple:
"""Analyse le SPF pour identifier les services cloud utilisés"""
spf_includes = []
cloud_providers = []
try:
txt_answers = self.resolver.resolve(self.domain, 'TXT')
for rdata in txt_answers:
txt = str(rdata).strip('"')
if txt.startswith('v=spf1'):
# Extraire les includes
includes = re.findall(r'include:([^\s"]+)', txt)
spf_includes = [f"include:{i}" for i in includes]
# Mapper vers les fournisseurs
for include in spf_includes:
for spf_key, provider in self.SPF_CLOUD_MAPPING.items():
if spf_key in include:
if provider not in cloud_providers:
cloud_providers.append(provider)
except Exception:
pass
return spf_includes, cloud_providers
def _query_certificate_transparency(self) -> Set[str]:
"""Requête crt.sh pour les certificats TLS"""
subdomains = set()
try:
resp = requests.get(
f"https://crt.sh/?q=%.{self.domain}&output=json",
timeout=20
)
resp.raise_for_status()
for cert in resp.json():
name = cert.get("name_value", "")
for name_entry in name.split("\n"):
name_entry = name_entry.strip()
if (name_entry.endswith(f".{self.domain}") and
not name_entry.startswith("*") and
" " not in name_entry):
subdomains.add(name_entry.lower())
except Exception:
pass
return subdomains
def _resolve_to_ip(self, hostname: str) -> str:
"""Résout un hostname en IP (passif — utilise les DNS publics)"""
try:
answers = self.resolver.resolve(hostname, 'A')
return str(answers[0])
except Exception:
return ""
def _detect_cloud_from_ip(self, hostname: str, ip: str) -> str:
"""Détecte le fournisseur cloud depuis le CNAME ou l'IP"""
cloud_patterns = {
"amazonaws.com": "AWS",
"cloudfront.net": "AWS CloudFront",
"azurewebsites.net": "Azure",
"blob.core.windows.net": "Azure Storage",
"googleusercontent.com": "Google Cloud",
"appspot.com": "Google App Engine",
"cloudflare": "Cloudflare",
"fastly.net": "Fastly CDN",
}
try:
cname_answers = self.resolver.resolve(hostname, 'CNAME')
cname = str(cname_answers[0]).rstrip('.')
for pattern, provider in cloud_patterns.items():
if pattern in cname:
return provider
except Exception:
pass
return ""
OSINT pour la Cyber Threat Intelligence
Dans un contexte CTI (Cyber Threat Intelligence), l'OSINT est utilisé pour suivre les acteurs de menace, identifier leurs indicateurs de compromission (IOCs), et anticiper leurs prochaines cibles. Cette application de l'OSINT est la plus sophistiquée et nécessite une connaissance approfondie des TTPs (Tactiques, Techniques et Procédures) des acteurs.
Sources CTI open source
#!/usr/bin/env python3
"""
cti_osint_aggregator.py — Agrégateur d'IOCs depuis des sources CTI open source
"""
import requests
import json
import csv
import io
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Set
@dataclass
class ThreatIndicator:
value: str
type: str # ip, domain, hash, url, email
source: str
first_seen: str
last_seen: str
threat_type: str # malware, ransomware, apt, phishing
confidence: int # 0-100
tags: List[str] = field(default_factory=list)
class CTIOSINTAggregator:
"""
Agrégateur d'IOCs depuis des sources CTI open source:
- AlienVault OTX (Open Threat Exchange)
- ThreatFox (abuse.ch)
- URLhaus (abuse.ch)
- PhishTank
- MalwareBazaar
"""
FEEDS = {
"alienVault_otx_malware_ips": {
"url": "https://reputation.alienvault.com/reputation.data",
"type": "ip",
"threat_type": "malware",
"confidence": 70
},
"threatfox_json": {
"url": "https://threatfox-api.abuse.ch/api/v1/",
"type": "multiple",
"threat_type": "malware",
"confidence": 80
},
"urlhaus_csv": {
"url": "https://urlhaus.abuse.ch/downloads/csv_recent/",
"type": "url",
"threat_type": "malware",
"confidence": 85
},
"feodo_tracker": {
"url": "https://feodotracker.abuse.ch/downloads/ipblocklist.csv",
"type": "ip",
"threat_type": "botnet_c2",
"confidence": 90
}
}
def __init__(self, otx_api_key: str = ""):
self.otx_api_key = otx_api_key
self.indicators: List[ThreatIndicator] = []
def fetch_threatfox(self, days: int = 1) -> List[ThreatIndicator]:
"""Récupère les IOCs ThreatFox des N derniers jours"""
indicators = []
payload = {"query": "get_iocs", "days": days}
try:
resp = requests.post(
"https://threatfox-api.abuse.ch/api/v1/",
json=payload,
timeout=30
)
data = resp.json()
for ioc in data.get("data", []):
indicators.append(ThreatIndicator(
value=ioc.get("ioc", ""),
type=ioc.get("ioc_type", "").lower().replace(" ", "_"),
source="ThreatFox (abuse.ch)",
first_seen=ioc.get("first_seen", ""),
last_seen=ioc.get("last_seen", ""),
threat_type=ioc.get("threat_type", "").lower(),
confidence=ioc.get("confidence_level", 0),
tags=[ioc.get("malware", ""), ioc.get("threat_type_desc", "")]
))
except requests.RequestException as e:
print(f"[WARN] ThreatFox fetch error: {e}")
return indicators
def fetch_urlhaus(self) -> List[ThreatIndicator]:
"""Récupère les URLs malveillantes depuis URLhaus"""
indicators = []
try:
resp = requests.get(
"https://urlhaus.abuse.ch/downloads/csv_recent/",
timeout=30
)
resp.raise_for_status()
reader = csv.DictReader(
io.StringIO(resp.text.replace("# ", "")),
fieldnames=["id", "dateadded", "url", "url_status",
"last_online", "threat", "tags", "urlhaus_link", "reporter"]
)
for row in list(reader)[9:]: # Skip headers/comments
if row.get("url_status") == "online": # Actif uniquement
indicators.append(ThreatIndicator(
value=row.get("url", ""),
type="url",
source="URLhaus (abuse.ch)",
first_seen=row.get("dateadded", ""),
last_seen=row.get("last_online", ""),
threat_type=row.get("threat", "").lower(),
confidence=85,
tags=row.get("tags", "").split(",") if row.get("tags") else []
))
except requests.RequestException as e:
print(f"[WARN] URLhaus fetch error: {e}")
return indicators
def check_indicator(self, value: str, indicator_type: str = None) -> dict:
"""
Vérifie si un indicateur est connu comme malveillant
Multi-source avec scoring de confiance
"""
matches = []
max_confidence = 0
for ioc in self.indicators:
if ioc.value == value:
if indicator_type and ioc.type != indicator_type:
continue
matches.append(ioc)
max_confidence = max(max_confidence, ioc.confidence)
if not matches:
return {"found": False, "value": value}
return {
"found": True,
"value": value,
"confidence": max_confidence,
"threat_types": list(set(m.threat_type for m in matches)),
"sources": list(set(m.source for m in matches)),
"tags": list(set(tag for m in matches for tag in m.tags if tag)),
"verdict": "MALICIOUS" if max_confidence >= 70 else "SUSPICIOUS"
}
def export_misp(self, output_file: str) -> None:
"""Exporte les IOCs au format MISP pour import dans une plateforme TIP"""
misp_event = {
"Event": {
"info": f"CTI OSINT Feed - {datetime.utcnow().strftime('%Y-%m-%d')}",
"distribution": 1,
"threat_level_id": 2,
"analysis": 2,
"Attribute": []
}
}
for ioc in self.indicators:
misp_type_map = {
"ip_port": "ip-dst|port",
"ip": "ip-dst",
"domain": "domain",
"url": "url",
"md5_hash": "md5",
"sha256_hash": "sha256",
"email": "email-src"
}
misp_type = misp_type_map.get(ioc.type, ioc.type)
misp_event["Event"]["Attribute"].append({
"category": "Network activity",
"type": misp_type,
"value": ioc.value,
"comment": f"Source: {ioc.source} | Threat: {ioc.threat_type}",
"tags": ioc.tags
})
with open(output_file, 'w') as f:
json.dump(misp_event, f, indent=2, ensure_ascii=False)
Dark Web Monitoring via OSINT
Le dark web monitoring consiste à surveiller les espaces du web non indexés par les moteurs classiques (sites .onion du réseau Tor, marchés clandestins, forums de hackers) pour détecter des données d'une organisation qui auraient été volées et mises en vente.
# === DARK WEB MONITORING — APPROCHE LÉGALE ===
# NE PAS accéder aux marchés de drogues ou d'armes
# Focus: détection de données volées, credentials leakés
# 1. Services de monitoring commercial (légaux)
# - IntelX (intelligence X) — indexe le dark web légalement
# - DarkOwl — spécialisé dark web intelligence
# - Have I Been Pwned — breaches de données
# Vérification d'un domaine sur HIBP (Have I Been Pwned) via API
curl -H "hibp-api-key: $HIBP_API_KEY" \
"https://haveibeenpwned.com/api/v3/breachesforaccount/user@example.com"
# Recherche de breaches affectant un domaine entier (service payant)
curl -H "hibp-api-key: $HIBP_API_KEY" \
"https://haveibeenpwned.com/api/v3/enterprisebreachedaccount/example.com"
# 2. Tor Browser pour la surveillance manuelle (légale si lecture uniquement)
# Installation Tor
sudo apt-get install tor torsocks
# Configuration du proxy Tor
# /etc/tor/torrc:
SocksPort 9050
SocksPort 9150
# Requêtes via Tor (proxy SOCKS5)
torsocks curl -s "http://darkwebsite.onion/search?q=example.com"
# 3. Onionoo — données publiques Tor network
curl "https://onionoo.torproject.org/details?search=example.com"
#!/usr/bin/env python3
"""
darkweb_monitor.py — Monitoring du dark web pour la détection de fuites
via des services d'intelligence légaux
"""
import requests
import hashlib
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class BreachRecord:
source: str
breach_date: str
data_types: List[str]
is_verified: bool
account_count: int
description: str
class DarkWebMonitor:
"""
Monitoring de la présence de données d'organisation sur le dark web
via des APIs légales et éthiques
"""
HIBP_API = "https://haveibeenpwned.com/api/v3"
def __init__(self, hibp_api_key: str, intelx_api_key: str = ""):
self.hibp_key = hibp_api_key
self.intelx_key = intelx_api_key
self.session = requests.Session()
self.session.headers.update({
"hibp-api-key": hibp_api_key,
"User-Agent": "DarkWebMonitor/1.0"
})
def check_email_breach(self, email: str) -> List[BreachRecord]:
"""Vérifie si un email apparaît dans des breaches connues"""
try:
resp = self.session.get(
f"{self.HIBP_API}/breachedaccount/{email}",
params={"truncateResponse": "false"},
timeout=10
)
if resp.status_code == 404:
return [] # Pas trouvé — bonne nouvelle
resp.raise_for_status()
breaches = resp.json()
return [
BreachRecord(
source=b.get("Name", ""),
breach_date=b.get("BreachDate", ""),
data_types=b.get("DataClasses", []),
is_verified=b.get("IsVerified", False),
account_count=b.get("PwnCount", 0),
description=b.get("Description", "")[:200]
)
for b in breaches
]
except requests.RequestException as e:
print(f"[WARN] HIBP error for {email}: {e}")
return []
def check_password_pwned(self, password: str) -> int:
"""
Vérifie si un mot de passe est dans les bases leakées
Utilise k-anonymity — le mot de passe n'est jamais envoyé
Retourne le nombre de fois que le hash a été trouvé (0 = safe)
"""
# SHA-1 hash du mot de passe
sha1_hash = hashlib.sha1(password.encode()).hexdigest().upper()
prefix = sha1_hash[:5]
suffix = sha1_hash[5:]
try:
resp = requests.get(
f"https://api.pwnedpasswords.com/range/{prefix}",
timeout=10
)
resp.raise_for_status()
# Chercher le suffix dans la réponse
for line in resp.text.splitlines():
hash_suffix, count = line.split(":")
if hash_suffix == suffix:
return int(count)
return 0 # Non trouvé
except requests.RequestException:
return -1 # Erreur — indéterminé
def monitor_domain(
self,
domain: str,
employee_emails: List[str] = None
) -> dict:
"""
Monitoring complet d'un domaine:
- Breaches affectant le domaine
- Emails d'employés compromis
"""
report = {
"domain": domain,
"scan_date": __import__("datetime").datetime.utcnow().isoformat(),
"breaches_by_email": {},
"total_compromised_emails": 0,
"highest_risk_data_types": []
}
all_data_types = []
if employee_emails:
for email in employee_emails:
if not email.endswith(f"@{domain}"):
continue
breaches = self.check_email_breach(email)
if breaches:
report["breaches_by_email"][email] = [
{
"source": b.source,
"date": b.breach_date,
"data_types": b.data_types
}
for b in breaches
]
report["total_compromised_emails"] += 1
all_data_types.extend(
dt for b in breaches for dt in b.data_types
)
import time
time.sleep(1.5) # HIBP rate limit: 1 req/1.5s
# Données les plus fréquemment leakées
from collections import Counter
data_type_counts = Counter(all_data_types)
report["highest_risk_data_types"] = [
{"type": dt, "occurrences": count}
for dt, count in data_type_counts.most_common(10)
]
report["risk_level"] = (
"CRITICAL" if report["total_compromised_emails"] > 10 else
"HIGH" if report["total_compromised_emails"] > 3 else
"MEDIUM" if report["total_compromised_emails"] > 0 else
"LOW"
)
return report
OPSEC pour les investigateurs OSINT
L'OPSEC (Operational Security) est indispensable pour les investigateurs OSINT — toute visite directe d'un site cible peut être détectée via les logs d'accès, les Referer headers, ou les systèmes de tracking. Un bon investigateur OSINT ne laisse pas de traces de ses recherches sur les systèmes cibles.
Environnement OSINT isolé
# === ENVIRONNEMENT OSINT SÉCURISÉ ===
# 1. Machine virtuelle dédiée
# Ne jamais faire de l'OSINT depuis la machine principale
# Utiliser une VM dédiée, réinitialisable (snapshot avant chaque investigation)
# 2. VPN + Tor pour l'anonymisation
# VPN → Tor pour l'investigation sensible
# (à utiliser uniquement dans un cadre légal)
# Installation Whonix (distribution dédiée OPSEC)
# Whonix Gateway + Whonix Workstation = double anonymisation
# 3. Browser OPSEC
# Firefox avec profil dédié
firefox -new-instance -profile /tmp/osint-profile
# Paramètres minimaux:
# - about:config -> privacy.resistFingerprinting = true
# - Désactiver JavaScript pour les sites sensibles
# - NoScript + uBlock Origin
# - User Agent switcher (simuler un navigateur différent)
# 4. Comptes dédiés OSINT (sock puppets légaux)
# Créer des comptes LinkedIn/Twitter dédiés à l'investigation
# Ne jamais utiliser vos comptes personnels ou professionnels
# 5. Téléphone jetable pour les SMS de vérification
# Utiliser un numéro temporaire (ex: Twilio pour les registrations)
# 6. Recherches Google en mode privé et via Tor
torsocks curl -s \
"https://www.google.com/search?q=site:example.com&num=10" \
-H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
# 7. Vérification des fuites WebRTC avant investigation
# https://browserleaks.com/webrtc
# Les extensions comme uBlock Origin bloquent les fuites WebRTC
Automatisation OSINT avec Python : framework complet
#!/usr/bin/env python3
"""
osint_framework.py — Framework d'automatisation OSINT complet
Orchestre theHarvester, Shodan, DNS recon et CTI en une seule investigation
"""
import asyncio
import json
import argparse
from datetime import datetime
from pathlib import Path
from typing import Optional
class OSINTFramework:
"""
Framework d'investigation OSINT automatisé
Combine: DNS, Certificate Transparency, Shodan, Email Harvesting, CTI
"""
def __init__(
self,
target_domain: str,
shodan_api_key: str,
output_dir: str = "/tmp/osint_results",
verbose: bool = False
):
self.domain = target_domain
self.shodan_key = shodan_api_key
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.verbose = verbose
self.results = {
"target": target_domain,
"investigation_start": datetime.utcnow().isoformat(),
"phases": {}
}
async def run_full_investigation(self) -> dict:
"""Lance une investigation OSINT complète en mode asynchrone"""
print(f"[*] Démarrage investigation OSINT: {self.domain}")
print(f"[*] Heure de début: {datetime.utcnow().isoformat()}")
print("-" * 60)
# Phase 1: Reconnaissance DNS
print("[PHASE 1] Reconnaissance DNS passive...")
dns_recon = PassiveDNSRecon(self.domain)
dns_results = dns_recon.full_recon()
self.results["phases"]["dns"] = {
"subdomains_count": len(dns_results.subdomains),
"subdomains": sorted(dns_results.subdomains)[:100],
"mx_records": dns_results.mx_records,
"cloud_providers": dns_results.cloud_providers,
"spf_services": dns_results.spf_includes
}
print(f" Sous-domaines trouvés: {len(dns_results.subdomains)}")
print(f" Fournisseurs cloud détectés: {', '.join(dns_results.cloud_providers)}")
# Phase 2: Shodan
print("\n[PHASE 2] Reconnaissance Shodan...")
shodan_recon = ShodanRecon(self.shodan_key)
shodan_results = shodan_recon.recon_organization(self.domain)
self.results["phases"]["shodan"] = {
"hosts_count": len(shodan_results.get("hosts", [])),
"exposed_services": shodan_results.get("exposed_services", {}),
"vulnerabilities": shodan_results.get("vulnerabilities", [])[:20],
"interesting_findings": shodan_results.get("interesting_findings", [])
}
print(f" Hôtes indexés: {len(shodan_results.get('hosts', []))}")
print(f" CVE trouvées: {len(shodan_results.get('vulnerabilities', []))}")
# Phase 3: Google Dorking (via API)
print("\n[PHASE 3] Analyse de la surface d'exposition web...")
# (simulation — nécessite une clé SerpAPI)
self.results["phases"]["web_exposure"] = {
"status": "requires_serpapi_key",
"note": "Configurer une clé SerpAPI pour activer cette phase"
}
# Phase 4: CTI — vérification des IOCs
print("\n[PHASE 4] Threat Intelligence...")
cti = CTIOSINTAggregator()
# Récupérer les IOCs récents
iocs = cti.fetch_threatfox(days=7)
# Vérifier les IPs découvertes
target_ips = [
ip for ip in shodan_results.get("ip_addresses", {}).values()
if ip
]
cti_matches = []
for ip in target_ips[:10]:
result = cti.check_indicator(ip, "ip")
if result.get("found"):
cti_matches.append(result)
self.results["phases"]["cti"] = {
"target_ips_checked": len(target_ips),
"malicious_indicators_found": len(cti_matches),
"matches": cti_matches
}
print(f" IPs analysées: {len(target_ips)}")
print(f" Indicateurs malveillants: {len(cti_matches)}")
# Finalisation
self.results["investigation_end"] = datetime.utcnow().isoformat()
self.results["executive_summary"] = self._generate_summary()
# Sauvegarde
output_file = self.output_dir / f"osint_{self.domain}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
print(f"\n[+] Rapport sauvegardé: {output_file}")
return self.results
def _generate_summary(self) -> dict:
"""Génère un résumé exécutif de l'investigation"""
phases = self.results["phases"]
critical_findings = []
# CVE critiques
vulns = phases.get("shodan", {}).get("vulnerabilities", [])
critical_cves = [v for v in vulns if v.get("cve") in KNOWN_CRITICAL_CVE]
if critical_cves:
critical_findings.append(
f"{len(critical_cves)} CVE critiques sur les services exposés"
)
# IOCs malveillants
cti_hits = phases.get("cti", {}).get("malicious_indicators_found", 0)
if cti_hits:
critical_findings.append(
f"{cti_hits} IP(s) classée(s) malveillante(s) dans les feeds CTI"
)
return {
"target": self.domain,
"total_attack_surface": {
"subdomains": phases.get("dns", {}).get("subdomains_count", 0),
"exposed_hosts": phases.get("shodan", {}).get("hosts_count", 0),
"known_vulnerabilities": len(phases.get("shodan", {}).get("vulnerabilities", []))
},
"critical_findings": critical_findings,
"risk_level": "CRITICAL" if len(critical_findings) > 1 else
"HIGH" if critical_findings else "MEDIUM",
"recommended_actions": [
"Auditer et réduire les services exposés sur Internet",
"Patcher les CVE critiques identifiées en priorité",
"Implémenter un programme de gestion de la surface d'attaque (ASM)",
"Activer le monitoring continu via Shodan Monitor",
]
}
# Point d'entrée CLI
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Framework OSINT automatisé")
parser.add_argument("domain", help="Domaine cible de l'investigation")
parser.add_argument("--shodan-key", required=True, help="Clé API Shodan")
parser.add_argument("--output", default="/tmp/osint_results", help="Répertoire de sortie")
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()
framework = OSINTFramework(
target_domain=args.domain,
shodan_api_key=args.shodan_key,
output_dir=args.output,
verbose=args.verbose
)
asyncio.run(framework.run_full_investigation())
Automatisation OSINT : bonnes pratiques
- Toujours respecter les rate limits des APIs — Shodan, HIBP, Google ont des limites strictes et bloquent les abus
- Loguer toutes les requêtes avec timestamp pour reconstituer la chronologie d'investigation (traçabilité légale)
- Ne jamais hardcoder les clés API dans les scripts — utiliser des variables d'environnement ou un vault
- Séparer la collecte, le traitement et l'analyse dans des modules indépendants — facilite la maintenance et le test unitaire
OSINT pour les tests de pénétration autorisés
Dans le cadre d'un test de pénétration (pentest) avec autorisation écrite, l'OSINT constitue la phase de reconnaissance qui détermine la profondeur et la précision de l'engagement. La règle d'or est que l'OSINT passif est toujours autorisé ; l'OSINT actif (port scanning, fingerprinting) nécessite explicitement d'être dans le scope.
Rapport de reconnaissance OSINT pour un pentest
## RAPPORT DE RECONNAISSANCE OSINT
### Engagement: Pentest Externe — Example Corp
### Date: 2024-03-15 | Durée: 3 jours
---
## PÉRIMÈTRE D'INVESTIGATION
- Domaine principal: example.com
- Filiales: subsidiary.com, example-eu.com
- Plages IP: 203.0.113.0/24 (AS12345)
## RÉSUMÉ EXÉCUTIF
L'investigation OSINT a permis de cartographier une surface d'attaque
significativement plus large que déclarée par le client.
**Risques critiques identifiés:**
- 3 services de gestion (RDP, SSH) directement exposés sur Internet
- 2 CVE critiques (CVSS ≥ 9.0) non patchées sur des services publics
- 47 comptes email présents dans des breaches connues, dont 12 avec des mots
de passe en clair leakés
- Repository GitHub public contenant des credentials de développement
---
## DÉCOUVERTES TECHNIQUES
### Surface d'attaque DNS
| Type | Valeur | Risque |
|------|--------|--------|
| Sous-domaine | dev.example.com | HIGH — interface de développement exposée |
| Sous-domaine | backup.example.com | CRITICAL — interface backup sans auth potentielle |
| MX | mail.google.com | INFO — Google Workspace |
### Services Shodan exposés
| IP | Port | Service | CVE | Risque |
|----|------|---------|-----|--------|
| 203.0.113.42 | 3389 | RDP | CVE-2019-0708 | CRITICAL |
| 203.0.113.55 | 22 | OpenSSH 7.4 | CVE-2023-38408 | HIGH |
### Breaches et credentials
- 47 emails @example.com dans HIBP
- 12 comptes avec mot de passe en clair dans combo listes
- Priorité d'investigation: [liste anonymisée]
---
## RECOMMANDATIONS
1. **URGENT**: Retirer le service RDP (203.0.113.42:3389) d'Internet
2. **URGENT**: Forcer la rotation des mots de passe des 12 comptes compromis
3. **HIGH**: Supprimer le repository GitHub public (credentials en dur)
4. **HIGH**: Patcher OpenSSH sur 203.0.113.55
5. **MEDIUM**: Réduire la surface DNS (supprimer les sous-domaines inutilisés)
FAQ OSINT
Quelle est la différence entre OSINT passif et OSINT actif dans un contexte légal ?
L'OSINT passif consiste à collecter des informations sans interagir directement avec les systèmes cibles — requêtes DNS sur des serveurs publics, lecture de pages web indexées, consultation de bases de données WHOIS, analyse de certificats TLS via crt.sh. Il est toujours légal dans un cadre professionnel. L'OSINT actif implique une interaction directe avec les systèmes cibles — port scanning, fingerprinting de services, requêtes vers des APIs non publiques. En France, cette interaction sans autorisation explicite tombe sous le coup de l'article 323-1 du Code pénal (accès frauduleux). Dans un contexte de pentest avec autorisation écrite, l'OSINT actif dans le scope est parfaitement légal.
Shodan indexe-t-il tous les services exposés sur Internet ?
Non — Shodan couvre une portion significative mais non exhaustive d'Internet. Ses robots scannent en permanence des plages d'IP publiques sur les ports les plus courants (80, 443, 22, 3389, 8080, etc.) et un sous-ensemble de ports moins courants. Des services sur des ports très atypiques peuvent échapper à l'indexation Shodan. Censys et BinaryEdge sont des alternatives avec des méthodologies de scan différentes et parfois complémentaires. Pour une couverture maximale, croiser Shodan, Censys, et les données BGP/routing tables (via RIPEstat ou BGPView) donne une image plus complète.
Comment protéger son organisation contre la reconnaissance OSINT d'un attaquant ?
La défense contre la reconnaissance OSINT s'appelle l'Attack Surface Management (ASM). Les mesures concrètes incluent : (1) réduire la surface DNS — supprimer les sous-domaines inutilisés, utiliser des noms de sous-domaines non descriptifs, (2) utiliser des enregistrements WHOIS avec protection de la vie privée, (3) surveiller activement ce que Shodan indexe sur vos IP (Shodan Monitor), (4) retirer les services de gestion (RDP, SSH) de l'accès direct Internet derrière un VPN ou PAM, (5) mettre en place un programme de monitoring des credentials leakés (HIBP Enterprise), (6) former les développeurs à ne jamais committer de credentials dans les dépôts (même privés).
Quels outils OSINT ne nécessitent pas de clé API ?
Plusieurs outils OSINT puissants fonctionnent sans clé API. theHarvester avec les sources Google/Bing est utilisable sans API pour un usage limité. Recon-ng avec les modules passifs (hackertarget, certificate_transparency) ne nécessite pas de clé. crt.sh est entièrement libre d'accès via son API JSON (https://crt.sh/?q=%.domain.com&output=json). BGPView et RIPEstat fournissent des données de routage sans authentification. Wayback Machine d'Archive.org est accessible librement. DNSdumpster.com fournit une interface gratuite pour la reconnaissance DNS. Pour Shodan, la recherche basique est disponible avec un compte gratuit (limité à 1 page de résultats sans API key).
Sources ouvertes avancées
Comment utiliser Maltego sans licence commerciale pour l'OSINT ?
Maltego CE (Community Edition) est disponible gratuitement avec des limitations : 12 entités maximum par graphe, pas de stockage en ligne, et accès limité aux transformations commerciales. Pour contourner ces limitations dans un contexte professionnel : (1) Maltego One est l'offre commerciale la plus abordable (~600€/an), (2) utiliser Maltego CE pour les visualisations et exporter les données enrichies depuis d'autres outils, (3) SpiderFoot est une alternative open source gratuite avec des capacités similaires de graphe de relations, (4) Maltego peut être remplacé par un graphe NetworkX en Python + visualisation Gephi pour les investigations complexes sans budget.
Reconnaissance réseau
Comment automatiser le monitoring OSINT continu d'une organisation ?
L'OSINT continu (aussi appelé Digital Risk Monitoring) peut être automatisé avec un stack open source : (1) SpiderFoot HX ou SpiderFoot open source pour les scans périodiques, (2) des scripts Python cron qui interrogent crt.sh, Shodan Monitor, HIBP Enterprise à intervalles réguliers, (3) des alertes Google Alerts sur le nom de l'organisation et ses dirigeants, (4) des outils de monitoring dark web comme IntelX pour les notifications de mentions, (5) un SIEM qui agrège tous ces résultats et génère des alertes sur les nouveaux findings. Les résultats sont corrélés avec les données d'inventaire IT pour prioriser les actions de remédiation.
Quelle est la valeur légale des preuves collectées par OSINT ?
En France, les preuves OSINT peuvent être recevables en justice sous conditions. Premièrement, elles doivent avoir été collectées légalement (sources publiques, sans contournement de protections). Deuxièmement, leur authenticité doit être attestable — conserver des captures d'écran horodatées, des exports de pages web avec métadonnées, et des logs de requêtes DNS avec timestamp. Troisièmement, pour des procédures pénales, les preuves OSINT sont généralement soumises à un expert judiciaire pour validation. La norme ISO/IEC 27037 sur la collecte de preuves numériques fournit le cadre méthodologique pour garantir la chaîne de custody. Pour une investigation menée à des fins de contentieux, impliquer un huissier de justice pour authentifier les captures d'écran est recommandé.
Comment l'OSINT s'intègre-t-il dans un programme de Cyber Threat Intelligence structuré ?
L'OSINT est l'une des quatre sources de renseignement d'un programme CTI, avec le HUMINT (renseignement humain via les contacts industrie et les chercheurs en sécurité), le TECHINT (renseignement technique via les logs internes, le SIEM, les honeypots), et le SIGINT (interception de communications, réservée aux services étatiques). Dans une organisation privée, l'OSINT CTI se structure autour de : plateformes TIP (Threat Intelligence Platform) comme MISP ou OpenCTI qui agrègent et contextualisent les IOCs, des flux d'ISAC (Information Sharing and Analysis Centers) comme le CERT-FR qui partagent des IOCs sectoriels, et des outils d'automatisation qui enrichissent les alertes SIEM avec les données OSINT CTI en temps quasi-réel.
Corrélation de données
Outillage et automatisation
L'OSINT comme discipline défensive
L'OSINT est paradoxalement l'un des meilleurs outils défensifs disponibles : en voyant son organisation à travers les yeux d'un attaquant — en découvrant les informations qu'un acteur malveillant collecterait lors de sa phase de reconnaissance — une organisation peut anticiper et neutraliser les vecteurs d'attaque avant qu'ils ne soient exploités.
Les investigations OSINT régulières sur sa propre infrastructure (red team OSINT) révèlent systématiquement des expositions non connues des équipes IT : sous-domaines oubliés, services de développement exposés, credentials dans des dépôts publics, données d'employés dans des breaches récentes.
Pour approfondir les techniques offensives qui suivent la phase de reconnaissance OSINT, consultez nos articles sur le Kerberoasting et les attaques NTLM Relay modernes. La corrélation OSINT avec la threat intelligence est détaillée dans notre article sur les agents IA pour la threat hunting. Pour les investigations forensiques qui suivent la détection d'une compromission, notre comparatif des outils DFIR complète naturellement l'approche OSINT.
Les ressources normatives de référence incluent l'ENISA Threat Intelligence Course pour le cadre méthodologique CTI, et le NIST Cybersecurity Framework qui intègre l'OSINT dans la fonction "Identify" du cycle de sécurité.
Techniques avancées de cartographie des infrastructures
La cartographie d'une infrastructure cible constitue l'une des étapes les plus précieuses d'une investigation OSINT. L'objectif est de reconstituer l'empreinte numérique complète d'une organisation sans jamais interagir directement avec ses systèmes. Cette approche passive exploite les données publiquement disponibles dans les DNS, certificats TLS, registres WHOIS, et bases de données de routage BGP pour dessiner une topologie réseau détaillée.
Analyse BGP et ASN pour la cartographie réseau
Le protocole BGP (Border Gateway Protocol) expose publiquement des informations précieuses sur les blocs d'adresses IP appartenant à une organisation via les Autonomous System Numbers (ASN). Ces données permettent d'identifier l'intégralité des plages IP légitimement détenues par une cible, y compris les datacenters secondaires et les filiales.
#!/usr/bin/env python3
"""
BGP/ASN Recon - Cartographie des blocs IP via données BGP publiques
Sources: BGPView API, RIPE NCC, ARIN, Hurricane Electric
"""
import asyncio
import aiohttp
import ipaddress
import json
from typing import List, Dict, Set
from dataclasses import dataclass, field
@dataclass
class ASNInfo:
asn: int
name: str
country: str
prefixes_v4: List[str] = field(default_factory=list)
prefixes_v6: List[str] = field(default_factory=list)
peers: List[int] = field(default_factory=list)
total_ips: int = 0
class BGPRecon:
def __init__(self):
self.bgpview_base = "https://api.bgpview.io"
self.ripe_base = "https://stat.ripe.net/data"
self.he_base = "https://bgp.he.net"
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
headers={"User-Agent": "OSINT-Research/1.0"},
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, *args):
await self.session.close()
async def search_asn_by_org(self, org_name: str) -> List[Dict]:
"""Recherche d'ASN par nom d'organisation"""
url = f"{self.bgpview_base}/search?query_term={org_name}"
async with self.session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
return data.get("data", {}).get("asns", [])
return []
async def get_asn_prefixes(self, asn: int) -> ASNInfo:
"""Récupère tous les préfixes annoncés par un ASN"""
url = f"{self.bgpview_base}/asn/{asn}/prefixes"
async with self.session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
info_url = f"{self.bgpview_base}/asn/{asn}"
async with self.session.get(info_url) as info_resp:
info_data = await info_resp.json()
asn_data = info_data.get("data", {})
prefixes_data = data.get("data", {})
ipv4_prefixes = [p["prefix"] for p in prefixes_data.get("ipv4_prefixes", [])]
ipv6_prefixes = [p["prefix"] for p in prefixes_data.get("ipv6_prefixes", [])]
total_ips = sum(
ipaddress.IPv4Network(p).num_addresses
for p in ipv4_prefixes
)
return ASNInfo(
asn=asn,
name=asn_data.get("name", "Unknown"),
country=asn_data.get("country_code", "Unknown"),
prefixes_v4=ipv4_prefixes,
prefixes_v6=ipv6_prefixes,
total_ips=total_ips
)
return ASNInfo(asn=asn, name="Unknown", country="Unknown")
async def get_asn_peers(self, asn: int) -> List[int]:
"""Identifie les pairs BGP (peering relationships)"""
url = f"{self.bgpview_base}/asn/{asn}/peers"
async with self.session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
peers_v4 = data.get("data", {}).get("ipv4_peers", [])
return [p["asn"] for p in peers_v4]
return []
async def ripe_routing_history(self, prefix: str) -> Dict:
"""Historique de routage RIPE pour un préfixe"""
url = f"{self.ripe_base}/routing-history/data.json"
params = {"resource": prefix, "max_rows": 100}
async with self.session.get(url, params=params) as resp:
if resp.status == 200:
data = await resp.json()
return data.get("data", {})
return {}
async def full_recon(self, org_name: str) -> Dict:
"""Reconnaissance complète d'une organisation"""
print(f"[*] Recherche ASN pour: {org_name}")
asns = await self.search_asn_by_org(org_name)
results = {
"organization": org_name,
"asns": [],
"total_prefixes_v4": 0,
"total_ips": 0,
"all_prefixes": []
}
tasks = [self.get_asn_prefixes(a["asn"]) for a in asns[:10]]
asn_infos = await asyncio.gather(*tasks, return_exceptions=True)
for info in asn_infos:
if isinstance(info, ASNInfo):
results["asns"].append({
"asn": info.asn,
"name": info.name,
"country": info.country,
"prefixes_count": len(info.prefixes_v4),
"total_ips": info.total_ips
})
results["total_prefixes_v4"] += len(info.prefixes_v4)
results["total_ips"] += info.total_ips
results["all_prefixes"].extend(info.prefixes_v4)
print(f"[+] {len(results['asns'])} ASN trouvés")
print(f"[+] {results['total_prefixes_v4']} préfixes IPv4")
print(f"[+] {results['total_ips']:,} adresses IP totales")
return results
async def main():
async with BGPRecon() as recon:
results = await recon.full_recon("Example Corp")
print(json.dumps(results, indent=2))
if __name__ == "__main__":
asyncio.run(main())
Certificate Transparency avancée et TLS fingerprinting
La Certificate Transparency (CT) est devenue l'une des sources OSINT les plus riches pour la découverte de sous-domaines et l'analyse des infrastructures TLS. Depuis 2018, tout certificat TLS émis par une CA publique doit être enregistré dans des logs CT publics, créant une base de données exhaustive et historique de tous les noms de domaines certifiés.
#!/usr/bin/env python3
"""
Certificate Transparency Mining - Analyse avancée des logs CT
Sources multiples: crt.sh, CertSpotter, Google CT, DigiCert CT
"""
import asyncio
import aiohttp
import re
import json
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Dict, Set, Tuple
class CTLogMiner:
def __init__(self, domain: str):
self.domain = domain
self.subdomains: Set[str] = set()
self.wildcard_certs: List[Dict] = []
self.san_networks: Dict[str, List[str]] = defaultdict(list)
async def query_crtsh(self, session: aiohttp.ClientSession) -> List[Dict]:
"""Query crt.sh avec déduplication et analyse SAN"""
url = f"https://crt.sh/?q=%.{self.domain}&output=json"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status == 200:
certs = await resp.json()
return certs
return []
async def query_certspotter(self, session: aiohttp.ClientSession) -> List[Dict]:
"""CertSpotter API - données en temps réel"""
url = f"https://api.certspotter.com/v1/issuances"
params = {
"domain": self.domain,
"include_subdomains": "true",
"expand": "dns_names",
"after": (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%dT%H:%M:%SZ")
}
async with session.get(url, params=params) as resp:
if resp.status == 200:
return await resp.json()
return []
def extract_subdomains_from_cert(self, cert: Dict) -> List[str]:
"""Extrait tous les noms du champ name_value"""
name_value = cert.get("name_value", "")
names = set()
for name in name_value.split("\n"):
name = name.strip().lower()
if name.endswith(f".{self.domain}") or name == self.domain:
if name.startswith("*."):
self.wildcard_certs.append({
"wildcard": name,
"issuer": cert.get("issuer_name", ""),
"not_before": cert.get("not_before", ""),
"not_after": cert.get("not_after", "")
})
name = name[2:]
names.add(name)
return list(names)
def analyze_naming_patterns(self) -> Dict:
"""Analyse les patterns de nommage pour inférer l'architecture"""
patterns = defaultdict(list)
env_indicators = {
"prod": ["prod", "production", "live"],
"staging": ["staging", "stage", "stg", "preprod", "pre-prod"],
"dev": ["dev", "development", "sandbox", "test", "qa"],
"internal": ["internal", "intranet", "corp", "int"],
"api": ["api", "apis", "rest", "graphql", "v1", "v2"],
"admin": ["admin", "admincp", "console", "management", "portal"],
"mail": ["mail", "smtp", "imap", "pop3", "webmail", "mx"],
"cdn": ["cdn", "static", "assets", "media", "img", "images"],
"vpn": ["vpn", "remote", "access", "gateway", "jump"],
}
for subdomain in self.subdomains:
parts = subdomain.split(".")
prefix = parts[0] if parts else ""
for category, keywords in env_indicators.items():
if any(kw in prefix.lower() for kw in keywords):
patterns[category].append(subdomain)
return dict(patterns)
def detect_cloud_providers(self) -> Dict[str, List[str]]:
"""Détecte les providers cloud via les patterns de nommage"""
cloud_patterns = {
"AWS": [r"\.amazonaws\.com$", r"cloudfront\.net$", r"elb\.amazonaws\.com$"],
"Azure": [r"\.azurewebsites\.net$", r"\.cloudapp\.azure\.com$", r"trafficmanager\.net$"],
"GCP": [r"\.appspot\.com$", r"\.googleapis\.com$", r"\.run\.app$"],
"Cloudflare": [r"\.pages\.dev$"],
"Vercel": [r"\.vercel\.app$", r"\.now\.sh$"],
"Heroku": [r"\.herokuapp\.com$"],
}
cloud_usage = defaultdict(list)
for subdomain in self.subdomains:
for provider, patterns in cloud_patterns.items():
for pattern in patterns:
if re.search(pattern, subdomain):
cloud_usage[provider].append(subdomain)
return dict(cloud_usage)
async def full_ct_analysis(self) -> Dict:
"""Analyse CT complète avec corrélation multi-sources"""
async with aiohttp.ClientSession(
headers={"User-Agent": "OSINT-Research/1.0"}
) as session:
# Requêtes parallèles sur multiple CT logs
crtsh_certs, certspotter_certs = await asyncio.gather(
self.query_crtsh(session),
self.query_certspotter(session),
return_exceptions=True
)
# Traitement crt.sh
if isinstance(crtsh_certs, list):
for cert in crtsh_certs:
subdomains = self.extract_subdomains_from_cert(cert)
self.subdomains.update(subdomains)
# Traitement CertSpotter
if isinstance(certspotter_certs, list):
for cert in certspotter_certs:
for dns_name in cert.get("dns_names", []):
if dns_name.endswith(f".{self.domain}"):
self.subdomains.add(dns_name.lower())
patterns = self.analyze_naming_patterns()
cloud = self.detect_cloud_providers()
return {
"domain": self.domain,
"total_subdomains": len(self.subdomains),
"subdomains": sorted(self.subdomains),
"wildcard_certs": self.wildcard_certs,
"naming_patterns": patterns,
"cloud_providers": cloud,
"infrastructure_insights": self._generate_insights(patterns, cloud)
}
def _generate_insights(self, patterns: Dict, cloud: Dict) -> List[str]:
insights = []
if patterns.get("dev"):
insights.append(f"Environnements dev exposés: {patterns['dev'][:3]}")
if patterns.get("admin"):
insights.append(f"Interfaces d'administration détectées: {patterns['admin'][:3]}")
if patterns.get("vpn"):
insights.append(f"Points d'accès distants: {patterns['vpn'][:3]}")
if cloud:
providers = list(cloud.keys())
insights.append(f"Multi-cloud détecté: {', '.join(providers)}")
if self.wildcard_certs:
insights.append(f"{len(self.wildcard_certs)} certificats wildcard trouvés")
return insights
async def main():
miner = CTLogMiner("example.com")
results = await miner.full_ct_analysis()
print(json.dumps(results, indent=2))
print(f"\n[INSIGHTS]")
for insight in results["infrastructure_insights"]:
print(f" ! {insight}")
if __name__ == "__main__":
asyncio.run(main())
OSINT sur les réseaux sociaux professionnels
LinkedIn reste la source la plus riche pour l'OSINT sur les organisations et leurs employés, mais son utilisation directe via scraping viole les CGU et peut constituer une infraction. L'approche éthique et légale passe par les moteurs de recherche (Google, Bing) qui indexent les profils publics LinkedIn, et par des APIs officielles limitées.
site:linkedin.com/in/ "example-corp" "RSSI" ou site:linkedin.com/in/ "example-corp" filetype:pdf permet de cartographier les équipes techniques d'une organisation sans créer de compte LinkedIn ni enfreindre les CGU. La combinaison avec Hunter.io pour la validation des patterns d'emails permet d'identifier des contacts pertinents pour le red teaming social engineering.
#!/usr/bin/env python3
"""
Social Engineering Intelligence - Cartographie organisationnelle
Méthodes : Google dorks (légal), LinkedIn public profiles, job postings analysis
"""
import asyncio
import aiohttp
import re
import json
from typing import List, Dict, Set
from dataclasses import dataclass, field
@dataclass
class Employee:
name: str
title: str
department: str = ""
linkedin_url: str = ""
email_guess: str = ""
technologies_mentioned: List[str] = field(default_factory=list)
class OrgIntelligence:
def __init__(self, company: str, domain: str):
self.company = company
self.domain = domain
self.employees: List[Employee] = []
self.technologies: Set[str] = set()
self.org_structure: Dict[str, List[str]] = {}
async def analyze_job_postings(self, session: aiohttp.ClientSession) -> List[Dict]:
"""
Analyse des offres d'emploi pour cartographier la stack technique.
Les job postings révèlent : technos utilisées, outils de sécurité,
certifications requises, taille des équipes, projets en cours.
Source légale : pages carrières publiques
"""
# Exemple: récupérer les offres d'emploi via l'API publique Indeed ou LinkedIn
tech_indicators = {
"SIEM": ["splunk", "qradar", "sentinel", "elastic siem", "arcsight"],
"EDR": ["crowdstrike", "sentinelone", "cylance", "carbon black", "defender atp"],
"Cloud": ["aws", "azure", "gcp", "terraform", "kubernetes", "docker"],
"DevSecOps": ["devsecops", "sast", "dast", "sca", "sonarqube", "checkmarx"],
"Frameworks": ["zero trust", "nist", "iso 27001", "soc 2", "pci dss"],
"Languages": ["python", "golang", "rust", "java", "c++", "powershell"],
"Databases": ["mysql", "postgresql", "mongodb", "redis", "elasticsearch"],
}
detected = {}
job_text = "" # Simulé - en pratique, scraper les pages carrières publiques
for category, keywords in tech_indicators.items():
found = [kw for kw in keywords if kw.lower() in job_text.lower()]
if found:
detected[category] = found
self.technologies.update(found)
return [{"category": k, "technologies": v} for k, v in detected.items()]
def infer_email_patterns(self, known_emails: List[str],
first_name: str, last_name: str) -> List[str]:
"""
Infère le pattern d'email d'une organisation depuis des emails connus.
Génère toutes les variantes possibles pour un nouveau contact.
"""
patterns = [
f"{first_name.lower()}.{last_name.lower()}@{self.domain}",
f"{first_name.lower()}{last_name.lower()}@{self.domain}",
f"{first_name[0].lower()}{last_name.lower()}@{self.domain}",
f"{first_name.lower()}{last_name[0].lower()}@{self.domain}",
f"{last_name.lower()}.{first_name.lower()}@{self.domain}",
f"{first_name[0].lower()}.{last_name.lower()}@{self.domain}",
]
return patterns
def detect_pattern_from_samples(self, sample_emails: List[str]) -> str:
"""Détecte le pattern dominant à partir d'emails connus"""
patterns_count = {}
for email in sample_emails:
local = email.split("@")[0].lower()
# Analyser la structure du local part
if "." in local:
parts = local.split(".")
if len(parts) == 2:
if len(parts[0]) == 1:
patterns_count["f.lastname"] = patterns_count.get("f.lastname", 0) + 1
elif len(parts[1]) == 1:
patterns_count["firstname.l"] = patterns_count.get("firstname.l", 0) + 1
else:
patterns_count["firstname.lastname"] = patterns_count.get("firstname.lastname", 0) + 1
else:
patterns_count["firstnamelastname"] = patterns_count.get("firstnamelastname", 0) + 1
if patterns_count:
return max(patterns_count, key=patterns_count.get)
return "unknown"
async def correlate_breached_emails(self, emails: List[str]) -> Dict:
"""
Vérifie si des emails apparaissent dans des bases de données de fuites
via l'API HIBP (Have I Been Pwned) avec méthode k-anonymity.
"""
import hashlib
results = {}
async with aiohttp.ClientSession() as session:
for email in emails[:5]: # Limiter pour demo
sha1_hash = hashlib.sha1(email.encode()).hexdigest().upper()
prefix = sha1_hash[:5]
suffix = sha1_hash[5:]
url = f"https://api.pwnedpasswords.com/range/{prefix}"
try:
async with session.get(url) as resp:
if resp.status == 200:
hashes = await resp.text()
found = suffix in hashes
results[email] = {"breached": found}
except Exception as e:
results[email] = {"error": str(e)}
return results
def generate_report(self) -> Dict:
"""Génère un rapport de renseignement organisationnel"""
depts = {}
for emp in self.employees:
dept = emp.department or "Unknown"
if dept not in depts:
depts[dept] = []
depts[dept].append({"name": emp.name, "title": emp.title})
return {
"company": self.company,
"domain": self.domain,
"total_employees_identified": len(self.employees),
"departments": depts,
"technology_stack": list(self.technologies),
"attack_surface_indicators": {
"exposed_tech_stack": len(self.technologies) > 5,
"admin_emails_found": any("admin" in e.title.lower() or "it" in e.title.lower()
for e in self.employees),
"c_suite_identified": any(title in e.title.lower()
for e in self.employees
for title in ["ciso", "cto", "ceo", "director"])
}
}
Surveillance du Dark Web et des forums criminels
La surveillance du dark web dans un cadre professionnel de CTI (Cyber Threat Intelligence) vise à détecter les mentions d'une organisation cible dans des forums criminels, des marketplaces d'accès initiaux, ou des plateformes de vente de données volées. Cette activité est légale lorsqu'elle est réalisée dans un but défensif par des équipes de sécurité ou des fournisseurs CTI accrédités.
#!/usr/bin/env python3
"""
Dark Web Intelligence Monitor - Surveillance défensive des marketplaces
AVERTISSEMENT: Utiliser uniquement dans un cadre légal et éthique.
Consulter un juriste avant toute opération sur le dark web.
"""
import asyncio
import aiohttp
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Optional
import re
class DarkWebMonitor:
"""
Surveille les mentions d'une organisation via:
1. Tor2web proxies (légal, sans Tor requis)
2. API d'intel commercial (Recorded Future, Intel471, etc.)
3. Paste sites publics (Pastebin, GitHub Gists filtrés)
4. Telegram monitoring (canaux publics)
"""
def __init__(self, company: str, domain: str, keywords: List[str]):
self.company = company
self.domain = domain
self.keywords = keywords
self.alerts: List[Dict] = []
async def monitor_paste_sites(self, session: aiohttp.ClientSession) -> List[Dict]:
"""Surveille les paste sites publics pour des mentions de données sensibles"""
# Pastebin search via Google dorks (légal, public)
dorks = [
f'site:pastebin.com "{self.domain}"',
f'site:paste.debian.net "{self.domain}"',
f'site:privatebin.net "{self.domain}"',
f'site:hastebin.com "{self.domain}"',
]
findings = []
# En production : utiliser SerpAPI pour exécuter ces dorks
# et analyser les résultats pour des patterns sensibles
sensitive_patterns = {
"credentials": r"password[:\s]+\S+|passwd[:\s]+\S+|pwd[:\s]+\S+",
"api_keys": r"[A-Za-z0-9]{32,64}",
"credit_cards": r"\b4[0-9]{12}(?:[0-9]{3})?\b",
"emails_bulk": r"[\w.+-]+@" + re.escape(self.domain),
}
return findings
async def check_breach_databases(self, session: aiohttp.ClientSession) -> Dict:
"""
Vérifie si le domaine apparaît dans des bases de fuites connues
via des APIs légales : HIBP, DeHashed (avec abonnement), etc.
"""
results = {
"hibp_domain_check": None,
"dehashed_results": None,
"intelx_results": None
}
# HIBP Domain Search (nécessite clé API)
hibp_url = f"https://haveibeenpwned.com/api/v3/breacheddomain/{self.domain}"
headers = {
"hibp-api-key": "YOUR_API_KEY_HERE",
"user-agent": "OSINT-CTI-Monitor/1.0"
}
try:
async with session.get(hibp_url, headers=headers) as resp:
if resp.status == 200:
results["hibp_domain_check"] = {
"breached": True,
"data": await resp.json()
}
elif resp.status == 404:
results["hibp_domain_check"] = {"breached": False}
except Exception as e:
results["hibp_domain_check"] = {"error": str(e)}
return results
async def monitor_telegram_channels(self, channel_ids: List[str]) -> List[Dict]:
"""
Surveille les canaux Telegram publics pour des mentions cibles.
Utilise l'API Telegram officielle (légal pour les canaux publics).
"""
# Exemple avec telegram-mtproto API
findings = []
# Pattern matching pour données sensibles
patterns = {
"credential_dump": r"login[:\s]+\S+.*password[:\s]+\S+",
"database_dump": r"INSERT INTO|CREATE TABLE|SELECT \*",
"employee_data": r"@" + re.escape(self.domain.split(".")[0]),
}
return findings
def calculate_risk_score(self, findings: List[Dict]) -> int:
"""
Calcule un score de risque basé sur les découvertes
Score 0-100, seuils: 0-25 faible, 26-50 modéré, 51-75 élevé, 76-100 critique
"""
score = 0
weights = {
"active_credentials": 30,
"database_dump": 25,
"source_code": 20,
"employee_pii": 15,
"infrastructure_details": 10,
}
for finding in findings:
finding_type = finding.get("type", "")
score += weights.get(finding_type, 5)
return min(score, 100)
def generate_alert(self, finding: Dict, severity: str) -> Dict:
"""Génère une alerte structurée au format STIX-like"""
return {
"id": hashlib.sha256(json.dumps(finding, sort_keys=True).encode()).hexdigest()[:16],
"timestamp": datetime.utcnow().isoformat(),
"severity": severity,
"company": self.company,
"finding": finding,
"recommended_actions": self._get_recommendations(finding.get("type")),
"tlp": "TLP:AMBER"
}
def _get_recommendations(self, finding_type: str) -> List[str]:
recommendations = {
"active_credentials": [
"Forcer le changement de mot de passe immédiatement",
"Activer l'authentification MFA si pas déjà en place",
"Vérifier les journaux d'accès pour usage malveillant",
"Notifier le RSSI et le DPO"
],
"database_dump": [
"Évaluer le périmètre des données exposées",
"Notifier la CNIL dans les 72h si données personnelles (RGPD Art. 33)",
"Engager une cellule de crise",
"Préserver les preuves pour investigation forensique"
],
"source_code": [
"Auditer le code exposé pour des secrets hardcodés",
"Révoquer toutes les clés API présentes dans le code",
"Identifier l'origine de la fuite (insider, CI/CD leak)"
],
}
return recommendations.get(finding_type, ["Analyser manuellement la découverte"])
Corrélation et enrichissement des données OSINT
La valeur ajoutée d'une investigation OSINT réside dans la capacité à corréler des données provenant de sources disparates pour produire un renseignement actionnable. Un nom de domaine peut être corrélé avec un certificat TLS, qui révèle un sous-domaine, lui-même corrélé avec une IP Shodan, qui pointe vers un service vulnérable exposé.
#!/usr/bin/env python3
"""
OSINT Correlation Engine - Graphe de connaissances multi-sources
Corrèle : DNS, BGP, CT logs, Shodan, WHOIS, job postings, social media
"""
import asyncio
import json
from collections import defaultdict
from typing import List, Dict, Set, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
class NodeType(Enum):
DOMAIN = "domain"
IP = "ip"
ASN = "asn"
CERTIFICATE = "certificate"
EMAIL = "email"
PERSON = "person"
ORGANIZATION = "organization"
TECHNOLOGY = "technology"
VULNERABILITY = "vulnerability"
@dataclass
class OSINTNode:
node_id: str
node_type: NodeType
value: str
attributes: Dict[str, Any] = field(default_factory=dict)
confidence: float = 1.0
sources: List[str] = field(default_factory=list)
@dataclass
class OSINTEdge:
source_id: str
target_id: str
relationship: str
weight: float = 1.0
evidence: List[str] = field(default_factory=list)
class OSINTGraph:
"""Graphe de connaissances OSINT avec moteur de corrélation"""
def __init__(self):
self.nodes: Dict[str, OSINTNode] = {}
self.edges: List[OSINTEdge] = []
self.adjacency: Dict[str, Set[str]] = defaultdict(set)
def add_node(self, node: OSINTNode) -> str:
existing = self.nodes.get(node.node_id)
if existing:
# Fusion des attributs et sources
existing.attributes.update(node.attributes)
existing.sources.extend(node.sources)
existing.confidence = max(existing.confidence, node.confidence)
else:
self.nodes[node.node_id] = node
return node.node_id
def add_edge(self, edge: OSINTEdge):
self.edges.append(edge)
self.adjacency[edge.source_id].add(edge.target_id)
self.adjacency[edge.target_id].add(edge.source_id)
def find_shortest_path(self, start_id: str, end_id: str) -> List[str]:
"""BFS pour trouver le chemin le plus court entre deux entités"""
if start_id not in self.nodes or end_id not in self.nodes:
return []
visited = {start_id}
queue = [(start_id, [start_id])]
while queue:
current, path = queue.pop(0)
if current == end_id:
return path
for neighbor in self.adjacency[current]:
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return []
def get_neighbors_by_type(self, node_id: str,
node_type: NodeType) -> List[OSINTNode]:
"""Récupère les voisins d'un type spécifique"""
neighbors = []
for neighbor_id in self.adjacency[node_id]:
neighbor = self.nodes.get(neighbor_id)
if neighbor and neighbor.node_type == node_type:
neighbors.append(neighbor)
return neighbors
def calculate_centrality(self) -> Dict[str, float]:
"""Calcule la centralité de degré pour identifier les pivots"""
centrality = {}
for node_id in self.nodes:
centrality[node_id] = len(self.adjacency[node_id]) / max(len(self.nodes) - 1, 1)
return dict(sorted(centrality.items(), key=lambda x: x[1], reverse=True))
def identify_attack_paths(self, target_id: str) -> List[Dict]:
"""Identifie les chemins d'attaque potentiels vers une cible"""
paths = []
# Trouver tous les nœuds de vulnérabilités
vuln_nodes = [n for n in self.nodes.values()
if n.node_type == NodeType.VULNERABILITY]
for vuln_node in vuln_nodes:
path = self.find_shortest_path(vuln_node.node_id, target_id)
if path and len(path) <= 5: # Chemins courts = risque élevé
paths.append({
"starting_point": vuln_node.value,
"path": [self.nodes[nid].value for nid in path],
"hop_count": len(path),
"risk_score": max(0, 100 - (len(path) * 15))
})
return sorted(paths, key=lambda x: x["risk_score"], reverse=True)
def export_to_maltego(self) -> List[Dict]:
"""Exporte le graphe au format Maltego CSV pour visualisation"""
maltego_data = []
for edge in self.edges:
source = self.nodes.get(edge.source_id)
target = self.nodes.get(edge.target_id)
if source and target:
maltego_data.append({
"Source": f"{source.node_type.value}:{source.value}",
"Target": f"{target.node_type.value}:{target.value}",
"Type": edge.relationship,
"Weight": edge.weight
})
return maltego_data
class OSINTCorrelationEngine:
"""Moteur de corrélation automatique multi-sources"""
def __init__(self):
self.graph = OSINTGraph()
self.correlation_rules = []
self._register_default_rules()
def _register_default_rules(self):
"""Règles de corrélation automatique"""
self.correlation_rules = [
{
"name": "IP-to-Domain via rDNS",
"condition": lambda n: n.node_type == NodeType.IP,
"action": "rdns_lookup",
"creates_edge": "RESOLVES_TO"
},
{
"name": "Domain-to-Certificate via CT",
"condition": lambda n: n.node_type == NodeType.DOMAIN,
"action": "ct_lookup",
"creates_edge": "HAS_CERTIFICATE"
},
{
"name": "IP-to-Vulnerability via Shodan",
"condition": lambda n: n.node_type == NodeType.IP,
"action": "shodan_lookup",
"creates_edge": "EXPOSES"
},
]
async def ingest_from_sources(self, target_domain: str) -> OSINTGraph:
"""Pipeline d'ingestion multi-sources avec corrélation automatique"""
# Nœud racine
root_node = OSINTNode(
node_id=f"domain:{target_domain}",
node_type=NodeType.DOMAIN,
value=target_domain,
sources=["initial_target"]
)
self.graph.add_node(root_node)
# Simulation d'enrichissement progressif
# En production : appels async à toutes les sources
return self.graph
def generate_executive_report(self, target: str) -> Dict:
"""Génère un rapport exécutif avec les findings critiques"""
centrality = self.graph.calculate_centrality()
pivot_nodes = list(centrality.items())[:5]
attack_paths = self.graph.identify_attack_paths(f"domain:{target}")
return {
"executive_summary": {
"total_assets_discovered": len(self.graph.nodes),
"relationships_mapped": len(self.graph.edges),
"critical_pivot_points": [
{"entity": self.graph.nodes[nid].value, "centrality_score": score}
for nid, score in pivot_nodes if nid in self.graph.nodes
],
"highest_risk_attack_paths": attack_paths[:3],
},
"recommendations": self._generate_recommendations(attack_paths)
}
def _generate_recommendations(self, attack_paths: List[Dict]) -> List[str]:
recs = []
if any(p["hop_count"] <= 2 for p in attack_paths):
recs.append("CRITIQUE: Des chemins d'attaque directs ont été identifiés - action immédiate requise")
if any(p["hop_count"] <= 4 for p in attack_paths):
recs.append("Renforcer la segmentation réseau pour allonger les chemins d'attaque")
recs.append("Surveiller les pivots identifiés avec des alertes SIEM dédiées")
recs.append("Réaliser un pentest ciblé sur les assets à haute centralité")
return recs
Outils OSINT open source : comparatif et intégration
| Outil | Type | Forces | Limites | Intégration |
|---|---|---|---|---|
| Maltego | Graphe visuel | Visualisation, transforms OSINT, community édition | Coûteux (pro), nécessite transforms payants | API MTDS, Python transforms |
| Shodan | Moteur de recherche IoT | Couverture IPv4 complète, CVE tagging, historique | Données parfois anciennes, payant pour volume | REST API, Python library |
| SpiderFoot | Automation OSINT | 200+ modules, auto-corrélation, UI web | Faux positifs, lent sur gros périmètres | API REST, CLI, Docker |
| OSINT Framework | Annuaire de sources | Exhaustif, catégorisé, gratuit | Pas d'automation, juste un répertoire | N/A (web uniquement) |
| theHarvester | Collecte d'emails/DNS | Multi-sources, simple, Python | Rate limiting, pas de corrélation | CLI, librairie Python |
| Recon-ng | Framework modulaire | Base de données SQLite, modules OSINT | Interface CLI complexe, doc limitée | Modules Python, API |
| Amass | Enum de sous-domaines | Exhaustif, active+passive, graphe intégré | Lent, consommateur de ressources | CLI, API JSON, Docker |
| Censys | Scan Internet | TLS deep scan, precision élevée | Quota API restrictif en gratuit | Python library, REST API |
OPSEC pour investigateurs OSINT
L'OPSEC (Operational Security) de l'investigateur est critique : chaque requête DNS, chaque connexion HTTP peut potentiellement être enregistrée par la cible ou des tiers. Une investigation mal menée peut alerter la cible, contaminer la preuve judiciaire, ou exposer l'identité de l'investigateur.
#!/bin/bash
# OPSEC Setup pour investigation OSINT
# Environnement isolé avec rotation d'identité
# 1. VM dédiée sans lien avec l'identité réelle
# Utiliser Whonix ou Tails pour les investigations sensibles
# 2. VPN + Tor en cascade (VPN over Tor)
# Note: Tor over VPN est différent et moins protecteur
mullvad connect # VPN d'abord
# Puis router le trafic via Tor
# 3. DNS chiffré via DoH/DoT (pas de fuite DNS)
# /etc/systemd/resolved.conf
cat << 'EOF' > /etc/systemd/resolved.conf.d/dns-over-tls.conf
[Resolve]
DNS=1.1.1.1#cloudflare-dns.com 8.8.8.8#dns.google
DNSOverTLS=yes
DNSSEC=yes
EOF
systemctl restart systemd-resolved
# 4. User-Agent rotatif pour les requêtes HTTP
AGENTS=(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/537.36"
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/119.0"
)
RANDOM_AGENT="${AGENTS[$RANDOM % ${#AGENTS[@]}]}"
# 5. Timestamps artificiels pour les fichiers créés
# (évite de révéler les heures de travail)
touch -t 202401010900 /tmp/investigation_notes.txt
# 6. Chiffrement du workspace d'investigation
cryptsetup luksFormat /dev/sdb1 --label osint-workspace
cryptsetup luksOpen /dev/sdb1 osint-workspace
mkfs.ext4 /dev/mapper/osint-workspace
mount /dev/mapper/osint-workspace /mnt/osint
# 7. Cloisonnement des requêtes par persona
# Jamais de mélange entre investigation et navigation personnelle
# Utiliser des profils Firefox dédiés avec cookies/history effacés
# 8. Rate limiting pour éviter la détection
# Introduire des délais aléatoires entre les requêtes
python3 -c "
import time, random
def rate_limited_request(url, min_delay=2, max_delay=8):
import urllib.request
time.sleep(random.uniform(min_delay, max_delay))
return urllib.request.urlopen(url)
"
# 9. Journalisation de l'investigation (chain of custody)
mkdir -p /mnt/osint/logs
cat << 'EOF' > /mnt/osint/investigation.log
=== DÉBUT INVESTIGATION ===
Date/Heure UTC: $(date -u)
Opérateur: [hash de l'identifiant]
Cible: [à remplir]
Mandat légal: [référence document]
EOF
echo "[+] Environnement OPSEC configuré"
echo "[!] Rappel: Documenter TOUTES les actions pour la traçabilité légale"
OSINT et renseignement sur les menaces (CTI)
L'OSINT est le socle du renseignement sur les menaces (CTI — Cyber Threat Intelligence). Contrairement à l'OSINT offensif ciblant une organisation spécifique, le CTI OSINT surveille les acteurs malveillants, les campagnes d'attaques, et les indicateurs de compromission (IoC) pour nourrir les défenses en amont.
Méthodologie structurée
Exploitation des résultats
Bonnes pratiques opérationnelles
#!/usr/bin/env python3
"""
CTI OSINT Aggregator - Collecte et enrichissement d'IoCs
Sources: VirusTotal, AlienVault OTX, ThreatFox, URLhaus, MISP
"""
import asyncio
import aiohttp
import json
import ipaddress
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, field
import hashlib
@dataclass
class ThreatIndicator:
ioc_type: str # ip, domain, hash, url, email
value: str
threat_type: str = ""
malware_family: str = ""
confidence: int = 0 # 0-100
first_seen: str = ""
last_seen: str = ""
tags: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
mitre_ttps: List[str] = field(default_factory=list)
kill_chain_phases: List[str] = field(default_factory=list)
class CTIAggregator:
def __init__(self, vt_api_key: str = "", otx_api_key: str = ""):
self.vt_api_key = vt_api_key
self.otx_api_key = otx_api_key
self.ioc_cache: Dict[str, ThreatIndicator] = {}
async def enrich_ip(self, ip: str,
session: aiohttp.ClientSession) -> ThreatIndicator:
"""Enrichit une IP avec des données CTI multi-sources"""
indicator = ThreatIndicator(ioc_type="ip", value=ip)
tasks = [
self._query_abuseipdb(ip, session),
self._query_otx(ip, "ip", session),
self._query_shodan_ioc(ip, session),
]
if self.vt_api_key:
tasks.append(self._query_virustotal(ip, "ip-addresses", session))
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, dict) and "confidence" in result:
indicator.confidence = max(indicator.confidence, result["confidence"])
indicator.sources.extend(result.get("sources", []))
indicator.tags.extend(result.get("tags", []))
if result.get("malware"):
indicator.malware_family = result["malware"]
# Déduplication
indicator.tags = list(set(indicator.tags))
indicator.sources = list(set(indicator.sources))
return indicator
async def _query_abuseipdb(self, ip: str,
session: aiohttp.ClientSession) -> Dict:
"""AbuseIPDB - Score de réputation des IPs"""
url = "https://api.abuseipdb.com/api/v2/check"
params = {"ipAddress": ip, "maxAgeInDays": 90}
headers = {"Key": "YOUR_ABUSEIPDB_KEY", "Accept": "application/json"}
try:
async with session.get(url, params=params, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
abuse_score = data.get("data", {}).get("abuseConfidenceScore", 0)
return {
"confidence": abuse_score,
"sources": ["abuseipdb"],
"tags": ["malicious_ip"] if abuse_score > 50 else []
}
except Exception:
pass
return {}
async def _query_otx(self, indicator: str,
ioc_type: str,
session: aiohttp.ClientSession) -> Dict:
"""AlienVault OTX - Threat pulses"""
type_map = {"ip": "IPv4", "domain": "domain", "hash": "file"}
otx_type = type_map.get(ioc_type, ioc_type)
url = f"https://otx.alienvault.com/api/v1/indicators/{otx_type}/{indicator}/general"
headers = {"X-OTX-API-KEY": self.otx_api_key}
try:
async with session.get(url, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
pulse_count = data.get("pulse_info", {}).get("count", 0)
return {
"confidence": min(pulse_count * 10, 100),
"sources": ["alienVault_otx"],
"tags": [t["name"] for t in data.get("pulse_info", {}).get("tags", [])][:5]
}
except Exception:
pass
return {}
async def _query_shodan_ioc(self, ip: str,
session: aiohttp.ClientSession) -> Dict:
"""Shodan - Services exposés pour contextualisation CTI"""
# Données Shodan enrichissent le contexte infrastructure
return {"confidence": 0, "sources": [], "tags": []}
async def _query_virustotal(self, indicator: str,
ioc_type: str,
session: aiohttp.ClientSession) -> Dict:
"""VirusTotal - Analyse multi-AV"""
url = f"https://www.virustotal.com/api/v3/{ioc_type}/{indicator}"
headers = {"x-apikey": self.vt_api_key}
try:
async with session.get(url, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
stats = data.get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
malicious = stats.get("malicious", 0)
total = sum(stats.values()) or 1
confidence = int((malicious / total) * 100)
return {
"confidence": confidence,
"sources": ["virustotal"],
"tags": ["vt_detection"] if malicious > 5 else [],
"malware": data.get("data", {}).get("attributes", {}).get("popular_threat_label", "")
}
except Exception:
pass
return {}
async def bulk_enrich(self, indicators: List[Dict]) -> List[ThreatIndicator]:
"""Enrichissement en masse avec rate limiting"""
enriched = []
async with aiohttp.ClientSession(
headers={"User-Agent": "CTI-Platform/1.0"}
) as session:
# Traiter par batch de 10 pour respecter les rate limits
for i in range(0, len(indicators), 10):
batch = indicators[i:i+10]
tasks = []
for ioc in batch:
if ioc["type"] == "ip":
tasks.append(self.enrich_ip(ioc["value"], session))
# Ajouter d'autres types (domain, hash, url)
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
enriched.extend([r for r in batch_results if isinstance(r, ThreatIndicator)])
# Rate limiting
await asyncio.sleep(1)
return enriched
def export_to_stix(self, indicators: List[ThreatIndicator]) -> Dict:
"""Export au format STIX 2.1 pour partage CTI"""
bundle = {
"type": "bundle",
"id": f"bundle--{hashlib.uuid4_compatible()}",
"spec_version": "2.1",
"objects": []
}
for ioc in indicators:
if ioc.confidence > 50: # Seuil de confiance minimum
indicator_obj = {
"type": "indicator",
"spec_version": "2.1",
"id": f"indicator--{hashlib.sha256(ioc.value.encode()).hexdigest()[:36]}",
"name": f"{ioc.ioc_type}: {ioc.value}",
"indicator_types": [ioc.threat_type or "malicious-activity"],
"pattern": self._value_to_stix_pattern(ioc),
"valid_from": datetime.utcnow().isoformat() + "Z",
"confidence": ioc.confidence,
"labels": ioc.tags,
}
bundle["objects"].append(indicator_obj)
return bundle
def _value_to_stix_pattern(self, ioc: ThreatIndicator) -> str:
if ioc.ioc_type == "ip":
return f"[ipv4-addr:value = '{ioc.value}']"
elif ioc.ioc_type == "domain":
return f"[domain-name:value = '{ioc.value}']"
elif ioc.ioc_type == "hash":
return f"[file:hashes.'SHA-256' = '{ioc.value}']"
elif ioc.ioc_type == "url":
return f"[url:value = '{ioc.value}']"
return f"[artifact:value = '{ioc.value}']"
Cadre méthodologique OSINT : formaliser une investigation
Une investigation OSINT professionnelle ne peut pas être improvisée. Elle doit s'appuyer sur une méthodologie structurée qui garantit la reproductibilité, la légalité, et la recevabilité judiciaire des preuves collectées. Le modèle PIAM (Planning, Investigation, Analysis, Mitigation) adapté au contexte OSINT offre ce cadre.
| Phase | Activités | Livrables | Durée estimée |
|---|---|---|---|
| 1. Cadrage légal | Définir la cible, obtenir mandat, vérifier légalité, RGPD DPO | Formulaire d'autorisation signé, périmètre documenté | 0.5-2 jours |
| 2. Reconnaissance passive | DNS, WHOIS, BGP, Certificate Transparency, SHODAN | Inventaire infrastructure, liste sous-domaines, ASN mapping | 1-3 jours |
| 3. Collecte humaine (HUMINT-light) | Profils LinkedIn, job postings, conférences, GitHub | Organigramme technique, stack technologique, key persons | 1-2 jours |
| 4. Corrélation et analyse | Graphe de corrélation, identification pivots, chemins d'attaque | Graphe Maltego, rapport d'analyse, scoring de risque | 1-3 jours |
| 5. Monitoring continu | Alertes CT logs, Shodan webhooks, paste sites, dark web | Dashboard surveillance, alertes automatiques | Continu |
| 6. Rapport et restitution | Rapport exécutif + technique, recommandations priorisées | Rapport TLDR + annexes, plan d'action 90 jours | 1-2 jours |
FAQ — Questions fréquentes sur l'OSINT
La collecte d'OSINT sur une entreprise concurrente est-elle légale en France ?
Oui, dans les limites définies par la loi. Collecter des informations publiquement disponibles (site web, LinkedIn, registre du commerce, brevets, communiqués de presse) est légal et constitue de la veille concurrentielle classique. En revanche, accéder à des systèmes informatiques sans autorisation (même pour lire des informations), utiliser des identités fictives pour extraire des données, ou exploiter des données personnelles sans base légale RGPD constitue des infractions pénales. La frontière se situe entre information publique librement accessible et information obtenue par des moyens détournés.
Quelle est la différence entre OSINT, SOCMINT et HUMINT ?
OSINT (Open Source Intelligence) désigne le renseignement tiré de sources publiques et ouvertes — sites web, DNS, publications, etc. SOCMINT (Social Media Intelligence) est une sous-catégorie de l'OSINT spécifiquement centrée sur les réseaux sociaux. HUMINT (Human Intelligence) implique l'interaction directe avec des sources humaines — interviews, infiltration, agents. Dans le domaine cyber, les red teams combinent ces disciplines : OSINT pour la reconnaissance technique, SOCMINT pour le social engineering, et parfois HUMINT pour l'ingénierie sociale physique (vishing, pretexting).
Comment documenter une investigation OSINT pour qu'elle soit recevable en justice ?
La chaîne de custody (chain of custody) est fondamentale. Chaque action doit être horodatée, l'investigateur identifié, et les captures d'écran hashées (SHA-256) avant archivage. Des outils comme Hunchly (Chrome extension) ou WebRecorder créent automatiquement des archives web avec métadonnées d'intégrité. Les captures doivent inclure la barre d'adresse complète, la date/heure système, et être stockées sur un support dont l'intégrité peut être attestée (DVD WORM, stockage HSM). En cas de procédure pénale, il est recommandé de faire intervenir un huissier pour officialiser les constats.
Shodan est-il légal à utiliser ?
Oui. Shodan indexe uniquement les services publiquement accessibles sur Internet — les mêmes services qu'un attaquant pourrait découvrir avec un simple scan Nmap. Consulter Shodan pour analyser sa propre infrastructure, ou analyser celle d'un tiers dans le cadre d'un pentest autorisé ou d'une investigation défensive, est parfaitement légal. L'utilisation de Shodan pour identifier des cibles à attaquer constitue en revanche une préparation d'actes délictueux. La clé est l'intention et le cadre d'utilisation.
Comment automatiser la surveillance OSINT continue d'une organisation ?
L'automatisation OSINT continue repose sur des webhooks et des feeds : Shodan Monitor (alertes sur changements de services), crt.sh Certificate Watch (nouveaux certificats pour votre domaine), BreachWatch (surveillance des fuites), et des RSS feeds de sources de threat intelligence. Des plateformes comme Maltego Investigations, SpiderFoot HX, ou des SOAR comme XSOAR permettent d'orchestrer ces alertes et de les corréler automatiquement. Pour les budgets limités, un script Python + cron + alertes email/Slack peut couvrir les cas d'usage essentiels.
Surveillance continue
Approfondissement
Quelle formation suivre pour devenir analyste OSINT ?
Les certifications reconnues incluent OSCP (OSCE3 pour les plus avancés), la certification OSINT Curious, et les formations de l'Open Source Intelligence School. En France, le Pôle d'Excellence Cyber propose des formations spécialisées. La plateforme Bellingcat publie des guides pratiques gratuits utilisés par les journalistes d'investigation. La pratique sur des CTF OSINT (GeoGuessr, Trace Labs OSINT CTF) est indispensable pour développer les réflexes méthodologiques avant de travailler sur de vraies investigations.
Comment protéger son organisation contre l'OSINT offensif ?
La réduction de l'empreinte numérique (attack surface reduction) est la première ligne de défense. Cela inclut : supprimer les informations inutiles des registres WHOIS (utiliser la protection de confidentialité), configurer des politiques DNS minimales (pas de transfert de zone, pas de noms internes dans les rDNS publics), former les employés à ne pas publier d'informations sensibles sur LinkedIn (noms des systèmes internes, versions logicielles), auditer régulièrement ce qu'un attaquant peut découvrir via Shodan/Censys sur votre infrastructure, et surveiller les Certificate Transparency logs pour détecter des certificats frauduleux créés pour vos domaines.
Existe-t-il des outils OSINT spécifiques pour la cybersécurité ?
Oui, l'écosystème est riche. Pour la reconnaissance réseau : Amass, Subfinder, Masscan. Pour les métadonnées : FOCA, Metagoofil. Pour l'intelligence sur les menaces : MISP, OpenCTI, Cortex. Pour les fuites de données : GitDorker, TruffleHog. Pour les images : ExifTool, Google Lens, TinEye. La distribution Kali Linux inclut la plupart de ces outils préconfigurés, et la REMnux est dédiée à l'analyse de malware. Pour une approche OSINT complète, Trace Labs Kali Linux est spécifiquement optimisée pour les investigations OSINT.
Pour approfondir ces concepts dans un contexte de défense active, consultez nos analyses sur la détection de menaces avec des agents IA, la forensique assistée par IA, et notre livre blanc sur l'IA en cyberdéfense. Les techniques d'OSINT présentées ici s'intègrent naturellement dans les workflows de SIEM augmenté et de threat hunting avec Microsoft Sentinel.
Techniques de pivotage
Approfondissement
Conclusion et recommandations
La maîtrise de ces techniques et outils est indispensable pour tout professionnel de la cybersécurité en 2026. L'évolution constante des menaces exige une veille permanente et une mise à jour régulière des compétences. Pour aller plus loin, consultez nos articles techniques ou contactez notre équipe pour un accompagnement sur mesure adapté à votre contexte.
À retenir : La sécurité est un processus continu, pas un état. Chaque audit, chaque test et chaque analyse contribue à renforcer la posture de défense de l'organisation face aux menaces actuelles et futures.
À propos de l'auteur
Ayi NEDJIMI
Auditeur Senior Cybersécurité & Consultant IA
Expert Judiciaire — Cour d'Appel de Paris
Habilitation Confidentiel Défense
ayi@ayinedjimi-consultants.fr
Ayi NEDJIMI est un vétéran de la cybersécurité avec plus de 25 ans d'expérience sur des missions critiques. Ancien développeur Microsoft à Redmond sur le module GINA (Windows NT4) et co-auteur de la version française du guide de sécurité Windows NT4 pour la NSA.
À la tête d'Ayi NEDJIMI Consultants, il réalise des audits Lead Auditor ISO 42001 et ISO 27001, des pentests d'infrastructures critiques, du forensics et des missions de conformité NIS2 / AI Act.
Conférencier international (Europe & US), il a formé plus de 10 000 professionnels.
Domaines d'expertise
Ressources & Outils de l'auteur
Testez vos connaissances
Mini-quiz de certification lié à cet article — propulsé par CertifExpress
Articles connexes
Retours d’Expérience Pentest : 5 Missions Terrain Anonymisées
Plongez au cœur de 5 missions de pentest réelles et anonymisées : compromission Active Directory en 4h, chaîne IDOR+SSRF vers RCE sur un e-commerce, Red Team contre EDR CrowdStrike, audit cloud AWS avec exfiltration S3, et évaluation OT/ICS Modbus. Pour chaque mission : contexte, méthodologie détaillée, outils utilisés, chronologie, découvertes critiques et remédiations appliquées.
Nuclei vs Nessus vs Qualys : Scanners de Vulnérabilités Comparés
Comparatif des trois principaux scanners de vulnérabilités : Nuclei (open-source), Nessus (Tenable) et Qualys VMDR.
Burp Suite vs OWASP ZAP : Quel Scanner Web Choisir en 2026 ?
Comparatif Burp Suite Pro vs OWASP ZAP pour le pentest web en 2026. Prix, fonctionnalités, extensions et verdict d'expert.
Commentaires
Aucun commentaire pour le moment. Soyez le premier à commenter !
Laisser un commentaire