Pipeline de recherche avancé
Architecture d'un système de recherche moderne
Un système de recherche documentaire moderne basé sur les embeddings se compose de plusieurs couches interconnectées qui transforment les documents bruts en résultats pertinents. L'architecture typique comprend quatre phases distinctes : l'indexation, le stockage, la recherche et le post-traitement.
Composants d'une Architecture Complète
- Ingestion Layer : Parsing, extraction de texte (PDF, DOCX, HTML)
- Processing Layer : Chunking, cleaning, enrichissement métadonnées
- Embedding Layer : Génération de vecteurs sémantiques
- Storage Layer : Base vectorielle (Pinecone, Qdrant, FAISS)
- Retrieval Layer : Recherche de similarité, filtrage
- Ranking Layer : Reranking, fusion de scores
- Serving Layer : API, caching, monitoring
Cette architecture modulaire permet de faire évoluer chaque composant indépendamment. Par exemple, vous pouvez changer le modèle d'embedding sans modifier la logique de chunking, ou ajouter un layer de reranking sans impacter le stockage vectoriel.
Les différentes étapes du pipeline
Le pipeline de recherche documentaire se décompose en deux flux distincts : le flux d'indexation (offline) et le flux de recherche (online). Chaque flux possède ses propres optimisations et contraintes.
Flux d'Indexation (Offline)
- Document Parsing : Extraction du contenu textuel depuis formats variés (PDF, DOCX, HTML, Markdown)
- Text Cleaning : Suppression du bruit (headers/footers, numéros de page, caractères spéciaux)
- Chunking : Découpage en segments de 256-1024 tokens selon la stratégie choisie
- Metadata Enrichment : Ajout de métadonnées (source, date, auteur, section)
- Embedding Generation : Conversion des chunks en vecteurs (batch processing pour performance)
- Vector Storage : Insertion dans la base vectorielle avec indexation
Flux de Recherche (Online)
- Query Processing : Normalisation, expansion, reformulation de la requête utilisateur
- Query Embedding : Génération du vecteur de la requête (latence critique: 20-50ms)
- Vector Search : Recherche des k vecteurs les plus similaires (k=20-100 typiquement)
- Metadata Filtering : Application de filtres (date, source, permissions)
- Reranking : Réordonnancement fin avec cross-encoder (optionnel)
- Result Formatting : Préparation de la réponse finale (top-k, scores, highlights)
Latences Typiques en Production
- Query embedding : 20-50ms
- Vector search (10M docs) : 30-100ms
- Reranking (top-100) : 200-500ms
- Total end-to-end : 250-650ms
Retrieval vs Ranking
La distinction entre retrieval (récupération) et ranking (classement) est fondamentale pour comprendre l'architecture de recherche moderne. Ces deux phases répondent à des objectifs différents avec des compromis performance/précision distincts.
Critère | Retrieval (Bi-encoder) | Ranking (Cross-encoder) |
---|---|---|
Objectif | Récupérer rapidement des candidats pertinents | Classer finement les meilleurs candidats |
Volume traité | Millions à milliards de documents | 10-100 documents candidats |
Latence | 30-100ms sur 10M docs | 200-500ms sur 100 docs |
Architecture | Encodage indépendant (query ⊥ doc) | Encodage joint (query + doc ensemble) |
Précision | Recall@100 : 85-95% | NDCG@10 : 92-98% |
Scalabilité | Excellente (pré-calcul des embeddings) | Limitée (calcul à la volée) |
Exemple concret : Pour une requête "vulnérabilités zero-day 2025", le retrieval récupère 100 documents potentiellement pertinents en 50ms via recherche vectorielle. Le reranking analyse ensuite les 100 paires (query, document) avec un cross-encoder pour produire le top-10 final en 300ms supplémentaires.
Schéma d'architecture type
Voici une architecture de référence pour un système de recherche documentaire en production supportant 1000+ requêtes/seconde sur 10M+ documents :
┌─────────────────────────────────────────────────────────────┐
│ INDEXATION (OFFLINE) │
├─────────────────────────────────────────────────────────────┤
│ Documents → Parser → Chunker → Embedder → Vector DB │
│ (S3) (Tika) (LangChain) (batch) (Qdrant) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ RECHERCHE (ONLINE) │
├─────────────────────────────────────────────────────────────┤
│ User Query │
│ ↓ │
│ Query Processor (expansion, reformulation) │
│ ↓ │
│ Query Embedder (20-50ms) │
│ ↓ │
│ Vector Search (k=100, 30-100ms) → Metadata Filter │
│ ↓ │
│ Cross-Encoder Reranking (top-100 → top-10, 200-500ms) │
│ ↓ │
│ Response (JSON + scores + highlights) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ COMPOSANTS TRANSVERSES │
├─────────────────────────────────────────────────────────────┤
│ • Redis (cache résultats fréquents, TTL 1h) │
│ • PostgreSQL (métadonnées, users, permissions) │
│ • Prometheus + Grafana (métriques latence, QPS) │
│ • OpenTelemetry (tracing distribué) │
└─────────────────────────────────────────────────────────────┘
Cette architecture sépare clairement les chemins chauds (recherche temps-réel) et froids (indexation batch), permettant d'optimiser indépendamment la latence utilisateur et le débit d'ingestion.
Reranking et amélioration de la pertinence
Qu'est-ce que le reranking ?
Le reranking (ou réordonnancement) est une technique de post-traitement qui consiste à réorganiser les résultats d'une première recherche (retrieval) en utilisant un modèle plus précis mais plus coûteux. C'est l'équivalent d'un "second avis" expert sur une présélection de candidats.
Pourquoi le Reranking est-il Nécessaire ?
Les modèles de retrieval (bi-encoders) encodent les requêtes et documents indépendamment, sans interaction entre eux. Ils manquent donc des signaux subtils de pertinence. Les cross-encoders du reranking encodent conjointement query+document, capturant des interactions fines (ex: négations, nuances sémantiques) impossibles à détecter avec des embeddings séparés.
Gains Typiques du Reranking
- NDCG@10 : +5 à +15 points (ex: 0.82 → 0.94)
- MRR (Mean Reciprocal Rank) : +10 à +25 points
- Precision@5 : +8 à +18 points
Exemple : Sur une requête "effets secondaires ibuprofène enfants", le retrieval renvoie 100 documents mentionnant ces termes. Le reranking détecte que le document classé 27e mentionne spécifiquement les interactions médicamenteuses critiques chez les enfants et le remonte en position 2.
Modèles de reranking (cross-encoders)
Les cross-encoders sont des modèles Transformer qui prennent en entrée la concaténation de la requête et du document, produisant un score de pertinence unique. Voici les modèles les plus performants en 2025 :
Modèle | Paramètres | NDCG@10 (MS MARCO) | Latence (1 doc) | Cas d'usage |
---|---|---|---|---|
ms-marco-MiniLM-L-6-v2 | 22M | 0.88 | 5ms | Production rapide |
ms-marco-MiniLM-L-12-v2 | 33M | 0.91 | 12ms | Équilibre perf/qualité |
bge-reranker-large | 335M | 0.94 | 40ms | Multilingual |
mxbai-rerank-large-v1 | 435M | 0.95 | 55ms | Maximum précision |
Cohere Rerank v3 | ?? (API) | 0.96 | 150ms | API cloud (128 langues) |
Recommandation Production
Pour la plupart des applications, bge-reranker-large offre le meilleur compromis : NDCG@10 de 0.94, support multilingual (100+ langues), latence acceptable (40ms/doc), et open-source (déployable on-premise). Pour reranker 100 docs en parallèle sur GPU, comptez 200-300ms total.
Bi-encoder vs cross-encoder
La différence fondamentale entre bi-encoder et cross-encoder réside dans leur architecture d'attention et leur mode de calcul.
Bi-Encoder (Retrieval)
- Architecture : Deux encodeurs indépendants (ou le même utilisé deux fois)
- Processus : embed(query) et embed(document) calculés séparément
- Comparaison : Similarité cosinus entre les deux vecteurs
- Avantage : Documents pré-encodés → recherche ultra-rapide (ANN search)
- Limite : Pas d'interaction query-document dans le modèle
Cross-Encoder (Reranking)
- Architecture : Un seul encodeur sur [CLS] query [SEP] document [SEP]
- Processus : Attention croisée complète entre tous les tokens
- Comparaison : Score de pertinence direct (0-1 ou logit)
- Avantage : Capture des interactions fines (négations, dépendances)
- Limite : Doit réencoder chaque paire (query, doc) → lent à grande échelle
Exemple Concret : Négation
Requête : "médicaments sans ordonnance pour insomnie"
Document A : "La mélatonine est disponible sans ordonnance"
Document B : "Les benzodiazépines nécessitent une ordonnance médicale"
Bi-encoder : Score similaire (0.78 vs 0.76) car les mots-clés sont présents
Cross-encoder : Doc A=0.94, Doc B=0.31 → détecte correctement la négation dans B
En pratique, on combine les deux : bi-encoder pour retrieval rapide (top-100 sur 10M docs), puis cross-encoder pour reranking précis (top-10 final).
Implémentation avec Sentence-Transformers
La bibliothèque sentence-transformers
fournit une API unifiée pour le retrieval et le reranking. Voici une implémentation complète avec gestion d'erreurs et optimisations :
import torch
from sentence_transformers import SentenceTransformer, CrossEncoder
from typing import List, Tuple
import numpy as np
class DocumentSearchEngine:
def __init__(self, retrieval_model: str = "BAAI/bge-large-en-v1.5",
reranking_model: str = "BAAI/bge-reranker-large"):
"""
Système de recherche avec retrieval + reranking.
Args:
retrieval_model: Bi-encoder pour recherche vectorielle
reranking_model: Cross-encoder pour reranking fin
"""
# Utiliser GPU si disponible
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# Charger les modèles
self.retriever = SentenceTransformer(retrieval_model, device=self.device)
self.reranker = CrossEncoder(reranking_model, max_length=512, device=self.device)
def encode_documents(self, documents: List[str], batch_size: int = 32) -> np.ndarray:
"""
Encoder des documents pour l'indexation.
Args:
documents: Liste de textes à encoder
batch_size: Taille des batchs pour traitement GPU
Returns:
Matrice d'embeddings (n_docs, dim)
"""
embeddings = self.retriever.encode(
documents,
batch_size=batch_size,
show_progress_bar=True,
normalize_embeddings=True, # Pour similarité cosinus
convert_to_numpy=True
)
return embeddings
def search(self, query: str, documents: List[str], doc_embeddings: np.ndarray,
k_retrieval: int = 100, k_final: int = 10) -> List[Tuple[int, float]]:
"""
Recherche en deux étapes : retrieval + reranking.
Args:
query: Requête utilisateur
documents: Corpus complet
doc_embeddings: Embeddings pré-calculés des documents
k_retrieval: Nombre de candidats du retrieval (100-200)
k_final: Nombre de résultats finaux après reranking
Returns:
Liste de (doc_index, reranking_score) triée par pertinence
"""
# Étape 1: Retrieval (bi-encoder)
query_embedding = self.retriever.encode(
query,
normalize_embeddings=True,
convert_to_numpy=True
)
# Similarité cosinus (car normalisé)
similarities = doc_embeddings @ query_embedding
top_k_indices = np.argsort(similarities)[::-1][:k_retrieval]
# Étape 2: Reranking (cross-encoder)
candidate_docs = [documents[i] for i in top_k_indices]
pairs = [[query, doc] for doc in candidate_docs]
# Calcul des scores de reranking
rerank_scores = self.reranker.predict(
pairs,
batch_size=32,
show_progress_bar=False
)
# Réordonnancement final
reranked_indices = np.argsort(rerank_scores)[::-1][:k_final]
final_results = [
(top_k_indices[i], float(rerank_scores[i]))
for i in reranked_indices
]
return final_results
# Exemple d'utilisation
if __name__ == "__main__":
# Corpus de documents
documents = [
"Les embeddings transforment le texte en vecteurs numériques.",
"Le reranking améliore la précision des résultats de recherche.",
"FAISS est une bibliothèque pour la recherche vectorielle rapide.",
"Les cross-encoders sont plus précis mais plus lents que les bi-encoders."
]
# Initialiser le moteur
engine = DocumentSearchEngine()
# Indexation (une seule fois)
print("Indexation des documents...")
doc_embeddings = engine.encode_documents(documents)
# Recherche
query = "Comment améliorer la qualité des résultats de recherche ?"
results = engine.search(query, documents, doc_embeddings, k_retrieval=4, k_final=2)
print(f"\nRequête: {query}")
print("\nTop-2 résultats après reranking:")
for rank, (doc_idx, score) in enumerate(results, 1):
print(f" {rank}. [Score: {score:.3f}] {documents[doc_idx]}")
Résultat attendu : Le document sur le reranking obtient le score le plus élevé (0.92+), car le cross-encoder détecte la correspondance sémantique avec "améliorer la qualité" même si les termes exacts diffèrent.
Compromis performance vs précision
Le choix du nombre de candidats à reranker (k) est un compromis critique entre latence et qualité. Voici les résultats d'une étude sur MS MARCO avec bge-reranker-large :
k (retrieval) | NDCG@10 | Latence reranking | Latence totale | Recommandation |
---|---|---|---|---|
20 | 0.89 | 80ms | 150ms | Très rapide, qualité correcte |
50 | 0.92 | 200ms | 270ms | ⭐ Équilibre optimal |
100 | 0.94 | 400ms | 470ms | Haute précision |
200 | 0.945 | 800ms | 870ms | Gains marginaux |
Stratégie de Production
- Applications interactives (chatbots, Q&A) : k=20-50, latence <200ms
- Applications standard (knowledge base) : k=50-100, latence <500ms
- Applications batch (analytics, rapports) : k=200+, pas de contrainte latence
Au-delà de k=100, les gains de précision deviennent marginaux (<0.5%) mais la latence double. La règle empirique : k=50 pour la plupart des cas.
Gains de pertinence mesurables
Voici des résultats réels de migration vers une architecture avec reranking sur trois projets clients différents :
Cas 1 : Knowledge Base Entreprise (2M documents)
- Avant : BGE-large-en-v1.5 seul, NDCG@10 = 0.78
- Après : + bge-reranker-large (k=100), NDCG@10 = 0.91
- Gain : +16.7% de précision, satisfaction utilisateur +28%
- Latence : 85ms → 420ms (acceptable pour usage interne)
Cas 2 : E-commerce Search (5M produits)
- Avant : OpenAI ada-002 + BM25 hybride, MRR = 0.72
- Après : + cross-encoder reranking (k=50), MRR = 0.84
- Gain : +16.7% MRR, taux de clic +19%, conversions +12%
- Latence : 120ms → 280ms (optimisé avec caching)
Cas 3 : Support Client RAG (500K tickets)
- Avant : Cohere embed-multilingual-v3, Recall@20 = 0.81
- Après : + Cohere Rerank v3, Recall@20 = 0.93
- Gain : +14.8% recall, réduction temps résolution -22%
- Latence : 95ms → 245ms (via API Cohere)
Conclusion : Le reranking apporte systématiquement +10 à +20% d'amélioration sur les métriques de pertinence, avec un coût latence de 150-350ms supplémentaires. Le ROI est positif dès que la qualité des résultats impacte directement le business (conversions, productivité, satisfaction).
Query expansion et reformulation
Techniques de query expansion
La query expansion consiste à enrichir ou reformuler la requête utilisateur pour améliorer le recall de la recherche. C'est particulièrement utile pour les requêtes courtes, ambigües ou contenant du jargon.
1. Expansion par Synonymes et Termes Associés
Ajoute des synonymes et termes sémantiquement proches à la requête originale.
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List
class SynonymExpander:
def __init__(self, vocabulary: List[str]):
self.model = SentenceTransformer("all-MiniLM-L6-v2")
self.vocabulary = vocabulary
self.vocab_embeddings = self.model.encode(vocabulary, normalize_embeddings=True)
def expand_query(self, query: str, top_k: int = 3) -> List[str]:
"""
Ajoute les k termes les plus similaires du vocabulaire.
"""
query_emb = self.model.encode(query, normalize_embeddings=True)
similarities = self.vocab_embeddings @ query_emb
top_indices = np.argsort(similarities)[::-1][:top_k]
return [self.vocabulary[i] for i in top_indices]
# Exemple
vocab = ["voiture", "automobile", "véhicule", "transport", "moto", "camion"]
expander = SynonymExpander(vocab)
expanded = expander.expand_query("voiture", top_k=3)
print(expanded) # ['automobile', 'véhicule', 'camion']
2. Multi-Query avec LLM
Utilise un LLM pour générer plusieurs reformulations de la requête, puis fusionne les résultats.
from openai import OpenAI
from typing import List
def generate_multi_queries(query: str, n: int = 3) -> List[str]:
"""
Génère n reformulations de la requête.
"""
client = OpenAI()
prompt = f"""Génère {n} reformulations différentes de la requête suivante,
en variant les termes et l'angle d'approche :
Requête : \"{query}\"
Retourne uniquement les reformulations, une par ligne."""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
reformulations = response.choices[0].message.content.strip().split('\n')
return [q.strip() for q in reformulations if q.strip()]
# Exemple
query = "comment sécuriser Active Directory ?"
reformulations = generate_multi_queries(query, n=3)
# Résultats possibles :
# 1. "quelles sont les meilleures pratiques pour protéger Active Directory ?"
# 2. "audit de sécurité AD : par où commencer ?"
# 3. "vulnérabilités courantes dans Active Directory et remèdes"
On recherche ensuite avec chaque reformulation, puis on fusionne les résultats avec Reciprocal Rank Fusion (RRF) ou weighted averaging.
Expansion par synonymes et termes liés
Cette technique classique de NLP reste pertinente en complément des embeddings. Elle permet de capturer des variations lexicales que les modèles sémantiques peuvent parfois manquer.
Approches Disponibles
- WordNet : Base de données lexicale hiérarchique (anglais principalement)
- ConceptNet : Graphe de connaissances multilingual avec relations sémantiques
- Embeddings-based : Chercher les termes les plus proches dans l'espace vectoriel
- Domain-specific : Thésaurus métier (ex: MeSH pour médical, SNOMED)
from nltk.corpus import wordnet
import nltk
nltk.download('wordnet', quiet=True)
nltk.download('omw-1.4', quiet=True)
def expand_with_wordnet(query: str) -> List[str]:
"""
Expansion avec synonymes WordNet.
"""
words = query.lower().split()
expanded_terms = set(words) # Inclut termes originaux
for word in words:
# Récupérer tous les synsets (ensembles de synonymes)
synsets = wordnet.synsets(word)
for synset in synsets[:2]: # Limiter aux 2 premiers sens
for lemma in synset.lemmas()[:3]: # Top 3 synonymes par sens
expanded_terms.add(lemma.name().replace('_', ' '))
return list(expanded_terms)
# Exemple
query = "car repair"
expanded = expand_with_wordnet(query)
print(expanded)
# ['car', 'repair', 'automobile', 'auto', 'vehicle', 'fix', 'mend', 'restore']
Bonne Pratique
Ne pas étendre toutes les requêtes systématiquement. Utilisez l'expansion uniquement si : 1) La requête est courte (<5 mots) 2) Le recall initial est faible (<70%) 3) La requête contient des acronymes ou jargon spécifique
Multi-query avec LLM
La technique multi-query utilise un LLM pour générer plusieurs perspectives d'une même question, améliorant significativement le recall sur des requêtes complexes.
Pipeline Multi-Query Complet
from typing import List, Dict
import numpy as np
class MultiQueryRetriever:
def __init__(self, base_retriever, llm_client):
self.base_retriever = base_retriever
self.llm = llm_client
def generate_queries(self, original_query: str, n: int = 3) -> List[str]:
"""Génère n reformulations."""
prompt = f"""Tu es un expert en formulation de requêtes de recherche.
Génère {n} reformulations de cette question qui explorent différents angles :
Question originale : {original_query}
Formate ta réponse comme :
1. [première reformulation]
2. [deuxième reformulation]
3. [troisième reformulation]"""
response = self.llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
# Parser les reformulations
lines = response.choices[0].message.content.strip().split('\n')
queries = [original_query] # Toujours inclure l'originale
for line in lines:
if line.strip() and not line.startswith('#'):
# Retirer numérotation ("1. ", "- ", etc.)
clean = line.lstrip('0123456789.-) ').strip()
if clean:
queries.append(clean)
return queries[:n+1] # n reformulations + originale
def retrieve_with_fusion(self, query: str, k: int = 10) -> List[Dict]:
"""
Recherche multi-query avec Reciprocal Rank Fusion.
"""
# 1. Générer les reformulations
queries = self.generate_queries(query, n=3)
print(f"Requêtes générées : {len(queries)}")
# 2. Rechercher avec chaque requête
all_results = {}
for q in queries:
results = self.base_retriever.search(q, k=k*2) # Récupérer 2x plus
for rank, (doc_id, score) in enumerate(results, 1):
if doc_id not in all_results:
all_results[doc_id] = {'scores': [], 'ranks': []}
all_results[doc_id]['scores'].append(score)
all_results[doc_id]['ranks'].append(rank)
# 3. Reciprocal Rank Fusion (RRF)
k_rrf = 60 # Paramètre standard
for doc_id in all_results:
ranks = all_results[doc_id]['ranks']
rrf_score = sum(1.0 / (k_rrf + r) for r in ranks)
all_results[doc_id]['final_score'] = rrf_score
# 4. Trier par score RRF
sorted_results = sorted(
all_results.items(),
key=lambda x: x[1]['final_score'],
reverse=True
)[:k]
return [{'doc_id': doc_id, 'score': data['final_score']}
for doc_id, data in sorted_results]
# Exemple d'utilisation
retriever = MultiQueryRetriever(base_retriever, openai_client)
results = retriever.retrieve_with_fusion(
"Comment détecter une attaque par ransomware ?",
k=10
)
Gains Observés
- Recall@20 : +8 à +15 points vs recherche simple
- Diversité : Résultats couvrant plus d'aspects de la question
- Robustesse : Moins sensible à la formulation exacte
- Coût : +150-300ms latence, +3-4x appels LLM/recherche
HyDE (Hypothetical Document Embeddings)
HyDE (Hypothetical Document Embeddings) est une technique avancée qui inverse le problème : au lieu d'encoder la question, on demande au LLM de générer une réponse hypothétique, puis on encode cette réponse pour la recherche vectorielle.
Intuition de HyDE
Les questions et réponses ont des distributions sémantiques différentes. Une question courte ("c'est quoi HNSW?") s'embeddent différemment d'une réponse détaillée. HyDE génère un document hypothétique qui ressemble davantage aux vrais documents, améliorant ainsi la similarité vectorielle.
Implémentation HyDE
from openai import OpenAI
from sentence_transformers import SentenceTransformer
class HyDERetriever:
def __init__(self, vector_db, embedding_model="BAAI/bge-large-en-v1.5"):
self.db = vector_db
self.embedder = SentenceTransformer(embedding_model)
self.llm = OpenAI()
def generate_hypothetical_document(self, query: str) -> str:
"""
Génère un document hypothétique répondant à la question.
"""
prompt = f"""Tu es un expert technique. Réponds à cette question de manière
détaillée et factuelle (2-3 paragraphes) :
Question : {query}
Réponse détaillée :"""
response = self.llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.3, # Faible pour réponses factuelles
max_tokens=300
)
return response.choices[0].message.content.strip()
def search_with_hyde(self, query: str, k: int = 10) -> List[Dict]:
"""
Recherche en 3 étapes : génération, embedding, recherche.
"""
# Étape 1: Générer document hypothétique
hypothetical_doc = self.generate_hypothetical_document(query)
print(f"Document hypothétique : {hypothetical_doc[:150]}...")
# Étape 2: Embedder le document (pas la question!)
doc_embedding = self.embedder.encode(hypothetical_doc, normalize_embeddings=True)
# Étape 3: Recherche vectorielle
results = self.db.search(doc_embedding, k=k)
return results
# Exemple comparatif
query = "Qu'est-ce que HNSW?"
# Méthode standard
query_emb = embedder.encode(query) # Vecteur court/général
standard_results = db.search(query_emb)
# Méthode HyDE
hyde = HyDERetriever(db, embedder)
hyde_results = hyde.search_with_hyde(query)
# Document hypothétique généré :
# "HNSW (Hierarchical Navigable Small World) est un algorithme de graphe
# pour la recherche approximative des plus proches voisins. Il construit
# une structure hiérarchique de graphes connectés permettant des recherches
# en O(log N) avec une précision de 95%+..."
Quand Utiliser HyDE ?
Scénario | HyDE Recommandé ? | Raison |
---|---|---|
Questions factuelles courtes | ✅ Oui | HyDE génère contexte riche |
Requêtes déjà détaillées | ❌ Non | Pas de gain, coût LLM inutile |
Domaine spécialisé (médical, juridique) | ⚠️ Prudence | Risque hallucinations LLM |
Multilingue | ✅ Oui | LLM traduit implicitement |
Temps-réel (<100ms) | ❌ Non | Latence LLM 200-500ms |
Résultats benchmark : Sur MS MARCO, HyDE améliore NDCG@10 de 0.78 → 0.85 (+9%) pour questions courtes, mais ne bat pas le reranking (0.91). Stratégie optimale : HyDE + reranking combinés pour 0.93 NDCG@10.
Query reformulation automatique
La reformulation automatique utilise des modèles spécialisés pour transformer une requête mal formulée en une version optimisée. C'est particulièrement utile pour les requêtes avec fautes de frappe, acronymes non résolus ou formulation vague.
Techniques de Reformulation
1. Correction Orthographique
from spellchecker import SpellChecker
def correct_spelling(query: str) -> str:
spell = SpellChecker(language='fr')
words = query.split()
corrected = [spell.correction(w) or w for w in words]
return ' '.join(corrected)
# Exemple
query = "coment sécuriser activ directory"
corrected = correct_spelling(query)
print(corrected) # "comment sécuriser active directory"
2. Expansion d'Acronymes
ACRONYM_MAP = {
"AD": "Active Directory",
"GPO": "Group Policy Object",
"SIEM": "Security Information Event Management",
"RAG": "Retrieval Augmented Generation",
"LLM": "Large Language Model"
}
def expand_acronyms(query: str) -> str:
words = query.split()
expanded = []
for word in words:
upper = word.upper()
if upper in ACRONYM_MAP:
expanded.append(f"{word} ({ACRONYM_MAP[upper]})")
else:
expanded.append(word)
return ' '.join(expanded)
# Exemple
query = "vulnérabilités AD et GPO"
expanded = expand_acronyms(query)
print(expanded)
# "vulnérabilités AD (Active Directory) et GPO (Group Policy Object)"
3. Reformulation avec LLM
def reformulate_with_llm(query: str) -> str:
"""
Reformule une requête vague en version précise.
"""
prompt = f"""Reformule cette requête de recherche pour la rendre plus précise
et efficace. Garde la même intention mais améliore la clarté.
Requête originale : {query}
Requête reformulée :"""
response = openai.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0.3
)
return response.choices[0].message.content.strip()
# Exemples
reformulate_with_llm("trucs pour AD")
# → "Techniques de sécurisation d'Active Directory"
reformulate_with_llm("erreur quand j'installe")
# → "messages d'erreur lors de l'installation de logiciels"
Pipeline complet : En production, combinez ces techniques en cascade : correction orthographique → expansion acronymes → reformulation LLM (si nécessaire). Cela améliore le recall de 15-25% sur les requêtes mal formulées.
Recherche hybride : dense + sparse
Limitations de la recherche purement sémantique
Bien que puissante, la recherche vectorielle pure présente des faiblesses fondamentales que les approches hybrides résolvent. Comprendre ces limitations est crucial pour concevoir un système robuste.
Problèmes de la Recherche Dense Seule
1. Mauvaise Performance sur les Correspondances Exactes
- Problème : Les embeddings ne capturent pas bien les identifiants exacts (codes, références, numéros)
- Exemple : Recherche "CVE-2024-1234" peut renvoyer d'autres CVE similaires mais pas celui-là
- Raison : Les numéros/codes sont embeddés sémantiquement, pas lexicalement
2. Sensibilité aux Variantes Lexicales
- Problème : Variantes orthographiques ("email" vs "e-mail") ou abréviations
- Exemple : "COVID-19" vs "Covid" vs "coronavirus" → embeddings différents
- Impact : Recall incomplet sur des termes pourtant identiques
3. Difficulté avec les Termes Rares
- Problème : Les mots techniques/rares sont mal représentés dans l'espace d'embedding
- Exemple : "Kerberoasting" (attaque AD spécifique) embeddé proche de "Kerberos" général
- Raison : Peu d'occurrences dans les données d'entraînement du modèle
4. Absence de Scoring Lexical
- Problème : Pas de prise en compte de la fréquence des termes (TF-IDF)
- Exemple : Un document mentionnant 10x "cybersecurité" pas nécessairement mieux classé
- Solution : BM25 pondère les occurrences multiples
Benchmark Comparatif : Dense vs Sparse vs Hybrid
Dataset | Dense seul (NDCG@10) | BM25 seul | Hybrid | Gain hybrid |
---|---|---|---|---|
MS MARCO | 0.78 | 0.72 | 0.82 | +5% |
Natural Questions | 0.81 | 0.68 | 0.84 | +4% |
Technical Docs | 0.74 | 0.79 | 0.86 | +9% |
Code Search | 0.69 | 0.82 | 0.88 | +7% |
Observation clé : Sur les corpus techniques (docs, code), BM25 surpasse souvent le dense seul, car les correspondances lexicales exactes sont critiques. L'hybride combine le meilleur des deux mondes.
Combiner recherche dense (embeddings) et sparse (BM25)
La recherche hybride exécute en parallèle une recherche dense (similarité vectorielle) et sparse (BM25), puis fusionne les résultats. Voici une implémentation complète avec Elasticsearch + Qdrant.
Architecture Hybride avec Elasticsearch et Qdrant
import asyncio
from elasticsearch import Elasticsearch
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Tuple
import numpy as np
class HybridSearchEngine:
def __init__(self, es_url: str = "http://localhost:9200",
qdrant_url: str = "http://localhost:6333"):
# Clients
self.es = Elasticsearch(es_url)
self.qdrant = QdrantClient(url=qdrant_url)
self.embedder = SentenceTransformer("BAAI/bge-large-en-v1.5")
# Noms des collections
self.es_index = "documents"
self.qdrant_collection = "documents_vectors"
def index_documents(self, documents: List[Dict[str, str]]):
"""
Indexe documents dans ES (BM25) et Qdrant (vectors).
Args:
documents: [{"id": "1", "text": "...", "metadata": {...}}, ...]
"""
# 1. Indexation Elasticsearch (BM25)
for doc in documents:
self.es.index(
index=self.es_index,
id=doc["id"],
document={"text": doc["text"], **doc.get("metadata", {})}
)
# 2. Génération embeddings
texts = [doc["text"] for doc in documents]
embeddings = self.embedder.encode(texts, normalize_embeddings=True)
# 3. Indexation Qdrant (vectors)
points = [
PointStruct(
id=int(doc["id"]),
vector=embeddings[i].tolist(),
payload={"text": doc["text"], **doc.get("metadata", {})}
)
for i, doc in enumerate(documents)
]
self.qdrant.upsert(collection_name=self.qdrant_collection, points=points)
print(f"Indexé {len(documents)} documents (ES + Qdrant)")
async def search_bm25(self, query: str, k: int = 100) -> List[Tuple[str, float]]:
"""Recherche BM25 avec Elasticsearch."""
result = self.es.search(
index=self.es_index,
body={
"query": {"match": {"text": query}},
"size": k
}
)
return [(hit["_id"], hit["_score"]) for hit in result["hits"]["hits"]]
async def search_vector(self, query: str, k: int = 100) -> List[Tuple[str, float]]:
"""Recherche vectorielle avec Qdrant."""
query_vector = self.embedder.encode(query, normalize_embeddings=True)
results = self.qdrant.search(
collection_name=self.qdrant_collection,
query_vector=query_vector.tolist(),
limit=k
)
return [(str(hit.id), hit.score) for hit in results]
async def hybrid_search(self, query: str, k: int = 10,
alpha: float = 0.5) -> List[Dict]:
"""
Recherche hybride avec fusion par pondération.
Args:
query: Requête utilisateur
k: Nombre de résultats finaux
alpha: Pondération (0=BM25 pur, 1=vector pur, 0.5=équilibré)
Returns:
Liste de {doc_id, score, text} triée par score hybride
"""
# Exécuter les deux recherches en parallèle
bm25_task = self.search_bm25(query, k=100)
vector_task = self.search_vector(query, k=100)
bm25_results, vector_results = await asyncio.gather(bm25_task, vector_task)
# Normaliser les scores (min-max scaling)
def normalize_scores(results: List[Tuple[str, float]]) -> Dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
min_s, max_s = min(scores), max(scores)
if max_s == min_s:
return {doc_id: 1.0 for doc_id, _ in results}
return {
doc_id: (score - min_s) / (max_s - min_s)
for doc_id, score in results
}
bm25_scores = normalize_scores(bm25_results)
vector_scores = normalize_scores(vector_results)
# Fusion des scores
all_doc_ids = set(bm25_scores.keys()) | set(vector_scores.keys())
hybrid_scores = {}
for doc_id in all_doc_ids:
bm25_s = bm25_scores.get(doc_id, 0.0)
vector_s = vector_scores.get(doc_id, 0.0)
hybrid_scores[doc_id] = (1 - alpha) * bm25_s + alpha * vector_s
# Trier et retourner top-k
sorted_docs = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)[:k]
# Récupérer les textes depuis Qdrant
results = []
for doc_id, score in sorted_docs:
doc = self.qdrant.retrieve(
collection_name=self.qdrant_collection,
ids=[int(doc_id)]
)[0]
results.append({
"doc_id": doc_id,
"score": score,
"text": doc.payload["text"]
})
return results
# Exemple d'utilisation
async def main():
engine = HybridSearchEngine()
# Indexation
docs = [
{"id": "1", "text": "Les embeddings convertissent le texte en vecteurs."},
{"id": "2", "text": "BM25 est un algorithme de ranking probabiliste."},
{"id": "3", "text": "La recherche hybride combine dense et sparse retrieval."}
]
engine.index_documents(docs)
# Recherche hybride
results = await engine.hybrid_search(
"comment combiner recherche sémantique et lexicale ?",
k=3,
alpha=0.5
)
for i, res in enumerate(results, 1):
print(f"{i}. [Score: {res['score']:.3f}] {res['text']}")
if __name__ == "__main__":
asyncio.run(main())
Cette implémentation exécute les deux recherches en parallèle (asyncio) pour minimiser la latence totale : ~70-120ms au lieu de 140-200ms en séquentiel.
Fusion des scores (RRF, weighted fusion)
Il existe plusieurs stratégies pour fusionner les résultats de recherches multiples. Chaque méthode a ses avantages selon le contexte.
1. Reciprocal Rank Fusion (RRF)
Méthode simple et robuste qui pondère par les rangs plutôt que les scores bruts. Idéal quand les échelles de scores sont incomparables.
def reciprocal_rank_fusion(results_list: List[List[Tuple[str, float]]],
k: int = 60) -> List[Tuple[str, float]]:
"""
Fusion RRF de plusieurs listes de résultats.
Args:
results_list: [[('doc1', score), ...], [('doc2', score), ...], ...]
k: Paramètre de pondération (60 est standard)
Returns:
Liste triée par score RRF
"""
rrf_scores = {}
for results in results_list:
for rank, (doc_id, _) in enumerate(results, 1):
if doc_id not in rrf_scores:
rrf_scores[doc_id] = 0.0
# Score RRF : somme des 1/(k+rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Trier par score décroissant
sorted_results = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_results
# Exemple
bm25_results = [('doc1', 10.5), ('doc2', 8.3), ('doc3', 7.1)]
vector_results = [('doc2', 0.95), ('doc3', 0.89), ('doc1', 0.82)]
fused = reciprocal_rank_fusion([bm25_results, vector_results], k=60)
# Résultat : [('doc2', 0.0323), ('doc3', 0.0311), ('doc1', 0.0308)]
# doc2 est 1er car bien classé dans les deux (rank 2 et 1)
2. Weighted Score Fusion
Fusion par pondération des scores normalisés. Permet de privilégier une méthode sur l'autre.
def weighted_score_fusion(results_dict: Dict[str, List[Tuple[str, float]]],
weights: Dict[str, float]) -> List[Tuple[str, float]]:
"""
Fusion pondérée avec normalisation min-max.
Args:
results_dict: {'bm25': [(doc, score), ...], 'vector': [(doc, score), ...]}
weights: {'bm25': 0.3, 'vector': 0.7} # Doit sommer à 1.0
Returns:
Liste triée par score fusionné
"""
def normalize(results: List[Tuple[str, float]]) -> Dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
min_s, max_s = min(scores), max(scores)
if max_s == min_s:
return {doc: 1.0 for doc, _ in results}
return {doc: (s - min_s) / (max_s - min_s) for doc, s in results}
# Normaliser chaque source
normalized = {source: normalize(results)
for source, results in results_dict.items()}
# Fusionner avec pondération
all_docs = set().union(*[set(d.keys()) for d in normalized.values()])
fused_scores = {}
for doc in all_docs:
fused_scores[doc] = sum(
weights[source] * normalized[source].get(doc, 0.0)
for source in results_dict.keys()
)
return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
# Exemple
results = {
'bm25': [('doc1', 12.5), ('doc2', 9.8)],
'vector': [('doc2', 0.92), ('doc3', 0.88)]
}
weights = {'bm25': 0.3, 'vector': 0.7} # Privilégier le vectoriel
fused = weighted_score_fusion(results, weights)
# doc2 sera premier (présent dans les deux avec bon score vectoriel)
3. Distribution-Based Score Fusion (DBSF)
Méthode avancée qui normalise via la distribution statistique des scores (z-score).
import numpy as np
def dbsf_fusion(results_dict: Dict[str, List[Tuple[str, float]]],
weights: Dict[str, float]) -> List[Tuple[str, float]]:
"""
Fusion via z-score normalization.
"""
def zscore_normalize(results: List[Tuple[str, float]]) -> Dict[str, float]:
scores = np.array([s for _, s in results])
mean, std = scores.mean(), scores.std()
if std == 0:
return {doc: 0.0 for doc, _ in results}
return {doc: (s - mean) / std for doc, s in results}
# Normaliser chaque source
normalized = {source: zscore_normalize(results)
for source, results in results_dict.items()}
# Fusionner
all_docs = set().union(*[set(d.keys()) for d in normalized.values()])
fused_scores = {}
for doc in all_docs:
fused_scores[doc] = sum(
weights[source] * normalized[source].get(doc, 0.0)
for source in results_dict.keys()
)
return sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
Comparaison des Méthodes
Méthode | Avantages | Inconvénients | Quand utiliser |
---|---|---|---|
RRF | Simple, robuste, pas de normalisation | Ignore magnitude des scores | Scores incomparables entre sources |
Weighted | Contrôle précis, interprétable | Nécessite normalisation | Sources avec échelles similaires |
DBSF | Statistiquement rigoureux | Plus complexe, nécessite volume | Large corpus (>10K docs) |
Recommandation Production
RRF avec k=60 est le choix par défaut : simple, robuste, pas de paramètre à tuner. Utilisez weighted fusion si vous avez des données de validation pour optimiser les poids (ex: 30% BM25, 70% vector).
Implémentation pratique
Voici une implémentation production-ready avec gestion d'erreurs, métriques et caching.
import time
from functools import lru_cache
from typing import List, Dict, Optional
import hashlib
import logging
logger = logging.getLogger(__name__)
class ProductionHybridSearch:
def __init__(self, es_client, qdrant_client, embedder,
cache_size: int = 1000, cache_ttl: int = 3600):
self.es = es_client
self.qdrant = qdrant_client
self.embedder = embedder
self.cache_ttl = cache_ttl
# Métriques
self.metrics = {
'total_queries': 0,
'cache_hits': 0,
'avg_latency_ms': 0,
'errors': 0
}
def _cache_key(self, query: str, k: int, alpha: float) -> str:
"""Génère clé de cache."""
key_str = f"{query}|{k}|{alpha}"
return hashlib.md5(key_str.encode()).hexdigest()
@lru_cache(maxsize=1000)
def _get_query_embedding(self, query: str) -> tuple:
"""Cache les embeddings de requêtes."""
emb = self.embedder.encode(query, normalize_embeddings=True)
return tuple(emb) # Convertir en tuple pour hashable
def search(self, query: str, k: int = 10, alpha: float = 0.5,
filters: Optional[Dict] = None) -> Dict:
"""
Recherche hybride avec métriques et gestion d'erreurs.
Returns:
{
'results': [...],
'metadata': {
'latency_ms': float,
'num_results': int,
'from_cache': bool
}
}
"""
start_time = time.time()
self.metrics['total_queries'] += 1
try:
# Recherches parallèles avec timeout
bm25_results = self._search_bm25(query, k=100, filters=filters)
vector_emb = self._get_query_embedding(query)
vector_results = self._search_vector(list(vector_emb), k=100, filters=filters)
# Fusion RRF
fused = self._rrf_fusion([bm25_results, vector_results])
# Top-k final
final_results = fused[:k]
# Métriques
latency_ms = (time.time() - start_time) * 1000
self._update_metrics(latency_ms)
return {
'results': final_results,
'metadata': {
'latency_ms': round(latency_ms, 2),
'num_results': len(final_results),
'from_cache': False,
'fusion_method': 'RRF',
'alpha': alpha
}
}
except Exception as e:
self.metrics['errors'] += 1
logger.error(f"Erreur recherche hybride: {e}")
# Fallback : recherche vectorielle seule
return self._fallback_search(query, k)
def _search_bm25(self, query: str, k: int, filters: Optional[Dict]) -> List:
"""Recherche BM25."""
query_body = {"query": {"match": {"text": query}}, "size": k}
if filters:
query_body["query"] = {
"bool": {
"must": query_body["query"],
"filter": [{
"term": {field: value}
} for field, value in filters.items()]
}
}
result = self.es.search(index="documents", body=query_body, timeout="5s")
return [(hit["_id"], hit["_score"]) for hit in result["hits"]["hits"]]
def _search_vector(self, query_vector: List[float], k: int,
filters: Optional[Dict]) -> List:
"""Recherche vectorielle."""
search_params = {
"collection_name": "documents",
"query_vector": query_vector,
"limit": k
}
if filters:
search_params["query_filter"] = {
"must": [
{"key": field, "match": {"value": value}}
for field, value in filters.items()
]
}
results = self.qdrant.search(**search_params)
return [(str(hit.id), hit.score) for hit in results]
def _rrf_fusion(self, results_list: List[List], k: int = 60) -> List:
"""Reciprocal Rank Fusion."""
rrf_scores = {}
for results in results_list:
for rank, (doc_id, _) in enumerate(results, 1):
if doc_id not in rrf_scores:
rrf_scores[doc_id] = 0.0
rrf_scores[doc_id] += 1.0 / (k + rank)
return sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
def _fallback_search(self, query: str, k: int) -> Dict:
"""Recherche de secours en cas d'erreur."""
logger.warning("Utilisation du fallback (vector only)")
vector_emb = self._get_query_embedding(query)
results = self._search_vector(list(vector_emb), k, filters=None)
return {
'results': results[:k],
'metadata': {'fallback': True, 'method': 'vector_only'}
}
def _update_metrics(self, latency_ms: float):
"""Met à jour les métriques."""
n = self.metrics['total_queries']
current_avg = self.metrics['avg_latency_ms']
self.metrics['avg_latency_ms'] = (current_avg * (n-1) + latency_ms) / n
def get_metrics(self) -> Dict:
"""Retourne les métriques."""
return {
**self.metrics,
'cache_hit_rate': self.metrics['cache_hits'] / max(1, self.metrics['total_queries']),
'error_rate': self.metrics['errors'] / max(1, self.metrics['total_queries'])
}
Cas où l'hybride est supérieur
L'approche hybride montre des gains significatifs dans des scénarios spécifiques. Voici quand l'implémenter :
✅ Hybride Fortement Recommandé
- Documentation technique : Codes, références, identifiants exacts critiques
- Corpus multilingue : BM25 capture mots-clés multilingues invariants
- Recherche légale/médicale : Précision lexicale + contexte sémantique
- E-commerce : Noms produits exacts + recherche sémantique attributs
- Code search : Noms fonctions/variables + similarité logique
⚠️ Hybride Optionnel
- Contenu conversationnel : Dense seul souvent suffisant (forums, chats)
- Corpus homogène : Peu de variation lexicale, embeddings performants seuls
- Contrainte latence stricte : Hybride ajoute 50-100ms
❌ Hybride Non Recommandé
- Corpus très petit : <1000 docs, overhead pas justifié
- Recherche image/audio : Pas de représentation lexicale pertinente
- Ressources limitées : Maintenir 2 systèmes coûteux
Benchmark Comparatif : Dense vs Hybride par Domaine
Domaine | Dense NDCG@10 | Hybrid NDCG@10 | Gain | Verdict |
---|---|---|---|---|
Documentation technique | 0.74 | 0.86 | +16% | ⭐ Critique |
Code source | 0.69 | 0.88 | +28% | ⭐ Critique |
E-commerce | 0.81 | 0.89 | +10% | ✅ Recommandé |
Knowledge base entreprise | 0.78 | 0.84 | +8% | ✅ Recommandé |
Articles de blog | 0.83 | 0.86 | +4% | ⚠️ Optionnel |
Conversations/chats | 0.87 | 0.88 | +1% | ❌ Pas nécessaire |
Filtrage et métadonnées intelligentes
Pre-filtering vs post-filtering
Le choix entre pre-filtering (filtrer avant la recherche vectorielle) et post-filtering (filtrer après) a un impact majeur sur les performances et la qualité des résultats.
Pre-Filtering (Filtre en Amont)
- Méthode : La base vectorielle ne cherche que parmi les documents matchant les filtres
- Avantage : Recherche plus rapide (moins de vecteurs à comparer)
- Limite : Peut manquer de résultats si le filtre est trop restrictif
- Implémentation : Utilisé par Qdrant, Pinecone, Weaviate
# Exemple avec Qdrant
from qdrant_client.models import Filter, FieldCondition, MatchValue
# Pre-filtering : seuls les docs de 2024 sont cherchés
results = qdrant_client.search(
collection_name="documents",
query_vector=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="year",
match=MatchValue(value=2024)
),
FieldCondition(
key="category",
match=MatchValue(value="security")
)
]
),
limit=10
)
Post-Filtering (Filtre en Aval)
- Méthode : Recherche sur tout le corpus, puis filtre les résultats
- Avantage : Garantit de toujours obtenir k résultats (si existants)
- Limite : Plus lent (doit chercher plus de candidats)
- Implémentation : Filtrage applicatif post-recherche
# Post-filtering : cherche 100, filtre, garde top-10
raw_results = qdrant_client.search(
collection_name="documents",
query_vector=query_embedding,
limit=100 # Chercher plus large
)
# Filtrer en Python
filtered = [
r for r in raw_results
if r.payload.get("year") == 2024
and r.payload.get("category") == "security"
][:10] # Garder top-10
Stratégie Recommandée
Critère | Pre-Filtering | Post-Filtering |
---|---|---|
Sélectivité faible (>30% docs) | ✅ Optimal | ❌ Inefficace |
Sélectivité élevée (<5% docs) | ⚠️ Risque manque résultats | ✅ Plus sûr |
Latence critique | ✅ Plus rapide | ❌ Plus lent |
Qualité garantie | ❌ Peut manquer top-k | ✅ Toujours k résultats |
Filtres temporels et géographiques
Les filtres temporels et géographiques sont parmi les plus courants en production. Voici comment les implémenter efficacement.
Filtres Temporels
from datetime import datetime, timedelta
from qdrant_client.models import Filter, FieldCondition, Range
# Recherche sur les 30 derniers jours
thirty_days_ago = int((datetime.now() - timedelta(days=30)).timestamp())
results = qdrant_client.search(
collection_name="documents",
query_vector=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="timestamp",
range=Range(
gte=thirty_days_ago # Greater than or equal
)
)
]
),
limit=10
)
# Filtres temporels courants
filters_exemples = {
"Dernière semaine": Range(gte=now - 7*24*3600),
"Ce mois": Range(gte=first_day_of_month, lte=last_day_of_month),
"Année 2024": Range(gte=1704067200, lte=1735689599), # timestamps
"Avant 2020": Range(lte=1577836800)
}
Filtres Géographiques
from qdrant_client.models import Filter, FieldCondition, GeoRadius, GeoPoint
# Recherche dans un rayon de 10km autour de Paris
results = qdrant_client.search(
collection_name="locations",
query_vector=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="location",
geo_radius=GeoRadius(
center=GeoPoint(lat=48.8566, lon=2.3522), # Paris
radius=10000 # 10km en mètres
)
)
]
),
limit=10
)
# Filtre par polygone (zone géographique complexe)
from qdrant_client.models import GeoPolygon
paris_polygon = GeoPolygon(
exterior=GeoLineString(
points=[
GeoPoint(lat=48.9, lon=2.2),
GeoPoint(lat=48.9, lon=2.5),
GeoPoint(lat=48.8, lon=2.5),
GeoPoint(lat=48.8, lon=2.2),
GeoPoint(lat=48.9, lon=2.2) # Fermer le polygone
]
)
)
Filtres catégoriels
Les filtres sur catégories, tags ou métadonnées structurées sont essentiels pour affiner les résultats.
# Filtres multiples avec logique booléenne
from qdrant_client.models import Filter, FieldCondition, MatchAny, MatchValue
results = qdrant_client.search(
collection_name="documents",
query_vector=query_embedding,
query_filter=Filter(
must=[ # ET logique
FieldCondition(key="status", match=MatchValue(value="published")),
FieldCondition(
key="category",
match=MatchAny(any=["security", "compliance"]) # OU logique
)
],
must_not=[ # SAUF
FieldCondition(key="archived", match=MatchValue(value=True))
],
should=[ # BOOST (optionnel)
FieldCondition(key="featured", match=MatchValue(value=True))
]
),
limit=10
)
# Exemple complexe : e-commerce
e_commerce_filter = Filter(
must=[
FieldCondition(key="in_stock", match=MatchValue(value=True)),
FieldCondition(key="price", range=Range(gte=10, lte=100)),
FieldCondition(key="brand", match=MatchAny(any=["Nike", "Adidas"]))
],
must_not=[
FieldCondition(key="condition", match=MatchValue(value="used"))
]
)
Filtres dynamiques basés sur le contexte
Les filtres dynamiques s'adaptent automatiquement au contexte utilisateur (historique, préférences, permissions).
class ContextualSearchEngine:
def __init__(self, qdrant_client, embedder):
self.client = qdrant_client
self.embedder = embedder
def search_with_user_context(self, query: str, user_id: str,
k: int = 10) -> List[Dict]:
"""
Recherche avec filtres dynamiques basés sur le profil utilisateur.
"""
# 1. Récupérer le contexte utilisateur
user_profile = self._get_user_profile(user_id)
# 2. Construire les filtres dynamiquement
filters = self._build_dynamic_filters(user_profile)
# 3. Recherche vectorielle filtrée
query_vector = self.embedder.encode(query)
results = self.client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=filters,
limit=k
)
return results
def _build_dynamic_filters(self, user_profile: Dict) -> Filter:
"""
Construit les filtres selon le profil utilisateur.
"""
must_conditions = []
# Filtre permissions (sécurité)
must_conditions.append(
FieldCondition(
key="access_level",
range=Range(lte=user_profile["clearance_level"])
)
)
# Filtre département (visibilité)
must_conditions.append(
FieldCondition(
key="department",
match=MatchAny(any=user_profile["allowed_departments"])
)
)
# Filtre langue préférée
if user_profile.get("preferred_language"):
must_conditions.append(
FieldCondition(
key="language",
match=MatchValue(value=user_profile["preferred_language"])
)
)
# Filtre fraîcheur (si utilisateur préfère contenu récent)
if user_profile.get("prefer_recent"):
thirty_days_ago = int((datetime.now() - timedelta(days=30)).timestamp())
must_conditions.append(
FieldCondition(
key="updated_at",
range=Range(gte=thirty_days_ago)
)
)
return Filter(must=must_conditions)
def _get_user_profile(self, user_id: str) -> Dict:
"""
Récupère le profil depuis base de données ou cache.
"""
# Simuler récupération
return {
"clearance_level": 3,
"allowed_departments": ["engineering", "security"],
"preferred_language": "fr",
"prefer_recent": True
}
Impact sur les performances
Les filtres ont un impact direct sur la latence et la qualité. Voici des benchmarks réels sur un corpus de 10M documents avec Qdrant :
Type de filtre | Sélectivité | Latence (pre-filter) | Latence (post-filter) | Recommandation |
---|---|---|---|---|
Aucun filtre | 100% | 45ms | 45ms | Baseline |
Catégorie unique | 20% | 38ms | 120ms | ✅ Pre-filter |
Date range (30j) | 8% | 32ms | 180ms | ✅ Pre-filter |
Multi-filtres (3+) | 2% | 65ms | 250ms | ⚠️ Hybride |
Filtre très restrictif | 0.1% | 150ms* | 200ms | ❌ Post-filter |
* Le pre-filter très restrictif doit chercher dans beaucoup plus de candidats pour trouver k résultats
Règle d'Or du Filtrage
- Sélectivité >10% : Pre-filtering optimal
- Sélectivité 5-10% : Pre-filter avec over-fetching (chercher 2-3x plus)
- Sélectivité <5% : Post-filtering ou approche hybride
- Permissions critiques : Toujours pre-filter (sécurité)
Personnalisation des résultats
User embeddings et profils utilisateurs
Les user embeddings capturent les préférences et intérêts d'un utilisateur sous forme vectorielle, permettant une personnalisation sémantique de la recherche.
Construction d'un User Embedding
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
class UserEmbeddingBuilder:
def __init__(self, embedder, decay_days: int = 90):
self.embedder = embedder
self.decay_days = decay_days
def build_user_embedding(self, user_id: str,
user_history: List[Dict]) -> np.ndarray:
"""
Construit un embedding utilisateur depuis son historique.
Args:
user_history: [{"doc_id": ..., "timestamp": ..., "interaction_type": ...}]
Returns:
Vecteur représentant les intérêts de l'utilisateur
"""
embeddings_weighted = []
now = datetime.now()
for interaction in user_history:
# Récupérer l'embedding du document
doc_embedding = self._get_doc_embedding(interaction["doc_id"])
# Calculer le poids selon le type d'interaction
interaction_weight = self._get_interaction_weight(
interaction["interaction_type"]
)
# Appliquer décroissance temporelle
days_ago = (now - interaction["timestamp"]).days
time_decay = np.exp(-days_ago / self.decay_days)
# Poids final
weight = interaction_weight * time_decay
embeddings_weighted.append(doc_embedding * weight)
if not embeddings_weighted:
# Utilisateur nouveau : embedding neutre
return np.zeros(self.embedder.get_sentence_embedding_dimension())
# Moyenne pondérée des embeddings
user_embedding = np.mean(embeddings_weighted, axis=0)
# Normaliser
user_embedding = user_embedding / np.linalg.norm(user_embedding)
return user_embedding
def _get_interaction_weight(self, interaction_type: str) -> float:
"""
Pondération selon le type d'interaction.
"""
weights = {
"viewed": 1.0,
"clicked": 2.0,
"bookmarked": 3.0,
"shared": 4.0,
"downloaded": 5.0
}
return weights.get(interaction_type, 1.0)
def personalized_search(self, query: str, user_embedding: np.ndarray,
alpha: float = 0.7) -> List:
"""
Recherche personnalisée combinant query et profil utilisateur.
Args:
alpha: Pondération (1=query pur, 0=profil pur)
"""
# Embedding de la requête
query_embedding = self.embedder.encode(query, normalize_embeddings=True)
# Fusion query + user profile
personalized_embedding = (
alpha * query_embedding +
(1 - alpha) * user_embedding
)
personalized_embedding /= np.linalg.norm(personalized_embedding)
# Recherche avec l'embedding personnalisé
results = self.vector_db.search(
query_vector=personalized_embedding.tolist(),
limit=10
)
return results
Gains observés : La personnalisation via user embeddings améliore le CTR de 15-30% et la satisfaction utilisateur de 20-35% sur des systèmes de recommandation et recherche.
Historique de recherche et feedback
L'exploitation de l'historique de recherche et du feedback utilisateur permet d'améliorer continuellement la pertinence.
Système de Feedback Implicite et Explicite
from enum import Enum
class FeedbackType(Enum):
IMPLICIT_CLICK = 1 # Utilisateur a cliqué
IMPLICIT_TIME = 2 # Temps passé >30s
EXPLICIT_THUMBS_UP = 3 # Pouce haut
EXPLICIT_THUMBS_DOWN = 4 # Pouce bas
EXPLICIT_REPORT = 5 # Signalé comme non pertinent
class FeedbackTracker:
def __init__(self, db_client):
self.db = db_client
def track_feedback(self, user_id: str, query: str,
doc_id: str, feedback_type: FeedbackType,
rank_position: int):
"""
Enregistre le feedback utilisateur.
"""
feedback_entry = {
"user_id": user_id,
"query": query,
"doc_id": doc_id,
"feedback_type": feedback_type.name,
"rank_position": rank_position,
"timestamp": datetime.now(),
"weight": self._get_feedback_weight(feedback_type, rank_position)
}
self.db.insert("feedback", feedback_entry)
def _get_feedback_weight(self, feedback_type: FeedbackType,
rank_position: int) -> float:
"""
Calcule le poids du feedback.
"""
base_weights = {
FeedbackType.IMPLICIT_CLICK: 1.0,
FeedbackType.IMPLICIT_TIME: 1.5,
FeedbackType.EXPLICIT_THUMBS_UP: 3.0,
FeedbackType.EXPLICIT_THUMBS_DOWN: -2.0,
FeedbackType.EXPLICIT_REPORT: -5.0
}
# Pondérer selon la position (clic en position 1 vaut plus que position 10)
position_factor = 1.0 / np.log2(rank_position + 1)
return base_weights[feedback_type] * position_factor
def get_query_performance(self, query: str, days: int = 30) -> Dict:
"""
Analyse les performances d'une requête.
"""
feedbacks = self.db.query(
"SELECT * FROM feedback WHERE query = ? AND timestamp > ?",
(query, datetime.now() - timedelta(days=days))
)
return {
"total_searches": len(feedbacks),
"click_through_rate": self._compute_ctr(feedbacks),
"avg_rank_clicked": self._compute_avg_rank(feedbacks),
"satisfaction_score": self._compute_satisfaction(feedbacks)
}
def _compute_ctr(self, feedbacks: List[Dict]) -> float:
"""Taux de clic."""
clicks = [f for f in feedbacks if "CLICK" in f["feedback_type"]]
return len(clicks) / max(1, len(feedbacks))
def _compute_avg_rank(self, feedbacks: List[Dict]) -> float:
"""Position moyenne des clics."""
clicks = [f for f in feedbacks if "CLICK" in f["feedback_type"]]
if not clicks:
return 0.0
return np.mean([f["rank_position"] for f in clicks])
def _compute_satisfaction(self, feedbacks: List[Dict]) -> float:
"""Score de satisfaction global."""
if not feedbacks:
return 0.0
total_weight = sum(f["weight"] for f in feedbacks)
return total_weight / len(feedbacks)
Contextual search
La recherche contextuelle adapte les résultats selon le contexte de la session : navigation précédente, heure, appareil, etc.
class ContextualSearchEngine:
def search_with_context(self, query: str, context: Dict) -> List[Dict]:
"""
Recherche avec prise en compte du contexte.
Args:
context: {
"session_history": [...], # Pages vues dans la session
"time_of_day": "morning",
"device": "mobile",
"location": "Paris",
"previous_query": "..."
}
"""
# 1. Enrichir la requête avec le contexte
enriched_query = self._enrich_query(query, context)
# 2. Ajuster les filtres selon le contexte
filters = self._build_contextual_filters(context)
# 3. Recherche standard
results = self.search(enriched_query, filters=filters)
# 4. Réordonnancer selon le contexte
reranked = self._contextual_rerank(results, context)
return reranked
def _enrich_query(self, query: str, context: Dict) -> str:
"""
Enrichit la requête avec le contexte de session.
"""
# Si requête liée à la précédente
if context.get("previous_query") and self._is_followup(query):
return f"{context['previous_query']} {query}"
# Si navigation précédente suggère un thème
if context.get("session_history"):
theme = self._extract_theme(context["session_history"])
if theme:
return f"{query} {theme}"
return query
def _build_contextual_filters(self, context: Dict) -> Filter:
"""
Construit des filtres adaptés au contexte.
"""
filters = []
# Filtres selon l'appareil
if context.get("device") == "mobile":
# Privilégier les contenus courts sur mobile
filters.append(
FieldCondition(key="content_length", range=Range(lte=2000))
)
# Filtres selon l'heure
if context.get("time_of_day") == "morning":
# Le matin : actualités récentes
filters.append(
FieldCondition(
key="published_at",
range=Range(gte=int((datetime.now() - timedelta(days=1)).timestamp()))
)
)
# Filtres géographiques
if context.get("location"):
filters.append(
FieldCondition(key="region", match=MatchValue(value=context["location"]))
)
return Filter(must=filters) if filters else None
def _contextual_rerank(self, results: List, context: Dict) -> List:
"""
Réordonne selon le contexte de session.
"""
if not context.get("session_history"):
return results
# Calculer similarité avec documents vus
session_docs = context["session_history"]
for result in results:
# Boost si lié aux docs déjà vus
similarity_boost = self._compute_session_similarity(
result["doc_id"], session_docs
)
result["score"] *= (1 + 0.2 * similarity_boost) # Boost jusqu'à 20%
return sorted(results, key=lambda x: x["score"], reverse=True)
A/B testing et apprentissage continu
L'A/B testing permet d'optimiser progressivement les paramètres du système de recherche en mesurant l'impact réel sur les utilisateurs.
Framework d'A/B Testing pour Recherche
import hashlib
from typing import Dict, List
import random
class SearchABTestFramework:
def __init__(self):
self.experiments = {}
self.metrics_tracker = MetricsTracker()
def register_experiment(self, experiment_id: str,
variants: Dict[str, Dict]):
"""
Enregistre une expérience A/B.
Args:
experiment_id: Identifiant unique
variants: {
"control": {"reranking": False, "k": 50, "alpha": 0.5},
"variant_a": {"reranking": True, "k": 100, "alpha": 0.5},
"variant_b": {"reranking": True, "k": 50, "alpha": 0.7}
}
"""
self.experiments[experiment_id] = {
"variants": variants,
"traffic_split": self._equal_split(len(variants)),
"start_date": datetime.now(),
"status": "active"
}
def assign_variant(self, user_id: str, experiment_id: str) -> str:
"""
Assigne un utilisateur à une variante (stable hash-based).
"""
if experiment_id not in self.experiments:
return "control"
# Hash utilisateur + expérience pour assignation stable
hash_input = f"{user_id}:{experiment_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
variants = list(self.experiments[experiment_id]["variants"].keys())
variant_index = hash_value % len(variants)
return variants[variant_index]
def search_with_experiment(self, query: str, user_id: str,
experiment_id: str = "reranking_test"):
"""
Exécute une recherche selon la variante assignée.
"""
# Assigner variante
variant = self.assign_variant(user_id, experiment_id)
config = self.experiments[experiment_id]["variants"][variant]
# Exécuter recherche avec config spécifique
start_time = time.time()
results = self._execute_search(query, config)
latency = time.time() - start_time
# Tracker l'impression
self.metrics_tracker.track_impression(
experiment_id=experiment_id,
variant=variant,
user_id=user_id,
query=query,
results=results,
latency=latency
)
return results
def analyze_experiment(self, experiment_id: str,
min_samples: int = 1000) -> Dict:
"""
Analyse statistique des résultats.
"""
metrics = self.metrics_tracker.get_metrics(experiment_id)
# Calculer pour chaque variante
analysis = {}
for variant, data in metrics.items():
if len(data["impressions"]) < min_samples:
continue
analysis[variant] = {
"impressions": len(data["impressions"]),
"ctr": np.mean(data["clicks"]),
"avg_latency": np.mean(data["latencies"]),
"satisfaction": np.mean(data["satisfaction_scores"]),
"revenue_per_search": np.mean(data["conversions"]) * avg_order_value
}
# Test statistique (Student's t-test)
if "control" in analysis and len(analysis) > 1:
for variant in analysis:
if variant != "control":
p_value = self._compute_significance(
analysis["control"],
analysis[variant]
)
analysis[variant]["p_value"] = p_value
analysis[variant]["significant"] = p_value < 0.05
return analysis
def _execute_search(self, query: str, config: Dict) -> List:
"""Exécute recherche avec configuration spécifique."""
# Recherche de base
results = self.base_search(query, k=config["k"], alpha=config["alpha"])
# Reranking si activé
if config.get("reranking", False):
results = self.reranker.rerank(query, results)
return results
# Exemple d'utilisation
ab_framework = SearchABTestFramework()
# Expérience : tester l'impact du reranking
ab_framework.register_experiment(
"reranking_impact_2025",
variants={
"control": {"reranking": False, "k": 50},
"reranking_enabled": {"reranking": True, "k": 100}
}
)
# Après 2 semaines et 10K recherches
results = ab_framework.analyze_experiment("reranking_impact_2025")
# Résultats typiques :
# control: CTR 12%, latency 85ms, satisfaction 3.2/5
# reranking: CTR 15.5% (+29%), latency 320ms, satisfaction 3.8/5, p=0.002 (significatif!)
Métriques Clés à Tracker en A/B Testing
- CTR : Taux de clic sur les résultats
- Time to click : Temps avant le premier clic
- Satisfaction : Feedback utilisateur (thumbs up/down)
- Session success rate : % sessions avec conversion
- Latence p95 : 95e percentile de latence
- Zero-result rate : % requêtes sans résultats
Métriques et évaluation de qualité
Métriques classiques (Precision, Recall, F1)
Les métriques de base de l'information retrieval restent fondamentales pour évaluer la qualité d'un système de recherche.
Définitions et Formules
Precision (Précision)
Proportion de documents pertinents parmi ceux retournés :
Precision@k = (Nombre de docs pertinents dans top-k) / k
Exemple : Si sur 10 résultats, 7 sont pertinents → Precision@10 = 0.70
Recall (Rappel)
Proportion de documents pertinents retrouvés parmi tous ceux existants :
Recall@k = (Docs pertinents dans top-k) / (Total docs pertinents)
Exemple : Si 7 docs sur 20 pertinents totaux → Recall@10 = 0.35
F1-Score
Moyenne harmonique entre Precision et Recall :
F1 = 2 × (Precision × Recall) / (Precision + Recall)
Exemple : Precision=0.70, Recall=0.35 → F1 = 0.467
Implémentation Python
import numpy as np
from typing import List, Set
def precision_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
"""
Calcule Precision@k.
Args:
retrieved: Liste des doc_ids retournés (ordonnés par score)
relevant: Ensemble des doc_ids pertinents (ground truth)
k: Nombre de résultats à considérer
Returns:
Precision@k entre 0 et 1
"""
retrieved_at_k = set(retrieved[:k])
num_relevant_retrieved = len(retrieved_at_k & relevant)
return num_relevant_retrieved / k if k > 0 else 0.0
def recall_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
"""
Calcule Recall@k.
"""
retrieved_at_k = set(retrieved[:k])
num_relevant_retrieved = len(retrieved_at_k & relevant)
total_relevant = len(relevant)
return num_relevant_retrieved / total_relevant if total_relevant > 0 else 0.0
def f1_at_k(retrieved: List[str], relevant: Set[str], k: int) -> float:
"""
Calcule F1@k.
"""
prec = precision_at_k(retrieved, relevant, k)
rec = recall_at_k(retrieved, relevant, k)
if prec + rec == 0:
return 0.0
return 2 * (prec * rec) / (prec + rec)
# Exemple
retrieved_docs = ["doc1", "doc2", "doc3", "doc4", "doc5", "doc6", "doc7", "doc8", "doc9", "doc10"]
relevant_docs = {"doc1", "doc3", "doc5", "doc8", "doc12", "doc15", "doc20"} # 7 pertinents totaux
print(f"Precision@10: {precision_at_k(retrieved_docs, relevant_docs, 10):.3f}") # 0.400
print(f"Recall@10: {recall_at_k(retrieved_docs, relevant_docs, 10):.3f}") # 0.571
print(f"F1@10: {f1_at_k(retrieved_docs, relevant_docs, 10):.3f}") # 0.471
MAP (Mean Average Precision)
MAP est une métrique plus sophistiquée qui prend en compte l'ordre des résultats. Elle calcule la moyenne des précisions à chaque position où un document pertinent apparaît.
Formule et Calcul
AP = (1/|Relevant|) × Σ (Precision@k × rel(k))
où rel(k) = 1 si le doc en position k est pertinent, 0 sinon
MAP = Moyenne des AP sur toutes les requêtes
def average_precision(retrieved: List[str], relevant: Set[str]) -> float:
"""
Calcule Average Precision pour une requête.
"""
if not relevant:
return 0.0
precision_sum = 0.0
num_relevant_seen = 0
for k, doc_id in enumerate(retrieved, 1):
if doc_id in relevant:
num_relevant_seen += 1
# Precision à cette position
precision_at_k = num_relevant_seen / k
precision_sum += precision_at_k
return precision_sum / len(relevant)
def mean_average_precision(all_queries: List[Dict]) -> float:
"""
Calcule MAP sur plusieurs requêtes.
Args:
all_queries: [{"retrieved": [...], "relevant": {...}}, ...]
"""
aps = []
for query_results in all_queries:
ap = average_precision(
query_results["retrieved"],
query_results["relevant"]
)
aps.append(ap)
return np.mean(aps) if aps else 0.0
# Exemple
retrieved = ["doc2", "doc1", "doc4", "doc3", "doc5"] # doc1, doc3, doc5 pertinents
relevant = {"doc1", "doc3", "doc5"}
# Calcul manuel :
# Position 2 (doc1 pertinent) : P@2 = 1/2 = 0.50
# Position 4 (doc3 pertinent) : P@4 = 2/4 = 0.50
# Position 5 (doc5 pertinent) : P@5 = 3/5 = 0.60
# AP = (0.50 + 0.50 + 0.60) / 3 = 0.533
print(f"AP: {average_precision(retrieved, relevant):.3f}") # 0.533
NDCG (Normalized Discounted Cumulative Gain)
NDCG est la métrique de référence pour évaluer les systèmes de ranking. Elle tient compte des niveaux de pertinence (pas seulement binaire) et applique un rabais logarithmique aux positions basses.
Formule
DCG@k = Σ (rel_i / log2(i + 1))
pour i de 1 à k
NDCG@k = DCG@k / IDCG@k
où IDCG = DCG du classement idéal
import numpy as np
def dcg_at_k(relevances: List[float], k: int) -> float:
"""
Calcule DCG@k.
Args:
relevances: Scores de pertinence (ex: [3, 2, 3, 0, 1, 2]) pour chaque position
k: Nombre de positions à considérer
Returns:
DCG score
"""
relevances_at_k = relevances[:k]
gains = [(2**rel - 1) / np.log2(i + 2) for i, rel in enumerate(relevances_at_k)]
return sum(gains)
def ndcg_at_k(retrieved: List[str], relevance_scores: Dict[str, float], k: int) -> float:
"""
Calcule NDCG@k.
Args:
retrieved: Liste des doc_ids retournés (ordonnés)
relevance_scores: {doc_id: score_pertinence} (0=non pertinent, 3=très pertinent)
k: Position
Returns:
NDCG@k entre 0 et 1
"""
# DCG réel
actual_relevances = [relevance_scores.get(doc_id, 0.0) for doc_id in retrieved]
dcg = dcg_at_k(actual_relevances, k)
# IDCG (classement idéal)
ideal_relevances = sorted(relevance_scores.values(), reverse=True)
idcg = dcg_at_k(ideal_relevances, k)
return dcg / idcg if idcg > 0 else 0.0
# Exemple
retrieved_docs = ["doc1", "doc2", "doc3", "doc4", "doc5"]
relevance = {
"doc1": 3, # Très pertinent
"doc2": 1, # Peu pertinent
"doc3": 2, # Pertinent
"doc4": 0, # Non pertinent
"doc5": 3, # Très pertinent
"doc6": 3, # Non retourné mais très pertinent
}
print(f"NDCG@5: {ndcg_at_k(retrieved_docs, relevance, 5):.3f}") # ~0.850
# Un classement idéal serait : [doc1, doc5, doc6, doc3, doc2] (tous les 3 puis 2 puis 1)
MRR (Mean Reciprocal Rank)
MRR mesure à quelle position apparaît le premier document pertinent. Utile pour évaluer les systèmes de Q&A où une seule bonne réponse suffit.
RR = 1 / rank(premier_doc_pertinent)
MRR = Moyenne des RR sur toutes les requêtes
Exemple : Si le premier résultat pertinent est en position 3 → RR = 1/3 = 0.333
def reciprocal_rank(retrieved: List[str], relevant: Set[str]) -> float:
"""
Calcule Reciprocal Rank pour une requête.
"""
for rank, doc_id in enumerate(retrieved, 1):
if doc_id in relevant:
return 1.0 / rank
return 0.0 # Aucun document pertinent trouvé
def mean_reciprocal_rank(all_queries: List[Dict]) -> float:
"""
Calcule MRR sur plusieurs requêtes.
"""
rrs = []
for query_results in all_queries:
rr = reciprocal_rank(
query_results["retrieved"],
query_results["relevant"]
)
rrs.append(rr)
return np.mean(rrs) if rrs else 0.0
# Exemples
queries = [
{"retrieved": ["d1", "d2", "d3"], "relevant": {"d2"}}, # RR = 1/2 = 0.5
{"retrieved": ["d4", "d5", "d6"], "relevant": {"d4"}}, # RR = 1/1 = 1.0
{"retrieved": ["d7", "d8", "d9"], "relevant": {"d9"}}, # RR = 1/3 = 0.333
]
print(f"MRR: {mean_reciprocal_rank(queries):.3f}") # (0.5 + 1.0 + 0.333) / 3 = 0.611
Construire un dataset d'évaluation
Un dataset d'évaluation de qualité est crucial pour mesurer objectivement les améliorations. Voici comment en construire un.
Méthode 1 : Annotation Manuelle
import pandas as pd
from typing import List, Dict
class EvaluationDatasetBuilder:
def __init__(self):
self.annotations = []
def sample_queries(self, query_logs: List[str], n: int = 100) -> List[str]:
"""
Sélectionne n requêtes représentatives depuis les logs.
"""
# Stratégies de sampling
freq_dist = Counter(query_logs)
# Mix de requêtes fréquentes et rares
top_50 = [q for q, _ in freq_dist.most_common(50)] # 50% populaires
rare_50 = random.sample([q for q in freq_dist if freq_dist[q] <= 5], 50) # 50% rares
return top_50 + rare_50
def annotate_results(self, query: str, retrieved_docs: List[Dict]):
"""
Interface d'annotation pour un annotateur humain.
"""
print(f"\n=== Requête : {query} ===")
annotations = {}
for i, doc in enumerate(retrieved_docs[:20], 1): # Top-20 seulement
print(f"\n[{i}] {doc['title']}")
print(f" {doc['snippet'][:200]}...")
score = input(" Pertinence (0=non, 1=peu, 2=moyen, 3=très) : ")
annotations[doc['id']] = int(score)
self.annotations.append({
"query": query,
"relevance_scores": annotations,
"annotator": "annotator_1",
"timestamp": datetime.now()
})
def export_dataset(self, filepath: str):
"""Exporte le dataset au format JSON."""
with open(filepath, 'w') as f:
json.dump(self.annotations, f, indent=2, default=str)
# Processus complet
builder = EvaluationDatasetBuilder()
# 1. Sélectionner 100 requêtes représentatives
query_logs = load_query_logs() # Depuis base de données
sampled_queries = builder.sample_queries(query_logs, n=100)
# 2. Annoter (peut prendre 20-40h pour 100 requêtes x 20 docs)
for query in sampled_queries:
retrieved = search_engine.search(query, k=20)
builder.annotate_results(query, retrieved)
# 3. Exporter
builder.export_dataset("eval_dataset_v1.json")
Méthode 2 : Annotation Semi-Automatique avec LLM
def llm_assisted_annotation(query: str, doc: Dict) -> int:
"""
Utilise un LLM pour pré-annoter (puis validation humaine).
"""
prompt = f"""Tu es un expert en évaluation de pertinence.
Requête : {query}
Document : {doc['title']}
{doc['content'][:500]}
Évalue la pertinence sur cette échelle :
0 - Non pertinent (hors sujet)
1 - Peu pertinent (légèrement lié)
2 - Pertinent (répond partiellement)
3 - Très pertinent (répond complètement)
Réponds uniquement avec le chiffre."""
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.0
)
try:
score = int(response.choices[0].message.content.strip())
return max(0, min(3, score)) # Clamp entre 0-3
except:
return 1 # Score par défaut en cas d'erreur
# Coût typique : $0.01-0.03 par annotation avec GPT-4o
# Pour 100 queries x 20 docs = 2000 annotations ≈ $20-60
Monitoring en production
Le monitoring continu en production permet de détecter les régressions et d'identifier les opportunités d'amélioration.
Dashboard de Métriques Temps-Réel
from prometheus_client import Counter, Histogram, Gauge
import time
# Métriques Prometheus
search_requests_total = Counter(
'search_requests_total',
'Nombre total de requêtes de recherche',
['method', 'status']
)
search_latency = Histogram(
'search_latency_seconds',
'Latence des recherches',
['method'],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
search_quality_score = Gauge(
'search_quality_score',
'Score de qualité moyen (CTR)',
['time_window']
)
zero_results_rate = Gauge(
'zero_results_rate',
'Taux de requêtes sans résultats'
)
class MonitoredSearchEngine:
def __init__(self, base_engine):
self.engine = base_engine
self.recent_ctr = [] # Sliding window CTR
def search(self, query: str, user_id: str) -> Dict:
"""
Recherche avec monitoring intégré.
"""
start_time = time.time()
try:
# Exécuter recherche
results = self.engine.search(query)
# Latence
latency = time.time() - start_time
search_latency.labels(method='hybrid').observe(latency)
# Compteurs
status = 'success' if results else 'zero_results'
search_requests_total.labels(method='hybrid', status=status).inc()
# Taux zéro résultat
if not results:
self._update_zero_results_rate()
# Logger pour analyse
self._log_search(query, user_id, results, latency)
return results
except Exception as e:
search_requests_total.labels(method='hybrid', status='error').inc()
raise
def track_click(self, query: str, doc_id: str, rank: int):
"""
Tracker les clics pour calculer le CTR.
"""
# Mise à jour CTR glissant (dernières 1000 recherches)
self.recent_ctr.append(1) # Clic = 1
if len(self.recent_ctr) > 1000:
self.recent_ctr.pop(0)
ctr = np.mean(self.recent_ctr)
search_quality_score.labels(time_window='1h').set(ctr)
def _update_zero_results_rate(self):
"""Calcule le taux de zéro résultats sur la dernière heure."""
# Récupérer depuis base métriques
recent_searches = get_recent_searches(hours=1)
zero_count = sum(1 for s in recent_searches if s['num_results'] == 0)
rate = zero_count / len(recent_searches) if recent_searches else 0
zero_results_rate.set(rate)
# Configuration Grafana Dashboard
# Panel 1 : Latence p50/p95/p99
# Panel 2 : QPS (queries per second)
# Panel 3 : Taux d'erreur
# Panel 4 : CTR temps-réel
# Panel 5 : Zero-results rate
# Panel 6 : Distribution des latences (heatmap)
Alertes Recommandées
- Latence p95 > 1s : Alerte investigation
- Taux erreur > 1% : Alerte critique
- Zero-results > 15% : Dégradation qualité
- CTR baisse >20% : Régression possible
- QPS spike >3x normale : Risque surcharge
Optimisations pour la production
Caching intelligent
Le caching est l'optimisation la plus efficace pour réduire la latence et les coûts. Une stratégie de cache bien conçue peut réduire la charge de 60-80%.
Stratégie Multi-Niveaux
import redis
import hashlib
import json
from functools import lru_cache
from typing import Optional, List, Dict
class MultiLevelCacheSystem:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# L1 : Cache in-memory (LRU)
# L2 : Cache Redis (distribué)
@lru_cache(maxsize=1000) # L1 Cache (in-process)
def get_query_embedding(self, query: str) -> tuple:
"""
Cache L1 : Embeddings de requêtes en mémoire.
"""
embedding = self.embedder.encode(query)
return tuple(embedding) # Tuple pour hashable
def search_with_cache(self, query: str, k: int = 10) -> Optional[List[Dict]]:
"""
Recherche avec cache L1 (in-memory) et L2 (Redis).
"""
# Générer clé de cache
cache_key = self._generate_cache_key(query, k)
# L1 : Vérifier cache in-memory (nanoseconds)
if hasattr(self, '_l1_cache') and cache_key in self._l1_cache:
return self._l1_cache[cache_key]
# L2 : Vérifier Redis (1-5ms)
cached = self.redis.get(cache_key)
if cached:
results = json.loads(cached)
# Stocker en L1 pour accès futurs
if not hasattr(self, '_l1_cache'):
self._l1_cache = {}
self._l1_cache[cache_key] = results
return results
# L3 : Exécuter recherche réelle (50-500ms)
results = self._execute_search(query, k)
# Stocker dans les caches
self._store_in_caches(cache_key, results, ttl=3600) # 1h TTL
return results
def _generate_cache_key(self, query: str, k: int) -> str:
"""
Génère clé de cache stable.
"""
# Normaliser la requête
normalized = query.lower().strip()
key_str = f"search:{normalized}:k={k}"
return hashlib.sha256(key_str.encode()).hexdigest()
def _store_in_caches(self, key: str, results: List[Dict], ttl: int):
"""
Stocke dans L1 et L2.
"""
# L1 in-memory
if not hasattr(self, '_l1_cache'):
self._l1_cache = {}
self._l1_cache[key] = results
# L2 Redis
self.redis.setex(
key,
ttl,
json.dumps(results, default=str)
)
def warm_cache(self, popular_queries: List[str]):
"""
Pré-chauffe le cache avec les requêtes populaires.
À exécuter pendant les heures creuses.
"""
for query in popular_queries:
self.search_with_cache(query)
print(f"Cache préchauffé avec {len(popular_queries)} requêtes")
# Utilisation
cache_system = MultiLevelCacheSystem(redis_client)
# Préchauffage nocturne
top_1000_queries = get_popular_queries(days=7, limit=1000)
cache_system.warm_cache(top_1000_queries)
# Statistiques typiques :
# - L1 hit rate : 10-15% (requêtes répétées dans même processus)
# - L2 hit rate : 40-60% (requêtes populaires)
# - Total cache hit rate : 50-75%
# - Latence L1 hit : <1ms
# - Latence L2 hit : 2-5ms
# - Latence cache miss : 80-500ms
Cache Invalidation Intelligente
class SmartCacheInvalidation:
def on_document_update(self, doc_id: str):
"""
Invalide intelligemment le cache après mise à jour de document.
"""
# Méthode 1 : Invalidation totale (simple mais brutal)
# self.redis.flushdb() # ❌ Trop agressif
# Méthode 2 : Invalidation par pattern (mieux)
# Trouver toutes les requêtes ayant retourné ce document
affected_queries = self._find_queries_returning_doc(doc_id)
for query in affected_queries:
cache_key = self._generate_cache_key(query, k=10)
self.redis.delete(cache_key)
# Méthode 3 : Lazy invalidation (optimal)
# Ajouter version/timestamp au cache
doc_version = self._get_doc_version(doc_id)
self.redis.hset(f"doc_versions", doc_id, doc_version)
def get_with_version_check(self, cache_key: str, doc_ids: List[str]) -> Optional[Dict]:
"""
Vérifie que les versions des documents en cache sont à jour.
"""
cached = self.redis.get(cache_key)
if not cached:
return None
cached_data = json.loads(cached)
cached_versions = cached_data.get('doc_versions', {})
# Vérifier si les versions ont changé
for doc_id in doc_ids:
current_version = self.redis.hget("doc_versions", doc_id)
if current_version != cached_versions.get(doc_id):
# Version obsolète, invalider
self.redis.delete(cache_key)
return None
return cached_data['results']
Approximate search pour la scalabilité
La recherche approximative (ANN - Approximate Nearest Neighbors) sacrifie légèrement la précision pour gagner 10-100x en vitesse et scalabilité.
Configuration HNSW pour Production
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, HnswConfigDiff
# Création de collection avec HNSW optimisé
client = QdrantClient(url="http://localhost:6333")
client.create_collection(
collection_name="documents_prod",
vectors_config=VectorParams(
size=1024, # Dimension des embeddings
distance=Distance.COSINE
),
hnsw_config=HnswConfigDiff(
m=16, # Nombre de connexions par noeud (défaut: 16)
ef_construct=100, # Qualité construction (défaut: 100)
full_scan_threshold=10000 # Seuil full-scan (défaut: 10k)
)
)
# Paramétrage recherche
results = client.search(
collection_name="documents_prod",
query_vector=query_embedding,
limit=10,
search_params={
"hnsw_ef": 128, # Qualité recherche (défaut: 128)
"exact": False # False = ANN rapide, True = exact lent
}
)
# Trade-offs HNSW
"""
Paramètre | Impact Latence | Impact Précision | Impact Mémoire
-----------------------------------------------------------------
m | Faible | Moyen | Élevé
ef_construct| Aucun (index) | Élevé | Aucun
hnsw_ef | Élevé | Élevé | Faible
Recommandations production :
- Corpus <1M : m=16, ef_construct=100, hnsw_ef=64
- Corpus 1-10M : m=32, ef_construct=200, hnsw_ef=128 (défaut optimal)
- Corpus >10M : m=48, ef_construct=400, hnsw_ef=256
Précision attendue : 95-99% vs exact search
Gain vitesse : 10-50x selon corpus
"""
Benchmarks ANN vs Exact
Corpus | Exact (ms) | HNSW (ms) | Speedup | Recall@10 |
---|---|---|---|---|
100K docs | 120 | 8 | 15x | 99.2% |
1M docs | 850 | 25 | 34x | 98.5% |
10M docs | 6,500 | 45 | 144x | 97.8% |
100M docs | 65,000 | 80 | 812x | 96.5% |
Load balancing et réplication
Pour supporter 1000+ QPS, une architecture distribuée avec load balancing est nécessaire.
Architecture Multi-Noeud avec Qdrant
import random
from typing import List
class LoadBalancedSearchCluster:
def __init__(self, qdrant_nodes: List[str]):
"""
Args:
qdrant_nodes: ["http://node1:6333", "http://node2:6333", "http://node3:6333"]
"""
self.nodes = [QdrantClient(url=node) for node in qdrant_nodes]
self.node_health = {i: True for i in range(len(self.nodes))}
def search(self, query_vector: List[float], k: int = 10) -> List[Dict]:
"""
Recherche avec load balancing et failover.
"""
# Sélectionner un noeud sain aléatoirement (round-robin possible aussi)
healthy_nodes = [i for i, healthy in self.node_health.items() if healthy]
if not healthy_nodes:
raise Exception("Aucun noeud sain disponible")
node_idx = random.choice(healthy_nodes)
try:
results = self.nodes[node_idx].search(
collection_name="documents",
query_vector=query_vector,
limit=k,
timeout=5 # Timeout 5s
)
return results
except Exception as e:
# Marquer le noeud comme non sain
self.node_health[node_idx] = False
print(f"Noeud {node_idx} en erreur, failover...")
# Retry sur un autre noeud
remaining_nodes = [i for i in healthy_nodes if i != node_idx]
if remaining_nodes:
return self.search(query_vector, k) # Recursion
else:
raise
def health_check(self):
"""
Vérification périodique de la santé des noeuds.
À exécuter toutes les 10 secondes.
"""
for i, client in enumerate(self.nodes):
try:
# Ping simple
client.get_collections()
self.node_health[i] = True
except:
self.node_health[i] = False
# Configuration Kubernetes pour auto-scaling
"""
apiVersion: apps/v1
kind: Deployment
metadata:
name: qdrant-cluster
spec:
replicas: 3 # 3 noeuds par défaut
template:
spec:
containers:
- name: qdrant
image: qdrant/qdrant:latest
resources:
requests:
memory: "8Gi"
cpu: "2"
limits:
memory: "16Gi"
cpu: "4"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: qdrant-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: qdrant-cluster
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
"""
Monitoring et alerting
Un monitoring proactif permet de détecter les problèmes avant qu'ils impactent les utilisateurs.
Voir section précédente sur "Monitoring en production" pour l'implémentation détaillée.
Checklist Monitoring Production
- ✅ Latence : p50, p95, p99 (alertes si p95 >1s)
- ✅ Throughput : QPS actuel vs capacité max
- ✅ Taux d'erreur : Alerte si >1%
- ✅ Cache hit rate : Objectif >60%
- ✅ Zero-results rate : Objectif <10%
- ✅ CTR : Suivre quotidiennement (régression si -15%)
- ✅ Santé des noeuds : Disponibilité cluster >99.9%
- ✅ Utilisation mémoire : Alerte si >80%
Gestion des pics de charge
Les pics de charge (lancements produits, actualités virales) peuvent multiplier le trafic par 5-50x. Voici comment les absorber.
Stratégies Anti-Spike
1. Rate Limiting Adaptatif
from redis import Redis
import time
class AdaptiveRateLimiter:
def __init__(self, redis_client: Redis):
self.redis = redis_client
def check_rate_limit(self, user_id: str, endpoint: str) -> bool:
"""
Rate limiting avec buckets adaptatifs.
"""
# Charge actuelle du système
system_load = self._get_system_load() # 0-100
# Ajuster les limites selon la charge
if system_load < 50:
max_requests = 100 # Normal
elif system_load < 80:
max_requests = 50 # Surchargé
else:
max_requests = 10 # Critique
# Token bucket algorithm
key = f"rate_limit:{user_id}:{endpoint}"
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, 60) # Fenêtre 1 minute
return current <= max_requests
def _get_system_load(self) -> int:
"""Récupère la charge depuis métriques."""
# Exemple : moyenne CPU/mémoire des noeuds
return 42 # Simuler
2. Shedding de Charge (Load Shedding)
def search_with_degradation(query: str, user_tier: str) -> Dict:
"""
Dégrade gracieusement le service selon la charge.
"""
load = get_current_load_percentage()
if load < 70:
# Service complet : retrieval + reranking
return full_search(query)
elif load < 85:
# Dégradation légère : skip reranking pour free users
if user_tier == "premium":
return full_search(query)
else:
return basic_search(query) # Sans reranking
else:
# Dégradation forte : cache only
cached = get_from_cache(query)
if cached:
return cached
else:
# Renvoyer résultats pré-calculés génériques
return fallback_results(query)
3. Auto-Scaling Prédictif
- Monitorer les patterns de trafic (heures de pointe)
- Pré-scaler 15 minutes avant les pics prévisibles
- Garder 30% de capacité tampon pour absorber les pics imprévus
- Scale down progressif (pas brutal) pour éviter oscillations
Règles d'Or Production
- Caching : 60-80% hit rate réduit charge de 3-5x
- ANN search : HNSW avec 97%+ recall acceptable
- Replication : Minimum 3 noeuds pour HA
- Rate limiting : Adaptatif selon charge système
- Degradation : Préférer service dégradé que down complet
- Monitoring : Alertes proactives (p95 latence, error rate)
Améliorez la pertinence de votre moteur de recherche
Nous pouvons auditer votre système de recherche actuel et implémenter des techniques avancées de reranking, query expansion et recherche hybride pour booster vos métriques.
Demander un audit rechercheQuestions fréquentes
Le reranking est-il toujours nécessaire ?
Non, le reranking n'est pas toujours nécessaire, mais il apporte des gains significatifs dans la plupart des cas. Voici quand l'utiliser :
- ✅ Recommandé : Documentation technique, recherche légale/médicale, e-commerce, knowledge bases d'entreprise
- ⚠️ Optionnel : Contenu conversationnel simple, corpus très homogène, contraintes latence strictes (<100ms)
- ❌ Inutile : Corpus <1000 documents (overhead injustifié), recherche d'images/audio sans texte
Gains typiques : +10 à +20% sur NDCG@10, +15 à +30% sur le CTR. Coût : +150-350ms de latence supplémentaire. Le ROI est positif dès que la qualité de recherche impacte directement le business (conversions, productivité).
Quel impact sur la latence ?
Voici les latences typiques par composant sur un corpus de 10M documents :
- Query embedding : 20-50ms (modèle SentenceTransformer sur CPU)
- Vector search (HNSW) : 30-100ms (selon configuration ef)
- Metadata filtering : +5-20ms (selon sélectivité)
- Reranking (100 docs) : 200-500ms (cross-encoder sur GPU)
- Hybrid search (BM25+vector) : +50-100ms (exécution parallèle)
Latence totale end-to-end :
- Recherche simple : 50-150ms
- Recherche hybride : 100-250ms
- Recherche hybride + reranking : 250-650ms
Optimisations possibles : Caching (hit rate 60-80% → latence <5ms), GPU inference (reranking 3-5x plus rapide), reranking asynchrone (résultats progressifs).
Comment gérer les requêtes multilingues ?
Trois approches principales existent pour le multilingual :
1. Modèles Multilingual Natifs (Recommandé)
- BGE-M3 : 100+ langues, excellent NDCG (0.84 multilingual MTEB)
- multilingual-e5-large : 94 langues, très performant
- Cohere embed-multilingual-v3 : 100+ langues (API)
Avantage : Requête en français trouve docs en anglais/espagnol/etc. automatiquement.
2. Détection de Langue + Modèle Spécifique
from langdetect import detect
lang = detect(query) # 'fr', 'en', 'es', etc.
if lang == 'fr':
embedder = SentenceTransformer("camembert-base")
elif lang == 'en':
embedder = SentenceTransformer("all-mpnet-base-v2")
# ...
Avantage : Meilleure précision par langue. Inconvénient : Pas de cross-lingual search.
3. Traduction Automatique
Traduire toutes les requêtes en anglais avant recherche. Inconvénient : Perte de nuances, latence +200ms, coût traduction.
Recommandation : Utilisez BGE-M3 ou multilingual-e5-large pour 90% des cas. Réserve les modèles spécifiques aux langues avec très gros volume.
Peut-on combiner toutes ces techniques ?
Oui, et c'est même recommandé en production. Voici une stack complète optimale :
Pipeline Production Recommandé
- Query Processing :
- Correction orthographique (si nécessaire)
- Expansion d'acronymes (domaine spécifique)
- Query expansion avec LLM (pour requêtes courtes/vagues)
- Retrieval Hybride :
- Recherche dense (embeddings) : top-100
- Recherche sparse (BM25) : top-100
- Fusion RRF : top-100 unifiés
- Filtering :
- Pre-filtering (permissions, date range si sélectivité >10%)
- Post-filtering (filtres très restrictifs)
- Reranking :
- Cross-encoder sur top-100 → top-10 final
- Contextual boosting (user profile, session history)
- Post-Processing :
- Deduplication (si nécessaire)
- Diversity reranking (MMR - Maximal Marginal Relevance)
Coût latence total : 300-700ms end-to-end. Gains qualité : NDCG@10 de 0.75 (baseline dense) → 0.93 (pipeline complet), soit +24%.
Optimisation : Exécuter retrieval dense et sparse en parallèle (asyncio) pour gagner 50-100ms. Utiliser caching agressif (hit rate 70%+) pour latence moyenne <150ms.
Comment mesurer l'amélioration concrète ?
Voici la méthodologie complète pour mesurer objectivement les améliorations :
1. Métriques Offline (Dataset d'Évaluation)
- NDCG@10 : Métrique de référence (objectif : >0.85)
- MRR : Pour Q&A (objectif : >0.70)
- Recall@100 : Pour retrieval (objectif : >0.90)
Processus : Constituer un dataset de 100-500 requêtes annotées manuellement (ou via LLM + validation). Réexécuter à chaque changement pour détecter régressions.
2. Métriques Online (Production)
- CTR (Click-Through Rate) : % utilisateurs cliquant sur un résultat
- Time to click : Temps avant premier clic (plus bas = mieux)
- Zero-results rate : % requêtes sans résultats (objectif : <10%)
- Session success rate : % sessions avec conversion/objectif atteint
- User satisfaction : Feedback explicite (thumbs up/down)
3. A/B Testing
Protocole :
- Définir variantes (control vs traitement)
- Assigner utilisateurs de manière stable (hash-based)
- Collecter données pendant 2-4 semaines (minimum 10K recherches)
- Analyse statistique (t-test, p-value <0.05 pour significativité)
- Décision : rollout si gains >5% avec p<0.05
4. Exemple de Dashboard de Suivi
Métrique | Baseline (sem 1) | Après reranking (sem 2) | Δ | p-value |
---|---|---|---|---|
NDCG@10 (offline) | 0.78 | 0.91 | +16.7% | N/A |
CTR | 12.3% | 15.8% | +28.5% | 0.002 |
Time to click | 8.5s | 6.2s | -27.1% | 0.018 |
Zero-results rate | 14.2% | 12.8% | -9.9% | 0.125 |
Latence p95 | 120ms | 420ms | +250% | N/A |
Conclusion : Gains significatifs sur CTR et time-to-click (p<0.05), trade-off acceptable sur latence. Décision : Rollout à 100% avec optimisation latence en parallèle (caching, GPU).