1
📥 Passo 1-2: Request e Validação
# app/services/pipeline.py
from uuid import uuid4
from app.config import settings
class Pipeline:
async def step_1_receive(self, request: dict) -> dict:
    """Step 1: receive the raw request and normalize it into the pipeline payload.

    Args:
        request: Raw client payload. The optional keys ``"input"``,
            ``"persona"``, ``"document_id"`` and ``"output_format"`` are read.

    Returns:
        A new dict with ``request_id`` (a fresh UUID4 string), ``input``
        (``""`` when absent), ``persona`` (``settings.DEFAULT_PERSONA`` when
        absent) and ``timestamp`` (naive datetime in UTC). ``document_id``
        and ``output_format`` are copied over only when the request
        supplies them.
    """
    # Imported locally: the module header does not import datetime.
    from datetime import datetime, timezone

    # Only consult the settings when the client did not choose a persona.
    if "persona" in request:
        persona = request["persona"]
    else:
        persona = settings.DEFAULT_PERSONA

    payload = {
        "request_id": str(uuid4()),
        "input": request.get("input", ""),
        "persona": persona,
        # Same naive-UTC value utcnow() produced, without relying on the
        # call deprecated since Python 3.12.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None),
    }

    # Later steps read these optional keys (document loading and artifact
    # formatting); without forwarding them they could never take effect.
    for optional_key in ("document_id", "output_format"):
        if optional_key in request:
            payload[optional_key] = request[optional_key]

    return payload
async def step_2_validate(self, data: dict) -> dict:
    """Step 2: validate the payload against the governance rules.

    Args:
        data: Payload holding at least the ``"input"`` and ``"persona"`` keys.

    Returns:
        The same ``data`` dict, unchanged, when every check passes.

    Raises:
        ValidationError: If the input is empty or whitespace-only, is not a
            string, is longer than ``settings.MAX_TOKENS_PER_REQUEST``, or
            the persona is not in ``ALLOWED_PERSONAS``.
    """
    # NOTE(review): ValidationError and ALLOWED_PERSONAS are not imported in
    # the visible module header - confirm where they come from.
    text = data["input"]

    if not text:
        raise ValidationError("Input vazio")
    # A truthy non-string would pass the checks below and only fail later
    # when the text is stripped and split; reject it here instead.
    if not isinstance(text, str):
        raise ValidationError("Input deve ser texto")
    # Whitespace-only input is effectively empty once it is normalized.
    if not text.strip():
        raise ValidationError("Input vazio")

    # NOTE(review): this compares a character count against a setting named
    # in tokens - confirm the intended unit.
    if len(text) > settings.MAX_TOKENS_PER_REQUEST:
        raise ValidationError("Input muito grande")

    # Validate the persona
    if data["persona"] not in ALLOWED_PERSONAS:
        raise ValidationError(f"Persona invalida: {data['persona']}")

    return data
2
Passo 3-4: Contexto e Normalização
async def step_3_context(self, data: dict) -> dict:
    """Step 3: gather the context required by the following steps.

    Args:
        data: Validated payload. An optional ``"document_id"`` key triggers
            loading of the referenced document.

    Returns:
        A new dict with every key of ``data`` plus ``system_context``
        (current UTC date in ISO format and ``settings.VERSION``) and, when
        a document id is present, ``document`` as returned by
        ``self.load_document``.
    """
    # Imported locally: the module header does not import datetime.
    from datetime import datetime, timezone

    # Same naive-UTC ISO string utcnow() produced, without relying on the
    # call deprecated since Python 3.12.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    context = {
        **data,
        "system_context": {
            "date": now_utc.isoformat(),
            "version": settings.VERSION,
        },
    }

    # Add domain-specific context
    document_id = data.get("document_id")
    if document_id:
        context["document"] = await self.load_document(document_id)

    return context
async def step_4_normalize(self, context: dict, *, tokens_per_word: float = 1.3) -> dict:
    """Step 4: normalize the payload into the shape used for the LLM call.

    Args:
        context: Payload holding at least the string key ``"input"``.
        tokens_per_word: Multiplier applied to the whitespace-separated word
            count to estimate tokens. Defaults to 1.3, the ratio that was
            previously hard-coded.

    Returns:
        A new dict with every key of ``context`` plus ``input_normalized``
        (the input stripped and lower-cased) and ``token_count_estimate``
        (word count times ``tokens_per_word``).
    """
    raw_text = context["input"]
    word_count = len(raw_text.split())

    return {
        **context,
        "input_normalized": raw_text.strip().lower(),
        "token_count_estimate": word_count * tokens_per_word,
    }
3
🎭 Passo 5: Injeção de Persona
async def step_5_persona(self, data: dict) -> dict:
    """Step 5: build the final prompt with the selected persona injected.

    Args:
        data: Payload holding ``"persona"`` and ``"input"``; the optional
            ``"system_context"`` is rendered into the prompt when present
            (an empty dict otherwise).

    Returns:
        The same ``data`` dict, updated in place with ``prompt_final`` and
        ``persona_used``.
    """
    from app.governance.personas import get_persona

    active_persona = get_persona(data["persona"])

    # One entry per prompt line; joined with newlines this yields the same
    # text as a single multi-line template.
    prompt_lines = [
        f"{active_persona.system_prompt}",
        "CONTEXTO ATUAL:",
        f"{data.get('system_context', {})}",
        "SOLICITACAO DO USUARIO:",
        f"{data['input']}",
        "RESPONDA SEGUINDO AS REGRAS DA PERSONA ACIMA.",
    ]

    data["prompt_final"] = "\n".join(prompt_lines)
    data["persona_used"] = active_persona.name
    return data
4
🤖 Passo 6: Chamada à IA
async def step_6_invoke(
    self,
    data: dict,
    *,
    max_tokens: int = 8192,
    temperature: float = 0.7,
) -> dict:
    """Step 6: call the LLM with the final prompt and record usage metrics.

    Args:
        data: Payload holding ``"prompt_final"``.
        max_tokens: Value passed as ``max_tokens`` to the LLM call.
            Defaults to 8192, the value that was previously hard-coded.
        temperature: Value passed as ``temperature`` to the LLM call.
            Defaults to 0.7, the value that was previously hard-coded.

    Returns:
        The same ``data`` dict, updated in place with ``ai_response``,
        ``tokens_input``, ``tokens_output``, ``latency_ms`` and ``cost_usd``.
    """
    # Imported locally: the module header does not import time.
    import time

    from app.services.llm import LLMService

    llm = LLMService()

    # perf_counter is monotonic, so the measured latency cannot be skewed
    # (or turn negative) when the system clock is adjusted mid-call.
    started = time.perf_counter()
    response = await llm.generate(
        prompt=data["prompt_final"],
        max_tokens=max_tokens,
        temperature=temperature,
    )
    elapsed_ms = (time.perf_counter() - started) * 1000

    data["ai_response"] = response.text
    data["tokens_input"] = response.usage.input_tokens
    data["tokens_output"] = response.usage.output_tokens
    data["latency_ms"] = elapsed_ms
    data["cost_usd"] = self._calculate_cost(response.usage)
    return data
5
💾 Passo 7: Persistência
async def step_7_persist(self, data: dict, db: "Session") -> dict:
    """Step 7: store the whole interaction in the database.

    Args:
        data: Payload holding the request id, input, final prompt, AI
            response, persona, token counts, cost, latency and timestamp.
        db: Database session used to add and commit the record. The
            annotation is a forward reference because ``Session`` is not
            imported in the visible module header.

    Returns:
        The same ``data`` dict, updated in place with ``persisted = True``.

    Raises:
        Exception: Whatever the session raises while adding or committing;
            the transaction is rolled back before the error propagates.
    """
    from app.models.interaction import Interaction

    interaction = Interaction(
        request_id=data["request_id"],
        input_text=data["input"],
        prompt_sent=data["prompt_final"],
        response_received=data["ai_response"],
        persona_used=data["persona_used"],
        tokens_input=data["tokens_input"],
        tokens_output=data["tokens_output"],
        cost_usd=data["cost_usd"],
        latency_ms=data["latency_ms"],
        timestamp=data["timestamp"],
    )

    # NOTE(review): commit() is called synchronously inside a coroutine -
    # confirm whether `db` is a blocking session.
    try:
        db.add(interaction)
        db.commit()
    except Exception:
        # Leave the session usable for the caller instead of stuck in a
        # failed transaction, then surface the original error.
        db.rollback()
        raise

    data["persisted"] = True
    return data
6
📦 Passo 8: Materialização
async def step_8_materialize(self, data: dict) -> dict:
    """Step 8: produce the final artifact in the requested output format.

    Args:
        data: Payload holding ``"ai_response"`` and, optionally,
            ``"output_format"`` (``"text"`` when absent).

    Returns:
        The same ``data`` dict, updated in place with ``artifact``:
        the raw response for ``"text"``, the result of
        ``self._format_markdown`` for ``"markdown"``, or the result of
        ``self._parse_json`` for ``"json"``. Any other format falls back to
        the raw response so that ``artifact`` is always set.
    """
    raw_response = data["ai_response"]
    output_format = data.get("output_format", "text")

    if output_format == "markdown":
        data["artifact"] = self._format_markdown(raw_response)
    elif output_format == "json":
        data["artifact"] = self._parse_json(raw_response)
    else:
        if output_format != "text":
            # An unknown format used to leave "artifact" unset, which made
            # the return step fail with KeyError after the LLM call had
            # already been made and persisted.
            import logging

            logging.getLogger(__name__).warning(
                "Unsupported output_format %r; returning raw text",
                output_format,
            )
        data["artifact"] = raw_response

    return data
7
↩️ Passo 9: Retorno
async def step_9_return(self, data: dict) -> PipelineResponse:
    """Step 9: wrap the pipeline result in the response sent to the client.

    Args:
        data: Payload holding ``request_id``, ``artifact``, ``tokens_input``,
            ``tokens_output``, ``cost_usd``, ``latency_ms`` and
            ``persona_used``.

    Returns:
        A ``PipelineResponse`` carrying the request id, the artifact as the
        response, and a metadata dict with total tokens, cost, latency and
        persona.
    """
    request_id = data["request_id"]
    artifact = data["artifact"]

    usage_summary = {
        "tokens_used": data["tokens_input"] + data["tokens_output"],
        "cost_usd": data["cost_usd"],
        "latency_ms": data["latency_ms"],
        "persona_used": data["persona_used"],
    }

    return PipelineResponse(
        request_id=request_id,
        response=artifact,
        metadata=usage_summary,
    )
8
🧪 Pipeline Completo
async def execute(self, request: dict, db: Session) -> PipelineResponse:
    """Run the complete 9-step pipeline for one request.

    Args:
        request: Raw client payload handed to step 1.
        db: Database session handed to the persistence step.

    Returns:
        The ``PipelineResponse`` produced by step 9.
    """
    # INPUT (steps 1-4) and PROCESSING (steps 5-6): after the first step,
    # each stage takes the payload and returns the payload for the next.
    payload = await self.step_1_receive(request)
    for stage in (
        self.step_2_validate,
        self.step_3_context,
        self.step_4_normalize,
        self.step_5_persona,
        self.step_6_invoke,
    ):
        payload = await stage(payload)

    # OUTPUT (steps 7-9): persistence is the only stage that needs the session.
    payload = await self.step_7_persist(payload, db)
    payload = await self.step_8_materialize(payload)
    return await self.step_9_return(payload)
Resumo do Módulo
✅ 9 Passos - Implementados em código
✅ Validação - Regras de governança
✅ Persistência - Tudo é salvo
✅ Rastreabilidade - request_id em tudo