Tag System Migration and Management

Henry edited this page Aug 23, 2025 · 1 revision

This guide demonstrates how to manage and migrate tag formats in the MCP Memory Service, based on real-world experience migrating 293 memories with multiple tag formats.

Overview

Tag management is crucial for organizing and retrieving memories effectively. This guide covers the complete process of migrating tag formats, validating data integrity, and implementing robust backup/rollback procedures.

Problem Statement

During development, different tag storage formats may accumulate:

  • JSON string arrays: ["tag1", "tag2"]
  • Comma-separated strings: "tag1,tag2"
  • Mixed formats causing search inconsistencies
  • Invalid or empty tags
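
The inconsistency is easiest to see when the same logical tags round-trip through each format. A minimal sketch of a normalizer that resolves all three to one plain list (the `parse_tags` helper here is illustrative, not part of the service's API):

```python
import json

def parse_tags(raw):
    """Normalize any of the observed tag formats to a plain list of strings."""
    if isinstance(raw, list):
        # Native list: strip whitespace, drop empties
        return [str(t).strip() for t in raw if str(t).strip()]
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)  # JSON string array: '["tag1", "tag2"]'
            if isinstance(parsed, list):
                return [str(t).strip() for t in parsed if str(t).strip()]
        except json.JSONDecodeError:
            pass
        # Fall back to comma-separated: "tag1,tag2"
        return [t.strip() for t in raw.split(",") if t.strip()]
    return []

# All three formats resolve to the same list:
print(parse_tags('["tag1", "tag2"]'))  # ['tag1', 'tag2']
print(parse_tags("tag1,tag2"))         # ['tag1', 'tag2']
print(parse_tags(["tag1", "tag2"]))    # ['tag1', 'tag2']
```

Until the stored values are normalized like this, a tag search has to anticipate every format, which is what causes the inconsistencies above.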

Pre-Migration Planning

1. Create File Structure

mcp_memory_service/
├── tests/
│   └── test_tag_storage.py
└── scripts/
    ├── validate_memories.py
    └── migrate_tags.py

2. Validation Script

Create scripts/validate_memories.py:

import asyncio
import json

from mcp_memory_service.storage.chroma import ChromaMemoryStorage

async def run_validation_report():
    storage = ChromaMemoryStorage("path/to/your/db")
    results = storage.collection.get(include=["metadatas", "documents"])

    report = {
        "total_memories": len(results["ids"]),
        "tag_formats": {},
        "invalid_tags": [],
        "missing_tags": []
    }

    for i, metadata in enumerate(results["metadatas"]):
        tags = metadata.get("tags", "")

        # Analyze tag format
        if isinstance(tags, str):
            try:
                parsed = json.loads(tags)
                if isinstance(parsed, list):
                    report["tag_formats"]["json_array"] = report["tag_formats"].get("json_array", 0) + 1
                else:
                    report["tag_formats"]["json_other"] = report["tag_formats"].get("json_other", 0) + 1
            except json.JSONDecodeError:
                if "," in tags:
                    report["tag_formats"]["comma_separated"] = report["tag_formats"].get("comma_separated", 0) + 1
                else:
                    report["tag_formats"]["single_tag"] = report["tag_formats"].get("single_tag", 0) + 1
        elif isinstance(tags, list):
            report["tag_formats"]["native_list"] = report["tag_formats"].get("native_list", 0) + 1
        else:
            report["missing_tags"].append(results["ids"][i])

    return report

if __name__ == "__main__":
    report = asyncio.run(run_validation_report())
    print(json.dumps(report, indent=2))

Migration Implementation

1. Backup Phase

Always create a timestamped backup before migration:

import json
from datetime import datetime

from mcp_memory_service.storage.chroma import ChromaMemoryStorage

async def backup_memories():
    storage = ChromaMemoryStorage("path/to/your/db")
    results = storage.collection.get(include=["metadatas", "documents"])

    backup_data = {
        "timestamp": datetime.now().isoformat(),
        "total_memories": len(results["ids"]),
        "memories": [{
            "id": results["ids"][i],
            "content": results["documents"][i],
            "metadata": results["metadatas"][i]
        } for i in range(len(results["ids"]))]
    }

    backup_file = f'memory_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
    with open(backup_file, 'w') as f:
        json.dump(backup_data, f, indent=2)

    print(f"Backup created: {backup_file}")
    return backup_file

2. Migration Script

Create scripts/migrate_tags.py:

import asyncio
import json
import sys
from datetime import datetime

from mcp_memory_service.storage.chroma import ChromaMemoryStorage

# Note: backup_memories() (above), verify_migration() and rollback_migration()
# (below) should live in this file as well; validate_current_state() can reuse
# run_validation_report() from scripts/validate_memories.py.

async def migrate_tags():
    storage = ChromaMemoryStorage("path/to/your/db")
    results = storage.collection.get(include=["metadatas", "documents"])

    migrated_count = 0
    error_count = 0

    for i, metadata in enumerate(results["metadatas"]):
        try:
            # Extract current tags
            current_tags = metadata.get("tags", "[]")

            # Normalize to list format
            if isinstance(current_tags, str):
                try:
                    # Try parsing as JSON first
                    tags = json.loads(current_tags)
                    if isinstance(tags, str):
                        # Handle a JSON-encoded comma-separated string
                        tags = [t.strip() for t in tags.split(",") if t.strip()]
                    elif not isinstance(tags, list):
                        tags = []
                except json.JSONDecodeError:
                    # Handle as comma-separated string
                    tags = [t.strip() for t in current_tags.split(",") if t.strip()]
            elif isinstance(current_tags, list):
                tags = [str(t).strip() for t in current_tags if str(t).strip()]
            else:
                tags = []

            # Remove duplicates while preserving order
            seen = set()
            unique_tags = []
            for tag in tags:
                if tag not in seen:
                    seen.add(tag)
                    unique_tags.append(tag)

            # Update with normalized format
            new_metadata = metadata.copy()
            new_metadata["tags"] = json.dumps(unique_tags)

            # Update memory in ChromaDB
            storage.collection.update(
                ids=[results["ids"][i]],
                metadatas=[new_metadata]
            )

            migrated_count += 1

        except Exception as e:
            print(f"Error migrating memory {results['ids'][i]}: {str(e)}")
            error_count += 1

    print("\nMigration complete!")
    print(f"Successfully migrated: {migrated_count}")
    print(f"Errors encountered: {error_count}")

    return migrated_count, error_count

async def main():
    # Check for rollback flag
    if len(sys.argv) > 1 and sys.argv[1] == "--rollback":
        await rollback_migration()
        return

    # Create backup
    print("Creating backup...")
    backup_file = await backup_memories()

    # Run validation
    print("\nValidating current state...")
    pre_migration = await validate_current_state()
    print(f"Pre-migration state: {json.dumps(pre_migration, indent=2)}")

    # Confirm with user
    proceed = input("\nProceed with migration? (yes/no): ")
    if proceed.lower() != 'yes':
        print("Migration cancelled")
        return

    # Run migration
    print("\nRunning migration...")
    migrated, errors = await migrate_tags()

    # Verify migration
    print("\nVerifying migration...")
    post_migration = await verify_migration()
    print(f"Post-migration state: {json.dumps(post_migration, indent=2)}")

    print(f"\nBackup saved to: {backup_file}")
    print("Keep this backup for at least 7 days!")

if __name__ == "__main__":
    asyncio.run(main())

Validation and Testing

1. Integration Tests

Create tests/test_tag_storage.py:

import asyncio
import json
import pytest

from mcp_memory_service.storage.chroma import ChromaMemoryStorage
from mcp_memory_service.models.memory import Memory

@pytest.mark.asyncio
async def test_tag_formats():
    """Test that all tag formats can be stored and retrieved correctly."""
    storage = ChromaMemoryStorage("test_db")

    test_cases = [
        # Format: (input_tags, expected_stored_format)
        (["tag1", "tag2"], '["tag1", "tag2"]'),
        ("tag1,tag2", '["tag1", "tag2"]'),
        ('["tag1", "tag2"]', '["tag1", "tag2"]'),
        (["tag1", "tag1", "tag2"], '["tag1", "tag2"]'),  # Duplicates removed
        (["  tag1  ", "tag2  "], '["tag1", "tag2"]'),    # Whitespace trimmed
    ]

    for input_tags, expected in test_cases:
        memory = Memory(
            content=f"Test with tags: {input_tags}",
            metadata={"tags": input_tags}
        )

        # Store memory
        await storage.store(memory)

        # Retrieve and verify
        results = await storage.search_by_tag(["tag1"])
        assert len(results) > 0
        assert results[0].metadata.get("tags") == expected

        # Clean up
        await storage.delete(memory.id)

@pytest.mark.asyncio
async def test_search_consistency():
    """Test that searches work consistently across tag formats."""
    storage = ChromaMemoryStorage("test_db")

    # Store memories with different tag formats
    memories = [
        Memory(content="Memory 1", metadata={"tags": ["python", "testing"]}),
        Memory(content="Memory 2", metadata={"tags": "python,testing"}),
        Memory(content="Memory 3", metadata={"tags": '["python", "testing"]'}),
    ]

    for memory in memories:
        await storage.store(memory)

    # Search should find all three
    results = await storage.search_by_tag(["python"])
    assert len(results) == 3

    # Clean up
    for memory in memories:
        await storage.delete(memory.id)

2. Verification Script

async def verify_migration():
    """Comprehensive post-migration verification."""
    storage = ChromaMemoryStorage("path/to/your/db")
    results = storage.collection.get(include=["metadatas"])

    verification = {
        "total_memories": len(results["ids"]),
        "all_json_format": True,
        "sample_searches": {},
        "format_consistency": True
    }

    # Check all memories have the normalized format
    for metadata in results["metadatas"]:
        tags = metadata.get("tags", "[]")
        try:
            parsed = json.loads(tags)
            if not isinstance(parsed, list):
                verification["all_json_format"] = False
                break
        except (TypeError, json.JSONDecodeError):
            verification["all_json_format"] = False
            break

    # Run sample searches
    test_searches = ["python", "testing", "debug"]
    for tag in test_searches:
        matches = await storage.search_by_tag([tag])
        verification["sample_searches"][tag] = len(matches)

    return verification

Rollback Procedures

Implement a safe rollback mechanism:

async def rollback_migration():
    """Restore from the most recent backup file."""
    import glob

    # Find most recent backup
    backups = sorted(glob.glob("memory_backup_*.json"), reverse=True)
    if not backups:
        print("No backup files found!")
        return

    backup_file = backups[0]
    confirm = input(f"Restore from {backup_file}? (yes/no): ")
    if confirm.lower() != 'yes':
        print("Rollback cancelled")
        return

    # Load backup
    with open(backup_file, 'r') as f:
        backup = json.load(f)

    storage = ChromaMemoryStorage("path/to/your/db")

    # Restore each memory
    restored = 0
    for memory_data in backup["memories"]:
        try:
            storage.collection.update(
                ids=[memory_data["id"]],
                metadatas=[memory_data["metadata"]],
                documents=[memory_data["content"]]
            )
            restored += 1
        except Exception as e:
            print(f"Error restoring {memory_data['id']}: {str(e)}")

    print(f"Restored {restored} memories from backup")

Best Practices

1. Pre-Migration Checklist

  • Run validation report
  • Create timestamped backup
  • Test migration on small subset
  • Notify users of maintenance window
  • Have rollback plan ready
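
"Test migration on small subset" is safest as a dry run: apply the same normalization to the first few records in memory and report what would change, without ever calling `collection.update()`. A sketch under that assumption (the `dry_run` helper and sample records are illustrative):

```python
import json

def dry_run(metadatas, limit=5):
    """Report which of the first `limit` records would change, without writing."""
    changes = []
    for meta in metadatas[:limit]:
        raw = meta.get("tags", "[]")
        # Same normalization the real migration performs
        if isinstance(raw, list):
            tags = [str(t).strip() for t in raw if str(t).strip()]
        else:
            try:
                parsed = json.loads(raw)
                tags = parsed if isinstance(parsed, list) else []
            except json.JSONDecodeError:
                tags = [t.strip() for t in raw.split(",") if t.strip()]
        # Deduplicate while preserving order, then re-encode
        normalized = json.dumps(list(dict.fromkeys(tags)))
        if normalized != raw:
            changes.append((raw, normalized))
    return changes

# Simulated metadatas covering each observed format:
sample = [
    {"tags": '["a", "b"]'},   # already normalized -> unchanged
    {"tags": "a,b"},          # comma-separated   -> rewritten
    {"tags": ["a", "a"]},     # native list       -> rewritten, deduplicated
]
for before, after in dry_run(sample):
    print(f"{before!r} -> {after!r}")
```

If the reported changes look right on the subset, the same logic can be trusted on the full collection.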

2. Tag Format Standards

  • Always store as JSON string arrays
  • Strip whitespace from tags
  • Remove duplicate tags
  • Validate tag content (no empty strings)
  • Use lowercase for consistency (optional)

3. Monitoring Post-Migration

  • Daily validation for first week
  • Monitor error logs for tag-related issues
  • Test search functionality regularly
  • Keep backups for minimum 7 days
  • Document any issues encountered
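
The daily validation can be as small as a check that every stored `tags` value is still a JSON string array. This mirrors the format check in `verify_migration()` but is cheap enough to schedule every day. A sketch over plain metadata dicts (the helper name and sample records are illustrative):

```python
import json

def find_bad_tag_records(metadatas):
    """Return indices of records whose tags are not a JSON array of strings."""
    bad = []
    for i, meta in enumerate(metadatas):
        raw = meta.get("tags")
        try:
            parsed = json.loads(raw)
            ok = isinstance(parsed, list) and all(isinstance(t, str) for t in parsed)
        except (TypeError, json.JSONDecodeError):
            # None (missing key) raises TypeError; bad strings raise JSONDecodeError
            ok = False
        if not ok:
            bad.append(i)
    return bad

sample = [
    {"tags": '["python", "testing"]'},  # valid
    {"tags": "python,testing"},         # drifted back to comma-separated
    {},                                 # tags missing entirely
]
print(find_bad_tag_records(sample))  # [1, 2]
```

A non-empty result during the first week is the earliest signal that some write path still bypasses the normalized format.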

4. Code Implementation

import json

def sanitize_tags(tags):
    """Standardize tag format for storage."""
    if isinstance(tags, str):
        try:
            tags = json.loads(tags)
            if isinstance(tags, str):
                tags = [t.strip() for t in tags.split(",")]
            elif not isinstance(tags, list):
                tags = []
        except json.JSONDecodeError:
            tags = [t.strip() for t in tags.split(",")]
    elif not isinstance(tags, list):
        tags = []

    # Clean and deduplicate
    seen = set()
    clean_tags = []
    for tag in tags:
        tag = str(tag).strip()
        if tag and tag not in seen:
            seen.add(tag)
            clean_tags.append(tag)

    return json.dumps(clean_tags)

Conclusion

Tag migration is a critical maintenance task that requires careful planning and execution. By following this guide, you can safely migrate tag formats while maintaining data integrity and search functionality. The key is thorough validation, comprehensive backups, and systematic testing throughout the process.
