This example shows a complete RAG (Retrieval-Augmented Generation) pipeline using Moxn for prompt management and telemetry.

Overview

The pipeline:
  1. Receives a user query
  2. Searches a knowledge base for relevant documents
  3. Sends query + context to an LLM
  4. Returns a grounded response

Prompt Setup

First, create a prompt in the Moxn web app with these messages.

System Message:
You are a helpful assistant for {{company_name}}. Answer questions using ONLY
the provided context. If the context doesn't contain the answer, say so.

Guidelines:
- Be concise and direct
- Cite sources when possible
- Don't make up information

User Message:
User: {{user_name}}
Question: {{query}}

Relevant documents:
{{search_results}}

Generated Models

After setting up the prompt, generate models:
async with MoxnClient() as client:
    await client.generate_task_models(
        task_id="your-task-id",
        output_dir="./models"
    )
This creates:
# models/knowledge_base_assistant_models.py

from moxn.types.base import RenderableModel
import json

class Document(RenderableModel):
    id: str
    title: str
    content: str
    score: float

class RAGInput(RenderableModel):
    company_name: str
    user_name: str
    query: str
    search_results: list[Document]

    def render(self, **kwargs) -> dict[str, str]:
        # Format documents for the LLM
        docs_formatted = "\n\n".join([
            f"[{i+1}] {doc.title}\n{doc.content}"
            for i, doc in enumerate(self.search_results)
        ])

        return {
            "company_name": self.company_name,
            "user_name": self.user_name,
            "query": self.query,
            "search_results": docs_formatted
        }
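
For example, rendering an input with two retrieved documents produces the plain strings that are substituted into the prompt template (a usage sketch of the generated model above; the sample documents are made up):

docs = [
    Document(id="1", title="Password Reset",
             content="Go to Settings > Security and click Reset Password.",
             score=0.92),
    Document(id="2", title="Account Recovery",
             content="If you have lost access to your email, contact support.",
             score=0.81),
]

rag_input = RAGInput(
    company_name="Acme Corp",
    user_name="Alice",
    query="How do I reset my password?",
    search_results=docs,
)

variables = rag_input.render()
# variables["search_results"] is now:
# [1] Password Reset
# Go to Settings > Security and click Reset Password.
#
# [2] Account Recovery
# If you have lost access to your email, contact support.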

Complete Implementation

import asyncio
from moxn import MoxnClient
from moxn.types.content import Provider
from anthropic import Anthropic
from models.knowledge_base_assistant_models import RAGInput, Document

# Your search implementation
async def search_knowledge_base(query: str, top_k: int = 5) -> list[Document]:
    """Search your vector database or search engine."""
    # Example: query a vector DB (`vector_db` is a placeholder for your retrieval backend)
    results = await vector_db.search(query, limit=top_k)

    return [
        Document(
            id=r["id"],
            title=r["title"],
            content=r["content"],
            score=r["score"]
        )
        for r in results
    ]
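
If you don't have a vector database wired up yet, a naive keyword-overlap search over an in-memory list is enough to exercise the pipeline locally (illustrative only; swap in real retrieval for anything beyond testing):

# In-memory stand-in for search_knowledge_base, useful for local testing
KNOWLEDGE_BASE = [
    Document(id="1", title="Password Reset",
             content="To reset your password, go to Settings > Security and click Reset.",
             score=0.0),
    Document(id="2", title="Billing FAQ",
             content="Invoices are emailed on the first of each month.",
             score=0.0),
]

async def search_knowledge_base_stub(query: str, top_k: int = 5) -> list[Document]:
    """Rank documents by simple keyword overlap with the query."""
    query_terms = set(query.lower().split())

    def overlap(doc: Document) -> float:
        doc_terms = set(doc.content.lower().split())
        return len(query_terms & doc_terms) / max(len(query_terms), 1)

    scored = [
        Document(id=doc.id, title=doc.title, content=doc.content, score=overlap(doc))
        for doc in KNOWLEDGE_BASE
    ]
    scored.sort(key=lambda d: d.score, reverse=True)
    return scored[:top_k]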


async def answer_question(
    query: str,
    user_name: str,
    company_name: str = "Acme Corp"
) -> str:
    """Answer a question using RAG."""

    async with MoxnClient() as client:
        # Step 1: Search for relevant documents (outside span)
        documents = await search_knowledge_base(query, top_k=5)

        # Step 2: Create prompt session with context
        session = await client.create_prompt_session(
            prompt_id="rag-prompt-id",
            branch_name="main",  # Use commit_id in production
            session_data=RAGInput(
                company_name=company_name,
                user_name=user_name,
                query=query,
                search_results=documents
            )
        )

        # Step 3: Generate response with all context in metadata
        async with client.span(
            session,
            name="generate_answer",
            metadata={
                "user_name": user_name,
                "query": query,
                "doc_count": len(documents),
                "top_score": documents[0].score if documents else 0
            }
        ) as span:
            anthropic = Anthropic()
            response = anthropic.messages.create(
                **session.to_anthropic_invocation()
            )

            # Log telemetry
            await client.log_telemetry_event_from_response(
                session, response, Provider.ANTHROPIC
            )

            return response.content[0].text


# Run the pipeline
async def main():
    answer = await answer_question(
        query="How do I reset my password?",
        user_name="Alice"
    )
    print(answer)

asyncio.run(main())

Multi-Turn Conversations

Extend the pipeline for conversations:
async def conversation_loop():
    async with MoxnClient() as client:
        session = await client.create_prompt_session(
            prompt_id="rag-prompt-id",
            session_data=RAGInput(
                company_name="Acme Corp",
                user_name="Alice",
                query="",  # Initial empty
                search_results=[]
            )
        )

        anthropic = Anthropic()

        turn = 0
        while True:
            turn += 1
            query = input("You: ")
            if query.lower() in ["exit", "quit"]:
                break

            # Search for each turn
            documents = await search_knowledge_base(query)

            # Update session with new query
            # (In practice, you'd create a new session or update context)
            session.append_user_text(
                f"Question: {query}\n\nContext:\n" +
                "\n".join([d.content for d in documents])
            )

            async with client.span(session, name=f"turn_{turn}") as span:
                response = anthropic.messages.create(
                    **session.to_anthropic_invocation()
                )

                await client.log_telemetry_event_from_response(
                    session, response, Provider.ANTHROPIC
                )

            # Add response to session for context
            parsed = session.parse_response(response)
            session.append_assistant_response(parsed)

            print(f"Assistant: {response.content[0].text}")

With Source Citations

Add citation tracking to your response:
from pydantic import BaseModel

class CitedResponse(BaseModel):
    answer: str
    citations: list[int]  # Document indices used
    confidence: float

async def answer_with_citations(query: str, documents: list[Document]) -> CitedResponse:
    # Use structured output for citations.
    # Assumes `client` (MoxnClient) and `anthropic` are already initialized,
    # as in the complete implementation above.
    session = await client.create_prompt_session(
        prompt_id="rag-with-citations-prompt",
        session_data=RAGInput(
            company_name="Acme Corp",
            user_name="Alice",
            query=query,
            search_results=documents
        )
    )

    response = anthropic.messages.create(
        **session.to_anthropic_invocation(),
        extra_headers={"anthropic-beta": "structured-outputs-2025-11-13"}
    )

    parsed = session.parse_response(response)
    result = CitedResponse.model_validate_json(
        parsed.candidates[0].content[0].text
    )

    return result
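
To show sources, map the returned indices back to the retrieved documents. The indices are assumed to be 1-based here, matching the [1], [2], ... labels produced by RAGInput.render() (a small usage sketch):

result = await answer_with_citations(query, documents)

print(result.answer)
print(f"Confidence: {result.confidence:.2f}")
for idx in result.citations:
    # Guard against out-of-range indices from the model
    if 1 <= idx <= len(documents):
        print(f"  [{idx}] {documents[idx - 1].title}")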

Production Considerations

Document Chunking

def chunk_document(doc: str, max_tokens: int = 500) -> list[str]:
    """Split documents into chunks for better retrieval."""
    # Simple chunking by word count as a rough proxy for max_tokens
    # (use a proper text splitter / tokenizer in production)
    words = doc.split()
    chunks = []
    current_chunk = []

    for word in words:
        current_chunk.append(word)
        if len(current_chunk) >= max_tokens:
            chunks.append(" ".join(current_chunk))
            current_chunk = []

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
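
Chunks are typically indexed as individual records so retrieval returns the most relevant passage rather than a whole document. A sketch, where vector_db.upsert is a placeholder for your backend's write API (like the vector_db.search call earlier):

async def index_document(doc_id: str, title: str, text: str) -> None:
    """Index one source document as multiple chunk-level records."""
    for i, chunk in enumerate(chunk_document(text)):
        # vector_db.upsert is a placeholder; substitute your retrieval backend's write call
        await vector_db.upsert(
            id=f"{doc_id}-{i}",
            title=title,
            content=chunk,
        )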

Reranking

async def search_with_rerank(query: str, top_k: int = 5) -> list[Document]:
    # Get more candidates than needed
    candidates = await search_knowledge_base(query, top_k=20)

    # Rerank with a cross-encoder or LLM
    reranked = await rerank_documents(query, candidates)

    return reranked[:top_k]
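
rerank_documents is left to your implementation; a cross-encoder is a common choice. A sketch assuming the sentence-transformers package is installed (the model name is illustrative), run in a worker thread so it doesn't block the event loop:

import asyncio
from sentence_transformers import CrossEncoder

# Any cross-encoder reranker works similarly; this model name is illustrative
_reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

async def rerank_documents(query: str, candidates: list[Document]) -> list[Document]:
    """Score (query, document) pairs with a cross-encoder and sort by score."""
    pairs = [(query, doc.content) for doc in candidates]
    # CrossEncoder.predict is synchronous; run it off the event loop
    scores = await asyncio.to_thread(_reranker.predict, pairs)

    ranked = sorted(zip(candidates, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked]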

Fallback Handling

async def answer_with_fallback(query: str, documents: list[Document]):
    if not documents or documents[0].score < 0.5:
        return "I don't have enough information to answer that question."

    # Proceed with normal RAG
    ...

Observability

The telemetry captures:
  • Search latency and document counts
  • LLM input/output
  • Token usage and costs
  • Per-query metadata
View in the Moxn web app:
  • answer_question (trace)
    • search_documents (span) - 50ms
      • doc_count: 5, top_score: 0.92
    • generate_answer (span) - 1.2s
      • LLM Event: 150 in / 200 out tokens

Next Steps