 
 def get_worker_id():
     """Get pytest-xdist worker ID for test isolation."""
-    return os.environ.get('PYTEST_XDIST_WORKER', 'main')
+    return os.environ.get("PYTEST_XDIST_WORKER", "main")
 
 
 def get_worker_prefix():
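Side note on the environment variable used above (not part of the diff): pytest-xdist exports PYTEST_XDIST_WORKER as gw0, gw1, ... inside each worker process and leaves it unset in a plain single-process run, which is what the "main" fallback covers. A minimal illustration, with the worker names given only as examples:

# Under `pytest -n 2`, the two worker processes observe different values:
#   os.environ.get("PYTEST_XDIST_WORKER")  ->  "gw0" in one worker, "gw1" in the other
# Without -n (or without pytest-xdist installed) the variable is absent,
# so get_worker_id() above falls back to "main".
import os
print(os.environ.get("PYTEST_XDIST_WORKER", "main"))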
@@ -30,11 +30,11 @@ def get_worker_prefix():
 async def clean_redis(redis):
     """Provide a clean Redis instance for schema migration tests."""
     worker_prefix = get_worker_prefix()
-
+
     # Worker-specific Redis keys
     applied_migrations_key = f"redis_om:schema_applied_migrations:{worker_prefix}"
     schema_key_pattern = f"redis_om:schema:*:{worker_prefix}"
-
+
     # Cleanup before test
     await redis.delete(applied_migrations_key)
     keys = await redis.keys(schema_key_pattern)
@@ -79,7 +79,9 @@ async def test_create_migration_file_when_no_ops(redis, monkeypatch):
 
     try:
         with tempfile.TemporaryDirectory() as tmp:
-            migrator = _WorkerAwareSchemaMigrator(redis_client=redis, migrations_dir=tmp)
+            migrator = _WorkerAwareSchemaMigrator(
+                redis_client=redis, migrations_dir=tmp
+            )
             fp = await migrator.create_migration_file("noop")
             assert fp is None
     finally:
@@ -90,7 +92,9 @@ async def test_create_migration_file_when_no_ops(redis, monkeypatch):
 
 async def test_create_and_status_empty(clean_redis):
     with tempfile.TemporaryDirectory() as tmp:
-        migrator = _WorkerAwareSchemaMigrator(redis_client=clean_redis, migrations_dir=tmp)
+        migrator = _WorkerAwareSchemaMigrator(
+            redis_client=clean_redis, migrations_dir=tmp
+        )
         status = await migrator.status()
         assert status["total_migrations"] == 0
         assert status["applied_count"] == 0
@@ -107,13 +111,15 @@ async def test_rollback_noop(redis):
 
 class _WorkerAwareSchemaMigrator(SchemaMigrator):
     """SchemaMigrator that uses worker-specific Redis keys for test isolation."""
-
+
     def __init__(self, redis_client, migrations_dir):
         super().__init__(redis_client, migrations_dir)
         self.worker_prefix = get_worker_prefix()
         # Override the class constant with worker-specific key
-        self.APPLIED_MIGRATIONS_KEY = f"redis_om:schema_applied_migrations:{self.worker_prefix}"
-
+        self.APPLIED_MIGRATIONS_KEY = (
+            f"redis_om:schema_applied_migrations:{self.worker_prefix}"
+        )
+
     async def mark_unapplied(self, migration_id: str):
         """Mark migration as unapplied using worker-specific key."""
         await self.redis.srem(self.APPLIED_MIGRATIONS_KEY, migration_id)
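Why assigning APPLIED_MIGRATIONS_KEY in __init__ is enough (not part of the diff): instance attributes shadow class attributes in Python, so every inherited SchemaMigrator method that reads self.APPLIED_MIGRATIONS_KEY sees the worker-specific key, while the class-level constant stays untouched for other instances. A standalone sketch of that mechanism, using illustrative names rather than the real classes:

class Base:
    KEY = "shared"  # class-level constant

    def key(self):
        return self.KEY  # attribute lookup checks the instance before the class


class Isolated(Base):
    def __init__(self, worker):
        # Instance attribute shadows the class constant for this object only
        self.KEY = f"shared:{worker}"


assert Base().key() == "shared"
assert Isolated("gw0").key() == "shared:gw0"
assert Base.KEY == "shared"  # the class constant itself is unchanged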
@@ -138,8 +144,12 @@ async def up(self) -> None:
             await self.redis.execute_command(f"FT.CREATE {index_name} {new_schema}")
             # Update tracking keys with worker isolation
             new_hash = hashlib.sha1(new_schema.encode("utf-8")).hexdigest()
-            await self.redis.set(f"{schema_hash_key(index_name)}:{worker_prefix}", new_hash)
-            await self.redis.set(f"{schema_text_key(index_name)}:{worker_prefix}", new_schema)
+            await self.redis.set(
+                f"{schema_hash_key(index_name)}:{worker_prefix}", new_hash
+            )
+            await self.redis.set(
+                f"{schema_text_key(index_name)}:{worker_prefix}", new_schema
+            )
 
     async def down(self) -> None:
         """Rollback the migration operations."""
@@ -156,8 +166,12 @@ async def down(self) -> None:
                     f"FT.CREATE {index_name} {prev_schema}"
                 )
                 prev_hash = hashlib.sha1(prev_schema.encode("utf-8")).hexdigest()
-                await self.redis.set(f"{schema_hash_key(index_name)}:{worker_prefix}", prev_hash)
-                await self.redis.set(f"{schema_text_key(index_name)}:{worker_prefix}", prev_schema)
+                await self.redis.set(
+                    f"{schema_hash_key(index_name)}:{worker_prefix}", prev_hash
+                )
+                await self.redis.set(
+                    f"{schema_text_key(index_name)}:{worker_prefix}", prev_schema
+                )
 
 
 class _TestSchemaMigrationNoRollback(BaseSchemaMigration):
@@ -176,7 +190,9 @@ async def up(self) -> None:
 async def test_rollback_successful_single_operation(clean_redis):
     """Test successful rollback of migration with single operation."""
     with tempfile.TemporaryDirectory() as tmp:
-        migrator = _WorkerAwareSchemaMigrator(redis_client=clean_redis, migrations_dir=tmp)
+        migrator = _WorkerAwareSchemaMigrator(
+            redis_client=clean_redis, migrations_dir=tmp
+        )
         redis = clean_redis
         worker_prefix = get_worker_prefix()
 
@@ -189,7 +205,9 @@ async def test_rollback_successful_single_operation(clean_redis):
         await redis.execute_command(f"FT.CREATE {index_name} {original_schema}")
         original_hash = hashlib.sha1(original_schema.encode("utf-8")).hexdigest()
         await redis.set(f"{schema_hash_key(index_name)}:{worker_prefix}", original_hash)
-        await redis.set(f"{schema_text_key(index_name)}:{worker_prefix}", original_schema)
+        await redis.set(
+            f"{schema_text_key(index_name)}:{worker_prefix}", original_schema
+        )
 
         # Create and apply migration
         migration = _TestSchemaMigration(
@@ -226,8 +244,12 @@ async def mock_discover():
         assert success is True
 
         # Verify rollback restored original schema
-        restored_hash = await redis.get(f"{schema_hash_key(index_name)}:{worker_prefix}")
-        restored_text = await redis.get(f"{schema_text_key(index_name)}:{worker_prefix}")
+        restored_hash = await redis.get(
+            f"{schema_hash_key(index_name)}:{worker_prefix}"
+        )
+        restored_text = await redis.get(
+            f"{schema_text_key(index_name)}:{worker_prefix}"
+        )
         assert restored_hash == original_hash
         assert restored_text == original_schema
 
@@ -314,9 +336,13 @@ async def test_rollback_multiple_operations(redis):
         hash1 = hashlib.sha1(original_schema1.encode("utf-8")).hexdigest()
         hash2 = hashlib.sha1(original_schema2.encode("utf-8")).hexdigest()
         await redis.set(f"{schema_hash_key(index1_name)}:{worker_prefix}", hash1)
-        await redis.set(f"{schema_text_key(index1_name)}:{worker_prefix}", original_schema1)
+        await redis.set(
+            f"{schema_text_key(index1_name)}:{worker_prefix}", original_schema1
+        )
         await redis.set(f"{schema_hash_key(index2_name)}:{worker_prefix}", hash2)
-        await redis.set(f"{schema_text_key(index2_name)}:{worker_prefix}", original_schema2)
+        await redis.set(
+            f"{schema_text_key(index2_name)}:{worker_prefix}", original_schema2
+        )
 
         # Create migration with multiple operations
         migration = _TestSchemaMigration(
@@ -353,10 +379,18 @@ async def mock_discover():
         assert success is True
 
         # Verify both indices were rolled back to original schemas
-        restored_hash1 = await redis.get(f"{schema_hash_key(index1_name)}:{worker_prefix}")
-        restored_text1 = await redis.get(f"{schema_text_key(index1_name)}:{worker_prefix}")
-        restored_hash2 = await redis.get(f"{schema_hash_key(index2_name)}:{worker_prefix}")
-        restored_text2 = await redis.get(f"{schema_text_key(index2_name)}:{worker_prefix}")
+        restored_hash1 = await redis.get(
+            f"{schema_hash_key(index1_name)}:{worker_prefix}"
+        )
+        restored_text1 = await redis.get(
+            f"{schema_text_key(index1_name)}:{worker_prefix}"
+        )
+        restored_hash2 = await redis.get(
+            f"{schema_hash_key(index2_name)}:{worker_prefix}"
+        )
+        restored_text2 = await redis.get(
+            f"{schema_text_key(index2_name)}:{worker_prefix}"
+        )
 
         assert restored_hash1 == hash1
         assert restored_text1 == original_schema1
@@ -556,7 +590,9 @@ async def test_rollback_state_consistency(redis):
         await redis.execute_command(f"FT.CREATE {index_name} {original_schema}")
         original_hash = hashlib.sha1(original_schema.encode("utf-8")).hexdigest()
         await redis.set(f"{schema_hash_key(index_name)}:{worker_prefix}", original_hash)
-        await redis.set(f"{schema_text_key(index_name)}:{worker_prefix}", original_schema)
+        await redis.set(
+            f"{schema_text_key(index_name)}:{worker_prefix}", original_schema
+        )
 
         migration = _TestSchemaMigration(
             migration_id="008_consistency_test",
@@ -593,8 +629,12 @@ async def mock_discover():
         assert success is True
 
         # Verify complete state consistency after rollback
-        restored_hash = await redis.get(f"{schema_hash_key(index_name)}:{worker_prefix}")
-        restored_text = await redis.get(f"{schema_text_key(index_name)}:{worker_prefix}")
+        restored_hash = await redis.get(
+            f"{schema_hash_key(index_name)}:{worker_prefix}"
+        )
+        restored_text = await redis.get(
+            f"{schema_text_key(index_name)}:{worker_prefix}"
+        )
 
         # Hash and text should match original exactly
         assert restored_hash == original_hash
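Taken together, every tracking key these tests touch is suffixed with the worker prefix, so the rollback suite can run in parallel (for example with `pytest -n auto`) against a single Redis instance without workers overwriting each other's schema hashes, schema text, or applied-migration sets. A sketch of the resulting key layout, where the worker IDs are examples and the hash/text key formats are left to the real schema_hash_key() and schema_text_key() helpers:

# pytest -n auto  ->  each worker writes only keys ending in its own ID, e.g.:
#   redis_om:schema_applied_migrations:gw0
#   redis_om:schema_applied_migrations:gw1
#   <schema_hash_key(index_name)>:gw0
#   <schema_text_key(index_name)>:gw1
# A plain `pytest` run uses the single suffix "main" for all of these.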