Skip to content

Commit ddb77c4

Browse files
committed
feat: event replica
1 parent d277384 commit ddb77c4

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed

replica/events/config.json

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"replica": {
3+
"server_id": 12333,
4+
"connection_settings": {
5+
"host": "somehost",
6+
"port": 3306,
7+
"user": "user",
8+
"passwd": "password"
9+
},
10+
"only_schemas": [
11+
"content",
12+
"vote"
13+
],
14+
"only_tables": [
15+
"videos",
16+
"graph"
17+
],
18+
"blocking": true
19+
},
20+
"postgres": {
21+
"database": "your_database",
22+
"user": "your_username",
23+
"password": "your_password",
24+
"host": "your_host",
25+
"port": "your_port"
26+
}
27+
}

replica/events/events.py

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright (c) 2024 sixwaaaay.
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
15+
import json
16+
import logging
17+
import os
18+
from contextlib import closing
19+
from datetime import datetime
20+
21+
import psycopg2
22+
from pymysqlreplication import BinLogStreamReader
23+
from pymysqlreplication.event import GtidEvent
24+
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
25+
26+
27+
def save_position(gtid: str):
28+
uid, x = gtid.split(":")
29+
x = int(x)
30+
try:
31+
with open('gtid.json', 'r') as f:
32+
data = json.load(f)
33+
a, b = map(int, data['gtid'].split(":")[1].split("-"))
34+
if x < a:
35+
a = x
36+
if x > b:
37+
b = x
38+
data['gtid'] = f"{uid}:{a}-{b}"
39+
except FileNotFoundError:
40+
data = {
41+
'gtid': f"{uid}:{1}-{x}"
42+
}
43+
with open('gtid.json', 'w') as f:
44+
json.dump(data, f)
45+
logging.info(f'Saved gtid: {data}')
46+
47+
48+
def load_position():
49+
try:
50+
with open('gtid.json', 'r') as f:
51+
data = json.load(f)
52+
return data['gtid']
53+
except FileNotFoundError:
54+
return None
55+
56+
57+
def load_conf():
58+
conf_path = os.environ.get("CONF_PATH", "config.json")
59+
with open(conf_path) as f:
60+
conf = json.load(f)
61+
return conf
62+
63+
64+
def events(stream: BinLogStreamReader):
65+
for binlog_event in stream:
66+
if isinstance(binlog_event, GtidEvent):
67+
save_position(binlog_event.gtid)
68+
continue
69+
for row in binlog_event.rows:
70+
if isinstance(binlog_event, DeleteRowsEvent):
71+
vals = row["values"]
72+
yield vals["id"], 1, vals.get("created_at", datetime.now())
73+
elif isinstance(binlog_event, UpdateRowsEvent):
74+
bf_view_count = row["before_values"].get("view_count", None)
75+
if bf_view_count is None:
76+
continue
77+
af_view_count = row["after_values"].get("view_count", None)
78+
if bf_view_count != af_view_count:
79+
yield row["after_values"]["id"], 3, row["after_values"].get("created_at", datetime.now())
80+
elif isinstance(binlog_event, WriteRowsEvent):
81+
vals = row["values"]
82+
yield vals["id"], 2, vals.get("created_at", datetime.now())
83+
84+
85+
if __name__ == "__main__":
86+
logging.basicConfig(level=logging.INFO)
87+
try:
88+
conf = load_conf()
89+
stream_reader = BinLogStreamReader(
90+
only_events=[GtidEvent, DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
91+
auto_position=load_position(),
92+
**conf["replica"],
93+
)
94+
conn = psycopg2.connect(**conf["postgres"])
95+
cur = conn.cursor()
96+
with closing(stream_reader) as streamer:
97+
for val in events(streamer):
98+
# Insert into the video_events table
99+
cur.execute("INSERT INTO video_events (video_id, event_type, event_time) VALUES (%s, %s, %s)", val)
100+
# Commit the transaction
101+
conn.commit()
102+
cur.close()
103+
conn.close()
104+
except KeyboardInterrupt:
105+
logging.info('KeyboardInterrupt exit')

0 commit comments

Comments
 (0)