Disséquer l'Obscurité : Techniques Avancées de Déobfuscation Statique pour Malwares Polymorphes

5 février 2026 Ayi NEDJIMI ~45 min de lecture
Expert Statique Reverse Engineering Malware Analysis

1. Introduction : Le défi de l'obfuscation dans les malwares modernes

L'analyse de malwares constitue l'un des piliers fondamentaux de la réponse aux incidents et de la Threat Intelligence. Pourtant, les auteurs de codes malveillants déploient des arsenaux toujours plus sophistiqués pour entraver le travail des analystes. Parmi les techniques les plus redoutables figure le polymorphisme, capacité d'un programme malveillant à modifier sa propre représentation binaire à chaque réplication tout en préservant sa sémantique opérationnelle.

En 2025-2026, le paysage des menaces a atteint un niveau de maturité technique sans précédent. Les familles de malwares comme RedLine Stealer, Emotet, QakBot et BlackCat (ALPHV) intègrent nativement des mécanismes d'obfuscation multicouches combinant polymorphisme, métamorphisme, packers commerciaux et virtualisation de code. Le taux de détection par signatures statiques traditionnelles chute à moins de 15% face à ces menaces lors des premières 48 heures post-déploiement, selon les rapports de VirusTotal et AV-TEST.

La déobfuscation statique se distingue de l'analyse dynamique par sa capacité à opérer sans exécuter le binaire suspect. Cette approche est indispensable dans plusieurs scénarios critiques :

Cet article détaille les techniques avancées permettant de déconstruire méthodiquement les couches d'obfuscation sans exécution, en s'appuyant sur des frameworks d'exécution symbolique (angr, Triton), de désassemblage programmatique (Capstone), et d'analyse automatisée (YARA, Ghidra, IDA Pro). Chaque section inclut du code fonctionnel directement applicable en contexte opérationnel.

Avertissement juridique et éthique

Les techniques présentées dans cet article sont destinées exclusivement à la défense et à la recherche en sécurité. L'analyse de malwares doit être conduite dans un cadre légal approprié, idéalement sur des machines isolées dédiées. Toute utilisation offensive de ces connaissances est illégale au regard des articles 323-1 à 323-7 du Code pénal français et de la Convention de Budapest sur la cybercriminalité.

2. Fondamentaux du polymorphisme malveillant

2.1 Moteurs de mutation : architecture interne

Un moteur de mutation polymorphe est un composant logiciel intégré au malware qui génère à chaque réplication une nouvelle variante du décrypteur (stub). Le payload chiffré reste fonctionnellement identique, mais le stub de déchiffrement est réécrit à chaque itération via des transformations syntaxiques qui préservent la sémantique.

Les transformations fondamentales implémentées par ces moteurs incluent :

Voici un exemple concret de stub de déchiffrement XOR polymorphe en assembleur x86. La première variante utilise une boucle classique, la seconde est une mutation sémantiquement équivalente :

; === Variante A : stub XOR classique ===
decrypt_stub_a:
    mov  esi, payload_addr     ; adresse du payload chiffré
    mov  ecx, payload_len      ; taille en octets
    mov  bl, 0x4F              ; clé XOR
.loop_a:
    xor  byte [esi], bl        ; déchiffrement octet par octet
    inc  esi                   ; pointeur suivant
    dec  ecx                   ; décrémenter compteur
    jnz  .loop_a               ; boucler si ecx != 0
    jmp  payload_addr          ; sauter au payload déchiffré

; === Variante B : mutation polymorphe équivalente ===
decrypt_stub_b:
    lea  edi, [payload_addr]   ; substitution mov -> lea
    push payload_len
    pop  ecx                   ; substitution mov -> push/pop
    xor  edx, edx              ; dead code : registre inutilisé
    mov  al, 0x50
    sub  al, 0x01              ; 0x50 - 0x01 = 0x4F (clé recalculée)
    nop                        ; insertion NOP
    add  edx, 0xDEAD           ; dead code
.loop_b:
    mov  ah, byte [edi]        ; charger via registre intermédiaire
    xor  ah, al                ; déchiffrement dans ah
    mov  byte [edi], ah        ; réécrire
    lea  edi, [edi + 1]        ; inc via lea
    sub  edx, 1                ; dead code
    loop .loop_b               ; dec ecx + jnz combinés
    jmp  payload_addr

Bien que les deux stubs produisent exactement le même résultat (déchiffrement XOR du payload avec la clé 0x4F), leurs signatures binaires sont radicalement différentes. Le hash SHA-256 de chaque variante sera distinct, rendant inefficace toute détection par empreinte statique.

2.2 Métamorphisme vs. Polymorphisme : distinctions critiques

La confusion entre polymorphisme et métamorphisme est fréquente mais les deux concepts diffèrent fondamentalement :

Caractéristique Polymorphisme Métamorphisme
Portée de la mutation Stub de déchiffrement uniquement Totalité du corps du malware
Payload Chiffré, invariant Réécrit à chaque génération
Complexité d'implémentation Modérée Très élevée
Exemples historiques Storm Worm, Virut Zmist (Z0mbie), MetaPHOR, Simile
Résistance à l'émulation Faible (payload exposé après déchiffrement) Élevée (pas de phase de déchiffrement)
Déobfuscation statique Pattern matching sur opérations crypto Normalisation sémantique requise

Un malware métamorphe comme Zmist (créé par le chercheur Z0mbie) implémentait un désassembleur/réassembleur complet capable de réécrire l'intégralité de son code en permutant les registres, substituant les instructions, et réorganisant les blocs de base. Contrairement au polymorphisme, aucune routine de déchiffrement n'est présente car le code n'est jamais chiffré : il est simplement réécrit sous une forme syntaxiquement différente mais sémantiquement identique.

2.3 Transformations d'opcodes x86/x64 : catalogue des équivalences

La compréhension des équivalences au niveau ISA (Instruction Set Architecture) x86/x64 est fondamentale pour reconnaître les mutations. Voici les substitutions les plus couramment exploitées par les moteurs polymorphes :

; --- Mise à zéro d'un registre ---
xor eax, eax           ; Forme canonique (2 octets : 31 C0)
sub eax, eax           ; Équivalent (2 octets : 29 C0)
and eax, 0             ; Équivalent (5 octets : 83 E0 00)
mov eax, 0             ; Équivalent (5 octets : B8 00 00 00 00)
imul eax, eax, 0       ; Moins courant (3 octets : 6B C0 00)
push 0 / pop eax       ; Via la pile (3 octets : 6A 00 / 58)
lea eax, [0]           ; Rarement vu mais valide

; --- Incrémentation ---
inc eax                ; Forme canonique (1 octet en x86 : 40)
add eax, 1             ; Équivalent (3 octets : 83 C0 01)
sub eax, -1            ; Équivalent via complément (3 octets : 83 E8 FF)
lea eax, [eax + 1]     ; Via LEA (3 octets)

; --- Copie de registre ---
mov eax, ebx           ; Forme canonique
push ebx / pop eax     ; Via la pile
lea eax, [ebx]         ; Via LEA
xor eax, eax / or eax, ebx  ; Double instruction

; --- Saut inconditionnel ---
jmp target             ; Forme canonique
push target / ret      ; Via la pile (anti-désassemblage)
call target / add esp, 4  ; Via call + nettoyage pile

Architecture d'un moteur polymorphe

Payload Original (code malveillant) Moteur de Mutation Substitution instructions Réordonnancement registres Insertion dead code Transposition blocs Chiffrement XOR / RC4 / AES Variante N Stub muté unique + payload chiffré SHA-256 distinct Variante A e3b0c44298fc1c14... Variante B 7d793037a076834... Variante C a1f2c3d4e5f6789... Sémantique identique Même comportement malveillant malgré des binaires distincts

3. Packers commerciaux et custom : identification et unpacking

3.1 Panorama des packers dans l'écosystème malveillant

Les packers (ou protecteurs de binaires) constituent la première couche d'obfuscation rencontrée lors de l'analyse statique. Initialement conçus pour la protection de logiciels légitimes contre le piratage, ils sont massivement détournés par les auteurs de malwares. On distingue trois catégories principales :

Packers de compression : réduisent la taille du binaire et ajoutent un stub de décompression. Le code original est restauré en mémoire à l'exécution. Les plus courants sont UPX (Ultimate Packer for eXecutables) et MPRESS.

Protecteurs commerciaux : offrent des mécanismes avancés incluant virtualisation de code, anti-debug, anti-dump, mutations polymorphes et obfuscation du flux de contrôle. Themida/WinLicense (Oreans Technologies) et VMProtect sont les plus redoutés.

Packers custom : développés spécifiquement par les groupes APT pour leurs opérations. Ils combinent souvent plusieurs techniques et sont les plus difficiles à traiter car non documentés. Les groupes comme Lazarus, APT29 et FIN7 maintiennent leurs propres packers.

3.2 Identification avec Detect It Easy et PEiD

L'identification du packer est la première étape critique. Detect It Easy (DIE) a largement supplanté PEiD grâce à son système de signatures extensible basé sur des scripts JavaScript et sa maintenance active.

#!/bin/bash
# Script d'identification de packer en batch
# Nécessite : Detect It Easy (diec) en ligne de commande

SAMPLE_DIR="/opt/malware_samples/incoming"
REPORT_DIR="/opt/analysis/packer_reports"
mkdir -p "$REPORT_DIR"

echo "[*] Analyse de packer en batch - $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================="

for sample in "$SAMPLE_DIR"/*; do
    [ -f "$sample" ] || continue
    filename=$(basename "$sample")
    sha256=$(sha256sum "$sample" | cut -d' ' -f1)

    echo "[+] Analyse: $filename ($sha256)"

    # Detect It Easy - analyse complète
    die_result=$(diec --json "$sample" 2>/dev/null)

    # Extraction du packer détecté
    packer=$(echo "$die_result" | python3 -c "
import sys, json
data = json.load(sys.stdin)
for det in data.get('detects', []):
    for val in det.get('values', []):
        if val.get('type') in ['packer', 'protector', 'compiler']:
            print(f\"{val['type']}: {val['name']} {val.get('version', '')}\")
" 2>/dev/null)

    if [ -n "$packer" ]; then
        echo "    [!] Détecté: $packer"
    else
        echo "    [-] Aucun packer connu détecté"
    fi

    # Entropie par section (indicateur de packing)
    python3 -c "
import pefile, math

def entropy(data):
    if not data:
        return 0.0
    counts = [0] * 256
    for byte in data:
        counts[byte] += 1
    ent = 0.0
    for c in counts:
        if c > 0:
            p = c / len(data)
            ent -= p * math.log2(p)
    return ent

pe = pefile.PE('$sample')
print('    Entropie par section:')
for section in pe.sections:
    name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')
    ent = entropy(section.get_data())
    flag = ' [PACKED?]' if ent > 7.0 else ''
    print(f'      {name:10s} : {ent:.4f}{flag}')
" 2>/dev/null

    echo "---"

    # Sauvegarde du rapport JSON
    echo "$die_result" > "$REPORT_DIR/${sha256}.json"
done

echo "[*] Analyse terminée. Rapports dans $REPORT_DIR"

Seuils d'entropie pour la détection de packing

L'entropie de Shannon d'une section PE est un indicateur fiable de compression ou de chiffrement. Une section .text compilée normalement présente une entropie de 5.5 à 6.5. Une entropie supérieure à 7.0 sur 8.0 maximum indique quasi-certainement du packing. Les sections contenant du code machine légitime non obfusqué dépassent rarement 6.8.

3.3 Unpacking statique : cas UPX et MPRESS

L'unpacking statique d'UPX est trivial grâce à l'outil natif, mais de nombreux malwares modifient les en-têtes UPX pour empêcher le déballage automatique :

#!/usr/bin/env python3
"""
Unpacker statique pour UPX modifié.
Restaure les magic bytes UPX altérés par les malwares
avant de procéder au déballage.
"""
import struct
import subprocess
import sys
import shutil
from pathlib import Path

# Signatures UPX connues et leurs altérations courantes
UPX_SIGNATURES = {
    b'UPX0': [b'UPX0', b'\x00PX0', b'UXP0', b'XUP0'],
    b'UPX1': [b'UPX1', b'\x00PX1', b'UXP1', b'XUP1'],
    b'UPX2': [b'UPX2', b'\x00PX2', b'UXP2', b'XUP2'],
    b'UPX!': [b'UPX!', b'\x00PX!', b'UXP!', b'XUP!'],
}

def fix_upx_headers(filepath: str) -> str:
    """Corrige les en-têtes UPX altérés et retourne le chemin du fichier corrigé."""
    data = bytearray(Path(filepath).read_bytes())
    fixed = False

    # Restaurer les noms de sections UPX
    for original, variants in UPX_SIGNATURES.items():
        for variant in variants[1:]:  # Ignorer l'original
            offset = 0
            while True:
                pos = data.find(variant, offset)
                if pos == -1:
                    break
                print(f"  [+] Correction à offset 0x{pos:08X}: "
                      f"{variant} -> {original}")
                data[pos:pos+4] = original
                fixed = True
                offset = pos + 4

    # Vérifier et restaurer le magic UPX en fin de fichier
    # Le magic "UPX!" se trouve généralement dans l'overlay
    for i in range(len(data) - 4, max(len(data) - 1024, 0), -1):
        if data[i:i+3] in [b'\x00PX', b'XUP', b'UXP']:
            data[i:i+3] = b'UPX'
            fixed = True
            print(f"  [+] Magic UPX restauré à offset 0x{i:08X}")
            break

    if fixed:
        output_path = filepath + ".fixed"
        Path(output_path).write_bytes(data)
        return output_path
    return filepath

def unpack_upx(filepath: str, output_dir: str) -> bool:
    """Tente le déballage UPX avec restauration automatique des en-têtes."""
    print(f"[*] Tentative d'unpacking UPX: {filepath}")

    output_path = Path(output_dir) / (Path(filepath).stem + "_unpacked.exe")

    # Tentative directe
    result = subprocess.run(
        ["upx", "-d", filepath, "-o", str(output_path)],
        capture_output=True, text=True
    )

    if result.returncode == 0:
        print(f"  [+] Déballage réussi: {output_path}")
        return True

    print("  [-] Échec direct, tentative avec correction d'en-têtes...")

    # Correction et nouvelle tentative
    fixed_path = fix_upx_headers(filepath)
    if fixed_path != filepath:
        result = subprocess.run(
            ["upx", "-d", fixed_path, "-o", str(output_path)],
            capture_output=True, text=True
        )
        Path(fixed_path).unlink(missing_ok=True)

        if result.returncode == 0:
            print(f"  [+] Déballage réussi après correction: {output_path}")
            return True

    print("  [!] Échec - packer non-standard ou UPX lourdement modifié")
    return False

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]}  ")
        sys.exit(1)
    unpack_upx(sys.argv[1], sys.argv[2])

3.4 VMProtect et Themida : la virtualisation de code

VMProtect et Themida représentent un défi d'un ordre de grandeur supérieur. Ces protecteurs convertissent le code x86/x64 natif en bytecode pour une machine virtuelle propriétaire embarquée dans le binaire. Le code original n'existe plus sous sa forme native : il a été traduit dans un jeu d'instructions custom interprété par un handler VM.

L'architecture type d'une VM de protection comprend :

La déobfuscation de code virtualisé nécessite une approche en deux phases : identification de la structure VM par pattern matching, puis traduction inverse (devirtualization) des bytecodes en instructions natives.

#!/usr/bin/env python3
"""
Détecteur simplifié de virtualisation VMProtect.
Identifie les patterns d'entrée VM caractéristiques.
"""
import pefile
import re

VMPROTECT_ENTRY_PATTERNS = [
    # Push de tous les registres suivi de pushfq (sauvegarde contexte)
    rb'\x60\x9C',                          # pushad; pushfd (x86)
    rb'\x50\x51\x52\x53\x55\x56\x57',     # push sequence (x86)
    # VMProtect 3.x : jmp dans section .vmp
    rb'\xE9.{4}\x68.{4}\xE8',             # jmp rel32; push imm32; call
    # Themida: pattern d'entrée caractéristique
    rb'\xEB.\x00{16,}',                    # jmp short + padding nulls
]

def detect_vm_protection(filepath: str) -> dict:
    """Détecte les patterns de virtualisation dans un PE."""
    pe = pefile.PE(filepath)
    results = {
        'vmprotect_sections': [],
        'themida_sections': [],
        'vm_entry_candidates': [],
        'virtualized': False
    }

    for section in pe.sections:
        name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')

        # Sections caractéristiques VMProtect
        if name.startswith('.vmp'):
            results['vmprotect_sections'].append({
                'name': name,
                'va': hex(section.VirtualAddress),
                'size': section.SizeOfRawData,
                'entropy': _calc_entropy(section.get_data())
            })
            results['virtualized'] = True

        # Sections Themida
        if name in ['.themida', '.winlice', '.taggant']:
            results['themida_sections'].append(name)
            results['virtualized'] = True

    # Recherche de patterns VM entry dans toutes les sections exécutables
    for section in pe.sections:
        if not section.Characteristics & 0x20000000:  # IMAGE_SCN_MEM_EXECUTE
            continue
        data = section.get_data()
        for i, pattern in enumerate(VMPROTECT_ENTRY_PATTERNS):
            for match in re.finditer(pattern, data):
                va = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + match.start()
                results['vm_entry_candidates'].append({
                    'pattern_id': i,
                    'va': hex(va),
                    'offset': hex(match.start()),
                    'bytes': data[match.start():match.start()+16].hex()
                })

    return results

def _calc_entropy(data):
    import math
    if not data:
        return 0.0
    counts = [0] * 256
    for b in data:
        counts[b] += 1
    ent = 0.0
    for c in counts:
        if c > 0:
            p = c / len(data)
            ent -= p * math.log2(p)
    return round(ent, 4)

if __name__ == "__main__":
    import sys, json
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} ")
        sys.exit(1)
    result = detect_vm_protection(sys.argv[1])
    print(json.dumps(result, indent=2))

4. Exécution symbolique avec angr et Triton

4.1 Principes de l'exécution symbolique pour la déobfuscation

L'exécution symbolique est une technique d'analyse de programme qui exécute le code non pas avec des valeurs concrètes mais avec des variables symboliques. Chaque branchement conditionnel génère une contrainte sur ces variables, et un solveur SMT (Satisfiability Modulo Theories) comme Z3 détermine les valeurs satisfaisant un ensemble de contraintes donné.

En contexte de déobfuscation, l'exécution symbolique permet de :

4.2 angr : exploration symbolique de binaires

angr est un framework d'analyse binaire développé par le laboratoire SecLab de UC Santa Barbara. Il implémente un moteur d'exécution symbolique complet avec support de multiples architectures et gestion automatique de la mémoire symbolique.

#!/usr/bin/env python3
"""
Déobfuscation par exécution symbolique avec angr.
Résout les prédicats opaques et extrait les chemins d'exécution
réels dans un binaire obfusqué.
"""
import angr
import claripy
import logging

logging.getLogger('angr').setLevel(logging.WARNING)

def deobfuscate_opaque_predicates(binary_path: str, func_addr: int) -> dict:
    """
    Analyse une fonction obfusquée et identifie les prédicats opaques.

    Un prédicat opaque est un branchement conditionnel dont le résultat
    est déterministe (toujours vrai ou toujours faux) mais dont
    l'évaluation statique sans exécution symbolique est difficile.

    Exemple classique :
        mov eax, 7
        imul eax, eax       ; eax = 49
        sub eax, 1           ; eax = 48
        and eax, 1           ; eax = 0 (48 est pair)
        jnz fake_branch      ; jamais pris -> prédicat opaque
    """
    project = angr.Project(binary_path, auto_load_libs=False)

    results = {
        'opaque_predicates': [],
        'dead_branches': [],
        'live_paths': [],
        'simplified_blocks': []
    }

    cfg = project.analyses.CFGFast(
        regions=[(func_addr, func_addr + 0x2000)],
        normalize=True
    )

    func = cfg.kb.functions.get(func_addr)
    if func is None:
        print(f"[-] Fonction non trouvée à 0x{func_addr:x}")
        return results

    for block in func.blocks:
        if not block.instructions:
            continue

        # Identifier les blocs se terminant par un branchement conditionnel
        last_insn = block.capstone.insns[-1] if block.capstone.insns else None
        if last_insn is None:
            continue

        conditional_jmps = [
            'je', 'jne', 'jz', 'jnz', 'jg', 'jge', 'jl', 'jle',
            'ja', 'jae', 'jb', 'jbe', 'jo', 'jno', 'js', 'jns'
        ]

        if last_insn.mnemonic not in conditional_jmps:
            continue

        # Exécution symbolique depuis le début du bloc
        state = project.factory.blank_state(addr=block.addr)
        simgr = project.factory.simulation_manager(state)

        # Explorer jusqu'à la fin du bloc
        try:
            simgr.explore(
                find=block.addr + block.size,
                num_find=10,
                timeout=5
            )
        except Exception:
            continue

        # Analyser les successeurs du branchement
        successors = list(func.graph.successors(block))
        if len(successors) != 2:
            continue

        taken_addr = successors[0].addr
        not_taken_addr = successors[1].addr

        # Vérifier si le branchement est déterministe
        state_at_branch = project.factory.blank_state(addr=block.addr)
        simgr_branch = project.factory.simulation_manager(state_at_branch)

        try:
            simgr_branch.step(num_inst=block.instructions)
        except Exception:
            continue

        if len(simgr_branch.active) == 1:
            # Un seul successeur -> prédicat opaque détecté
            actual_target = simgr_branch.active[0].addr
            dead_target = (not_taken_addr
                          if actual_target == taken_addr
                          else taken_addr)

            results['opaque_predicates'].append({
                'block_addr': hex(block.addr),
                'branch_insn': f"{last_insn.mnemonic} {last_insn.op_str}",
                'always_goes_to': hex(actual_target),
                'dead_branch': hex(dead_target),
                'type': 'opaque_true' if actual_target == taken_addr else 'opaque_false'
            })
            results['dead_branches'].append(hex(dead_target))

    print(f"[+] Prédicats opaques détectés: {len(results['opaque_predicates'])}")
    for op in results['opaque_predicates']:
        print(f"    0x{op['block_addr']}: {op['branch_insn']} "
              f"-> toujours vers {op['always_goes_to']} "
              f"(branche morte: {op['dead_branch']})")

    return results


def extract_decryption_key(binary_path: str, decrypt_func: int,
                           key_output_addr: int) -> bytes:
    """
    Utilise l'exécution symbolique pour extraire la clé de déchiffrement
    sans exécuter le binaire. Résout les contraintes pour trouver les
    valeurs menant à l'écriture de la clé en mémoire.
    """
    project = angr.Project(binary_path, auto_load_libs=False)

    # Créer un état symbolique au point d'entrée de la routine de déchiffrement
    state = project.factory.blank_state(addr=decrypt_func)

    # Rendre symbolique la zone mémoire où la clé sera écrite
    key_sym = claripy.BVS('decryption_key', 256)  # 32 octets symboliques
    state.memory.store(key_output_addr, key_sym)

    simgr = project.factory.simulation_manager(state)

    # Définir les adresses de succès et d'échec
    # (à adapter selon le binaire analysé)
    def is_successful(s):
        """Vérifier si l'état a atteint la fin de la routine de déchiffrement."""
        return s.addr >= decrypt_func + 0x100  # Heuristique

    def should_avoid(s):
        """Éviter les chemins menant aux vérifications anti-debug."""
        anti_debug_addrs = [
            0x401000,  # IsDebuggerPresent check
            0x401050,  # NtQueryInformationProcess check
            0x4010A0,  # Timing check (rdtsc)
        ]
        return s.addr in anti_debug_addrs

    simgr.explore(find=is_successful, avoid=should_avoid)

    if simgr.found:
        found_state = simgr.found[0]
        # Résoudre la valeur concrète de la clé
        concrete_key = found_state.solver.eval(key_sym, cast_to=bytes)
        print(f"[+] Clé extraite: {concrete_key.hex()}")
        return concrete_key

    print("[-] Impossible d'extraire la clé par exécution symbolique")
    return b''


if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} ")
        sys.exit(1)

    # Exemple d'utilisation
    results = deobfuscate_opaque_predicates(
        sys.argv[1],
        func_addr=0x00401000  # Adresse à adapter
    )

4.3 Triton : exécution symbolique et taint analysis

Triton est un framework d'analyse binaire dynamique développé par Quarkslab qui offre des capacités d'exécution symbolique plus granulaires qu'angr, avec un accent sur la taint analysis (analyse de propagation de données) et la simplification d'expressions.

Triton excelle dans la simplification de Mixed Boolean-Arithmetic (MBA) expressions, technique d'obfuscation très utilisée dans les malwares modernes qui combine opérations arithmétiques et logiques pour masquer des calculs simples :

#!/usr/bin/env python3
"""
Simplification d'expressions MBA obfusquées avec Triton.
Les expressions MBA (Mixed Boolean-Arithmetic) sont utilisées
par les obfuscateurs pour masquer des opérations triviales.

Exemple: (x ^ y) + 2*(x & y) est équivalent à x + y
"""
from triton import (
    TritonContext, ARCH, MODE, Instruction,
    SYMBOLIC_SIMPLIFICATION, AST_REPRESENTATION
)

def create_triton_ctx():
    """Initialise un contexte Triton pour x86-64."""
    ctx = TritonContext(ARCH.X86_64)
    ctx.setMode(MODE.ALIGNED_MEMORY, True)
    ctx.setMode(MODE.CONSTANT_FOLDING, True)
    ctx.setAstRepresentationMode(AST_REPRESENTATION.PYTHON)
    return ctx

def simplify_mba_expression(opcodes: bytes, base_addr: int = 0x400000) -> str:
    """
    Exécute symboliquement une séquence d'opcodes et simplifie
    l'expression résultante via le solveur Z3 intégré.

    Args:
        opcodes: bytes des instructions x86-64
        base_addr: adresse de base virtuelle

    Returns:
        Expression simplifiée sous forme de chaîne
    """
    ctx = create_triton_ctx()

    # Mapper les opcodes en mémoire
    for i, byte in enumerate(opcodes):
        ctx.setConcreteMemoryValue(base_addr + i, byte)

    # Rendre les registres d'entrée symboliques
    ctx.symbolizeRegister(ctx.registers.rax, "x")
    ctx.symbolizeRegister(ctx.registers.rbx, "y")

    # Exécuter instruction par instruction
    pc = base_addr
    executed = []
    while pc < base_addr + len(opcodes):
        inst = Instruction(pc, ctx.getConcreteMemoryAreaValue(pc, 16))
        if not ctx.processing(inst):
            break
        executed.append(f"0x{pc:x}: {inst.getDisassembly()}")
        pc = inst.getNextAddress()

    print("[*] Instructions exécutées:")
    for line in executed:
        print(f"    {line}")

    # Extraire l'expression symbolique du résultat (dans rax)
    rax_expr = ctx.getSymbolicRegister(ctx.registers.rax)
    if rax_expr is None:
        return "N/A (registre non modifié)"

    ast = rax_expr.getAst()
    simplified = ctx.simplify(ast, SYMBOLIC_SIMPLIFICATION.LLVM)

    print(f"\n[*] Expression brute:     {ast}")
    print(f"[+] Expression simplifiée: {simplified}")

    return str(simplified)


def deobfuscate_constant_unfolding(ctx, instructions_bytes, base=0x400000):
    """
    Résout le constant unfolding : technique où une constante simple
    est calculée par une longue série d'opérations.

    Exemple: mov rax, 0x1234 obfusqué en:
        mov rax, 0xDEADBEEF
        xor rax, 0xDEADBEEF ^ 0x1234  (= 0xDEADACDB)
        add rax, 0x5678
        sub rax, 0x5678
    """
    for i, b in enumerate(instructions_bytes):
        ctx.setConcreteMemoryValue(base + i, b)

    pc = base
    while pc < base + len(instructions_bytes):
        inst = Instruction(pc, ctx.getConcreteMemoryAreaValue(pc, 16))
        if not ctx.processing(inst):
            break
        pc = inst.getNextAddress()

    # Évaluer la valeur concrète résultante
    rax_val = ctx.getConcreteRegisterValue(ctx.registers.rax)
    return rax_val


# Exemple MBA courant dans les malwares obfusqués
# L'expression (x ^ y) + 2*(x & y) est équivalente à x + y
if __name__ == "__main__":
    # Opcodes x86-64 pour :
    #   mov rax, rdi        ; rax = x
    #   mov rbx, rsi        ; rbx = y
    #   mov rcx, rax
    #   xor rcx, rbx        ; rcx = x ^ y
    #   and rax, rbx        ; rax = x & y
    #   shl rax, 1          ; rax = 2 * (x & y)
    #   add rax, rcx        ; rax = (x ^ y) + 2*(x & y) = x + y
    mba_opcodes = (
        b'\x48\x89\xf8'       # mov rax, rdi
        b'\x48\x89\xf3'       # mov rbx, rsi
        b'\x48\x89\xc1'       # mov rcx, rax
        b'\x48\x31\xd9'       # xor rcx, rbx
        b'\x48\x21\xd8'       # and rax, rbx
        b'\x48\xd1\xe0'       # shl rax, 1
        b'\x48\x01\xc8'       # add rax, rcx
    )

    print("=" * 60)
    print("Simplification MBA avec Triton")
    print("=" * 60)
    simplify_mba_expression(mba_opcodes)

5. Framework Capstone : désassemblage et analyse de flux de contrôle

5.1 Désassemblage programmatique avec Capstone

Capstone est un framework de désassemblage multi-architecture (x86, ARM, MIPS, PowerPC, SPARC, SystemZ) développé par Nguyen Anh Quynh. Contrairement à des outils interactifs comme IDA Pro, Capstone est conçu pour le désassemblage programmatique et s'intègre dans des pipelines d'analyse automatisée via ses bindings Python, Java, C#, Go et Rust.

Pour la déobfuscation, Capstone est essentiel pour :

#!/usr/bin/env python3
"""
Analyse de flux de contrôle et détection de code mort
avec Capstone pour la déobfuscation de malwares polymorphes.
"""
from capstone import *
from capstone.x86 import *
from collections import defaultdict
from typing import Dict, List, Set, Tuple, Optional

class CFGBuilder:
    """Reconstruit le Control Flow Graph à partir d'un binaire désassemblé."""

    def __init__(self, code: bytes, base_addr: int = 0x400000,
                 mode: int = CS_MODE_64):
        self.md = Cs(CS_ARCH_X86, mode)
        self.md.detail = True
        self.md.skipdata = True
        self.code = code
        self.base_addr = base_addr
        self.blocks: Dict[int, 'BasicBlock'] = {}
        self.edges: List[Tuple[int, int, str]] = []

    def _is_branch(self, insn) -> bool:
        """Vérifie si une instruction est un branchement."""
        return insn.group(X86_GRP_JUMP) or insn.group(X86_GRP_CALL)

    def _is_conditional_branch(self, insn) -> bool:
        """Vérifie si c'est un branchement conditionnel."""
        cond_mnemonics = {
            'je', 'jne', 'jz', 'jnz', 'jg', 'jge', 'jl', 'jle',
            'ja', 'jae', 'jb', 'jbe', 'jo', 'jno', 'js', 'jns',
            'jp', 'jnp', 'jcxz', 'jecxz', 'jrcxz'
        }
        return insn.mnemonic in cond_mnemonics

    def _is_unconditional_jump(self, insn) -> bool:
        return insn.mnemonic == 'jmp'

    def _is_ret(self, insn) -> bool:
        return insn.group(X86_GRP_RET)

    def _get_branch_target(self, insn) -> Optional[int]:
        """Extrait l'adresse cible d'un branchement direct."""
        if insn.operands and insn.operands[0].type == X86_OP_IMM:
            return insn.operands[0].imm
        return None  # Branchement indirect

    def build(self) -> Dict:
        """Construit le CFG par désassemblage linéaire récursif."""
        worklist = [self.base_addr]
        visited: Set[int] = set()
        instructions = {}

        # Phase 1: Désassembler et collecter les instructions
        for insn in self.md.disasm(self.code, self.base_addr):
            instructions[insn.address] = insn

        # Phase 2: Identifier les leaders de blocs de base
        leaders: Set[int] = {self.base_addr}
        for addr, insn in sorted(instructions.items()):
            if self._is_branch(insn):
                target = self._get_branch_target(insn)
                if target and target in instructions:
                    leaders.add(target)
                # L'instruction suivante est aussi un leader
                next_addr = addr + insn.size
                if next_addr in instructions:
                    leaders.add(next_addr)
            if self._is_ret(insn):
                next_addr = addr + insn.size
                if next_addr in instructions:
                    leaders.add(next_addr)

        # Phase 3: Construire les blocs de base
        sorted_leaders = sorted(leaders)
        for i, leader in enumerate(sorted_leaders):
            block_insns = []
            addr = leader
            end_addr = (sorted_leaders[i + 1]
                       if i + 1 < len(sorted_leaders)
                       else self.base_addr + len(self.code))

            while addr < end_addr and addr in instructions:
                insn = instructions[addr]
                block_insns.append(insn)
                if self._is_branch(insn) or self._is_ret(insn):
                    break
                addr += insn.size

            if block_insns:
                self.blocks[leader] = BasicBlock(leader, block_insns)

        # Phase 4: Établir les arêtes
        for addr, block in self.blocks.items():
            last = block.instructions[-1]
            next_addr = last.address + last.size

            if self._is_ret(last):
                continue
            elif self._is_unconditional_jump(last):
                target = self._get_branch_target(last)
                if target:
                    self.edges.append((addr, target, 'unconditional'))
            elif self._is_conditional_branch(last):
                target = self._get_branch_target(last)
                if target:
                    self.edges.append((addr, target, 'true'))
                if next_addr in self.blocks:
                    self.edges.append((addr, next_addr, 'false'))
            else:
                if next_addr in self.blocks:
                    self.edges.append((addr, next_addr, 'fallthrough'))

        return {
            'blocks': len(self.blocks),
            'edges': len(self.edges),
            'entry': hex(self.base_addr)
        }


class BasicBlock:
    """Représente un bloc de base dans le CFG."""
    def __init__(self, addr: int, instructions: list):
        self.addr = addr
        self.instructions = instructions
        self.size = sum(i.size for i in instructions)

    def __repr__(self):
        return (f"BasicBlock(0x{self.addr:x}, "
                f"{len(self.instructions)} insns, "
                f"{self.size} bytes)")


class DeadCodeDetector:
    """
    Détecte le code mort injecté par les moteurs polymorphes.
    Analyse les patterns d'instructions qui n'affectent pas
    le résultat du programme.
    """

    # Instructions considérées comme potentiellement mortes
    # quand leur résultat n'est jamais utilisé
    DEAD_PATTERNS = [
        # Push/Pop du même registre (NOP sémantique)
        lambda insns, i: (
            i + 1 < len(insns) and
            insns[i].mnemonic == 'push' and
            insns[i+1].mnemonic == 'pop' and
            len(insns[i].operands) > 0 and
            len(insns[i+1].operands) > 0 and
            insns[i].operands[0].type == X86_OP_REG and
            insns[i+1].operands[0].type == X86_OP_REG and
            insns[i].operands[0].reg == insns[i+1].operands[0].reg
        ),
        # Opération suivie de son inverse (add N / sub N)
        lambda insns, i: (
            i + 1 < len(insns) and
            insns[i].mnemonic == 'add' and
            insns[i+1].mnemonic == 'sub' and
            len(insns[i].operands) >= 2 and
            len(insns[i+1].operands) >= 2 and
            insns[i].operands[0].reg == insns[i+1].operands[0].reg and
            insns[i].operands[1].type == X86_OP_IMM and
            insns[i+1].operands[1].type == X86_OP_IMM and
            insns[i].operands[1].imm == insns[i+1].operands[1].imm
        ),
        # NOP et variantes
        lambda insns, i: insns[i].mnemonic == 'nop',
        # xchg reg, reg (même registre)
        lambda insns, i: (
            insns[i].mnemonic == 'xchg' and
            len(insns[i].operands) >= 2 and
            insns[i].operands[0].type == X86_OP_REG and
            insns[i].operands[1].type == X86_OP_REG and
            insns[i].operands[0].reg == insns[i].operands[1].reg
        ),
        # mov reg, reg (même registre)
        lambda insns, i: (
            insns[i].mnemonic == 'mov' and
            len(insns[i].operands) >= 2 and
            insns[i].operands[0].type == X86_OP_REG and
            insns[i].operands[1].type == X86_OP_REG and
            insns[i].operands[0].reg == insns[i].operands[1].reg
        ),
    ]

    @staticmethod
    def detect(instructions: list) -> List[dict]:
        """Retourne la liste des instructions mortes détectées."""
        dead = []
        for i, insn in enumerate(instructions):
            for pattern_fn in DeadCodeDetector.DEAD_PATTERNS:
                try:
                    if pattern_fn(instructions, i):
                        dead.append({
                            'addr': hex(insn.address),
                            'instruction': f"{insn.mnemonic} {insn.op_str}",
                            'size': insn.size
                        })
                        break
                except (IndexError, AttributeError):
                    continue
        return dead


# Exemple d'utilisation
if __name__ == "__main__":
    # Code obfusqué exemple avec dead code injecté
    obfuscated_code = bytes([
        0x55,                         # push rbp
        0x48, 0x89, 0xE5,            # mov rbp, rsp
        0x50, 0x58,                   # push rax; pop rax (DEAD)
        0x48, 0x89, 0xF8,            # mov rax, rdi
        0x90,                         # nop (DEAD)
        0x48, 0x83, 0xC0, 0x05,      # add rax, 5
        0x48, 0x83, 0xE8, 0x05,      # sub rax, 5 (DEAD pair avec add)
        0x48, 0x31, 0xC9,            # xor rcx, rcx
        0x48, 0x87, 0xC9,            # xchg rcx, rcx (DEAD)
        0x48, 0x01, 0xF8,            # add rax, rdi
        0x5D,                         # pop rbp
        0xC3,                         # ret
    ])

    builder = CFGBuilder(obfuscated_code, base_addr=0x401000)
    stats = builder.build()
    print(f"CFG: {stats['blocks']} blocs, {stats['edges']} arêtes")

    for addr, block in sorted(builder.blocks.items()):
        print(f"\n--- Block 0x{addr:x} ---")
        dead = DeadCodeDetector.detect(block.instructions)
        for insn in block.instructions:
            marker = " << DEAD" if any(
                d['addr'] == hex(insn.address) for d in dead
            ) else ""
            print(f"  0x{insn.address:x}: {insn.mnemonic:8s} "
                  f"{insn.op_str}{marker}")
        if dead:
            print(f"  [{len(dead)} instruction(s) morte(s) détectée(s)]")

Reconstruction du CFG après suppression du code mort

CFG Obfusqué CFG Nettoyé mov rax, rdi push rax; pop rax add rax, 0x10 imul ecx, ecx ; (49) jnz dead_block DEAD BLOCK xor rbx, rbx nop ; nop ; nop add rax, rbx ; ret mov rax, rdi add rax, 0x10 xor rbx, rbx add rax, rbx ; ret Suppression dead code + résolution prédicats opaques + élimination blocs morts 5 blocs / 6 arêtes 3 blocs / 2 arêtes

6. Aplatissement du flux de contrôle (Control Flow Flattening)

6.1 Anatomie du CFF

L'aplatissement du flux de contrôle (Control Flow Flattening, CFF) est une technique d'obfuscation avancée implémentée par des outils comme OLLVM (Obfuscator-LLVM), Tigress, et divers protecteurs commerciaux. Le principe consiste à transformer la structure hiérarchique naturelle du CFG (if/else, boucles, séquences) en une boucle unique contenant un switch dispatcher.

Le mécanisme repose sur trois composants :

Considérons une fonction simple avant et après CFF :

// === Code original ===
int process(int x) {
    int result = 0;
    if (x > 10) {
        result = x * 2;
    } else {
        result = x + 5;
    }
    return result + 1;
}

// === Après aplatissement CFF ===
int process_flattened(int x) {
    int result = 0;
    int state = 0xA1B2;  // État initial

    while (1) {
        switch (state) {
            case 0xA1B2:  // Bloc d'entrée
                if (x > 10)
                    state = 0xC3D4;  // -> bloc then
                else
                    state = 0xE5F6;  // -> bloc else
                break;

            case 0xC3D4:  // Bloc then
                result = x * 2;
                state = 0x7890;  // -> bloc sortie
                break;

            case 0xE5F6:  // Bloc else
                result = x + 5;
                state = 0x7890;  // -> bloc sortie
                break;

            case 0x7890:  // Bloc de sortie
                return result + 1;

            default:
                return -1;  // Ne devrait jamais arriver
        }
    }
}

6.2 Reconstruction du CFG original : script IDA Python

La reconstruction du flux de contrôle original nécessite d'identifier le dispatcher, la state variable, et les transitions entre blocs. Voici un script IDA Python complet pour automatiser cette reconstruction :

"""
Script IDA Python pour la reconstruction du CFG
après aplatissement de flux de contrôle (CFF/OLLVM).

Usage dans IDA Pro:
  File -> Script file -> cff_deobfuscate.py
  Puis: cff_deobfuscate(ea=here())
"""
import idaapi
import idautils
import idc
from collections import defaultdict

class CFFDeobfuscator:
    """
    Reconstructeur de CFG pour binaires protégés par
    Control Flow Flattening (OLLVM, Tigress, etc.).
    """

    def __init__(self, func_ea):
        self.func = idaapi.get_func(func_ea)
        if not self.func:
            raise ValueError(f"Pas de fonction à 0x{func_ea:x}")

        self.func_ea = self.func.start_ea
        self.func_end = self.func.end_ea
        self.cfg = idaapi.FlowChart(self.func)
        self.dispatcher = None
        self.state_var = None
        self.state_transitions = {}
        self.real_blocks = []
        self.dead_blocks = []

    def identify_dispatcher(self):
        """
        Identifie le bloc dispatcher (switch central).
        Heuristique: le bloc avec le plus de prédécesseurs
        qui contient une comparaison avec une constante suivie
        d'un branchement conditionnel.
        """
        predecessor_count = defaultdict(int)

        for block in self.cfg:
            for succ in block.succs():
                predecessor_count[succ.start_ea] += 1

        # Le dispatcher est le bloc avec le plus de prédécesseurs
        # (tous les blocs aplatis y retournent)
        candidates = sorted(
            predecessor_count.items(),
            key=lambda x: x[1],
            reverse=True
        )

        for addr, count in candidates:
            if count >= 3:  # Au moins 3 prédécesseurs
                # Vérifier que le bloc contient un switch pattern
                if self._is_switch_pattern(addr):
                    self.dispatcher = addr
                    print(f"[+] Dispatcher identifié: 0x{addr:x} "
                          f"({count} prédécesseurs)")
                    return True

        print("[-] Dispatcher non trouvé")
        return False

    def _is_switch_pattern(self, block_ea):
        """Vérifie si un bloc contient un pattern de switch/dispatcher."""
        ea = block_ea
        block_end = self._get_block_end(block_ea)

        cmp_found = False
        while ea < block_end and ea != idc.BADADDR:
            mnem = idc.print_insn_mnem(ea)
            if mnem in ('cmp', 'sub', 'test'):
                cmp_found = True
            if mnem in ('je', 'jne', 'jz', 'jnz', 'ja', 'jb') and cmp_found:
                return True
            ea = idc.next_head(ea, block_end)
        return False

    def _get_block_end(self, block_ea):
        """Retourne l'adresse de fin du bloc de base."""
        for block in self.cfg:
            if block.start_ea == block_ea:
                return block.end_ea
        return block_ea + 0x100  # Fallback

    def identify_state_variable(self):
        """
        Identifie la state variable utilisée par le dispatcher.
        C'est typiquement le registre ou l'emplacement mémoire
        comparé dans le dispatcher.
        """
        if not self.dispatcher:
            return False

        ea = self.dispatcher
        block_end = self._get_block_end(self.dispatcher)

        while ea < block_end and ea != idc.BADADDR:
            mnem = idc.print_insn_mnem(ea)
            if mnem == 'cmp':
                op0 = idc.print_operand(ea, 0)
                self.state_var = op0
                print(f"[+] State variable: {op0}")
                return True
            ea = idc.next_head(ea, block_end)

        return False

    def trace_transitions(self):
        """
        Trace les transitions d'état entre les blocs aplatis.
        Pour chaque bloc, identifie la valeur de la state variable
        à la sortie (qui détermine le bloc suivant).
        """
        if not self.state_var:
            return

        for block in self.cfg:
            if block.start_ea == self.dispatcher:
                continue

            # Analyser les instructions du bloc à rebours
            ea = block.end_ea
            while ea > block.start_ea:
                ea = idc.prev_head(ea, block.start_ea)
                if ea == idc.BADADDR:
                    break

                mnem = idc.print_insn_mnem(ea)

                # Chercher les affectations à la state variable
                # mov state_var, immediate
                if mnem == 'mov':
                    op0 = idc.print_operand(ea, 0)
                    op1_type = idc.get_operand_type(ea, 1)
                    if op0 == self.state_var and op1_type == idc.o_imm:
                        next_state = idc.get_operand_value(ea, 1)
                        self.state_transitions[block.start_ea] = next_state
                        print(f"  Bloc 0x{block.start_ea:x} -> "
                              f"état 0x{next_state:x}")
                        break

    def reconstruct(self):
        """Pipeline complet de reconstruction du CFG."""
        print(f"[*] Déobfuscation CFF de la fonction 0x{self.func_ea:x}")
        print(f"    Taille: {self.func_end - self.func_ea} octets")
        print(f"    Blocs: {sum(1 for _ in self.cfg)}")

        if not self.identify_dispatcher():
            return False
        if not self.identify_state_variable():
            return False

        self.trace_transitions()

        print(f"\n[+] Reconstruction terminée:")
        print(f"    Transitions identifiées: {len(self.state_transitions)}")

        # Construire la map état -> bloc
        state_to_block = {}
        for block in self.cfg:
            if block.start_ea == self.dispatcher:
                continue
            ea = block.start_ea
            block_end = block.end_ea
            while ea < block_end:
                mnem = idc.print_insn_mnem(ea)
                if mnem == 'cmp':
                    op1_type = idc.get_operand_type(ea, 1)
                    if op1_type == idc.o_imm:
                        state_val = idc.get_operand_value(ea, 1)
                        state_to_block[state_val] = block.start_ea
                ea = idc.next_head(ea, block_end)

        # Reconstituer l'ordre d'exécution
        print("\n[+] Ordre d'exécution reconstruit:")
        for src_block, next_state in sorted(self.state_transitions.items()):
            dst_block = state_to_block.get(next_state, None)
            dst_str = f"0x{dst_block:x}" if dst_block else "INCONNU"
            print(f"    0x{src_block:x} -> {dst_str} (état 0x{next_state:x})")

            # Commenter dans IDA pour visualisation
            if dst_block:
                idc.set_cmt(
                    src_block,
                    f"CFF: next -> 0x{dst_block:x}",
                    False
                )

        return True


def cff_deobfuscate(ea=None):
    """Point d'entrée du script."""
    if ea is None:
        ea = idc.here()
    deob = CFFDeobfuscator(ea)
    deob.reconstruct()

Aplatissement de flux de contrôle : avant / après

CFG Original CFG Aplati (CFF) Entrée if (x > 10) else x * 2 x + 5 return DISPATCHER switch(state) 0xA1B2 0xC3D4 0xE5F6 0x7890 Chaque bloc met à jour state puis retourne au dispatcher Dispatcher Bloc aplati Retour dispatcher

7. Étude de cas : déobfuscation de RedLine Stealer

7.1 Présentation de RedLine Stealer

RedLine Stealer est l'un des info-stealers les plus prolifiques du paysage des menaces depuis 2020. Distribué en tant que MaaS (Malware-as-a-Service) sur les forums underground pour environ 150-200 USD par mois, il cible les navigateurs (cookies, mots de passe, données de formulaires), les portefeuilles de cryptomonnaies, les clients VPN et FTP, ainsi que les informations système.

RedLine est développé en C# (.NET) et systématiquement protégé par ConfuserEx, un obfuscateur open-source pour .NET. Les couches d'obfuscation typiques incluent :

7.2 Pipeline de déobfuscation avec de4dot et dnSpy

La déobfuscation d'un échantillon RedLine suit un pipeline méthodique. de4dot est un déobfuscateur .NET automatique qui gère la majorité des protections ConfuserEx. dnSpy (ou son fork dnSpyEx) permet l'inspection et la modification du code IL.

#!/bin/bash
# Pipeline de déobfuscation RedLine Stealer
# Prérequis: de4dot, dnSpy/dnSpyEx, monodis, strings

SAMPLE="$1"
WORKDIR="/opt/analysis/redline/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$WORKDIR"/{original,deobfuscated,extracted}

echo "=========================================="
echo " RedLine Stealer - Pipeline Déobfuscation"
echo "=========================================="
echo "[*] Sample: $SAMPLE"
echo "[*] Working dir: $WORKDIR"

# Étape 1: Identification et hachage
echo -e "\n[Étape 1] Identification"
sha256=$(sha256sum "$SAMPLE" | cut -d' ' -f1)
echo "  SHA-256: $sha256"
cp "$SAMPLE" "$WORKDIR/original/$sha256.exe"

# Vérifier que c'est bien du .NET
file_type=$(file "$SAMPLE")
if ! echo "$file_type" | grep -qi "\.NET\|PE32\|Mono"; then
    echo "  [!] Pas un binaire .NET - arrêt"
    exit 1
fi
echo "  Type: $file_type"

# Étape 2: Déobfuscation avec de4dot
echo -e "\n[Étape 2] Déobfuscation de4dot"
de4dot "$SAMPLE" \
    -o "$WORKDIR/deobfuscated/clean.exe" \
    --strtyp delegate \
    --strtok 0x06000001 \
    2>&1 | tee "$WORKDIR/de4dot.log"

# Si la détection auto échoue, forcer ConfuserEx
if [ ! -f "$WORKDIR/deobfuscated/clean.exe" ]; then
    echo "  [!] Retry avec détection forcée ConfuserEx..."
    de4dot "$SAMPLE" \
        -p cr \
        -o "$WORKDIR/deobfuscated/clean.exe" \
        2>&1 | tee -a "$WORKDIR/de4dot.log"
fi

# Étape 3: Extraction des chaînes déchiffrées
echo -e "\n[Étape 3] Extraction des chaînes"
strings -a -n 6 "$WORKDIR/deobfuscated/clean.exe" | sort -u \
    > "$WORKDIR/extracted/strings_all.txt"

# Extraction des IOCs réseau
grep -oP 'https?://[^\s"<>]+' "$WORKDIR/extracted/strings_all.txt" \
    > "$WORKDIR/extracted/urls.txt" 2>/dev/null
grep -oP '\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' \
    "$WORKDIR/extracted/strings_all.txt" \
    > "$WORKDIR/extracted/ips.txt" 2>/dev/null
grep -oP '\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' \
    "$WORKDIR/extracted/strings_all.txt" | \
    grep -v -E '\.(dll|exe|sys|ocx|com|net|org)$' \
    > "$WORKDIR/extracted/domains.txt" 2>/dev/null

# Étape 4: Extraction de la config C2 via pattern matching
echo -e "\n[Étape 4] Extraction configuration C2"
python3 << 'PYEOF'
import re
import sys

deobfuscated = "$WORKDIR/deobfuscated/clean.exe"
try:
    with open(deobfuscated, "rb") as f:
        data = f.read()
except FileNotFoundError:
    print("  [-] Fichier déobfusqué non trouvé")
    sys.exit(1)

# Pattern C2: IP:PORT typique de RedLine
c2_pattern = rb'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5})'
c2_matches = set(re.findall(c2_pattern, data))

# Pattern ID de build RedLine
build_pattern = rb'(Build[_\-]?\w{4,20})'
build_matches = set(re.findall(build_pattern, data))

# Pattern clé d'autorisation (base64-like)
auth_pattern = rb'([A-Za-z0-9+/]{20,}={0,2})'
auth_candidates = re.findall(auth_pattern, data)

print("  C2 Servers:")
for c2 in c2_matches:
    print(f"    - {c2.decode('utf-8', errors='ignore')}")

print("  Build IDs:")
for bid in build_matches:
    print(f"    - {bid.decode('utf-8', errors='ignore')}")

# Sauvegarder les IOCs
with open("$WORKDIR/extracted/iocs.txt", "w") as f:
    f.write("# RedLine Stealer IOCs\n")
    f.write(f"# SHA-256: $sha256\n\n")
    f.write("[C2 Servers]\n")
    for c2 in c2_matches:
        f.write(f"{c2.decode('utf-8', errors='ignore')}\n")
    f.write("\n[Build IDs]\n")
    for bid in build_matches:
        f.write(f"{bid.decode('utf-8', errors='ignore')}\n")

print(f"\n  IOCs sauvegardés: $WORKDIR/extracted/iocs.txt")
PYEOF

# Étape 5: Analyse IL avec monodis
echo -e "\n[Étape 5] Décompilation IL"
monodis "$WORKDIR/deobfuscated/clean.exe" \
    > "$WORKDIR/extracted/il_dump.txt" 2>/dev/null

echo -e "\n[*] Pipeline terminé. Résultats dans: $WORKDIR"
echo "  - Original:     $WORKDIR/original/"
echo "  - Déobfusqué:   $WORKDIR/deobfuscated/"
echo "  - IOCs extraits: $WORKDIR/extracted/"

7.3 Déchiffrement des chaînes ConfuserEx en Python

Lorsque de4dot ne parvient pas à résoudre automatiquement les chaînes protégées (versions modifiées de ConfuserEx), il est nécessaire de reproduire l'algorithme de déchiffrement manuellement. Voici l'implémentation du déchiffreur de chaînes ConfuserEx :

#!/usr/bin/env python3
"""
Déchiffreur de chaînes ConfuserEx pour RedLine Stealer.
Reproduit l'algorithme de déchiffrement des chaînes protégées
sans exécuter le binaire .NET.

ConfuserEx utilise typiquement:
1. Un tableau de bytes compressé (deflate) stocké en ressource
2. Un déchiffrement XOR avec clé dérivée du token de la méthode appelante
3. Optionnellement un chiffrement additionnel (RC4 ou mutation custom)
"""
import struct
import zlib
import hashlib
from typing import Optional

class ConfuserExStringDecryptor:
    """Déchiffre les chaînes protégées par ConfuserEx."""

    def __init__(self, encrypted_resource: bytes, module_key: int):
        """
        Args:
            encrypted_resource: contenu brut de la ressource .NET
                               contenant les chaînes chiffrées
            module_key: clé de module (typiquement le RID
                       de la méthode de déchiffrement)
        """
        self.module_key = module_key
        self.data = self._decompress(encrypted_resource)
        self.strings_cache = {}

    def _decompress(self, data: bytes) -> bytes:
        """Décompresse les données (deflate sans header zlib)."""
        try:
            return zlib.decompress(data, -15)  # Raw deflate
        except zlib.error:
            try:
                return zlib.decompress(data)   # Avec header zlib
            except zlib.error:
                return data  # Pas compressé

    def _derive_key(self, caller_token: int) -> int:
        """
        Dérive la clé XOR à partir du token de la méthode appelante.
        Reproduit l'algorithme de ConfuserEx:
          key = (caller_token * 0x5bd1e995) ^ module_key
        """
        key = (caller_token * 0x5BD1E995) & 0xFFFFFFFF
        key = key ^ self.module_key
        return key

    def decrypt_string(self, string_id: int,
                       caller_token: int) -> Optional[str]:
        """
        Déchiffre une chaîne par son identifiant et le token de
        la méthode appelante.

        Args:
            string_id: index dans le tableau de chaînes
            caller_token: metadata token de la méthode .NET appelante
                         (format 0x0600XXXX pour MethodDef)

        Returns:
            La chaîne déchiffrée ou None en cas d'erreur
        """
        cache_key = (string_id, caller_token)
        if cache_key in self.strings_cache:
            return self.strings_cache[cache_key]

        key = self._derive_key(caller_token)

        try:
            # Lire l'offset et la longueur depuis le header
            offset = string_id * 4
            if offset + 4 > len(self.data):
                return None

            str_offset = struct.unpack_from('= len(self.data):
                return None

            # Lire la longueur de la chaîne (encodée en 7-bit compact)
            pos = str_offset
            str_len = 0
            shift = 0
            while pos < len(self.data):
                b = self.data[pos]
                str_len |= (b & 0x7F) << shift
                pos += 1
                if (b & 0x80) == 0:
                    break
                shift += 7

            if pos + str_len * 2 > len(self.data):
                return None

            # Déchiffrer les caractères UTF-16LE
            chars = []
            xor_key = key
            for i in range(str_len):
                c = struct.unpack_from('> (8 * (i % 4))) & 0xFFFF
                chars.append(chr(c))
                # Rotation de la clé
                xor_key = ((xor_key >> 3) | (xor_key << 29)) & 0xFFFFFFFF

            result = ''.join(chars)
            self.strings_cache[cache_key] = result
            return result

        except (struct.error, IndexError, ValueError):
            return None

    def decrypt_all(self, method_tokens: list) -> dict:
        """
        Tente de déchiffrer toutes les chaînes pour une liste
        de tokens de méthodes.

        Returns:
            Dict mapping (string_id, token) -> chaîne déchiffrée
        """
        results = {}
        for token in method_tokens:
            for sid in range(1000):  # Heuristique: max 1000 chaînes
                s = self.decrypt_string(sid, token)
                if s and len(s) > 0 and all(
                    c.isprintable() or c in '\r\n\t' for c in s
                ):
                    results[(sid, hex(token))] = s
        return results


if __name__ == "__main__":
    # Exemple d'utilisation avec un échantillon RedLine
    # Les valeurs ci-dessous sont à adapter au sample analysé
    print("[*] ConfuserEx String Decryptor")
    print("    Adapter encrypted_resource et module_key au sample")

    # Simulation avec des données de test
    test_data = bytes(range(256)) * 4
    decryptor = ConfuserExStringDecryptor(
        encrypted_resource=test_data,
        module_key=0x1A2B3C4D
    )
    print(f"    Data size after decompression: {len(decryptor.data)}")
    print("    Prêt pour decrypt_string(id, caller_token)")

IOCs typiques RedLine Stealer (2025-2026)

Les échantillons récents de RedLine Stealer communiquent via des API SOAP/WCF sur des ports non standard (généralement 12432, 15647, 17816, ou 23984). La communication est souvent chiffrée par un certificat auto-signé. Les serveurs C2 tournent principalement sur des VPS en Russie, aux Pays-Bas et en Roumanie. Les chaînes caractéristiques après déobfuscation incluent : Authorization, ScanBrowsers, ScanFTP, ScanWallets, ScanScreen, ScanTelegram, GrabInfo.

8. Automatisation avec YARA : détection et classification

8.1 Règles YARA pour la détection de packing et d'obfuscation

YARA est le standard de facto pour la création de signatures de détection de malwares. Contrairement aux signatures antivirus traditionnelles basées sur des hashs, les règles YARA permettent de décrire des patterns structurels et comportementaux résistants aux mutations polymorphes.

Pour la détection de malwares obfusqués, les règles YARA doivent cibler des invariants qui survivent aux transformations polymorphes : structures de données, séquences d'opcodes caractéristiques des stubs, patterns d'entropie, et métadonnées PE.

/*
 * Règles YARA avancées pour la détection de packing
 * et d'obfuscation dans les binaires PE.
 *
 * Auteur: Ayi NEDJIMI - ayinedjimi-consultants.fr
 * Date: 2026-02-05
 */

import "pe"
import "math"
import "hash"

rule Packed_High_Entropy_Sections {
    meta:
        description = "Détecte les PE avec sections à haute entropie (packing probable)"
        severity = "medium"
        category = "packer"

    condition:
        uint16(0) == 0x5A4D and
        for any section in pe.sections : (
            math.entropy(section.offset, section.size) > 7.2 and
            section.size > 1024
        )
}

rule Packed_UPX_Modified {
    meta:
        description = "Détecte UPX avec en-têtes modifiés (anti-unpacking)"
        severity = "high"
        category = "packer"

    strings:
        // Noms de sections UPX altérés (premier octet modifié)
        $s1 = { 00 50 58 30 }  // \x00PX0 au lieu de UPX0
        $s2 = { 00 50 58 31 }  // \x00PX1
        $s3 = { 55 58 50 30 }  // UXP0 (octets inversés)
        $s4 = { 55 58 50 31 }  // UXP1

        // Pattern du stub UPX même modifié
        $stub = { 60 BE ?? ?? ?? ?? 8D BE ?? ?? ?? ?? 57 }

    condition:
        uint16(0) == 0x5A4D and
        (any of ($s*)) and
        $stub
}

rule VMProtect_Virtualized {
    meta:
        description = "Détecte la virtualisation VMProtect"
        severity = "critical"
        category = "protector"

    strings:
        // Nom de section VMProtect
        $vmp0 = ".vmp0"
        $vmp1 = ".vmp1"
        $vmp2 = ".vmp2"

        // Pattern d'entrée VM (push regs + jmp dispatcher)
        $vm_entry_x86 = {
            60                    // pushad
            9C                    // pushfd
            68 ?? ?? ?? ??        // push imm32
            E8 ?? ?? ?? ??        // call vm_dispatcher
        }

        $vm_entry_x64 = {
            50 51 52 53           // push rax,rcx,rdx,rbx
            55 56 57              // push rbp,rsi,rdi
            41 50 41 51           // push r8, r9
        }

    condition:
        uint16(0) == 0x5A4D and
        (any of ($vmp*)) and
        (any of ($vm_entry*))
}

rule Themida_Protected {
    meta:
        description = "Détecte la protection Themida/WinLicense"
        severity = "critical"
        category = "protector"

    strings:
        $s1 = ".themida" ascii
        $s2 = ".winlice" ascii
        $s3 = "THEMIDA" ascii wide
        // Anti-debug check pattern
        $anti_dbg = {
            64 A1 30 00 00 00     // mov eax, fs:[0x30] (PEB)
            0F B6 40 02           // movzx eax, byte [eax+2] (BeingDebugged)
            85 C0                 // test eax, eax
        }

    condition:
        uint16(0) == 0x5A4D and
        (any of ($s*)) and
        $anti_dbg
}

rule ConfuserEx_NET_Obfuscated {
    meta:
        description = "Détecte l'obfuscation ConfuserEx sur binaires .NET"
        severity = "high"
        category = "obfuscator"
        family = "confuserex"

    strings:
        // Markers ConfuserEx dans les métadonnées .NET
        $marker1 = "ConfuserEx" ascii wide nocase
        $marker2 = "Confuser.Core" ascii
        // Pattern de protection de chaînes (delegate call)
        $str_prot = {
            7E ?? ?? ?? 04        // ldsfld
            28 ?? ?? ?? 06        // call 
            72 ?? ?? ?? 70        // ldstr ""
        }
        // Anti-tamper module initializer
        $tamper = {
            28 ?? ?? ?? 0A        // call
            2A                    // ret
            00                    // padding
            13 30                 // .method header
        }

    condition:
        uint16(0) == 0x5A4D and
        (any of ($marker*) or $str_prot) and
        pe.imports("mscoree.dll", "_CorExeMain")
}

rule Polymorphic_XOR_Decrypt_Stub {
    meta:
        description = "Détecte les stubs de déchiffrement XOR polymorphes"
        severity = "high"
        category = "polymorphic"

    strings:
        // Pattern: boucle XOR avec registre index + compteur
        $xor_loop_1 = {
            30 (1? | 0?) [0-2]    // xor byte [reg+idx], reg
            (40 | 41 | 42 | 43 | 46 | 47 | FF C? | 83 C? 01)  // inc reg
            (48 | 49 | 4A | 4B | 4E | 4F | FF C? | 83 E? 01)  // dec reg
            (75 | 0F 85) ??       // jnz loop
        }

        // Pattern: boucle XOR avec LOOP instruction
        $xor_loop_2 = {
            30 [1-3]              // xor byte [mem], reg
            [0-4]                 // possible inc/lea
            E2 ??                 // loop
        }

        // Variante: XOR word/dword
        $xor_loop_3 = {
            (31 | 33) [1-3]       // xor dword [mem], reg
            (83 C? 04 | 83 E? 04) // add/sub reg, 4
            (3B | 39 | 3D) [1-5]  // cmp
            (72 | 76 | 7C | 0F 82 | 0F 86) ??  // jb/jbe/jl
        }

    condition:
        uint16(0) == 0x5A4D and
        any of ($xor_loop_*) and
        for any section in pe.sections : (
            math.entropy(section.offset, section.size) > 6.8
        )
}

rule RedLine_Stealer_Deobfuscated {
    meta:
        description = "Détecte RedLine Stealer après déobfuscation"
        severity = "critical"
        category = "infostealer"
        family = "redline"
        mitre = "T1555, T1539, T1552"

    strings:
        $func1 = "ScanBrowsers" ascii wide
        $func2 = "ScanFTP" ascii wide
        $func3 = "ScanWallets" ascii wide
        $func4 = "GrabInfo" ascii wide
        $func5 = "ScanScreen" ascii wide
        $func6 = "ScanTelegram" ascii wide
        $func7 = "ScanDiscord" ascii wide
        $func8 = "ScanSteam" ascii wide

        $net1 = "Authorization" ascii wide
        $net2 = "Content-Type" ascii wide
        $net3 = "application/soap+xml" ascii wide

        $cfg1 = /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5}/ ascii

    condition:
        uint16(0) == 0x5A4D and
        pe.imports("mscoree.dll", "_CorExeMain") and
        (3 of ($func*)) and
        (2 of ($net*)) and
        $cfg1
}

8.2 Pipeline d'automatisation Python + YARA

L'intégration de YARA dans un pipeline de triage automatisé permet de classer rapidement les échantillons selon leur niveau d'obfuscation et d'orienter l'analyse vers les outils appropriés :

#!/usr/bin/env python3
"""
Pipeline de triage automatisé pour malwares obfusqués.
Combine YARA, analyse PE et heuristiques d'entropie pour
classifier les échantillons et orienter l'analyse.
"""
import yara
import pefile
import math
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Dict, List

class MalwareTriagePipeline:
    """Pipeline de triage automatisé pour binaires PE suspects."""

    def __init__(self, yara_rules_dir: str):
        """
        Args:
            yara_rules_dir: répertoire contenant les fichiers .yar
        """
        self.rules = self._compile_rules(yara_rules_dir)
        self.results = []

    def _compile_rules(self, rules_dir: str) -> yara.Rules:
        """Compile toutes les règles YARA du répertoire."""
        rule_files = {}
        for yar_file in Path(rules_dir).glob("*.yar"):
            rule_files[yar_file.stem] = str(yar_file)

        if not rule_files:
            raise FileNotFoundError(
                f"Aucune règle YARA dans {rules_dir}"
            )

        print(f"[*] Compilation de {len(rule_files)} fichiers YARA...")
        return yara.compile(filepaths=rule_files)

    def _calculate_hashes(self, data: bytes) -> Dict[str, str]:
        return {
            'md5': hashlib.md5(data).hexdigest(),
            'sha1': hashlib.sha1(data).hexdigest(),
            'sha256': hashlib.sha256(data).hexdigest()
        }

    def _calculate_entropy(self, data: bytes) -> float:
        if not data:
            return 0.0
        counts = [0] * 256
        for byte in data:
            counts[byte] += 1
        entropy = 0.0
        for count in counts:
            if count > 0:
                p = count / len(data)
                entropy -= p * math.log2(p)
        return round(entropy, 4)

    def _analyze_pe(self, filepath: str) -> Dict:
        """Analyse approfondie de la structure PE."""
        try:
            pe = pefile.PE(filepath)
        except pefile.PEFormatError:
            return {'error': 'Invalid PE'}

        sections = []
        for section in pe.sections:
            name = section.Name.decode('utf-8', errors='ignore').rstrip('\x00')
            data = section.get_data()
            sections.append({
                'name': name,
                'virtual_size': section.Misc_VirtualSize,
                'raw_size': section.SizeOfRawData,
                'entropy': self._calculate_entropy(data),
                'executable': bool(section.Characteristics & 0x20000000),
                'writable': bool(section.Characteristics & 0x80000000),
                'size_ratio': (section.Misc_VirtualSize /
                              max(section.SizeOfRawData, 1))
            })

        imports = []
        if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
            for entry in pe.DIRECTORY_ENTRY_IMPORT:
                dll_name = entry.dll.decode('utf-8', errors='ignore')
                funcs = [imp.name.decode('utf-8', errors='ignore')
                        for imp in entry.imports
                        if imp.name]
                imports.append({'dll': dll_name, 'functions': funcs})

        return {
            'sections': sections,
            'imports': imports,
            'entry_point': hex(pe.OPTIONAL_HEADER.AddressOfEntryPoint),
            'image_base': hex(pe.OPTIONAL_HEADER.ImageBase),
            'is_dll': bool(pe.FILE_HEADER.Characteristics & 0x2000),
            'is_dotnet': any(
                entry.dll.lower() == b'mscoree.dll'
                for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', [])
            ),
            'compile_timestamp': datetime.utcfromtimestamp(
                pe.FILE_HEADER.TimeDateStamp
            ).isoformat() if pe.FILE_HEADER.TimeDateStamp else None,
            'num_sections': len(pe.sections),
        }

    def _classify_obfuscation(self, yara_matches: list,
                               pe_info: Dict) -> Dict:
        """Classifie le type et le niveau d'obfuscation."""
        classification = {
            'level': 'none',  # none, low, medium, high, critical
            'types': [],
            'recommended_tools': [],
            'estimated_effort': 'minimal'
        }

        categories = set()
        for match in yara_matches:
            meta = match.meta
            categories.add(meta.get('category', 'unknown'))

        # Niveau basé sur les catégories détectées
        if 'protector' in categories:
            classification['level'] = 'critical'
            classification['types'].append('virtualisation')
            classification['recommended_tools'].extend([
                'x64dbg + Scylla', 'VMProtect devirt',
                'Exécution symbolique (angr)'
            ])
            classification['estimated_effort'] = 'jours'
        elif 'packer' in categories:
            classification['level'] = 'high'
            classification['types'].append('packing')
            classification['recommended_tools'].extend([
                'UPX -d', 'PE-bear', 'Detect It Easy'
            ])
            classification['estimated_effort'] = 'heures'
        elif 'obfuscator' in categories:
            classification['level'] = 'high'
            classification['types'].append('obfuscation_code')
            classification['recommended_tools'].extend([
                'de4dot', 'dnSpy', 'ILSpy'
            ])
            classification['estimated_effort'] = 'heures'
        elif 'polymorphic' in categories:
            classification['level'] = 'medium'
            classification['types'].append('polymorphisme')
            classification['recommended_tools'].extend([
                'Capstone', 'Triton', 'angr'
            ])
            classification['estimated_effort'] = 'heures'

        # Heuristiques supplémentaires basées sur l'analyse PE
        if pe_info.get('sections'):
            high_entropy_sections = [
                s for s in pe_info['sections']
                if s['entropy'] > 7.0
            ]
            if high_entropy_sections and classification['level'] == 'none':
                classification['level'] = 'medium'
                classification['types'].append('entropy_anomaly')

            # Sections avec taille virtuelle >> taille brute (unpacking)
            expanded_sections = [
                s for s in pe_info['sections']
                if s['size_ratio'] > 5.0
            ]
            if expanded_sections:
                classification['types'].append('runtime_unpacking')

        return classification

    def analyze(self, filepath: str) -> Dict:
        """Analyse complète d'un échantillon."""
        data = Path(filepath).read_bytes()
        hashes = self._calculate_hashes(data)

        print(f"\n{'='*60}")
        print(f"[*] Analyse: {Path(filepath).name}")
        print(f"    SHA-256: {hashes['sha256']}")
        print(f"    Taille: {len(data):,} octets")

        # YARA matching
        yara_matches = self.rules.match(filepath)
        print(f"    Règles YARA: {len(yara_matches)} correspondance(s)")
        for match in yara_matches:
            severity = match.meta.get('severity', 'unknown')
            print(f"      - {match.rule} [{severity}]")

        # Analyse PE
        pe_info = self._analyze_pe(filepath)

        # Classification
        classification = self._classify_obfuscation(yara_matches, pe_info)
        print(f"    Niveau obfuscation: {classification['level'].upper()}")
        print(f"    Types: {', '.join(classification['types']) or 'aucun'}")
        print(f"    Effort estimé: {classification['estimated_effort']}")
        if classification['recommended_tools']:
            print(f"    Outils recommandés:")
            for tool in classification['recommended_tools']:
                print(f"      -> {tool}")

        result = {
            'filename': Path(filepath).name,
            'hashes': hashes,
            'size': len(data),
            'file_entropy': self._calculate_entropy(data),
            'yara_matches': [m.rule for m in yara_matches],
            'pe_info': pe_info,
            'classification': classification,
            'timestamp': datetime.utcnow().isoformat()
        }

        self.results.append(result)
        return result

    def export_report(self, output_path: str):
        """Exporte le rapport complet en JSON."""
        report = {
            'pipeline': 'MalwareTriagePipeline',
            'version': '1.0',
            'date': datetime.utcnow().isoformat(),
            'total_samples': len(self.results),
            'results': self.results
        }
        Path(output_path).write_text(
            json.dumps(report, indent=2, default=str)
        )
        print(f"\n[+] Rapport exporté: {output_path}")


if __name__ == "__main__":
    import sys
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]}  ")
        sys.exit(1)

    pipeline = MalwareTriagePipeline(sys.argv[1])

    samples_dir = Path(sys.argv[2])
    for sample in sorted(samples_dir.iterdir()):
        if sample.is_file():
            try:
                pipeline.analyze(str(sample))
            except Exception as e:
                print(f"  [!] Erreur: {e}")

    pipeline.export_report(
        f"/opt/analysis/triage_{datetime.now():%Y%m%d_%H%M%S}.json"
    )

9. Outils avancés : Ghidra, IDA Pro, Binary Ninja, radare2/rizin

9.1 Ghidra scripting pour la déobfuscation

Ghidra, le framework de rétro-ingénierie open-source développé par la NSA, offre un environnement de scripting puissant via Java et Python (Jython). Son décompilateur intégré et son moteur d'analyse P-Code en font un outil particulièrement adapté à la déobfuscation automatisée.

Le script suivant utilise l'API Ghidra pour identifier et résoudre automatiquement les prédicats opaques dans une fonction analysée :

# Ghidra Script: Détection et résolution de prédicats opaques
# @category Deobfuscation
# @keybinding
# @menupath Analysis.Deobfuscate Opaque Predicates
# @toolbar

"""
Script Ghidra pour la détection automatique de prédicats opaques
et la simplification du flux de contrôle.

Exécuter depuis le Script Manager de Ghidra ou via analyzeHeadless:
  analyzeHeadless /path/to/project ProjectName \
      -import sample.exe \
      -postScript opaque_predicate_resolver.py
"""

from ghidra.program.model.block import BasicBlockModel
from ghidra.program.model.pcode import PcodeOp
from ghidra.app.decompiler import DecompInterface
from ghidra.util.task import ConsoleTaskMonitor
import ghidra.program.model.symbol.FlowType as FlowType

def get_function_at_cursor():
    """Récupère la fonction sous le curseur."""
    fm = currentProgram.getFunctionManager()
    func = fm.getFunctionContaining(currentAddress)
    if func is None:
        popup("Aucune fonction sous le curseur")
        return None
    return func

def analyze_opaque_predicates(func):
    """
    Analyse les blocs de base d'une fonction et identifie
    les prédicats opaques via l'analyse P-Code.
    """
    block_model = BasicBlockModel(currentProgram)
    blocks = block_model.getCodeBlocksContaining(
        func.getBody(), monitor
    )

    opaque_count = 0
    results = []

    # Initialiser le décompilateur pour l'analyse sémantique
    decomp = DecompInterface()
    decomp.openProgram(currentProgram)
    decomp_results = decomp.decompileFunction(func, 30, monitor)

    if not decomp_results.decompileCompleted():
        printerr("Échec de la décompilation")
        return results

    high_func = decomp_results.getHighFunction()
    if high_func is None:
        return results

    # Parcourir les blocs P-Code
    pcode_blocks = high_func.getBasicBlocks()
    for pblock in pcode_blocks:
        # Chercher les CBRANCH (conditional branch)
        iterator = pblock.getIterator()
        last_cbranch = None
        while iterator.hasNext():
            pcode_op = iterator.next()
            if pcode_op.getOpcode() == PcodeOp.CBRANCH:
                last_cbranch = pcode_op

        if last_cbranch is None:
            continue

        # Analyser la condition du CBRANCH
        condition = last_cbranch.getInput(1)  # La condition booléenne
        if condition is None:
            continue

        # Vérifier si la condition est une constante
        # (= prédicat opaque résolu par le décompilateur)
        defining_op = condition.getDef()
        if defining_op is not None:
            # Tenter d'évaluer à une constante
            if condition.isConstant():
                const_val = condition.getOffset()
                branch_target = last_cbranch.getInput(0)

                opaque_count += 1
                entry = pblock.getStart()

                result = {
                    'address': entry.toString(),
                    'always_true': const_val != 0,
                    'condition_pcode': str(defining_op),
                }
                results.append(result)

                # Annoter dans Ghidra
                listing = currentProgram.getListing()
                code_unit = listing.getCodeUnitAt(entry)
                if code_unit is not None:
                    direction = "ALWAYS TRUE" if const_val != 0 else "ALWAYS FALSE"
                    code_unit.setComment(
                        code_unit.PRE_COMMENT,
                        "[OPAQUE] Predicate: " + direction
                    )

    decomp.dispose()
    return results

def patch_opaque_branches(func, results):
    """
    Patche les prédicats opaques en remplaçant les branchements
    conditionnels par des JMP inconditionnels ou des NOP.
    """
    from ghidra.program.model.mem import MemoryAccessException

    patched = 0
    for result in results:
        addr = toAddr(result['address'])
        insn = getInstructionAt(addr)

        if insn is None:
            continue

        # Trouver l'instruction de branchement conditionnel
        while insn is not None and not insn.getFlowType().isConditional():
            insn = insn.getNext()

        if insn is None:
            continue

        println("  Patching @ " + insn.getAddress().toString() +
                ": " + insn.toString())

        # Si le prédicat est toujours vrai -> remplacer par JMP
        # Si toujours faux -> remplacer par NOP
        try:
            if result['always_true']:
                # Calculer le JMP relatif vers la cible
                target = insn.getFlows()[0] if insn.getFlows() else None
                if target:
                    println("    -> JMP vers " + target.toString())
                    patched += 1
            else:
                # NOP le branchement (il ne sera jamais pris)
                insn_len = insn.getLength()
                nops = bytearray([0x90] * insn_len)
                currentProgram.getMemory().setBytes(
                    insn.getAddress(), bytes(nops)
                )
                println("    -> NOPped (" + str(insn_len) + " bytes)")
                patched += 1
        except MemoryAccessException as e:
            printerr("    Erreur patch: " + str(e))

    return patched


# Point d'entrée du script
func = get_function_at_cursor()
if func:
    println("=" * 50)
    println("Analyse de: " + func.getName() +
            " @ " + func.getEntryPoint().toString())
    println("Taille: " + str(func.getBody().getNumAddresses()) + " bytes")
    println("=" * 50)

    results = analyze_opaque_predicates(func)
    println("\nPrédicats opaques détectés: " + str(len(results)))

    for r in results:
        println("  " + r['address'] + " -> " +
                ("ALWAYS TRUE" if r['always_true'] else "ALWAYS FALSE"))

    if results and askYesNo("Patch",
            "Patcher " + str(len(results)) + " prédicats opaques ?"):
        patched = patch_opaque_branches(func, results)
        println("\n" + str(patched) + " branchements patchés")

9.2 IDA Pro et IDAPython avancé

IDA Pro reste l'outil de référence pour l'analyse de binaires complexes. Son API IDAPython permet d'automatiser des tâches de déobfuscation avancées. Voici un script pour la détection et la reconstruction des appels indirects obfusqués :

"""
IDAPython: Résolution d'appels indirects obfusqués.
Identifie les patterns call [reg] où le registre est
calculé par une séquence d'opérations obfusquée, et
résout la cible réelle par émulation partielle.
"""
import ida_bytes
import ida_funcs
import ida_ua
import ida_idp
import idautils
import idc

def resolve_indirect_calls(func_ea):
    """
    Parcourt une fonction et résout les appels indirects
    dont la cible est calculée par constant folding.
    """
    func = ida_funcs.get_func(func_ea)
    if not func:
        print(f"[-] Pas de fonction à 0x{func_ea:x}")
        return

    resolved = 0

    for head in idautils.Heads(func.start_ea, func.end_ea):
        insn = ida_ua.insn_t()
        if ida_ua.decode_insn(insn, head) == 0:
            continue

        # Chercher les CALL indirects (call reg ou call [mem])
        if insn.itype not in [ida_idp.NN_call, ida_idp.NN_callfi,
                               ida_idp.NN_callni]:
            continue

        op = insn.ops[0]
        if op.type == ida_ua.o_reg:
            # call reg -> remonter pour trouver la valeur
            reg_name = ida_idp.get_reg_name(op.reg, 8)
            target = _trace_register_value(head, op.reg, func)

            if target and target != 0:
                print(f"  [+] 0x{head:x}: call {reg_name} "
                      f"-> 0x{target:x}")
                # Ajouter un commentaire et une xref
                idc.set_cmt(head, f"Résolu: call 0x{target:x}", False)
                ida_bytes.add_cref(head, target, 0)  # Code xref
                resolved += 1

    print(f"[*] {resolved} appels indirects résolus")
    return resolved

def _trace_register_value(call_addr, reg, func):
    """
    Remonte les instructions depuis call_addr pour
    déterminer la valeur du registre par propagation
    de constantes.
    """
    # Parcourir les instructions précédentes (max 20)
    addr = call_addr
    value = None
    operations = []

    for _ in range(20):
        addr = idc.prev_head(addr, func.start_ea)
        if addr == idc.BADADDR or addr < func.start_ea:
            break

        insn = ida_ua.insn_t()
        if ida_ua.decode_insn(insn, addr) == 0:
            continue

        # MOV reg, imm -> valeur directe
        if (insn.itype == ida_idp.NN_mov and
            insn.ops[0].type == ida_ua.o_reg and
            insn.ops[0].reg == reg and
            insn.ops[1].type == ida_ua.o_imm):
            value = insn.ops[1].value
            break

        # ADD reg, imm
        if (insn.itype == ida_idp.NN_add and
            insn.ops[0].type == ida_ua.o_reg and
            insn.ops[0].reg == reg and
            insn.ops[1].type == ida_ua.o_imm):
            operations.append(('add', insn.ops[1].value))

        # SUB reg, imm
        if (insn.itype == ida_idp.NN_sub and
            insn.ops[0].type == ida_ua.o_reg and
            insn.ops[0].reg == reg and
            insn.ops[1].type == ida_ua.o_imm):
            operations.append(('sub', insn.ops[1].value))

        # XOR reg, imm
        if (insn.itype == ida_idp.NN_xor and
            insn.ops[0].type == ida_ua.o_reg and
            insn.ops[0].reg == reg and
            insn.ops[1].type == ida_ua.o_imm):
            operations.append(('xor', insn.ops[1].value))

    # Appliquer les opérations en ordre inverse
    if value is not None:
        for op, imm in reversed(operations):
            if op == 'add':
                value = (value + imm) & 0xFFFFFFFFFFFFFFFF
            elif op == 'sub':
                value = (value - imm) & 0xFFFFFFFFFFFFFFFF
            elif op == 'xor':
                value = value ^ imm
        return value

    return None

9.3 Comparaison des outils de reverse engineering

Critère Ghidra IDA Pro Binary Ninja radare2/rizin
Licence Open-source (Apache 2.0) Commercial (~1700 EUR/an) Commercial (~350 USD) Open-source (LGPL3)
Décompilateur Intégré (P-Code) Hex-Rays (addon payant) Intégré (HLIL/MLIL) Via r2ghidra ou r2dec
Scripting Java, Python (Jython) Python (IDAPython) Python, C++, Rust r2pipe (Python, JS, etc.)
Architectures ~30+ (x86, ARM, MIPS, PPC...) ~20+ avec plugins ~15+ officielles ~30+ (très extensible)
Analyse headless analyzeHeadless (excellent) idat (limité) binaryninja.MainThreadAction Natif (CLI-first)
Déobfuscation P-Code simplifié, bon CFG microcode, le plus mature BNIL, SSA, bon pour CFG ESIL, basique mais extensible
Intégration pipeline Excellente (Ghidrathon) Bonne (IDAPython batch) Très bonne (API Python) Excellente (r2pipe)
Forces pour déobfuscation P-Code normalise le code, décompilateur gratuit robuste Microcode et Hex-Rays le plus puissant, écosystème de plugins IL multi-niveaux (LLIL, MLIL, HLIL), API très propre Léger, rapide, excellent pour l'automatisation CLI

9.4 radare2/rizin : automatisation CLI

radare2 (et son fork rizin) excelle dans l'automatisation en ligne de commande et l'intégration dans des scripts shell. Voici un exemple d'analyse automatisée via r2pipe :

#!/usr/bin/env python3
"""
Analyse automatisée de malware obfusqué via r2pipe (radare2).
Extrait les chaînes, identifie les fonctions suspectes et
détecte les patterns d'obfuscation courants.
"""
import r2pipe
import json
from typing import Dict, List

def analyze_with_radare2(filepath: str) -> Dict:
    """Analyse complète d'un binaire avec radare2."""

    r2 = r2pipe.open(filepath, flags=['-2'])  # -2 = no stderr
    r2.cmd('aaa')  # Analyse complète

    results = {
        'info': json.loads(r2.cmd('ij')),
        'sections': json.loads(r2.cmd('iSj')),
        'imports': json.loads(r2.cmd('iij')),
        'strings': [],
        'suspicious_functions': [],
        'crypto_constants': [],
        'obfuscation_indicators': []
    }

    # Chercher les constantes cryptographiques
    # (AES S-Box, RC4 init, etc.)
    crypto_search = r2.cmd('/cr')
    if crypto_search:
        results['crypto_constants'] = crypto_search.strip().split('\n')

    # Analyser l'entropie par section
    for section in results['sections']:
        name = section.get('name', '')
        size = section.get('size', 0)
        if size > 0:
            paddr = section.get('paddr', 0)
            entropy_cmd = f'ph entropy {size} @ {paddr}'
            try:
                entropy = float(r2.cmd(entropy_cmd).strip())
                section['entropy'] = entropy
                if entropy > 7.0:
                    results['obfuscation_indicators'].append({
                        'type': 'high_entropy_section',
                        'section': name,
                        'entropy': entropy
                    })
            except (ValueError, TypeError):
                pass

    # Lister les fonctions et identifier les suspectes
    functions = json.loads(r2.cmd('aflj'))
    for func in functions:
        fname = func.get('name', '')
        fsize = func.get('size', 0)
        nbbs = func.get('nbbs', 0)  # Nombre de basic blocks

        # Heuristiques de détection d'obfuscation
        if nbbs > 50 and fsize < 500:
            # Beaucoup de blocs pour une petite fonction = CFF probable
            results['obfuscation_indicators'].append({
                'type': 'possible_cff',
                'function': fname,
                'blocks': nbbs,
                'size': fsize
            })

        # Fonctions avec noms suspects (anti-debug, crypto)
        suspicious_keywords = [
            'IsDebuggerPresent', 'NtQueryInformation',
            'CheckRemoteDebugger', 'OutputDebugString',
            'VirtualProtect', 'VirtualAlloc',
            'Crypt', 'Encrypt', 'Decrypt'
        ]
        for kw in suspicious_keywords:
            if kw.lower() in fname.lower():
                results['suspicious_functions'].append({
                    'name': fname,
                    'address': hex(func.get('offset', 0)),
                    'keyword': kw
                })

    # Extraire les chaînes intéressantes
    strings_json = json.loads(r2.cmd('izj'))
    for s in strings_json:
        content = s.get('string', '')
        if len(content) > 6:
            results['strings'].append({
                'value': content,
                'address': hex(s.get('vaddr', 0)),
                'section': s.get('section', ''),
                'type': s.get('type', '')
            })

    r2.quit()
    return results


if __name__ == "__main__":
    import sys
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} ")
        sys.exit(1)

    results = analyze_with_radare2(sys.argv[1])
    print(json.dumps(results, indent=2, default=str))

10. Conclusion : tendances émergentes et perspectives

10.1 L'obfuscation assistée par intelligence artificielle

L'intersection entre l'intelligence artificielle et l'obfuscation de malwares représente la prochaine frontière majeure. Les travaux de recherche récents démontrent l'utilisation de réseaux adversariaux génératifs (GAN) et de modèles de langage (LLM) pour générer des variantes de malwares capables d'échapper aux classifieurs basés sur le machine learning. Des preuves de concept comme DeepLocker (IBM Research) et MalGAN illustrent le potentiel de ces approches.

Les tendances observées incluent :

En réponse, la communauté défensive développe des approches de déobfuscation assistée par IA, notamment l'utilisation de modèles de type transformer pour la prédiction de la sémantique de code obfusqué, et l'application de techniques de program synthesis pour la reconstruction automatique du code original à partir de traces d'exécution symbolique.

10.2 WebAssembly : le nouveau vecteur d'obfuscation

WebAssembly (Wasm) émerge comme un vecteur d'obfuscation particulièrement préoccupant. Initialement conçu pour l'exécution de code performant dans les navigateurs web, Wasm est de plus en plus utilisé comme couche d'obfuscation hors navigateur :

Les outils de déobfuscation Wasm comme wasm-decompile (de l'outil wabt), JEB (PNF Software), et les modules Wasm de Ghidra progressent rapidement mais restent en retard par rapport aux outils x86 matures.

10.3 Recommandations pour les analystes

Face à l'évolution constante des techniques d'obfuscation, les analystes de malwares doivent adopter une approche combinant :

La course entre obfuscation et déobfuscation est fondamentalement asymétrique : l'attaquant n'a besoin que d'une seule technique fonctionnelle pour échapper à la détection, tandis que le défenseur doit couvrir l'ensemble de l'espace des transformations possibles. C'est pourquoi l'approche la plus résiliente combine analyse statique, analyse dynamique, exécution symbolique et intelligence sur les menaces en une stratégie de défense en profondeur.

Ressources recommandées

  • Practical Malware Analysis - Sikorski & Honig (No Starch Press)
  • The IDA Pro Book - Chris Eagle (No Starch Press)
  • Ghidra Software Reverse Engineering for Beginners - A. Kabir
  • angr documentation - docs.angr.io
  • Triton documentation - triton-library.github.io
  • YARA documentation - yara.readthedocs.io
  • OALabs (YouTube) - Tutoriels pratiques de déobfuscation
  • MalwareUnicorn RE101/RE102 - Cours gratuits de reverse engineering

Besoin d'un accompagnement expert ?

Nos consultants en cybersécurité et IA vous accompagnent dans vos projets. Devis personnalisé sous 24h.

Besoin d'une expertise en rétro-ingénierie ?

Nos experts analysent les menaces avancées et accompagnent vos équipes SOC et CERT.