Construire une Abstraction Python Propre sur Plusieurs Providers LLM

Construire une Abstraction Python Propre sur Plusieurs Providers LLM

Le probleme : changer de provider LLM ne devrait pas casser votre code

Vous commencez avec OpenAI. Puis le cout explose et vous basculez sur Mistral. Puis vous voulez un modele local pour les tests et vous ajoutez Ollama. A chaque migration, c’est le meme scenario : refactoring du code d’appel, adaptation du format de streaming, gestion des erreurs differente, tests a re-ecrire.

Le projet Briostoria-PyCore resout ce probleme avec une abstraction multi-providers construite sans LangChain ni LlamaIndex. L’objectif : basculer entre Mistral, Ollama, DeepInfra ou llama.cpp avec un changement de configuration, pas de code. Voici comment c’est construit, avec le vrai code.

Pourquoi pas LangChain ?

La question merite d’etre posee directement. LangChain est le framework dominant pour l’integration LLM. Trois raisons concretes m’ont fait choisir une abstraction maison :

1. Overhead de dependances. LangChain tire une quarantaine de dependances transitives. Dans un projet IA embarque qui a deja ses propres contraintes de versions, c’est un cauchemar de compatibilite.

2. Instabilite d’API. LangChain a casse son API publique plusieurs fois entre 0.0.x, 0.1.x et 0.2.x. Une abstraction maison avec un contrat d’interface explicite est plus stable a long terme.

3. Opacite du streaming. Le streaming LangChain passe par des callbacks et des StreamingCallbackHandler. Quand vous avez besoin de controler la granularite — token par token pour un chat, phrase par phrase pour la synthese vocale, monobloc pour du JSON — ces callbacks deviennent un obstacle plutot qu’une aide.

L’architecture en couches

kernel/base.py          AbstractProvider (contrat abstrait)
    |
providers/llm/_base_llm.py   BaseLLMProvider (gestion granularite)
    |
    +-- mistral_llm.py   MistralLLM (SDK Mistral)
    +-- ollama_llm.py    OllamaLLM (HTTP direct)
    +-- deepinfra_llm.py DeepInfraLLM (API compatible OpenAI)
    +-- llama_cpp_llm.py LlamaCppLLM (inference locale)

kernel/registry.py      ProviderRegistry (lazy loading + decouverte)
kernel/discovery.py     ProviderDiscovery (scan automatique)

shared/stream_msg.py    StreamMsg (format d'echange unifie)
shared/mixins.py        APIKeyMixin, HTTPClientMixin
shared/models.py        ProviderType, ProviderKind, Message
shared/contract.py      StreamTimer, StreamLogger

Le meme pattern s’applique a 9 types de providers : LLM, TTS, STT, OCR, IMG_GEN, VLM, VAD, WAKE_WORD, STT_LLM. Tous partagent la meme interface AbstractProvider et le meme format StreamMsg.

Le contrat : AbstractProvider

Tout repose sur une classe abstraite minimale dans kernel/base.py :

class AbstractProvider(ABC):
    """Classe abstraite pour tous les providers - 100% StreamMsg"""

    def __init__(self, name: str):
        self.name = name
        self.type = self.__provider_type__
        self._available = False
        self._error: Optional[str] = None

    @abstractmethod
    def initialize(self) -> bool:
        """Initialise le provider (SDK, reseaux...)"""
        pass

    @abstractmethod
    async def stream_process_msgs(self, input_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
        """
        Interface StreamMsg pure.
        Output: START@CONTROL -> DELTA@* -> COMPLETE@*
        """
        pass

    def is_available(self) -> bool:
        return self._available

Deux methodes, c’est tout. initialize() est appele une fois au demarrage — elle verifie la disponibilite du provider (cle API valide, serveur local accessible) et retourne un booleen. stream_process_msgs() est l’interface de traitement, toujours asynchrone, toujours un AsyncIterator[StreamMsg].

L’attribut de classe __provider_type__ est impose a toutes les sous-classes et valide par le registry :

def _validate_single_type_provider(self, provider_class, name):
    if not hasattr(provider_class, '__provider_type__'):
        raise ValueError(f"Provider {name} manque __provider_type__")
    provider_type = provider_class.__provider_type__
    if not isinstance(provider_type, ProviderType):
        raise ValueError(f"__provider_type__ doit etre un ProviderType")

StreamMsg : le format d’echange unifie

L’innovation centrale est StreamMsg — un dataclass qui circule entre tous les composants du systeme. Voici sa structure (simplifiee) :

Event = Literal["START", "DELTA", "COMPLETE", "ERROR"]

Channel = Literal[
    "AUDIO", "TRANSCRIPT", "ANSWER", "VAD",
    "WAKE", "IMAGE", "EMBED", "CONTROL", "LOG"
]

@dataclass
class StreamMsg:
    id: str = field(default_factory=_uuid)
    session_id: Optional[str] = None
    parent_id: Optional[str] = None
    seq: int = 0

    event: Event = "DELTA"
    channel: Channel = "CONTROL"

    content_type: str = "text/plain; charset=utf-8"
    data_text: Optional[str] = None
    data_bytes: Optional[bytes] = None

    meta: Dict[str, Any] = field(default_factory=dict)

Le protocole de communication est standardise :

START@CONTROL -> DELTA@ANSWER -> ... -> DELTA@ANSWER -> COMPLETE@ANSWER

Un START@CONTROL transporte les metadonnees (provider, modele, granularite). Les DELTA@ANSWER transportent les tokens generes. COMPLETE@ANSWER signale la fin.

Ce design a des consequences importantes. La sortie d’un provider LLM est un flux de StreamMsg. La sortie d’un provider TTS est aussi un flux de StreamMsg (mais sur le channel AUDIO). On peut les chainer sans adaptateur — la sortie d’un LLM alimente directement l’entree d’un TTS.

Le module definit aussi une taxonomie d’erreurs stricte :

ErrorCode = Literal[
    "BAD_INPUT",       # non retryable
    "TIMEOUT",         # retryable
    "UPSTREAM",        # retryable
    "INTERNAL",        # non retryable
    "RESOURCE_LIMIT"   # contextuellement retryable
]

def is_error_retryable(error_code: str) -> bool:
    retryable_codes = {"TIMEOUT", "UPSTREAM", "RESOURCE_LIMIT"}
    return error_code in retryable_codes

Et des limites de securite :

MAX_TEXT_SIZE = 1024 * 1024        # 1MB texte max
MAX_AUDIO_FRAME_SIZE = 32 * 1024 * 1024  # 32MB audio max
MAX_META_KEYS = 32                  # limite meta dict

BaseLLMProvider : la logique de granularite centralisee

La couche intermediaire BaseLLMProvider herite d’AbstractProvider et ajoute la gestion de la granularite — la vraie valeur ajoutee de cette abstraction :

class BaseLLMProvider(AbstractProvider):
    __provider_type__ = ProviderType.LLM

    supports_stream_in: bool = False
    supports_stream_out: bool = True
    supports_json_mode: bool = True
    supports_tool_calls: bool = True

    async def stream_process_msgs(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
        granularity = prompt_msg.meta.get("granularity", "stream")

        if granularity in ("monobloc", "full"):
            try:
                async for msg in self._monobloc_impl(prompt_msg):
                    yield msg
                return
            except NotImplementedError:
                async for msg in self._monobloc_fallback(prompt_msg):
                    yield msg
                return

        elif granularity == "sentence":
            async for msg in self._sentence_stream(prompt_msg):
                yield msg
            return

        async for msg in self._stream_impl(prompt_msg):
            yield msg

Trois modes de granularite :

stream (defaut) : tokens au fur et a mesure, latence minimale. Pour les interfaces chat.

sentence : phrases completes une par une. _sentence_stream() bufferise les tokens et detecte les frontieres de phrase via detect_sentence_boundary. Ideal pour la synthese vocale — on peut parler la premiere phrase pendant que le LLM genere la suite.

monobloc : toute la reponse d’un coup. D’abord _monobloc_impl est tente (certains providers ont une API non-streaming plus efficace). Si NotImplementedError, le fallback bufferise le stream :

async def _monobloc_fallback(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
    yield StreamMsg.start(channel="CONTROL",
                          meta={"mode": "monobloc_fallback"})
    buffer = ""
    async for msg in self._stream_impl(prompt_msg):
        if msg.phase == "DELTA" and msg.channel == "ANSWER":
            buffer += msg.data_text or ""

    yield StreamMsg.delta_text(buffer, channel="ANSWER")
    yield StreamMsg.complete(channel="ANSWER")

Ce fallback garantit que tous les providers supportent tous les modes de granularite, meme s’ils n’ont pas d’API monobloc native. Les sous-classes n’implementent que _stream_impl — la granularite est geree au niveau superieur.

Implementation concrete : MistralLLM

class MistralLLM(BaseLLMProvider, APIKeyMixin):

    def initialize(self) -> bool:
        if not self._validate_api_key("MISTRAL_API_KEY"):
            return False
        api_key = self._get_api_key("MISTRAL_API_KEY")
        self.client = Mistral(api_key=api_key)
        self._set_available(True)
        return True

    async def _stream_impl(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
        prompt_text, meta = self._extract_prompt_from_msg(prompt_msg)
        model_name = meta.get("model", "mistral-small-latest")

        meta_clean = {k: v for k, v in meta.items() if k != "model"}
        start_msg = self._create_start_msg(model=model_name, **meta_clean)
        yield start_msg

        messages = meta.get("messages_formatted") or (
            [{"role": "user", "content": prompt_text}] if prompt_text else [])

        stream = self.client.chat.stream(
            model=model_name,
            messages=messages,
            temperature=meta.get("temperature", 0.7),
            max_tokens=meta.get("max_tokens", 1024))

        seq = 1
        for chunk in stream:
            chunk_data = getattr(chunk, 'data', chunk)
            if hasattr(chunk_data, 'choices') and chunk_data.choices:
                delta = chunk_data.choices[0].delta
                if hasattr(delta, 'content') and delta.content:
                    yield self._create_answer_delta(
                        text=delta.content, seq=seq,
                        parent_id=start_msg.id)
                    seq += 1

Notez le nettoyage de meta_clean qui retire la cle "model" — detail typique des integrations avec des SDK externes qui ont leurs propres conventions de parametres.

Les modeles disponibles sont declares explicitement :

def get_models(self) -> List[str]:
    return [
        "mistral-small-latest", "mistral-medium-latest",
        "mistral-large-latest", "codestral-latest",
        "pixtral-large-latest", "ministral-3b-latest",
        "ministral-8b-latest"
    ]

OllamaLLM : provider local sans cle API

L’implementation Ollama illustre un cas different — un provider local qui utilise HTTP brut plutot qu’un SDK :

class OllamaLLM(BaseLLMProvider, HTTPClientMixin):

    def __init__(self, name: str = "ollama_llm"):
        super().__init__(name)
        HTTPClientMixin.__init__(self)
        self.base_url = os.getenv("OLLAMA_API_URL", "http://localhost:11434")

    def initialize(self) -> bool:
        try:
            response = self._make_request(
                f"{self.base_url}/api/version", method="GET", timeout=5)
            if response.status_code == 200:
                self._set_available(True)
                return True
            else:
                self._set_available(False,
                    f"Ollama non accessible: {response.status_code}")
                return False
        except Exception as e:
            self._set_available(False, f"Erreur connexion: {e}")
            return False

    def get_models(self) -> List[str]:
        try:
            response = self._make_request(
                f"{self.base_url}/api/tags", method="GET", timeout=10)
            data = response.json()
            return [model["name"] for model in data.get("models", [])]
        except:
            return ["gemma3:270m", "qwen3:1.7b", "qwen2.5vl:3b"]

initialize() teste la connectivite HTTP plutot que de valider une cle. get_models() interroge l’API Ollama pour retourner les modeles reellement installes — avec un fallback en dur si le serveur n’est pas accessible.

Le streaming est gere avec un pattern producer/consumer et une Queue pour le backpressure :

async def _stream_impl(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
    q: Queue = Queue(maxsize=100)  # backpressure

    def _producer():
        resp = session.post(f"{self.base_url}/api/chat",
            json={"model": model_name, "messages": messages,
                  "stream": True, "options": {...}})
        for line in resp.iter_lines():
            data = json.loads(line)
            q.put(data["message"]["content"])
        q.put(SENTINEL)

Ce pattern evite de bloquer la boucle evenementielle asyncio pendant que les requetes HTTP synchrones d’Ollama arrivent.

Les mixins : comportements transversaux sans heritage lourd

MistralLLM utilise APIKeyMixin, OllamaLLM utilise HTTPClientMixin. Ces mixins sont independants de la hierarchie principale :

class APIKeyMixin:
    def _get_api_key(self, env_var: str, *, required: bool = True) -> Optional[str]:
        key = (os.environ.get(env_var) or
               os.environ.get(env_var.upper()) or
               os.environ.get(env_var.lower()))
        return key

class HTTPClientMixin:
    def _get_session(self) -> requests.Session:
        if self._session is None:
            s = requests.Session()
            s.headers.update(self._get_default_headers())
            self._session = s
        return self._session

    def _make_request(self, url, method="POST",
                      timeout=None, **kwargs) -> requests.Response:
        session = self._get_session()
        if timeout is None:
            timeout = config.get_api_timeout()
        resp = session.request(method=method, url=url,
                               timeout=timeout, **kwargs)
        resp.raise_for_status()
        return resp

Le HTTPClientMixin gere la session reutilisable, les headers par defaut, le timeout depuis la config globale, et la gestion d’erreur commune. C’est du code que chaque provider HTTP devrait ecrire — autant le factoriser.

Le registry : decouverte automatique et lazy loading

Le ProviderRegistry maintient une table de routing nom -> chemin de classe :

_DEFAULT_PROVIDER_IMPORTS: Dict[str, str] = {
    "mistral_llm":     "pycore.providers.llm.mistral_llm.MistralLLM",
    "deepinfra_llm":   "pycore.providers.llm.deepinfra_llm.DeepInfraLLM",
    "ollama_llm":      "pycore.providers.llm.ollama_llm.OllamaLLM",
    "llama_cpp_llm":   "pycore.providers.llm.llama_cpp_llm.LlamaCppLLM",
    "edge_tts":        "pycore.providers.tts.edge_tts.EdgeTTS",
    "kokoro_tts":      "pycore.providers.tts.kokoro_tts.KokoroTTS",
    "openrouter_vlm":  "pycore.providers.vlm.openrouter_vlm.OpenRouterVLM",
    "ollama_vlm":      "pycore.providers.vlm.ollama_vlm.OllamaVLM",
    "runware_img_gen":  "pycore.providers.img_gen.runware_img_gen.RunwareImageGen",
    # ... 19 providers au total
}

L’instanciation est tardive : le module est importe via importlib.import_module uniquement quand le provider est demande. Ca evite d’importer le SDK Mistral si on n’utilise que Ollama — critique dans les environnements avec des contraintes de demarrage.

Le ProviderDiscovery dans kernel/discovery.py scanne automatiquement le repertoire providers/ pour trouver les classes qui heritent d’AbstractProvider :

class ProviderDiscovery:
    def discover_providers(self) -> int:
        providers_path = self._get_providers_path()
        discovered_count = 0
        for category_dir in providers_path.iterdir():
            if (category_dir.is_dir() and
                not category_dir.name.startswith('_')):
                discovered_count += self._scan_category(category_dir.name)
        return discovered_count

    def _scan_category(self, category: str) -> int:
        count = 0
        category_path = self._get_providers_path() / category
        for py_file in category_path.glob("*.py"):
            if py_file.name.startswith('_'):
                continue
            count += self._import_and_register(...)
        return count

Le registry valide chaque classe avant enregistrement — il refuse les classes abstraites, les classes sans __provider_type__, et les classes dont le constructeur n’accepte pas un name: str :

def _is_valid_provider_class(self, cls) -> bool:
    if not issubclass(cls, AbstractProvider):
        return False
    if cls is AbstractProvider:
        return False
    if getattr(cls, "__abstract__", False):
        return False
    try:
        cls("__probe__")  # smoke test constructeur
    except TypeError:
        return False
    return True

L’instrumentation : StreamTimer et StreamLogger

Le module shared/contract.py fournit des outils d’instrumentation specifiques au streaming :

class StreamTimer:
    def __init__(self):
        self.t0 = perf_counter()
        self.t_last = self.t0
        self.t_first_data = None

    def mark_first_data(self) -> int:
        """Marque le premier DELTA et calcule TTFB"""
        if self.t_first_data is None:
            self.t_first_data = perf_counter()
        return self.ttfb_ms

    def tick(self) -> float:
        """Latence depuis le dernier tick (ms)"""
        now = perf_counter()
        delta = (now - self.t_last) * 1000
        self.t_last = now
        return delta

    @property
    def ttfb_ms(self) -> Optional[int]:
        """Time To First Byte (ms)"""
        if self.t_first_data:
            return int((self.t_first_data - self.t0) * 1000)
        return None

Le TTFB (Time To First Byte) est la metrique la plus importante pour l’experience utilisateur en streaming. Avec StreamTimer, chaque provider peut mesurer et rapporter son TTFB sans code boilerplate.

Le StreamLogger ecrit les evenements en JSON lines pour analyse post-mortem — utile pour diagnostiquer les latences et les erreurs intermittentes.

La configuration par YAML

Les providers sont configures via des fichiers YAML valides par Pydantic :

class ModelsConfig(BaseModel):
    allow: Optional[List[StrOrItem]] = None
    deny: Optional[List[str]] = None
    prefer: Optional[List[str]] = None
    aliases: Optional[Dict[str, str]] = None
    default: Optional[str] = None

    @model_validator(mode="after")
    def _check_default_in_allow(self):
        if self.default and self.allow:
            allowed_names = [item.name if isinstance(item, ItemEntry)
                           else item for item in self.allow]
            if self.default not in allowed_names:
                raise ValueError(
                    f"models.default '{self.default}' doit appartenir a models.allow")
        return self

Chaque ItemEntry peut porter des metadonnees (prix, latence moyenne) utilisees pour la selection automatique du meilleur modele.

En pratique : basculer de provider

# Mistral cloud
provider = registry.get("mistral_llm")
await provider.initialize()

# Ollama local (meme code)
provider = registry.get("ollama_llm")
await provider.initialize()

# Le consommateur ne change pas
prompt = StreamMsg(
    event="START", channel="CONTROL",
    data_text="Explique PageRank en 3 phrases",
    meta={"model": "mistral-small-latest", "granularity": "sentence"})

async for msg in provider.stream_process_msgs(prompt):
    if msg.event == "DELTA" and msg.channel == "ANSWER":
        print(msg.data_text, end="", flush=True)

Ce que cette architecture enseigne

Contrat d’interface strict plutot que convention. Un seul point d’entree (stream_process_msgs) plutot que des methodes differentes (generate(), stream(), chat()). Le consommateur n’a pas a deviner quelle methode appeler.

Separation granularite / implementation. Les sous-classes n’implementent que _stream_impl. La granularite (token, phrase, monobloc) est geree par la classe de base. Ajouter un mode de granularite ne touche aucun provider existant.

Fallback systematique. _monobloc_impl non implemente ? Fallback vers bufferisation. Ollama inaccessible ? Liste de modeles par defaut. Ce pragmatisme rend le systeme robuste en production.

Mixins pour les comportements transversaux. APIKeyMixin et HTTPClientMixin sont factorisés sans polluer la hierarchie principale. Chaque provider pioche ce dont il a besoin.

Le meme pattern pour 9 types de providers. LLM, TTS, STT, OCR, IMG_GEN, VLM, VAD, WAKE_WORD, STT_LLM — tous derivent d’AbstractProvider, tous emettent des StreamMsg. Un pipeline multimodal (STT -> LLM -> TTS) ne necessite aucun adaptateur.

LangChain n’est pas un mauvais outil. Mais quand vous avez besoin de controler le streaming, la granularite et le cycle de vie des providers a un niveau fin, construire votre propre abstraction est un investissement qui se rentabilise des le deuxieme provider.


Un projet similaire ? Contactez Loick Briot : contact@brio-novia.eu