Pipeline de Données IA : De la Collecte au Modèle Entraîné

Pipeline de Données IA : De la Collecte au Modèle Entraîné

Selon une étude Anaconda publiée en 2023, les data scientists passent 45 % de leur temps à préparer et nettoyer les données — plus qu’à construire des modèles. Ce chiffre illustre une vérité souvent ignorée par les directions : la qualité d’un modèle d’IA est déterminée à 70 % par la qualité du pipeline de données qui l’alimente. Voici comment construire un pipeline robuste, automatisé et maintenable, de la source brute jusqu’au modèle en production.

Architecture d’un Pipeline ML Moderne

Un pipeline de données pour l’IA suit une logique ETL enrichie (Extract, Transform, Load) avec des couches supplémentaires spécifiques au machine learning :

Sources → Ingestion → Validation → Nettoyage → Feature Engineering → Versioning → Entraînement → Registry

Chaque étape doit être idempotente (réexécutable sans effet de bord), traçable (chaque transformation loguée) et testable (données de sortie validées automatiquement).

Étape 1 — Collecte et Ingestion des Données

Connecteurs Multi-Sources

Les données d’entreprise viennent rarement d’une seule source. Un pipeline solide agrège bases de données, APIs, fichiers et flux temps réel.

from abc import ABC, abstractmethod
from pathlib import Path
import pandas as pd
import httpx
import sqlalchemy

class ConnecteurDonnees(ABC):
    @abstractmethod
    def extraire(self, **params) -> pd.DataFrame:
        pass

class ConnecteurSQL(ConnecteurDonnees):
    def __init__(self, url_connexion: str):
        self.engine = sqlalchemy.create_engine(url_connexion)

    def extraire(self, requete: str, **params) -> pd.DataFrame:
        return pd.read_sql(requete, self.engine, params=params)

class ConnecteurAPI(ConnecteurDonnees):
    def __init__(self, url_base: str, api_key: str):
        self.url_base = url_base
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def extraire(self, endpoint: str, date_debut: str, date_fin: str) -> pd.DataFrame:
        response = httpx.get(
            f"{self.url_base}/{endpoint}",
            headers=self.headers,
            params={"from": date_debut, "to": date_fin},
            timeout=30.0
        )
        response.raise_for_status()
        return pd.DataFrame(response.json()["data"])

class ConnecteurCSV(ConnecteurDonnees):
    def __init__(self, dossier: str):
        self.dossier = Path(dossier)

    def extraire(self, pattern: str = "*.csv", **params) -> pd.DataFrame:
        fichiers = list(self.dossier.glob(pattern))
        return pd.concat([pd.read_csv(f) for f in fichiers], ignore_index=True)

# Orchestration multi-sources
class IngesteurMultiSources:
    def __init__(self):
        self.connecteurs = {}

    def enregistrer(self, nom: str, connecteur: ConnecteurDonnees):
        self.connecteurs[nom] = connecteur

    def ingerer_tout(self) -> dict[str, pd.DataFrame]:
        resultats = {}
        for nom, conn in self.connecteurs.items():
            try:
                resultats[nom] = conn.extraire()
                print(f"[OK] {nom} : {len(resultats[nom])} lignes ingérées")
            except Exception as e:
                print(f"[ERREUR] {nom} : {e}")
        return resultats

DuckDB — SQL Analytique Ultra-Rapide sur Fichiers

DuckDB est devenu incontournable pour les pipelines de données analytiques. Il exécute des requêtes SQL directement sur des fichiers CSV, Parquet, JSON — sans serveur, sans installation.

import duckdb

con = duckdb.connect("pipeline.db")

# Requête multi-fichiers en une ligne
con.execute("""
    CREATE OR REPLACE TABLE ventes_brutes AS
    SELECT
        *,
        filename AS source_fichier,
        current_date AS date_ingestion
    FROM read_csv_auto('data/ventes/*.csv', union_by_name=True)
""")

# Jointure avec une API distante (Parquet sur S3)
con.execute("""
    CREATE OR REPLACE TABLE enrichi AS
    SELECT v.*, r.region_nom, r.region_code
    FROM ventes_brutes v
    LEFT JOIN read_parquet('s3://mon-bucket/referentiels/regions.parquet') r
        ON v.code_postal[:2] = r.departement_code
""")

print(con.execute("SELECT COUNT(*), COUNT(DISTINCT client_id) FROM enrichi").fetchdf())

Étape 2 — Validation et Qualité des Données

Great Expectations — Contrats de Données

Great Expectations permet de définir des “expectations” (contrats) que vos données doivent respecter. Toute violation est détectée et loguée.

import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("ventes_ds")
asset = datasource.add_dataframe_asset("ventes")

validator = context.get_validator(
    batch_request=asset.build_batch_request(dataframe=df_ventes),
    expectation_suite_name="ventes_contrat_v1"
)

# Définir les contrats qualité
validator.expect_column_to_exist("montant_ht")
validator.expect_column_values_to_not_be_null("client_id", mostly=0.99)  # 99% non-null
validator.expect_column_values_to_be_between("montant_ht", min_value=0, max_value=1_000_000)
validator.expect_column_values_to_be_unique("commande_id")
validator.expect_column_values_to_match_regex("code_postal", r"^d{5}$")

# Valider et obtenir le rapport
results = validator.validate()
print(f"Succès : {results.success}")
print(f"Tests échoués : {results.statistics['unsuccessful_expectations']}/{results.statistics['evaluated_expectations']}")

validator.save_expectation_suite()

Étape 3 — Nettoyage et Transformation

Stratégies de Traitement des Valeurs Manquantes

import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer

def nettoyer_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # 1. Supprimer les doublons exacts
    avant = len(df)
    df = df.drop_duplicates()
    print(f"Doublons supprimés : {avant - len(df)}")

    # 2. Normaliser les chaînes de caractères
    cols_texte = df.select_dtypes(include="object").columns
    for col in cols_texte:
        df[col] = (df[col]
            .str.strip()
            .str.upper()
            .str.replace(r's+', ' ', regex=True))

    # 3. Détecter et corriger les outliers (méthode IQR)
    cols_numeriques = df.select_dtypes(include=np.number).columns
    for col in cols_numeriques:
        Q1, Q3 = df[col].quantile([0.25, 0.75])
        IQR = Q3 - Q1
        borne_inf = Q1 - 3 * IQR
        borne_sup = Q3 + 3 * IQR
        n_outliers = ((df[col] < borne_inf) | (df[col] > borne_sup)).sum()
        if n_outliers > 0:
            df[col] = df[col].clip(borne_inf, borne_sup)
            print(f"Outliers corrigés dans '{col}' : {n_outliers}")

    # 4. Imputer les valeurs manquantes (KNN pour les numériques)
    taux_manquants = df.isnull().mean()
    cols_a_imputer = taux_manquants[taux_manquants < 0.30].index  # < 30% manquant
    cols_a_supprimer = taux_manquants[taux_manquants >= 0.30].index

    if len(cols_a_imputer) > 0:
        imputer = KNNImputer(n_neighbors=5)
        df[cols_a_imputer] = imputer.fit_transform(df[cols_a_imputer])

    df = df.drop(columns=cols_a_supprimer)

    return df

Étape 4 — Feature Engineering

Le feature engineering transforme les données brutes en variables que le modèle peut exploiter efficacement.

import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer

class FeatureEngineer:
    def __init__(self):
        self.scaler = StandardScaler()
        self.encodeurs = {}
        self.tfidf = TfidfVectorizer(max_features=100, ngram_range=(1, 2))

    def transformer(self, df: pd.DataFrame, entrainement: bool = True) -> pd.DataFrame:
        df = df.copy()

        # Features temporelles
        if "date_commande" in df.columns:
            df["date_commande"] = pd.to_datetime(df["date_commande"])
            df["mois"] = df["date_commande"].dt.month
            df["jour_semaine"] = df["date_commande"].dt.dayofweek
            df["est_weekend"] = (df["jour_semaine"] >= 5).astype(int)
            df["trimestre"] = df["date_commande"].dt.quarter

            # Features cycliques (sin/cos pour capturer la périodicité)
            df["mois_sin"] = np.sin(2 * np.pi * df["mois"] / 12)
            df["mois_cos"] = np.cos(2 * np.pi * df["mois"] / 12)

        # Encodage des variables catégorielles
        cols_cat = df.select_dtypes(include="object").columns
        for col in cols_cat:
            if col == "description_produit":
                continue  # Traité séparément
            if entrainement:
                self.encodeurs[col] = LabelEncoder()
                df[col] = self.encodeurs[col].fit_transform(df[col].fillna("INCONNU"))
            else:
                if col in self.encodeurs:
                    df[col] = df[col].map(
                        lambda x: self.encodeurs[col].transform([x])[0]
                        if x in self.encodeurs[col].classes_ else -1
                    )

        # TF-IDF sur texte libre
        if "description_produit" in df.columns:
            if entrainement:
                tfidf_matrix = self.tfidf.fit_transform(df["description_produit"].fillna(""))
            else:
                tfidf_matrix = self.tfidf.transform(df["description_produit"].fillna(""))

            tfidf_df = pd.DataFrame(
                tfidf_matrix.toarray(),
                columns=[f"tfidf_{w}" for w in self.tfidf.get_feature_names_out()],
                index=df.index
            )
            df = pd.concat([df.drop("description_produit", axis=1), tfidf_df], axis=1)

        # Normalisation des variables numériques
        cols_num = df.select_dtypes(include=np.number).columns
        if entrainement:
            df[cols_num] = self.scaler.fit_transform(df[cols_num])
        else:
            df[cols_num] = self.scaler.transform(df[cols_num])

        return df

Étape 5 — Versioning des Données avec DVC

DVC (Data Version Control) gère le versioning des datasets comme Git gère le code source. Indispensable dès que plusieurs personnes travaillent sur le même projet ML.

# Initialiser DVC dans votre repo Git
git init && dvc init

# Suivre un dataset volumieux
dvc add data/ventes_2025.csv
git add data/ventes_2025.csv.dvc .gitignore
git commit -m "feat(data): ajouter dataset ventes 2025"

# Stocker sur S3 (ou Google Drive, Azure...)
dvc remote add -d mon_stockage s3://mon-bucket-ml/dvc-store
dvc push

# Plus tard : revenir à une version précédente
git checkout v1.2.0
dvc checkout  # Restaure le dataset correspondant

Traçabilité des Expériences avec MLflow

import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_experiment("prediction_churn_pme")

with mlflow.start_run(run_name="pipeline_v3_gbm"):
    # Logger les paramètres du pipeline
    mlflow.log_params({
        "version_dataset": "2025-Q3",
        "n_features": X_train.shape[1],
        "modele": "GradientBoosting",
        "learning_rate": 0.05,
        "n_estimators": 200,
        "max_depth": 4
    })

    # Entraîner le modèle
    model = GradientBoostingClassifier(
        learning_rate=0.05, n_estimators=200, max_depth=4
    )
    model.fit(X_train, y_train)

    # Logger les métriques
    from sklearn.metrics import roc_auc_score, average_precision_score
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    mlflow.log_metrics({
        "auc_roc": roc_auc_score(y_test, y_pred_proba),
        "average_precision": average_precision_score(y_test, y_pred_proba),
        "accuracy": model.score(X_test, y_test)
    })

    # Sauvegarder le modèle et le pipeline complet
    mlflow.sklearn.log_model(model, "modele_churn")
    print(f"AUC-ROC : {roc_auc_score(y_test, y_pred_proba):.4f}")

Automatisation avec Apache Airflow

Pour les pipelines en production, Airflow orchestre l’exécution des étapes selon un planning et des dépendances.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "equipe_data",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data@monentreprise.fr"]
}

with DAG(
    "pipeline_ml_ventes",
    default_args=default_args,
    schedule_interval="0 2 * * 1",  # Tous les lundis à 2h
    start_date=datetime(2025, 1, 1),
    catchup=False
) as dag:

    ingerer = PythonOperator(
        task_id="ingerer_donnees",
        python_callable=ingerer_toutes_sources,
        op_kwargs={"date": "{{ ds }}"}
    )

    valider = PythonOperator(
        task_id="valider_qualite",
        python_callable=valider_avec_great_expectations
    )

    transformer = PythonOperator(
        task_id="transformer_features",
        python_callable=executer_feature_engineering
    )

    entrainer = PythonOperator(
        task_id="entrainer_modele",
        python_callable=lancer_entrainement_mlflow
    )

    # Définir les dépendances
    ingerer >> valider >> transformer >> entrainer

Conclusion

Un pipeline de données robuste n’est pas un luxe — c’est la fondation de tout projet IA durable. Les équipes qui investissent dans cette infrastructure gagnent en productivité (réentraînement en quelques heures au lieu de plusieurs jours), en fiabilité (détection automatique des anomalies de données) et en traçabilité (reproductibilité des expériences).

Les outils présentés — DuckDB, Great Expectations, DVC, MLflow, Airflow — forment un stack open-source complet, adopté par des milliers d’équipes dans le monde et parfaitement adapté aux contraintes des PME.

Votre entreprise collecte des données mais peine à les transformer en valeur IA ? Contactez-nous à contact@brio-novia.eu — nous auditons votre architecture de données et construisons le pipeline adapté à votre contexte.