MoxnClient API Reference
The MoxnClient is your main entry point for interacting with the Moxn platform. It handles authentication, fetching prompts, creating sessions, managing telemetry, and code generation.

Usage

from moxn import MoxnClient

async with MoxnClient() as client:
    session = await client.create_prompt_session(
        prompt_id="your-prompt-id",
        branch_name="main"
    )
The client automatically uses MOXN_API_KEY from your environment.

Fetching Content

get_prompt

Fetch a prompt template with optional caching.
async def get_prompt(
    prompt_id: UUID | str,
    branch_name: str | None = None,
    commit_id: str | None = None,
) -> PromptTemplate
Parameters:
  • prompt_id (UUID | str): The prompt's anchor ID
  • branch_name (str | None): Branch name (mutually exclusive with commit_id)
  • commit_id (str | None): Commit ID for immutable access (mutually exclusive with branch_name)
Returns: PromptTemplate

Caching Behavior:
  • Commit access: Cached indefinitely (immutable)
  • Branch access: Always fetches latest (mutable)
# Development: get latest from branch
prompt = await client.get_prompt("prompt-id", branch_name="main")

# Production: pin to specific commit
prompt = await client.get_prompt("prompt-id", commit_id="abc123")

get_task

Fetch an entire task with all its prompts and schemas.
async def get_task(
    task_id: str,
    branch_name: str | None = None,
    commit_id: str | None = None,
) -> Task
Parameters:
  • task_id (str): The task's anchor ID
  • branch_name (str | None): Branch name
  • commit_id (str | None): Commit ID

Returns: Task
task = await client.get_task("task-id", branch_name="main")
for prompt in task.prompts:
    print(f"Prompt: {prompt.name}")

get_branch_head

Get the head commit for a branch. Useful for determining deployment versions.
async def get_branch_head(
    task_id: str,
    branch_name: str = "main"
) -> BranchHeadResponse
Returns: BranchHeadResponse with:
  • effective_commit_id: The commit ID to use
  • has_uncommitted_changes: Whether there are pending changes
  • last_committed_at: Timestamp of last commit

Creating Sessions

create_prompt_session

Create a new prompt session by fetching a prompt and combining it with session data.
async def create_prompt_session(
    prompt_id: str,
    branch_name: str | None = None,
    commit_id: str | None = None,
    session_data: RenderableModel | None = None,
) -> PromptSession
Parameters:
  • prompt_id (str): The prompt's anchor ID
  • branch_name (str | None): Branch name
  • commit_id (str | None): Commit ID
  • session_data (RenderableModel | None): Pydantic model for variable substitution

Returns: PromptSession
from models.my_task_models import QueryInput

session = await client.create_prompt_session(
    prompt_id="prompt-id",
    branch_name="main",
    session_data=QueryInput(
        query="What is the weather?",
        user_name="Alice"
    )
)

prompt_session_from_session_data

Create a session using the prompt ID embedded in the session data’s metadata.
async def prompt_session_from_session_data(
    session_data: RenderableModel,
    branch_name: str | None = None,
    commit_id: str | None = None,
) -> PromptSession
This method reads the prompt_id from session_data.moxn_schema_metadata, so you don’t need to specify it separately.
# Session data knows which prompt it belongs to
session = await client.prompt_session_from_session_data(
    session_data=QueryInput(query="Hello"),
    branch_name="main"
)

Telemetry & Spans

span

Create a span for tracing and observability.
@asynccontextmanager
async def span(
    prompt_session: PromptSession,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
    *,
    parent_context: SpanContext | None = None,
    trace_context: TraceContext | None = None,
) -> AsyncGenerator[Span, None]
Parameters:
  • prompt_session (PromptSession): The session for this span
  • name (str | None): Span name (defaults to prompt name)
  • metadata (dict | None): Searchable attributes (customer_id, request_id, etc.)
  • parent_context (SpanContext | None): Explicit parent for async patterns
  • trace_context (TraceContext | None): For distributed tracing

Returns: AsyncGenerator[Span, None]
async with client.span(
    session,
    name="process_query",
    metadata={
        "customer_id": "cust_123",
        "status": "processing"
    }
) as span:
    response = anthropic.messages.create(**session.to_anthropic_invocation())
    await client.log_telemetry_event_from_response(session, response, Provider.ANTHROPIC)

span_from_carrier

Create a span from a trace carrier for distributed tracing.
@asynccontextmanager
async def span_from_carrier(
    carrier: MoxnTraceCarrier,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> AsyncGenerator[Span, None]
Use this in queue workers or callback handlers to continue a trace.
# In a queue worker
carrier = MoxnTraceCarrier.model_validate(message["carrier"])
async with client.span_from_carrier(carrier, name="process_item") as span:
    await process(message["data"])

extract_context

Extract the current span context for propagation across services.
def extract_context() -> MoxnTraceCarrier | None
Returns: MoxnTraceCarrier if there’s an active span, None otherwise.
async with client.span(session) as span:
    carrier = client.extract_context()
    if carrier:
        await queue.put({
            "carrier": carrier.model_dump(mode="json"),
            "data": payload
        })

log_telemetry_event

Log an LLM event to the current span.
async def log_telemetry_event(event: LLMEvent) -> None
event = session.create_llm_event_from_parsed_response(parsed_response)
await client.log_telemetry_event(event)

log_telemetry_event_from_response

Convenience method to parse a provider response and log it in one call.
async def log_telemetry_event_from_response(
    prompt_session: PromptSession,
    response: AnthropicMessage | OpenAIChatCompletion | GoogleGenerateContentResponse,
    provider: Provider,
) -> None
Parameters:
  • prompt_session (PromptSession): The session used for the request
  • response (provider response): Raw response from the LLM provider
  • provider (Provider): The provider enum value
response = anthropic.messages.create(**session.to_anthropic_invocation())
await client.log_telemetry_event_from_response(session, response, Provider.ANTHROPIC)

flush

Await in-flight telemetry logs. Call this at process exit or Lambda return.
async def flush(timeout: float | None = None) -> None
# At shutdown
await client.flush(timeout=5.0)

Code Generation

generate_task_models

Generate Pydantic models from all schemas in a task.
async def generate_task_models(
    task_id: str | UUID,
    branch_name: str | None = None,
    commit_id: str | None = None,
    output_dir: Path | str | None = None,
) -> DatamodelCodegenResponse
Parameters:
  • task_id (str | UUID): The task ID
  • branch_name (str | None): Branch name (defaults to "main")
  • commit_id (str | None): Commit ID
  • output_dir (Path | str | None): Directory to write generated code

Returns: DatamodelCodegenResponse with the generated code
# Generate and save models
response = await client.generate_task_models(
    task_id="task-id",
    branch_name="main",
    output_dir="./models"
)
print(f"Generated: {response.filename}")

Task & Prompt Creation

create_task

Create a new task programmatically.
async def create_task(
    name: str,
    description: str | None = None,
    branch_name: str = "main",
) -> Task
task = await client.create_task(
    name="Customer Support Bot",
    description="Handles customer inquiries"
)

create_prompt

Create a new prompt with messages.
async def create_prompt(
    task_id: str | UUID,
    name: str,
    messages: list[MessageData | dict],
    description: str | None = None,
    branch_name: str = "main",
) -> PromptTemplate
from moxn.types.requests import MessageData
from moxn.types.content import MessageRole
from moxn.types.blocks.text import TextContentModel

messages = [
    MessageData(
        name="System",
        role=MessageRole.SYSTEM,
        blocks=[[TextContentModel(text="You are helpful.")]]
    )
]

prompt = await client.create_prompt(
    task_id=task.id,
    name="Q&A Prompt",
    messages=messages
)

Lifecycle Methods

close

Close the underlying HTTP client.
async def close() -> None

verify_access

Verify API key and return identity information.
async def verify_access() -> dict
info = await client.verify_access()
print(f"Tenant: {info['tenant_id']}")