Skip to content

Commit fda6f03

Browse files
committed
ruff fix
1 parent ec743a8 commit fda6f03

File tree

1 file changed

+39
-41
lines changed

1 file changed

+39
-41
lines changed

src/reachy_mini_conversation_demo/mcp_server.py

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import json
1010
import logging
1111
import time
12-
from typing import Any, Dict, List, Optional
12+
from typing import Any
1313

1414
import cv2
15-
import mcp.types as types
1615
import numpy as np
16+
from mcp import types
1717
from mcp.server import NotificationOptions, Server
1818

1919
# MCP Server imports
@@ -43,8 +43,8 @@ def __init__(
4343
self,
4444
reachy_mini: ReachyMini,
4545
movement_manager: MovementManager,
46-
camera: Optional[cv2.VideoCapture] = None,
47-
vision_manager: Optional[VisionManager] = None,
46+
camera: cv2.VideoCapture | None = None,
47+
vision_manager: VisionManager | None = None,
4848
camera_retry_attempts: int = 5,
4949
camera_retry_delay_s: float = 0.10,
5050
vision_timeout_s: float = 8.0,
@@ -62,7 +62,7 @@ def __init__(
6262

6363
# Helper functions
6464
def _read_frame(
65-
cap: cv2.VideoCapture, attempts: int = 5, delay_s: float = 0.1
65+
cap: cv2.VideoCapture, attempts: int = 5, delay_s: float = 0.1,
6666
) -> np.ndarray:
6767
"""Read a frame from the camera with retries."""
6868
trials, frame, ret = 0, None, False
@@ -77,7 +77,7 @@ def _read_frame(
7777
return frame
7878

7979

80-
def _execute_motion(config: ReachyMCPConfig, target: Any) -> Dict[str, Any]:
80+
def _execute_motion(config: ReachyMCPConfig, target: Any) -> dict[str, Any]:
8181
"""Apply motion to reachy_mini and update movement_manager state."""
8282
movement_manager = config.movement_manager
8383
movement_manager.moving_start = time.monotonic()
@@ -111,7 +111,7 @@ def _setup_handlers(self):
111111
"""Set up MCP server handlers"""
112112

113113
@self.server.list_tools()
114-
async def handle_list_tools() -> List[Tool]:
114+
async def handle_list_tools() -> list[Tool]:
115115
"""List available tools"""
116116
return [
117117
Tool(
@@ -124,7 +124,7 @@ async def handle_list_tools() -> List[Tool]:
124124
"type": "string",
125125
"enum": ["left", "right", "up", "down", "front"],
126126
"description": "Direction to move the head",
127-
}
127+
},
128128
},
129129
"required": ["direction"],
130130
},
@@ -138,7 +138,7 @@ async def handle_list_tools() -> List[Tool]:
138138
"question": {
139139
"type": "string",
140140
"description": "Question to ask about the captured image",
141-
}
141+
},
142142
},
143143
"required": ["question"],
144144
},
@@ -147,26 +147,25 @@ async def handle_list_tools() -> List[Tool]:
147147

148148
@self.server.call_tool()
149149
async def handle_call_tool(
150-
name: str, arguments: dict
151-
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
150+
name: str, arguments: dict,
151+
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
152152
"""Handle tool calls"""
153153
if name == "move_head":
154154
return await self._handle_move_head(arguments)
155-
elif name == "camera":
155+
if name == "camera":
156156
return await self._handle_camera(arguments)
157-
else:
158-
raise ValueError(f"Unknown tool: {name}")
157+
raise ValueError(f"Unknown tool: {name}")
159158

160159
@self.server.list_resources()
161-
async def handle_list_resources() -> List[Resource]:
160+
async def handle_list_resources() -> list[Resource]:
162161
"""List available resources"""
163162
return [
164163
Resource(
165164
uri="reachy://status",
166165
name="Robot Status",
167166
description="Current status of the Reachy Mini robot",
168167
mimeType="application/json",
169-
)
168+
),
170169
]
171170

172171
@self.server.read_resource()
@@ -178,34 +177,33 @@ async def handle_read_resource(uri: str) -> str:
178177
"camera_available": self.config.camera is not None,
179178
"vision_available": self.config.vision_manager is not None,
180179
"current_head_pose": getattr(
181-
self.config.movement_manager, "current_head_pose", None
180+
self.config.movement_manager, "current_head_pose", None,
182181
),
183182
"is_moving": self._is_robot_moving(),
184183
}
185184
return json.dumps(status, indent=2)
186-
else:
187-
raise ValueError(f"Unknown resource: {uri}")
185+
raise ValueError(f"Unknown resource: {uri}")
188186

189187
def _is_robot_moving(self) -> bool:
190188
"""Check if robot is currently moving"""
191189
movement_manager = self.config.movement_manager
192190
if not hasattr(movement_manager, "moving_start") or not hasattr(
193-
movement_manager, "moving_for"
191+
movement_manager, "moving_for",
194192
):
195193
return False
196194

197195
elapsed = time.monotonic() - movement_manager.moving_start
198196
return elapsed < movement_manager.moving_for
199197

200-
async def _handle_move_head(self, arguments: dict) -> List[types.TextContent]:
198+
async def _handle_move_head(self, arguments: dict) -> list[types.TextContent]:
201199
"""Handle head movement tool call"""
202200
direction = arguments.get("direction")
203201
if not direction:
204202
return [
205203
types.TextContent(
206204
type="text",
207205
text=json.dumps({"error": "direction parameter is required"}),
208-
)
206+
),
209207
]
210208

211209
logger.info("MCP Tool call: move_head direction=%s", direction)
@@ -220,8 +218,8 @@ async def _handle_move_head(self, arguments: dict) -> List[types.TextContent]:
220218

221219
return [
222220
types.TextContent(
223-
type="text", text=json.dumps({"status": f"Now looking {direction}"})
224-
)
221+
type="text", text=json.dumps({"status": f"Now looking {direction}"}),
222+
),
225223
]
226224

227225
except Exception as e:
@@ -230,35 +228,35 @@ async def _handle_move_head(self, arguments: dict) -> List[types.TextContent]:
230228
types.TextContent(
231229
type="text",
232230
text=json.dumps(
233-
{"error": f"Head movement failed: {type(e).__name__}: {e}"}
231+
{"error": f"Head movement failed: {type(e).__name__}: {e}"},
234232
),
235-
)
233+
),
236234
]
237235

238-
async def _handle_camera(self, arguments: dict) -> List[types.TextContent]:
236+
async def _handle_camera(self, arguments: dict) -> list[types.TextContent]:
239237
"""Handle camera tool call"""
240238
question = arguments.get("question", "").strip()
241239
if not question:
242240
return [
243241
types.TextContent(
244242
type="text",
245243
text=json.dumps({"error": "question must be a non-empty string"}),
246-
)
244+
),
247245
]
248246

249247
if not self.config.camera:
250248
return [
251249
types.TextContent(
252-
type="text", text=json.dumps({"error": "Camera not available"})
253-
)
250+
type="text", text=json.dumps({"error": "Camera not available"}),
251+
),
254252
]
255253

256254
if not self.config.vision_manager:
257255
return [
258256
types.TextContent(
259257
type="text",
260258
text=json.dumps({"error": "Vision manager not available"}),
261-
)
259+
),
262260
]
263261

264262
logger.info("MCP Tool call: camera question=%s", question[:120])
@@ -274,7 +272,7 @@ async def _handle_camera(self, arguments: dict) -> List[types.TextContent]:
274272

275273
# Process with vision manager
276274
result = await asyncio.to_thread(
277-
self.config.vision_manager.processor.process_image, frame, question
275+
self.config.vision_manager.processor.process_image, frame, question,
278276
)
279277

280278
if isinstance(result, dict) and "error" in result:
@@ -287,10 +285,10 @@ async def _handle_camera(self, arguments: dict) -> List[types.TextContent]:
287285
{
288286
"image_description": result
289287
if isinstance(result, str)
290-
else str(result)
291-
}
288+
else str(result),
289+
},
292290
),
293-
)
291+
),
294292
]
295293

296294
except Exception as e:
@@ -299,9 +297,9 @@ async def _handle_camera(self, arguments: dict) -> List[types.TextContent]:
299297
types.TextContent(
300298
type="text",
301299
text=json.dumps(
302-
{"error": f"Camera capture failed: {type(e).__name__}: {e}"}
300+
{"error": f"Camera capture failed: {type(e).__name__}: {e}"},
303301
),
304-
)
302+
),
305303
]
306304

307305
async def run(self, transport_type: str = "stdio"):
@@ -330,8 +328,8 @@ async def run(self, transport_type: str = "stdio"):
330328
def create_reachy_mcp_server(
331329
reachy_mini: ReachyMini,
332330
movement_manager: MovementManager,
333-
camera: Optional[cv2.VideoCapture] = None,
334-
vision_manager: Optional[VisionManager] = None,
331+
camera: cv2.VideoCapture | None = None,
332+
vision_manager: VisionManager | None = None,
335333
**config_kwargs,
336334
) -> ReachyMCPServer:
337335
"""Factory function to create a configured Reachy MCP Server"""
@@ -380,7 +378,7 @@ async def async_main():
380378
help="Transport method (default: stdio)",
381379
)
382380
parser.add_argument(
383-
"--camera-index", type=int, default=0, help="Camera index (default: 0)"
381+
"--camera-index", type=int, default=0, help="Camera index (default: 0)",
384382
)
385383

386384
args = parser.parse_args()
@@ -439,7 +437,7 @@ async def async_main():
439437
# Initialize movement manager
440438
logger.info("Initializing movement manager...")
441439
movement_manager = MovementManager(
442-
current_robot=current_robot, head_tracker=head_tracker, camera=camera
440+
current_robot=current_robot, head_tracker=head_tracker, camera=camera,
443441
)
444442

445443
# Create MCP server

0 commit comments

Comments
 (0)