1
📐 Arquitetura do Pipeline
Visao geral das 3 fases e 9 passos implementados.
class UniversalPipeline:
"""
Pipeline Universal GIPM - 9 passos em 3 fases
ENTRADA (Sistema prepara):
1. Receber Input → Validar formato e limites
2. Validar → Aplicar regras de governanca
3. Preparar Contexto → Montar prompt estruturado
PROCESSAMENTO (IA executa):
4. Invocar IA → Chamar modelo com retry
5. Receber Estrutura → Parsear resposta JSON
6. Validar Saida → Verificar qualidade
SAIDA (Sistema materializa):
7. Materializar → Gerar artefato final
8. Persistir → Salvar tudo no banco
9. Retornar → Resposta ao cliente
"""
2
📥 Fase Entrada (Passos 1-3)
Sistema recebe, valida e prepara o contexto.
async def phase_input(self, request: PipelineRequest) -> Context:
# Passo 1: Receber Input
self.logger.info("pipeline_step_1_receive", request_id=request.id)
input_data = self._normalize_input(request)
# Passo 2: Validar
self.logger.info("pipeline_step_2_validate", request_id=request.id)
self._validate_governance(input_data)
self._validate_limits(input_data)
self._check_blocked_content(input_data)
# Passo 3: Preparar Contexto
self.logger.info("pipeline_step_3_context", request_id=request.id)
context = Context(
request_id=request.id,
input=input_data,
sources=await self._load_sources(request.notebook_id),
persona=self._load_persona(request.persona),
history=await self._load_history(request.notebook_id, limit=10)
)
return context
3
⚡ Fase Processamento (Passos 4-6)
IA e invocada e sua saida e validada.
async def phase_process(self, context: Context) -> AIResponse:
# Passo 4: Invocar IA
self.logger.info("pipeline_step_4_invoke", request_id=context.request_id)
prompt = self._build_prompt(context)
raw_response = await self.llm.generate(prompt)
# Passo 5: Receber Estrutura
self.logger.info("pipeline_step_5_parse", request_id=context.request_id)
try:
structured = self._parse_response(raw_response)
except ParseError as e:
# Fallback: tenta corrigir formato
structured = await self._retry_with_format_fix(raw_response)
# Passo 6: Validar Saida
self.logger.info("pipeline_step_6_validate", request_id=context.request_id)
validation = self._validate_output(structured, context)
if not validation.passed:
raise OutputValidationError(validation.errors)
return AIResponse(
content=structured,
tokens=raw_response.usage,
validation=validation
)
4
📤 Fase Saida (Passos 7-9)
Sistema materializa, persiste e retorna.
async def phase_output(
self,
context: Context,
ai_response: AIResponse
) -> PipelineResult:
# Passo 7: Materializar
self.logger.info("pipeline_step_7_materialize", request_id=context.request_id)
artifact = await self._materialize(ai_response.content, context)
# Passo 8: Persistir
self.logger.info("pipeline_step_8_persist", request_id=context.request_id)
await self._persist_all(
context=context,
ai_response=ai_response,
artifact=artifact
)
# Passo 9: Retornar
self.logger.info("pipeline_step_9_return", request_id=context.request_id)
return PipelineResult(
request_id=context.request_id,
content=artifact.content,
metadata=artifact.metadata,
cost=self._calculate_cost(ai_response.tokens)
)
5
🔗 Pipeline Completo
Orquestracao das 3 fases em sequencia.
class UniversalPipeline:
async def execute(self, request: PipelineRequest) -> PipelineResult:
"""Executa pipeline completo de 9 passos"""
start_time = time.time()
try:
# Fase 1: Entrada (Passos 1-3)
context = await self.phase_input(request)
# Fase 2: Processamento (Passos 4-6)
ai_response = await self.phase_process(context)
# Fase 3: Saida (Passos 7-9)
result = await self.phase_output(context, ai_response)
# Metricas
result.duration_ms = (time.time() - start_time) * 1000
self.metrics.record_success(result)
return result
except PipelineError as e:
self.metrics.record_failure(e)
await self._persist_error(request, e)
raise
6
📊 Rastreabilidade End-to-End
Cada passo e rastreado com metricas e logs.
# Log estruturado de cada execucao
{
"request_id": "uuid-123",
"pipeline_execution": {
"steps": [
{"step": 1, "name": "receive", "duration_ms": 5, "status": "ok"},
{"step": 2, "name": "validate", "duration_ms": 12, "status": "ok"},
{"step": 3, "name": "context", "duration_ms": 45, "status": "ok"},
{"step": 4, "name": "invoke", "duration_ms": 1523, "status": "ok"},
{"step": 5, "name": "parse", "duration_ms": 8, "status": "ok"},
{"step": 6, "name": "validate_output", "duration_ms": 15, "status": "ok"},
{"step": 7, "name": "materialize", "duration_ms": 234, "status": "ok"},
{"step": 8, "name": "persist", "duration_ms": 89, "status": "ok"},
{"step": 9, "name": "return", "duration_ms": 2, "status": "ok"}
],
"total_duration_ms": 1933,
"tokens_used": 2456,
"cost_usd": 0.0086
}
}
📝 Resumo do Modulo
✓9 Passos - Implementados em codigo Python
✓3 Fases - Entrada, Processamento, Saida
✓Orquestracao - Sistema controla todo o fluxo
✓Rastreabilidade - Cada passo e logado