Overview
The workflow:
- A coordinator receives a document
- Multiple analyzers run in parallel (sentiment, entities, summary)
- Results are aggregated into a final report
- Everything is traced with parent-child spans
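Stripped of prompts and tracing, this is a standard fan-out/fan-in pattern built on asyncio.gather. A minimal sketch (the analyze_* names here are hypothetical single-argument stubs; the fully traced versions appear below):

import asyncio

async def fan_out_fan_in(document: str) -> str:
    # Fan out: run the three independent analyses concurrently
    sentiment, entities, summary = await asyncio.gather(
        analyze_sentiment(document),
        extract_entities(document),
        create_summary(document),
    )
    # Fan in: one final call consumes all three partial results
    return await aggregate_results(sentiment, entities, summary)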
Prompt Setup
Create multiple prompts in your task:

Coordinator Prompt
Plans which analyses to run.

Sentiment Analyzer Prompt
Analyze the sentiment of the following text.
Return: positive, negative, or neutral with confidence.
Text: {{text}}
Entity Extractor Prompt
Extract named entities from the following text.
Return: people, organizations, locations, dates.
Text: {{text}}
Summarizer Prompt
Create a concise summary of the following text.
Keep it under 3 sentences.
Text: {{text}}
Aggregator Prompt
Combine the following analyses into a cohesive report.
Sentiment: {{sentiment_result}}
Entities: {{entity_result}}
Summary: {{summary_result}}
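The implementation below passes session_data objects (CoordinatorInput, SentimentInput, and so on) whose fields mirror each prompt's {{...}} variables. Their exact definition depends on your setup; a plausible sketch, assuming plain Pydantic models:

from pydantic import BaseModel

class CoordinatorInput(BaseModel):
    document: str

class SentimentInput(BaseModel):
    text: str

class EntityInput(BaseModel):
    text: str

class SummaryInput(BaseModel):
    text: str

class AggregatorInput(BaseModel):
    sentiment_result: str
    entity_result: str
    summary_result: str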
Complete Implementation
import asyncio
import json
from dataclasses import dataclass

from moxn import MoxnClient
from moxn.types.content import Provider
from moxn.types.telemetry import SpanContext
from anthropic import AsyncAnthropic

# Prompt input models (CoordinatorInput, SentimentInput, ...) are
# sketched under Prompt Setup above.


@dataclass
class AnalysisResults:
    sentiment: str
    entities: dict
    summary: str
    final_report: str


async def analyze_document(document: str) -> AnalysisResults:
    """Analyze a document with multiple specialized agents."""
    async with MoxnClient() as client:
        anthropic = AsyncAnthropic()

        # Create root span for the entire workflow
        coordinator_session = await client.create_prompt_session(
            prompt_id="coordinator-prompt",
            session_data=CoordinatorInput(document=document)
        )

        async with client.span(
            coordinator_session,
            name="document_analysis_workflow",
            metadata={"doc_length": len(document)}
        ) as root_span:
            # Get root context for parallel tasks
            root_context = root_span.context

            # Run analyzers in parallel
            sentiment_task = analyze_sentiment(
                client, anthropic, document, root_context
            )
            entity_task = extract_entities(
                client, anthropic, document, root_context
            )
            summary_task = create_summary(
                client, anthropic, document, root_context
            )

            # Wait for all to complete
            sentiment, entities, summary = await asyncio.gather(
                sentiment_task,
                entity_task,
                summary_task
            )

            # Aggregate results
            final_report = await aggregate_results(
                client, anthropic,
                sentiment, entities, summary,
                root_context
            )

            return AnalysisResults(
                sentiment=sentiment,
                entities=entities,
                summary=summary,
                final_report=final_report
            )


async def analyze_sentiment(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> str:
    """Analyze sentiment of text."""
    session = await client.create_prompt_session(
        prompt_id="sentiment-prompt",
        session_data=SentimentInput(text=text)
    )

    async with client.span(
        session,
        name="sentiment_analysis",
        metadata={"text_length": len(text)},
        parent_context=parent_context  # Link to root
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


async def extract_entities(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> dict:
    """Extract named entities from text."""
    session = await client.create_prompt_session(
        prompt_id="entity-prompt",
        session_data=EntityInput(text=text)
    )

    async with client.span(
        session,
        name="entity_extraction",
        metadata={"text_length": len(text)},
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation(),
            extra_headers={"anthropic-beta": "structured-outputs-2025-11-13"}
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        parsed = session.parse_response(response)
        return json.loads(parsed.candidates[0].content[0].text)


async def create_summary(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> str:
    """Create a summary of text."""
    session = await client.create_prompt_session(
        prompt_id="summary-prompt",
        session_data=SummaryInput(text=text)
    )

    async with client.span(
        session,
        name="summarization",
        metadata={"text_length": len(text)},
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


async def aggregate_results(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    sentiment: str,
    entities: dict,
    summary: str,
    parent_context: SpanContext
) -> str:
    """Aggregate analysis results into final report."""
    session = await client.create_prompt_session(
        prompt_id="aggregator-prompt",
        session_data=AggregatorInput(
            sentiment_result=sentiment,
            entity_result=json.dumps(entities),
            summary_result=summary
        )
    )

    async with client.span(
        session,
        name="aggregate_results",
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


# Run the workflow
async def main():
    document = """
    Today, Acme Corporation announced a major partnership with TechCorp
    to develop next-generation AI solutions. CEO Jane Smith expressed
    excitement about the collaboration, stating it would create 500 new
    jobs in San Francisco by 2025. Investors responded positively, with
    stock prices rising 15% in after-hours trading.
    """

    results = await analyze_document(document)

    print("=== Analysis Results ===")
    print(f"\nSentiment: {results.sentiment}")
    print(f"\nEntities: {results.entities}")
    print(f"\nSummary: {results.summary}")
    print(f"\n=== Final Report ===\n{results.final_report}")


asyncio.run(main())
Trace Hierarchy
The resulting trace looks like:

- document_analysis_workflow (root) - 3.2s
  - sentiment_analysis - 0.8s (parallel)
    - LLM Event: Claude, 50 in / 30 out
  - entity_extraction - 1.1s (parallel)
    - LLM Event: Claude, 50 in / 80 out
  - summarization - 0.9s (parallel)
    - LLM Event: Claude, 50 in / 60 out
  - aggregate_results - 0.4s (sequential)
    - LLM Event: Claude, 200 in / 150 out
With Error Handling
Add resilience to the workflow:
async def analyze_with_fallback(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
):
    """Analyze with fallback on failure."""
    session = await client.create_prompt_session(
        prompt_id="sentiment-prompt",
        session_data=SentimentInput(text=text)
    )

    async with client.span(
        session,
        name="sentiment_with_fallback",
        metadata={"has_fallback": True},
        parent_context=parent_context
    ) as span:
        try:
            return await analyze_sentiment(
                client, anthropic, text, span.context  # Nest under this span
            )
        except Exception:
            # Span automatically captures error details
            return "unknown"  # Fallback value
async def analyze_document_resilient(document: str):
    """Workflow that continues even if some agents fail."""
    async with MoxnClient() as client:
        anthropic = AsyncAnthropic()

        async with client.span(...) as root_span:
            root_context = root_span.context

            # Use gather with return_exceptions=True
            results = await asyncio.gather(
                analyze_sentiment(...),
                extract_entities(...),
                create_summary(...),
                return_exceptions=True
            )

            # Handle partial failures
            sentiment = results[0] if not isinstance(results[0], Exception) else "unknown"
            entities = results[1] if not isinstance(results[1], Exception) else {}
            summary = results[2] if not isinstance(results[2], Exception) else "Summary unavailable"

            # Aggregate with what we have
            return await aggregate_results(...)
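The three isinstance checks follow one pattern; a small helper (hypothetical, not part of the SDK) keeps the unwrapping readable:

from typing import TypeVar

T = TypeVar("T")

def unwrap_or(result: T | BaseException, fallback: T) -> T:
    """Return a gather() result, or the fallback if it captured an exception."""
    return fallback if isinstance(result, BaseException) else result

# sentiment = unwrap_or(results[0], "unknown")
# entities = unwrap_or(results[1], {})
# summary = unwrap_or(results[2], "Summary unavailable")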
Distributed Execution
For workflows across services:
# Service A: Coordinator
async def coordinate(document: str):
    async with MoxnClient() as client:
        async with client.span(session, name="coordinate") as span:
            # Extract carrier for other services
            carrier = client.extract_context()

            # Queue work for other services
            await queue.put({
                "task": "sentiment",
                "carrier": carrier.model_dump(mode="json"),
                "document": document
            })
            await queue.put({
                "task": "entities",
                "carrier": carrier.model_dump(mode="json"),
                "document": document
            })


# Service B: Worker
async def worker():
    async with MoxnClient() as client:
        while True:
            message = await queue.get()
            carrier = MoxnTraceCarrier.model_validate(message["carrier"])

            async with client.span_from_carrier(
                carrier,
                name=message["task"]
            ) as span:
                # Process and log
                result = await process(message)
                await client.log_telemetry_event(...)
                await results_queue.put(result)
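Since the carrier travels between services as plain JSON, it helps to pin down the queue message shape explicitly. A sketch, assuming Pydantic (AnalysisTask is hypothetical, not part of the SDK):

from pydantic import BaseModel

class AnalysisTask(BaseModel):
    task: str      # e.g. "sentiment" or "entities"
    carrier: dict  # MoxnTraceCarrier serialized via model_dump(mode="json")
    document: str

# Worker side: AnalysisTask.model_validate(message) rejects malformed messages early.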
Conditional Workflows
Run different paths based on initial analysis:
async def conditional_workflow(document: str):
    async with MoxnClient() as client:
        # Classify first to know the document type
        doc_type = await classify_document(client, document)

        # Create span with classification in metadata
        async with client.span(
            session,
            name="conditional_analysis",
            metadata={
                "doc_type": doc_type,
                "doc_length": len(document)
            }
        ) as root_span:
            # Route to appropriate pipeline
            if doc_type == "financial":
                return await financial_pipeline(client, document, root_span.context)
            elif doc_type == "legal":
                return await legal_pipeline(client, document, root_span.context)
            else:
                return await general_pipeline(client, document, root_span.context)
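classify_document is assumed above; a minimal sketch, assuming a "classifier-prompt" exists and that the model replies with the bare label (ClassifierInput is hypothetical):

async def classify_document(client: MoxnClient, document: str) -> str:
    """Classify a document into a coarse type such as 'financial' or 'legal'."""
    anthropic = AsyncAnthropic()
    session = await client.create_prompt_session(
        prompt_id="classifier-prompt",       # hypothetical prompt id
        session_data=ClassifierInput(text=document)
    )
    response = await anthropic.messages.create(
        **session.to_anthropic_invocation()
    )
    await client.log_telemetry_event_from_response(
        session, response, Provider.ANTHROPIC
    )
    # Assumes the prompt instructs the model to answer with the label only
    return response.content[0].text.strip().lower()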
Observability Benefits
With proper span hierarchy you can:
- Debug failures: See exactly which agent failed
- Measure performance: Identify slow agents
- Track costs: See token usage per agent
- Analyze patterns: Find common failure modes