Overview
The pipeline:
- Receives a document (text or PDF)
- Extracts structured data according to a schema
- Validates the output
- Returns typed results
Use Cases
- Extract entities from contracts
- Parse receipts and invoices
- Structure unstructured survey responses
- Pull key data from research papers
Prompt Setup
Create a prompt in the web app.
System Message:
You are a precise data extraction assistant. Extract information from
documents according to the specified schema. Be accurate and only extract
information that is explicitly stated in the document.
Rules:
- If a field is not found, use null
- Extract exact values, don't paraphrase
- Include confidence scores for uncertain extractions
Copy
Extract data from the following document:
{{document}}
Copy
{
"type": "object",
"properties": {
"vendor_name": {"type": "string"},
"invoice_number": {"type": "string"},
"date": {"type": "string", "format": "date"},
"total_amount": {"type": "number"},
"currency": {"type": "string"},
"line_items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"description": {"type": "string"},
"quantity": {"type": "integer"},
"unit_price": {"type": "number"},
"amount": {"type": "number"}
}
}
},
"confidence": {"type": "number", "minimum": 0, "maximum": 1}
},
"required": ["vendor_name", "total_amount", "confidence"]
}
Generated Models
Copy
# models/invoice_extractor_models.py
from pydantic import BaseModel, Field
from moxn.types.base import RenderableModel
class LineItem(BaseModel):
    """One line of an invoice.

    Only ``description`` has no default; the numeric fields fall back to
    ``None`` when they are not supplied.
    """

    description: str
    quantity: int | None = Field(default=None)
    unit_price: float | None = Field(default=None)
    amount: float | None = Field(default=None)
class InvoiceExtraction(BaseModel):
    """Structured result of extracting a single invoice.

    Mirrors the prompt's JSON schema: ``vendor_name``, ``total_amount`` and
    ``confidence`` are required; every other field is optional.
    """

    vendor_name: str
    invoice_number: str | None = None
    # Kept as a plain string; the schema declares it as a "date"-formatted
    # string and no parsing is done here.
    date: str | None = None
    total_amount: float
    currency: str | None = None
    # default_factory makes the "fresh list per instance" intent explicit.
    line_items: list[LineItem] = Field(default_factory=list)
    # The schema bounds confidence to [0, 1]; enforce the same range here so
    # an out-of-range score is reported as a validation error.
    confidence: float = Field(ge=0, le=1)
class ExtractionInput(RenderableModel):
    """Session input holding the text of the document to extract from."""

    document: str

    def render(self, **kwargs) -> dict[str, str]:
        """Return the template variables as a ``{"document": <text>}`` dict.

        Extra keyword arguments are accepted but not used.
        """
        variables: dict[str, str] = {}
        variables["document"] = self.document
        return variables
Complete Implementation
Copy
import asyncio
from moxn import MoxnClient
from moxn.types.content import Provider
from anthropic import Anthropic
from pydantic import ValidationError
from models.invoice_extractor_models import (
ExtractionInput,
InvoiceExtraction
)
async def extract_invoice_data(document_text: str) -> InvoiceExtraction | None:
    """Extract structured data from an invoice document.

    Args:
        document_text: Raw invoice text passed to the prompt session.

    Returns:
        The validated ``InvoiceExtraction``, or ``None`` when the model's
        output fails validation. In both cases a telemetry event carrying
        the validation status is logged before returning.
    """
    async with MoxnClient() as client:
        session = await client.create_prompt_session(
            prompt_id="invoice-extraction-prompt",
            branch_name="main",
            session_data=ExtractionInput(document=document_text),
        )

        async with client.span(
            session,
            name="extract_invoice",
            metadata={"doc_length": len(document_text)},
        ):
            anthropic = Anthropic()
            # ``Anthropic`` is the synchronous client: calling it directly
            # would block the event loop for the whole request, so run it in
            # a worker thread instead.
            response = await asyncio.to_thread(
                anthropic.messages.create,
                **session.to_anthropic_invocation(),
                extra_headers={"anthropic-beta": "structured-outputs-2025-11-13"},
            )
            parsed = session.parse_response(response)

            # Validate the structured output
            validation_errors: list[str] = []
            result: InvoiceExtraction | None = None
            try:
                result = InvoiceExtraction.model_validate_json(
                    parsed.candidates[0].content[0].text
                )
            except ValidationError as e:
                validation_errors = [str(err) for err in e.errors()]

            # Log with validation status in event attributes
            succeeded = result is not None
            event = session.create_llm_event_from_parsed_response(
                parsed_response=parsed,
                validation_errors=validation_errors or None,
                attributes={
                    "extraction_success": succeeded,
                    "confidence": result.confidence if succeeded else None,
                    "item_count": len(result.line_items) if succeeded else 0,
                },
            )
            await client.log_telemetry_event(event)
            return result
# Usage
async def main() -> None:
    """Run the extractor on a sample invoice and print the key fields."""
    # The sample is kept flush-left so that no source indentation leaks into
    # the document text sent to the model.
    invoice_text = """
INVOICE
Vendor: Acme Supplies Inc.
Invoice #: INV-2024-001
Date: January 15, 2024
Items:
1. Widget A (x10) - $5.00 each = $50.00
2. Widget B (x5) - $12.00 each = $60.00
3. Shipping = $15.00
Total: $125.00 USD
"""
    result = await extract_invoice_data(invoice_text)
    if result is None:
        # Extraction failed validation; nothing to print.
        return

    print(f"Vendor: {result.vendor_name}")
    print(f"Invoice #: {result.invoice_number}")
    print(f"Total: ${result.total_amount} {result.currency}")
    print(f"Confidence: {result.confidence:.0%}")
    print(f"Line items: {len(result.line_items)}")


# Guard the entry point so importing this module does not start an event loop
# and fire a live extraction request.
if __name__ == "__main__":
    asyncio.run(main())
Batch Processing
Extract from multiple documents:
async def extract_batch(documents: list[str]) -> list[InvoiceExtraction | None]:
    """Process multiple documents sequentially, one child span per document.

    Args:
        documents: Raw document texts to extract from.

    Returns:
        One entry per input document, in input order: the extraction result,
        or ``None`` when processing that document raised an exception.
    """
    async with MoxnClient() as client:
        # This session only anchors the parent "batch_extraction" span; it is
        # never re-rendered. Each document gets its own session below.
        session = await client.create_prompt_session(
            prompt_id="invoice-extraction-prompt",
            session_data=ExtractionInput(document=""),
        )
        results: list[InvoiceExtraction | None] = []

        async with client.span(
            session,
            name="batch_extraction",
            metadata={"doc_count": len(documents)},
        ):
            for i, doc in enumerate(documents):
                # Create fresh session for each document
                doc_session = await client.create_prompt_session(
                    prompt_id="invoice-extraction-prompt",
                    session_data=ExtractionInput(document=doc),
                )
                # The try wraps the span (not the other way round) so that an
                # exception passes through the span's exit before being
                # handled here. Catching it inside the span would hide it
                # from the span entirely.
                # NOTE(review): assumes client.span records exceptions raised
                # in its body and re-raises them - confirm against the SDK.
                try:
                    async with client.span(
                        doc_session,
                        name=f"extract_doc_{i}",
                        metadata={"doc_index": i, "doc_length": len(doc)},
                    ) as span:
                        # NOTE(review): process_single_document is not defined
                        # in this snippet; it must be provided elsewhere.
                        result = await process_single_document(
                            client, doc_session, span
                        )
                except Exception:
                    # Best-effort batch: one bad document must not abort the
                    # rest, so record a placeholder and move on.
                    results.append(None)
                else:
                    results.append(result)
        return results
With Retries
Handle extraction failures:
async def extract_with_retry(
    document: str,
    max_retries: int = 2
) -> InvoiceExtraction | None:
    """Extract with retry on validation failure.

    Args:
        document: Raw document text to extract from.
        max_retries: Number of additional attempts after the first one, so at
            most ``max_retries + 1`` requests are made.

    Returns:
        The first result that validates, or ``None`` when every attempt fails
        validation. Only the final failed attempt is logged with its
        validation errors.
    """
    # The client was referenced but never created in this function, which
    # raised NameError on the first attempt. One instance serves all attempts.
    anthropic = Anthropic()

    async with MoxnClient() as client:
        for attempt in range(max_retries + 1):
            session = await client.create_prompt_session(
                prompt_id="invoice-extraction-prompt",
                session_data=ExtractionInput(document=document),
            )
            async with client.span(
                session,
                name=f"extract_attempt_{attempt}",
                metadata={
                    "attempt": attempt,
                    "max_retries": max_retries,
                },
            ):
                # The Anthropic client is synchronous; run it in a worker
                # thread so the request does not block the event loop.
                response = await asyncio.to_thread(
                    anthropic.messages.create,
                    **session.to_anthropic_invocation(),
                    extra_headers={
                        "anthropic-beta": "structured-outputs-2025-11-13"
                    },
                )
                parsed = session.parse_response(response)

                # Keep the try body to the one call whose failure should
                # trigger a retry.
                try:
                    result = InvoiceExtraction.model_validate_json(
                        parsed.candidates[0].content[0].text
                    )
                except ValidationError as e:
                    if attempt == max_retries:
                        # Log final failure with validation errors
                        event = session.create_llm_event_from_parsed_response(
                            parsed_response=parsed,
                            validation_errors=[str(err) for err in e.errors()],
                            attributes={
                                "validation_failed": True,
                                "final_attempt": True,
                            },
                        )
                        await client.log_telemetry_event(event)
                        return None
                    # Try again
                    continue

                await client.log_telemetry_event_from_response(
                    session, response, Provider.ANTHROPIC
                )
                return result

    # Reached only when the loop body never runs (max_retries < 0).
    return None
PDF Extraction
Extract from PDF documents:
async def extract_from_pdf(pdf_url: str) -> InvoiceExtraction | None:
    """Extract from a PDF document.

    Args:
        pdf_url: Location of the PDF, passed to the PDF-capable prompt.

    Returns:
        The validated ``InvoiceExtraction``, or ``None`` when the model's
        output fails validation. A telemetry event carrying the validation
        status is logged in both cases.
    """
    async with MoxnClient() as client:
        # Use a prompt with PDF support
        # NOTE(review): PDFExtractionInput is not defined in this snippet; it
        # is assumed to be declared alongside the other input models.
        session = await client.create_prompt_session(
            prompt_id="pdf-extraction-prompt",
            session_data=PDFExtractionInput(pdf_url=pdf_url),
        )
        async with client.span(session, name="extract_pdf"):
            anthropic = Anthropic()
            # The Anthropic client is synchronous; run it in a worker thread
            # so the request does not block the event loop.
            response = await asyncio.to_thread(
                anthropic.messages.create,
                **session.to_anthropic_invocation(),
                extra_headers={"anthropic-beta": "structured-outputs-2025-11-13"},
            )
            parsed = session.parse_response(response)

            # Same validation as the text pipeline: parse the first candidate
            # into the typed model and collect any schema violations.
            validation_errors: list[str] = []
            result: InvoiceExtraction | None = None
            try:
                result = InvoiceExtraction.model_validate_json(
                    parsed.candidates[0].content[0].text
                )
            except ValidationError as e:
                validation_errors = [str(err) for err in e.errors()]

            event = session.create_llm_event_from_parsed_response(
                parsed_response=parsed,
                validation_errors=validation_errors or None,
                attributes={"extraction_success": result is not None},
            )
            await client.log_telemetry_event(event)
            return result
Quality Monitoring
Track extraction quality:
async def extract_with_quality_check(
    document: str,
    min_confidence: float = 0.8,
) -> InvoiceExtraction | None:
    """Extract an invoice and apply basic quality checks to the result.

    Args:
        document: Raw document text to extract from.
        min_confidence: Results whose confidence is below this value are
            flagged for human review. Defaults to 0.8.

    Returns:
        Whatever ``extract_invoice_data`` returned: the extraction, or
        ``None`` when extraction failed. The checks never alter the result.
    """
    result = await extract_invoice_data(document)
    if result is None:
        return None

    # Check confidence threshold
    if result.confidence < min_confidence:
        # Flag for human review
        # NOTE(review): flag_for_review is not defined in this snippet; it
        # must be provided elsewhere.
        await flag_for_review(document, result)

    # These fields are optional in the schema but expected on a usable
    # invoice, so their absence is reported rather than treated as an error.
    expected_fields = ("invoice_number", "date")
    missing = [name for name in expected_fields if not getattr(result, name)]
    if missing:
        # Log warning
        print(f"Missing fields: {missing}")

    return result
Telemetry View
In the Moxn web app, you’ll see:
- batch_extraction (trace)
- extract_doc_0 - success, confidence: 95%
- extract_doc_1 - success, confidence: 87%
- extract_doc_2 - failed, validation_errors: 2
- extract_doc_3 - success, confidence: 92%
- Input document
- Extracted JSON
- Validation status
- Confidence scores