Skip to content

Commit 78f9bf3

Browse files
authored
Merge pull request #584 from junha6316/feature-json-binlogevent
add json feature for binlogevent
2 parents 579277e + 2511ae6 commit 78f9bf3

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

pymysqlreplication/event.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pymysqlreplication.exceptions import StatusVariableMismatch
1010
from pymysqlreplication.util.bytes import parse_decimal_from_bytes
1111
from typing import Union, Optional
12+
import json
1213

1314

1415
class BinLogEvent(object):
@@ -68,15 +69,30 @@ def _verify_event(self):
6869
self.packet.read_bytes -= 19 + self.event_size + 4
6970
self.packet.rewind(20)
7071

72+
@property
def formatted_timestamp(self) -> str:
    """Return the event's timestamp as a naive-UTC ISO-8601 string.

    ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; convert
    via an aware UTC datetime instead, then strip the tzinfo so the output
    string keeps its original shape (no ``+00:00`` suffix).
    """
    return (
        datetime.datetime.fromtimestamp(self.timestamp, tz=datetime.timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )
7176
def dump(self):
    """Print a human-readable summary of this event to stdout.

    Emits the event class name, the formatted timestamp, the log position,
    the event size and the bytes read, then delegates to ``_dump`` for
    subclass-specific details, and finishes with a blank line.
    """
    print(f"=== {self.__class__.__name__} ===")
    for label, value in (
        ("Date", self.formatted_timestamp),
        ("Log position", self.packet.log_pos),
        ("Event size", self.event_size),
        ("Read bytes", self.packet.read_bytes),
    ):
        print(f"{label}: {value}")
    self._dump()
    print()

85+
def to_dict(self) -> dict:
    """Return the event's common header fields as a plain dict.

    Keys (in insertion order): ``timestamp`` (ISO-8601 string),
    ``log_pos``, ``event_size`` and ``read_bytes``.
    """
    summary = dict(
        timestamp=self.formatted_timestamp,
        log_pos=self.packet.log_pos,
        event_size=self.event_size,
        read_bytes=self.packet.read_bytes,
    )
    return summary
92+
93+
def to_json(self) -> str:
    """Serialize the common header fields (see ``to_dict``) to a JSON string."""
    payload = self.to_dict()
    return json.dumps(payload)
8096
def _dump(self):
8197
"""Core data dumped for the event"""
8298
pass
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from pymysqlreplication.tests.base import PyMySQLReplicationTestCase
2+
from pymysqlreplication import BinLogStreamReader
3+
import json
4+
5+
6+
class BinLogEventTestCase(PyMySQLReplicationTestCase):
    """Verify the to_dict()/to_json() helpers on events read from a live binlog."""

    # Header fields that every event dict must expose.
    target_fields = ["timestamp", "log_pos", "event_size", "read_bytes"]

    def setUp(self):
        super(BinLogEventTestCase, self).setUp()
        self.execute("SET SESSION binlog_rows_query_log_events=1")

    def tearDown(self):
        self.execute("SET SESSION binlog_rows_query_log_events=0")
        super(BinLogEventTestCase, self).tearDown()

    def _first_event(self):
        """Run one DDL statement and return the first event streamed back."""
        self.stream = BinLogStreamReader(self.database, server_id=1024)
        self.execute(
            "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, "
            "data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        )
        self.execute("COMMIT")
        return self.stream.fetchone()

    def test_to_dict(self):
        event = self._first_event()

        event_dict = event.to_dict()

        self.assertEqual(set(event_dict.keys()), set(self.target_fields))
        self.assertEqual(event_dict["timestamp"], event.formatted_timestamp)
        self.assertEqual(event_dict["log_pos"], event.packet.log_pos)
        self.assertEqual(event_dict["read_bytes"], event.packet.read_bytes)
        self.assertEqual(event_dict["event_size"], event.event_size)

    def test_to_json(self):
        event = self._first_event()

        assert event.to_json() == json.dumps(event.to_dict())

0 commit comments

Comments
 (0)