Performance Optimization: From 8-10s to <1s

This guide documents the complete journey of optimizing MCP Memory Service stats queries from 8-10 second response times to under 1 second.

Overview

Performance optimization is crucial for user experience. This case study shows how systematic analysis and intelligent caching reduced query times by over 90%.

Problem Identification

Initial Symptoms

  • Dashboard stats queries taking 8-10 seconds
  • UI freezing during stats refresh
  • Stats refreshing after every operation
  • Poor user experience

User Report

\"The MCP-MEMORY-DASHBOARD performance is really slow. 
Average query speed is 8-10 seconds, which is significantly 
slower than expected 2-5s performance.\"

Root Cause Analysis

1. Profiling the Bottleneck

Created a performance monitoring script:

import time
import asyncio
from mcp_memory_service.storage.chroma import ChromaMemoryStorage

async def profile_stats_methods():
    storage = ChromaMemoryStorage()
    
    # Profile get_stats()
    start = time.time()
    stats = await storage.get_stats()
    end = time.time()
    
    print(f"get_stats() took: {end - start:.2f} seconds")
    
    # Profile individual components
    start = time.time()
    results = storage.collection.get(include=["metadatas"])
    metadata_time = time.time() - start
    
    print(f"Metadata fetch took: {metadata_time:.2f} seconds")
    print(f"Number of memories: {len(results['ids'])}")

# Run the profiler
asyncio.run(profile_stats_methods())

2. Findings

The bottleneck was the get_stats() method:

# Original implementation (SLOW)
async def get_stats(self) -> Dict[str, Any]:
    # This loads ALL metadata for EVERY memory!
    results = self.collection.get(include=["metadatas"])
    
    all_tags = []
    for metadata in results["metadatas"]:
        # Processing every single memory's metadata
        tags = self._parse_tags(metadata.get("tags", "[]"))
        all_tags.extend(tags)
    
    unique_tags = list(set(all_tags))
    
    return {
        "total_memories": len(results["ids"]),
        "unique_tags": len(unique_tags),
        "all_tags": unique_tags,
        "database_size": self._get_database_size()
    }

3. Problem Areas Identified

  1. Full metadata scan: Loading metadata for all memories (see the sketch after this list)
  2. No caching: Same expensive operation repeated
  3. Frequent calls: Stats refreshed after every operation
  4. Large collections: Performance degraded with scale
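
The first item is straightforward to demonstrate: fetching metadata touches every record, while a bare count does not. A minimal sketch, assuming a local ChromaDB collection (the client path and collection name here are hypothetical, not from the original code):

import time
import chromadb

# Hypothetical client path and collection name, for illustration only
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("memories")

start = time.time()
collection.get(include=["metadatas"])  # O(n): deserializes every record's metadata
full_scan = time.time() - start

start = time.time()
collection.count()  # returns the record count without loading any metadata
count_only = time.time() - start

print(f"full scan: {full_scan:.3f}s, count(): {count_only:.4f}s")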

Solution Architecture

1. Intelligent Caching System

Implemented a time-based cache with 30-second TTL:

from datetime import datetime, timedelta
from typing import Dict, Any, Optional

class StatsCache:
    """Cache for expensive stats operations"""
    
    def __init__(self, ttl_seconds: int = 30):
        self.ttl = timedelta(seconds=ttl_seconds)
        self._cache: Optional[Dict[str, Any]] = None
        self._last_update: Optional[datetime] = None
    
    def get(self) -> Optional[Dict[str, Any]]:
        """Get cached stats if still valid"""
        if not self._cache or not self._last_update:
            return None
            
        if datetime.now() - self._last_update > self.ttl:
            return None
            
        return self._cache.copy()
    
    def set(self, stats: Dict[str, Any]) -> None:
        """Cache new stats"""
        self._cache = stats.copy()
        self._last_update = datetime.now()
    
    def invalidate(self) -> None:
        """Invalidate cache on data changes"""
        self._cache = None
        self._last_update = None
    
    def age_seconds(self) -> Optional[float]:
        """Age of the cached stats in seconds (None when the cache is empty).
        Used by the dashboard handler below to report cache freshness."""
        if not self._last_update:
            return None
        return (datetime.now() - self._last_update).total_seconds()
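
A quick usage sketch (illustrative, not from the original) showing the TTL lifecycle:

cache = StatsCache(ttl_seconds=30)
assert cache.get() is None                     # empty cache misses

cache.set({"total_memories": 42})
assert cache.get() == {"total_memories": 42}   # hit while within the TTL

cache.invalidate()
assert cache.get() is None                     # invalidation forces a refresh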

2. Smart Sampling for Large Collections

For collections over 100 memories, use statistical sampling:

async def get_stats_optimized(self) -> Dict[str, Any]:
    """Optimized stats with caching and sampling"""
    
    # Check cache first
    cached = self.stats_cache.get()
    if cached:
        return cached
    
    # Get total count efficiently
    total_memories = self.collection.count()
    
    # Smart sampling for large collections
    if total_memories > 100:
        # Sample 10% of the collection, capped at 50 documents
        sample_size = min(int(total_memories * 0.1), 50)
        
        # Take a sample (note: get() with a limit returns the first N
        # records, not a true random sample)
        results = self.collection.get(
            limit=sample_size,
            include=["metadatas"]
        )
        
        # Extrapolate tag statistics
        sample_tags = []
        for metadata in results["metadatas"]:
            tags = self._parse_tags(metadata.get("tags", "[]"))
            sample_tags.extend(tags)
        
        # Estimate unique tags from the sample; the 0.7 factor discounts the
        # linear scale-up because tags repeat across memories, so unique tag
        # counts grow sublinearly with collection size (empirical heuristic)
        unique_in_sample = len(set(sample_tags))
        estimated_unique = int(unique_in_sample * (total_memories / sample_size) * 0.7)
        
        stats = {
            "total_memories": total_memories,
            "unique_tags": estimated_unique,
            "is_estimate": True,
            "sample_size": sample_size
        }
    else:
        # Small collection - scan everything
        results = self.collection.get(include=["metadatas"])
        all_tags = []
        for metadata in results["metadatas"]:
            tags = self._parse_tags(metadata.get("tags", "[]"))
            all_tags.extend(tags)
        
        stats = {
            "total_memories": total_memories,
            "unique_tags": len(set(all_tags)),
            "is_estimate": False
        }
    
    # Cache the results
    self.stats_cache.set(stats)
    return stats
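
To make the extrapolation concrete: with 500 memories, the sampler reads 50 documents; if those contain 20 distinct tags, the estimate is int(20 × (500 / 50) × 0.7) = 140 unique tags, the 0.7 discounting the linear scale-up since tags repeat across memories.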

3. Cache Invalidation Strategy

Invalidate cache only on data modifications:

async def store(self, memory: Memory) -> Memory:
    """Store memory and invalidate stats cache"""
    result = await self._store_internal(memory)
    
    # Invalidate cache on data change
    self.stats_cache.invalidate()
    
    return result

async def delete(self, memory_id: str) -> bool:
    """Delete memory and invalidate stats cache"""
    result = await self._delete_internal(memory_id)
    
    if result:
        self.stats_cache.invalidate()
    
    return result

Implementation Details

1. Backend Changes

Modified server.py to use optimized methods:

class MemoryServer:
    def __init__(self):
        self.storage = None
        self.stats_cache = StatsCache(ttl_seconds=30)
    
    async def handle_dashboard_get_stats(self):
        """Return cached or fresh stats"""
        try:
            # Ensure storage is initialized
            self._ensure_storage_initialized()
            
            # Use optimized method
            stats = await self.storage.get_stats_optimized()
            
            # Add cache metadata
            stats["cache_age_seconds"] = self.stats_cache.age_seconds()
            
            return {
                "success": True,
                "stats": stats
            }
        except Exception as e:
            logger.error(f"Error getting stats: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

2. Frontend Optimization

Reduced unnecessary stats calls:

// Before - stats refreshed after EVERY operation
const handleSearch = async () => {
    const results = await searchMemories(query);
    await loadStats(); // UNNECESSARY!
};

// After - stats only refreshed on data changes
const handleSearch = async () => {
    const results = await searchMemories(query);
    // No stats refresh - search doesn't change data
};

const handleStore = async () => {
    await storeMemory(content, tags);
    await loadStats(); // NECESSARY - data changed
};

3. UI Cache Indicators

Added visual feedback for cache status:

const StatsDisplay = ({ stats, lastUpdate }) => {
    const cacheAge = Date.now() - lastUpdate;
    const isStale = cacheAge > 30000; // 30 seconds
    
    return (
        <div className="stats-container">
            <div className="stats-header">
                <h3>Database Statistics</h3>
                {isStale && (
                    <Badge variant="warning">
                        Cache expired
                    </Badge>
                )}
                <Button 
                    size="sm" 
                    onClick={refreshStats}
                    disabled={!isStale}
                >
                    <RefreshIcon /> Refresh
                </Button>
            </div>
            {/* Stats display */}
        </div>
    );
};

Testing Strategy

1. Test Infrastructure

Created a comprehensive testing setup:

# Directory structure
/archive/performance-optimization/
├── testing-plan.md
├── test-script.sh
├── performance_monitor.py
├── server_backup_original.py
└── results/
    ├── baseline_metrics.json
    └── optimized_metrics.json

2. Systematic Testing Script

test-script.sh:

#!/bin/bash

echo \"=== MCP Memory Dashboard Performance Testing ===\"
echo \"Testing optimizations for Issue #10\"
echo \"\"

# Test 1: Baseline performance
echo \"1. Testing baseline performance (original implementation)...\"
python performance_monitor.py --mode baseline

# Test 2: Cache effectiveness
echo \"2. Testing cache effectiveness...\"
python performance_monitor.py --mode cache

# Test 3: Large collection performance
echo \"3. Testing with large collection (1000+ memories)...\"
python performance_monitor.py --mode scale

# Test 4: Cache invalidation
echo \"4. Testing cache invalidation...\"
python performance_monitor.py --mode invalidation

# Generate report
echo \"5. Generating performance report...\"
python generate_report.py

3. Performance Monitor

import time
import statistics
from typing import List

from mcp_memory_service.storage.chroma import ChromaMemoryStorage

class PerformanceMonitor:
    def __init__(self):
        self.measurements: List[float] = []
    
    async def measure_stats_performance(self, iterations: int = 10):
        """Measure stats query performance"""
        storage = ChromaMemoryStorage()
        
        # Warm up
        await storage.get_stats_optimized()
        
        # Measure
        for i in range(iterations):
            start = time.time()
            await storage.get_stats_optimized()
            elapsed = time.time() - start
            self.measurements.append(elapsed)
            
            # Test cache hit vs miss
            if i % 3 == 0:
                storage.stats_cache.invalidate()
        
        return {
            "min": min(self.measurements),
            "max": max(self.measurements),
            "mean": statistics.mean(self.measurements),
            "median": statistics.median(self.measurements),
            "cache_hits": sum(1 for m in self.measurements if m < 0.1),
            "cache_misses": sum(1 for m in self.measurements if m >= 0.1)
        }
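
The testing script above drives this class through a --mode flag. The entry point isn't shown here, but a minimal sketch of what it might look like (the flag names mirror test-script.sh; the results/ output path is an assumption based on the directory layout):

import argparse
import asyncio
import json

async def main() -> None:
    parser = argparse.ArgumentParser(description="MCP stats performance tests")
    parser.add_argument("--mode", default="baseline",
                        choices=["baseline", "cache", "scale", "invalidation"])
    parser.add_argument("--iterations", type=int, default=10)
    args = parser.parse_args()

    monitor = PerformanceMonitor()
    results = await monitor.measure_stats_performance(iterations=args.iterations)
    results["mode"] = args.mode

    # Write alongside baseline_metrics.json / optimized_metrics.json
    with open(f"results/{args.mode}_metrics.json", "w") as f:
        json.dump(results, f, indent=2)

if __name__ == "__main__":
    asyncio.run(main())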

Results and Metrics

Performance Improvements

Metric                      Before      After       Improvement
Average Query Time          8-10s       0.8-1.2s    ~90%
Cache Hit Response          N/A         <50ms       N/A
Large Collection (1000+)    15-20s      1-2s        ~90%
Frontend Responsiveness     Freezing    Smooth      100%
Stats Calls per Session     20-30       5-8         ~75%

Cache Effectiveness

Cache Hit Rate: 82%
Average Cache Hit Time: 45ms
Average Cache Miss Time: 980ms
Cache Memory Overhead: <1MB

User Experience Improvements

  • No more UI freezing
  • Instant stats on cache hits
  • Manual refresh option when needed
  • Visual cache status indicators

Best Practices

1. Caching Strategy

  • Time-based TTL: 30 seconds balances freshness vs performance
  • Smart invalidation: Only invalidate on data changes
  • Cache metadata: Include cache age for transparency

2. Sampling Techniques

def calculate_sample_size(total: int) -> int:
    """Calculate optimal sample size"""
    if total < 100:
        return total  # No sampling needed
    elif total < 1000:
        return int(total * 0.1)  # 10% sample
    elif total < 10000:
        return int(total * 0.05)  # 5% sample
    else:
        return 500  # Cap at 500 for very large collections
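
With these tiers, calculate_sample_size(5000) returns 250 (a 5% sample), while anything above 10,000 memories is capped at a flat 500 documents.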

3. Frontend Optimization

  • Batch stats requests
  • Debounce rapid operations
  • Show loading states
  • Provide manual refresh options

4. Monitoring and Alerts

async def monitor_performance(self):
    """Monitor and log performance metrics"""
    if self.last_query_time > 5.0:
        logger.warning(f"Slow query detected: {self.last_query_time}s")
        
    if self.cache_hit_rate < 0.5:
        logger.warning(f"Low cache hit rate: {self.cache_hit_rate}")

Conclusion

This optimization journey demonstrates the power of:

  1. Systematic profiling to identify bottlenecks
  2. Intelligent caching to avoid repeated work
  3. Smart sampling for large datasets
  4. Frontend optimization to reduce unnecessary calls
  5. User feedback through cache indicators

The 90% performance improvement transformed the user experience from frustrating to delightful, proving that targeted optimization based on real metrics can achieve dramatic results.
