Construire une Abstraction Python Propre sur Plusieurs Providers LLM
Le probleme : changer de provider LLM ne devrait pas casser votre code
Vous commencez avec OpenAI. Puis le cout explose et vous basculez sur Mistral. Puis vous voulez un modele local pour les tests et vous ajoutez Ollama. A chaque migration, c’est le meme scenario : refactoring du code d’appel, adaptation du format de streaming, gestion des erreurs differente, tests a re-ecrire.
Le projet Briostoria-PyCore resout ce probleme avec une abstraction multi-providers construite sans LangChain ni LlamaIndex. L’objectif : basculer entre Mistral, Ollama, DeepInfra ou llama.cpp avec un changement de configuration, pas de code. Voici comment c’est construit, avec le vrai code.
Pourquoi pas LangChain ?
La question merite d’etre posee directement. LangChain est le framework dominant pour l’integration LLM. Trois raisons concretes m’ont fait choisir une abstraction maison :
1. Overhead de dependances. LangChain tire une quarantaine de dependances transitives. Dans un projet IA embarque qui a deja ses propres contraintes de versions, c’est un cauchemar de compatibilite.
2. Instabilite d’API. LangChain a casse son API publique plusieurs fois entre 0.0.x, 0.1.x et 0.2.x. Une abstraction maison avec un contrat d’interface explicite est plus stable a long terme.
3. Opacite du streaming. Le streaming LangChain passe par des callbacks et des StreamingCallbackHandler. Quand vous avez besoin de controler la granularite — token par token pour un chat, phrase par phrase pour la synthese vocale, monobloc pour du JSON — ces callbacks deviennent un obstacle plutot qu’une aide.
L’architecture en couches
kernel/base.py AbstractProvider (contrat abstrait)
|
providers/llm/_base_llm.py BaseLLMProvider (gestion granularite)
|
+-- mistral_llm.py MistralLLM (SDK Mistral)
+-- ollama_llm.py OllamaLLM (HTTP direct)
+-- deepinfra_llm.py DeepInfraLLM (API compatible OpenAI)
+-- llama_cpp_llm.py LlamaCppLLM (inference locale)
kernel/registry.py ProviderRegistry (lazy loading + decouverte)
kernel/discovery.py ProviderDiscovery (scan automatique)
shared/stream_msg.py StreamMsg (format d'echange unifie)
shared/mixins.py APIKeyMixin, HTTPClientMixin
shared/models.py ProviderType, ProviderKind, Message
shared/contract.py StreamTimer, StreamLogger Le meme pattern s’applique a 9 types de providers : LLM, TTS, STT, OCR, IMG_GEN, VLM, VAD, WAKE_WORD, STT_LLM. Tous partagent la meme interface AbstractProvider et le meme format StreamMsg.
Le contrat : AbstractProvider
Tout repose sur une classe abstraite minimale dans kernel/base.py :
class AbstractProvider(ABC):
"""Classe abstraite pour tous les providers - 100% StreamMsg"""
def __init__(self, name: str):
self.name = name
self.type = self.__provider_type__
self._available = False
self._error: Optional[str] = None
@abstractmethod
def initialize(self) -> bool:
"""Initialise le provider (SDK, reseaux...)"""
pass
@abstractmethod
async def stream_process_msgs(self, input_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
"""
Interface StreamMsg pure.
Output: START@CONTROL -> DELTA@* -> COMPLETE@*
"""
pass
def is_available(self) -> bool:
return self._available Deux methodes, c’est tout. initialize() est appele une fois au demarrage — elle verifie la disponibilite du provider (cle API valide, serveur local accessible) et retourne un booleen. stream_process_msgs() est l’interface de traitement, toujours asynchrone, toujours un AsyncIterator[StreamMsg].
L’attribut de classe __provider_type__ est impose a toutes les sous-classes et valide par le registry :
def _validate_single_type_provider(self, provider_class, name):
if not hasattr(provider_class, '__provider_type__'):
raise ValueError(f"Provider {name} manque __provider_type__")
provider_type = provider_class.__provider_type__
if not isinstance(provider_type, ProviderType):
raise ValueError(f"__provider_type__ doit etre un ProviderType") StreamMsg : le format d’echange unifie
L’innovation centrale est StreamMsg — un dataclass qui circule entre tous les composants du systeme. Voici sa structure (simplifiee) :
Event = Literal["START", "DELTA", "COMPLETE", "ERROR"]
Channel = Literal[
"AUDIO", "TRANSCRIPT", "ANSWER", "VAD",
"WAKE", "IMAGE", "EMBED", "CONTROL", "LOG"
]
@dataclass
class StreamMsg:
id: str = field(default_factory=_uuid)
session_id: Optional[str] = None
parent_id: Optional[str] = None
seq: int = 0
event: Event = "DELTA"
channel: Channel = "CONTROL"
content_type: str = "text/plain; charset=utf-8"
data_text: Optional[str] = None
data_bytes: Optional[bytes] = None
meta: Dict[str, Any] = field(default_factory=dict) Le protocole de communication est standardise :
START@CONTROL -> DELTA@ANSWER -> ... -> DELTA@ANSWER -> COMPLETE@ANSWER Un START@CONTROL transporte les metadonnees (provider, modele, granularite). Les DELTA@ANSWER transportent les tokens generes. COMPLETE@ANSWER signale la fin.
Ce design a des consequences importantes. La sortie d’un provider LLM est un flux de StreamMsg. La sortie d’un provider TTS est aussi un flux de StreamMsg (mais sur le channel AUDIO). On peut les chainer sans adaptateur — la sortie d’un LLM alimente directement l’entree d’un TTS.
Le module definit aussi une taxonomie d’erreurs stricte :
ErrorCode = Literal[
"BAD_INPUT", # non retryable
"TIMEOUT", # retryable
"UPSTREAM", # retryable
"INTERNAL", # non retryable
"RESOURCE_LIMIT" # contextuellement retryable
]
def is_error_retryable(error_code: str) -> bool:
retryable_codes = {"TIMEOUT", "UPSTREAM", "RESOURCE_LIMIT"}
return error_code in retryable_codes Et des limites de securite :
MAX_TEXT_SIZE = 1024 * 1024 # 1MB texte max
MAX_AUDIO_FRAME_SIZE = 32 * 1024 * 1024 # 32MB audio max
MAX_META_KEYS = 32 # limite meta dict BaseLLMProvider : la logique de granularite centralisee
La couche intermediaire BaseLLMProvider herite d’AbstractProvider et ajoute la gestion de la granularite — la vraie valeur ajoutee de cette abstraction :
class BaseLLMProvider(AbstractProvider):
__provider_type__ = ProviderType.LLM
supports_stream_in: bool = False
supports_stream_out: bool = True
supports_json_mode: bool = True
supports_tool_calls: bool = True
async def stream_process_msgs(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
granularity = prompt_msg.meta.get("granularity", "stream")
if granularity in ("monobloc", "full"):
try:
async for msg in self._monobloc_impl(prompt_msg):
yield msg
return
except NotImplementedError:
async for msg in self._monobloc_fallback(prompt_msg):
yield msg
return
elif granularity == "sentence":
async for msg in self._sentence_stream(prompt_msg):
yield msg
return
async for msg in self._stream_impl(prompt_msg):
yield msg Trois modes de granularite :
stream (defaut) : tokens au fur et a mesure, latence minimale. Pour les interfaces chat.
sentence : phrases completes une par une. _sentence_stream() bufferise les tokens et detecte les frontieres de phrase via detect_sentence_boundary. Ideal pour la synthese vocale — on peut parler la premiere phrase pendant que le LLM genere la suite.
monobloc : toute la reponse d’un coup. D’abord _monobloc_impl est tente (certains providers ont une API non-streaming plus efficace). Si NotImplementedError, le fallback bufferise le stream :
async def _monobloc_fallback(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
yield StreamMsg.start(channel="CONTROL",
meta={"mode": "monobloc_fallback"})
buffer = ""
async for msg in self._stream_impl(prompt_msg):
if msg.phase == "DELTA" and msg.channel == "ANSWER":
buffer += msg.data_text or ""
yield StreamMsg.delta_text(buffer, channel="ANSWER")
yield StreamMsg.complete(channel="ANSWER") Ce fallback garantit que tous les providers supportent tous les modes de granularite, meme s’ils n’ont pas d’API monobloc native. Les sous-classes n’implementent que _stream_impl — la granularite est geree au niveau superieur.
Implementation concrete : MistralLLM
class MistralLLM(BaseLLMProvider, APIKeyMixin):
def initialize(self) -> bool:
if not self._validate_api_key("MISTRAL_API_KEY"):
return False
api_key = self._get_api_key("MISTRAL_API_KEY")
self.client = Mistral(api_key=api_key)
self._set_available(True)
return True
async def _stream_impl(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
prompt_text, meta = self._extract_prompt_from_msg(prompt_msg)
model_name = meta.get("model", "mistral-small-latest")
meta_clean = {k: v for k, v in meta.items() if k != "model"}
start_msg = self._create_start_msg(model=model_name, **meta_clean)
yield start_msg
messages = meta.get("messages_formatted") or (
[{"role": "user", "content": prompt_text}] if prompt_text else [])
stream = self.client.chat.stream(
model=model_name,
messages=messages,
temperature=meta.get("temperature", 0.7),
max_tokens=meta.get("max_tokens", 1024))
seq = 1
for chunk in stream:
chunk_data = getattr(chunk, 'data', chunk)
if hasattr(chunk_data, 'choices') and chunk_data.choices:
delta = chunk_data.choices[0].delta
if hasattr(delta, 'content') and delta.content:
yield self._create_answer_delta(
text=delta.content, seq=seq,
parent_id=start_msg.id)
seq += 1 Notez le nettoyage de meta_clean qui retire la cle "model" — detail typique des integrations avec des SDK externes qui ont leurs propres conventions de parametres.
Les modeles disponibles sont declares explicitement :
def get_models(self) -> List[str]:
return [
"mistral-small-latest", "mistral-medium-latest",
"mistral-large-latest", "codestral-latest",
"pixtral-large-latest", "ministral-3b-latest",
"ministral-8b-latest"
] OllamaLLM : provider local sans cle API
L’implementation Ollama illustre un cas different — un provider local qui utilise HTTP brut plutot qu’un SDK :
class OllamaLLM(BaseLLMProvider, HTTPClientMixin):
def __init__(self, name: str = "ollama_llm"):
super().__init__(name)
HTTPClientMixin.__init__(self)
self.base_url = os.getenv("OLLAMA_API_URL", "http://localhost:11434")
def initialize(self) -> bool:
try:
response = self._make_request(
f"{self.base_url}/api/version", method="GET", timeout=5)
if response.status_code == 200:
self._set_available(True)
return True
else:
self._set_available(False,
f"Ollama non accessible: {response.status_code}")
return False
except Exception as e:
self._set_available(False, f"Erreur connexion: {e}")
return False
def get_models(self) -> List[str]:
try:
response = self._make_request(
f"{self.base_url}/api/tags", method="GET", timeout=10)
data = response.json()
return [model["name"] for model in data.get("models", [])]
except:
return ["gemma3:270m", "qwen3:1.7b", "qwen2.5vl:3b"] initialize() teste la connectivite HTTP plutot que de valider une cle. get_models() interroge l’API Ollama pour retourner les modeles reellement installes — avec un fallback en dur si le serveur n’est pas accessible.
Le streaming est gere avec un pattern producer/consumer et une Queue pour le backpressure :
async def _stream_impl(self, prompt_msg: StreamMsg) -> AsyncIterator[StreamMsg]:
q: Queue = Queue(maxsize=100) # backpressure
def _producer():
resp = session.post(f"{self.base_url}/api/chat",
json={"model": model_name, "messages": messages,
"stream": True, "options": {...}})
for line in resp.iter_lines():
data = json.loads(line)
q.put(data["message"]["content"])
q.put(SENTINEL) Ce pattern evite de bloquer la boucle evenementielle asyncio pendant que les requetes HTTP synchrones d’Ollama arrivent.
Les mixins : comportements transversaux sans heritage lourd
MistralLLM utilise APIKeyMixin, OllamaLLM utilise HTTPClientMixin. Ces mixins sont independants de la hierarchie principale :
class APIKeyMixin:
def _get_api_key(self, env_var: str, *, required: bool = True) -> Optional[str]:
key = (os.environ.get(env_var) or
os.environ.get(env_var.upper()) or
os.environ.get(env_var.lower()))
return key
class HTTPClientMixin:
def _get_session(self) -> requests.Session:
if self._session is None:
s = requests.Session()
s.headers.update(self._get_default_headers())
self._session = s
return self._session
def _make_request(self, url, method="POST",
timeout=None, **kwargs) -> requests.Response:
session = self._get_session()
if timeout is None:
timeout = config.get_api_timeout()
resp = session.request(method=method, url=url,
timeout=timeout, **kwargs)
resp.raise_for_status()
return resp Le HTTPClientMixin gere la session reutilisable, les headers par defaut, le timeout depuis la config globale, et la gestion d’erreur commune. C’est du code que chaque provider HTTP devrait ecrire — autant le factoriser.
Le registry : decouverte automatique et lazy loading
Le ProviderRegistry maintient une table de routing nom -> chemin de classe :
_DEFAULT_PROVIDER_IMPORTS: Dict[str, str] = {
"mistral_llm": "pycore.providers.llm.mistral_llm.MistralLLM",
"deepinfra_llm": "pycore.providers.llm.deepinfra_llm.DeepInfraLLM",
"ollama_llm": "pycore.providers.llm.ollama_llm.OllamaLLM",
"llama_cpp_llm": "pycore.providers.llm.llama_cpp_llm.LlamaCppLLM",
"edge_tts": "pycore.providers.tts.edge_tts.EdgeTTS",
"kokoro_tts": "pycore.providers.tts.kokoro_tts.KokoroTTS",
"openrouter_vlm": "pycore.providers.vlm.openrouter_vlm.OpenRouterVLM",
"ollama_vlm": "pycore.providers.vlm.ollama_vlm.OllamaVLM",
"runware_img_gen": "pycore.providers.img_gen.runware_img_gen.RunwareImageGen",
# ... 19 providers au total
} L’instanciation est tardive : le module est importe via importlib.import_module uniquement quand le provider est demande. Ca evite d’importer le SDK Mistral si on n’utilise que Ollama — critique dans les environnements avec des contraintes de demarrage.
Le ProviderDiscovery dans kernel/discovery.py scanne automatiquement le repertoire providers/ pour trouver les classes qui heritent d’AbstractProvider :
class ProviderDiscovery:
def discover_providers(self) -> int:
providers_path = self._get_providers_path()
discovered_count = 0
for category_dir in providers_path.iterdir():
if (category_dir.is_dir() and
not category_dir.name.startswith('_')):
discovered_count += self._scan_category(category_dir.name)
return discovered_count
def _scan_category(self, category: str) -> int:
count = 0
category_path = self._get_providers_path() / category
for py_file in category_path.glob("*.py"):
if py_file.name.startswith('_'):
continue
count += self._import_and_register(...)
return count Le registry valide chaque classe avant enregistrement — il refuse les classes abstraites, les classes sans __provider_type__, et les classes dont le constructeur n’accepte pas un name: str :
def _is_valid_provider_class(self, cls) -> bool:
if not issubclass(cls, AbstractProvider):
return False
if cls is AbstractProvider:
return False
if getattr(cls, "__abstract__", False):
return False
try:
cls("__probe__") # smoke test constructeur
except TypeError:
return False
return True L’instrumentation : StreamTimer et StreamLogger
Le module shared/contract.py fournit des outils d’instrumentation specifiques au streaming :
class StreamTimer:
def __init__(self):
self.t0 = perf_counter()
self.t_last = self.t0
self.t_first_data = None
def mark_first_data(self) -> int:
"""Marque le premier DELTA et calcule TTFB"""
if self.t_first_data is None:
self.t_first_data = perf_counter()
return self.ttfb_ms
def tick(self) -> float:
"""Latence depuis le dernier tick (ms)"""
now = perf_counter()
delta = (now - self.t_last) * 1000
self.t_last = now
return delta
@property
def ttfb_ms(self) -> Optional[int]:
"""Time To First Byte (ms)"""
if self.t_first_data:
return int((self.t_first_data - self.t0) * 1000)
return None Le TTFB (Time To First Byte) est la metrique la plus importante pour l’experience utilisateur en streaming. Avec StreamTimer, chaque provider peut mesurer et rapporter son TTFB sans code boilerplate.
Le StreamLogger ecrit les evenements en JSON lines pour analyse post-mortem — utile pour diagnostiquer les latences et les erreurs intermittentes.
La configuration par YAML
Les providers sont configures via des fichiers YAML valides par Pydantic :
class ModelsConfig(BaseModel):
allow: Optional[List[StrOrItem]] = None
deny: Optional[List[str]] = None
prefer: Optional[List[str]] = None
aliases: Optional[Dict[str, str]] = None
default: Optional[str] = None
@model_validator(mode="after")
def _check_default_in_allow(self):
if self.default and self.allow:
allowed_names = [item.name if isinstance(item, ItemEntry)
else item for item in self.allow]
if self.default not in allowed_names:
raise ValueError(
f"models.default '{self.default}' doit appartenir a models.allow")
return self Chaque ItemEntry peut porter des metadonnees (prix, latence moyenne) utilisees pour la selection automatique du meilleur modele.
En pratique : basculer de provider
# Mistral cloud
provider = registry.get("mistral_llm")
await provider.initialize()
# Ollama local (meme code)
provider = registry.get("ollama_llm")
await provider.initialize()
# Le consommateur ne change pas
prompt = StreamMsg(
event="START", channel="CONTROL",
data_text="Explique PageRank en 3 phrases",
meta={"model": "mistral-small-latest", "granularity": "sentence"})
async for msg in provider.stream_process_msgs(prompt):
if msg.event == "DELTA" and msg.channel == "ANSWER":
print(msg.data_text, end="", flush=True) Ce que cette architecture enseigne
Contrat d’interface strict plutot que convention. Un seul point d’entree (stream_process_msgs) plutot que des methodes differentes (generate(), stream(), chat()). Le consommateur n’a pas a deviner quelle methode appeler.
Separation granularite / implementation. Les sous-classes n’implementent que _stream_impl. La granularite (token, phrase, monobloc) est geree par la classe de base. Ajouter un mode de granularite ne touche aucun provider existant.
Fallback systematique. _monobloc_impl non implemente ? Fallback vers bufferisation. Ollama inaccessible ? Liste de modeles par defaut. Ce pragmatisme rend le systeme robuste en production.
Mixins pour les comportements transversaux. APIKeyMixin et HTTPClientMixin sont factorisés sans polluer la hierarchie principale. Chaque provider pioche ce dont il a besoin.
Le meme pattern pour 9 types de providers. LLM, TTS, STT, OCR, IMG_GEN, VLM, VAD, WAKE_WORD, STT_LLM — tous derivent d’AbstractProvider, tous emettent des StreamMsg. Un pipeline multimodal (STT -> LLM -> TTS) ne necessite aucun adaptateur.
LangChain n’est pas un mauvais outil. Mais quand vous avez besoin de controler le streaming, la granularite et le cycle de vie des providers a un niveau fin, construire votre propre abstraction est un investissement qui se rentabilise des le deuxieme provider.
Un projet similaire ? Contactez Loick Briot : contact@brio-novia.eu