Moxn provides W3C Trace Context-compatible spans for full observability of your LLM workflows. This guide covers how to create spans and propagate trace context.

Why Spans?

Spans give you:
  • Visibility: See exactly what happened during an LLM call
  • Debugging: Trace issues back to specific invocations
  • Analytics: Measure latency, token usage, and costs
  • Correlation: Link related LLM calls in complex workflows

Basic Span Usage

Wrap LLM calls in spans to capture telemetry:
from moxn import MoxnClient
from moxn.types.content import Provider
from anthropic import Anthropic

async with MoxnClient() as client:
    session = await client.create_prompt_session(
        prompt_id="...",
        session_data=your_input
    )

    # Create a span for this LLM call
    async with client.span(session) as span:
        anthropic = Anthropic()
        response = anthropic.messages.create(
            **session.to_anthropic_invocation()
        )

        # Log the event within the span
        await client.log_telemetry_event_from_response(
            session, response, Provider.ANTHROPIC
        )

Span Parameters

Basic Parameters

async with client.span(
    session,
    name="analyze_query",     # Custom span name (defaults to prompt name)
    metadata={"key": "value"} # Searchable attributes
) as span:
    ...

Full Signature

async with client.span(
    prompt_session: PromptSession,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
    *,
    parent_context: SpanContext | None = None,  # For async patterns
    trace_context: TraceContext | None = None   # For distributed tracing
) as span:
    ...

Span Properties

Access span information within the context:
async with client.span(
    session,
    metadata={
        "user_id": "user_123",
        "query_type": "product_question"
    }
) as span:
    span.span_id          # str - unique span identifier
    span.context.trace_id # str - trace identifier
    span.name             # str - span name
    span.parent_span_id   # str | None - parent span (if nested)
    span.start_time       # datetime
    span.attributes       # dict - custom attributes (from metadata)
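A common use for these properties is correlating your own application logs with the trace. A minimal sketch using the standard logging module (the extra fields are ordinary log attributes, not part of the Moxn SDK):
import logging

logger = logging.getLogger(__name__)

async with client.span(session, name="analyze_query") as span:
    # Attach trace and span identifiers to your own log records so they can
    # be matched against the span later.
    logger.info(
        "handling query",
        extra={"trace_id": span.context.trace_id, "span_id": span.span_id},
    )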

Parent-Child Spans

Create hierarchical traces for complex workflows:
async with client.span(session, name="handle_request") as root_span:
    # First child span
    async with client.span(session, name="classify_query") as classify_span:
        classification = await classify(query)
        await client.log_telemetry_event_from_response(...)

    # Second child span
    async with client.span(session, name="generate_response") as response_span:
        response = await generate(query, classification)
        await client.log_telemetry_event_from_response(...)
This creates a trace hierarchy:
  • handle_request (root)
    • classify_query (child)
    • generate_response (child)

Parallel Spans

For concurrent operations, pass parent context explicitly:
import asyncio

async with client.span(session, name="parallel_analysis") as root_span:
    # Extract parent context for parallel tasks
    root_context = root_span.context

    async def analyze_sentiment():
        async with client.span(
            session,
            name="sentiment_analysis",
            parent_context=root_context  # Explicit parent
        ) as span:
            response = await call_llm_for_sentiment()
            await client.log_telemetry_event_from_response(...)
            return response

    async def analyze_entities():
        async with client.span(
            session,
            name="entity_extraction",
            parent_context=root_context  # Same parent
        ) as span:
            response = await call_llm_for_entities()
            await client.log_telemetry_event_from_response(...)
            return response

    # Run in parallel
    sentiment, entities = await asyncio.gather(
        analyze_sentiment(),
        analyze_entities()
    )
Result:
  • parallel_analysis (root)
    • sentiment_analysis (parallel child)
    • entity_extraction (parallel child)

Distributed Tracing

Propagate traces across services using carriers:

Extract Context

async with client.span(session, name="api_handler") as span:
    # Extract context for propagation
    carrier = client.extract_context()

    if carrier:
        # Send to another service via queue, HTTP, etc.
        await queue.put({
            "carrier": carrier.model_dump(mode="json"),
            "payload": data
        })
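Since the carrier supports model_dump and model_validate, it can travel over any JSON transport, not just a queue. A sketch of the same handoff over HTTP (httpx and the worker URL are illustrative assumptions, not part of the Moxn SDK):
import httpx

async with client.span(session, name="api_handler") as span:
    carrier = client.extract_context()

    if carrier:
        async with httpx.AsyncClient() as http:
            # The receiving service validates the body with
            # MoxnTraceCarrier.model_validate and resumes the trace,
            # as shown in the next section.
            await http.post(
                "https://worker.internal/tasks",  # hypothetical endpoint
                json={
                    "carrier": carrier.model_dump(mode="json"),
                    "payload": data,
                },
            )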

Resume from Carrier

In another service or worker:
from moxn.types.telemetry import MoxnTraceCarrier

# Receive the carrier
message = await queue.get()
carrier = MoxnTraceCarrier.model_validate(message["carrier"])

# Create a span that continues the trace
async with client.span_from_carrier(
    carrier,
    name="process_async_task"
) as span:
    # Process the work
    result = await process(message["payload"])
    await client.log_telemetry_event(...)

Carrier Contents

carrier = client.extract_context()

carrier.trace_id      # str - W3C trace ID
carrier.span_id       # str - Parent span ID
carrier.prompt_id     # UUID - Source prompt
carrier.prompt_name   # str
carrier.task_id       # UUID
carrier.commit_id     # UUID | None
carrier.branch_id     # UUID | None

W3C Trace Context

Moxn spans are W3C Trace Context compatible:
# Extract W3C headers for HTTP propagation
async with client.span(session) as span:
    headers = {
        "traceparent": f"00-{span.context.trace_id}-{span.span_id}-01"
    }
    await http_client.post(url, headers=headers, json=data)

Incoming HTTP Requests

Resume traces from incoming HTTP:
from moxn.types.telemetry import TraceContext

# Parse incoming traceparent header
traceparent = request.headers.get("traceparent")
if traceparent:
    parts = traceparent.split("-")
    trace_context = TraceContext(
        trace_id=parts[1],
        span_id=parts[2]
    )

    async with client.span(
        session,
        trace_context=trace_context  # Continue existing trace
    ) as span:
        ...
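The split above assumes a well-formed header. A slightly more defensive sketch (parse_traceparent is a hypothetical helper, not part of the Moxn SDK) that checks the field count and field lengths before building a TraceContext:
from moxn.types.telemetry import TraceContext

def parse_traceparent(header: str | None) -> TraceContext | None:
    # W3C format: version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - flags (2 hex)
    if not header:
        return None
    parts = header.split("-")
    if len(parts) != 4 or len(parts[1]) != 32 or len(parts[2]) != 16:
        return None
    return TraceContext(trace_id=parts[1], span_id=parts[2])

trace_context = parse_traceparent(request.headers.get("traceparent"))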

Adding Metadata

Add searchable metadata to spans via the metadata parameter at span creation:
async with client.span(
    session,
    metadata={
        "customer_id": "cust_123",
        "request_id": "req_456",
        "environment": "production",
        "query_type": "product_question",
        "document_count": len(documents)
    }
) as span:
    response = await call_llm()
    # Metadata is captured with all telemetry events in this span
This is the preferred pattern—define all known metadata upfront when creating the span context.

Span Events

Within a span, you can log multiple events:
async with client.span(session, name="multi_step_workflow") as span:
    # First LLM call
    response1 = await call_llm_1()
    await client.log_telemetry_event_from_response(
        session, response1, Provider.ANTHROPIC
    )

    # Process result
    processed = process(response1)

    # Second LLM call
    response2 = await call_llm_2(processed)
    await client.log_telemetry_event_from_response(
        session, response2, Provider.ANTHROPIC
    )
Both events are associated with the same span.

Error Handling

Spans capture errors automatically. Exceptions are recorded on the span when raised:
async with client.span(
    session,
    metadata={"operation": "llm_call"}
) as span:
    try:
        response = await call_llm()
        await client.log_telemetry_event_from_response(...)
    except Exception as e:
        # The span automatically records exception details
        # (error.type and error.message are captured)
        raise

Flushing Telemetry

Ensure all telemetry is sent before shutdown:
async with MoxnClient() as client:
    # Your code...

    # Explicit flush with timeout (optional)
    await client.flush(timeout=5.0)

# Context manager automatically flushes on exit
For serverless or short-lived processes:
async def lambda_handler(event, context):
    async with MoxnClient() as client:
        session = await client.create_prompt_session(...)

        async with client.span(session) as span:
            response = await call_llm()
            await client.log_telemetry_event_from_response(...)

        # Ensure telemetry is sent before Lambda returns
        await client.flush(timeout=3.0)

    return {"statusCode": 200}

Complete Example

from moxn import MoxnClient
from moxn.types.content import Provider
from anthropic import Anthropic
import asyncio

async def handle_customer_query(query: str, customer_id: str):
    async with MoxnClient() as client:
        session = await client.create_prompt_session(
            prompt_id="product-help-prompt",
            branch_name="main",
            session_data=ProductHelpInput(query=query)
        )

        async with client.span(
            session,
            name="customer_support_request",
            metadata={
                "customer_id": customer_id,
                "query_length": len(query)
            }
        ) as root_span:

            # Step 1: Classify the query
            classification = await classify_query(query)

            # Step 2: Search for relevant docs
            docs = await search_knowledge_base(query)

            # Step 3: Generate response with all context in metadata
            async with client.span(
                session,
                name="generate",
                metadata={
                    "classification": classification,
                    "doc_count": len(docs)
                }
            ) as span:
                anthropic = Anthropic()
                response = anthropic.messages.create(
                    **session.to_anthropic_invocation()
                )
                await client.log_telemetry_event_from_response(
                    session, response, Provider.ANTHROPIC
                )

            return response.content[0].text

asyncio.run(handle_customer_query("How do I reset my password?", "cust_123"))

Next Steps