Returns

Yields a GalteaSpan object with helper methods for setting trace data dynamically.

Example

with start_trace("manual_span", type="SPAN") as span:
    # Do work
    span.update(metadata={"status": "done"})

Parameters

name (string, required)
Name of the trace.

type (string)
TraceType value: SPAN, GENERATION, EVENT, AGENT, TOOL, CHAIN, RETRIEVER, EVALUATOR, EMBEDDING, GUARDRAIL. See Trace Types for details.

description (string)
Human-readable description of what this operation does. Useful for documentation and debugging. Maximum size: 32KB.

input (any)
Input data for the trace. Accepts any value (non-serializable objects are converted to string). Maximum size: 128KB.

metadata (any)
Metadata for the trace. Accepts any value (non-serializable objects are converted to string). Maximum size: 128KB.

attributes (dict)
Custom OpenTelemetry attributes to add to the span.
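The documented size limits (32KB for description, 128KB for input and metadata) can be checked client-side before a trace is started. The helper below is a hypothetical sketch, not part of the SDK; it mirrors the documented fallback where non-serializable values are converted to string:

```python
import json

# Documented limits for start_trace() payloads
MAX_DESCRIPTION_BYTES = 32 * 1024    # description: 32KB
MAX_PAYLOAD_BYTES = 128 * 1024       # input / metadata: 128KB

def payload_size(value) -> int:
    """Approximate serialized size in bytes; non-serializable
    values fall back to str(), matching the documented behavior."""
    try:
        blob = json.dumps(value)
    except TypeError:
        blob = str(value)
    return len(blob.encode("utf-8"))

def fits_limits(description=None, input=None, metadata=None) -> bool:
    """Return True when every provided payload is within its limit."""
    if description is not None and payload_size(description) > MAX_DESCRIPTION_BYTES:
        return False
    return all(
        payload_size(value) <= MAX_PAYLOAD_BYTES
        for value in (input, metadata)
        if value is not None
    )
```

Running such a check before start_trace() avoids silently oversized payloads; the exact server-side behavior when a limit is exceeded is not specified here.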

GalteaSpan Methods

The yielded GalteaSpan object provides these methods:
update(input, output, metadata, type)
Update trace attributes. All parameters are optional and accept any value (non-serializable objects are converted to string).

set_attribute(key, value)
Set a custom attribute on the span.

record_exception(exception)
Manually record an exception on the span.
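record_exception() pairs naturally with a try/except that re-raises, so the failure is visible in the trace without being swallowed. A minimal sketch (run_and_record is a hypothetical helper, not part of the SDK) that works with any span-like object exposing record_exception:

```python
def run_and_record(span, fn, *args, **kwargs):
    """Call fn; on failure, record the exception on the span,
    then re-raise so the error still propagates to the caller."""
    try:
        return fn(*args, **kwargs)
    except Exception as exc:
        span.record_exception(exc)
        raise
```

Inside a `with start_trace(...) as span:` block, `run_and_record(span, risky_call)` keeps the exception on the trace while leaving normal error handling untouched.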

When to Use

Use start_trace() instead of @trace when you need:
  1. Fine-grained control over specific code blocks rather than entire functions
  2. Dynamic attributes that are only known at runtime
  3. Conditional tracing based on runtime conditions
  4. Tracing third-party code that you can’t decorate
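Point 3, conditional tracing, can be handled with contextlib.nullcontext so the with block stays identical whether tracing is on or off. A minimal sketch; traced_block is a hypothetical wrapper around whatever span factory you pass in (for example, start_trace):

```python
from contextlib import nullcontext

def traced_block(enabled, span_factory, name, **kwargs):
    """Return a real span context manager when tracing is enabled,
    or a no-op context manager (yielding None) when it is not."""
    if enabled:
        return span_factory(name, **kwargs)
    return nullcontext()
```

Usage looks like `with traced_block(tracing_enabled, start_trace, "step", type="SPAN") as span:`, where tracing_enabled is whatever runtime flag you use; guard span-dependent calls with `if span is not None:`.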

Complete Example

def rag_pipeline(query: str, inference_result_id: str) -> str:
    token = set_context(inference_result_id=inference_result_id)

    try:
        # Retrieval step
        with start_trace(
            "retrieve_documents",
            type="RETRIEVER",
            description="Searches vector store for relevant documents",
            input={"query": query},
        ) as span:
            # Simulated vector store search
            docs = [
                {"id": "doc1", "content": "Paris is the capital of France."},
                {"id": "doc2", "content": "France is in Western Europe."},
            ]
            span.update(output={"doc_count": len(docs), "docs": docs})

        # Generation step
        with start_trace(
            "generate_response",
            type="GENERATION",
            description="Generates final response using retrieved context",
            input={"query": query},
        ) as span:
            # Simulated LLM response
            response_content = "Based on the documents, Paris is the capital of France."
            span.update(
                output={"response": response_content},
                metadata={"tokens_used": 42, "model": "gpt-4"},
            )

        return response_content
    finally:
        clear_context(token)


# Create an inference result to associate traces with
inference_result_for_rag = galtea.inference_results.create(
    session_id=session_start_trace.id,
    input="What is the capital of France?",
)
if inference_result_for_rag is None:
    raise ValueError("inference_result_for_rag is None")

result = rag_pipeline("What is the capital of France?", inference_result_for_rag.id)

Nested Traces

Traces automatically form a parent-child hierarchy when nested:
def process_with_nested_traces(inference_result_id: str) -> dict:
    token = set_context(inference_result_id=inference_result_id)

    try:
        with start_trace(
            "parent_operation", type="CHAIN", input={"task": "process_all"}
        ) as parent:
            # First child
            with start_trace("child_step_1", type="TOOL") as span:
                step1_result = {"processed": True, "items": 5}
                span.update(output=step1_result)

            # Second child
            with start_trace("child_step_2", type="TOOL") as span:
                step2_result = {"validated": True, "errors": 0}
                span.update(output=step2_result)

            parent.update(output={"total_steps": 2, "status": "completed"})

        return {"step1": step1_result, "step2": step2_result}
    finally:
        clear_context(token)


# Create another inference result for nested trace example
inference_result_for_nested = galtea.inference_results.create(
    session_id=session_start_trace.id,
    input="Process all items",
)
if inference_result_for_nested is None:
    raise ValueError("inference_result_for_nested is None")

nested_result = process_with_nested_traces(inference_result_for_nested.id)
Like @trace, traces created with start_trace() are automatically exported to the Galtea API when clear_context() is called.