Azure AI Agent Service Part 4: Production Patterns—State Management, Sessions, and Observability

  • MyrinNew
    Senior Member
    • Feb 2024
    • 5168

    #1

    Production Patterns: State Management, Sessions, and Observability

    Part 4 of 5 in the Azure AI Agent Service series





    You've built your agent, added tools, and everything works beautifully in development. Then you deploy to production and reality hits: conversations get lost, errors cascade silently, costs spiral, and debugging becomes archaeology.


    Production-ready agents need more than clever prompts and tool integrations. They need robust state management, proper observability, and defensive patterns that gracefully handle the chaos of real-world usage.


    In this article, we'll cover the infrastructure patterns that separate weekend projects from production systems.


    Understanding Threads and Sessions

    Azure AI Agent Service uses threads as the fundamental unit of conversation state. A thread maintains the full message history, context, and any attached files for a conversation.






    using Azure;                                     // RequestFailedException
    using Microsoft.Extensions.Caching.Distributed;  // IDistributedCache
    using Microsoft.Extensions.Logging;              // ILogger<T>
    // AgentsClient comes from the Azure AI Agent Service SDK; check the namespace for your SDK version.

    public class ConversationManager
    {
        private readonly AgentsClient _client;
        private readonly IDistributedCache _cache;
        private readonly ILogger<ConversationManager> _logger;

        public ConversationManager(
            AgentsClient client,
            IDistributedCache cache,
            ILogger<ConversationManager> logger)
        {
            _client = client;
            _cache = cache;
            _logger = logger;
        }

        public async Task<string> GetOrCreateThreadAsync(
            string userId,
            string conversationId,
            CancellationToken ct = default)
        {
            var cacheKey = $"thread:{userId}:{conversationId}";

            // Check cache first
            var threadId = await _cache.GetStringAsync(cacheKey, ct);
            if (!string.IsNullOrEmpty(threadId))
            {
                // Verify thread still exists
                try
                {
                    await _client.GetThreadAsync(threadId, ct);
                    return threadId;
                }
                catch (RequestFailedException ex) when (ex.Status == 404)
                {
                    _logger.LogWarning("Cached thread {ThreadId} no longer exists", threadId);
                }
            }

            // Create new thread
            var thread = await _client.CreateThreadAsync(ct);
            threadId = thread.Value.Id;

            // Cache with expiration matching your retention policy
            await _cache.SetStringAsync(
                cacheKey,
                threadId,
                new DistributedCacheEntryOptions
                {
                    AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
                },
                ct);

            _logger.LogInformation(
                "Created new thread {ThreadId} for user {UserId}",
                threadId, userId);

            return threadId;
        }
    }







    Thread Lifecycle Patterns

    Threads aren't free—they consume storage and count toward your quotas. Design your lifecycle strategy early:






    public class ThreadLifecycleService : BackgroundService
    {
        private readonly AgentsClient _client;
        private readonly IThreadRepository _repository;
        private readonly ILogger<ThreadLifecycleService> _logger;
        private readonly TimeSpan _maxThreadAge = TimeSpan.FromDays(30);
        private readonly TimeSpan _inactivityThreshold = TimeSpan.FromDays(7);

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await CleanupStaleThreadsAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Thread cleanup failed");
                }

                await Task.Delay(TimeSpan.FromHours(6), stoppingToken);
            }
        }

        private async Task CleanupStaleThreadsAsync(CancellationToken ct)
        {
            var staleThreads = await _repository.GetThreadsOlderThanAsync(
                _inactivityThreshold, ct);

            foreach (var thread in staleThreads)
            {
                try
                {
                    // Archive conversation before deletion if needed
                    if (thread.MessageCount > 5)
                    {
                        await ArchiveThreadAsync(thread, ct);
                    }

                    await _client.DeleteThreadAsync(thread.ThreadId, ct);
                    await _repository.MarkDeletedAsync(thread.ThreadId, ct);

                    _logger.LogInformation(
                        "Cleaned up thread {ThreadId} (inactive {Days} days)",
                        thread.ThreadId,
                        (DateTime.UtcNow - thread.LastActivity).TotalDays);
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex,
                        "Failed to cleanup thread {ThreadId}", thread.ThreadId);
                }
            }
        }
    }
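
    The service above declares _maxThreadAge but only queries by inactivity. As a minimal sketch of how the two thresholds might combine into one decision, the snippet below uses a hypothetical ThreadInfo record and RetentionAction enum (neither comes from the SDK or from the repository interface above). The "more than 5 messages" archive rule mirrors the check in CleanupStaleThreadsAsync.

```csharp
using System;

public enum RetentionAction { Keep, Delete, ArchiveThenDelete }

// Hypothetical shape of a row returned by the thread repository.
public sealed record ThreadInfo(
    string ThreadId,
    DateTimeOffset CreatedAt,
    DateTimeOffset LastActivity,
    int MessageCount);

public static class ThreadRetentionPolicy
{
    // A thread is removed when it is older than maxThreadAge OR has been
    // idle longer than inactivityThreshold. Threads with more than
    // archiveAboveMessages messages are archived before deletion.
    public static RetentionAction Decide(
        ThreadInfo thread,
        DateTimeOffset now,
        TimeSpan maxThreadAge,
        TimeSpan inactivityThreshold,
        int archiveAboveMessages = 5)
    {
        bool tooOld = now - thread.CreatedAt > maxThreadAge;
        bool inactive = now - thread.LastActivity > inactivityThreshold;

        if (!tooOld && !inactive)
        {
            return RetentionAction.Keep;
        }

        return thread.MessageCount > archiveAboveMessages
            ? RetentionAction.ArchiveThenDelete
            : RetentionAction.Delete;
    }
}
```

    Keeping the decision in a pure function like this makes it easy to unit test the retention rules without touching the agent service or the repository.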







    State Persistence Patterns

    Agents often need to maintain state beyond the conversation—user preferences, accumulated context, or workflow progress. Here's a pattern that layers application state on top of thread state:






    public class AgentSession
    {
        public string ThreadId { get; set; } = string.Empty;
        public string UserId { get; set; } = string.Empty;
        public Dictionary<string, object> Metadata { get; set; } = new();
        public List<string> ActiveWorkflows { get; set; } = new();
        public DateTimeOffset CreatedAt { get; set; }
        public DateTimeOffset LastActivity { get; set; }
        public int TotalTokensUsed { get; set; }
        public decimal EstimatedCost { get; set; }
    }

    public class SessionStateManager
    {
        private readonly IDistributedCache _cache;
        private readonly ISessionRepository _repository;
        private readonly JsonSerializerOptions _jsonOptions;

        public async Task<AgentSession> GetSessionAsync(
            string sessionId,
            CancellationToken ct = default)
        {
            // Try cache first
            var cached = await _cache.GetStringAsync($"session:{sessionId}", ct);
            if (!string.IsNullOrEmpty(cached))
            {
                return JsonSerializer.Deserialize<AgentSession>(cached, _jsonOptions)!;
            }

            // Fall back to persistent storage
            var session = await _repository.GetAsync(sessionId, ct);
            if (session != null)
            {
                await CacheSessionAsync(session, ct);
            }

            return session ?? throw new SessionNotFoundException(sessionId);
        }

        public async Task UpdateSessionAsync(
            AgentSession session,
            CancellationToken ct = default)
        {
            session.LastActivity = DateTimeOffset.UtcNow;

            // Update cache immediately for fast reads
            await CacheSessionAsync(session, ct);

            // Persist without awaiting (eventual consistency is usually fine).
            // Exceptions from this task go unobserved unless UpsertAsync logs them itself.
            _ = _repository.UpsertAsync(session, ct);
        }

        public async Task<T?> GetMetadataAsync<T>(
            string sessionId,
            string key,
            CancellationToken ct = default)
        {
            var session = await GetSessionAsync(sessionId, ct);

            if (session.Metadata.TryGetValue(key, out var value))
            {
                // Values read back from JSON arrive as JsonElement, not as T
                if (value is JsonElement element)
                {
                    return element.Deserialize<T>(_jsonOptions);
                }
                return (T)value;
            }

            return default;
        }

        private async Task CacheSessionAsync(AgentSession session, CancellationToken ct)
        {
            var json = JsonSerializer.Serialize(session, _jsonOptions);
            // Caution: GetSessionAsync reads "session:{sessionId}", so this key only
            // matches when the session ID and the thread ID are the same value.
            await _cache.SetStringAsync(
                $"session:{session.ThreadId}",
                json,
                new DistributedCacheEntryOptions
                {
                    SlidingExpiration = TimeSpan.FromHours(2)
                },
                ct);
        }
    }
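
    The JsonElement branch in GetMetadataAsync exists because System.Text.Json deserializes Dictionary<string, object> values as JsonElement rather than as their original types. The sketch below reproduces that round trip in isolation; MetadataRoundTrip and its method names are made up for illustration, and default serializer options are assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class MetadataRoundTrip
{
    // Serializes and deserializes the dictionary, as a cache write and read would.
    public static Dictionary<string, object> RoundTrip(Dictionary<string, object> metadata)
    {
        var json = JsonSerializer.Serialize(metadata);
        return JsonSerializer.Deserialize<Dictionary<string, object>>(json)!;
    }

    // Reads a typed value whether the dictionary holds live objects
    // (never serialized) or JsonElement values (read back from JSON).
    public static T? Read<T>(Dictionary<string, object> metadata, string key)
    {
        if (!metadata.TryGetValue(key, out var value))
        {
            return default;
        }

        if (value is JsonElement element)
        {
            return element.Deserialize<T>();
        }

        return (T)value;
    }
}
```

    After RoundTrip, a value stored as the int 3 comes back as a JsonElement, so a direct (int) cast would throw InvalidCastException. Read<int> handles both cases.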







    Handling Session Recovery

    When things go wrong (and they will), you need graceful recovery:






    public class ResilientAgentService
    {
        private readonly AgentsClient _client;
        private readonly SessionStateManager _sessions;
        private readonly ILogger<ResilientAgentService> _logger;

        public async Task<AgentResponse> ProcessMessageAsync(
            string sessionId,
            string userMessage,
            CancellationToken ct = default)
        {
            AgentSession session;

            try
            {
                session = await _sessions.GetSessionAsync(sessionId, ct);
            }
            catch (SessionNotFoundException)
            {
                _logger.LogWarning("Session {SessionId} not found, creating new", sessionId);
                session = await CreateNewSessionAsync(sessionId, ct);
            }

            // Verify thread is still valid
            try
            {
                await _client.GetThreadAsync(session.ThreadId, ct);
            }
            catch (RequestFailedException ex) when (ex.Status == 404)
            {
                _logger.LogWarning(
                    "Thread {ThreadId} for session {SessionId} not found, recreating",
                    session.ThreadId, sessionId);

                session = await RecoverSessionAsync(session, ct);
            }

            return await ExecuteWithRetryAsync(session, userMessage, ct);
        }

        private async Task<AgentSession> RecoverSessionAsync(
            AgentSession oldSession,
            CancellationToken ct)
        {
            // Create new thread
            var thread = await _client.CreateThreadAsync(ct);

            // Preserve session metadata but reset thread
            var newSession = new AgentSession
            {
                ThreadId = thread.Value.Id,
                UserId = oldSession.UserId,
                Metadata = oldSession.Metadata,
                CreatedAt = DateTimeOffset.UtcNow,
                LastActivity = DateTimeOffset.UtcNow
            };

            // Optionally inject context about the recovery
            await _client.CreateMessageAsync(
                newSession.ThreadId,
                MessageRole.User,
                "System note: This is a recovered session. Previous context may be limited.",
                cancellationToken: ct);

            await _sessions.UpdateSessionAsync(newSession, ct);
            return newSession;
        }
    }







    Observability with Azure Monitor

    You can't fix what you can't see. Production agents need comprehensive observability:






    using System.Diagnostics;                         // Stopwatch, Activity
    using System.Diagnostics.Metrics;                 // Meter, Counter<T>, Histogram<T>, IMeterFactory
    using Microsoft.ApplicationInsights;              // TelemetryClient
    using Microsoft.ApplicationInsights.DataContracts; // RequestTelemetry

    public class ObservableAgentService
    {
        private readonly AgentsClient _client;
        private readonly ILogger<ObservableAgentService> _logger;
        private readonly TelemetryClient _telemetry;
        private readonly Meter _meter;

        private readonly Counter<long> _messageCounter;
        private readonly Histogram<double> _responseLatency;
        private readonly Counter<long> _tokenCounter;
        private readonly Counter<long> _errorCounter;

        public ObservableAgentService(
            AgentsClient client,
            ILogger<ObservableAgentService> logger,
            TelemetryClient telemetry,
            IMeterFactory meterFactory)
        {
            _client = client;
            _logger = logger;
            _telemetry = telemetry;
            _meter = meterFactory.Create("AgentService");

            _messageCounter = _meter.CreateCounter<long>(
                "agent.messages.total",
                description: "Total messages processed");

            _responseLatency = _meter.CreateHistogram<double>(
                "agent.response.duration",
                unit: "ms",
                description: "Agent response time in milliseconds");

            _tokenCounter = _meter.CreateCounter<long>(
                "agent.tokens.total",
                description: "Total tokens consumed");

            _errorCounter = _meter.CreateCounter<long>(
                "agent.errors.total",
                description: "Total errors encountered");
        }

        public async Task<AgentResponse> ProcessWithTelemetryAsync(
            string agentId,
            string threadId,
            string message,
            CancellationToken ct = default)
        {
            var stopwatch = Stopwatch.StartNew();
            var operationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();

            using var operation = _telemetry.StartOperation<RequestTelemetry>(
                "AgentInteraction");
            operation.Telemetry.Properties["AgentId"] = agentId;
            operation.Telemetry.Properties["ThreadId"] = threadId;

            _logger.LogInformation(
                "Processing message for agent {AgentId} on thread {ThreadId}. " +
                "OperationId: {OperationId}",
                agentId, threadId, operationId);

            try
            {
                // Create message
                await _client.CreateMessageAsync(
                    threadId,
                    MessageRole.User,
                    message,
                    cancellationToken: ct);

                // Run agent
                var run = await _client.CreateRunAsync(threadId, agentId, ct);

                // Poll for completion with telemetry
                var result = await PollRunWithTelemetryAsync(
                    threadId, run.Value.Id, ct);

                stopwatch.Stop();

                // Record metrics
                _messageCounter.Add(1,
                    new KeyValuePair<string, object?>("agent_id", agentId),
                    new KeyValuePair<string, object?>("status", "success"));

                _responseLatency.Record(stopwatch.ElapsedMilliseconds,
                    new KeyValuePair<string, object?>("agent_id", agentId));

                if (result.Usage != null)
                {
                    _tokenCounter.Add(result.Usage.TotalTokens,
                        new KeyValuePair<string, object?>("agent_id", agentId),
                        new KeyValuePair<string, object?>("type", "total"));
                }

                _logger.LogInformation(
                    "Completed agent interaction in {ElapsedMs}ms. " +
                    "Tokens: {Tokens}. OperationId: {OperationId}",
                    stopwatch.ElapsedMilliseconds,
                    result.Usage?.TotalTokens ?? 0,
                    operationId);

                return result;
            }
            catch (Exception ex)
            {
                _errorCounter.Add(1,
                    new KeyValuePair<string, object?>("agent_id", agentId),
                    new KeyValuePair<string, object?>("error_type", ex.GetType().Name));

                _telemetry.TrackException(ex, new Dictionary<string, string>
                {
                    ["AgentId"] = agentId,
                    ["ThreadId"] = threadId,
                    ["OperationId"] = operationId
                });

                _logger.LogError(ex,
                    "Agent interaction failed. AgentId: {AgentId}, " +
                    "ThreadId: {ThreadId}, OperationId: {OperationId}",
                    agentId, threadId, operationId);

                throw;
            }
        }
    }
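
    Before wiring an exporter, it can help to confirm locally that the counters actually emit. The sketch below uses MeterListener from System.Diagnostics.Metrics to sum the measurements recorded on one meter. MetricsSmokeTest and the meter name passed to it are illustrative, and only long-valued instruments are captured here.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class MetricsSmokeTest
{
    // Runs `record` against a fresh Meter and returns the sum of every
    // long measurement published on that meter while it ran.
    public static long Capture(string meterName, Action<Meter> record)
    {
        long total = 0;

        using var meter = new Meter(meterName);
        using var listener = new MeterListener();

        // Subscribe only to instruments that belong to our meter.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName)
            {
                l.EnableMeasurementEvents(instrument);
            }
        };

        // Invoked synchronously each time a Counter<long> records a value.
        listener.SetMeasurementEventCallback<long>(
            (instrument, measurement, tags, state) => total += measurement);

        listener.Start();
        record(meter);
        return total;
    }
}
```

    For example, creating a Counter<long> named "agent.messages.total" inside the callback and calling Add(1) and then Add(2) should make Capture return 3. The same listener approach works in unit tests for the service above if you can reach its Meter.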







    Custom Metrics Dashboard

    Set up Application Insights queries for your agent metrics:






    // Agent Response Time Percentiles
    customMetrics
    | where name == "agent.response.duration"
    | summarize
        p50 = percentile(value, 50),
        p95 = percentile(value, 95),
        p99 = percentile(value, 99)
        by bin(timestamp, 1h), tostring(customDimensions.agent_id)
    | render timechart

    // Token Consumption by Agent
    customMetrics
    | where name == "agent.tokens.total"
    | summarize TotalTokens = sum(value)
        by bin(timestamp, 1d), tostring(customDimensions.agent_id)
    | render columnchart

    // Error Rate
    customMetrics
    | where name == "agent.errors.total"
    | summarize Errors = sum(value)
        by bin(timestamp, 1h), tostring(customDimensions.error_type)
    | render timechart







    Error Handling and Retry Patterns

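    Azure AI Agent Service calls can fail transiently: rate limiting (429), server-side errors (500, 502, 503, 504), and timeouts. Here's a retry strategy using Polly that combines exponential backoff, a circuit breaker, and an overall timeout: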






    using Azure;          // RequestFailedException
    using Polly;          // Policy, IAsyncPolicy<T>
    using Polly.Timeout;  // TimeoutStrategy

    public static class AgentResiliencePolicy
    {
        public static IAsyncPolicy<T> CreatePolicy<T>(ILogger logger)
        {
            // Retry with exponential backoff for transient failures
            var retryPolicy = Policy<T>
                .Handle<RequestFailedException>(ex => IsTransient(ex))
                .Or<HttpRequestException>()
                .Or<TaskCanceledException>()
                .WaitAndRetryAsync(
                    retryCount: 3,
                    sleepDurationProvider: attempt =>
                        TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                    onRetry: (outcome, timespan, attempt, context) =>
                    {
                        logger.LogWarning(
                            "Retry {Attempt} after {Delay}ms due to {Error}",
                            attempt, timespan.TotalMilliseconds,
                            outcome.Exception?.Message);
                    });

            // Circuit breaker for sustained failures
            var circuitBreaker = Policy<T>
                .Handle<RequestFailedException>()
                .CircuitBreakerAsync(
                    handledEventsAllowedBeforeBreaking: 5,
                    durationOfBreak: TimeSpan.FromMinutes(1),
                    onBreak: (outcome, breakDelay) =>
                    {
                        logger.LogError(
                            "Circuit breaker opened for {Duration}ms",
                            breakDelay.TotalMilliseconds);
                    },
                    onReset: () => logger.LogInformation("Circuit breaker reset"));

            // Timeout for hanging requests
            var timeout = Policy.TimeoutAsync<T>(
                TimeSpan.FromMinutes(5),
                TimeoutStrategy.Pessimistic);

            // Outermost first: timeout wraps retry, retry wraps the circuit breaker
            return Policy.WrapAsync(timeout, retryPolicy, circuitBreaker);
        }

        private static bool IsTransient(RequestFailedException ex)
        {
            return ex.Status switch
            {
                429 => true, // Rate limited
                500 => true, // Internal server error
                502 => true, // Bad gateway
                503 => true, // Service unavailable
                504 => true, // Gateway timeout
                _ => false
            };
        }
    }

    // Usage in your service
    public class ResilientAgentClient
    {
        private readonly AgentsClient _client;
        private readonly IAsyncPolicy<ThreadRun> _runPolicy;

        public async Task<ThreadRun> CreateRunWithResilienceAsync(
            string threadId,
            string agentId,
            CancellationToken ct = default)
        {
            return await _runPolicy.ExecuteAsync(async () =>
            {
                var response = await _client.CreateRunAsync(threadId, agentId, ct);
                return response.Value;
            });
        }
    }
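
    To see what the backoff formula above actually produces, the sketch below computes the same schedule outside Polly. Polly passes a 1-based attempt number to sleepDurationProvider, so three retries wait 2, 4, and 8 seconds. The WithJitter helper is an addition that is not in the policy above: it is one common way to keep many clients from retrying in lockstep, shown here as an assumption rather than a recommendation for specific values.

```csharp
using System;
using System.Linq;

public static class BackoffSchedule
{
    // Same formula as the retry policy: 2^attempt seconds, attempt starting at 1.
    public static TimeSpan[] Exponential(int retryCount) =>
        Enumerable.Range(1, retryCount)
            .Select(attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
            .ToArray();

    // Assumption (not in the original policy): add a random extra delay in
    // the range [0, maxJitter) so concurrent callers spread out their retries.
    public static TimeSpan WithJitter(TimeSpan baseDelay, TimeSpan maxJitter, Random random) =>
        baseDelay + TimeSpan.FromMilliseconds(
            random.NextDouble() * maxJitter.TotalMilliseconds);
}

// BackoffSchedule.Exponential(3) yields delays of 2 s, 4 s, and 8 s,
// so a call that fails every time spends 14 s waiting before giving up.
```

    In the policy itself, jitter would go inside sleepDurationProvider, for example by wrapping the existing TimeSpan.FromSeconds(...) expression with WithJitter.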







    Handling Rate Limits Gracefully





    public class RateLimitHandler
    {
        private readonly SemaphoreSlim _semaphore;
        private readonly ILogger<RateLimitHandler> _logger;
        private DateTime _retryAfter = DateTime.MinValue;

        public RateLimitHandler(int maxConcurrency, ILogger<RateLimitHandler> logger)
        {
            _semaphore = new SemaphoreSlim(maxConcurrency);
            _logger = logger;
        }

        public async Task<T> ExecuteAsync<T>(
            Func<Task<T>> operation,
            CancellationToken ct = default)
        {
            // Wait if we're in a rate limit cooldown
            var waitTime = _retryAfter - DateTime.UtcNow;
            if (waitTime > TimeSpan.Zero)
            {
                _logger.LogInformation(
                    "Rate limit active, waiting {WaitMs}ms",
                    waitTime.TotalMilliseconds);
                await Task.Delay(waitTime, ct);
            }

            await _semaphore.WaitAsync(ct);
            try
            {
                return await operation();
            }
            catch (RequestFailedException ex) when (ex.Status == 429)
            {
                // Parse retry-after header
                var retryAfterSeconds = ParseRetryAfter(ex);
                _retryAfter = DateTime.UtcNow.AddSeconds(retryAfterSeconds);

                _logger.LogWarning(
                    "Rate limited. Retry after {Seconds} seconds",
                    retryAfterSeconds);

                throw;
            }
            finally
            {
                _semaphore.Release();
            }
        }

        private int ParseRetryAfter(RequestFailedException ex)
        {
            // Default to 60 seconds if header not present
            // In production, parse the actual header from the response
            return 60;
        }
    }
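
    ParseRetryAfter above is a stub that always returns 60. As a sketch of the parsing step it leaves out, the helper below handles the two forms a Retry-After header can take: a delay in whole seconds or an HTTP-date. RetryAfterParser is a hypothetical name, and how you read the raw header value from the failed response depends on your Azure.Core version, so treat that part as something to verify against your SDK.

```csharp
using System;
using System.Globalization;

public static class RetryAfterParser
{
    // Returns how long to wait, or `fallback` when the header is missing
    // or cannot be parsed.
    public static TimeSpan Parse(string? headerValue, DateTimeOffset now, TimeSpan fallback)
    {
        if (string.IsNullOrWhiteSpace(headerValue))
        {
            return fallback;
        }

        var value = headerValue.Trim();

        // Form 1: delay in whole seconds, e.g. "120".
        if (int.TryParse(value, NumberStyles.None, CultureInfo.InvariantCulture, out var seconds))
        {
            return TimeSpan.FromSeconds(seconds);
        }

        // Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
        if (DateTimeOffset.TryParseExact(
                value, "r", CultureInfo.InvariantCulture,
                DateTimeStyles.AssumeUniversal, out var retryAt))
        {
            var delay = retryAt - now;
            return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
        }

        return fallback;
    }
}
```

    Passing `now` in as a parameter, rather than reading the clock inside the method, keeps the date branch deterministic under test.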







    Cost Management and Token Tracking

    Agents can get expensive fast. Build cost awareness into your system from day one:






    public class CostTracker
    {
        private readonly IMetricsService _metrics;
        private readonly ICostRepository _repository;
        private readonly ILogger<CostTracker> _logger;

        // Pricing as of 2024 - verify current rates!
        private static readonly Dictionary<string, (decimal Input, decimal Output)> Pricing = new()
        {
            ["gpt-4o"] = (0.005m / 1000, 0.015m / 1000),
            ["gpt-4o-mini"] = (0.00015m / 1000, 0.0006m / 1000),
            ["gpt-4-turbo"] = (0.01m / 1000, 0.03m / 1000)
        };

        public async Task<UsageReport> TrackUsageAsync(
            string userId,
            string agentId,
            string model,
            int inputTokens,
            int outputTokens,
            CancellationToken ct = default)
        {
            // Unknown models fall back to the most expensive rate in the table
            var (inputRate, outputRate) = Pricing.GetValueOrDefault(
                model, (0.01m / 1000, 0.03m / 1000));

            var cost = (inputTokens * inputRate) + (outputTokens * outputRate);

            var usage = new UsageRecord
            {
                UserId = userId,
                AgentId = agentId,
                Model = model,
                InputTokens = inputTokens,
                OutputTokens = outputTokens,
                EstimatedCost = cost,
                Timestamp = DateTimeOffset.UtcNow
            };

            await _repository.RecordUsageAsync(usage, ct);

            // Check if user is approaching limits
            var dailyUsage = await _repository.GetDailyUsageAsync(userId, ct);
            var monthlyUsage = await _repository.GetMonthlyUsageAsync(userId, ct);

            if (dailyUsage.TotalCost > 10m) // $10 daily threshold
            {
                _logger.LogWarning(
                    "User {UserId} exceeded daily cost threshold: ${Cost:F2}",
                    userId, dailyUsage.TotalCost);
            }

            return new UsageReport
            {
                CurrentRequest = usage,
                DailyTotal = dailyUsage,
                MonthlyTotal = monthlyUsage
            };
        }
    }

    public class BudgetEnforcementMiddleware
    {
        private readonly ICostRepository _repository;
        private readonly ILogger<BudgetEnforcementMiddleware> _logger;

        public async Task<bool> CanProcessRequestAsync(
            string userId,
            CancellationToken ct = default)
        {
            var budget = await _repository.GetUserBudgetAsync(userId, ct);
            var usage = await _repository.GetMonthlyUsageAsync(userId, ct);

            if (usage.TotalCost >= budget.MonthlyLimit)
            {
                _logger.LogWarning(
                    "User {UserId} blocked - monthly budget exhausted. " +
                    "Used: ${Used:F2}, Limit: ${Limit:F2}",
                    userId, usage.TotalCost, budget.MonthlyLimit);
                return false;
            }

            // Warn at 80% threshold
            if (usage.TotalCost >= budget.MonthlyLimit * 0.8m)
            {
                _logger.LogInformation(
                    "User {UserId} approaching budget limit: " +
                    "${Used:F2} of ${Limit:F2}",
                    userId, usage.TotalCost, budget.MonthlyLimit);
            }

            return true;
        }
    }
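
    As a worked example of the arithmetic in TrackUsageAsync and the thresholds in CanProcessRequestAsync, the sketch below isolates both as pure functions. CostMath and BudgetState are hypothetical names. The rates passed in are the per-1K figures from the 2024 table above and should be checked against current pricing.

```csharp
using System;

public enum BudgetState { Ok, Warning, Exhausted }

public static class CostMath
{
    // Rates are expressed per 1,000 tokens, as in the pricing table.
    public static decimal EstimateCost(
        int inputTokens,
        int outputTokens,
        decimal inputRatePer1K,
        decimal outputRatePer1K) =>
        inputTokens * (inputRatePer1K / 1000m) +
        outputTokens * (outputRatePer1K / 1000m);

    // Mirrors the budget check: blocked at 100% of the limit,
    // warned at warnFraction (80% by default) of the limit.
    public static BudgetState Classify(
        decimal used,
        decimal monthlyLimit,
        decimal warnFraction = 0.8m)
    {
        if (used >= monthlyLimit) return BudgetState.Exhausted;
        if (used >= monthlyLimit * warnFraction) return BudgetState.Warning;
        return BudgetState.Ok;
    }
}

// Example: 1,200 input and 300 output tokens at 0.005 / 0.015 per 1K:
//   1200 * 0.000005 = 0.0060
//    300 * 0.000015 = 0.0045
//   total           = 0.0105 (about one cent per request)
```

    At roughly one cent per request, a $50 monthly limit would reach the 80% warning after about 3,800 such requests, which gives a feel for where to set per-user budgets.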







    Optimizing Token Usage





    public class TokenOptimizer
    {
        private readonly int _maxContextTokens;
        private readonly ITokenCounter _tokenCounter;

        public TokenOptimizer(int maxContextTokens, ITokenCounter tokenCounter)
        {
            _maxContextTokens = maxContextTokens;
            _tokenCounter = tokenCounter;
        }

        public async Task<string> OptimizeContextAsync(
            IList<Message> messages,
            string systemPrompt,
            CancellationToken ct = default)
        {
            var systemTokens = await _tokenCounter.CountAsync(systemPrompt, ct);
            var availableTokens = _maxContextTokens - systemTokens - 1000; // Reserve for response

            var optimizedMessages = new List<Message>();
            var currentTokens = 0;

            // Always keep the most recent messages
            foreach (var message in messages.Reverse())
            {
                var messageTokens = await _tokenCounter.CountAsync(message.Content, ct);

                if (currentTokens + messageTokens > availableTokens)
                {
                    // Summarize older messages instead of dropping them
                    var summary = await SummarizeOlderMessagesAsync(
                        messages.Take(messages.Count - optimizedMessages.Count).ToList(),
                        ct);

                    optimizedMessages.Insert(0, new Message
                    {
                        Role = "system",
                        Content = $"[Earlier conversation summary: {summary}]"
                    });
                    break;
                }

                optimizedMessages.Insert(0, message);
                currentTokens += messageTokens;
            }

            return JsonSerializer.Serialize(optimizedMessages);
        }
    }
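
    The core of the optimizer is the "walk backwards until the budget runs out" loop. Here is a minimal sketch of just that step, without the summarization call. It assumes a crude estimate of about four characters per token in place of a real tokenizer, so the numbers are only illustrative; ContextTrimmer and its methods are made-up names.

```csharp
using System;
using System.Collections.Generic;

public static class ContextTrimmer
{
    // Crude stand-in for a real tokenizer: roughly 4 characters per token,
    // rounded up. Swap in your model's tokenizer for real budgets.
    public static int EstimateTokens(string text) => (text.Length + 3) / 4;

    // Returns the longest run of most-recent messages whose estimated
    // token total fits within `budget`, preserving chronological order.
    public static List<string> KeepMostRecent(IReadOnlyList<string> messages, int budget)
    {
        var kept = new List<string>();
        var used = 0;

        // Walk from newest to oldest, stopping at the first message that
        // would push the total over budget.
        for (var i = messages.Count - 1; i >= 0; i--)
        {
            var cost = EstimateTokens(messages[i]);
            if (used + cost > budget)
            {
                break;
            }

            kept.Add(messages[i]);
            used += cost;
        }

        kept.Reverse(); // restore oldest-to-newest order
        return kept;
    }
}
```

    Everything before the first kept message is what the optimizer above would hand to its summarization step instead of discarding.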







    Putting It All Together

    Here's how the pieces compose into a single service (constructor and supporting types such as AgentRequest and AgentResponse omitted):






    public class ProductionAgentService
    {
        private readonly AgentsClient _client;
        private readonly SessionStateManager _sessions;
        private readonly CostTracker _costTracker;
        private readonly ObservableAgentService _telemetry;
        private readonly RateLimitHandler _rateLimiter;
        private readonly BudgetEnforcementMiddleware _budgetEnforcer;
        private readonly ILogger<ProductionAgentService> _logger;

        public async Task<AgentResponse> ProcessAsync(
            AgentRequest request,
            CancellationToken ct = default)
        {
            // 1. Budget check
            if (!await _budgetEnforcer.CanProcessRequestAsync(request.UserId, ct))
            {
                throw new BudgetExceededException(request.UserId);
            }

            // 2. Load session (GetSessionAsync throws SessionNotFoundException if it is missing)
            var session = await _sessions.GetSessionAsync(request.SessionId, ct);

            // 3. Execute with rate limiting and telemetry
            var response = await _rateLimiter.ExecuteAsync(async () =>
            {
                return await _telemetry.ProcessWithTelemetryAsync(
                    request.AgentId,
                    session.ThreadId,
                    request.Message,
                    ct);
            }, ct);

            // 4. Track costs
            if (response.Usage != null)
            {
                await _costTracker.TrackUsageAsync(
                    request.UserId,
                    request.AgentId,
                    response.Model,
                    response.Usage.PromptTokens,
                    response.Usage.CompletionTokens,
                    ct);
            }

            // 5. Update session state
            session.TotalTokensUsed += response.Usage?.TotalTokens ?? 0;
            await _sessions.UpdateSessionAsync(session, ct);

            return response;
        }
    }







    Key Takeaways

    1. Threads are your conversation state - Cache IDs, verify existence, and clean up stale threads regularly
    2. Layer application state - Use sessions to track metadata, costs, and workflow state on top of thread state
    3. Instrument everything - Response times, token counts, error rates, and costs should all be observable
    4. Build resilience in - Retry transient failures, respect rate limits, and implement circuit breakers
    5. Track costs religiously - Token usage can surprise you; build budgets and alerts from day one
    6. Plan for recovery - Sessions will get corrupted, threads will disappear—handle it gracefully


    In the final part of this series, we'll cover testing strategies and deployment patterns for Azure AI Agent Service, including integration testing, load testing, and CI/CD pipelines.





    Coming up next: Part 5 - "Testing and Deployment: CI/CD for AI Agents"


    Found this helpful? Follow me for more Azure AI content, and drop a comment with your production war stories!



