import asyncio
from moxn import MoxnClient
from moxn.types.content import Provider
from anthropic import Anthropic
from models.knowledge_base_assistant_models import RAGInput, Document
# Your search implementation
async def search_knowledge_base(query: str, top_k: int = 5) -> list[Document]:
    """Look up documents matching ``query`` in the backing search store.

    Args:
        query: Text handed to ``vector_db.search``.
        top_k: Forwarded to the search call as its ``limit`` argument.

    Returns:
        One ``Document`` per search hit, in the order the hits were returned.
    """
    # Example backend: a vector DB.
    # NOTE(review): ``vector_db`` is not defined in this snippet; it must be
    # provided by the surrounding module.
    hits = await vector_db.search(query, limit=top_k)

    documents = []
    for hit in hits:
        # Each hit is read by key and copied field-for-field into a Document.
        documents.append(
            Document(
                id=hit["id"],
                title=hit["title"],
                content=hit["content"],
                score=hit["score"],
            )
        )
    return documents
async def answer_question(
    query: str,
    user_name: str,
    company_name: str = "Acme Corp"
) -> str:
    """Answer a question using RAG.

    Searches the knowledge base for ``query``, builds a Moxn prompt session
    from the results, calls Anthropic inside a ``generate_answer`` span, logs
    the response as a telemetry event, and returns the answer text.

    Args:
        query: The user's question; used for both retrieval and the prompt.
        user_name: Passed into the prompt session and the span metadata.
        company_name: Passed into the prompt session.

    Returns:
        The ``text`` of the first content block of the Anthropic response.
    """
    async with MoxnClient() as client:
        # Step 1: Search for relevant documents (outside span)
        documents = await search_knowledge_base(query, top_k=5)

        # Step 2: Create prompt session with context
        session = await client.create_prompt_session(
            prompt_id="rag-prompt-id",
            branch_name="main",  # Use commit_id in production
            session_data=RAGInput(
                company_name=company_name,
                user_name=user_name,
                query=query,
                search_results=documents,
            ),
        )

        # Step 3: Generate response with all context in metadata
        span_metadata = {
            "user_name": user_name,
            "query": query,
            "doc_count": len(documents),
            # Score of the first returned document; 0 when nothing was found.
            "top_score": documents[0].score if documents else 0,
        }
        async with client.span(
            session,
            name="generate_answer",
            metadata=span_metadata,
        ):
            anthropic = Anthropic()
            # ``Anthropic`` is the synchronous client: calling it directly
            # here would block the event loop for the whole request. Run it
            # in a worker thread instead; the invocation kwargs are still
            # built on the event-loop side before the hand-off.
            response = await asyncio.to_thread(
                anthropic.messages.create,
                **session.to_anthropic_invocation(),
            )

            # Log telemetry
            await client.log_telemetry_event_from_response(
                session, response, Provider.ANTHROPIC
            )

            # NOTE(review): assumes the first content block is a text block.
            return response.content[0].text
# Run the pipeline
async def main() -> None:
    """Run one example question through ``answer_question`` and print it."""
    answer = await answer_question(
        query="How do I reset my password?",
        user_name="Alice",
    )
    print(answer)


# Only start the pipeline when executed as a script, so importing this module
# does not trigger the search and LLM calls as a side effect.
if __name__ == "__main__":
    asyncio.run(main())