Skip to content

Commit d3ec795

Browse files
committed
feat: event replica
1 parent d277384 commit d3ec795

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed

replica/events/config.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"replica": {
3+
"server_id": 12333,
4+
"connection_settings": {
5+
"host": "somehost",
6+
"port": 3306,
7+
"user": "user",
8+
"passwd": "password"
9+
},
10+
"only_schemas": [
11+
"content",
12+
"vote"
13+
],
14+
"only_tables": [
15+
"videos",
16+
"graph"
17+
],
18+
"blocking": true
19+
},
20+
"postgres": {
21+
"database": "your_database",
22+
"user": "your_username",
23+
"password": "your_password",
24+
"host": "your_host",
25+
"port": "your_port"
26+
}
27+
}

replica/events/events.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright (c) 2024 sixwaaaay.
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
15+
import json
16+
import logging
17+
import os
18+
from contextlib import closing
19+
from datetime import datetime
20+
21+
import psycopg2
22+
from pymysqlreplication import BinLogStreamReader
23+
from pymysqlreplication.event import GtidEvent
24+
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
25+
26+
27+
def save_position(gtid: str):
    """Record ``gtid`` in the ``gtid.json`` checkpoint file.

    ``gtid`` is a single transaction id of the form ``"<source_uuid>:<n>"``.
    The checkpoint stores, per source UUID, one closed interval
    ``<uuid>:<low>-<high>`` that is widened to include ``n``; entries for
    several UUIDs are joined with commas (the MySQL GTID-set syntax).

    When the file does not exist, or the UUID has not been seen before, the
    interval starts at 1, i.e. every transaction up to ``n`` is treated as
    already handled.

    Raises:
        ValueError: if ``gtid`` or the stored value is not in the expected
            ``uuid:number`` / ``uuid:low-high`` shape.
    """
    uid, _, sequence = gtid.partition(":")
    x = int(sequence)
    try:
        with open('gtid.json', 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        data = {
            'gtid': f"{uid}:{1}-{x}"
        }
    else:
        # Keep one interval per source UUID so that a transaction coming from
        # a different source never inherits (or clobbers) another source's
        # range.
        intervals = {}
        for part in data['gtid'].split(","):
            part = part.strip()
            if not part:
                continue
            source, _, spans = part.partition(":")
            # Only the first interval of an entry is considered; a bare
            # number ("uuid:5") is read as the interval 5-5.
            low, _, high = spans.split(":", 1)[0].partition("-")
            intervals[source] = (int(low), int(high or low))
        a, b = intervals.get(uid, (1, x))
        intervals[uid] = (min(a, x), max(b, x))
        data['gtid'] = ",".join(
            f"{source}:{low}-{high}" for source, (low, high) in intervals.items()
        )
    # Write to a sibling file and swap it in, so an interruption mid-write
    # cannot leave a truncated checkpoint behind.
    tmp_path = 'gtid.json.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_path, 'gtid.json')
    logging.info(f'Saved gtid: {data}')
46+
47+
48+
def load_position():
    """Return the GTID value stored in ``gtid.json``.

    Returns ``None`` when the checkpoint file does not exist.
    """
    try:
        checkpoint = open('gtid.json', 'r')
    except FileNotFoundError:
        # No checkpoint has been written yet.
        return None
    with checkpoint:
        stored = json.load(checkpoint)
    return stored['gtid']
55+
56+
57+
def load_conf():
    """Load and return the JSON configuration.

    The file path is taken from the ``CONF_PATH`` environment variable and
    falls back to ``config.json`` in the current working directory.
    """
    with open(os.getenv("CONF_PATH", "config.json")) as conf_file:
        return json.load(conf_file)
62+
63+
64+
def events(stream: BinLogStreamReader):
    """Turn binlog events from ``stream`` into ``(id, code, time)`` tuples.

    For every row of a row event one tuple may be yielded:

    * ``DeleteRowsEvent`` -> code ``1``
    * ``WriteRowsEvent``  -> code ``2``
    * ``UpdateRowsEvent`` -> code ``3``, only when the row had a non-``None``
      ``view_count`` before the update and the value differs afterwards

    The time element is the row's ``created_at`` value, or
    ``datetime.now()`` when the row has no such key.

    A ``GtidEvent`` yields nothing. Its GTID is remembered, and the GTID
    remembered from the previous ``GtidEvent`` (if any) is handed to
    ``save_position`` at that point.
    """
    pending_gtid = None
    for event in stream:
        if isinstance(event, GtidEvent):
            # A new transaction starts: persist the one seen before it.
            if pending_gtid is not None:
                save_position(pending_gtid)
            pending_gtid = event.gtid
            continue

        # Classify once per event; every row of an event shares its type.
        if isinstance(event, DeleteRowsEvent):
            code = 1
        elif isinstance(event, UpdateRowsEvent):
            code = 3
        elif isinstance(event, WriteRowsEvent):
            code = 2
        else:
            code = None

        for row in event.rows:
            if code == 3:
                old_count = row["before_values"].get("view_count", None)
                if old_count is None:
                    continue
                new_count = row["after_values"].get("view_count", None)
                if old_count != new_count:
                    yield row["after_values"]["id"], 3, row["after_values"].get("created_at", datetime.now())
            elif code is not None:
                values = row["values"]
                yield values["id"], code, values.get("created_at", datetime.now())
86+
87+
88+
if __name__ == "__main__":
    # Script entry point: stream row events from the MySQL binlog and append
    # one row per event to the PostgreSQL ``video_events`` table.
    logging.basicConfig(level=logging.INFO)
    try:
        conf = load_conf()
        # Resume from the saved GTID position when one exists (None otherwise).
        stream_reader = BinLogStreamReader(
            only_events=[GtidEvent, DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            auto_position=load_position(),
            **conf["replica"],
        )
        insert_sql = "INSERT INTO video_events (video_id, event_type, event_time) VALUES (%s, %s, %s)"
        with closing(psycopg2.connect(**conf["postgres"])) as conn:
            with conn.cursor() as cur:
                with closing(stream_reader) as streamer:
                    for val in events(streamer):
                        # Each event is inserted and committed on its own.
                        cur.execute(insert_sql, val)
                        conn.commit()
    except KeyboardInterrupt:
        logging.info('KeyboardInterrupt exit')

0 commit comments

Comments
 (0)