
Operations & Observability

Structured logging, metrics, cost tracking, debugging stuck agents, health checks, and end-to-end debugging scenarios for production harnesses.

When an agent is live, you can’t just monitor the model—you need to see the entire system. This document covers logging, metrics, cost tracking, debugging, and health checks for production harnesses.

In simple terms: What to measure, what to log, how to debug when things break.


Part 1: Structured Logging Strategy

Why Structured Logging Matters

Unstructured logs are nearly useless at scale:

2026-04-18 14:23:15 ERROR: Tool failed

Structured logs (JSON) are queryable:

{
  "timestamp": "2026-04-18T14:23:15Z",
  "level": "ERROR",
  "event": "tool_execution_failed",
  "tool_name": "web_search",
  "agent_id": "agent-42",
  "session_id": "sess-abc123",
  "error_code": "timeout",
  "error_message": "Tool exceeded 30s timeout",
  "retry_count": 2,
  "context_tokens_used": 8532,
  "cost_usd": 0.0425
}

With JSON, you can ask: “How many agents timeout per day?” or “What’s the error rate by tool?” Unstructured logs require humans to read each one.
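Because each entry is a self-contained JSON object, those questions become simple filters over parsed lines. A minimal sketch (field names follow the example above; in practice you would query your log platform rather than raw lines):

```python
import json

def error_count_by_tool(log_lines):
    """Count tool failures per tool from JSON-lines logs."""
    counts = {}
    for line in log_lines:
        entry = json.loads(line)
        if entry.get("event") == "tool_execution_failed":
            tool = entry.get("tool_name", "unknown")
            counts[tool] = counts.get(tool, 0) + 1
    return counts

logs = [
    '{"event": "tool_execution_failed", "tool_name": "web_search"}',
    '{"event": "session_start"}',
    '{"event": "tool_execution_failed", "tool_name": "web_search"}',
]
print(error_count_by_tool(logs))  # {'web_search': 2}
```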

Core Events to Log

Every agent harness should log these events:

1. Session Lifecycle

{
  "timestamp": "2026-04-18T14:23:00Z",
  "event": "session_start",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "user_id": "user-789",
  "model": "claude-3-5-sonnet",
  "context_limit_tokens": 200000,
  "startup_memory_tokens": 2500,
  "startup_memory_summary": "Claude Code instructions + 3 recent sessions",
  "environment": "production",
  "tags": ["web-scraper", "batch-job"]
}

Why: Track which agents/models are used, startup overhead, frequency

{
  "timestamp": "2026-04-18T14:35:42Z",
  "event": "session_end",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "total_duration_seconds": 722,
  "total_input_tokens": 45000,
  "total_output_tokens": 12000,
  "total_cost_usd": 0.285,
  "loop_iterations": 8,
  "final_status": "success",
  "error": null
}

Why: Cost accounting, loop efficiency, success/failure tracking

2. Agent Decision Loop

{
  "timestamp": "2026-04-18T14:23:15Z",
  "event": "agent_step",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "iteration": 3,
  "step_type": "reasoning",
  "reasoning_summary": "Need to search for documentation on KV cache optimization",
  "input_tokens": 5200,
  "output_tokens": 450,
  "step_cost_usd": 0.028,
  "latency_ms": 2840
}

Why: See what agent is thinking, cost per step, detect slow reasoning

3. Tool Calls

{
  "timestamp": "2026-04-18T14:23:20Z",
  "event": "tool_call",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "tool_name": "web_search",
  "tool_version": "2.1",
  "input_params": {
    "query": "KV cache optimization techniques",
    "num_results": 5
  },
  "execution_latency_ms": 1240,
  "status": "success",
  "output_length_chars": 8450,
  "error": null
}

Why: Tool performance, failure detection, identify slow/failing tools

4. Error Handling

{
  "timestamp": "2026-04-18T14:23:35Z",
  "event": "error",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "error_type": "ToolExecutionError",
  "error_message": "Timeout waiting for API response",
  "error_code": "timeout",
  "tool_name": "web_search",
  "context_at_error": {
    "iteration": 4,
    "tokens_used": 18500,
    "context_remaining": 181500,
    "loop_depth": 1
  },
  "recovery_action": "retry_with_backoff",
  "recovery_success": true,
  "stacktrace": "..."
}

Why: Understand what breaks, recovery success rates, patterns in failures

5. Cost Events

{
  "timestamp": "2026-04-18T14:35:42Z",
  "event": "cost_checkpoint",
  "session_id": "sess-abc123",
  "agent_id": "agent-42",
  "model": "claude-3-5-sonnet",
  "accumulated_input_tokens": 45000,
  "accumulated_output_tokens": 12000,
  "cost_per_mtok_input": 0.003,
  "cost_per_mtok_output": 0.015,
  "total_cost_usd": 0.285,
  "cost_limit_usd": 1.0,
  "cost_vs_limit_pct": 28.5,
  "warning_threshold_exceeded": false
}

Why: Real-time cost tracking, budget alerts, cost per session/agent

Log Levels and When to Use Each

| Level | Use Case | Example | Overhead |
|-------|----------|---------|----------|
| DEBUG | Development only, never production | Tool input/output details | High; always off in prod |
| INFO | Normal operational events | Session start, tool calls, step completion | Low; always on |
| WARN | Degradation, unusual but handled | Retry after timeout, context 80% full | Low; always on |
| ERROR | Failures requiring attention | Tool fails, agent gives up, cost exceeded | Very low; always on |
| CRITICAL | System is broken | Out of memory, model unreachable, budget exceeded | Immediate alert |

Best practice: Use INFO for all normal events, WARN for degradation, ERROR for failures. DEBUG only in development or when troubleshooting.
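One way to enforce "DEBUG never in production" is to derive the level from an environment variable with a safe default. A sketch; `HARNESS_LOG_LEVEL` is a hypothetical variable name:

```python
import logging
import os

# HARNESS_LOG_LEVEL is a hypothetical env var; default keeps DEBUG out of prod
level_name = os.environ.get("HARNESS_LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)

logging.basicConfig(level=level, format="%(message)s")
logging.getLogger("harness").debug("hidden unless HARNESS_LOG_LEVEL=DEBUG")
logging.getLogger("harness").info("always visible in production")
```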

Structured Log Format (Python)

import json
import logging
from datetime import datetime
from typing import Optional

class StructuredLogger:
    """JSON logger for harness events"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # JSON handler
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_event(
        self,
        event: str,
        level: str = "INFO",
        **kwargs
    ) -> None:
        """Log structured event to JSON"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": level,
            "event": event,
            **kwargs
        }
        
        getattr(self.logger, level.lower())(json.dumps(log_entry))
    
    def session_start(
        self,
        session_id: str,
        agent_id: str,
        model: str,
        context_limit: int
    ) -> None:
        self.log_event(
            "session_start",
            session_id=session_id,
            agent_id=agent_id,
            model=model,
            context_limit_tokens=context_limit
        )
    
    def tool_call(
        self,
        session_id: str,
        tool_name: str,
        latency_ms: float,
        status: str,
        error: Optional[str] = None
    ) -> None:
        self.log_event(
            "tool_call",
            session_id=session_id,
            tool_name=tool_name,
            execution_latency_ms=latency_ms,
            status=status,
            error=error
        )
    
    def cost_checkpoint(
        self,
        session_id: str,
        accumulated_cost: float,
        cost_limit: float
    ) -> None:
        self.log_event(
            "cost_checkpoint",
            session_id=session_id,
            accumulated_cost_usd=accumulated_cost,
            cost_limit_usd=cost_limit,
            cost_vs_limit_pct=round((accumulated_cost / cost_limit) * 100, 1)
        )

# Usage
logger = StructuredLogger("harness")
logger.session_start(
    session_id="sess-abc123",
    agent_id="agent-42",
    model="claude-3-5-sonnet",
    context_limit=200000
)

Log Storage & Retention

In production, logs should flow to:

  1. Short-term storage (local files): Last 24-48 hours, searchable

    • /var/log/harness/ directory
    • Rotated daily (logrotate or Python logging handlers)
    • Enables quick debugging of recent issues
  2. Streaming to observability platform: For querying/alerting

    • Option A: Log to CloudWatch, Datadog, New Relic (if using cloud)
    • Option B: Self-hosted ELK stack (Elasticsearch + Logstash + Kibana) or Loki
    • Option C: Simple time-series DB (InfluxDB for metrics, Postgres for logs)
  3. Archive storage (cold): 30–90 days for compliance

    • Cloud storage (S3, GCS) with lifecycle rules
    • Compressed JSON files, queryable via SQL (Athena, BigQuery)

Recommended: Use local rotation + streaming to observability platform for queries, skip archive unless compliance requires it.
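The local-rotation tier needs nothing beyond the standard library. A sketch using `TimedRotatingFileHandler` (the path and retention count are illustrative):

```python
import logging
import os
import tempfile
from logging.handlers import TimedRotatingFileHandler

def make_rotating_logger(name: str, path: str) -> logging.Logger:
    """Logger writing JSON lines to a file rotated daily, keeping 2 backups."""
    handler = TimedRotatingFileHandler(path, when="midnight", backupCount=2)
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger

log_path = os.path.join(tempfile.gettempdir(), "harness.log")
log = make_rotating_logger("harness-local", log_path)
log.info('{"event": "session_start", "session_id": "sess-abc123"}')
```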


Part 2: Metrics & Dashboards

Key Metrics to Track

Every production harness should expose these metrics:

Latency Metrics (Response Time)

What: How long operations take

harness_latency_step_p50_ms = 1250        # Median
harness_latency_step_p95_ms = 3400        # 95th percentile (key SLA)
harness_latency_step_p99_ms = 5200        # 99th percentile
harness_latency_tool_call_p95_ms = 2100   # By tool
harness_latency_session_total_seconds = 45 # End-to-end

Why these percentiles:

  • p50: Normal case
  • p95: SLA boundary (most users experience < this)
  • p99: Tail behavior (worst 1%)
Also break latency down by component:

harness_latency_model_inference_ms = 1800  # Just the LLM
harness_latency_tool_overhead_ms = 400     # Tool I/O
harness_latency_memory_load_ms = 50        # Memory loading

Why: Pinpoint where time is spent. If model inference is fast but session is slow, tools are the problem.
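The percentiles quoted above can be computed from raw latency samples with the nearest-rank method; a metrics backend such as Prometheus normally does this for you, but a sketch helps make the definitions concrete:

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile of a list of latency samples."""
    ordered = sorted(samples)
    rank = math.ceil((pct / 100) * len(ordered))
    return ordered[max(rank - 1, 0)]

latencies_ms = [800, 950, 1100, 1250, 1400, 2200, 3400, 5200]
print(percentile(latencies_ms, 50))  # 1250
print(percentile(latencies_ms, 95))  # 5200
```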

Throughput Metrics (Work Done)

harness_tokens_input_per_session = 45000      # Typical input
harness_tokens_output_per_session = 12000     # Typical output
harness_tokens_per_second = 120               # Throughput
harness_steps_per_session = 8                 # Iterations needed
harness_sessions_per_hour = 15                # Concurrency

Error & Reliability

harness_errors_total{tool=web_search} = 42
harness_errors_rate_per_session = 0.03        # 3% of sessions error
harness_recovery_success_rate = 0.92          # Retries work 92% of time
harness_tool_failure_rate{tool=api_call} = 0.15
harness_context_overflow_count = 2            # Sessions hitting context limit
harness_iteration_timeout_count = 1           # Agents exceeding max iterations

Cost Metrics (Critical for Budget)

harness_cost_total_usd = 1250.43              # Lifetime cost
harness_cost_per_session_usd = 0.285          # Average cost
harness_cost_per_token_usd = 3.2e-6           # Token cost
harness_cost_by_agent{agent=agent-42} = 450.12
harness_cost_by_model{model=claude-3-5-sonnet} = 1050.0
harness_cost_by_hour = [24.50, 28.30, ...]    # Trending
harness_budget_used_pct = 85.4                # Share of budget consumed

Quality Metrics (Application-Specific)

These depend on your task:

# For code generation agent
harness_quality_tests_pass_rate = 0.87        # 87% of generated code passes tests
harness_quality_hallucination_rate = 0.02     # 2% of outputs are hallucinated
harness_quality_user_satisfaction = 4.2/5.0   # 1-5 scale from user feedback

# For research agent
harness_quality_sources_verified = 0.94       # 94% of cited sources exist
harness_quality_citation_accuracy = 0.91      # 91% of quotes are accurate
harness_quality_contradiction_count = 3       # 3 internal contradictions found

Metrics Implementation (Python with Prometheus)

from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
latency_histogram = Histogram(
    'harness_latency_step_ms',
    'Step latency in milliseconds',
    ['agent_id'],
    buckets=(100, 500, 1000, 2500, 5000, 10000)
)

error_counter = Counter(
    'harness_errors_total',
    'Total errors',
    ['agent_id', 'error_type']
)

cost_gauge = Gauge(
    'harness_cost_total_usd',
    'Total cost in USD',
    ['agent_id']
)

step_counter = Counter(
    'harness_steps_total',
    'Total completed steps',
    ['agent_id']
)

# Usage in agent loop
start_time = time.time()

try:
    # Do work...
    result = agent_step(...)
    latency_ms = (time.time() - start_time) * 1000
    latency_histogram.labels(agent_id='agent-42').observe(latency_ms)
    step_counter.labels(agent_id='agent-42').inc()
    
except Exception as e:
    error_counter.labels(
        agent_id='agent-42',
        error_type=type(e).__name__
    ).inc()
    raise

Dashboard Templates

Dashboard 1: Latency & Performance

Title: Agent Latency Trends
Rows:
  1. Step latency p95 (line graph, 24h history)
     Alert: Red if p95 > 5000ms for 5min
  2. Session duration histogram (bar chart, distribution)
  3. Tokens per second (line graph by model)
  4. Tool latency by tool (bar chart, top 10 slowest)
  5. Memory load time trend (area chart)

Typical view:

  • x-axis: Time (past 24 hours)
  • y-axis: Latency (ms)
  • Lines: p50, p95, p99 overlaid

Dashboard 2: Cost Tracking

Title: Cost & Budget
Rows:
  1. Total cost (big number, $XXX.XX)
     Smaller: Daily spend, Budget remaining
  2. Cost trend by day (area chart, red if exceeding budget)
     Alert: Yellow if 80% of budget, Red if 95%+
  3. Cost by agent (bar chart, top agents)
  4. Cost by model (pie chart, breakdown)
  5. Token efficiency (cost per successful task)

Dashboard 3: Error & Reliability

Title: Health & Errors
Rows:
  1. Session success rate (big percentage, 98.5%)
  2. Error rate trend (line, should be flat/down)
  3. Errors by type (table: tool_timeout=42, api_error=8, ...)
  4. Recovery success rate (42 retried, 38 succeeded = 90%)
  5. Context overflow incidents (count by time)
  6. Tool availability (each tool: up/degraded/down)

Dashboard 4: Agent-Specific Metrics

For specialized agents, add domain metrics:

Title: [Agent Name] Quality Metrics
Rows:
  1. Success rate (target: >95%)
  2. User satisfaction (1-5 scale)
  3. Task completion time (trend, alert if > baseline)
  4. Hallucination rate (detected contradictions)
  5. Cost per successful task
  6. Recent incidents (list)

Part 3: Cost Tracking & Alerts

Cost Calculation: Input vs Output Tokens

Most APIs charge different rates for input (cheaper) and output (more expensive):

Claude 3.5 Sonnet pricing (April 2026):
  Input:  $3 per 1M tokens
  Output: $15 per 1M tokens

Cost per session = (input_tokens / 1M) * $3 + (output_tokens / 1M) * $15

Example: 45,000 input, 12,000 output
  = (45000 / 1e6) * 3 + (12000 / 1e6) * 15
  = 0.135 + 0.18
  = $0.315 per session

At scale: 100 sessions/day = $31.50/day

Note: Prices approximate as of April 2026. Check provider pricing pages for current rates.
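The same arithmetic as a helper function, with the quoted rates as defaults:

```python
def session_cost_usd(input_tokens, output_tokens,
                     input_per_mtok=3.0, output_per_mtok=15.0):
    """Cost of one session given token counts and per-1M-token rates."""
    return (input_tokens / 1e6) * input_per_mtok + \
           (output_tokens / 1e6) * output_per_mtok

print(round(session_cost_usd(45_000, 12_000), 3))  # 0.315
```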

Cost Tracking Implementation

class CostTracker:
    """Track and enforce budget limits"""
    
    def __init__(self, budget_usd: float):
        self.budget = budget_usd
        self.accumulated_cost = 0.0
        self.session_start_cost = 0.0
        
        # Model prices (input, output per 1M tokens)
        self.prices = {
            "claude-3-5-sonnet": {"input": 3.0, "output": 15.0},
            "gpt-4o": {"input": 15.0, "output": 60.0},
            "qwen-27b": {"input": 0.5, "output": 1.5},  # Cheaper local
        }
    
    def record_tokens(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Record token usage and return cost"""
        prices = self.prices[model]
        
        input_cost = (input_tokens / 1e6) * prices["input"]
        output_cost = (output_tokens / 1e6) * prices["output"]
        total_cost = input_cost + output_cost
        
        self.accumulated_cost += total_cost
        return total_cost
    
    def budget_remaining(self) -> float:
        """Return budget left in USD"""
        return max(0, self.budget - self.accumulated_cost)
    
    def budget_pct_used(self) -> float:
        """Return percent of budget used (0-100)"""
        return min(100, (self.accumulated_cost / self.budget) * 100)
    
    def check_budget(self) -> bool:
        """Return True if under budget, False if exceeded"""
        return self.accumulated_cost < self.budget
    
    def get_alert_level(self) -> str:
        """Return alert level based on budget usage"""
        pct = self.budget_pct_used()
        if pct < 75:
            return "ok"
        elif pct < 90:
            return "warning"
        elif pct < 100:
            return "critical"
        else:
            return "exceeded"

# Usage
tracker = CostTracker(budget_usd=10.0)

# After model call
cost = tracker.record_tokens(
    model="claude-3-5-sonnet",
    input_tokens=45000,
    output_tokens=12000
)
print(f"This call cost: ${cost:.4f}")
print(f"Budget used: {tracker.budget_pct_used():.1f}%")
print(f"Remaining: ${tracker.budget_remaining():.2f}")

if not tracker.check_budget():
    raise SystemExit("ERROR: Budget exceeded, stopping agent")

Budget Alerts

Implement these automated alerts:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Alert:
    level: str
    message: str
    action: str

def evaluate_budget_alert(tracker: CostTracker) -> Optional[Alert]:
    """Determine if budget alert should fire"""
    
    alert_level = tracker.get_alert_level()
    
    if alert_level == "warning":
        return Alert(
            level="WARN",
            message=f"Budget at {tracker.budget_pct_used():.0f}%",
            action="reduce_iteration_count or switch to cheaper model"
        )
    elif alert_level == "critical":
        return Alert(
            level="CRITICAL",
            message=f"Budget {tracker.budget_pct_used():.0f}% spent, {tracker.budget_remaining():.2f} left",
            action="STOP_AGENT immediately, review cost drivers"
        )
    elif alert_level == "exceeded":
        return Alert(
            level="CRITICAL",
            message=f"BUDGET EXCEEDED by ${tracker.accumulated_cost - tracker.budget:.2f}",
            action="kill_session, escalate to team lead"
        )
    
    return None

Cost Optimization Strategies

Strategy 1: Hybrid Routing (up to 80-90% cost savings when most requests route locally)

Use cheap models for simple tasks, expensive models only for hard tasks:

class HybridRouter:
    """Route to optimal model based on task difficulty"""
    
    def route(self, task: str, complexity_score: float) -> str:
        """
        complexity_score: 0-1 where 0 is trivial, 1 is very hard
        Returns model name to use
        """
        if complexity_score < 0.3:
            # Simple task: local small model
            return "qwen-7b-local"
        elif complexity_score < 0.7:
            # Medium task: cheap cloud SLM
            return "llama-13b-cloud"
        else:
            # Hard task: powerful model
            return "claude-3-5-sonnet"

# Analyze task
router = HybridRouter()
complexity = analyze_task(user_input)  # Returns 0-1
model = router.route(user_input, complexity)
result = call_model(model, user_input)

# Savings: 70% of calls use cheap models
# Cost/call: $0.01 (cheap) vs $0.27 (expensive)
# 100 calls/day: 70*$0.01 + 30*$0.27 = $8.80 (hybrid) vs $27 (all expensive) ≈ 67% savings

Strategy 2: Token Reduction

Compress memory before agent starts:

def compress_memory(memory_text: str) -> str:
    """Summarize old memory to fit in the token budget"""
    # Don't send the raw 20K-token history;
    # send a ~2K-token summary instead
    summary = summarize_with_cheap_model(memory_text)
    return summary  # Much shorter

Strategy 3: Context Trimming

Stop when you have enough answer:

def should_continue_loop(iterations: int, quality: float) -> bool:
    """Don't loop forever just to use budget"""
    if iterations > 20:
        return False  # Safety limit
    if quality > 0.9:
        return False  # Good enough answer
    return True

Part 4: Observability Patterns

Distributed Tracing for Multi-Agent Systems

When Agent A calls Agent B, trace the whole request:

import time
import uuid
from typing import Dict, List

class Trace:
    """Distributed trace across agents"""
    
    def __init__(self, trace_id: str = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.spans: List[Span] = []
    
    def start_span(self, name: str, tags: Dict = None) -> "Span":
        """Start a new span within this trace"""
        span = Span(
            trace_id=self.trace_id,
            name=name,
            tags=tags or {}
        )
        self.spans.append(span)
        return span

class Span:
    """Single operation within a trace"""
    
    def __init__(self, trace_id: str, name: str, tags: Dict):
        self.trace_id = trace_id
        self.name = name
        self.tags = tags
        self.start_time = time.time()
        self.end_time = None
        self.error = None
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.time()
        if exc_type:
            self.error = str(exc_val)
    
    def duration_ms(self) -> float:
        if not self.end_time:
            return None
        return (self.end_time - self.start_time) * 1000

# Usage: trace entire agent workflow
trace = Trace()

with trace.start_span("session_start") as span:
    span.tags["agent_id"] = "agent-42"
    load_memory()

with trace.start_span("main_loop"):
    for iteration in range(5):
        with trace.start_span("reasoning", tags={"iteration": iteration}):
            think()
        
        with trace.start_span("tool_call", tags={"tool": "web_search"}):
            search()

# Print trace timeline
for span in trace.spans:
    indent = "  " * span.name.count("_")
    print(f"{indent}{span.name}: {span.duration_ms():.0f}ms")

Session Tracking Across Calls

Link all events in a session:

import time
import uuid
from datetime import datetime
from typing import Dict, List

class Session:
    """Track a complete agent session"""
    
    def __init__(self, agent_id: str):
        self.session_id = str(uuid.uuid4())
        self.agent_id = agent_id
        self.start_time = time.time()
        self.events = []
    
    def log(self, event_name: str, **kwargs):
        """All events in this session are linked"""
        self.events.append({
            "timestamp": datetime.utcnow().isoformat(),
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "event": event_name,
            **kwargs
        })
    
    def get_timeline(self) -> List[Dict]:
        """Retrieve all events for this session"""
        return self.events

# Usage
session = Session(agent_id="agent-42")
session.log("step", iteration=1, thought="search for docs")
session.log("tool_call", tool="web_search", latency_ms=1240)
session.log("step", iteration=2, thought="analyze results")

# Later: query by session_id to see everything that happened
timeline = session.get_timeline()

Tool Call Tracing

Track which tools are called, by whom, and results:

from datetime import datetime
from typing import Any, Dict

class ToolTracer:
    """Trace all tool calls"""
    
    def __init__(self):
        self.calls = []
    
    def trace_call(
        self,
        tool_name: str,
        agent_id: str,
        session_id: str,
        input_params: Dict,
        result: Any = None,
        error: str = None,
        latency_ms: float = None
    ):
        self.calls.append({
            "timestamp": datetime.utcnow().isoformat(),
            "tool": tool_name,
            "agent_id": agent_id,
            "session_id": session_id,
            "input": input_params,
            "result": result,
            "error": error,
            "latency_ms": latency_ms
        })
    
    def summary_by_tool(self) -> Dict:
        """Which tools are used most?"""
        summary = {}
        for call in self.calls:
            tool = call["tool"]
            if tool not in summary:
                summary[tool] = {
                    "count": 0,
                    "errors": 0,
                    "avg_latency_ms": 0,
                    "latencies": []
                }
            summary[tool]["count"] += 1
            if call["error"]:
                summary[tool]["errors"] += 1
            if call["latency_ms"]:
                summary[tool]["latencies"].append(call["latency_ms"])
        
        # Calculate averages
        for tool in summary:
            if summary[tool]["latencies"]:
                avg = sum(summary[tool]["latencies"]) / len(summary[tool]["latencies"])
                summary[tool]["avg_latency_ms"] = avg
        
        return summary

# Usage
tracer = ToolTracer()

def call_tool(tool_name: str, **params):
    start = time.time()
    try:
        result = tools[tool_name](**params)
        latency = (time.time() - start) * 1000
        tracer.trace_call(
            tool_name=tool_name,
            agent_id="agent-42",
            session_id=session.session_id,
            input_params=params,
            result=result,
            latency_ms=latency
        )
        return result
    except Exception as e:
        latency = (time.time() - start) * 1000
        tracer.trace_call(
            tool_name=tool_name,
            agent_id="agent-42",
            session_id=session.session_id,
            input_params=params,
            error=str(e),
            latency_ms=latency
        )
        raise

# Query
summary = tracer.summary_by_tool()
print(f"web_search: {summary['web_search']['count']} calls, {summary['web_search']['avg_latency_ms']:.0f}ms avg")

Part 5: Debugging Stuck & Looping Agents

Detecting Infinite Loops

Agents can get stuck repeating the same action. Detect with:

import hashlib
from typing import Optional, Tuple

class LoopDetector:
    """Detect when agent is stuck in a loop"""
    
    def __init__(self, max_iterations: int = 50, max_repeat_threshold: int = 5):
        self.max_iterations = max_iterations
        self.max_repeat_threshold = max_repeat_threshold
        self.iteration_history = []
    
    def record_step(self, iteration: int, action: str, action_hash: Optional[str] = None):
        """Record each step"""
        # Hash the action to detect exact repeats
        if action_hash is None:
            action_hash = hashlib.md5(action.encode()).hexdigest()
        
        self.iteration_history.append({
            "iteration": iteration,
            "action": action,
            "action_hash": action_hash
        })
    
    def is_looping(self) -> Tuple[bool, Optional[str]]:
        """Check if agent is repeating"""
        
        # Check 1: Exceeded max iterations
        if len(self.iteration_history) >= self.max_iterations:
            return True, f"Exceeded max iterations ({self.max_iterations})"
        
        # Check 2: Last N steps are identical
        if len(self.iteration_history) >= 5:
            last_5_hashes = [s["action_hash"] for s in self.iteration_history[-5:]]
            if len(set(last_5_hashes)) == 1:
                # All last 5 are the same
                return True, f"Identical action repeated 5 times: {self.iteration_history[-1]['action']}"
        
        # Check 3: Action appears too many times total
        action_counts = {}
        for step in self.iteration_history:
            h = step["action_hash"]
            action_counts[h] = action_counts.get(h, 0) + 1
        
        for h, count in action_counts.items():
            if count > self.max_repeat_threshold:
                action = next(s["action"] for s in self.iteration_history if s["action_hash"] == h)
                return True, f"Action repeated {count} times: {action}"
        
        return False, None

# Usage in agent loop
detector = LoopDetector(max_iterations=50)

for iteration in range(100):  # Safety limit
    action = agent_think()
    detector.record_step(iteration, action)
    
    is_stuck, reason = detector.is_looping()
    if is_stuck:
        print(f"Agent stuck: {reason}")
        break
    
    execute_action(action)

Escaping Loops Gracefully

When a loop is detected, try these escapes in order:

def escape_loop(agent, reason: str, escape_attempt: int = 1) -> bool:
    """Try to escape a stuck loop"""
    
    if escape_attempt == 1:
        # Try 1: Give agent a hint
        agent.add_context("You seem to be repeating. Try a different approach.")
        return True  # Continue with hint
    
    elif escape_attempt == 2:
        # Try 2: Reduce context to reset thinking
        agent.trim_memory(keep_recent=3)
        agent.add_context("Reset. Try from scratch with fresh perspective.")
        return True
    
    elif escape_attempt == 3:
        # Try 3: Switch to different model for new perspective
        agent.switch_model("different_model")
        return True
    
    elif escape_attempt == 4:
        # Try 4: Ask for user input
        user_hint = input("Agent is stuck. Give it a hint: ")
        agent.add_context(f"User hint: {user_hint}")
        return True
    
    else:
        # Gave up
        return False

# Usage
escape_count = 0
for iteration in range(100):  # Safety limit
    action = agent_think()
    detector.record_step(iteration, action)
    
    is_stuck, reason = detector.is_looping()
    if is_stuck:
        escape_count += 1
        if escape_count > 4 or not escape_loop(agent, reason, escape_count):
            print("Failed to escape loop, giving up")
            break
        continue  # Re-think after applying the escape hint
    
    execute_action(action)

Post-Mortem Debugging: Replaying Sessions

Save session transcripts to debug later:

import json
from datetime import datetime
from typing import Dict

class SessionRecorder:
    """Record session for later replay"""
    
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.transcript = []
        self.metadata = {}
    
    def record_step(self, step: Dict):
        """Record each step"""
        self.transcript.append({
            "timestamp": datetime.utcnow().isoformat(),
            **step
        })
    
    def save(self, path: str):
        """Save session to file"""
        with open(path, 'w') as f:
            json.dump({
                "session_id": self.session_id,
                "metadata": self.metadata,
                "transcript": self.transcript
            }, f, indent=2)
    
    @classmethod
    def load(cls, path: str) -> "SessionRecorder":
        """Load session from file"""
        with open(path, 'r') as f:
            data = json.load(f)
        
        recorder = cls(data["session_id"])
        recorder.metadata = data["metadata"]
        recorder.transcript = data["transcript"]
        return recorder

# Usage: save during execution
recorder = SessionRecorder(session_id)
for iteration in range(max_iter):
    action = agent_think()
    result = execute(action)
    recorder.record_step({
        "iteration": iteration,
        "action": action,
        "result": result
    })
recorder.save(f"sessions/{session_id}.json")

# Later: analyze what went wrong
loaded = SessionRecorder.load(f"sessions/{session_id}.json")
for i, step in enumerate(loaded.transcript):
    print(f"Step {i}: {step['action']}{step['result']}")

Part 6: Health Checks & Recovery

Health Check Patterns

Run these checks periodically:

import json
from datetime import datetime
from typing import Any, Dict, List

class HealthCheck:
    """Verify system is operational"""
    
    async def check_model_responsive(self, model: str, timeout_sec: int = 5) -> bool:
        """Can we reach the model?"""
        try:
            response = await call_model_with_timeout(
                model,
                "Respond with 'ok'",
                timeout_sec
            )
            return response.strip().lower() == "ok"
        except Exception:
            return False
    
    async def check_memory_accessible(self, memory_path: str) -> bool:
        """Can we read memory?"""
        try:
            with open(memory_path, 'r') as f:
                data = json.load(f)
            return len(data) > 0
        except Exception:
            return False
    
    async def check_tools_available(self, tools: List[str]) -> Dict[str, bool]:
        """Are tools working?"""
        status = {}
        for tool in tools:
            try:
                # Try a safe test call
                result = test_tool(tool)
                status[tool] = result is not None
            except Exception:
                status[tool] = False
        return status
    
    async def check_budget_ok(self, tracker: CostTracker) -> bool:
        """Do we still have budget?"""
        return tracker.check_budget()
    
    async def run_full_check(
        self,
        model: str,
        memory_path: str,
        tools: List[str],
        tracker: CostTracker
    ) -> Dict[str, Any]:
        """Run all health checks"""
        return {
            "model_responsive": await self.check_model_responsive(model),
            "memory_accessible": await self.check_memory_accessible(memory_path),
            "tools_available": await self.check_tools_available(tools),
            "budget_ok": await self.check_budget_ok(tracker),
            "timestamp": datetime.utcnow().isoformat()
        }

# Usage: check periodically (paths and tool names illustrative)
health = HealthCheck()
status = await health.run_full_check(
    model="claude-3-5-sonnet",
    memory_path="memory.json",
    tools=["web_search", "api_call"],
    tracker=tracker
)

if not status["model_responsive"]:
    log.error("Model unreachable, pausing agent")
    await pause_agent()

if not status["budget_ok"]:
    log.error("Budget exceeded, stopping agent")
    await stop_agent()

Automatic Recovery Strategies

import asyncio

class RecoveryManager:
    """Automatically recover from failures"""
    
    async def handle_model_failure(self):
        """Model is unreachable, retry with backoff"""
        backoff_sec = 1
        for attempt in range(5):
            try:
                await asyncio.sleep(backoff_sec)
                await test_model()
                log.info(f"Model recovered after {attempt + 1} attempts")
                return True
            except Exception as e:
                backoff_sec *= 2
                log.warning(f"Attempt {attempt + 1} failed: {e}")
        
        log.error("Model failed to recover, giving up")
        return False
    
    async def handle_tool_failure(self, tool_name: str):
        """Tool failed, try fallback"""
        log.warning(f"{tool_name} failed")
        
        # Fallback tool map
        fallbacks = {
            "web_search": "local_search",
            "image_gen": "placeholder_image",
            "api_call": "cached_response"
        }
        
        if tool_name in fallbacks:
            fallback = fallbacks[tool_name]
            log.info(f"Switching to fallback: {fallback}")
            return fallback
        
        return None
    
    async def handle_context_overflow(self, agent):
        """Context window full, compress memory"""
        log.warning("Context overflow, compressing memory")
        agent.trim_memory(keep_recent=2)
        agent.compress_history()
        return True
    
    async def handle_budget_exceeded(self):
        """Stop immediately and notify"""
        log.critical("Budget exceeded, stopping all agents")
        await stop_all_agents()
        await send_alert("Budget exceeded - check cost tracking")

# Usage
recovery = RecoveryManager()

try:
    await call_model()
except ModelUnavailableError:
    recovered = await recovery.handle_model_failure()
    if not recovered:
        raise

try:
    await call_tool("web_search")
except ToolFailedError:
    fallback = await recovery.handle_tool_failure("web_search")
    if fallback:
        await call_tool(fallback)

Graceful Degradation

When something fails, keep going with reduced capability:

class DegradableAgent:
    """Agent that degrades gracefully"""
    
    async def run_with_degradation(self):
        """Try full mode, degrade if needed"""
        
        # Try 1: Full capability
        try:
            return await self.run_full()
        except ModelError:
            log.warning("Switching to degraded mode (cheaper model)")
            self.use_cheap_model()
        
        # Try 2: Cheaper model
        try:
            return await self.run_cheap()
        except ContextError:
            log.warning("Switching to minimal mode (summary only)")
            self.trim_context(50)
        
        # Try 3: Absolute minimum
        try:
            return await self.run_minimal()
        except Exception as e:
            log.error(f"Even minimal mode failed: {e}")
            return {"error": str(e), "status": "failed"}

# Usage
agent = DegradableAgent()
result = await agent.run_with_degradation()

if result.get("status") == "failed":
    # Still failed, escalate to human
    send_to_human_review(result)

Part 7: Implementation Checklist

Before Going to Production

  • Logging

    • Structured JSON logging configured
    • All 5 event types logged (session, step, tool, error, cost)
    • Log retention policy defined (24h local, 90d archive)
    • Log parsing tested (can query “errors from last hour?”)
  • Metrics

    • Latency p50/p95/p99 tracked
    • Token throughput measured
    • Error rate tracked
    • Cost per session calculated
    • Quality metrics defined (domain-specific)
  • Cost Tracking

    • Token accounting implemented
    • Cost per model calculated
    • Budget alerts set up
    • Cost dashboard visible
    • Hard limit enforced (stops agent if over budget)
  • Observability

    • Session ID propagated through all logs
    • Trace IDs link multi-agent calls
    • Tool call tracing enabled
    • Can replay session from logs
  • Debugging

    • Loop detector configured with iteration limit
    • Escape strategies coded (hint, reset, etc.)
    • Session transcript saves implemented
    • Post-mortem analysis tools ready
  • Health & Recovery

    • Health checks defined (model, memory, tools, budget)
    • Auto-recovery strategies coded (backoff, fallback)
    • Graceful degradation tested
    • Circuit breaker pattern used for external APIs
    • On-call escalation process defined
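The checklist's last group mentions the circuit breaker pattern for external APIs. A minimal sketch, where the thresholds and the single-trial "half-open" behavior are assumptions, not a prescribed implementation:

```python
import time

class CircuitBreaker:
    """Open the circuit after `failure_threshold` consecutive failures;
    allow one trial call after `reset_timeout_sec` (half-open)."""

    def __init__(self, failure_threshold=3, reset_timeout_sec=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_sec = reset_timeout_sec
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout_sec:
                raise RuntimeError("circuit open: skipping external call")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result
```

Wrap each external API client in its own breaker so one failing dependency stops burning tokens and latency without affecting the others.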

Alerting Strategy

Configure these alerts:

| Alert | Condition | Action |
| --- | --- | --- |
| Latency spike | p95 > baseline * 1.5 for 5 min | Page on-call, check for model/tool issues |
| Error rate high | >5% errors in 5 min | Page on-call, check logs |
| Budget warning | >75% of daily budget | Notify team, may throttle |
| Budget critical | >95% of daily budget | Notify team, reduce iterations |
| Budget exceeded | >100% spent | Stop all agents, page on-call |
| Context overflow | Agent hits context limit | Investigate memory usage, may OOM |
| Loop detected | Agent repeats 5+ times | Page on-call, may be an infinite loop |
| Model unreachable | Model API down for >30s | Page on-call, escalate to provider |
| Tool failure rate | Single tool fails >20% | Investigate tool, switch to fallback |
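The latency-spike condition above (p95 over a recent window exceeding 1.5x baseline) can be evaluated with a short helper; the sample format and window handling here are assumptions:

```python
import math

def p95(samples):
    """Nearest-rank 95th percentile of a non-empty list of numbers."""
    ordered = sorted(samples)
    idx = max(0, math.ceil(0.95 * len(ordered)) - 1)
    return ordered[idx]

def latency_spike(recent_ms, baseline_p95_ms, factor=1.5):
    """True if the recent window's p95 breaches baseline * factor."""
    return p95(recent_ms) > baseline_p95_ms * factor
```

Feed it the last N request latencies from your metrics store and a baseline p95 computed over a longer, known-healthy period.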

Dashboard Setup

Create these dashboards on day one:

  1. Production Dashboard (show to stakeholders)

    • Success rate, cost/day, latency p95, errors
  2. Cost Dashboard (daily standup)

    • Spend to date, projected, budget remaining
    • Cost by agent, cost by model
    • Alert thresholds
  3. Operations Dashboard (on-call reference)

    • Error rate, error types, tool status
    • Context overflow incidents, loop detections
    • Recent incidents with links to logs
  4. Agent Health Dashboard (per-agent)

    • Success rate, latency, cost
    • Last error, last successful run
    • Quality metrics (if available)
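Several of these headline numbers can be computed directly from the `session_end` events logged earlier. A sketch, where the field names follow the `session_end` example and everything else is an assumption:

```python
import math

def summarize_sessions(events):
    """Aggregate a list of session_end event dicts into dashboard numbers."""
    n = len(events)
    if n == 0:
        return {"sessions": 0}
    successes = sum(1 for e in events if e.get("final_status") == "success")
    durations = sorted(e["total_duration_seconds"] for e in events)
    p95_idx = max(0, math.ceil(0.95 * n) - 1)  # nearest-rank p95
    return {
        "sessions": n,
        "success_rate": successes / n,
        "total_cost_usd": round(sum(e["total_cost_usd"] for e in events), 4),
        "p95_duration_seconds": durations[p95_idx],
    }
```

A scheduled job that runs this over the last 24 hours of events is enough for a first-pass production dashboard.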

Related Documents

  • 06_harness_architecture.md — Component design (logging fits into the orchestration layer)
  • 08_claw_code_python.md — Reference implementation patterns
  • 05_ai_agents.md — Agent loops (where to inject metrics)

Key Takeaways

  1. Structured logging (JSON) is not optional — unstructured logs are impossible to query at scale
  2. Track p95 latency, not just the average — tail latency matters for SLAs
  3. Cost tracking is critical — an agent can cost $100/day before you notice
  4. Loop detection saves money — stuck agents are expensive, so detect them quickly
  5. Health checks prevent cascading failures — one broken tool shouldn’t crash the entire system
  6. Graceful degradation beats crashing — cheaper/slower is better than down

Bottom line: The best harness is worthless in production without observability. Instrument everything before day one.


Part 8: End-to-End Debugging Scenario

Scenario: Agent Stuck in a Loop

You get an alert at 2:14 PM: “Loop detected — agent-77 repeating action 5+ times.” The agent is a research assistant processing customer tickets. It has been running for 12 minutes and has already spent $4.74 of its $5.00 session budget. Here is how to diagnose and fix it.

Step 1: Check the Logs

Query structured logs for the session. Filter by session ID and sort by timestamp:

# Query logs from your observability platform (example: jq on local JSON logs)
cat /var/log/harness/agent.log \
  | jq 'select(.session_id == "sess-f8a2c1") | {timestamp, event, iteration, tool_name, status, cost_usd}' \
  | tail -30

You see output like this:

{"timestamp":"2026-04-18T14:02:11Z","event":"agent_step","iteration":1,"tool_name":null,"status":"reasoning","cost_usd":0.31}
{"timestamp":"2026-04-18T14:03:05Z","event":"tool_call","iteration":2,"tool_name":"web_search","status":"success","cost_usd":0.28}
{"timestamp":"2026-04-18T14:04:22Z","event":"tool_call","iteration":3,"tool_name":"web_search","status":"success","cost_usd":0.35}
{"timestamp":"2026-04-18T14:05:41Z","event":"tool_call","iteration":4,"tool_name":"web_search","status":"timeout","cost_usd":0.42}
{"timestamp":"2026-04-18T14:06:58Z","event":"tool_call","iteration":5,"tool_name":"web_search","status":"timeout","cost_usd":0.48}
{"timestamp":"2026-04-18T14:08:15Z","event":"tool_call","iteration":6,"tool_name":"web_search","status":"timeout","cost_usd":0.52}
{"timestamp":"2026-04-18T14:09:33Z","event":"tool_call","iteration":7,"tool_name":"web_search","status":"timeout","cost_usd":0.55}
{"timestamp":"2026-04-18T14:10:50Z","event":"tool_call","iteration":8,"tool_name":"web_search","status":"timeout","cost_usd":0.58}
{"timestamp":"2026-04-18T14:12:07Z","event":"tool_call","iteration":9,"tool_name":"web_search","status":"timeout","cost_usd":0.61}
{"timestamp":"2026-04-18T14:13:24Z","event":"tool_call","iteration":10,"tool_name":"web_search","status":"timeout","cost_usd":0.64}

Step 2: Identify the Pattern

The pattern is clear:

  • Iterations 1-3: Normal operation (reasoning, then successful searches)
  • Iteration 4 onward: web_search starts timing out, but the agent keeps retrying the same tool with the same query

Check the reasoning log for what the agent is thinking:

cat /var/log/harness/agent.log \
  | jq 'select(.session_id == "sess-f8a2c1" and .event == "agent_step") | {iteration, reasoning_summary}'
{"iteration":4,"reasoning_summary":"Search timed out, let me try again"}
{"iteration":5,"reasoning_summary":"Search timed out, let me try again"}
{"iteration":6,"reasoning_summary":"Still timing out, trying once more"}
{"iteration":7,"reasoning_summary":"The search should work this time"}
{"iteration":8,"reasoning_summary":"Search timed out, let me try again"}

Root cause identified: The external search API went down at iteration 4. The agent has no fallback strategy and keeps retrying the same failing tool indefinitely.

Step 3: Check Tool Health

Confirm the tool is actually down:

cat /var/log/harness/agent.log \
  | jq 'select(.event == "tool_call" and .tool_name == "web_search") | {timestamp, status, error}' \
  | tail -5
{"timestamp":"2026-04-18T14:10:50Z","status":"timeout","error":"Connection timeout after 30s: search.api.example.com"}
{"timestamp":"2026-04-18T14:12:07Z","status":"timeout","error":"Connection timeout after 30s: search.api.example.com"}
{"timestamp":"2026-04-18T14:13:24Z","status":"timeout","error":"Connection timeout after 30s: search.api.example.com"}

The external API search.api.example.com is unreachable. This is a downstream dependency failure, not a bug in the agent logic.

Step 4: Fix the Root Cause

Two fixes are needed — one immediate, one preventive:

Immediate fix — Kill the stuck session and requeue the task:

# kill_stuck_session.py
import requests

# Stop the stuck agent
requests.post("http://localhost:8000/admin/sessions/sess-f8a2c1/stop", json={
    "reason": "manual_intervention",
    "requeue": True  # Requeue the original task for later processing
})

Preventive fix — Add a fallback strategy to the agent loop:

class ResilientToolCaller:
    """Call tools with retry limits and fallbacks."""

    def __init__(self, max_retries: int = 2, fallback_tools: dict | None = None):
        self.max_retries = max_retries
        self.fallback_tools = fallback_tools or {}
        self.consecutive_failures: dict[str, int] = {}

    def call(self, tool_name: str, **params) -> str:
        """Call a tool with retry limit and automatic fallback."""
        failures = self.consecutive_failures.get(tool_name, 0)

        if failures >= self.max_retries:
            fallback = self.fallback_tools.get(tool_name)
            if fallback:
                logger.warning(
                    f"{tool_name} failed {failures} times, switching to {fallback}"
                )
                self.consecutive_failures[tool_name] = 0
                return self.call(fallback, **params)
            else:
                raise ToolExhaustedError(
                    f"{tool_name} failed {failures} times with no fallback"
                )

        try:
            result = execute_tool(tool_name, **params)
            self.consecutive_failures[tool_name] = 0
            return result
        except Exception as e:
            self.consecutive_failures[tool_name] = failures + 1
            logger.warning(f"{tool_name} attempt {failures + 1} failed: {e}")
            raise

# Wire it into the agent
caller = ResilientToolCaller(
    max_retries=2,
    fallback_tools={"web_search": "local_search", "api_call": "cached_response"}
)

Step 5: Verify the Fix

After deploying the fix, run a targeted test:

# test_tool_fallback.py
from unittest.mock import patch

def test_agent_falls_back_on_tool_failure():
    """Verify agent switches to fallback after 2 consecutive failures."""
    caller = ResilientToolCaller(
        max_retries=2,
        fallback_tools={"web_search": "local_search"}
    )

    # Assumes execute_tool dispatches through harness.tools, so patching
    # harness.tools.web_search makes every web_search call raise.
    with patch("harness.tools.web_search", side_effect=TimeoutError("down")):
        # First two calls raise, third should use fallback
        try:
            caller.call("web_search", query="test")
        except TimeoutError:
            pass
        try:
            caller.call("web_search", query="test")
        except TimeoutError:
            pass

        # Third call should route to local_search
        with patch("harness.tools.local_search", return_value="fallback result"):
            result = caller.call("web_search", query="test")
            assert result == "fallback result"

Then monitor production for the next hour. Check that the loop detector alert does not re-fire:

cat /var/log/harness/agent.log \
  | jq 'select(.event == "error" and .error_type == "loop_detected")' \
  | wc -l
# Expected: 0

Debugging Checklist (Quick Reference)

  1. Check logs — Filter by session ID, look at the last 20-30 events
  2. Find the pattern — Is it the same tool? Same action hash? Same error?
  3. Check tool health — Is the downstream dependency responding?
  4. Check cost — How much has the stuck agent spent? Kill it if approaching budget
  5. Identify root cause — Missing fallback? No retry limit? Bad prompt causing circular reasoning?
  6. Fix and verify — Deploy fix, run targeted test, monitor for recurrence
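Step 2 of this checklist (“Same action hash?”) can be automated with a simple loop detector that hashes each (tool, params) pair and counts consecutive repeats. A minimal sketch; the threshold of 5 matches the loop alert, the rest is an assumption:

```python
import hashlib
import json

class LoopDetector:
    """Flag an agent that repeats the identical action `threshold` times in a row."""

    def __init__(self, threshold=5):
        self.threshold = threshold
        self.last_hash = None
        self.repeats = 0

    def record(self, tool_name, params):
        """Record one action; return True when the loop threshold is hit."""
        h = hashlib.sha256(
            json.dumps([tool_name, params], sort_keys=True).encode()
        ).hexdigest()
        if h == self.last_hash:
            self.repeats += 1
        else:
            self.last_hash = h
            self.repeats = 1
        return self.repeats >= self.threshold
```

Call `record()` on every tool invocation in the agent loop and trigger an escape strategy (hint, reset, or stop) when it returns True.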

For more failure scenarios and their solutions, see Doc 18 (Troubleshooting & FAQ).


See Also

  • Doc 06 (Harness Architecture) — Understand the components you’re monitoring; observability extends each component
  • Doc 05 (AI Agents) — Understand agentic loops to design better monitoring for agent behavior
  • Doc 11 (Testing & QA) — Establish quality baselines before deploying; monitoring detects regressions
  • Doc 18 (Troubleshooting & FAQ) — Use observability signals to diagnose and fix stuck agents

Changelog

  • April 2026: Created document
    • Structured logging patterns
    • Metrics and dashboard templates
    • Cost tracking implementation
    • Loop detection and debugging
    • Health checks and recovery