Skip to content

Commit 579277e

Browse files
authored
Merge pull request #585 from roy0424/feature/format-description-event
2 parents 78f2114 + 59202a2 commit 579277e

File tree

4 files changed

+75
-9
lines changed

4 files changed

+75
-9
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ def __init__(
287287
else:
288288
self.pymysql_wrapper = pymysql.connect
289289
self.mysql_version = (0, 0, 0)
290+
self.dbms = None
290291

291292
def close(self):
292293
if self.__connected_stream:
@@ -748,14 +749,12 @@ def _allowed_event_list(
748749
def __get_dbms(self):
749750
if not self.__connected_ctl:
750751
self.__connect_to_ctl()
751-
752-
cur = self._ctl_connection.cursor()
753-
cur.execute("SELECT VERSION();")
754-
755-
version_info = cur.fetchone().get("VERSION()", "")
756-
757-
if "MariaDB" in version_info:
752+
if self.dbms:
753+
return self.dbms
754+
if "MariaDB" in self._ctl_connection.get_server_info():
755+
self.dbms = "mariadb"
758756
return "mariadb"
757+
self.dbms = "mysql"
759758
return "mysql"
760759

761760
def __log_valid_parameters(self):

pymysqlreplication/event.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def __init__(
4343
self._processed = True
4444
self.complete = True
4545
self._verify_event()
46+
self.dbms = self._ctl_connection._get_dbms()
4647

4748
def _read_table_id(self):
4849
# Table ID is 6 byte
@@ -368,10 +369,26 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
368369
self.mysql_version_str = self.packet.read(50).rstrip(b"\0").decode()
369370
numbers = self.mysql_version_str.split("-")[0]
370371
self.mysql_version = tuple(map(int, numbers.split(".")))
372+
self.created = struct.unpack("<I", self.packet.read(4))[0]
373+
self.common_header_len = struct.unpack("<B", self.packet.read(1))[0]
374+
offset = (
375+
4 + 2 + 50 + 1
376+
) # created + binlog_version + mysql_version_str + common_header_len
377+
checksum_algorithm = 1
378+
checksum = 4
379+
n = event_size - offset - self.common_header_len - checksum_algorithm - checksum
380+
self.post_header_len = struct.unpack(f"<{n}B", self.packet.read(n))
381+
self.server_version_split = struct.unpack("<3B", self.packet.read(3))
382+
self.number_of_event_types = struct.unpack("<B", self.packet.read(1))[0]
371383

372384
def _dump(self):
373385
print(f"Binlog version: {self.binlog_version}")
374-
print(f"MySQL version: {self.mysql_version_str}")
386+
print(f"mysql version: {self.mysql_version_str}")
387+
print(f"Created: {self.created}")
388+
print(f"Common header length: {self.common_header_len}")
389+
print(f"Post header length: {self.post_header_len}")
390+
print(f"Server version split: {self.server_version_split}")
391+
print(f"Number of event types: {self.number_of_event_types}")
375392

376393

377394
class StopEvent(BinLogEvent):

pymysqlreplication/row_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
782782
self.column_count = self.packet.read_length_coded_binary()
783783

784784
self.columns = []
785-
self.dbms = self._ctl_connection._get_dbms()
785+
self.dbms = self.dbms or self._ctl_connection._get_dbms()
786786
# Read columns meta data
787787
column_types = bytearray(self.packet.read(self.column_count))
788788
self.packet.read_length_coded_binary()

pymysqlreplication/tests/test_basic.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,32 @@ def test_json_update(self):
664664
),
665665
self.assertEqual(event.rows[0]["after_values"]["setting"], {b"btn": True}),
666666

667+
def test_format_description_event(self):
668+
self.stream.close()
669+
self.stream = BinLogStreamReader(
670+
self.database,
671+
server_id=1024,
672+
blocking=False,
673+
only_events=[FormatDescriptionEvent],
674+
)
675+
676+
event = self.stream.fetchone()
677+
self.assertIsInstance(event, FormatDescriptionEvent)
678+
self.assertIsInstance(event.binlog_version, tuple)
679+
self.assertIsInstance(event.mysql_version_str, str)
680+
self.assertTrue(
681+
event.mysql_version_str.startswith("5.")
682+
or event.mysql_version_str.startswith("8.")
683+
) # Example check
684+
self.assertIsInstance(event.common_header_len, int)
685+
self.assertIsInstance(event.post_header_len, tuple)
686+
self.assertIsInstance(event.mysql_version, tuple)
687+
self.assertEqual(len(event.mysql_version), 3)
688+
self.assertEqual(event.dbms, "mysql")
689+
self.assertIsInstance(event.server_version_split, tuple)
690+
self.assertEqual(len(event.server_version_split), 3)
691+
self.assertIsInstance(event.number_of_event_types, int)
692+
667693

668694
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
669695
def setUp(self):
@@ -1485,6 +1511,30 @@ def test_gtid_list_event(self):
14851511
self.assertEqual(event.event_type, 163)
14861512
self.assertEqual(event.gtid_list[0].gtid, "0-1-15")
14871513

1514+
def test_format_description_event(self):
1515+
self.stream.close()
1516+
self.stream = BinLogStreamReader(
1517+
self.database,
1518+
server_id=1024,
1519+
blocking=False,
1520+
only_events=[FormatDescriptionEvent],
1521+
is_mariadb=True,
1522+
)
1523+
1524+
event = self.stream.fetchone()
1525+
self.assertIsInstance(event, FormatDescriptionEvent)
1526+
self.assertIsInstance(event.binlog_version, tuple)
1527+
self.assertIsInstance(event.mysql_version_str, str)
1528+
self.assertTrue(event.mysql_version_str.startswith("10."))
1529+
self.assertIsInstance(event.common_header_len, int)
1530+
self.assertIsInstance(event.post_header_len, tuple)
1531+
self.assertIsInstance(event.mysql_version, tuple)
1532+
self.assertEqual(len(event.mysql_version), 3)
1533+
self.assertEqual(event.dbms, "mariadb")
1534+
self.assertIsInstance(event.server_version_split, tuple)
1535+
self.assertEqual(len(event.server_version_split), 3)
1536+
self.assertIsInstance(event.number_of_event_types, int)
1537+
14881538

14891539
class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase):
14901540
def setUp(self):

0 commit comments

Comments
 (0)