Akashmj22122002's picture
Upload folder using huggingface_hub
14edff4 verified
from __future__ import annotations
import abc
import contextvars
from typing import Any
from ..logger import logger
from . import util
from .processor_interface import TracingProcessor
from .scope import Scope
class Trace(abc.ABC):
"""A complete end-to-end workflow containing related spans and metadata.
A trace represents a logical workflow or operation (e.g., "Customer Service Query"
or "Code Generation") and contains all the spans (individual operations) that occur
during that workflow.
Example:
```python
# Basic trace usage
with trace("Order Processing") as t:
validation_result = await Runner.run(validator, order_data)
if validation_result.approved:
await Runner.run(processor, order_data)
# Trace with metadata and grouping
with trace(
"Customer Service",
group_id="chat_123",
metadata={"customer": "user_456"}
) as t:
result = await Runner.run(support_agent, query)
```
Notes:
- Use descriptive workflow names
- Group related traces with consistent group_ids
- Add relevant metadata for filtering/analysis
- Use context managers for reliable cleanup
- Consider privacy when adding trace data
"""
@abc.abstractmethod
def __enter__(self) -> Trace:
pass
@abc.abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
pass
@abc.abstractmethod
def start(self, mark_as_current: bool = False):
"""Start the trace and optionally mark it as the current trace.
Args:
mark_as_current: If true, marks this trace as the current trace
in the execution context.
Notes:
- Must be called before any spans can be added
- Only one trace can be current at a time
- Thread-safe when using mark_as_current
"""
pass
@abc.abstractmethod
def finish(self, reset_current: bool = False):
"""Finish the trace and optionally reset the current trace.
Args:
reset_current: If true, resets the current trace to the previous
trace in the execution context.
Notes:
- Must be called to complete the trace
- Finalizes all open spans
- Thread-safe when using reset_current
"""
pass
@property
@abc.abstractmethod
def trace_id(self) -> str:
"""Get the unique identifier for this trace.
Returns:
str: The trace's unique ID in the format 'trace_<32_alphanumeric>'
Notes:
- IDs are globally unique
- Used to link spans to their parent trace
- Can be used to look up traces in the dashboard
"""
pass
@property
@abc.abstractmethod
def name(self) -> str:
"""Get the human-readable name of this workflow trace.
Returns:
str: The workflow name (e.g., "Customer Service", "Data Processing")
Notes:
- Should be descriptive and meaningful
- Used for grouping and filtering in the dashboard
- Helps identify the purpose of the trace
"""
pass
@abc.abstractmethod
def export(self) -> dict[str, Any] | None:
"""Export the trace data as a serializable dictionary.
Returns:
dict | None: Dictionary containing trace data, or None if tracing is disabled.
Notes:
- Includes all spans and their data
- Used for sending traces to backends
- May include metadata and group ID
"""
pass
class NoOpTrace(Trace):
"""A no-op implementation of Trace that doesn't record any data.
Used when tracing is disabled but trace operations still need to work.
Maintains proper context management but doesn't store or export any data.
Example:
```python
# When tracing is disabled, traces become NoOpTrace
with trace("Disabled Workflow") as t:
# Operations still work but nothing is recorded
await Runner.run(agent, "query")
```
"""
def __init__(self):
self._started = False
self._prev_context_token: contextvars.Token[Trace | None] | None = None
def __enter__(self) -> Trace:
if self._started:
if not self._prev_context_token:
logger.error("Trace already started but no context token set")
return self
self._started = True
self.start(mark_as_current=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish(reset_current=True)
def start(self, mark_as_current: bool = False):
if mark_as_current:
self._prev_context_token = Scope.set_current_trace(self)
def finish(self, reset_current: bool = False):
if reset_current and self._prev_context_token is not None:
Scope.reset_current_trace(self._prev_context_token)
self._prev_context_token = None
@property
def trace_id(self) -> str:
"""The trace's unique identifier.
Returns:
str: A unique ID for this trace.
"""
return "no-op"
@property
def name(self) -> str:
"""The workflow name for this trace.
Returns:
str: Human-readable name describing this workflow.
"""
return "no-op"
def export(self) -> dict[str, Any] | None:
"""Export the trace data as a dictionary.
Returns:
dict | None: Trace data in exportable format, or None if no data.
"""
return None
NO_OP_TRACE = NoOpTrace()
class TraceImpl(Trace):
"""
A trace that will be recorded by the tracing library.
"""
__slots__ = (
"_name",
"_trace_id",
"group_id",
"metadata",
"_prev_context_token",
"_processor",
"_started",
)
def __init__(
self,
name: str,
trace_id: str | None,
group_id: str | None,
metadata: dict[str, Any] | None,
processor: TracingProcessor,
):
self._name = name
self._trace_id = trace_id or util.gen_trace_id()
self.group_id = group_id
self.metadata = metadata
self._prev_context_token: contextvars.Token[Trace | None] | None = None
self._processor = processor
self._started = False
@property
def trace_id(self) -> str:
return self._trace_id
@property
def name(self) -> str:
return self._name
def start(self, mark_as_current: bool = False):
if self._started:
return
self._started = True
self._processor.on_trace_start(self)
if mark_as_current:
self._prev_context_token = Scope.set_current_trace(self)
def finish(self, reset_current: bool = False):
if not self._started:
return
self._processor.on_trace_end(self)
if reset_current and self._prev_context_token is not None:
Scope.reset_current_trace(self._prev_context_token)
self._prev_context_token = None
def __enter__(self) -> Trace:
if self._started:
if not self._prev_context_token:
logger.error("Trace already started but no context token set")
return self
self.start(mark_as_current=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish(reset_current=exc_type is not GeneratorExit)
def export(self) -> dict[str, Any] | None:
return {
"object": "trace",
"id": self.trace_id,
"workflow_name": self.name,
"group_id": self.group_id,
"metadata": self.metadata,
}