Pipeline de Données IA : De la Collecte au Modèle Entraîné
Selon une étude Anaconda publiée en 2023, les data scientists passent 45 % de leur temps à préparer et nettoyer les données — plus qu’à construire des modèles. Ce chiffre illustre une vérité souvent ignorée par les directions : la qualité d’un modèle d’IA est déterminée à 70 % par la qualité du pipeline de données qui l’alimente. Voici comment construire un pipeline robuste, automatisé et maintenable, de la source brute jusqu’au modèle en production.
Architecture d’un Pipeline ML Moderne
Un pipeline de données pour l’IA suit une logique ETL enrichie (Extract, Transform, Load) avec des couches supplémentaires spécifiques au machine learning :
Sources → Ingestion → Validation → Nettoyage → Feature Engineering → Versioning → Entraînement → Registry Chaque étape doit être idempotente (réexécutable sans effet de bord), traçable (chaque transformation loguée) et testable (données de sortie validées automatiquement).
Étape 1 — Collecte et Ingestion des Données
Connecteurs Multi-Sources
Les données d’entreprise viennent rarement d’une seule source. Un pipeline solide agrège bases de données, APIs, fichiers et flux temps réel.
from abc import ABC, abstractmethod
from pathlib import Path
import pandas as pd
import httpx
import sqlalchemy
class ConnecteurDonnees(ABC):
@abstractmethod
def extraire(self, **params) -> pd.DataFrame:
pass
class ConnecteurSQL(ConnecteurDonnees):
def __init__(self, url_connexion: str):
self.engine = sqlalchemy.create_engine(url_connexion)
def extraire(self, requete: str, **params) -> pd.DataFrame:
return pd.read_sql(requete, self.engine, params=params)
class ConnecteurAPI(ConnecteurDonnees):
def __init__(self, url_base: str, api_key: str):
self.url_base = url_base
self.headers = {"Authorization": f"Bearer {api_key}"}
def extraire(self, endpoint: str, date_debut: str, date_fin: str) -> pd.DataFrame:
response = httpx.get(
f"{self.url_base}/{endpoint}",
headers=self.headers,
params={"from": date_debut, "to": date_fin},
timeout=30.0
)
response.raise_for_status()
return pd.DataFrame(response.json()["data"])
class ConnecteurCSV(ConnecteurDonnees):
def __init__(self, dossier: str):
self.dossier = Path(dossier)
def extraire(self, pattern: str = "*.csv", **params) -> pd.DataFrame:
fichiers = list(self.dossier.glob(pattern))
return pd.concat([pd.read_csv(f) for f in fichiers], ignore_index=True)
# Orchestration multi-sources
class IngesteurMultiSources:
def __init__(self):
self.connecteurs = {}
def enregistrer(self, nom: str, connecteur: ConnecteurDonnees):
self.connecteurs[nom] = connecteur
def ingerer_tout(self) -> dict[str, pd.DataFrame]:
resultats = {}
for nom, conn in self.connecteurs.items():
try:
resultats[nom] = conn.extraire()
print(f"[OK] {nom} : {len(resultats[nom])} lignes ingérées")
except Exception as e:
print(f"[ERREUR] {nom} : {e}")
return resultats DuckDB — SQL Analytique Ultra-Rapide sur Fichiers
DuckDB est devenu incontournable pour les pipelines de données analytiques. Il exécute des requêtes SQL directement sur des fichiers CSV, Parquet, JSON — sans serveur, sans installation.
import duckdb
con = duckdb.connect("pipeline.db")
# Requête multi-fichiers en une ligne
con.execute("""
CREATE OR REPLACE TABLE ventes_brutes AS
SELECT
*,
filename AS source_fichier,
current_date AS date_ingestion
FROM read_csv_auto('data/ventes/*.csv', union_by_name=True)
""")
# Jointure avec une API distante (Parquet sur S3)
con.execute("""
CREATE OR REPLACE TABLE enrichi AS
SELECT v.*, r.region_nom, r.region_code
FROM ventes_brutes v
LEFT JOIN read_parquet('s3://mon-bucket/referentiels/regions.parquet') r
ON v.code_postal[:2] = r.departement_code
""")
print(con.execute("SELECT COUNT(*), COUNT(DISTINCT client_id) FROM enrichi").fetchdf()) Étape 2 — Validation et Qualité des Données
Great Expectations — Contrats de Données
Great Expectations permet de définir des “expectations” (contrats) que vos données doivent respecter. Toute violation est détectée et loguée.
import great_expectations as gx
context = gx.get_context()
datasource = context.sources.add_pandas("ventes_ds")
asset = datasource.add_dataframe_asset("ventes")
validator = context.get_validator(
batch_request=asset.build_batch_request(dataframe=df_ventes),
expectation_suite_name="ventes_contrat_v1"
)
# Définir les contrats qualité
validator.expect_column_to_exist("montant_ht")
validator.expect_column_values_to_not_be_null("client_id", mostly=0.99) # 99% non-null
validator.expect_column_values_to_be_between("montant_ht", min_value=0, max_value=1_000_000)
validator.expect_column_values_to_be_unique("commande_id")
validator.expect_column_values_to_match_regex("code_postal", r"^d{5}$")
# Valider et obtenir le rapport
results = validator.validate()
print(f"Succès : {results.success}")
print(f"Tests échoués : {results.statistics['unsuccessful_expectations']}/{results.statistics['evaluated_expectations']}")
validator.save_expectation_suite() Étape 3 — Nettoyage et Transformation
Stratégies de Traitement des Valeurs Manquantes
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
def nettoyer_dataframe(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
# 1. Supprimer les doublons exacts
avant = len(df)
df = df.drop_duplicates()
print(f"Doublons supprimés : {avant - len(df)}")
# 2. Normaliser les chaînes de caractères
cols_texte = df.select_dtypes(include="object").columns
for col in cols_texte:
df[col] = (df[col]
.str.strip()
.str.upper()
.str.replace(r's+', ' ', regex=True))
# 3. Détecter et corriger les outliers (méthode IQR)
cols_numeriques = df.select_dtypes(include=np.number).columns
for col in cols_numeriques:
Q1, Q3 = df[col].quantile([0.25, 0.75])
IQR = Q3 - Q1
borne_inf = Q1 - 3 * IQR
borne_sup = Q3 + 3 * IQR
n_outliers = ((df[col] < borne_inf) | (df[col] > borne_sup)).sum()
if n_outliers > 0:
df[col] = df[col].clip(borne_inf, borne_sup)
print(f"Outliers corrigés dans '{col}' : {n_outliers}")
# 4. Imputer les valeurs manquantes (KNN pour les numériques)
taux_manquants = df.isnull().mean()
cols_a_imputer = taux_manquants[taux_manquants < 0.30].index # < 30% manquant
cols_a_supprimer = taux_manquants[taux_manquants >= 0.30].index
if len(cols_a_imputer) > 0:
imputer = KNNImputer(n_neighbors=5)
df[cols_a_imputer] = imputer.fit_transform(df[cols_a_imputer])
df = df.drop(columns=cols_a_supprimer)
return df Étape 4 — Feature Engineering
Le feature engineering transforme les données brutes en variables que le modèle peut exploiter efficacement.
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
class FeatureEngineer:
def __init__(self):
self.scaler = StandardScaler()
self.encodeurs = {}
self.tfidf = TfidfVectorizer(max_features=100, ngram_range=(1, 2))
def transformer(self, df: pd.DataFrame, entrainement: bool = True) -> pd.DataFrame:
df = df.copy()
# Features temporelles
if "date_commande" in df.columns:
df["date_commande"] = pd.to_datetime(df["date_commande"])
df["mois"] = df["date_commande"].dt.month
df["jour_semaine"] = df["date_commande"].dt.dayofweek
df["est_weekend"] = (df["jour_semaine"] >= 5).astype(int)
df["trimestre"] = df["date_commande"].dt.quarter
# Features cycliques (sin/cos pour capturer la périodicité)
df["mois_sin"] = np.sin(2 * np.pi * df["mois"] / 12)
df["mois_cos"] = np.cos(2 * np.pi * df["mois"] / 12)
# Encodage des variables catégorielles
cols_cat = df.select_dtypes(include="object").columns
for col in cols_cat:
if col == "description_produit":
continue # Traité séparément
if entrainement:
self.encodeurs[col] = LabelEncoder()
df[col] = self.encodeurs[col].fit_transform(df[col].fillna("INCONNU"))
else:
if col in self.encodeurs:
df[col] = df[col].map(
lambda x: self.encodeurs[col].transform([x])[0]
if x in self.encodeurs[col].classes_ else -1
)
# TF-IDF sur texte libre
if "description_produit" in df.columns:
if entrainement:
tfidf_matrix = self.tfidf.fit_transform(df["description_produit"].fillna(""))
else:
tfidf_matrix = self.tfidf.transform(df["description_produit"].fillna(""))
tfidf_df = pd.DataFrame(
tfidf_matrix.toarray(),
columns=[f"tfidf_{w}" for w in self.tfidf.get_feature_names_out()],
index=df.index
)
df = pd.concat([df.drop("description_produit", axis=1), tfidf_df], axis=1)
# Normalisation des variables numériques
cols_num = df.select_dtypes(include=np.number).columns
if entrainement:
df[cols_num] = self.scaler.fit_transform(df[cols_num])
else:
df[cols_num] = self.scaler.transform(df[cols_num])
return df Étape 5 — Versioning des Données avec DVC
DVC (Data Version Control) gère le versioning des datasets comme Git gère le code source. Indispensable dès que plusieurs personnes travaillent sur le même projet ML.
# Initialiser DVC dans votre repo Git
git init && dvc init
# Suivre un dataset volumieux
dvc add data/ventes_2025.csv
git add data/ventes_2025.csv.dvc .gitignore
git commit -m "feat(data): ajouter dataset ventes 2025"
# Stocker sur S3 (ou Google Drive, Azure...)
dvc remote add -d mon_stockage s3://mon-bucket-ml/dvc-store
dvc push
# Plus tard : revenir à une version précédente
git checkout v1.2.0
dvc checkout # Restaure le dataset correspondant Traçabilité des Expériences avec MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
mlflow.set_experiment("prediction_churn_pme")
with mlflow.start_run(run_name="pipeline_v3_gbm"):
# Logger les paramètres du pipeline
mlflow.log_params({
"version_dataset": "2025-Q3",
"n_features": X_train.shape[1],
"modele": "GradientBoosting",
"learning_rate": 0.05,
"n_estimators": 200,
"max_depth": 4
})
# Entraîner le modèle
model = GradientBoostingClassifier(
learning_rate=0.05, n_estimators=200, max_depth=4
)
model.fit(X_train, y_train)
# Logger les métriques
from sklearn.metrics import roc_auc_score, average_precision_score
y_pred_proba = model.predict_proba(X_test)[:, 1]
mlflow.log_metrics({
"auc_roc": roc_auc_score(y_test, y_pred_proba),
"average_precision": average_precision_score(y_test, y_pred_proba),
"accuracy": model.score(X_test, y_test)
})
# Sauvegarder le modèle et le pipeline complet
mlflow.sklearn.log_model(model, "modele_churn")
print(f"AUC-ROC : {roc_auc_score(y_test, y_pred_proba):.4f}") Automatisation avec Apache Airflow
Pour les pipelines en production, Airflow orchestre l’exécution des étapes selon un planning et des dépendances.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
"owner": "equipe_data",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["data@monentreprise.fr"]
}
with DAG(
"pipeline_ml_ventes",
default_args=default_args,
schedule_interval="0 2 * * 1", # Tous les lundis à 2h
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
ingerer = PythonOperator(
task_id="ingerer_donnees",
python_callable=ingerer_toutes_sources,
op_kwargs={"date": "{{ ds }}"}
)
valider = PythonOperator(
task_id="valider_qualite",
python_callable=valider_avec_great_expectations
)
transformer = PythonOperator(
task_id="transformer_features",
python_callable=executer_feature_engineering
)
entrainer = PythonOperator(
task_id="entrainer_modele",
python_callable=lancer_entrainement_mlflow
)
# Définir les dépendances
ingerer >> valider >> transformer >> entrainer Conclusion
Un pipeline de données robuste n’est pas un luxe — c’est la fondation de tout projet IA durable. Les équipes qui investissent dans cette infrastructure gagnent en productivité (réentraînement en quelques heures au lieu de plusieurs jours), en fiabilité (détection automatique des anomalies de données) et en traçabilité (reproductibilité des expériences).
Les outils présentés — DuckDB, Great Expectations, DVC, MLflow, Airflow — forment un stack open-source complet, adopté par des milliers d’équipes dans le monde et parfaitement adapté aux contraintes des PME.
Votre entreprise collecte des données mais peine à les transformer en valeur IA ? Contactez-nous à contact@brio-novia.eu — nous auditons votre architecture de données et construisons le pipeline adapté à votre contexte.