MODULO 5.4

๐Ÿ”„ Implementando o Pipeline

Os 9 passos do Pipeline Universal em codigo funcional.

8
Topicos
~60
Minutos
Pratico
Nivel
Hands-on
Tipo
1

๐Ÿ“ฅ Passo 1-2: Request e Validacao

# app/services/pipeline.py
from uuid import uuid4
from app.config import settings

class Pipeline:
    async def step_1_receive(self, request: dict) -> dict:
        """Passo 1: Receber e normalizar input"""
        return {
            "request_id": str(uuid4()),
            "input": request.get("input", ""),
            "persona": request.get("persona", settings.DEFAULT_PERSONA),
            "timestamp": datetime.utcnow()
        }

    async def step_2_validate(self, data: dict) -> dict:
        """Passo 2: Validar contra regras de governanca"""
        if not data["input"]:
            raise ValidationError("Input vazio")
        if len(data["input"]) > settings.MAX_TOKENS_PER_REQUEST:
            raise ValidationError("Input muito grande")
        # Validar persona
        if data["persona"] not in ALLOWED_PERSONAS:
            raise ValidationError(f"Persona invalida: {data['persona']}")
        return data
2

๐Ÿ—‚๏ธ Passo 3-4: Contexto e Normalizacao

    async def step_3_context(self, data: dict) -> dict:
        """Passo 3: Coletar contexto necessario"""
        context = {
            **data,
            "system_context": {
                "date": datetime.utcnow().isoformat(),
                "version": settings.VERSION,
            }
        }
        # Adicionar contexto especifico do dominio
        if data.get("document_id"):
            context["document"] = await self.load_document(data["document_id"])
        return context

    async def step_4_normalize(self, context: dict) -> dict:
        """Passo 4: Normalizar para formato do LLM"""
        normalized = {
            **context,
            "input_normalized": context["input"].strip().lower(),
            "token_count_estimate": len(context["input"].split()) * 1.3
        }
        return normalized
3

๐ŸŽญ Passo 5: Injecao de Persona

    async def step_5_persona(self, data: dict) -> dict:
        """Passo 5: Injetar persona no prompt"""
        from app.governance.personas import get_persona

        persona = get_persona(data["persona"])

        # Montar prompt com persona injetada
        prompt = f"""{persona.system_prompt}

CONTEXTO ATUAL:
{data.get('system_context', {})}

SOLICITACAO DO USUARIO:
{data['input']}

RESPONDA SEGUINDO AS REGRAS DA PERSONA ACIMA."""

        data["prompt_final"] = prompt
        data["persona_used"] = persona.name
        return data
4

๐Ÿค– Passo 6: Chamada a IA

    async def step_6_invoke(self, data: dict) -> dict:
        """Passo 6: Chamar o LLM"""
        from app.services.llm import LLMService

        llm = LLMService()
        start_time = time.time()

        response = await llm.generate(
            prompt=data["prompt_final"],
            max_tokens=8192,
            temperature=0.7
        )

        data["ai_response"] = response.text
        data["tokens_input"] = response.usage.input_tokens
        data["tokens_output"] = response.usage.output_tokens
        data["latency_ms"] = (time.time() - start_time) * 1000
        data["cost_usd"] = self._calculate_cost(response.usage)

        return data
5

๐Ÿ’พ Passo 7: Persistencia

    async def step_7_persist(self, data: dict, db: Session) -> dict:
        """Passo 7: Salvar TUDO no banco"""
        from app.models.interaction import Interaction

        interaction = Interaction(
            request_id=data["request_id"],
            input_text=data["input"],
            prompt_sent=data["prompt_final"],
            response_received=data["ai_response"],
            persona_used=data["persona_used"],
            tokens_input=data["tokens_input"],
            tokens_output=data["tokens_output"],
            cost_usd=data["cost_usd"],
            latency_ms=data["latency_ms"],
            timestamp=data["timestamp"]
        )

        db.add(interaction)
        db.commit()

        data["persisted"] = True
        return data
6

๐Ÿ“ฆ Passo 8: Materializacao

    async def step_8_materialize(self, data: dict) -> dict:
        """Passo 8: Gerar artefato final (se necessario)"""
        output_format = data.get("output_format", "text")

        if output_format == "text":
            data["artifact"] = data["ai_response"]
        elif output_format == "markdown":
            data["artifact"] = self._format_markdown(data["ai_response"])
        elif output_format == "json":
            data["artifact"] = self._parse_json(data["ai_response"])

        return data
7

โ†ฉ๏ธ Passo 9: Retorno

    async def step_9_return(self, data: dict) -> PipelineResponse:
        """Passo 9: Retornar resposta ao cliente"""
        return PipelineResponse(
            request_id=data["request_id"],
            response=data["artifact"],
            metadata={
                "tokens_used": data["tokens_input"] + data["tokens_output"],
                "cost_usd": data["cost_usd"],
                "latency_ms": data["latency_ms"],
                "persona_used": data["persona_used"]
            }
        )
8

๐Ÿงช Pipeline Completo

    async def execute(self, request: dict, db: Session) -> PipelineResponse:
        """Executa pipeline completo de 9 passos"""
        # ENTRADA
        data = await self.step_1_receive(request)
        data = await self.step_2_validate(data)
        data = await self.step_3_context(data)
        data = await self.step_4_normalize(data)

        # PROCESSAMENTO
        data = await self.step_5_persona(data)
        data = await self.step_6_invoke(data)

        # SAIDA
        data = await self.step_7_persist(data, db)
        data = await self.step_8_materialize(data)
        return await self.step_9_return(data)

๐Ÿ“ Resumo do Modulo

โœ“9 Passos - Implementados em codigo
โœ“Validacao - Regras de governanca
โœ“Persistencia - Tudo e salvo
โœ“Rastreabilidade - request_id em tudo