Performance Tuning Guide

Comprehensive performance optimization strategies for Klira AI SDK, based on real-world deployments.

Performance Overview

Klira AI SDK provides multiple layers of performance optimization:

  • Multi-layer Caching: L1 (memory) + L2 (Redis) + L3 (future)
  • Framework Detection Caching: Compiled regex patterns with LRU cache
  • Streaming Optimization: Real-time processing with backpressure handling
  • Connection Pooling: Redis connection pools for high throughput
  • Lazy Loading: On-demand adapter registration
  • Batch Processing: Efficient analytics and telemetry batching

Cache Optimization

1. Cache Hierarchy Configuration

from klira.cache.cache_hierarchy import CacheHierarchy, CacheHierarchyConfig
from klira.cache.redis_adapter import RedisConfig

# High-performance cache configuration
cache_config = CacheHierarchyConfig(
    # L1: Local memory cache (fastest)
    l1_enabled=True,
    l1_max_size=50000,         # Larger for high-traffic apps
    l1_ttl_seconds=600,        # 10 minutes

    # L2: Distributed Redis cache
    l2_enabled=True,
    l2_ttl_seconds=3600,       # 1 hour
    l2_fallback_on_error=True,

    # Performance optimizations
    async_write_behind=True,   # Non-blocking writes
    promotion_threshold=1,     # Aggressive promotion

    # Monitoring
    metrics_enabled=True,
    health_check_interval=30,
)

# Redis configuration for maximum performance
redis_config = RedisConfig(
    url="redis://redis-cluster:6379",
    max_connections=200,         # High connection pool
    socket_timeout=1.0,          # Fast timeouts
    socket_connect_timeout=2.0,
    retry_on_timeout=True,
    retry_attempts=2,            # Quick retries
    enable_cluster=True,         # Distributed Redis
    key_prefix="perf:klira:",
    default_ttl=1800,
)

cache_hierarchy = CacheHierarchy(cache_config)
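
With this hierarchy in place, reads check L1 first and fall back to L2, and hot keys are promoted according to promotion_threshold. The following is a minimal usage sketch, assuming CacheHierarchy exposes synchronous get and set methods (check your SDK version for the exact method names); expensive_evaluation is a hypothetical stand-in for your workload:

def fetch_guardrail_result(key: str):
    """Read through the cache hierarchy, computing on a miss."""
    cached = cache_hierarchy.get(key)   # assumed API: checks L1, then L2
    if cached is not None:
        return cached
    result = expensive_evaluation(key)  # hypothetical workload function
    cache_hierarchy.set(key, result)    # assumed API: write-behind when async_write_behind=True
    return result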

2. Cache Performance Monitoring

import time
from typing import Any, Dict


class CachePerformanceMonitor:
    """Monitor and optimize cache performance."""

    def __init__(self, cache_hierarchy: CacheHierarchy):
        self.cache = cache_hierarchy
        self.metrics_history = []

    def collect_metrics(self) -> Dict[str, Any]:
        """Collect current cache performance metrics."""
        stats = self.cache.get_stats()
        metrics = {
            "timestamp": time.time(),
            "l1_hit_rate": stats.get("l1_hit_rate", 0),
            "l2_hit_rate": stats.get("l2_hit_rate", 0),
            "overall_hit_rate": stats.get("overall_hit_rate", 0),
            "avg_latency_ms": stats.get("avg_latency_ms", 0),
            "l1_size": stats.get("l1_size", 0),
            "total_requests": stats.get("total_requests", 0),
        }
        self.metrics_history.append(metrics)
        return metrics

    def analyze_performance(self) -> Dict[str, Any]:
        """Analyze cache performance and provide recommendations."""
        if not self.metrics_history:
            return {"status": "no_data"}

        latest = self.metrics_history[-1]
        recommendations = []

        # Hit rate analysis
        if latest["overall_hit_rate"] < 70:
            recommendations.append({
                "issue": "Low cache hit rate",
                "current": f"{latest['overall_hit_rate']:.1f}%",
                "target": ">80%",
                "action": "Increase cache size or TTL",
            })

        # Latency analysis
        if latest["avg_latency_ms"] > 50:
            recommendations.append({
                "issue": "High cache latency",
                "current": f"{latest['avg_latency_ms']:.1f}ms",
                "target": "<20ms",
                "action": "Check Redis connection or reduce cache size",
            })

        # Memory usage analysis
        if latest["l1_size"] > 40000:  # 80% of the 50k maximum
            recommendations.append({
                "issue": "L1 cache near capacity",
                "current": f"{latest['l1_size']} items",
                "target": "<40k items",
                "action": "Increase l1_max_size or reduce TTL",
            })

        return {
            "status": "healthy" if not recommendations else "needs_attention",
            "current_metrics": latest,
            "recommendations": recommendations,
        }


# Usage
monitor = CachePerformanceMonitor(cache_hierarchy)


# Periodic monitoring
def monitor_cache_performance():
    metrics = monitor.collect_metrics()
    analysis = monitor.analyze_performance()
    print(f"Cache Performance: {metrics['overall_hit_rate']:.1f}% hit rate, "
          f"{metrics['avg_latency_ms']:.1f}ms latency")
    if analysis["recommendations"]:
        print("Performance recommendations:")
        for rec in analysis["recommendations"]:
            print(f"  - {rec['issue']}: {rec['action']}")

Framework Detection Optimization

1. Optimized Framework Detection Configuration

from klira.config import KliraConfig

# High-performance framework detection
config = KliraConfig(
    framework_detection_cache_size=20000,  # Large cache
    lazy_loading=True,                     # Load adapters on demand
    debug_mode=False,                      # Disable debug overhead
    verbose=False,                         # Disable verbose logging
)

2. Framework Detection Performance Monitoring

import time
from functools import lru_cache

from klira.utils.framework_detection import detect_framework
from klira.performance import performance_instrumented


@lru_cache(maxsize=10000)
def cached_framework_detection(context_hash: str) -> str:
    """Cached framework detection for repeated contexts."""
    # The hash only serves as the LRU cache key; detection inspects the runtime
    return detect_framework()


@performance_instrumented("framework_detection", "detect")
def optimized_framework_detection() -> str:
    """Optimized framework detection with caching."""
    # Use a cheap hash of the global namespace as a proxy for the call context
    context_hash = hash(str(globals().keys()))
    return cached_framework_detection(str(context_hash))


# Performance testing
def benchmark_framework_detection(iterations: int = 1000) -> float:
    """Benchmark framework detection performance."""
    # Warm up
    for _ in range(10):
        optimized_framework_detection()

    # Benchmark
    start_time = time.time()
    for _ in range(iterations):
        optimized_framework_detection()
    end_time = time.time()

    avg_time_ms = (end_time - start_time) / iterations * 1000
    print(f"Framework detection: {avg_time_ms:.2f}ms average over {iterations} iterations")
    return avg_time_ms


# Run benchmark
benchmark_framework_detection()
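
Because cached_framework_detection is wrapped in lru_cache, you can separate cold and warm timings by clearing the cache between runs; cache_clear() and cache_info() are standard functools APIs:

def benchmark_cold_vs_warm():
    """Compare a cold detection call against warm (cached) calls."""
    cached_framework_detection.cache_clear()  # drop all cached entries

    start = time.time()
    optimized_framework_detection()           # first call misses the LRU cache
    cold_ms = (time.time() - start) * 1000

    warm_ms = benchmark_framework_detection(1000)  # later calls hit the cache
    print(f"Cold call: {cold_ms:.2f}ms, warm average: {warm_ms:.2f}ms")
    print(cached_framework_detection.cache_info())  # hits, misses, currsize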

Streaming Performance Optimization

1. High-Performance Streaming Configuration

from klira.streaming.types import StreamConfig

# Optimized streaming configuration
stream_config = StreamConfig(
    # Performance settings
    max_chunk_size=4096,      # Larger chunks for efficiency
    buffer_size=50,           # Large buffer for throughput
    timeout_seconds=120.0,    # Longer timeout for complex operations

    # Guardrail optimization
    enable_guardrails=True,
    realtime_validation=True,
    validation_interval=3,    # Validate every 3 chunks for performance

    # Caching optimization
    enable_caching=True,
    cache_partial_responses=True,
    cache_ttl_seconds=900,    # 15 minutes

    # Monitoring optimization
    enable_metrics=True,
    metrics_interval=10.0,    # Less frequent metrics for performance

    # Framework-specific optimizations
    framework_specific={
        "openai": {
            "stream_options": {
                "include_usage": True,
                "parallel_tool_calls": True,
            }
        },
        "anthropic": {
            "max_tokens": 8192,
            "stream": True,
        },
        "langchain": {
            "streaming": True,
            "chunk_size": 4096,
        },
    },
)
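
The key trade-off above is validation_interval: validating every chunk maximizes safety, while validating every Nth chunk cuts guardrail overhead roughly by a factor of N. The following is a framework-agnostic sketch of that idea only; it does not use the Klira streaming API, and the validate callable is a hypothetical guardrail check:

from typing import AsyncIterator


async def stream_with_interval_validation(
    chunks: AsyncIterator[str],
    validate,              # hypothetical: callable(text: str) -> bool
    interval: int = 3,     # mirrors validation_interval above
) -> AsyncIterator[str]:
    """Yield chunks, validating accumulated text every `interval` chunks."""
    pending = []
    count = 0
    async for chunk in chunks:
        pending.append(chunk)
        count += 1
        if count % interval == 0:
            if not validate("".join(pending)):  # check text since last validation
                raise ValueError("Guardrail violation in stream")
            pending.clear()
        yield chunk
    # Validate any trailing chunks that did not complete a full interval
    if pending and not validate("".join(pending)):
        raise ValueError("Guardrail violation in stream")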

Performance Best Practices

1. Configuration Checklist

from typing import List

from klira.config import KliraConfig

# High-performance production configuration checklist
PERFORMANCE_CONFIG_CHECKLIST = {
    "cache": {
        "l1_max_size": 50000,        # Large L1 cache
        "l2_enabled": True,          # Redis distributed cache
        "async_write_behind": True,  # Non-blocking writes
        "promotion_threshold": 1,    # Aggressive promotion
    },
    "framework": {
        "lazy_loading": True,        # On-demand loading
        "cache_size": 20000,         # Large detection cache
        "debug_mode": False,         # No debug overhead
    },
    "streaming": {
        "max_chunk_size": 4096,      # Large chunks
        "buffer_size": 50,           # Large buffer
        "validation_interval": 3,    # Reduced validation
    },
    "redis": {
        "max_connections": 200,      # High connection pool
        "socket_timeout": 1.0,       # Fast timeouts
        "enable_cluster": True,      # Distributed Redis
    },
    "analytics": {
        "batch_size": 200,           # Large batches
        "flush_interval": 10,        # Flush every 10 seconds
        "keep_raw_data": False,      # Memory optimization
    },
}


def validate_performance_config(config: KliraConfig) -> List[str]:
    """Validate configuration against performance best practices."""
    issues = []
    if config.framework_detection_cache_size < 10000:
        issues.append("Framework cache size too small for high performance")
    if config.debug_mode:
        issues.append("Debug mode enabled - disable for production performance")
    if config.verbose:
        issues.append("Verbose logging enabled - disable for production performance")
    return issues
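
Running the validator against the detection configuration from earlier surfaces any settings that deviate from the checklist:

config = KliraConfig(
    framework_detection_cache_size=20000,
    lazy_loading=True,
    debug_mode=False,
    verbose=False,
)

issues = validate_performance_config(config)
if issues:
    for issue in issues:
        print(f"Config issue: {issue}")
else:
    print("Configuration passes the performance checklist")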

2. Monitoring and Alerting

from typing import Any, Dict


class PerformanceMonitor:
    """Comprehensive performance monitoring."""

    def __init__(self):
        self.thresholds = {
            "cache_hit_rate": 80,      # Minimum 80% hit rate
            "avg_latency_ms": 50,      # Maximum 50ms latency
            "error_rate": 1,           # Maximum 1% error rate
            "memory_usage_mb": 2000,   # Maximum 2GB memory
            "redis_response_ms": 10,   # Maximum 10ms Redis response
        }

    def check_performance_health(self) -> Dict[str, Any]:
        """Check overall performance health."""
        health_status = {
            "overall": "healthy",
            "components": {},
            "alerts": [],
        }

        # Check cache performance
        cache_stats = cache_hierarchy.get_stats()
        if cache_stats["overall_hit_rate"] < self.thresholds["cache_hit_rate"]:
            health_status["overall"] = "degraded"
            health_status["alerts"].append({
                "component": "cache",
                "issue": (
                    f"Hit rate {cache_stats['overall_hit_rate']:.1f}% below "
                    f"threshold {self.thresholds['cache_hit_rate']}%"
                ),
            })

        return health_status


# Usage
monitor = PerformanceMonitor()


def check_system_health():
    health = monitor.check_performance_health()
    if health["overall"] == "healthy":
        print("✅ System performance: HEALTHY")
    else:
        print("⚠️ Performance issues detected:")
        for alert in health["alerts"]:
            print(f"  - {alert['component']}: {alert['issue']}")

Next Steps

  1. Security Best Practices - Secure your high-performance setup
  2. Scaling Guide - Scale your optimized configuration
  3. Production Setup - Deploy your performance-tuned system

Performance Optimization Checklist

  • Cache hierarchy configured for workload
  • Redis connection pool optimized
  • Framework detection cached
  • Streaming configuration tuned
  • Analytics batching optimized
  • Performance monitoring enabled
  • Benchmarks established
  • Health checks implemented

Maximum performance for enterprise LLM governance