Skip to content

Commit 9b07603

Browse files
narkqsiddontang
authored andcommitted
Replication delay should be updated with atomic.StoreUint32 (#392)
1 parent de6c3a8 commit 9b07603

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

canal/sync.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,12 @@ func (c *Canal) updateTable(db, table string) (err error) {
213213
return
214214
}
215215
func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
216-
atomic.AddUint32(c.delay, uint32(time.Now().Unix())-ev.Header.Timestamp)
216+
var newDelay uint32
217+
now := uint32(time.Now().Unix())
218+
if now >= ev.Header.Timestamp {
219+
newDelay = now - ev.Header.Timestamp
220+
}
221+
atomic.StoreUint32(c.delay, newDelay)
217222
}
218223

219224
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {

0 commit comments

Comments
 (0)