Skip to content

Commit c50756d

Browse files
committed
More race safety in E2E tests, enable FT
1 parent a3cb873 commit c50756d

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

noxfile.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
@nox.session(python=ALL_PYTHONS)
2424
def tests(session: nox.Session) -> None:
2525
free_threading = session.python.endswith("t")
26-
if free_threading:
27-
session.skip("Skipping tests on free threading Python builds.")
2826
coverage_file = f".coverage.{sys.platform}.{session.python}"
2927
pytest_env = {
3028
"COVERAGE_FILE": coverage_file,

tests/test_fake_broker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ def test_fake_broker() -> None:
6868
decoder = IncrementalDecoder()
6969
with FakeBroker() as broker:
7070
assert broker.port > 0
71-
assert broker.sock is None
7271
assert len(broker.received) == 0
7372

7473
sock = socket.create_connection(("localhost", broker.port))
@@ -162,7 +161,6 @@ def test_fake_broker() -> None:
162161
def test_fake_broker_websocket() -> None:
163162
with FakeWebsocketBroker() as broker:
164163
assert broker.port > 0
165-
assert broker.sock is None
166164
assert len(broker.received) == 0
167165

168166
sock = socket.create_connection(("localhost", broker.port))

tests/test_z2z.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,23 @@ def get_client(broker: FakeBroker, scheme: str, db_path: str) -> Client:
5959
return client
6060

6161

62+
def wait_for_queue_item(queue: deque[MQTTPacket], timeout: float = 0.1) -> object:
63+
t0 = time.monotonic()
64+
while True:
65+
if queue:
66+
return queue.popleft()
67+
if time.monotonic() > t0 + timeout:
68+
raise TimeoutError("Timed out waiting for queue item")
69+
time.sleep(0.001)
70+
71+
6272
@pytest.mark.parametrize("scheme", ["mqtt", "ws"])
6373
@pytest.mark.parametrize("db_path", ["", ":memory:"])
6474
def test_z2z_happy_path(db_path: str, scheme: str) -> None:
6575
broker = FakeWebsocketBroker() if scheme == "ws" else FakeBroker()
6676
with broker:
6777
client = get_client(broker, scheme, db_path)
68-
delay: Final = 0.01 # seconds grace period
78+
delay: Final = 0.05 # seconds grace period
6979

7080
client_received: deque[MQTTPacket] = deque()
7181
def callback(client: Client, packet: MQTTPacket) -> None:
@@ -75,7 +85,7 @@ def callback(client: Client, packet: MQTTPacket) -> None:
7585
sub_handle = client.subscribe("test/topic", callback)
7686
assert sub_handle is not None
7787
sub_handle.wait_for_ack(timeout=0.5)
78-
assert broker.received.popleft() == MQTTSubscribePacket(
88+
assert wait_for_queue_item(broker.received) == MQTTSubscribePacket(
7989
topics=[("test/topic", 2)],
8090
packet_id=1,
8191
)
@@ -84,7 +94,7 @@ def callback(client: Client, packet: MQTTPacket) -> None:
8494
for _ in range(10):
8595
pub_handle = client.publish("test/topic", b"banana", qos=0)
8696
time.sleep(delay)
87-
assert broker.received.popleft() == client_received.popleft() == MQTTPublishPacket(
97+
assert wait_for_queue_item(broker.received) == wait_for_queue_item(client_received) == MQTTPublishPacket(
8898
topic="test/topic",
8999
payload=b"banana",
90100
)
@@ -94,21 +104,21 @@ def callback(client: Client, packet: MQTTPacket) -> None:
94104
pub_handle = client.publish("test/topic", b"coconut", qos=1)
95105
pub_handle.wait_for_ack(timeout=0.25)
96106
time.sleep(delay)
97-
broker_rec = broker.received.popleft()
98-
client_rec = client_received.popleft()
107+
broker_rec = wait_for_queue_item(broker.received)
108+
client_rec = wait_for_queue_item(client_received)
99109
assert isinstance(broker_rec, MQTTPublishPacket)
100110
assert isinstance(client_rec, MQTTPublishPacket)
101111
assert broker_rec.topic == client_rec.topic == "test/topic"
102112
assert broker_rec.payload == client_rec.payload == b"coconut"
103113
assert broker_rec.qos == client_rec.qos == MQTTQoS.Q1
104-
assert broker.received.popleft() == MQTTPubAckPacket(packet_id=broker_rec.packet_id)
114+
assert wait_for_queue_item(broker.received) == MQTTPubAckPacket(packet_id=broker_rec.packet_id)
105115

106116
# UNSUBSCRIBE
107117
unsub_handle = client.unsubscribe("test/topic")
108118
assert unsub_handle is not None
109119
unsub_handle.wait_for_ack(timeout=0.5)
110120
time.sleep(delay)
111-
assert broker.received.popleft() == MQTTUnsubscribePacket(
121+
assert wait_for_queue_item(broker.received) == MQTTUnsubscribePacket(
112122
topics=["test/topic"],
113123
packet_id=1,
114124
)
@@ -118,18 +128,18 @@ def callback(client: Client, packet: MQTTPacket) -> None:
118128
pub_handle = client.publish("test/topic", b"pineapple", qos=2)
119129
pub_handle.wait_for_ack(timeout=0.25)
120130
time.sleep(delay)
121-
broker_rec = broker.received.popleft()
131+
broker_rec = wait_for_queue_item(broker.received)
122132
assert isinstance(broker_rec, MQTTPublishPacket)
123133
assert broker_rec.topic == "test/topic"
124134
assert broker_rec.payload == b"pineapple"
125135
assert broker_rec.qos == MQTTQoS.Q2
126-
assert broker.received.popleft() == MQTTPubRelPacket(packet_id=broker_rec.packet_id)
136+
assert wait_for_queue_item(broker.received) == MQTTPubRelPacket(packet_id=broker_rec.packet_id)
127137

128138
# DISCONNECT
129139
client.disconnect()
130140
client.wait_for_disconnect(timeout=0.25)
131141
time.sleep(delay)
132-
assert broker.received.popleft() == MQTTDisconnectPacket()
142+
assert wait_for_queue_item(broker.received) == MQTTDisconnectPacket()
133143

134144
client.shutdown()
135145
client.wait_for_shutdown(timeout=0.25)

tests/util/fake_broker.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import socket
99
import socketserver
1010
import threading
11+
import time
1112
from typing import Callable, cast, Final
1213
import uuid
1314

@@ -55,6 +56,9 @@ def handle(self) -> None:
5556
except ClosedSocketError:
5657
pass
5758

59+
def handle_error(self, request: socket.socket, client_address: tuple[str, int]) -> None:
60+
logger.exception("Exception in handler for %s", client_address)
61+
5862
def _handle_packet(self, packet: MQTTPacket) -> None:
5963
outbound: list[MQTTPacket] = []
6064
logger.info("FakeBroker <--- %s", packet)
@@ -147,6 +151,8 @@ def handle(self) -> None:
147151
if not self.handshake_done:
148152
self.server.sock = self.request
149153
data = self.request.recv(0xffff)
154+
if not data:
155+
return # Connection closed
150156
lines = data.split(b"\r\n")
151157
head = lines[0]
152158
assert head.startswith(b"GET "), "Invalid websocket handshake method"
@@ -216,6 +222,15 @@ def __init__(self) -> None:
216222

217223
def __enter__(self) -> FakeBroker:
218224
self.start()
225+
ready = False
226+
t0 = time.monotonic()
227+
while not ready and time.monotonic() - t0 < 5.0:
228+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
229+
try:
230+
sock.connect(("localhost", self.port))
231+
ready = True
232+
except ConnectionRefusedError:
233+
time.sleep(0.1)
219234
return self
220235

221236
def __exit__(self, *args: object) -> None:

0 commit comments

Comments
 (0)