Le renseignement en sources ouvertes (OSINT — Open Source Intelligence) est la discipline qui consiste à collecter, analyser et synthétiser des informations à partir de sources publiquement accessibles pour produire du renseignement actionnable. Contrairement à l'espionnage ou au hacking offensif, l'OSINT opère exclusivement sur des données légalement accessibles — moteurs de recherche, réseaux sociaux, registres DNS, bases de données whois, archives web, publications académiques, documents gouvernementaux, forums publics et dark web monitoring. Pour un analyste en cybersécurité, l'OSINT est indispensable à plusieurs niveaux : la reconnaissance pré-engagement dans un test de pénétration autorisé pour cartographier la surface d'attaque d'une organisation cible, le renseignement sur les menaces (CTI — Cyber Threat Intelligence) pour identifier les acteurs qui ciblent une industrie spécifique, la réponse aux incidents pour retracer l'infrastructure d'un attaquant, et la due diligence sécuritaire pour évaluer le niveau d'exposition d'un partenaire ou fournisseur. La richesse et la complexité des techniques OSINT disponibles — de la Google dork simple aux frameworks d'automatisation multi-sources comme Maltego ou Recon-ng — nécessitent une approche méthodique et une rigueur légale absolue pour rester dans le cadre de la légalité et de l'éthique professionnelle.

Cadre légal et éthique de l'OSINT en France

Avant d'aborder les techniques, le cadre légal est indispensable. En France, l'OSINT est régi par plusieurs textes qui délimitent précisément ce qui est autorisé.

Cadre légal français de l'OSINT

Texte Portée Implication OSINT
RGPD + Loi Informatique et Libertés Protection des données personnelles Collecte de données personnelles soumise à finalité légitime
Art. 226-1 Code pénal Atteinte à l'intimité de la vie privée Surveillance de personnes privées sans consentement = illégal
Art. 323-1 Code pénal (LCEN) Accès frauduleux aux systèmes informatiques Toute tentative d'accès non-public = illégal même via OSINT
Loi du 23 juillet 2019 (renseignement) Activités de renseignement étatique Seuls les services autorisés peuvent conduire OSINT sur la sécurité nationale
Loi Avia (2020, partiellement censurée) Haine en ligne La collecte de propos haineux à des fins probatoires est encadrée

Les règles fondamentales pour un investigateur OSINT en France :

  • Ne collecter que des données publiquement accessibles sans contourner un mécanisme de protection (même un simple login constitue une barrière légale)
  • Documenter et archiver les sources avec horodatage pour la traçabilité
  • Respecter la finalité — les données collectées pour un audit ne peuvent pas être réutilisées pour d'autres fins
  • Ne jamais recouper des données pour reconstituer le profil détaillé d'un individu privé sans base légale (RGPD)

Méthodologie OSINT : du besoin en renseignement à l'analyse

L'OSINT professionnel ne commence pas par "lancer Maltego". Il commence par la définition du Besoin en Renseignement (BdR) — la question précise à laquelle l'investigation doit répondre.

Cycle du renseignement appliqué à l'OSINT


# Cycle OSINT structuré

cycle_renseignement:
  phase_1_planification:
    objectif: "Définir le BdR (Besoin en Renseignement)"
    questions_clés:
      - "Quelle est la cible de l'investigation ?"
      - "Quel est l'objectif final (pentest, CTI, due diligence) ?"
      - "Quel est le cadre légal applicable ?"
      - "Quelles sont les limites de temps et de scope ?"
    livrables:
      - "BdR formalisé et validé par le donneur d'ordre"
      - "Autorisation écrite si investigation pour tiers"

  phase_2_collecte:
    objectif: "Identifier et collecter les informations pertinentes"
    sources:
      passive: ["DNS", "WHOIS", "Shodan", "Google", "LinkedIn", "Archive.org"]
      semi_active: ["Certificats TLS", "BGP routing", "Pastebin monitoring"]
      active: ["Port scanning AUTORISÉ uniquement sur cibles propres"]
    outils:
      - "theHarvester, Maltego, Recon-ng, Shodan, SpiderFoot"

  phase_3_traitement:
    objectif: "Normaliser, déduplicer, structurer les données brutes"
    actions:
      - "Déduplication des données collectées depuis plusieurs sources"
      - "Vérification croisée (une info unique = incertaine)"
      - "Horodatage de toutes les données collectées"
      - "Évaluation de la fiabilité de chaque source"

  phase_4_analyse:
    objectif: "Transformer les données en renseignement actionnable"
    techniques:
      - "Graphe de relations (Maltego)"
      - "Timeline d'événements"
      - "Analyse des patterns comportementaux"
      - "Clustering d'attributs (IPs, domaines, personnes)"

  phase_5_diffusion:
    objectif: "Produire le rapport final"
    format:
      - "Rapport exécutif (résumé pour décideurs)"
      - "Rapport technique (détail pour équipes sécurité)"
      - "Graphe de relations"
      - "IOCs exploitables (si CTI)"

Google Dorking : la recherche avancée comme outil de reconnaissance

Le Google Dorking (ou Google Hacking) consiste à utiliser les opérateurs de recherche avancée de Google pour trouver des informations spécifiques non indexées via une recherche classique. C'est l'outil OSINT le plus accessible et souvent le plus efficace.

Opérateurs Google essentiels


# === RECONNAISSANCE D'UNE ORGANISATION ===

# Tous les sous-domaines indexés
site:example.com

# Pages d'administration exposées
site:example.com inurl:(admin|login|dashboard|panel|manage)

# Fichiers sensibles exposés
site:example.com filetype:(pdf|xls|xlsx|doc|docx|csv) confidential
site:example.com filetype:sql
site:example.com filetype:env
site:example.com filetype:log "password"

# Répertoires ouverts
site:example.com intitle:"Index of /"

# Fichiers de configuration exposés
site:example.com inurl:"/wp-config.php" OR inurl:"/.env"
site:example.com ext:xml inurl:sitemap

# Emails d'employés
site:linkedin.com "@example.com"
"@example.com" filetype:pdf

# Mentions dans des documents gouvernementaux ou académiques
"example.com" site:gouv.fr OR site:ac-paris.fr

# === RECHERCHE DE VULNÉRABILITÉS EXPOSÉES ===

# Panneaux d'administration communs
intitle:"phpMyAdmin" inurl:"/phpmyadmin/"
intitle:"GitLab" inurl:"/users/sign_in"
intitle:"Grafana" inurl:":3000"
intitle:"Kibana" inurl:":5601"
intitle:"Jenkins" inurl:":8080/login"

# Fichiers de configuration Git exposés
inurl:"/.git/config" "branch"

# Credentials dans du code public
site:github.com "example.com" password OR secret OR api_key

# Erreurs d'application révélatrices
site:example.com "Fatal error" OR "stack trace" OR "SQL syntax"

# Caméras et équipements exposés (à des fins légitimes uniquement)
intitle:"webcam 7" inurl:"/viewer/live/"

Google Dorks Base (GHDB) et automatisation

#!/usr/bin/env python3
"""
google_dork_scanner.py — Automatisation de Google Dorking via SerpAPI
(utilise l'API légale de Google, pas de scraping)
"""

import time
import json
from dataclasses import dataclass
from typing import List, Optional
import requests

@dataclass
class DorkResult:
    query: str
    url: str
    title: str
    snippet: str
    risk_level: str

class GoogleDorkScanner:
    """
    Scanner Google Dork via SerpAPI (API légale)
    Respecte les ToS Google en passant par l'API officielle
    """
    SENSITIVE_DORKS = [
        {
            "name": "Fichiers de configuration exposés",
            "template": 'site:{domain} filetype:env OR filetype:cfg OR filetype:conf',
            "risk": "CRITICAL"
        },
        {
            "name": "Répertoires ouverts",
            "template": 'site:{domain} intitle:"Index of /"',
            "risk": "HIGH"
        },
        {
            "name": "Pages d'admin exposées",
            "template": 'site:{domain} inurl:(admin|administrator|panel|manage|console)',
            "risk": "HIGH"
        },
        {
            "name": "Fichiers SQL exposés",
            "template": 'site:{domain} filetype:sql',
            "risk": "CRITICAL"
        },
        {
            "name": "Logs applicatifs",
            "template": 'site:{domain} filetype:log',
            "risk": "HIGH"
        },
        {
            "name": "Fichiers Excel/CSV avec données",
            "template": 'site:{domain} filetype:(xls OR xlsx OR csv) (password OR email OR phone)',
            "risk": "HIGH"
        },
        {
            "name": "Erreurs d'application",
            "template": 'site:{domain} ("SQL syntax" OR "Fatal error" OR "stack trace")',
            "risk": "MEDIUM"
        },
        {
            "name": "Git config exposé",
            "template": 'site:{domain} inurl:"/.git/config"',
            "risk": "CRITICAL"
        },
    ]

    def __init__(self, serp_api_key: str):
        self.api_key = serp_api_key
        self.base_url = "https://serpapi.com/search"

    def scan_domain(
        self,
        domain: str,
        delay_seconds: float = 2.0
    ) -> List[DorkResult]:
        """
        Scan OSINT d'un domaine via Google Dorks
        delay_seconds: respecter les rate limits
        """
        all_results = []

        for dork_config in self.SENSITIVE_DORKS:
            query = dork_config["template"].format(domain=domain)

            params = {
                "api_key": self.api_key,
                "engine": "google",
                "q": query,
                "num": 10,
                "gl": "fr",  # Google France
                "hl": "fr"
            }

            try:
                response = requests.get(self.base_url, params=params, timeout=15)
                response.raise_for_status()
                data = response.json()

                for result in data.get("organic_results", []):
                    all_results.append(DorkResult(
                        query=query,
                        url=result.get("link", ""),
                        title=result.get("title", ""),
                        snippet=result.get("snippet", ""),
                        risk_level=dork_config["risk"]
                    ))

            except requests.RequestException as e:
                print(f"[WARN] Erreur pour '{query}': {e}")

            time.sleep(delay_seconds)  # Rate limiting respectueux

        return all_results

    def generate_report(
        self,
        domain: str,
        results: List[DorkResult]
    ) -> dict:
        """Génère un rapport structuré des findings"""
        by_risk = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}

        for r in results:
            by_risk[r.risk_level].append({
                "url": r.url,
                "title": r.title,
                "dork": r.query,
                "snippet": r.snippet[:200]
            })

        return {
            "target": domain,
            "scan_date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "total_findings": len(results),
            "findings_by_risk": by_risk,
            "executive_summary": (
                f"L'analyse OSINT de {domain} a révélé "
                f"{len(by_risk['CRITICAL'])} findings critiques, "
                f"{len(by_risk['HIGH'])} élevés, "
                f"{len(by_risk['MEDIUM'])} moyens."
            )
        }

Shodan : le moteur de recherche des objets connectés

Shodan est un moteur de recherche spécialisé qui indexe en permanence les services exposés sur Internet — serveurs web, caméras IP, systèmes SCADA, routeurs, bases de données — avec leurs bannières, certificats TLS, et métadonnées. C'est l'outil OSINT le plus puissant pour la reconnaissance réseau passive.

Reconnaissance avec Shodan


# === RECHERCHES SHODAN DE BASE ===

# Services d'une organisation (par ASN ou CIDR)
# Identifier l'ASN d'une organisation:
whois -h whois.radb.net '!gAS-GOOGLE' | head -5

# Recherche par ASN dans Shodan
shodan search "org:\"Example Corporation\""

# Recherche par CIDR
shodan search "net:203.0.113.0/24"

# Services exposés par type
shodan search "org:\"Example Corporation\" product:nginx"
shodan search "org:\"Example Corporation\" port:3389"  # RDP exposé

# Certificats TLS — trouver les sous-domaines via les certificats
shodan search "ssl.cert.subject.cn:*.example.com"

# Services vulnérables connus
shodan search "product:\"Apache Struts\" vuln:CVE-2017-5638"
shodan search "product:\"Confluence\" vuln:CVE-2021-26084"

# Recherche de technologies spécifiques
shodan search "http.title:\"GitLab\" org:\"Example Corporation\""
shodan search "http.component:\"WordPress\" org:\"Example Corporation\""

# === API SHODAN PYTHON ===
#!/usr/bin/env python3
"""
shodan_recon.py — Reconnaissance OSINT avec l'API Shodan
"""

import shodan
import json
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class ShodanHost:
    ip: str
    port: int
    product: str
    version: str
    cve_list: List[str]
    banners: List[str]
    hostnames: List[str]
    country: str
    asn: str
    last_seen: str

class ShodanRecon:
    def __init__(self, api_key: str):
        self.api = shodan.Shodan(api_key)

    def recon_organization(self, org_name: str) -> dict:
        """Reconnaissance complète d'une organisation via Shodan"""
        results = {
            "organization": org_name,
            "hosts": [],
            "exposed_services": {},
            "vulnerabilities": [],
            "interesting_findings": []
        }

        try:
            # Recherche de tous les hôtes de l'organisation
            query = f'org:"{org_name}"'
            search_results = self.api.search(query, limit=200)

            for match in search_results.get("matches", []):
                host = ShodanHost(
                    ip=match.get("ip_str", ""),
                    port=match.get("port", 0),
                    product=match.get("product", ""),
                    version=match.get("version", ""),
                    cve_list=[v for v in match.get("vulns", {}).keys()],
                    banners=[match.get("data", "")[:200]],
                    hostnames=match.get("hostnames", []),
                    country=match.get("location", {}).get("country_name", ""),
                    asn=match.get("asn", ""),
                    last_seen=match.get("timestamp", "")
                )

                results["hosts"].append(host)

                # Comptage des services exposés
                service_key = f"{host.product}:{host.port}"
                results["exposed_services"][service_key] = \
                    results["exposed_services"].get(service_key, 0) + 1

                # Vulnérabilités CVE
                for cve in host.cve_list:
                    results["vulnerabilities"].append({
                        "host": host.ip,
                        "port": host.port,
                        "cve": cve,
                        "product": host.product,
                        "version": host.version
                    })

                # Findings intéressants
                self._check_interesting(host, results["interesting_findings"])

        except shodan.APIError as e:
            results["error"] = str(e)

        return results

    def _check_interesting(self, host: ShodanHost, findings: list) -> None:
        """Identifie les services particulièrement intéressants pour un auditeur"""
        # Protocoles de gestion exposés
        if host.port in [23, 22, 3389, 5900]:
            findings.append({
                "type": "management_protocol_exposed",
                "ip": host.ip,
                "port": host.port,
                "severity": "HIGH",
                "description": f"Protocole de gestion {host.product} exposé sur Internet"
            })

        # Bases de données exposées sans authentification (Shodan indique souvent)
        if host.port in [3306, 5432, 27017, 6379, 9200, 5984]:
            if "authentication" not in " ".join(host.banners).lower():
                findings.append({
                    "type": "database_possibly_unauthenticated",
                    "ip": host.ip,
                    "port": host.port,
                    "severity": "CRITICAL",
                    "description": f"Base de données {host.product} potentiellement non authentifiée"
                })

        # CVE critiques
        critical_cves = [c for c in host.cve_list if c in KNOWN_CRITICAL_CVE]
        if critical_cves:
            findings.append({
                "type": "critical_cve",
                "ip": host.ip,
                "cves": critical_cves,
                "severity": "CRITICAL",
                "description": f"CVE critiques détectées sur {host.product}"
            })

    def find_subdomains_via_ssl(self, domain: str) -> List[str]:
        """
        Découverte de sous-domaines via les certificats TLS indexés par Shodan
        Technique très efficace — les certificats wildcard révèlent souvent des sous-domaines
        """
        subdomains = set()

        try:
            results = self.api.search(
                f'ssl.cert.subject.cn:"*.{domain}" OR ssl.cert.subject.cn:"{domain}"',
                limit=100
            )

            for match in results.get("matches", []):
                # Extraire les SANs du certificat
                ssl_info = match.get("ssl", {})
                cert = ssl_info.get("cert", {})
                subject_alt_names = cert.get("extensions", {}).get("subjectAltName", "")

                # Parser les SANs
                for san in subject_alt_names.split(","):
                    san = san.strip().replace("DNS:", "")
                    if domain in san and not san.startswith("*"):
                        subdomains.add(san.strip())

                # Ajouter le CN
                cn = cert.get("subject", {}).get("CN", "")
                if cn and domain in cn and not cn.startswith("*"):
                    subdomains.add(cn)

                # Ajouter les hostnames Shodan
                for hostname in match.get("hostnames", []):
                    if domain in hostname:
                        subdomains.add(hostname)

        except shodan.APIError as e:
            print(f"[WARN] Erreur Shodan: {e}")

        return sorted(subdomains)

KNOWN_CRITICAL_CVE = {
    "CVE-2021-44228",  # Log4Shell
    "CVE-2021-26084",  # Confluence RCE
    "CVE-2022-22965",  # Spring4Shell
    "CVE-2023-44487",  # HTTP/2 Rapid Reset
    "CVE-2024-21893",  # Ivanti SSRF
}

theHarvester : collecte d'emails et de sous-domaines

theHarvester est un outil Python de reconnaissance passive qui agrège les informations provenant de multiples sources (moteurs de recherche, Shodan, LinkedIn, etc.) pour cartographier rapidement la surface d'attaque d'une organisation.


# Installation
pip3 install theHarvester

# Collecte d'emails depuis Google, Bing, LinkedIn et Hunter.io
theHarvester \
  -d example.com \
  -b google, bing, linkedin, hunter \
  -l 500 \
  -f results_example.html

# Collecte via Shodan (nécessite une clé API)
theHarvester \
  -d example.com \
  -b shodan \
  -s  # Active la recherche Shodan sur les IPs trouvées

# Sources disponibles:
# - baidu, bing, brave, google, yahoo (moteurs)
# - hunter, emailformat, intelx (email hunters)
# - linkedin (LinkedIn sans auth)
# - shodan, censys, binaryedge (scan engines)
# - github, gitlab (code repositories)
# - dnsdumpster, hackertarget (DNS)
# - sublist3r (sous-domaines)

# Scan passif complet avec toutes les sources disponibles
theHarvester \
  -d example.com \
  -b all \
  -l 200 \
  -f full_recon.html \
  --dns-brute  # Brute-force DNS (semi-actif)
#!/usr/bin/env python3
"""
email_harvester.py — Collecte et validation d'emails d'organisation
via OSINT multi-sources
"""

import re
import requests
import dns.resolver
from typing import Set, List
from dataclasses import dataclass

@dataclass
class EmailFinding:
    email: str
    source: str
    valid_mx: bool
    smtp_verify: bool = False  # Vérification SMTP (risqué — peut déclencher alertes)

class EmailHarvester:
    EMAIL_PATTERN = re.compile(
        r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'
    )

    def __init__(self, target_domain: str):
        self.domain = target_domain.lower()
        self.collected_emails: Set[str] = set()

    def harvest_from_google_cse(self, api_key: str, cx: str) -> List[str]:
        """Collecte d'emails via Google Custom Search Engine (API légale)"""
        queries = [
            f'site:linkedin.com "@{self.domain}"',
            f'"@{self.domain}" filetype:pdf',
            f'"@{self.domain}" contact OR team OR about',
        ]
        emails = []

        for query in queries:
            params = {
                "key": api_key,
                "cx": cx,
                "q": query,
                "num": 10
            }
            try:
                resp = requests.get(
                    "https://www.googleapis.com/customsearch/v1",
                    params=params, timeout=10
                )
                data = resp.json()

                for item in data.get("items", []):
                    text = item.get("title", "") + " " + item.get("snippet", "")
                    found = self.EMAIL_PATTERN.findall(text)
                    domain_emails = [
                        e.lower() for e in found
                        if e.lower().endswith(f"@{self.domain}")
                    ]
                    emails.extend(domain_emails)
            except requests.RequestException:
                pass

        return list(set(emails))

    def validate_mx(self) -> bool:
        """Vérifie que le domaine a des enregistrements MX valides"""
        try:
            mx_records = dns.resolver.resolve(self.domain, 'MX')
            return len(mx_records) > 0
        except Exception:
            return False

    def infer_email_patterns(
        self,
        known_emails: List[str]
    ) -> List[str]:
        """
        Déduit le pattern d'email de l'organisation depuis les emails connus
        Ex: 'john.doe@company.com' → pattern 'firstname.lastname@company.com'
        """
        patterns = []

        for email in known_emails:
            local_part = email.split("@")[0]

            # Pattern prénom.nom
            if "." in local_part:
                parts = local_part.split(".")
                if len(parts) == 2:
                    patterns.append("firstname.lastname")

            # Pattern première initiale + nom
            elif len(local_part) > 3 and local_part[0].isalpha():
                patterns.append("flastname")

            # Pattern prénom seul
            else:
                patterns.append("firstname")

        # Pattern le plus fréquent
        if patterns:
            most_common = max(set(patterns), key=patterns.count)
            return [most_common]
        return []

Recon-ng : framework de reconnaissance modulaire

Recon-ng est un framework Python de reconnaissance OSINT inspiré de Metasploit. Son architecture modulaire avec une base de données intégrée permet d'enchaîner les modules et de construire progressivement une cartographie de la cible.


# Installation
pip3 install recon-ng

# Lancement et création d'un workspace
recon-ng
[recon-ng] > workspaces create example_corp
[recon-ng] > modules search

# Modules clés pour la reconnaissance

# 1. Enumération DNS
[recon-ng][example_corp] > modules load recon/domains-hosts/hackertarget
[recon-ng][example_corp][hackertarget] > options set SOURCE example.com
[recon-ng][example_corp][hackertarget] > run

# 2. Résolution des IPs
[recon-ng][example_corp] > modules load recon/hosts-hosts/resolve
[recon-ng][example_corp][resolve] > run

# 3. Recherche de sous-domaines via certificats
[recon-ng][example_corp] > modules load recon/domains-hosts/certificate_transparency
[recon-ng][example_corp][certificate_transparency] > options set SOURCE example.com
[recon-ng][example_corp][certificate_transparency] > run

# 4. Collecte d'emails via Hunter.io
[recon-ng][example_corp] > keys add hunter_api YOUR_HUNTER_API_KEY
[recon-ng][example_corp] > modules load recon/domains-contacts/hunter_io
[recon-ng][example_corp][hunter_io] > options set SOURCE example.com
[recon-ng][example_corp][hunter_io] > run

# 5. Géolocalisation des IPs via Shodan
[recon-ng][example_corp] > keys add shodan_api YOUR_SHODAN_API_KEY
[recon-ng][example_corp] > modules load recon/hosts-hosts/shodan_ip
[recon-ng][example_corp][shodan_ip] > run

# 6. Rapport final
[recon-ng][example_corp] > modules load reporting/html
[recon-ng][example_corp][html] > options set FILENAME /tmp/recon_report.html
[recon-ng][example_corp][html] > run

# Visualisation de l'état de la base de données
[recon-ng][example_corp] > show hosts
[recon-ng][example_corp] > show contacts
[recon-ng][example_corp] > show credentials
#!/usr/bin/env python3
"""
recon_automation.py — Automatisation Recon-ng via son API Python
"""

import sqlite3
import subprocess
import json
from pathlib import Path

class ReconNGAutomation:
    """Automatisation de Recon-ng pour la reconnaissance OSINT"""

    def __init__(self, workspace: str, target_domain: str):
        self.workspace = workspace
        self.domain = target_domain
        self.recon_path = Path.home() / ".recon-ng"
        self.db_path = self.recon_path / "workspaces" / workspace / "data.db"

    def run_reconnaissance(self) -> None:
        """Lance une séquence de reconnaissance complète"""
        commands = f"""
workspaces create {self.workspace}
modules load recon/domains-hosts/hackertarget
options set SOURCE {self.domain}
run
modules load recon/domains-hosts/certificate_transparency
options set SOURCE {self.domain}
run
modules load recon/hosts-hosts/resolve
run
modules load recon/domains-contacts/whois_pocs
options set SOURCE {self.domain}
run
exit
"""
        # Exécuter Recon-ng avec les commandes
        proc = subprocess.run(
            ["recon-ng", "-w", self.workspace],
            input=commands.encode(),
            capture_output=True,
            timeout=300
        )

        if proc.returncode != 0:
            print(f"[WARN] Recon-ng output: {proc.stderr.decode()[:500]}")

    def extract_results(self) -> dict:
        """Extrait les résultats depuis la base SQLite de Recon-ng"""
        if not self.db_path.exists():
            return {}

        results = {}

        with sqlite3.connect(str(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row

            # Hôtes découverts
            hosts = conn.execute(
                "SELECT host, ip_address, region, country FROM hosts"
            ).fetchall()
            results["hosts"] = [dict(h) for h in hosts]

            # Contacts (emails, personnes)
            contacts = conn.execute(
                "SELECT first_name, last_name, email, title FROM contacts"
            ).fetchall()
            results["contacts"] = [dict(c) for c in contacts]

            # Credentials découverts (si module de leak search)
            try:
                creds = conn.execute(
                    "SELECT username, password, hash, type FROM credentials"
                ).fetchall()
                results["credentials"] = [dict(c) for c in creds]
            except sqlite3.OperationalError:
                results["credentials"] = []

        return results

Maltego : analyse de graphes relationnels

Maltego est le standard de l'industrie pour la visualisation des relations entre entités OSINT. Son modèle de données basé sur des entités (personnes, domaines, IPs, organisations) et des relations permet de construire des graphes de corrélation complexes.

Entités et transformations Maltego clés

Entité source Transformation Entités cibles Source de données
Domain To DNS Name - MX DNS Name (mail servers) DNS public
Domain To Email Addresses [Hunter] Email Address Hunter.io
Domain To Netblocks [Whois] Netblock WHOIS
Netblock To IPs [Shodan] IP Address + Services Shodan
IP Address To Certificates SSL Certificate → Domains Shodan/Censys
Person To Social Networks LinkedIn, Twitter, GitHub Social APIs
Email Address To Breached Accounts Breach records HaveIBeenPwned

Automatisation Maltego via Python (iTDS API)

#!/usr/bin/env python3
"""
maltego_transform.py — Création d'une transformation Maltego personnalisée
pour enrichir les entités avec des données OSINT internes
"""

from maltego_trx.maltego import MaltegoMsg, MaltegoTransform
from maltego_trx.decorator_registry import TransformSetting

def domain_to_subdomains_passive(request: MaltegoMsg, response: MaltegoTransform) -> None:
    """
    Transformation Maltego: Domain → Subdomains via Certificate Transparency
    Source: crt.sh (données publiques)
    """
    domain = request.Value.strip()

    if not domain or '.' not in domain:
        response.addUIMessage("Domaine invalide", messageType="Partial")
        return

    try:
        import requests

        # Requête à crt.sh pour les certificats
        crt_url = f"https://crt.sh/?q=%.{domain}&output=json"
        resp = requests.get(crt_url, timeout=15)
        resp.raise_for_status()

        certificates = resp.json()
        subdomains = set()

        for cert in certificates:
            name = cert.get("name_value", "")
            for subdomain in name.split("\n"):
                subdomain = subdomain.strip()
                if (subdomain.endswith(f".{domain}") and
                    not subdomain.startswith("*") and
                    len(subdomain) > len(domain) + 1):
                    subdomains.add(subdomain)

        # Créer les entités Maltego pour chaque sous-domaine
        for subdomain in sorted(subdomains)[:50]:  # Limiter à 50
            entity = response.addEntity("maltego.DNSName", subdomain)
            entity.addProperty("fqdn", "FQDN", "strict", subdomain)
            entity.addProperty(
                "source",
                "Source",
                "strict",
                "Certificate Transparency (crt.sh)"
            )

        response.addUIMessage(
            f"{len(subdomains)} sous-domaines découverts via Certificate Transparency"
        )

    except Exception as e:
        response.addUIMessage(f"Erreur: {str(e)}", messageType="Partial")

OSINT sur les réseaux sociaux

Les réseaux sociaux sont une source OSINT majeure pour l'investigation sur les individus (dans un cadre légal) et les organisations. LinkedIn est particulièrement riche pour la reconnaissance corporate.

LinkedIn OSINT : cartographie organisationnelle

#!/usr/bin/env python3
"""
linkedin_osint.py — OSINT LinkedIn via la recherche Google et l'API Proxycurl
(respecte les ToS LinkedIn en n'utilisant pas de scraping direct)
"""

import requests
import time
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class LinkedInEmployee:
    name: str
    title: str
    department: str
    location: str
    linkedin_url: str
    email_guess: Optional[str] = None

class LinkedInOSINT:
    """
    Reconnaissance LinkedIn via:
    1. Google Custom Search (index LinkedIn publiquement)
    2. Proxycurl API (accès légal aux profils publics)
    """

    def __init__(self, google_api_key: str, google_cx: str,
                 proxycurl_key: Optional[str] = None):
        self.google_api = google_api_key
        self.google_cx = google_cx
        self.proxycurl_key = proxycurl_key

    def find_employees(
        self,
        company_name: str,
        target_titles: List[str] = None
    ) -> List[LinkedInEmployee]:
        """
        Trouve les employés d'une organisation via Google/LinkedIn
        """
        employees = []
        default_titles = [
            "security", "CISO", "CTO", "CIO", "IT", "network",
            "developer", "engineer", "architect", "DevOps",
            "infrastructure", "cloud", "system administrator"
        ]
        titles_to_search = target_titles or default_titles

        for title in titles_to_search[:5]:  # Limiter les requêtes
            query = (
                f'site:linkedin.com/in/ "{company_name}" "{title}"'
            )

            params = {
                "key": self.google_api,
                "cx": self.google_cx,
                "q": query,
                "num": 10
            }

            try:
                resp = requests.get(
                    "https://www.googleapis.com/customsearch/v1",
                    params=params, timeout=10
                )
                data = resp.json()

                for item in data.get("items", []):
                    # Parser le titre LinkedIn depuis le résultat Google
                    # Format: "Name - Title at Company | LinkedIn"
                    page_title = item.get("title", "")
                    snippet = item.get("snippet", "")
                    url = item.get("link", "")

                    if "linkedin.com/in/" in url:
                        employee = self._parse_linkedin_result(
                            page_title, snippet, url
                        )
                        if employee:
                            employees.append(employee)

            except requests.RequestException:
                pass

            time.sleep(1.5)  # Rate limiting

        # Déduplication
        seen_urls = set()
        unique_employees = []
        for emp in employees:
            if emp.linkedin_url not in seen_urls:
                seen_urls.add(emp.linkedin_url)
                unique_employees.append(emp)

        return unique_employees

    def _parse_linkedin_result(
        self,
        title: str,
        snippet: str,
        url: str
    ) -> Optional[LinkedInEmployee]:
        """Parse un résultat Google de profil LinkedIn"""
        # Format typique: "John Doe - Security Engineer at Example Corp | LinkedIn"
        if " - " in title:
            parts = title.split(" - ", 1)
            name = parts[0].strip()
            rest = parts[1].replace(" | LinkedIn", "").strip()

            # Extraire le titre et l'entreprise
            if " at " in rest:
                job_title = rest.split(" at ")[0].strip()
                department = self._guess_department(job_title)
            else:
                job_title = rest
                department = "Inconnu"

            return LinkedInEmployee(
                name=name,
                title=job_title,
                department=department,
                location=self._extract_location(snippet),
                linkedin_url=url
            )
        return None

    def _guess_department(self, title: str) -> str:
        """Déduit le département depuis le titre"""
        title_lower = title.lower()
        if any(k in title_lower for k in ["security", "ciso", "soc", "siem"]):
            return "Sécurité"
        elif any(k in title_lower for k in ["developer", "engineer", "software"]):
            return "Développement"
        elif any(k in title_lower for k in ["network", "system", "infrastructure"]):
            return "Infrastructure"
        elif any(k in title_lower for k in ["cto", "cio", "vp", "director"]):
            return "Direction"
        return "IT"

    def _extract_location(self, snippet: str) -> str:
        """Extrait la localisation depuis le snippet Google"""
        import re
        # LinkedIn inclut souvent "Paris, Île-de-France" dans les snippets
        loc_pattern = re.compile(r'([A-Z][a-z]+(?:,\s*[A-Z][a-zÀ-ÿ\-]+)*)\s*·')
        match = loc_pattern.search(snippet)
        return match.group(1) if match else ""

    def guess_email(
        self,
        employee: LinkedInEmployee,
        domain: str,
        pattern: str = "firstname.lastname"
    ) -> str:
        """Génère une hypothèse d'email selon le pattern de l'organisation"""
        name_parts = employee.name.lower().split()
        if len(name_parts) < 2:
            return ""

        firstname = name_parts[0]
        lastname = name_parts[-1]

        # Normalisation (accents, tirets)
        import unicodedata
        def normalize(s: str) -> str:
            return ''.join(
                c for c in unicodedata.normalize('NFD', s)
                if unicodedata.category(c) != 'Mn'
            ).replace("-", "").replace("'", "")

        fn = normalize(firstname)
        ln = normalize(lastname)

        patterns = {
            "firstname.lastname": f"{fn}.{ln}@{domain}",
            "flastname": f"{fn[0]}{ln}@{domain}",
            "firstnamelastname": f"{fn}{ln}@{domain}",
            "firstname": f"{fn}@{domain}",
            "lastname.firstname": f"{ln}.{fn}@{domain}",
        }

        return patterns.get(pattern, f"{fn}.{ln}@{domain}")

OSINT réseaux sociaux : limites légales

  • L'OSINT LinkedIn est légale sur les profils publics, mais construire un profil complet d'un individu privé peut constituer une atteinte à la vie privée (Art. 226-1 Cp)
  • La génération d'hypothèses d'emails est légale ; l'envoi d'emails non sollicités est du spam (RGPD, Art. L34-5 CPCE)
  • Les données LinkedIn ne doivent pas être stockées durablement sans base légale — traitement pour un audit spécifique = durée limitée à l'audit
  • Les investigations sur des personnes physiques dans un contexte CTI doivent être limitées aux acteurs de menace identifiés comme tels, pas aux individus ordinaires

Reconnaissance DNS et certificats TLS

La reconnaissance DNS est l'une des techniques OSINT les plus productives — les enregistrements DNS sont publics par conception et révèlent une quantité considérable d'information sur l'infrastructure d'une organisation.


# === RECONNAISSANCE DNS PASSIVE ===

# Enregistrements DNS complets d'un domaine
dig example.com ANY
dig +short example.com MX
dig +short example.com TXT   # SPF, DMARC, DKIM, vérification Google
dig +short example.com NS    # Serveurs DNS autoritaires
dig +short example.com SOA   # Start of Authority (infos admin)
dig +short example.com AAAA  # IPv6

# Reverse DNS (PTR records)
dig -x 203.0.113.42

# Zone transfer (si mal configuré — souvent bloqué)
dig @ns1.example.com example.com AXFR

# DMARC — révèle l'email gateway
dig +short _dmarc.example.com TXT

# SPF — révèle les IPs et services d'envoi d'emails
dig +short example.com TXT | grep "v=spf"

# DKIM — révèle les sélecteurs (google/selector1/k1/etc = services tiers)
dig +short google._domainkey.example.com TXT
dig +short selector1._domainkey.example.com TXT

# Enumération de sous-domaines via Certificate Transparency
curl -s "https://crt.sh/?q=%.example.com&output=json" |
  jq -r '.[].name_value' |
  sort -u |
  grep -v "^\*"

# DNSRecon — outil de reconnaissance DNS complet
dnsrecon -d example.com -t std    # Standard DNS recon
dnsrecon -d example.com -t brt -D /usr/share/wordlists/subdomains.txt  # Brute

# Amass — reconnaissance passive de sous-domaines
amass enum -passive -d example.com -o amass_results.txt

# Subfinder — multisource subdomain discovery
subfinder -d example.com -o subfinder_results.txt -all
#!/usr/bin/env python3
"""
dns_recon.py — Reconnaissance DNS passive multi-technique
"""

import dns.resolver
import requests
import json
import re
from typing import List, Set, Dict
from dataclasses import dataclass

@dataclass
class DNSReconResult:
    domain: str
    subdomains: Set[str]
    ip_addresses: Dict[str, str]  # subdomain -> IP
    mx_records: List[str]
    spf_includes: List[str]  # Services cloud révélés via SPF
    cloud_providers: List[str]  # AWS, Azure, GCP détectés
    certificate_sans: List[str]

class PassiveDNSRecon:
    """Reconnaissance DNS entièrement passive — aucun paquet envoyé à la cible"""

    # Services cloud révélés par les SPF includes
    SPF_CLOUD_MAPPING = {
        "include:spf.protection.outlook.com": "Microsoft 365",
        "include:_spf.google.com": "Google Workspace",
        "include:amazonses.com": "Amazon SES",
        "include:sendgrid.net": "SendGrid",
        "include:mailgun.org": "Mailgun",
        "include:spf.mandrillapp.com": "Mandrill/Mailchimp",
        "include:servers.mcsv.net": "Mailchimp",
        "include:spf.sparkpostmail.com": "SparkPost",
    }

    def __init__(self, domain: str):
        self.domain = domain.lower()
        self.resolver = dns.resolver.Resolver()
        self.resolver.nameservers = ["8.8.8.8", "1.1.1.1"]  # DNS publics

    def full_recon(self) -> DNSReconResult:
        """Lance une reconnaissance DNS passive complète"""
        result = DNSReconResult(
            domain=self.domain,
            subdomains=set(),
            ip_addresses={},
            mx_records=[],
            spf_includes=[],
            cloud_providers=[],
            certificate_sans=[]
        )

        # 1. Enregistrements de base
        result.mx_records = self._get_mx()

        # 2. Analyse SPF pour découvrir les services cloud
        result.spf_includes, result.cloud_providers = self._analyze_spf()

        # 3. Certificate Transparency (crt.sh)
        ct_subdomains = self._query_certificate_transparency()
        result.subdomains.update(ct_subdomains)
        result.certificate_sans = sorted(ct_subdomains)

        # 4. Résolution IP des sous-domaines
        for subdomain in list(result.subdomains)[:50]:
            ip = self._resolve_to_ip(subdomain)
            if ip:
                result.ip_addresses[subdomain] = ip
                # Détection cloud via CNAME/IP
                provider = self._detect_cloud_from_ip(subdomain, ip)
                if provider and provider not in result.cloud_providers:
                    result.cloud_providers.append(provider)

        return result

    def _get_mx(self) -> List[str]:
        """Récupère les enregistrements MX"""
        try:
            mx_answers = self.resolver.resolve(self.domain, 'MX')
            return sorted(
                [str(r.exchange).rstrip('.') for r in mx_answers],
                key=lambda x: x.split('.')[0]
            )
        except Exception:
            return []

    def _analyze_spf(self) -> tuple:
        """Analyse le SPF pour identifier les services cloud utilisés"""
        spf_includes = []
        cloud_providers = []

        try:
            txt_answers = self.resolver.resolve(self.domain, 'TXT')
            for rdata in txt_answers:
                txt = str(rdata).strip('"')
                if txt.startswith('v=spf1'):
                    # Extraire les includes
                    includes = re.findall(r'include:([^\s"]+)', txt)
                    spf_includes = [f"include:{i}" for i in includes]

                    # Mapper vers les fournisseurs
                    for include in spf_includes:
                        for spf_key, provider in self.SPF_CLOUD_MAPPING.items():
                            if spf_key in include:
                                if provider not in cloud_providers:
                                    cloud_providers.append(provider)
        except Exception:
            pass

        return spf_includes, cloud_providers

    def _query_certificate_transparency(self) -> Set[str]:
        """Requête crt.sh pour les certificats TLS"""
        subdomains = set()

        try:
            resp = requests.get(
                f"https://crt.sh/?q=%.{self.domain}&output=json",
                timeout=20
            )
            resp.raise_for_status()

            for cert in resp.json():
                name = cert.get("name_value", "")
                for name_entry in name.split("\n"):
                    name_entry = name_entry.strip()
                    if (name_entry.endswith(f".{self.domain}") and
                        not name_entry.startswith("*") and
                        " " not in name_entry):
                        subdomains.add(name_entry.lower())

        except Exception:
            pass

        return subdomains

    def _resolve_to_ip(self, hostname: str) -> str:
        """Résout un hostname en IP (passif — utilise les DNS publics)"""
        try:
            answers = self.resolver.resolve(hostname, 'A')
            return str(answers[0])
        except Exception:
            return ""

    def _detect_cloud_from_ip(self, hostname: str, ip: str) -> str:
        """Détecte le fournisseur cloud depuis le CNAME ou l'IP"""
        cloud_patterns = {
            "amazonaws.com": "AWS",
            "cloudfront.net": "AWS CloudFront",
            "azurewebsites.net": "Azure",
            "blob.core.windows.net": "Azure Storage",
            "googleusercontent.com": "Google Cloud",
            "appspot.com": "Google App Engine",
            "cloudflare": "Cloudflare",
            "fastly.net": "Fastly CDN",
        }

        try:
            cname_answers = self.resolver.resolve(hostname, 'CNAME')
            cname = str(cname_answers[0]).rstrip('.')

            for pattern, provider in cloud_patterns.items():
                if pattern in cname:
                    return provider
        except Exception:
            pass

        return ""

OSINT pour la Cyber Threat Intelligence

Dans un contexte CTI (Cyber Threat Intelligence), l'OSINT est utilisé pour suivre les acteurs de menace, identifier leurs indicateurs de compromission (IOCs), et anticiper leurs prochaines cibles. Cette application de l'OSINT est la plus sophistiquée et nécessite une connaissance approfondie des TTPs (Tactiques, Techniques et Procédures) des acteurs.

Sources CTI open source

#!/usr/bin/env python3
"""
cti_osint_aggregator.py — Agrégateur d'IOCs depuis des sources CTI open source
"""

import requests
import json
import csv
import io
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class ThreatIndicator:
    value: str
    type: str  # ip, domain, hash, url, email
    source: str
    first_seen: str
    last_seen: str
    threat_type: str  # malware, ransomware, apt, phishing
    confidence: int  # 0-100
    tags: List[str] = field(default_factory=list)

class CTIOSINTAggregator:
    """
    Agrégateur d'IOCs depuis des sources CTI open source:
    - AlienVault OTX (Open Threat Exchange)
    - ThreatFox (abuse.ch)
    - URLhaus (abuse.ch)
    - PhishTank
    - MalwareBazaar
    """

    FEEDS = {
        "alienVault_otx_malware_ips": {
            "url": "https://reputation.alienvault.com/reputation.data",
            "type": "ip",
            "threat_type": "malware",
            "confidence": 70
        },
        "threatfox_json": {
            "url": "https://threatfox-api.abuse.ch/api/v1/",
            "type": "multiple",
            "threat_type": "malware",
            "confidence": 80
        },
        "urlhaus_csv": {
            "url": "https://urlhaus.abuse.ch/downloads/csv_recent/",
            "type": "url",
            "threat_type": "malware",
            "confidence": 85
        },
        "feodo_tracker": {
            "url": "https://feodotracker.abuse.ch/downloads/ipblocklist.csv",
            "type": "ip",
            "threat_type": "botnet_c2",
            "confidence": 90
        }
    }

    def __init__(self, otx_api_key: str = ""):
        self.otx_api_key = otx_api_key
        self.indicators: List[ThreatIndicator] = []

    def fetch_threatfox(self, days: int = 1) -> List[ThreatIndicator]:
        """Récupère les IOCs ThreatFox des N derniers jours"""
        indicators = []

        payload = {"query": "get_iocs", "days": days}
        try:
            resp = requests.post(
                "https://threatfox-api.abuse.ch/api/v1/",
                json=payload,
                timeout=30
            )
            data = resp.json()

            for ioc in data.get("data", []):
                indicators.append(ThreatIndicator(
                    value=ioc.get("ioc", ""),
                    type=ioc.get("ioc_type", "").lower().replace(" ", "_"),
                    source="ThreatFox (abuse.ch)",
                    first_seen=ioc.get("first_seen", ""),
                    last_seen=ioc.get("last_seen", ""),
                    threat_type=ioc.get("threat_type", "").lower(),
                    confidence=ioc.get("confidence_level", 0),
                    tags=[ioc.get("malware", ""), ioc.get("threat_type_desc", "")]
                ))

        except requests.RequestException as e:
            print(f"[WARN] ThreatFox fetch error: {e}")

        return indicators

    def fetch_urlhaus(self) -> List[ThreatIndicator]:
        """Récupère les URLs malveillantes depuis URLhaus"""
        indicators = []

        try:
            resp = requests.get(
                "https://urlhaus.abuse.ch/downloads/csv_recent/",
                timeout=30
            )
            resp.raise_for_status()

            reader = csv.DictReader(
                io.StringIO(resp.text.replace("# ", "")),
                fieldnames=["id", "dateadded", "url", "url_status",
                           "last_online", "threat", "tags", "urlhaus_link", "reporter"]
            )

            for row in list(reader)[9:]:  # Skip headers/comments
                if row.get("url_status") == "online":  # Actif uniquement
                    indicators.append(ThreatIndicator(
                        value=row.get("url", ""),
                        type="url",
                        source="URLhaus (abuse.ch)",
                        first_seen=row.get("dateadded", ""),
                        last_seen=row.get("last_online", ""),
                        threat_type=row.get("threat", "").lower(),
                        confidence=85,
                        tags=row.get("tags", "").split(",") if row.get("tags") else []
                    ))

        except requests.RequestException as e:
            print(f"[WARN] URLhaus fetch error: {e}")

        return indicators

    def check_indicator(self, value: str, indicator_type: str = None) -> dict:
        """
        Vérifie si un indicateur est connu comme malveillant
        Multi-source avec scoring de confiance
        """
        matches = []
        max_confidence = 0

        for ioc in self.indicators:
            if ioc.value == value:
                if indicator_type and ioc.type != indicator_type:
                    continue
                matches.append(ioc)
                max_confidence = max(max_confidence, ioc.confidence)

        if not matches:
            return {"found": False, "value": value}

        return {
            "found": True,
            "value": value,
            "confidence": max_confidence,
            "threat_types": list(set(m.threat_type for m in matches)),
            "sources": list(set(m.source for m in matches)),
            "tags": list(set(tag for m in matches for tag in m.tags if tag)),
            "verdict": "MALICIOUS" if max_confidence >= 70 else "SUSPICIOUS"
        }

    def export_misp(self, output_file: str) -> None:
        """Exporte les IOCs au format MISP pour import dans une plateforme TIP"""
        misp_event = {
            "Event": {
                "info": f"CTI OSINT Feed - {datetime.utcnow().strftime('%Y-%m-%d')}",
                "distribution": 1,
                "threat_level_id": 2,
                "analysis": 2,
                "Attribute": []
            }
        }

        for ioc in self.indicators:
            misp_type_map = {
                "ip_port": "ip-dst|port",
                "ip": "ip-dst",
                "domain": "domain",
                "url": "url",
                "md5_hash": "md5",
                "sha256_hash": "sha256",
                "email": "email-src"
            }

            misp_type = misp_type_map.get(ioc.type, ioc.type)

            misp_event["Event"]["Attribute"].append({
                "category": "Network activity",
                "type": misp_type,
                "value": ioc.value,
                "comment": f"Source: {ioc.source} | Threat: {ioc.threat_type}",
                "tags": ioc.tags
            })

        with open(output_file, 'w') as f:
            json.dump(misp_event, f, indent=2, ensure_ascii=False)

Dark Web Monitoring via OSINT

Le dark web monitoring consiste à surveiller les espaces du web non indexés par les moteurs classiques (sites .onion du réseau Tor, marchés clandestins, forums de hackers) pour détecter des données d'une organisation qui auraient été volées et mises en vente.


# === DARK WEB MONITORING — APPROCHE LÉGALE ===
# NE PAS accéder aux marchés de drogues ou d'armes
# Focus: détection de données volées, credentials leakés

# 1. Services de monitoring commercial (légaux)
# - IntelX (intelligence X) — indexe le dark web légalement
# - DarkOwl — spécialisé dark web intelligence
# - Have I Been Pwned — breaches de données

# Vérification d'un domaine sur HIBP (Have I Been Pwned) via API
curl -H "hibp-api-key: $HIBP_API_KEY" \
  "https://haveibeenpwned.com/api/v3/breachesforaccount/user@example.com"

# Recherche de breaches affectant un domaine entier (service payant)
curl -H "hibp-api-key: $HIBP_API_KEY" \
  "https://haveibeenpwned.com/api/v3/enterprisebreachedaccount/example.com"

# 2. Tor Browser pour la surveillance manuelle (légale si lecture uniquement)
# Installation Tor
sudo apt-get install tor torsocks

# Configuration du proxy Tor
# /etc/tor/torrc:
SocksPort 9050
SocksPort 9150

# Requêtes via Tor (proxy SOCKS5)
torsocks curl -s "http://darkwebsite.onion/search?q=example.com"

# 3. Onionoo — données publiques Tor network
curl "https://onionoo.torproject.org/details?search=example.com"
#!/usr/bin/env python3
"""
darkweb_monitor.py — Monitoring du dark web pour la détection de fuites
via des services d'intelligence légaux
"""

import requests
import hashlib
from typing import List, Optional
from dataclasses import dataclass

@dataclass
class BreachRecord:
    source: str
    breach_date: str
    data_types: List[str]
    is_verified: bool
    account_count: int
    description: str

class DarkWebMonitor:
    """
    Monitoring de la présence de données d'organisation sur le dark web
    via des APIs légales et éthiques
    """

    HIBP_API = "https://haveibeenpwned.com/api/v3"

    def __init__(self, hibp_api_key: str, intelx_api_key: str = ""):
        self.hibp_key = hibp_api_key
        self.intelx_key = intelx_api_key
        self.session = requests.Session()
        self.session.headers.update({
            "hibp-api-key": hibp_api_key,
            "User-Agent": "DarkWebMonitor/1.0"
        })

    def check_email_breach(self, email: str) -> List[BreachRecord]:
        """Vérifie si un email apparaît dans des breaches connues"""
        try:
            resp = self.session.get(
                f"{self.HIBP_API}/breachedaccount/{email}",
                params={"truncateResponse": "false"},
                timeout=10
            )

            if resp.status_code == 404:
                return []  # Pas trouvé — bonne nouvelle

            resp.raise_for_status()
            breaches = resp.json()

            return [
                BreachRecord(
                    source=b.get("Name", ""),
                    breach_date=b.get("BreachDate", ""),
                    data_types=b.get("DataClasses", []),
                    is_verified=b.get("IsVerified", False),
                    account_count=b.get("PwnCount", 0),
                    description=b.get("Description", "")[:200]
                )
                for b in breaches
            ]

        except requests.RequestException as e:
            print(f"[WARN] HIBP error for {email}: {e}")
            return []

    def check_password_pwned(self, password: str) -> int:
        """
        Vérifie si un mot de passe est dans les bases leakées
        Utilise k-anonymity — le mot de passe n'est jamais envoyé
        Retourne le nombre de fois que le hash a été trouvé (0 = safe)
        """
        # SHA-1 hash du mot de passe
        sha1_hash = hashlib.sha1(password.encode()).hexdigest().upper()
        prefix = sha1_hash[:5]
        suffix = sha1_hash[5:]

        try:
            resp = requests.get(
                f"https://api.pwnedpasswords.com/range/{prefix}",
                timeout=10
            )
            resp.raise_for_status()

            # Chercher le suffix dans la réponse
            for line in resp.text.splitlines():
                hash_suffix, count = line.split(":")
                if hash_suffix == suffix:
                    return int(count)

            return 0  # Non trouvé

        except requests.RequestException:
            return -1  # Erreur — indéterminé

    def monitor_domain(
        self,
        domain: str,
        employee_emails: List[str] = None
    ) -> dict:
        """
        Monitoring complet d'un domaine:
        - Breaches affectant le domaine
        - Emails d'employés compromis
        """
        report = {
            "domain": domain,
            "scan_date": __import__("datetime").datetime.utcnow().isoformat(),
            "breaches_by_email": {},
            "total_compromised_emails": 0,
            "highest_risk_data_types": []
        }

        all_data_types = []

        if employee_emails:
            for email in employee_emails:
                if not email.endswith(f"@{domain}"):
                    continue

                breaches = self.check_email_breach(email)

                if breaches:
                    report["breaches_by_email"][email] = [
                        {
                            "source": b.source,
                            "date": b.breach_date,
                            "data_types": b.data_types
                        }
                        for b in breaches
                    ]
                    report["total_compromised_emails"] += 1
                    all_data_types.extend(
                        dt for b in breaches for dt in b.data_types
                    )

                import time
                time.sleep(1.5)  # HIBP rate limit: 1 req/1.5s

        # Données les plus fréquemment leakées
        from collections import Counter
        data_type_counts = Counter(all_data_types)
        report["highest_risk_data_types"] = [
            {"type": dt, "occurrences": count}
            for dt, count in data_type_counts.most_common(10)
        ]

        report["risk_level"] = (
            "CRITICAL" if report["total_compromised_emails"] > 10 else
            "HIGH" if report["total_compromised_emails"] > 3 else
            "MEDIUM" if report["total_compromised_emails"] > 0 else
            "LOW"
        )

        return report

OPSEC pour les investigateurs OSINT

L'OPSEC (Operational Security) est indispensable pour les investigateurs OSINT — toute visite directe d'un site cible peut être détectée via les logs d'accès, les Referer headers, ou les systèmes de tracking. Un bon investigateur OSINT ne laisse pas de traces de ses recherches sur les systèmes cibles.

Environnement OSINT isolé


# === ENVIRONNEMENT OSINT SÉCURISÉ ===

# 1. Machine virtuelle dédiée
# Ne jamais faire de l'OSINT depuis la machine principale
# Utiliser une VM dédiée, réinitialisable (snapshot avant chaque investigation)

# 2. VPN + Tor pour l'anonymisation
# VPN → Tor pour l'investigation sensible
# (à utiliser uniquement dans un cadre légal)

# Installation Whonix (distribution dédiée OPSEC)
# Whonix Gateway + Whonix Workstation = double anonymisation

# 3. Browser OPSEC
# Firefox avec profil dédié
firefox -new-instance -profile /tmp/osint-profile

# Paramètres minimaux:
# - about:config -> privacy.resistFingerprinting = true
# - Désactiver JavaScript pour les sites sensibles
# - NoScript + uBlock Origin
# - User Agent switcher (simuler un navigateur différent)

# 4. Comptes dédiés OSINT (sock puppets légaux)
# Créer des comptes LinkedIn/Twitter dédiés à l'investigation
# Ne jamais utiliser vos comptes personnels ou professionnels

# 5. Téléphone jetable pour les SMS de vérification
# Utiliser un numéro temporaire (ex: Twilio pour les registrations)

# 6. Recherches Google en mode privé et via Tor
torsocks curl -s \
  "https://www.google.com/search?q=site:example.com&num=10" \
  -H "User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"

# 7. Vérification des fuites WebRTC avant investigation
# https://browserleaks.com/webrtc
# Les extensions comme uBlock Origin bloquent les fuites WebRTC

Automatisation OSINT avec Python : framework complet

#!/usr/bin/env python3
"""
osint_framework.py — Framework d'automatisation OSINT complet
Orchestre theHarvester, Shodan, DNS recon et CTI en une seule investigation
"""

import asyncio
import json
import argparse
from datetime import datetime
from pathlib import Path
from typing import Optional

class OSINTFramework:
    """
    Framework d'investigation OSINT automatisé
    Combine: DNS, Certificate Transparency, Shodan, Email Harvesting, CTI
    """

    def __init__(
        self,
        target_domain: str,
        shodan_api_key: str,
        output_dir: str = "/tmp/osint_results",
        verbose: bool = False
    ):
        self.domain = target_domain
        self.shodan_key = shodan_api_key
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.verbose = verbose
        self.results = {
            "target": target_domain,
            "investigation_start": datetime.utcnow().isoformat(),
            "phases": {}
        }

    async def run_full_investigation(self) -> dict:
        """Lance une investigation OSINT complète en mode asynchrone"""
        print(f"[*] Démarrage investigation OSINT: {self.domain}")
        print(f"[*] Heure de début: {datetime.utcnow().isoformat()}")
        print("-" * 60)

        # Phase 1: Reconnaissance DNS
        print("[PHASE 1] Reconnaissance DNS passive...")
        dns_recon = PassiveDNSRecon(self.domain)
        dns_results = dns_recon.full_recon()

        self.results["phases"]["dns"] = {
            "subdomains_count": len(dns_results.subdomains),
            "subdomains": sorted(dns_results.subdomains)[:100],
            "mx_records": dns_results.mx_records,
            "cloud_providers": dns_results.cloud_providers,
            "spf_services": dns_results.spf_includes
        }
        print(f"  Sous-domaines trouvés: {len(dns_results.subdomains)}")
        print(f"  Fournisseurs cloud détectés: {', '.join(dns_results.cloud_providers)}")

        # Phase 2: Shodan
        print("\n[PHASE 2] Reconnaissance Shodan...")
        shodan_recon = ShodanRecon(self.shodan_key)
        shodan_results = shodan_recon.recon_organization(self.domain)

        self.results["phases"]["shodan"] = {
            "hosts_count": len(shodan_results.get("hosts", [])),
            "exposed_services": shodan_results.get("exposed_services", {}),
            "vulnerabilities": shodan_results.get("vulnerabilities", [])[:20],
            "interesting_findings": shodan_results.get("interesting_findings", [])
        }
        print(f"  Hôtes indexés: {len(shodan_results.get('hosts', []))}")
        print(f"  CVE trouvées: {len(shodan_results.get('vulnerabilities', []))}")

        # Phase 3: Google Dorking (via API)
        print("\n[PHASE 3] Analyse de la surface d'exposition web...")
        # (simulation — nécessite une clé SerpAPI)
        self.results["phases"]["web_exposure"] = {
            "status": "requires_serpapi_key",
            "note": "Configurer une clé SerpAPI pour activer cette phase"
        }

        # Phase 4: CTI — vérification des IOCs
        print("\n[PHASE 4] Threat Intelligence...")
        cti = CTIOSINTAggregator()
        # Récupérer les IOCs récents
        iocs = cti.fetch_threatfox(days=7)

        # Vérifier les IPs découvertes
        target_ips = [
            ip for ip in shodan_results.get("ip_addresses", {}).values()
            if ip
        ]
        cti_matches = []
        for ip in target_ips[:10]:
            result = cti.check_indicator(ip, "ip")
            if result.get("found"):
                cti_matches.append(result)

        self.results["phases"]["cti"] = {
            "target_ips_checked": len(target_ips),
            "malicious_indicators_found": len(cti_matches),
            "matches": cti_matches
        }
        print(f"  IPs analysées: {len(target_ips)}")
        print(f"  Indicateurs malveillants: {len(cti_matches)}")

        # Finalisation
        self.results["investigation_end"] = datetime.utcnow().isoformat()
        self.results["executive_summary"] = self._generate_summary()

        # Sauvegarde
        output_file = self.output_dir / f"osint_{self.domain}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        print(f"\n[+] Rapport sauvegardé: {output_file}")

        return self.results

    def _generate_summary(self) -> dict:
        """Génère un résumé exécutif de l'investigation"""
        phases = self.results["phases"]

        critical_findings = []

        # CVE critiques
        vulns = phases.get("shodan", {}).get("vulnerabilities", [])
        critical_cves = [v for v in vulns if v.get("cve") in KNOWN_CRITICAL_CVE]
        if critical_cves:
            critical_findings.append(
                f"{len(critical_cves)} CVE critiques sur les services exposés"
            )

        # IOCs malveillants
        cti_hits = phases.get("cti", {}).get("malicious_indicators_found", 0)
        if cti_hits:
            critical_findings.append(
                f"{cti_hits} IP(s) classée(s) malveillante(s) dans les feeds CTI"
            )

        return {
            "target": self.domain,
            "total_attack_surface": {
                "subdomains": phases.get("dns", {}).get("subdomains_count", 0),
                "exposed_hosts": phases.get("shodan", {}).get("hosts_count", 0),
                "known_vulnerabilities": len(phases.get("shodan", {}).get("vulnerabilities", []))
            },
            "critical_findings": critical_findings,
            "risk_level": "CRITICAL" if len(critical_findings) > 1 else
                         "HIGH" if critical_findings else "MEDIUM",
            "recommended_actions": [
                "Auditer et réduire les services exposés sur Internet",
                "Patcher les CVE critiques identifiées en priorité",
                "Implémenter un programme de gestion de la surface d'attaque (ASM)",
                "Activer le monitoring continu via Shodan Monitor",
            ]
        }

# Point d'entrée CLI
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Framework OSINT automatisé")
    parser.add_argument("domain", help="Domaine cible de l'investigation")
    parser.add_argument("--shodan-key", required=True, help="Clé API Shodan")
    parser.add_argument("--output", default="/tmp/osint_results", help="Répertoire de sortie")
    parser.add_argument("--verbose", action="store_true")

    args = parser.parse_args()

    framework = OSINTFramework(
        target_domain=args.domain,
        shodan_api_key=args.shodan_key,
        output_dir=args.output,
        verbose=args.verbose
    )

    asyncio.run(framework.run_full_investigation())

Automatisation OSINT : bonnes pratiques

  • Toujours respecter les rate limits des APIs — Shodan, HIBP, Google ont des limites strictes et bloquent les abus
  • Loguer toutes les requêtes avec timestamp pour reconstituer la chronologie d'investigation (traçabilité légale)
  • Ne jamais hardcoder les clés API dans les scripts — utiliser des variables d'environnement ou un vault
  • Séparer la collecte, le traitement et l'analyse dans des modules indépendants — facilite la maintenance et le test unitaire

OSINT pour les tests de pénétration autorisés

Dans le cadre d'un test de pénétration (pentest) avec autorisation écrite, l'OSINT constitue la phase de reconnaissance qui détermine la profondeur et la précision de l'engagement. La règle d'or est que l'OSINT passif est toujours autorisé ; l'OSINT actif (port scanning, fingerprinting) nécessite explicitement d'être dans le scope.

Rapport de reconnaissance OSINT pour un pentest


## RAPPORT DE RECONNAISSANCE OSINT
### Engagement: Pentest Externe — Example Corp
### Date: 2024-03-15 | Durée: 3 jours

---

## PÉRIMÈTRE D'INVESTIGATION
- Domaine principal: example.com
- Filiales: subsidiary.com, example-eu.com
- Plages IP: 203.0.113.0/24 (AS12345)

## RÉSUMÉ EXÉCUTIF
L'investigation OSINT a permis de cartographier une surface d'attaque
significativement plus large que déclarée par le client.

**Risques critiques identifiés:**
- 3 services de gestion (RDP, SSH) directement exposés sur Internet
- 2 CVE critiques (CVSS ≥ 9.0) non patchées sur des services publics
- 47 comptes email présents dans des breaches connues, dont 12 avec des mots
  de passe en clair leakés
- Repository GitHub public contenant des credentials de développement

---

## DÉCOUVERTES TECHNIQUES

### Surface d'attaque DNS
| Type | Valeur | Risque |
|------|--------|--------|
| Sous-domaine | dev.example.com | HIGH — interface de développement exposée |
| Sous-domaine | backup.example.com | CRITICAL — interface backup sans auth potentielle |
| MX | mail.google.com | INFO — Google Workspace |

### Services Shodan exposés
| IP | Port | Service | CVE | Risque |
|----|------|---------|-----|--------|
| 203.0.113.42 | 3389 | RDP | CVE-2019-0708 | CRITICAL |
| 203.0.113.55 | 22 | OpenSSH 7.4 | CVE-2023-38408 | HIGH |

### Breaches et credentials
- 47 emails @example.com dans HIBP
- 12 comptes avec mot de passe en clair dans combo listes
- Priorité d'investigation: [liste anonymisée]

---

## RECOMMANDATIONS
1. **URGENT**: Retirer le service RDP (203.0.113.42:3389) d'Internet
2. **URGENT**: Forcer la rotation des mots de passe des 12 comptes compromis
3. **HIGH**: Supprimer le repository GitHub public (credentials en dur)
4. **HIGH**: Patcher OpenSSH sur 203.0.113.55
5. **MEDIUM**: Réduire la surface DNS (supprimer les sous-domaines inutilisés)

FAQ OSINT

Quelle est la différence entre OSINT passif et OSINT actif dans un contexte légal ?

L'OSINT passif consiste à collecter des informations sans interagir directement avec les systèmes cibles — requêtes DNS sur des serveurs publics, lecture de pages web indexées, consultation de bases de données WHOIS, analyse de certificats TLS via crt.sh. Il est toujours légal dans un cadre professionnel. L'OSINT actif implique une interaction directe avec les systèmes cibles — port scanning, fingerprinting de services, requêtes vers des APIs non publiques. En France, cette interaction sans autorisation explicite tombe sous le coup de l'article 323-1 du Code pénal (accès frauduleux). Dans un contexte de pentest avec autorisation écrite, l'OSINT actif dans le scope est parfaitement légal.

Shodan indexe-t-il tous les services exposés sur Internet ?

Non — Shodan couvre une portion significative mais non exhaustive d'Internet. Ses robots scannent en permanence des plages d'IP publiques sur les ports les plus courants (80, 443, 22, 3389, 8080, etc.) et un sous-ensemble de ports moins courants. Des services sur des ports très atypiques peuvent échapper à l'indexation Shodan. Censys et BinaryEdge sont des alternatives avec des méthodologies de scan différentes et parfois complémentaires. Pour une couverture maximale, croiser Shodan, Censys, et les données BGP/routing tables (via RIPEstat ou BGPView) donne une image plus complète.

Comment protéger son organisation contre la reconnaissance OSINT d'un attaquant ?

La défense contre la reconnaissance OSINT s'appelle l'Attack Surface Management (ASM). Les mesures concrètes incluent : (1) réduire la surface DNS — supprimer les sous-domaines inutilisés, utiliser des noms de sous-domaines non descriptifs, (2) utiliser des enregistrements WHOIS avec protection de la vie privée, (3) surveiller activement ce que Shodan indexe sur vos IP (Shodan Monitor), (4) retirer les services de gestion (RDP, SSH) de l'accès direct Internet derrière un VPN ou PAM, (5) mettre en place un programme de monitoring des credentials leakés (HIBP Enterprise), (6) former les développeurs à ne jamais committer de credentials dans les dépôts (même privés).

Quels outils OSINT ne nécessitent pas de clé API ?

Plusieurs outils OSINT puissants fonctionnent sans clé API. theHarvester avec les sources Google/Bing est utilisable sans API pour un usage limité. Recon-ng avec les modules passifs (hackertarget, certificate_transparency) ne nécessite pas de clé. crt.sh est entièrement libre d'accès via son API JSON (https://crt.sh/?q=%.domain.com&output=json). BGPView et RIPEstat fournissent des données de routage sans authentification. Wayback Machine d'Archive.org est accessible librement. DNSdumpster.com fournit une interface gratuite pour la reconnaissance DNS. Pour Shodan, la recherche basique est disponible avec un compte gratuit (limité à 1 page de résultats sans API key).

Sources ouvertes avancées

Comment utiliser Maltego sans licence commerciale pour l'OSINT ?

Maltego CE (Community Edition) est disponible gratuitement avec des limitations : 12 entités maximum par graphe, pas de stockage en ligne, et accès limité aux transformations commerciales. Pour contourner ces limitations dans un contexte professionnel : (1) Maltego One est l'offre commerciale la plus abordable (~600€/an), (2) utiliser Maltego CE pour les visualisations et exporter les données enrichies depuis d'autres outils, (3) SpiderFoot est une alternative open source gratuite avec des capacités similaires de graphe de relations, (4) Maltego peut être remplacé par un graphe NetworkX en Python + visualisation Gephi pour les investigations complexes sans budget.

Reconnaissance réseau

Comment automatiser le monitoring OSINT continu d'une organisation ?

L'OSINT continu (aussi appelé Digital Risk Monitoring) peut être automatisé avec un stack open source : (1) SpiderFoot HX ou SpiderFoot open source pour les scans périodiques, (2) des scripts Python cron qui interrogent crt.sh, Shodan Monitor, HIBP Enterprise à intervalles réguliers, (3) des alertes Google Alerts sur le nom de l'organisation et ses dirigeants, (4) des outils de monitoring dark web comme IntelX pour les notifications de mentions, (5) un SIEM qui agrège tous ces résultats et génère des alertes sur les nouveaux findings. Les résultats sont corrélés avec les données d'inventaire IT pour prioriser les actions de remédiation.

Quelle est la valeur légale des preuves collectées par OSINT ?

En France, les preuves OSINT peuvent être recevables en justice sous conditions. Premièrement, elles doivent avoir été collectées légalement (sources publiques, sans contournement de protections). Deuxièmement, leur authenticité doit être attestable — conserver des captures d'écran horodatées, des exports de pages web avec métadonnées, et des logs de requêtes DNS avec timestamp. Troisièmement, pour des procédures pénales, les preuves OSINT sont généralement soumises à un expert judiciaire pour validation. La norme ISO/IEC 27037 sur la collecte de preuves numériques fournit le cadre méthodologique pour garantir la chaîne de custody. Pour une investigation menée à des fins de contentieux, impliquer un huissier de justice pour authentifier les captures d'écran est recommandé.

Comment l'OSINT s'intègre-t-il dans un programme de Cyber Threat Intelligence structuré ?

L'OSINT est l'une des quatre sources de renseignement d'un programme CTI, avec le HUMINT (renseignement humain via les contacts industrie et les chercheurs en sécurité), le TECHINT (renseignement technique via les logs internes, le SIEM, les honeypots), et le SIGINT (interception de communications, réservée aux services étatiques). Dans une organisation privée, l'OSINT CTI se structure autour de : plateformes TIP (Threat Intelligence Platform) comme MISP ou OpenCTI qui agrègent et contextualisent les IOCs, des flux d'ISAC (Information Sharing and Analysis Centers) comme le CERT-FR qui partagent des IOCs sectoriels, et des outils d'automatisation qui enrichissent les alertes SIEM avec les données OSINT CTI en temps quasi-réel.

Corrélation de données

Outillage et automatisation

L'OSINT comme discipline défensive

L'OSINT est paradoxalement l'un des meilleurs outils défensifs disponibles : en voyant son organisation à travers les yeux d'un attaquant — en découvrant les informations qu'un acteur malveillant collecterait lors de sa phase de reconnaissance — une organisation peut anticiper et neutraliser les vecteurs d'attaque avant qu'ils ne soient exploités.

Les investigations OSINT régulières sur sa propre infrastructure (red team OSINT) révèlent systématiquement des expositions non connues des équipes IT : sous-domaines oubliés, services de développement exposés, credentials dans des dépôts publics, données d'employés dans des breaches récentes.

Pour approfondir les techniques offensives qui suivent la phase de reconnaissance OSINT, consultez nos articles sur le Kerberoasting et les attaques NTLM Relay modernes. La corrélation OSINT avec la threat intelligence est détaillée dans notre article sur les agents IA pour la threat hunting. Pour les investigations forensiques qui suivent la détection d'une compromission, notre comparatif des outils DFIR complète naturellement l'approche OSINT.

Les ressources normatives de référence incluent l'ENISA Threat Intelligence Course pour le cadre méthodologique CTI, et le NIST Cybersecurity Framework qui intègre l'OSINT dans la fonction "Identify" du cycle de sécurité.

Techniques avancées de cartographie des infrastructures

La cartographie d'une infrastructure cible constitue l'une des étapes les plus précieuses d'une investigation OSINT. L'objectif est de reconstituer l'empreinte numérique complète d'une organisation sans jamais interagir directement avec ses systèmes. Cette approche passive exploite les données publiquement disponibles dans les DNS, certificats TLS, registres WHOIS, et bases de données de routage BGP pour dessiner une topologie réseau détaillée.

Analyse BGP et ASN pour la cartographie réseau

Le protocole BGP (Border Gateway Protocol) expose publiquement des informations précieuses sur les blocs d'adresses IP appartenant à une organisation via les Autonomous System Numbers (ASN). Ces données permettent d'identifier l'intégralité des plages IP légitimement détenues par une cible, y compris les datacenters secondaires et les filiales.

#!/usr/bin/env python3
"""
BGP/ASN Recon - Cartographie des blocs IP via données BGP publiques
Sources: BGPView API, RIPE NCC, ARIN, Hurricane Electric
"""
import asyncio
import aiohttp
import ipaddress
import json
from typing import List, Dict, Set
from dataclasses import dataclass, field

@dataclass
class ASNInfo:
    asn: int
    name: str
    country: str
    prefixes_v4: List[str] = field(default_factory=list)
    prefixes_v6: List[str] = field(default_factory=list)
    peers: List[int] = field(default_factory=list)
    total_ips: int = 0

class BGPRecon:
    def __init__(self):
        self.bgpview_base = "https://api.bgpview.io"
        self.ripe_base = "https://stat.ripe.net/data"
        self.he_base = "https://bgp.he.net"
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers={"User-Agent": "OSINT-Research/1.0"},
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self
    
    async def __aexit__(self, *args):
        await self.session.close()
    
    async def search_asn_by_org(self, org_name: str) -> List[Dict]:
        """Recherche d'ASN par nom d'organisation"""
        url = f"{self.bgpview_base}/search?query_term={org_name}"
        async with self.session.get(url) as resp:
            if resp.status == 200:
                data = await resp.json()
                return data.get("data", {}).get("asns", [])
        return []
    
    async def get_asn_prefixes(self, asn: int) -> ASNInfo:
        """Récupère tous les préfixes annoncés par un ASN"""
        url = f"{self.bgpview_base}/asn/{asn}/prefixes"
        async with self.session.get(url) as resp:
            if resp.status == 200:
                data = await resp.json()
                info_url = f"{self.bgpview_base}/asn/{asn}"
                async with self.session.get(info_url) as info_resp:
                    info_data = await info_resp.json()
                    asn_data = info_data.get("data", {})
                
                prefixes_data = data.get("data", {})
                ipv4_prefixes = [p["prefix"] for p in prefixes_data.get("ipv4_prefixes", [])]
                ipv6_prefixes = [p["prefix"] for p in prefixes_data.get("ipv6_prefixes", [])]
                
                total_ips = sum(
                    ipaddress.IPv4Network(p).num_addresses 
                    for p in ipv4_prefixes
                )
                
                return ASNInfo(
                    asn=asn,
                    name=asn_data.get("name", "Unknown"),
                    country=asn_data.get("country_code", "Unknown"),
                    prefixes_v4=ipv4_prefixes,
                    prefixes_v6=ipv6_prefixes,
                    total_ips=total_ips
                )
        return ASNInfo(asn=asn, name="Unknown", country="Unknown")
    
    async def get_asn_peers(self, asn: int) -> List[int]:
        """Identifie les pairs BGP (peering relationships)"""
        url = f"{self.bgpview_base}/asn/{asn}/peers"
        async with self.session.get(url) as resp:
            if resp.status == 200:
                data = await resp.json()
                peers_v4 = data.get("data", {}).get("ipv4_peers", [])
                return [p["asn"] for p in peers_v4]
        return []
    
    async def ripe_routing_history(self, prefix: str) -> Dict:
        """Historique de routage RIPE pour un préfixe"""
        url = f"{self.ripe_base}/routing-history/data.json"
        params = {"resource": prefix, "max_rows": 100}
        async with self.session.get(url, params=params) as resp:
            if resp.status == 200:
                data = await resp.json()
                return data.get("data", {})
        return {}
    
    async def full_recon(self, org_name: str) -> Dict:
        """Reconnaissance complète d'une organisation"""
        print(f"[*] Recherche ASN pour: {org_name}")
        asns = await self.search_asn_by_org(org_name)
        
        results = {
            "organization": org_name,
            "asns": [],
            "total_prefixes_v4": 0,
            "total_ips": 0,
            "all_prefixes": []
        }
        
        tasks = [self.get_asn_prefixes(a["asn"]) for a in asns[:10]]
        asn_infos = await asyncio.gather(*tasks, return_exceptions=True)
        
        for info in asn_infos:
            if isinstance(info, ASNInfo):
                results["asns"].append({
                    "asn": info.asn,
                    "name": info.name,
                    "country": info.country,
                    "prefixes_count": len(info.prefixes_v4),
                    "total_ips": info.total_ips
                })
                results["total_prefixes_v4"] += len(info.prefixes_v4)
                results["total_ips"] += info.total_ips
                results["all_prefixes"].extend(info.prefixes_v4)
        
        print(f"[+] {len(results['asns'])} ASN trouvés")
        print(f"[+] {results['total_prefixes_v4']} préfixes IPv4")
        print(f"[+] {results['total_ips']:,} adresses IP totales")
        
        return results

async def main():
    async with BGPRecon() as recon:
        results = await recon.full_recon("Example Corp")
        print(json.dumps(results, indent=2))

if __name__ == "__main__":
    asyncio.run(main())
BGP comme source OSINT : Les tables de routage BGP sont publiques et permettent d'identifier l'intégralité des blocs IP d'une organisation, y compris les infrastructures hébergées chez des tiers. Des outils comme BGPView, Hurricane Electric BGP Toolkit, et RIPE NCC sont indispensables pour cette phase de reconnaissance. La corrélation ASN/organisation révèle souvent des filiales non documentées.

Certificate Transparency avancée et TLS fingerprinting

La Certificate Transparency (CT) est devenue l'une des sources OSINT les plus riches pour la découverte de sous-domaines et l'analyse des infrastructures TLS. Depuis 2018, tout certificat TLS émis par une CA publique doit être enregistré dans des logs CT publics, créant une base de données exhaustive et historique de tous les noms de domaines certifiés.

#!/usr/bin/env python3
"""
Certificate Transparency Mining - Analyse avancée des logs CT
Sources multiples: crt.sh, CertSpotter, Google CT, DigiCert CT
"""
import asyncio
import aiohttp
import re
import json
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, Dict, Set, Tuple

class CTLogMiner:
    def __init__(self, domain: str):
        self.domain = domain
        self.subdomains: Set[str] = set()
        self.wildcard_certs: List[Dict] = []
        self.san_networks: Dict[str, List[str]] = defaultdict(list)
        
    async def query_crtsh(self, session: aiohttp.ClientSession) -> List[Dict]:
        """Query crt.sh avec déduplication et analyse SAN"""
        url = f"https://crt.sh/?q=%.{self.domain}&output=json"
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp:
            if resp.status == 200:
                certs = await resp.json()
                return certs
        return []
    
    async def query_certspotter(self, session: aiohttp.ClientSession) -> List[Dict]:
        """CertSpotter API - données en temps réel"""
        url = f"https://api.certspotter.com/v1/issuances"
        params = {
            "domain": self.domain,
            "include_subdomains": "true",
            "expand": "dns_names",
            "after": (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%dT%H:%M:%SZ")
        }
        async with session.get(url, params=params) as resp:
            if resp.status == 200:
                return await resp.json()
        return []
    
    def extract_subdomains_from_cert(self, cert: Dict) -> List[str]:
        """Extrait tous les noms du champ name_value"""
        name_value = cert.get("name_value", "")
        names = set()
        for name in name_value.split("\n"):
            name = name.strip().lower()
            if name.endswith(f".{self.domain}") or name == self.domain:
                if name.startswith("*."):
                    self.wildcard_certs.append({
                        "wildcard": name,
                        "issuer": cert.get("issuer_name", ""),
                        "not_before": cert.get("not_before", ""),
                        "not_after": cert.get("not_after", "")
                    })
                    name = name[2:]
                names.add(name)
        return list(names)
    
    def analyze_naming_patterns(self) -> Dict:
        """Analyse les patterns de nommage pour inférer l'architecture"""
        patterns = defaultdict(list)
        env_indicators = {
            "prod": ["prod", "production", "live"],
            "staging": ["staging", "stage", "stg", "preprod", "pre-prod"],
            "dev": ["dev", "development", "sandbox", "test", "qa"],
            "internal": ["internal", "intranet", "corp", "int"],
            "api": ["api", "apis", "rest", "graphql", "v1", "v2"],
            "admin": ["admin", "admincp", "console", "management", "portal"],
            "mail": ["mail", "smtp", "imap", "pop3", "webmail", "mx"],
            "cdn": ["cdn", "static", "assets", "media", "img", "images"],
            "vpn": ["vpn", "remote", "access", "gateway", "jump"],
        }
        
        for subdomain in self.subdomains:
            parts = subdomain.split(".")
            prefix = parts[0] if parts else ""
            
            for category, keywords in env_indicators.items():
                if any(kw in prefix.lower() for kw in keywords):
                    patterns[category].append(subdomain)
        
        return dict(patterns)
    
    def detect_cloud_providers(self) -> Dict[str, List[str]]:
        """Détecte les providers cloud via les patterns de nommage"""
        cloud_patterns = {
            "AWS": [r"\.amazonaws\.com$", r"cloudfront\.net$", r"elb\.amazonaws\.com$"],
            "Azure": [r"\.azurewebsites\.net$", r"\.cloudapp\.azure\.com$", r"trafficmanager\.net$"],
            "GCP": [r"\.appspot\.com$", r"\.googleapis\.com$", r"\.run\.app$"],
            "Cloudflare": [r"\.pages\.dev$"],
            "Vercel": [r"\.vercel\.app$", r"\.now\.sh$"],
            "Heroku": [r"\.herokuapp\.com$"],
        }
        
        cloud_usage = defaultdict(list)
        for subdomain in self.subdomains:
            for provider, patterns in cloud_patterns.items():
                for pattern in patterns:
                    if re.search(pattern, subdomain):
                        cloud_usage[provider].append(subdomain)
        
        return dict(cloud_usage)
    
    async def full_ct_analysis(self) -> Dict:
        """Analyse CT complète avec corrélation multi-sources"""
        async with aiohttp.ClientSession(
            headers={"User-Agent": "OSINT-Research/1.0"}
        ) as session:
            # Requêtes parallèles sur multiple CT logs
            crtsh_certs, certspotter_certs = await asyncio.gather(
                self.query_crtsh(session),
                self.query_certspotter(session),
                return_exceptions=True
            )
            
            # Traitement crt.sh
            if isinstance(crtsh_certs, list):
                for cert in crtsh_certs:
                    subdomains = self.extract_subdomains_from_cert(cert)
                    self.subdomains.update(subdomains)
            
            # Traitement CertSpotter
            if isinstance(certspotter_certs, list):
                for cert in certspotter_certs:
                    for dns_name in cert.get("dns_names", []):
                        if dns_name.endswith(f".{self.domain}"):
                            self.subdomains.add(dns_name.lower())
            
            patterns = self.analyze_naming_patterns()
            cloud = self.detect_cloud_providers()
            
            return {
                "domain": self.domain,
                "total_subdomains": len(self.subdomains),
                "subdomains": sorted(self.subdomains),
                "wildcard_certs": self.wildcard_certs,
                "naming_patterns": patterns,
                "cloud_providers": cloud,
                "infrastructure_insights": self._generate_insights(patterns, cloud)
            }
    
    def _generate_insights(self, patterns: Dict, cloud: Dict) -> List[str]:
        insights = []
        if patterns.get("dev"):
            insights.append(f"Environnements dev exposés: {patterns['dev'][:3]}")
        if patterns.get("admin"):
            insights.append(f"Interfaces d'administration détectées: {patterns['admin'][:3]}")
        if patterns.get("vpn"):
            insights.append(f"Points d'accès distants: {patterns['vpn'][:3]}")
        if cloud:
            providers = list(cloud.keys())
            insights.append(f"Multi-cloud détecté: {', '.join(providers)}")
        if self.wildcard_certs:
            insights.append(f"{len(self.wildcard_certs)} certificats wildcard trouvés")
        return insights

async def main():
    miner = CTLogMiner("example.com")
    results = await miner.full_ct_analysis()
    print(json.dumps(results, indent=2))
    
    print(f"\n[INSIGHTS]")
    for insight in results["infrastructure_insights"]:
        print(f"  ! {insight}")

if __name__ == "__main__":
    asyncio.run(main())

OSINT sur les réseaux sociaux professionnels

LinkedIn reste la source la plus riche pour l'OSINT sur les organisations et leurs employés, mais son utilisation directe via scraping viole les CGU et peut constituer une infraction. L'approche éthique et légale passe par les moteurs de recherche (Google, Bing) qui indexent les profils publics LinkedIn, et par des APIs officielles limitées.

LinkedIn OSINT éthique : L'utilisation de Google dorks comme site:linkedin.com/in/ "example-corp" "RSSI" ou site:linkedin.com/in/ "example-corp" filetype:pdf permet de cartographier les équipes techniques d'une organisation sans créer de compte LinkedIn ni enfreindre les CGU. La combinaison avec Hunter.io pour la validation des patterns d'emails permet d'identifier des contacts pertinents pour le red teaming social engineering.
#!/usr/bin/env python3
"""
Social Engineering Intelligence - Cartographie organisationnelle
Méthodes : Google dorks (légal), LinkedIn public profiles, job postings analysis
"""
import asyncio
import aiohttp
import re
import json
from typing import List, Dict, Set
from dataclasses import dataclass, field

@dataclass
class Employee:
    name: str
    title: str
    department: str = ""
    linkedin_url: str = ""
    email_guess: str = ""
    technologies_mentioned: List[str] = field(default_factory=list)

class OrgIntelligence:
    def __init__(self, company: str, domain: str):
        self.company = company
        self.domain = domain
        self.employees: List[Employee] = []
        self.technologies: Set[str] = set()
        self.org_structure: Dict[str, List[str]] = {}
        
    async def analyze_job_postings(self, session: aiohttp.ClientSession) -> List[Dict]:
        """
        Analyse des offres d'emploi pour cartographier la stack technique.
        Les job postings révèlent : technos utilisées, outils de sécurité,
        certifications requises, taille des équipes, projets en cours.
        Source légale : pages carrières publiques
        """
        # Exemple: récupérer les offres d'emploi via l'API publique Indeed ou LinkedIn
        tech_indicators = {
            "SIEM": ["splunk", "qradar", "sentinel", "elastic siem", "arcsight"],
            "EDR": ["crowdstrike", "sentinelone", "cylance", "carbon black", "defender atp"],
            "Cloud": ["aws", "azure", "gcp", "terraform", "kubernetes", "docker"],
            "DevSecOps": ["devsecops", "sast", "dast", "sca", "sonarqube", "checkmarx"],
            "Frameworks": ["zero trust", "nist", "iso 27001", "soc 2", "pci dss"],
            "Languages": ["python", "golang", "rust", "java", "c++", "powershell"],
            "Databases": ["mysql", "postgresql", "mongodb", "redis", "elasticsearch"],
        }
        
        detected = {}
        job_text = ""  # Simulé - en pratique, scraper les pages carrières publiques
        
        for category, keywords in tech_indicators.items():
            found = [kw for kw in keywords if kw.lower() in job_text.lower()]
            if found:
                detected[category] = found
                self.technologies.update(found)
        
        return [{"category": k, "technologies": v} for k, v in detected.items()]
    
    def infer_email_patterns(self, known_emails: List[str], 
                              first_name: str, last_name: str) -> List[str]:
        """
        Infère le pattern d'email d'une organisation depuis des emails connus.
        Génère toutes les variantes possibles pour un nouveau contact.
        """
        patterns = [
            f"{first_name.lower()}.{last_name.lower()}@{self.domain}",
            f"{first_name.lower()}{last_name.lower()}@{self.domain}",
            f"{first_name[0].lower()}{last_name.lower()}@{self.domain}",
            f"{first_name.lower()}{last_name[0].lower()}@{self.domain}",
            f"{last_name.lower()}.{first_name.lower()}@{self.domain}",
            f"{first_name[0].lower()}.{last_name.lower()}@{self.domain}",
        ]
        return patterns
    
    def detect_pattern_from_samples(self, sample_emails: List[str]) -> str:
        """Détecte le pattern dominant à partir d'emails connus"""
        patterns_count = {}
        for email in sample_emails:
            local = email.split("@")[0].lower()
            # Analyser la structure du local part
            if "." in local:
                parts = local.split(".")
                if len(parts) == 2:
                    if len(parts[0]) == 1:
                        patterns_count["f.lastname"] = patterns_count.get("f.lastname", 0) + 1
                    elif len(parts[1]) == 1:
                        patterns_count["firstname.l"] = patterns_count.get("firstname.l", 0) + 1
                    else:
                        patterns_count["firstname.lastname"] = patterns_count.get("firstname.lastname", 0) + 1
            else:
                patterns_count["firstnamelastname"] = patterns_count.get("firstnamelastname", 0) + 1
        
        if patterns_count:
            return max(patterns_count, key=patterns_count.get)
        return "unknown"
    
    async def correlate_breached_emails(self, emails: List[str]) -> Dict:
        """
        Vérifie si des emails apparaissent dans des bases de données de fuites
        via l'API HIBP (Have I Been Pwned) avec méthode k-anonymity.
        """
        import hashlib
        results = {}
        
        async with aiohttp.ClientSession() as session:
            for email in emails[:5]:  # Limiter pour demo
                sha1_hash = hashlib.sha1(email.encode()).hexdigest().upper()
                prefix = sha1_hash[:5]
                suffix = sha1_hash[5:]
                
                url = f"https://api.pwnedpasswords.com/range/{prefix}"
                try:
                    async with session.get(url) as resp:
                        if resp.status == 200:
                            hashes = await resp.text()
                            found = suffix in hashes
                            results[email] = {"breached": found}
                except Exception as e:
                    results[email] = {"error": str(e)}
        
        return results

    def generate_report(self) -> Dict:
        """Génère un rapport de renseignement organisationnel"""
        depts = {}
        for emp in self.employees:
            dept = emp.department or "Unknown"
            if dept not in depts:
                depts[dept] = []
            depts[dept].append({"name": emp.name, "title": emp.title})
        
        return {
            "company": self.company,
            "domain": self.domain,
            "total_employees_identified": len(self.employees),
            "departments": depts,
            "technology_stack": list(self.technologies),
            "attack_surface_indicators": {
                "exposed_tech_stack": len(self.technologies) > 5,
                "admin_emails_found": any("admin" in e.title.lower() or "it" in e.title.lower() 
                                           for e in self.employees),
                "c_suite_identified": any(title in e.title.lower() 
                                          for e in self.employees 
                                          for title in ["ciso", "cto", "ceo", "director"])
            }
        }

Surveillance du Dark Web et des forums criminels

La surveillance du dark web dans un cadre professionnel de CTI (Cyber Threat Intelligence) vise à détecter les mentions d'une organisation cible dans des forums criminels, des marketplaces d'accès initiaux, ou des plateformes de vente de données volées. Cette activité est légale lorsqu'elle est réalisée dans un but défensif par des équipes de sécurité ou des fournisseurs CTI accrédités.

#!/usr/bin/env python3
"""
Dark Web Intelligence Monitor - Surveillance défensive des marketplaces
AVERTISSEMENT: Utiliser uniquement dans un cadre légal et éthique.
Consulter un juriste avant toute opération sur le dark web.
"""
import asyncio
import aiohttp
import json
import hashlib
from datetime import datetime
from typing import List, Dict, Optional
import re

class DarkWebMonitor:
    """
    Surveille les mentions d'une organisation via:
    1. Tor2web proxies (légal, sans Tor requis)
    2. API d'intel commercial (Recorded Future, Intel471, etc.)
    3. Paste sites publics (Pastebin, GitHub Gists filtrés)
    4. Telegram monitoring (canaux publics)
    """
    
    def __init__(self, company: str, domain: str, keywords: List[str]):
        self.company = company
        self.domain = domain
        self.keywords = keywords
        self.alerts: List[Dict] = []
    
    async def monitor_paste_sites(self, session: aiohttp.ClientSession) -> List[Dict]:
        """Surveille les paste sites publics pour des mentions de données sensibles"""
        # Pastebin search via Google dorks (légal, public)
        dorks = [
            f'site:pastebin.com "{self.domain}"',
            f'site:paste.debian.net "{self.domain}"',
            f'site:privatebin.net "{self.domain}"',
            f'site:hastebin.com "{self.domain}"',
        ]
        
        findings = []
        # En production : utiliser SerpAPI pour exécuter ces dorks
        # et analyser les résultats pour des patterns sensibles
        
        sensitive_patterns = {
            "credentials": r"password[:\s]+\S+|passwd[:\s]+\S+|pwd[:\s]+\S+",
            "api_keys": r"[A-Za-z0-9]{32,64}",
            "credit_cards": r"\b4[0-9]{12}(?:[0-9]{3})?\b",
            "emails_bulk": r"[\w.+-]+@" + re.escape(self.domain),
        }
        
        return findings
    
    async def check_breach_databases(self, session: aiohttp.ClientSession) -> Dict:
        """
        Vérifie si le domaine apparaît dans des bases de fuites connues
        via des APIs légales : HIBP, DeHashed (avec abonnement), etc.
        """
        results = {
            "hibp_domain_check": None,
            "dehashed_results": None,
            "intelx_results": None
        }
        
        # HIBP Domain Search (nécessite clé API)
        hibp_url = f"https://haveibeenpwned.com/api/v3/breacheddomain/{self.domain}"
        headers = {
            "hibp-api-key": "YOUR_API_KEY_HERE",
            "user-agent": "OSINT-CTI-Monitor/1.0"
        }
        
        try:
            async with session.get(hibp_url, headers=headers) as resp:
                if resp.status == 200:
                    results["hibp_domain_check"] = {
                        "breached": True,
                        "data": await resp.json()
                    }
                elif resp.status == 404:
                    results["hibp_domain_check"] = {"breached": False}
        except Exception as e:
            results["hibp_domain_check"] = {"error": str(e)}
        
        return results
    
    async def monitor_telegram_channels(self, channel_ids: List[str]) -> List[Dict]:
        """
        Surveille les canaux Telegram publics pour des mentions cibles.
        Utilise l'API Telegram officielle (légal pour les canaux publics).
        """
        # Exemple avec telegram-mtproto API
        findings = []
        
        # Pattern matching pour données sensibles
        patterns = {
            "credential_dump": r"login[:\s]+\S+.*password[:\s]+\S+",
            "database_dump": r"INSERT INTO|CREATE TABLE|SELECT \*",
            "employee_data": r"@" + re.escape(self.domain.split(".")[0]),
        }
        
        return findings
    
    def calculate_risk_score(self, findings: List[Dict]) -> int:
        """
        Calcule un score de risque basé sur les découvertes
        Score 0-100, seuils: 0-25 faible, 26-50 modéré, 51-75 élevé, 76-100 critique
        """
        score = 0
        weights = {
            "active_credentials": 30,
            "database_dump": 25,
            "source_code": 20,
            "employee_pii": 15,
            "infrastructure_details": 10,
        }
        
        for finding in findings:
            finding_type = finding.get("type", "")
            score += weights.get(finding_type, 5)
        
        return min(score, 100)
    
    def generate_alert(self, finding: Dict, severity: str) -> Dict:
        """Génère une alerte structurée au format STIX-like"""
        return {
            "id": hashlib.sha256(json.dumps(finding, sort_keys=True).encode()).hexdigest()[:16],
            "timestamp": datetime.utcnow().isoformat(),
            "severity": severity,
            "company": self.company,
            "finding": finding,
            "recommended_actions": self._get_recommendations(finding.get("type")),
            "tlp": "TLP:AMBER"
        }
    
    def _get_recommendations(self, finding_type: str) -> List[str]:
        recommendations = {
            "active_credentials": [
                "Forcer le changement de mot de passe immédiatement",
                "Activer l'authentification MFA si pas déjà en place",
                "Vérifier les journaux d'accès pour usage malveillant",
                "Notifier le RSSI et le DPO"
            ],
            "database_dump": [
                "Évaluer le périmètre des données exposées",
                "Notifier la CNIL dans les 72h si données personnelles (RGPD Art. 33)",
                "Engager une cellule de crise",
                "Préserver les preuves pour investigation forensique"
            ],
            "source_code": [
                "Auditer le code exposé pour des secrets hardcodés",
                "Révoquer toutes les clés API présentes dans le code",
                "Identifier l'origine de la fuite (insider, CI/CD leak)"
            ],
        }
        return recommendations.get(finding_type, ["Analyser manuellement la découverte"])

Corrélation et enrichissement des données OSINT

La valeur ajoutée d'une investigation OSINT réside dans la capacité à corréler des données provenant de sources disparates pour produire un renseignement actionnable. Un nom de domaine peut être corrélé avec un certificat TLS, qui révèle un sous-domaine, lui-même corrélé avec une IP Shodan, qui pointe vers un service vulnérable exposé.

#!/usr/bin/env python3
"""
OSINT Correlation Engine - Graphe de connaissances multi-sources
Corrèle : DNS, BGP, CT logs, Shodan, WHOIS, job postings, social media
"""
import asyncio
import json
from collections import defaultdict
from typing import List, Dict, Set, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum

class NodeType(Enum):
    DOMAIN = "domain"
    IP = "ip"
    ASN = "asn"
    CERTIFICATE = "certificate"
    EMAIL = "email"
    PERSON = "person"
    ORGANIZATION = "organization"
    TECHNOLOGY = "technology"
    VULNERABILITY = "vulnerability"

@dataclass
class OSINTNode:
    node_id: str
    node_type: NodeType
    value: str
    attributes: Dict[str, Any] = field(default_factory=dict)
    confidence: float = 1.0
    sources: List[str] = field(default_factory=list)

@dataclass
class OSINTEdge:
    source_id: str
    target_id: str
    relationship: str
    weight: float = 1.0
    evidence: List[str] = field(default_factory=list)

class OSINTGraph:
    """Graphe de connaissances OSINT avec moteur de corrélation"""
    
    def __init__(self):
        self.nodes: Dict[str, OSINTNode] = {}
        self.edges: List[OSINTEdge] = []
        self.adjacency: Dict[str, Set[str]] = defaultdict(set)
    
    def add_node(self, node: OSINTNode) -> str:
        existing = self.nodes.get(node.node_id)
        if existing:
            # Fusion des attributs et sources
            existing.attributes.update(node.attributes)
            existing.sources.extend(node.sources)
            existing.confidence = max(existing.confidence, node.confidence)
        else:
            self.nodes[node.node_id] = node
        return node.node_id
    
    def add_edge(self, edge: OSINTEdge):
        self.edges.append(edge)
        self.adjacency[edge.source_id].add(edge.target_id)
        self.adjacency[edge.target_id].add(edge.source_id)
    
    def find_shortest_path(self, start_id: str, end_id: str) -> List[str]:
        """BFS pour trouver le chemin le plus court entre deux entités"""
        if start_id not in self.nodes or end_id not in self.nodes:
            return []
        
        visited = {start_id}
        queue = [(start_id, [start_id])]
        
        while queue:
            current, path = queue.pop(0)
            if current == end_id:
                return path
            
            for neighbor in self.adjacency[current]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append((neighbor, path + [neighbor]))
        
        return []
    
    def get_neighbors_by_type(self, node_id: str, 
                               node_type: NodeType) -> List[OSINTNode]:
        """Récupère les voisins d'un type spécifique"""
        neighbors = []
        for neighbor_id in self.adjacency[node_id]:
            neighbor = self.nodes.get(neighbor_id)
            if neighbor and neighbor.node_type == node_type:
                neighbors.append(neighbor)
        return neighbors
    
    def calculate_centrality(self) -> Dict[str, float]:
        """Calcule la centralité de degré pour identifier les pivots"""
        centrality = {}
        for node_id in self.nodes:
            centrality[node_id] = len(self.adjacency[node_id]) / max(len(self.nodes) - 1, 1)
        return dict(sorted(centrality.items(), key=lambda x: x[1], reverse=True))
    
    def identify_attack_paths(self, target_id: str) -> List[Dict]:
        """Identifie les chemins d'attaque potentiels vers une cible"""
        paths = []
        
        # Trouver tous les nœuds de vulnérabilités
        vuln_nodes = [n for n in self.nodes.values() 
                      if n.node_type == NodeType.VULNERABILITY]
        
        for vuln_node in vuln_nodes:
            path = self.find_shortest_path(vuln_node.node_id, target_id)
            if path and len(path) <= 5:  # Chemins courts = risque élevé
                paths.append({
                    "starting_point": vuln_node.value,
                    "path": [self.nodes[nid].value for nid in path],
                    "hop_count": len(path),
                    "risk_score": max(0, 100 - (len(path) * 15))
                })
        
        return sorted(paths, key=lambda x: x["risk_score"], reverse=True)
    
    def export_to_maltego(self) -> List[Dict]:
        """Exporte le graphe au format Maltego CSV pour visualisation"""
        maltego_data = []
        for edge in self.edges:
            source = self.nodes.get(edge.source_id)
            target = self.nodes.get(edge.target_id)
            if source and target:
                maltego_data.append({
                    "Source": f"{source.node_type.value}:{source.value}",
                    "Target": f"{target.node_type.value}:{target.value}",
                    "Type": edge.relationship,
                    "Weight": edge.weight
                })
        return maltego_data

class OSINTCorrelationEngine:
    """Moteur de corrélation automatique multi-sources"""
    
    def __init__(self):
        self.graph = OSINTGraph()
        self.correlation_rules = []
        self._register_default_rules()
    
    def _register_default_rules(self):
        """Règles de corrélation automatique"""
        self.correlation_rules = [
            {
                "name": "IP-to-Domain via rDNS",
                "condition": lambda n: n.node_type == NodeType.IP,
                "action": "rdns_lookup",
                "creates_edge": "RESOLVES_TO"
            },
            {
                "name": "Domain-to-Certificate via CT",
                "condition": lambda n: n.node_type == NodeType.DOMAIN,
                "action": "ct_lookup",
                "creates_edge": "HAS_CERTIFICATE"
            },
            {
                "name": "IP-to-Vulnerability via Shodan",
                "condition": lambda n: n.node_type == NodeType.IP,
                "action": "shodan_lookup",
                "creates_edge": "EXPOSES"
            },
        ]
    
    async def ingest_from_sources(self, target_domain: str) -> OSINTGraph:
        """Pipeline d'ingestion multi-sources avec corrélation automatique"""
        # Nœud racine
        root_node = OSINTNode(
            node_id=f"domain:{target_domain}",
            node_type=NodeType.DOMAIN,
            value=target_domain,
            sources=["initial_target"]
        )
        self.graph.add_node(root_node)
        
        # Simulation d'enrichissement progressif
        # En production : appels async à toutes les sources
        
        return self.graph
    
    def generate_executive_report(self, target: str) -> Dict:
        """Génère un rapport exécutif avec les findings critiques"""
        centrality = self.graph.calculate_centrality()
        pivot_nodes = list(centrality.items())[:5]
        
        attack_paths = self.graph.identify_attack_paths(f"domain:{target}")
        
        return {
            "executive_summary": {
                "total_assets_discovered": len(self.graph.nodes),
                "relationships_mapped": len(self.graph.edges),
                "critical_pivot_points": [
                    {"entity": self.graph.nodes[nid].value, "centrality_score": score}
                    for nid, score in pivot_nodes if nid in self.graph.nodes
                ],
                "highest_risk_attack_paths": attack_paths[:3],
            },
            "recommendations": self._generate_recommendations(attack_paths)
        }
    
    def _generate_recommendations(self, attack_paths: List[Dict]) -> List[str]:
        recs = []
        if any(p["hop_count"] <= 2 for p in attack_paths):
            recs.append("CRITIQUE: Des chemins d'attaque directs ont été identifiés - action immédiate requise")
        if any(p["hop_count"] <= 4 for p in attack_paths):
            recs.append("Renforcer la segmentation réseau pour allonger les chemins d'attaque")
        recs.append("Surveiller les pivots identifiés avec des alertes SIEM dédiées")
        recs.append("Réaliser un pentest ciblé sur les assets à haute centralité")
        return recs

Outils OSINT open source : comparatif et intégration

Outil Type Forces Limites Intégration
Maltego Graphe visuel Visualisation, transforms OSINT, community édition Coûteux (pro), nécessite transforms payants API MTDS, Python transforms
Shodan Moteur de recherche IoT Couverture IPv4 complète, CVE tagging, historique Données parfois anciennes, payant pour volume REST API, Python library
SpiderFoot Automation OSINT 200+ modules, auto-corrélation, UI web Faux positifs, lent sur gros périmètres API REST, CLI, Docker
OSINT Framework Annuaire de sources Exhaustif, catégorisé, gratuit Pas d'automation, juste un répertoire N/A (web uniquement)
theHarvester Collecte d'emails/DNS Multi-sources, simple, Python Rate limiting, pas de corrélation CLI, librairie Python
Recon-ng Framework modulaire Base de données SQLite, modules OSINT Interface CLI complexe, doc limitée Modules Python, API
Amass Enum de sous-domaines Exhaustif, active+passive, graphe intégré Lent, consommateur de ressources CLI, API JSON, Docker
Censys Scan Internet TLS deep scan, precision élevée Quota API restrictif en gratuit Python library, REST API

OPSEC pour investigateurs OSINT

L'OPSEC (Operational Security) de l'investigateur est critique : chaque requête DNS, chaque connexion HTTP peut potentiellement être enregistrée par la cible ou des tiers. Une investigation mal menée peut alerter la cible, contaminer la preuve judiciaire, ou exposer l'identité de l'investigateur.

#!/bin/bash
# OPSEC Setup pour investigation OSINT
# Environnement isolé avec rotation d'identité

# 1. VM dédiée sans lien avec l'identité réelle
# Utiliser Whonix ou Tails pour les investigations sensibles

# 2. VPN + Tor en cascade (VPN over Tor)
# Note: Tor over VPN est différent et moins protecteur
mullvad connect  # VPN d'abord
# Puis router le trafic via Tor

# 3. DNS chiffré via DoH/DoT (pas de fuite DNS)
# /etc/systemd/resolved.conf
cat << 'EOF' > /etc/systemd/resolved.conf.d/dns-over-tls.conf
[Resolve]
DNS=1.1.1.1#cloudflare-dns.com 8.8.8.8#dns.google
DNSOverTLS=yes
DNSSEC=yes
EOF
systemctl restart systemd-resolved

# 4. User-Agent rotatif pour les requêtes HTTP
AGENTS=(
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/537.36"
    "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/119.0"
)
RANDOM_AGENT="${AGENTS[$RANDOM % ${#AGENTS[@]}]}"

# 5. Timestamps artificiels pour les fichiers créés
# (évite de révéler les heures de travail)
touch -t 202401010900 /tmp/investigation_notes.txt

# 6. Chiffrement du workspace d'investigation
cryptsetup luksFormat /dev/sdb1 --label osint-workspace
cryptsetup luksOpen /dev/sdb1 osint-workspace
mkfs.ext4 /dev/mapper/osint-workspace
mount /dev/mapper/osint-workspace /mnt/osint

# 7. Cloisonnement des requêtes par persona
# Jamais de mélange entre investigation et navigation personnelle
# Utiliser des profils Firefox dédiés avec cookies/history effacés

# 8. Rate limiting pour éviter la détection
# Introduire des délais aléatoires entre les requêtes
python3 -c "
import time, random
def rate_limited_request(url, min_delay=2, max_delay=8):
    import urllib.request
    time.sleep(random.uniform(min_delay, max_delay))
    return urllib.request.urlopen(url)
"

# 9. Journalisation de l'investigation (chain of custody)
mkdir -p /mnt/osint/logs
cat << 'EOF' > /mnt/osint/investigation.log
=== DÉBUT INVESTIGATION ===
Date/Heure UTC: $(date -u)
Opérateur: [hash de l'identifiant]
Cible: [à remplir]
Mandat légal: [référence document]
EOF

echo "[+] Environnement OPSEC configuré"
echo "[!] Rappel: Documenter TOUTES les actions pour la traçabilité légale"

OSINT et renseignement sur les menaces (CTI)

L'OSINT est le socle du renseignement sur les menaces (CTI — Cyber Threat Intelligence). Contrairement à l'OSINT offensif ciblant une organisation spécifique, le CTI OSINT surveille les acteurs malveillants, les campagnes d'attaques, et les indicateurs de compromission (IoC) pour nourrir les défenses en amont.

Méthodologie structurée

Exploitation des résultats

Bonnes pratiques opérationnelles

Pyramide de la douleur CTI : Les indicateurs les plus faciles à collecter via OSINT (hashes de fichiers, IPs malveillantes) sont aussi les moins durables pour l'attaquant, qui peut les changer en quelques minutes. Les indicateurs de haute valeur — TTP (Tactiques, Techniques et Procédures), outils, comportements — sont difficiles à changer et constituent le vrai objectif d'une investigation CTI mature. MITRE ATT&CK fournit le vocabulaire standardisé pour décrire ces TTP.
#!/usr/bin/env python3
"""
CTI OSINT Aggregator - Collecte et enrichissement d'IoCs
Sources: VirusTotal, AlienVault OTX, ThreatFox, URLhaus, MISP
"""
import asyncio
import aiohttp
import json
import ipaddress
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Set
from dataclasses import dataclass, field
import hashlib

@dataclass
class ThreatIndicator:
    ioc_type: str  # ip, domain, hash, url, email
    value: str
    threat_type: str = ""
    malware_family: str = ""
    confidence: int = 0  # 0-100
    first_seen: str = ""
    last_seen: str = ""
    tags: List[str] = field(default_factory=list)
    sources: List[str] = field(default_factory=list)
    mitre_ttps: List[str] = field(default_factory=list)
    kill_chain_phases: List[str] = field(default_factory=list)

class CTIAggregator:
    def __init__(self, vt_api_key: str = "", otx_api_key: str = ""):
        self.vt_api_key = vt_api_key
        self.otx_api_key = otx_api_key
        self.ioc_cache: Dict[str, ThreatIndicator] = {}
    
    async def enrich_ip(self, ip: str, 
                         session: aiohttp.ClientSession) -> ThreatIndicator:
        """Enrichit une IP avec des données CTI multi-sources"""
        indicator = ThreatIndicator(ioc_type="ip", value=ip)
        
        tasks = [
            self._query_abuseipdb(ip, session),
            self._query_otx(ip, "ip", session),
            self._query_shodan_ioc(ip, session),
        ]
        
        if self.vt_api_key:
            tasks.append(self._query_virustotal(ip, "ip-addresses", session))
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for result in results:
            if isinstance(result, dict) and "confidence" in result:
                indicator.confidence = max(indicator.confidence, result["confidence"])
                indicator.sources.extend(result.get("sources", []))
                indicator.tags.extend(result.get("tags", []))
                if result.get("malware"):
                    indicator.malware_family = result["malware"]
        
        # Déduplication
        indicator.tags = list(set(indicator.tags))
        indicator.sources = list(set(indicator.sources))
        
        return indicator
    
    async def _query_abuseipdb(self, ip: str, 
                                session: aiohttp.ClientSession) -> Dict:
        """AbuseIPDB - Score de réputation des IPs"""
        url = "https://api.abuseipdb.com/api/v2/check"
        params = {"ipAddress": ip, "maxAgeInDays": 90}
        headers = {"Key": "YOUR_ABUSEIPDB_KEY", "Accept": "application/json"}
        
        try:
            async with session.get(url, params=params, headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    abuse_score = data.get("data", {}).get("abuseConfidenceScore", 0)
                    return {
                        "confidence": abuse_score,
                        "sources": ["abuseipdb"],
                        "tags": ["malicious_ip"] if abuse_score > 50 else []
                    }
        except Exception:
            pass
        return {}
    
    async def _query_otx(self, indicator: str, 
                          ioc_type: str, 
                          session: aiohttp.ClientSession) -> Dict:
        """AlienVault OTX - Threat pulses"""
        type_map = {"ip": "IPv4", "domain": "domain", "hash": "file"}
        otx_type = type_map.get(ioc_type, ioc_type)
        
        url = f"https://otx.alienvault.com/api/v1/indicators/{otx_type}/{indicator}/general"
        headers = {"X-OTX-API-KEY": self.otx_api_key}
        
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    pulse_count = data.get("pulse_info", {}).get("count", 0)
                    return {
                        "confidence": min(pulse_count * 10, 100),
                        "sources": ["alienVault_otx"],
                        "tags": [t["name"] for t in data.get("pulse_info", {}).get("tags", [])][:5]
                    }
        except Exception:
            pass
        return {}
    
    async def _query_shodan_ioc(self, ip: str, 
                                 session: aiohttp.ClientSession) -> Dict:
        """Shodan - Services exposés pour contextualisation CTI"""
        # Données Shodan enrichissent le contexte infrastructure
        return {"confidence": 0, "sources": [], "tags": []}
    
    async def _query_virustotal(self, indicator: str, 
                                 ioc_type: str, 
                                 session: aiohttp.ClientSession) -> Dict:
        """VirusTotal - Analyse multi-AV"""
        url = f"https://www.virustotal.com/api/v3/{ioc_type}/{indicator}"
        headers = {"x-apikey": self.vt_api_key}
        
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    stats = data.get("data", {}).get("attributes", {}).get("last_analysis_stats", {})
                    malicious = stats.get("malicious", 0)
                    total = sum(stats.values()) or 1
                    confidence = int((malicious / total) * 100)
                    return {
                        "confidence": confidence,
                        "sources": ["virustotal"],
                        "tags": ["vt_detection"] if malicious > 5 else [],
                        "malware": data.get("data", {}).get("attributes", {}).get("popular_threat_label", "")
                    }
        except Exception:
            pass
        return {}
    
    async def bulk_enrich(self, indicators: List[Dict]) -> List[ThreatIndicator]:
        """Enrichissement en masse avec rate limiting"""
        enriched = []
        
        async with aiohttp.ClientSession(
            headers={"User-Agent": "CTI-Platform/1.0"}
        ) as session:
            # Traiter par batch de 10 pour respecter les rate limits
            for i in range(0, len(indicators), 10):
                batch = indicators[i:i+10]
                tasks = []
                
                for ioc in batch:
                    if ioc["type"] == "ip":
                        tasks.append(self.enrich_ip(ioc["value"], session))
                    # Ajouter d'autres types (domain, hash, url)
                
                batch_results = await asyncio.gather(*tasks, return_exceptions=True)
                enriched.extend([r for r in batch_results if isinstance(r, ThreatIndicator)])
                
                # Rate limiting
                await asyncio.sleep(1)
        
        return enriched
    
    def export_to_stix(self, indicators: List[ThreatIndicator]) -> Dict:
        """Export au format STIX 2.1 pour partage CTI"""
        bundle = {
            "type": "bundle",
            "id": f"bundle--{hashlib.uuid4_compatible()}",
            "spec_version": "2.1",
            "objects": []
        }
        
        for ioc in indicators:
            if ioc.confidence > 50:  # Seuil de confiance minimum
                indicator_obj = {
                    "type": "indicator",
                    "spec_version": "2.1",
                    "id": f"indicator--{hashlib.sha256(ioc.value.encode()).hexdigest()[:36]}",
                    "name": f"{ioc.ioc_type}: {ioc.value}",
                    "indicator_types": [ioc.threat_type or "malicious-activity"],
                    "pattern": self._value_to_stix_pattern(ioc),
                    "valid_from": datetime.utcnow().isoformat() + "Z",
                    "confidence": ioc.confidence,
                    "labels": ioc.tags,
                }
                bundle["objects"].append(indicator_obj)
        
        return bundle
    
    def _value_to_stix_pattern(self, ioc: ThreatIndicator) -> str:
        if ioc.ioc_type == "ip":
            return f"[ipv4-addr:value = '{ioc.value}']"
        elif ioc.ioc_type == "domain":
            return f"[domain-name:value = '{ioc.value}']"
        elif ioc.ioc_type == "hash":
            return f"[file:hashes.'SHA-256' = '{ioc.value}']"
        elif ioc.ioc_type == "url":
            return f"[url:value = '{ioc.value}']"
        return f"[artifact:value = '{ioc.value}']"

Cadre méthodologique OSINT : formaliser une investigation

Une investigation OSINT professionnelle ne peut pas être improvisée. Elle doit s'appuyer sur une méthodologie structurée qui garantit la reproductibilité, la légalité, et la recevabilité judiciaire des preuves collectées. Le modèle PIAM (Planning, Investigation, Analysis, Mitigation) adapté au contexte OSINT offre ce cadre.

Phase Activités Livrables Durée estimée
1. Cadrage légal Définir la cible, obtenir mandat, vérifier légalité, RGPD DPO Formulaire d'autorisation signé, périmètre documenté 0.5-2 jours
2. Reconnaissance passive DNS, WHOIS, BGP, Certificate Transparency, SHODAN Inventaire infrastructure, liste sous-domaines, ASN mapping 1-3 jours
3. Collecte humaine (HUMINT-light) Profils LinkedIn, job postings, conférences, GitHub Organigramme technique, stack technologique, key persons 1-2 jours
4. Corrélation et analyse Graphe de corrélation, identification pivots, chemins d'attaque Graphe Maltego, rapport d'analyse, scoring de risque 1-3 jours
5. Monitoring continu Alertes CT logs, Shodan webhooks, paste sites, dark web Dashboard surveillance, alertes automatiques Continu
6. Rapport et restitution Rapport exécutif + technique, recommandations priorisées Rapport TLDR + annexes, plan d'action 90 jours 1-2 jours

FAQ — Questions fréquentes sur l'OSINT

La collecte d'OSINT sur une entreprise concurrente est-elle légale en France ?

Oui, dans les limites définies par la loi. Collecter des informations publiquement disponibles (site web, LinkedIn, registre du commerce, brevets, communiqués de presse) est légal et constitue de la veille concurrentielle classique. En revanche, accéder à des systèmes informatiques sans autorisation (même pour lire des informations), utiliser des identités fictives pour extraire des données, ou exploiter des données personnelles sans base légale RGPD constitue des infractions pénales. La frontière se situe entre information publique librement accessible et information obtenue par des moyens détournés.

Quelle est la différence entre OSINT, SOCMINT et HUMINT ?

OSINT (Open Source Intelligence) désigne le renseignement tiré de sources publiques et ouvertes — sites web, DNS, publications, etc. SOCMINT (Social Media Intelligence) est une sous-catégorie de l'OSINT spécifiquement centrée sur les réseaux sociaux. HUMINT (Human Intelligence) implique l'interaction directe avec des sources humaines — interviews, infiltration, agents. Dans le domaine cyber, les red teams combinent ces disciplines : OSINT pour la reconnaissance technique, SOCMINT pour le social engineering, et parfois HUMINT pour l'ingénierie sociale physique (vishing, pretexting).

Comment documenter une investigation OSINT pour qu'elle soit recevable en justice ?

La chaîne de custody (chain of custody) est fondamentale. Chaque action doit être horodatée, l'investigateur identifié, et les captures d'écran hashées (SHA-256) avant archivage. Des outils comme Hunchly (Chrome extension) ou WebRecorder créent automatiquement des archives web avec métadonnées d'intégrité. Les captures doivent inclure la barre d'adresse complète, la date/heure système, et être stockées sur un support dont l'intégrité peut être attestée (DVD WORM, stockage HSM). En cas de procédure pénale, il est recommandé de faire intervenir un huissier pour officialiser les constats.

Shodan est-il légal à utiliser ?

Oui. Shodan indexe uniquement les services publiquement accessibles sur Internet — les mêmes services qu'un attaquant pourrait découvrir avec un simple scan Nmap. Consulter Shodan pour analyser sa propre infrastructure, ou analyser celle d'un tiers dans le cadre d'un pentest autorisé ou d'une investigation défensive, est parfaitement légal. L'utilisation de Shodan pour identifier des cibles à attaquer constitue en revanche une préparation d'actes délictueux. La clé est l'intention et le cadre d'utilisation.

Comment automatiser la surveillance OSINT continue d'une organisation ?

L'automatisation OSINT continue repose sur des webhooks et des feeds : Shodan Monitor (alertes sur changements de services), crt.sh Certificate Watch (nouveaux certificats pour votre domaine), BreachWatch (surveillance des fuites), et des RSS feeds de sources de threat intelligence. Des plateformes comme Maltego Investigations, SpiderFoot HX, ou des SOAR comme XSOAR permettent d'orchestrer ces alertes et de les corréler automatiquement. Pour les budgets limités, un script Python + cron + alertes email/Slack peut couvrir les cas d'usage essentiels.

Surveillance continue

Approfondissement

Quelle formation suivre pour devenir analyste OSINT ?

Les certifications reconnues incluent OSCP (OSCE3 pour les plus avancés), la certification OSINT Curious, et les formations de l'Open Source Intelligence School. En France, le Pôle d'Excellence Cyber propose des formations spécialisées. La plateforme Bellingcat publie des guides pratiques gratuits utilisés par les journalistes d'investigation. La pratique sur des CTF OSINT (GeoGuessr, Trace Labs OSINT CTF) est indispensable pour développer les réflexes méthodologiques avant de travailler sur de vraies investigations.

Comment protéger son organisation contre l'OSINT offensif ?

La réduction de l'empreinte numérique (attack surface reduction) est la première ligne de défense. Cela inclut : supprimer les informations inutiles des registres WHOIS (utiliser la protection de confidentialité), configurer des politiques DNS minimales (pas de transfert de zone, pas de noms internes dans les rDNS publics), former les employés à ne pas publier d'informations sensibles sur LinkedIn (noms des systèmes internes, versions logicielles), auditer régulièrement ce qu'un attaquant peut découvrir via Shodan/Censys sur votre infrastructure, et surveiller les Certificate Transparency logs pour détecter des certificats frauduleux créés pour vos domaines.

Existe-t-il des outils OSINT spécifiques pour la cybersécurité ?

Oui, l'écosystème est riche. Pour la reconnaissance réseau : Amass, Subfinder, Masscan. Pour les métadonnées : FOCA, Metagoofil. Pour l'intelligence sur les menaces : MISP, OpenCTI, Cortex. Pour les fuites de données : GitDorker, TruffleHog. Pour les images : ExifTool, Google Lens, TinEye. La distribution Kali Linux inclut la plupart de ces outils préconfigurés, et la REMnux est dédiée à l'analyse de malware. Pour une approche OSINT complète, Trace Labs Kali Linux est spécifiquement optimisée pour les investigations OSINT.

Pour approfondir ces concepts dans un contexte de défense active, consultez nos analyses sur la détection de menaces avec des agents IA, la forensique assistée par IA, et notre livre blanc sur l'IA en cyberdéfense. Les techniques d'OSINT présentées ici s'intègrent naturellement dans les workflows de SIEM augmenté et de threat hunting avec Microsoft Sentinel.

Techniques de pivotage

Approfondissement

Conclusion et recommandations

La maîtrise de ces techniques et outils est indispensable pour tout professionnel de la cybersécurité en 2026. L'évolution constante des menaces exige une veille permanente et une mise à jour régulière des compétences. Pour aller plus loin, consultez nos articles techniques ou contactez notre équipe pour un accompagnement sur mesure adapté à votre contexte.

À retenir : La sécurité est un processus continu, pas un état. Chaque audit, chaque test et chaque analyse contribue à renforcer la posture de défense de l'organisation face aux menaces actuelles et futures.