@@ -168,7 +168,7 @@ def format_local_time_filter(utc_dt, format_str='%Y-%m-%d %H:%M'):
 
 def schedule_all_repositories():
     """Schedule all active repositories on startup"""
-    from datetime import datetime  # Import to ensure availability
+    from datetime import datetime, timedelta  # Import to ensure availability
 
     try:
         # Clean up any stuck 'running' jobs from previous sessions
@@ -182,13 +182,47 @@ def schedule_all_repositories():
             logger.info(f"Marked stuck job as failed: {stuck_job.id} for repository {stuck_job.repository_id}")
         db.session.commit()
 
+        # Auto-cleanup: Remove duplicate backup jobs created within last hour
+        cutoff = datetime.utcnow() - timedelta(hours=1)
+        recent_jobs = BackupJob.query.filter(BackupJob.created_at > cutoff).all()
+
+        # Group by repository and find duplicates
+        repo_jobs = {}
+        for job in recent_jobs:
+            repo_id = job.repository_id
+            if repo_id not in repo_jobs:
+                repo_jobs[repo_id] = []
+            repo_jobs[repo_id].append(job)
+
+        duplicates_cleaned = 0
+        for repo_id, jobs in repo_jobs.items():
+            if len(jobs) > 1:
+                # Sort by creation time, keep the first one, mark others as failed
+                jobs.sort(key=lambda j: j.created_at)
+                for duplicate_job in jobs[1:]:
+                    if duplicate_job.status in ['pending', 'running']:
+                        duplicate_job.status = 'failed'
+                        duplicate_job.error_message = 'Duplicate job automatically cleaned up'
+                        duplicate_job.completed_at = datetime.utcnow()
+                        duplicates_cleaned += 1
+                        logger.info(f"Auto-cleaned duplicate job {duplicate_job.id} for repository {repo_id}")
+
+        if duplicates_cleaned > 0:
+            db.session.commit()
+            logger.info(f"Auto-cleaned {duplicates_cleaned} duplicate backup jobs")
+
         # First, clear any existing jobs to prevent duplicates
         existing_jobs = scheduler.get_jobs()
         for job in existing_jobs:
             if job.id.startswith('backup_'):
                 scheduler.remove_job(job.id)
                 logger.info(f"Removed existing job on startup: {job.id}")
 
+        # Clear our tracking as well
+        with _job_tracking_lock:
+            _scheduled_jobs.clear()
+            logger.info("Cleared job tracking set")
+
         repositories = Repository.query.filter_by(is_active=True).all()
         scheduled_count = 0
         for repository in repositories:
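The duplicate-cleanup pass in this hunk groups recent jobs in Python and then marks the extras as failed. An equivalent query-side approach (a sketch only, not part of this commit; it assumes the same BackupJob model and SQLAlchemy session used throughout the diff, and that BackupJob.id is the primary key) would let the database report which repositories have more than one recent job:

    # Illustrative sketch: find repositories with more than one BackupJob created
    # after `cutoff`, using GROUP BY/HAVING instead of grouping rows in Python.
    from sqlalchemy import func

    duplicate_repo_ids = [
        row.repository_id
        for row in db.session.query(BackupJob.repository_id)
            .filter(BackupJob.created_at > cutoff)
            .group_by(BackupJob.repository_id)
            .having(func.count(BackupJob.id) > 1)
            .all()
    ]

Either way only the oldest job per repository is kept; the query-side version simply avoids loading every recent job into memory before deciding which repositories actually have duplicates.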
@@ -197,6 +231,65 @@ def schedule_all_repositories():
                 scheduled_count += 1
                 logger.info(f"Scheduled backup job for repository: {repository.name} ({repository.schedule_type})")
         logger.info(f"Scheduled {scheduled_count} backup jobs on startup")
+
+        # Schedule a periodic health check job to monitor for duplicates
+        def scheduler_health_check():
+            from datetime import datetime, timedelta
+            with app.app_context():
+                try:
+                    # Check for duplicate jobs in scheduler
+                    all_jobs = scheduler.get_jobs()
+                    backup_jobs = [job for job in all_jobs if job.id.startswith('backup_')]
+                    job_ids = [job.id for job in backup_jobs]
+
+                    # Check for duplicate job IDs
+                    if len(job_ids) != len(set(job_ids)):
+                        logger.error("Duplicate scheduler job IDs detected! Cleaning up...")
+                        # Remove all backup jobs and reschedule
+                        for job in backup_jobs:
+                            scheduler.remove_job(job.id)
+
+                        # Clear tracking and reschedule
+                        with _job_tracking_lock:
+                            _scheduled_jobs.clear()
+
+                        # Reschedule active repositories
+                        repositories = Repository.query.filter_by(is_active=True).all()
+                        for repo in repositories:
+                            if repo.schedule_type != 'manual':
+                                schedule_backup_job(repo)
+
+                        logger.info("Scheduler health check: cleaned up and rescheduled jobs")
+
+                    # Auto-cleanup old failed jobs (older than 7 days)
+                    old_cutoff = datetime.utcnow() - timedelta(days=7)
+                    old_jobs = BackupJob.query.filter(
+                        BackupJob.status == 'failed',
+                        BackupJob.created_at < old_cutoff
+                    ).all()
+
+                    if old_jobs:
+                        for old_job in old_jobs:
+                            db.session.delete(old_job)
+                        db.session.commit()
+                        logger.info(f"Auto-cleaned {len(old_jobs)} old failed backup jobs")
+
+                except Exception as e:
+                    logger.error(f"Scheduler health check failed: {e}")
+
+        # Schedule health check to run every 6 hours
+        scheduler.add_job(
+            func=scheduler_health_check,
+            trigger=CronTrigger(hour='*/6', timezone=LOCAL_TZ),
+            id='scheduler_health_check',
+            name='Scheduler Health Check',
+            replace_existing=True,
+            misfire_grace_time=300,
+            coalesce=True,
+            max_instances=1
+        )
+        logger.info("Scheduled periodic scheduler health check")
+
     except Exception as e:
         logger.error(f"Error scheduling repositories on startup: {e}")
 
@@ -205,6 +298,10 @@ def schedule_all_repositories():
 _scheduler_lock = threading.Lock()
 _scheduler_initialized = False
 
+# Global tracking of scheduled jobs to prevent duplicates
+_scheduled_jobs = set()
+_job_tracking_lock = threading.Lock()
+
 def ensure_scheduler_initialized():
     """Ensure scheduler is initialized with existing repositories (thread-safe)"""
     global _scheduler_initialized
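The tracking set introduced in this hunk is always read and written under _job_tracking_lock, so the check-then-add performed in schedule_backup_job (later in this diff) is atomic. A small helper (purely illustrative; the commit inlines this logic rather than defining such a function) makes the test-and-set pattern explicit:

    # Hypothetical helper, not part of this commit: atomically claim a job id.
    # Returns True if the caller is the first to claim it, False if it is already tracked.
    def try_claim_job(job_id: str) -> bool:
        with _job_tracking_lock:
            if job_id in _scheduled_jobs:
                return False
            _scheduled_jobs.add(job_id)
            return True

    # Usage sketch:
    # if not try_claim_job(job_id):
    #     logger.warning(f"Job {job_id} already being scheduled, skipping duplicate")
    #     return

Note this set only guards against duplicate scheduling within one process; the file lock added further down handles overlap across processes.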
@@ -622,11 +719,23 @@ def favicon():
 
 def schedule_backup_job(repository):
     """Schedule a backup job for a repository"""
+    global _scheduled_jobs
+
     if not repository.is_active:
         logger.info(f"Repository {repository.name} is inactive, not scheduling")
         return
 
     job_id = f'backup_{repository.id}'
+
+    # Thread-safe check to prevent duplicate scheduling
+    with _job_tracking_lock:
+        if job_id in _scheduled_jobs:
+            logger.warning(f"Job {job_id} already being scheduled, skipping duplicate")
+            return
+
+        # Mark this job as being scheduled
+        _scheduled_jobs.add(job_id)
+
     logger.info(f"Attempting to schedule job {job_id} for repository {repository.name}")
 
     # Remove existing job if it exists - try multiple ways to ensure it's gone
@@ -635,6 +744,9 @@ def schedule_backup_job(repository):
         if existing_job:
             scheduler.remove_job(job_id)
             logger.info(f"Removed existing scheduled job: {job_id}")
+            # Also remove from our tracking
+            with _job_tracking_lock:
+                _scheduled_jobs.discard(job_id)
         else:
             logger.info(f"No existing job found for {job_id}")
     except Exception as e:
@@ -643,6 +755,8 @@ def schedule_backup_job(repository):
     # Double-check that job is really gone
     if scheduler.get_job(job_id):
         logger.error(f"Job {job_id} still exists after removal attempt, aborting schedule")
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)
         return
 
     # Create a wrapper function that includes Flask app context
@@ -657,7 +771,26 @@ def backup_with_context():
                     logger.warning(f"Repository {repository.id} not found or inactive, skipping backup")
                     return
 
-                # Check if there's already a running backup for this repository
+                # Multiple layers of duplicate prevention
+
+                # 0. Auto-cleanup: Mark any long-running jobs as failed
+                stuck_cutoff = datetime.utcnow() - timedelta(hours=2)
+                stuck_jobs = BackupJob.query.filter_by(
+                    repository_id=repository.id,
+                    status='running'
+                ).filter(
+                    BackupJob.started_at < stuck_cutoff
+                ).all()
+
+                if stuck_jobs:
+                    logger.warning(f"Found {len(stuck_jobs)} stuck running jobs for repository {repo.name}, cleaning up")
+                    for stuck in stuck_jobs:
+                        stuck.status = 'failed'
+                        stuck.error_message = 'Job stuck for over 2 hours, automatically failed'
+                        stuck.completed_at = datetime.utcnow()
+                    db.session.commit()
+
+                # 1. Check if there's already a running backup for this repository
                 running_job = BackupJob.query.filter_by(
                     repository_id=repository.id,
                     status='running'
@@ -667,20 +800,45 @@ def backup_with_context():
                     logger.warning(f"Backup already running for repository {repo.name} (job {running_job.id}), skipping")
                     return
 
-                # Additional check: ensure no backup started in the last 30 seconds to prevent rapid duplicates
-                recent_cutoff = datetime.utcnow() - timedelta(seconds=30)
+                # 2. Check for very recent backups (within last 2 minutes) to prevent rapid duplicates
+                recent_cutoff = datetime.utcnow() - timedelta(minutes=2)
                 recent_backup = BackupJob.query.filter_by(
                     repository_id=repository.id
                 ).filter(
                     BackupJob.started_at > recent_cutoff
                 ).first()
 
                 if recent_backup:
-                    logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping")
+                    logger.warning(f"Recent backup found for repository {repo.name} (started at {recent_backup.started_at}), skipping to prevent duplicates")
                     return
 
-                logger.info(f"Starting scheduled backup for repository: {repo.name}")
-                backup_service.backup_repository(repo)
+                # 3. Use a file-based lock to prevent concurrent executions
+                import fcntl
+                import tempfile
+                import os
+
+                lock_file_path = os.path.join(tempfile.gettempdir(), f"backup_lock_{repository.id}")
+
+                try:
+                    lock_file = open(lock_file_path, 'w')
+                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+                    logger.info(f"Acquired file lock for repository {repo.name}")
+
+                    try:
+                        logger.info(f"Starting scheduled backup for repository: {repo.name}")
+                        backup_service.backup_repository(repo)
+                        logger.info(f"Completed scheduled backup for repository: {repo.name}")
+                    finally:
+                        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+                        lock_file.close()
+                        try:
+                            os.unlink(lock_file_path)
+                        except OSError:
+                            pass
+
+                except (IOError, OSError) as lock_error:
+                    logger.warning(f"Could not acquire lock for repository {repo.name}, another backup may be running: {lock_error}")
+                    return
 
             except Exception as e:
                 logger.error(f"Error in scheduled backup for repository {repository.id}: {e}", exc_info=True)
@@ -771,6 +929,9 @@ def backup_with_context():
         logger.info(f"Job {job_id} successfully scheduled, next run: {added_job.next_run_time}")
     else:
         logger.error(f"Failed to schedule job {job_id} - job not found after creation")
+        # Remove from tracking if scheduling failed
+        with _job_tracking_lock:
+            _scheduled_jobs.discard(job_id)
 
 # Initialize scheduler with existing repositories at startup
 # This runs after all functions are defined