MODULO 4.6

🔄 Pipeline Universal Implementado

Os 9 passos do pipeline GIPM em codigo funcional.

6
Topicos
~40
Minutos
Avanc.
Nivel
Pratico
Tipo
1

📐 Arquitetura do Pipeline

Visao geral das 3 fases e 9 passos implementados.

class UniversalPipeline:
    """
    Pipeline Universal GIPM - 9 passos em 3 fases

    ENTRADA (Sistema prepara):
    1. Receber Input → Validar formato e limites
    2. Validar → Aplicar regras de governanca
    3. Preparar Contexto → Montar prompt estruturado

    PROCESSAMENTO (IA executa):
    4. Invocar IA → Chamar modelo com retry
    5. Receber Estrutura → Parsear resposta JSON
    6. Validar Saida → Verificar qualidade

    SAIDA (Sistema materializa):
    7. Materializar → Gerar artefato final
    8. Persistir → Salvar tudo no banco
    9. Retornar → Resposta ao cliente
    """
2

📥 Fase Entrada (Passos 1-3)

Sistema recebe, valida e prepara o contexto.

async def phase_input(self, request: PipelineRequest) -> Context:
    # Passo 1: Receber Input
    self.logger.info("pipeline_step_1_receive", request_id=request.id)
    input_data = self._normalize_input(request)

    # Passo 2: Validar
    self.logger.info("pipeline_step_2_validate", request_id=request.id)
    self._validate_governance(input_data)
    self._validate_limits(input_data)
    self._check_blocked_content(input_data)

    # Passo 3: Preparar Contexto
    self.logger.info("pipeline_step_3_context", request_id=request.id)
    context = Context(
        request_id=request.id,
        input=input_data,
        sources=await self._load_sources(request.notebook_id),
        persona=self._load_persona(request.persona),
        history=await self._load_history(request.notebook_id, limit=10)
    )

    return context
3

⚡ Fase Processamento (Passos 4-6)

IA e invocada e sua saida e validada.

async def phase_process(self, context: Context) -> AIResponse:
    # Passo 4: Invocar IA
    self.logger.info("pipeline_step_4_invoke", request_id=context.request_id)
    prompt = self._build_prompt(context)
    raw_response = await self.llm.generate(prompt)

    # Passo 5: Receber Estrutura
    self.logger.info("pipeline_step_5_parse", request_id=context.request_id)
    try:
        structured = self._parse_response(raw_response)
    except ParseError as e:
        # Fallback: tenta corrigir formato
        structured = await self._retry_with_format_fix(raw_response)

    # Passo 6: Validar Saida
    self.logger.info("pipeline_step_6_validate", request_id=context.request_id)
    validation = self._validate_output(structured, context)
    if not validation.passed:
        raise OutputValidationError(validation.errors)

    return AIResponse(
        content=structured,
        tokens=raw_response.usage,
        validation=validation
    )
4

📤 Fase Saida (Passos 7-9)

Sistema materializa, persiste e retorna.

async def phase_output(
    self,
    context: Context,
    ai_response: AIResponse
) -> PipelineResult:
    # Passo 7: Materializar
    self.logger.info("pipeline_step_7_materialize", request_id=context.request_id)
    artifact = await self._materialize(ai_response.content, context)

    # Passo 8: Persistir
    self.logger.info("pipeline_step_8_persist", request_id=context.request_id)
    await self._persist_all(
        context=context,
        ai_response=ai_response,
        artifact=artifact
    )

    # Passo 9: Retornar
    self.logger.info("pipeline_step_9_return", request_id=context.request_id)
    return PipelineResult(
        request_id=context.request_id,
        content=artifact.content,
        metadata=artifact.metadata,
        cost=self._calculate_cost(ai_response.tokens)
    )
5

🔗 Pipeline Completo

Orquestracao das 3 fases em sequencia.

class UniversalPipeline:
    async def execute(self, request: PipelineRequest) -> PipelineResult:
        """Executa pipeline completo de 9 passos"""
        start_time = time.time()

        try:
            # Fase 1: Entrada (Passos 1-3)
            context = await self.phase_input(request)

            # Fase 2: Processamento (Passos 4-6)
            ai_response = await self.phase_process(context)

            # Fase 3: Saida (Passos 7-9)
            result = await self.phase_output(context, ai_response)

            # Metricas
            result.duration_ms = (time.time() - start_time) * 1000
            self.metrics.record_success(result)

            return result

        except PipelineError as e:
            self.metrics.record_failure(e)
            await self._persist_error(request, e)
            raise
6

📊 Rastreabilidade End-to-End

Cada passo e rastreado com metricas e logs.

# Log estruturado de cada execucao
{
    "request_id": "uuid-123",
    "pipeline_execution": {
        "steps": [
            {"step": 1, "name": "receive", "duration_ms": 5, "status": "ok"},
            {"step": 2, "name": "validate", "duration_ms": 12, "status": "ok"},
            {"step": 3, "name": "context", "duration_ms": 45, "status": "ok"},
            {"step": 4, "name": "invoke", "duration_ms": 1523, "status": "ok"},
            {"step": 5, "name": "parse", "duration_ms": 8, "status": "ok"},
            {"step": 6, "name": "validate_output", "duration_ms": 15, "status": "ok"},
            {"step": 7, "name": "materialize", "duration_ms": 234, "status": "ok"},
            {"step": 8, "name": "persist", "duration_ms": 89, "status": "ok"},
            {"step": 9, "name": "return", "duration_ms": 2, "status": "ok"}
        ],
        "total_duration_ms": 1933,
        "tokens_used": 2456,
        "cost_usd": 0.0086
    }
}

📝 Resumo do Modulo

9 Passos - Implementados em codigo Python
3 Fases - Entrada, Processamento, Saida
Orquestracao - Sistema controla todo o fluxo
Rastreabilidade - Cada passo e logado