|
| 1 | +""" |
| 2 | +Base classes and exceptions for data migrations. |
| 3 | +
|
| 4 | +This module contains the core base classes and exceptions used by the data |
| 5 | +migration system in Redis OM Python. |
| 6 | +""" |
| 7 | + |
| 8 | +import abc |
| 9 | +import time |
| 10 | +from typing import Any, Dict, List |
| 11 | + |
| 12 | +try: |
| 13 | + import psutil |
| 14 | +except ImportError: |
| 15 | + psutil = None |
| 16 | + |
| 17 | +from ....connections import get_redis_connection |
| 18 | + |
| 19 | + |
| 20 | +class DataMigrationError(Exception): |
| 21 | + """Exception raised when data migration operations fail.""" |
| 22 | + pass |
| 23 | + |
| 24 | + |
| 25 | +class PerformanceMonitor: |
| 26 | + """Monitor migration performance and resource usage.""" |
| 27 | + |
| 28 | + def __init__(self): |
| 29 | + self.start_time = None |
| 30 | + self.end_time = None |
| 31 | + self.start_memory = None |
| 32 | + self.peak_memory = None |
| 33 | + self.processed_items = 0 |
| 34 | + self.batch_times = [] |
| 35 | + |
| 36 | + def start(self): |
| 37 | + """Start performance monitoring.""" |
| 38 | + self.start_time = time.time() |
| 39 | + if psutil: |
| 40 | + try: |
| 41 | + process = psutil.Process() |
| 42 | + self.start_memory = process.memory_info().rss / 1024 / 1024 # MB |
| 43 | + self.peak_memory = self.start_memory |
| 44 | + except (psutil.NoSuchProcess, Exception): |
| 45 | + self.start_memory = None |
| 46 | + self.peak_memory = None |
| 47 | + else: |
| 48 | + self.start_memory = None |
| 49 | + self.peak_memory = None |
| 50 | + |
| 51 | + def update_progress(self, items_processed: int): |
| 52 | + """Update progress and check memory usage.""" |
| 53 | + self.processed_items = items_processed |
| 54 | + if psutil: |
| 55 | + try: |
| 56 | + process = psutil.Process() |
| 57 | + current_memory = process.memory_info().rss / 1024 / 1024 # MB |
| 58 | + if self.peak_memory is None or current_memory > self.peak_memory: |
| 59 | + self.peak_memory = current_memory |
| 60 | + except (psutil.NoSuchProcess, Exception): |
| 61 | + pass |
| 62 | + |
| 63 | + def record_batch_time(self, batch_time: float): |
| 64 | + """Record time taken for a batch.""" |
| 65 | + self.batch_times.append(batch_time) |
| 66 | + |
| 67 | + def finish(self): |
| 68 | + """Finish monitoring and calculate final stats.""" |
| 69 | + self.end_time = time.time() |
| 70 | + |
| 71 | + def get_stats(self) -> Dict[str, Any]: |
| 72 | + """Get performance statistics.""" |
| 73 | + if self.start_time is None: |
| 74 | + return {} |
| 75 | + |
| 76 | + total_time = (self.end_time or time.time()) - self.start_time |
| 77 | + avg_batch_time = ( |
| 78 | + sum(self.batch_times) / len(self.batch_times) if self.batch_times else 0 |
| 79 | + ) |
| 80 | + |
| 81 | + stats = { |
| 82 | + "total_time_seconds": total_time, |
| 83 | + "processed_items": self.processed_items, |
| 84 | + "items_per_second": ( |
| 85 | + self.processed_items / total_time if total_time > 0 else 0 |
| 86 | + ), |
| 87 | + "average_batch_time": avg_batch_time, |
| 88 | + "total_batches": len(self.batch_times), |
| 89 | + } |
| 90 | + |
| 91 | + if self.start_memory is not None: |
| 92 | + stats.update( |
| 93 | + { |
| 94 | + "start_memory_mb": self.start_memory, |
| 95 | + "peak_memory_mb": self.peak_memory, |
| 96 | + "memory_increase_mb": (self.peak_memory or 0) - self.start_memory, |
| 97 | + } |
| 98 | + ) |
| 99 | + |
| 100 | + return stats |
| 101 | + |
| 102 | + |
| 103 | +class BaseMigration(abc.ABC): |
| 104 | + """ |
| 105 | + Base class for all data migrations. |
| 106 | +
|
| 107 | + Each migration must implement the `up` method to apply the migration. |
| 108 | + Optionally implement `down` for rollback support and `can_run` for validation. |
| 109 | + """ |
| 110 | + |
| 111 | + migration_id: str = "" |
| 112 | + description: str = "" |
| 113 | + dependencies: List[str] = [] |
| 114 | + |
| 115 | + def __init__(self, redis_client=None): |
| 116 | + self.redis = redis_client or get_redis_connection() |
| 117 | + if not self.migration_id: |
| 118 | + raise DataMigrationError( |
| 119 | + f"Migration {self.__class__.__name__} must define migration_id" |
| 120 | + ) |
| 121 | + |
| 122 | + @abc.abstractmethod |
| 123 | + async def up(self) -> None: |
| 124 | + """Apply the migration. Must be implemented by subclasses.""" |
| 125 | + pass |
| 126 | + |
| 127 | + async def down(self) -> None: |
| 128 | + """ |
| 129 | + Reverse the migration (optional). |
| 130 | +
|
| 131 | + If not implemented, rollback will not be available for this migration. |
| 132 | + """ |
| 133 | + raise NotImplementedError( |
| 134 | + f"Migration {self.migration_id} does not support rollback" |
| 135 | + ) |
| 136 | + |
| 137 | + async def can_run(self) -> bool: |
| 138 | + """ |
| 139 | + Check if the migration can run (optional validation). |
| 140 | +
|
| 141 | + Returns: |
| 142 | + bool: True if migration can run, False otherwise |
| 143 | + """ |
| 144 | + return True |
0 commit comments