This example demonstrates a multi-agent workflow with parallel execution, hierarchical spans, and aggregation—all with full observability.

Overview

The workflow:
  1. A coordinator receives a document
  2. Multiple analyzers run in parallel (sentiment, entities, summary)
  3. Results are aggregated into a final report
  4. Everything is traced with parent-child spans

Prompt Setup

Create multiple prompts in your task:

Coordinator Prompt

Plans which analyses to run.
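
For example (illustrative; shape it to your task):

Decide which of the following analyses to run on the document:
sentiment, entities, summary. Return the list to invoke.

Document: {{document}}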

Sentiment Analyzer Prompt

Analyze the sentiment of the following text.
Return: positive, negative, or neutral with confidence.

Text: {{text}}

Entity Extractor Prompt

Extract named entities from the following text.
Return: people, organizations, locations, dates.

Text: {{text}}

Summarizer Prompt

Create a concise summary of the following text.
Keep it under 3 sentences.

Text: {{text}}

Aggregator Prompt

Combine the following analyses into a cohesive report.

Sentiment: {{sentiment_result}}
Entities: {{entity_result}}
Summary: {{summary_result}}
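
Session Data Models

The implementation below references a session-data model per prompt. How these are defined depends on your Moxn setup; a minimal sketch using Pydantic models whose fields mirror the prompt variables above (the class names match the code below, but treat the definitions as assumptions, not a Moxn API):

from pydantic import BaseModel

class CoordinatorInput(BaseModel):
    document: str

class SentimentInput(BaseModel):
    text: str

class EntityInput(BaseModel):
    text: str

class SummaryInput(BaseModel):
    text: str

class AggregatorInput(BaseModel):
    sentiment_result: str
    entity_result: str
    summary_result: str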

Complete Implementation

import asyncio
import json
from dataclasses import dataclass
from moxn import MoxnClient
from moxn.types.content import Provider
from moxn.types.telemetry import SpanContext
from anthropic import AsyncAnthropic

@dataclass
class AnalysisResults:
    sentiment: str
    entities: dict
    summary: str
    final_report: str


async def analyze_document(document: str) -> AnalysisResults:
    """Analyze a document with multiple specialized agents."""

    async with MoxnClient() as client:
        anthropic = AsyncAnthropic()  # async client so the analyzers can truly run in parallel

        # Create root span for the entire workflow
        coordinator_session = await client.create_prompt_session(
            prompt_id="coordinator-prompt",
            session_data=CoordinatorInput(document=document)
        )

        async with client.span(
            coordinator_session,
            name="document_analysis_workflow",
            metadata={"doc_length": len(document)}
        ) as root_span:

            # Get root context for parallel tasks
            root_context = root_span.context

            # Run analyzers in parallel
            sentiment_task = analyze_sentiment(
                client, anthropic, document, root_context
            )
            entity_task = extract_entities(
                client, anthropic, document, root_context
            )
            summary_task = create_summary(
                client, anthropic, document, root_context
            )

            # Wait for all to complete
            sentiment, entities, summary = await asyncio.gather(
                sentiment_task,
                entity_task,
                summary_task
            )

            # Aggregate results
            final_report = await aggregate_results(
                client, anthropic,
                sentiment, entities, summary,
                root_context
            )

            return AnalysisResults(
                sentiment=sentiment,
                entities=entities,
                summary=summary,
                final_report=final_report
            )


async def analyze_sentiment(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> str:
    """Analyze sentiment of text."""

    session = await client.create_prompt_session(
        prompt_id="sentiment-prompt",
        session_data=SentimentInput(text=text)
    )

    async with client.span(
        session,
        name="sentiment_analysis",
        metadata={"text_length": len(text)},
        parent_context=parent_context  # Link to root
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


async def extract_entities(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> dict:
    """Extract named entities from text."""

    session = await client.create_prompt_session(
        prompt_id="entity-prompt",
        session_data=EntityInput(text=text)
    )

    async with client.span(
        session,
        name="entity_extraction",
        metadata={"text_length": len(text)},
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation(),
            extra_headers={"anthropic-beta": "structured-outputs-2025-11-13"}
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        # Parse the provider response and decode the JSON payload
        parsed = session.parse_response(response)
        return json.loads(parsed.candidates[0].content[0].text)


async def create_summary(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> str:
    """Create a summary of text."""

    session = await client.create_prompt_session(
        prompt_id="summary-prompt",
        session_data=SummaryInput(text=text)
    )

    async with client.span(
        session,
        name="summarization",
        metadata={"text_length": len(text)},
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


async def aggregate_results(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    sentiment: str,
    entities: dict,
    summary: str,
    parent_context: SpanContext
) -> str:
    """Aggregate analysis results into final report."""

    session = await client.create_prompt_session(
        prompt_id="aggregator-prompt",
        session_data=AggregatorInput(
            sentiment_result=sentiment,
            entity_result=json.dumps(entities),
            summary_result=summary
        )
    )

    async with client.span(
        session,
        name="aggregate_results",
        parent_context=parent_context
    ) as span:
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

        return response.content[0].text


# Run the workflow
async def main():
    document = """
    Today, Acme Corporation announced a major partnership with TechCorp
    to develop next-generation AI solutions. CEO Jane Smith expressed
    excitement about the collaboration, stating it would create 500 new
    jobs in San Francisco by 2025. Investors responded positively, with
    stock prices rising 15% in after-hours trading.
    """

    results = await analyze_document(document)

    print("=== Analysis Results ===")
    print(f"\nSentiment: {results.sentiment}")
    print(f"\nEntities: {results.entities}")
    print(f"\nSummary: {results.summary}")
    print(f"\n=== Final Report ===\n{results.final_report}")

asyncio.run(main())

Trace Hierarchy

The resulting trace looks like:
  • document_analysis_workflow (root) - 3.2s
    • sentiment_analysis - 0.8s (parallel)
      • LLM Event: Claude, 50 in / 30 out
    • entity_extraction - 1.1s (parallel)
      • LLM Event: Claude, 50 in / 80 out
    • summarization - 0.9s (parallel)
      • LLM Event: Claude, 50 in / 60 out
    • aggregate_results - 0.4s (sequential)
      • LLM Event: Claude, 200 in / 150 out

With Error Handling

Add resilience to the workflow:
async def analyze_with_fallback(
    client: MoxnClient,
    anthropic: AsyncAnthropic,
    text: str,
    parent_context: SpanContext
) -> str:
    """Analyze with a fallback value on failure."""

    session = await client.create_prompt_session(
        prompt_id="sentiment-prompt",
        session_data=SentimentInput(text=text)
    )

    try:
        async with client.span(
            session,
            name="sentiment_with_fallback",
            metadata={"has_fallback": True},
            parent_context=parent_context
        ) as span:
            return await analyze_sentiment(
                client, anthropic, text, parent_context
            )
    except Exception:
        # The span exits with the exception, so the error details are
        # captured in the trace; swallow it and return a fallback value
        return "unknown"


async def analyze_document_resilient(document: str):
    """Workflow that continues even if some agents fail."""

    async with MoxnClient() as client:
        anthropic = AsyncAnthropic()

        async with client.span(...) as root_span:
            root_context = root_span.context

            # Use gather with return_exceptions=True
            results = await asyncio.gather(
                analyze_sentiment(...),
                extract_entities(...),
                create_summary(...),
                return_exceptions=True
            )

            # Handle partial failures
            sentiment = results[0] if not isinstance(results[0], Exception) else "unknown"
            entities = results[1] if not isinstance(results[1], Exception) else {}
            summary = results[2] if not isinstance(results[2], Exception) else "Summary unavailable"

            # Aggregate with what we have
            return await aggregate_results(...)
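
If this pattern recurs, a small helper keeps the partial-failure handling readable (purely illustrative, not part of Moxn):

from typing import TypeVar, Union

T = TypeVar("T")

def value_or(result: Union[T, BaseException], default: T) -> T:
    """Return a gather() result, or the default if it raised."""
    return default if isinstance(result, BaseException) else result

# Usage with the gather() results above:
# sentiment = value_or(results[0], "unknown")
# entities = value_or(results[1], {})
# summary = value_or(results[2], "Summary unavailable")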

Distributed Execution

For workflows across services:
# Service A: Coordinator
async def coordinate(document: str):
    async with MoxnClient() as client:
        # `session` is a prompt session created as above; `queue` is
        # whatever shared message queue your services use
        async with client.span(session, name="coordinate") as span:
            # Extract a serializable carrier so other services can
            # continue the same trace
            carrier = client.extract_context()

            # Queue work for other services
            await queue.put({
                "task": "sentiment",
                "carrier": carrier.model_dump(mode="json"),
                "document": document
            })
            await queue.put({
                "task": "entities",
                "carrier": carrier.model_dump(mode="json"),
                "document": document
            })

# Service B: Worker
async def worker():
    async with MoxnClient() as client:
        while True:
            message = await queue.get()
            # Rehydrate the trace context that Service A serialized
            # (MoxnTraceCarrier is the serializable carrier type)
            carrier = MoxnTraceCarrier.model_validate(message["carrier"])

            async with client.span_from_carrier(
                carrier,
                name=message["task"]
            ) as span:
                # Process and log
                result = await process(message)
                await client.log_telemetry_event(...)

            await results_queue.put(result)

Conditional Workflows

Run different paths based on initial analysis:
async def conditional_workflow(document: str):
    async with MoxnClient() as client:
        # Classify first to know the document type
        # (a classify_document sketch follows this example)
        doc_type = await classify_document(client, document)

        # Create the root span with the classification in its metadata;
        # `session` is a prompt session created as in the examples above
        async with client.span(
            session,
            name="conditional_analysis",
            metadata={
                "doc_type": doc_type,
                "doc_length": len(document)
            }
        ) as root_span:
            # Route to appropriate pipeline
            if doc_type == "financial":
                return await financial_pipeline(client, document, root_span.context)
            elif doc_type == "legal":
                return await legal_pipeline(client, document, root_span.context)
            else:
                return await general_pipeline(client, document, root_span.context)
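
classify_document is left to you; a minimal sketch, assuming your task defines a classifier prompt that answers with one of financial, legal, or general (the "classifier-prompt" ID is an assumption):

async def classify_document(client: MoxnClient, document: str) -> str:
    """Ask a small classifier prompt for the document type."""
    anthropic = AsyncAnthropic()
    session = await client.create_prompt_session(
        prompt_id="classifier-prompt",  # assumed prompt ID
        session_data=CoordinatorInput(document=document)
    )
    async with client.span(session, name="classify_document"):
        response = await anthropic.messages.create(
            **session.to_anthropic_invocation()
        )
        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )
        # Normalize the model's answer to an expected label
        label = response.content[0].text.strip().lower()
        return label if label in {"financial", "legal", "general"} else "general"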

Observability Benefits

With a proper span hierarchy, you can:
  1. Debug failures: See exactly which agent failed
  2. Measure performance: Identify slow agents
  3. Track costs: See token usage per agent
  4. Analyze patterns: Find common failure modes

Next Steps