@@ -87,6 +87,7 @@ def __init__(
8787
8888 self ._trigger_count = 0
8989 self .task = None # placeholder for the repeat task created in self.__wrapper
90+ self .__next_run_time = None # placeholder for the current target run time
9091
9192 def __call__ (self , func : CoroFunction ):
9293 return self .__wrapper (func )
@@ -102,13 +103,13 @@ async def wrapped() -> None:
102103 async def inner ():
103104 # maybe wait for next trigger cycle
104105 if not self .on_startup :
105- next_run = self .next_run
106+ self . __next_run_time = self .next_run
106107 if self .logger :
107108 self .logger .info (
108109 f'`on_startup` is set to `False`. First run of { self .__class__ .__name__ } for '
109- f'{ func .__name__ } : { next_run .isoformat ()} '
110+ f'{ func .__name__ } : { self . __next_run_time .isoformat ()} '
110111 )
111- await self .sleep_until (next_run )
112+ await self .sleep_until (self . __next_run_time )
112113
113114 # repeat indefinitely
114115 while True :
@@ -118,6 +119,13 @@ async def inner():
118119 if self .max_trigger_count is not None :
119120 self ._trigger_count += 1
120121
122+ # safeguard against early triggers - apparently, a desync is possible
123+ if (
124+ self .__next_run_time is not None
125+ and (now := datetime .datetime .now ().astimezone ()) < self .__next_run_time
126+ ):
127+ await asyncio .sleep (max ((self .__next_run_time - now ).total_seconds (), 0 ))
128+
121129 # call the decorated function
122130 try :
123131 if self .iter_args :
@@ -154,23 +162,24 @@ async def inner():
154162
155163 # sleep until next execution time
156164 try :
157- next_run = self .next_run
165+ self . __next_run_time = self .next_run
158166 except StopRunning :
159167 if self .logger :
160168 self .logger .info (
161169 f'{ self .__class__ .__name__ } received StopRunning for { func .__name__ } . Terminating'
162170 )
163171 break
164- if self .logger and datetime .datetime .now ().astimezone () <= next_run :
172+ if self .logger and datetime .datetime .now ().astimezone () <= self . __next_run_time :
165173 self .logger .info (
166- f'{ self .__class__ .__name__ } finished for { func .__name__ } . Next run: { next_run .isoformat ()} '
174+ f'{ self .__class__ .__name__ } finished for { func .__name__ } . '
175+ f'Next run: { self .__next_run_time .isoformat ()} '
167176 )
168177 elif self .logger : # i.e. next_run is in the past
169178 self .logger .warning (
170179 f'{ self .__class__ .__name__ } missed the scheduled run time for { func .__name__ } . Running now'
171180 )
172181
173- await self .sleep_until (next_run )
182+ await self .sleep_until (self . __next_run_time )
174183
175184 # create a reference to the repeating task to prevent it from accidentally being garbage collected
176185 self .task = self .loop .create_task (inner ())
0 commit comments