diff --git a/.gitignore b/.gitignore index 3446e37..7309605 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,11 @@ _build/* __pycache/ .vscode/c_cpp_properties.json .vscode/settings.json +.vscode/launch.json +.vscode/ +**/.DS_Store +BadgeFramework/rotation/rpi_sync.sh +.idea/ +*.log +notes.txt +**/matlab/local diff --git a/.vscode/launch.json b/.vscode/launch.json index bcbf2ab..a402ce9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,6 +1,30 @@ { "version": "0.2.0", "configurations": [ + { + "name": "Python: test.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/BadgeFramework/test.py", + "console": "integratedTerminal", + "args": "de:94:80:39:25:be" + }, + { + "name": "Python: badge_gui.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/BadgeFramework/badge_gui.py", + "console": "integratedTerminal", + // "args": "de:94:80:39:25:be" + }, + { + "name": "Python: badge_gui_bleak.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/BadgeFramework/badge_gui_bleak.py", + "console": "integratedTerminal", + // "args": "de:94:80:39:25:be" + }, { "name": "Cortex Debug", "cwd": "${workspaceFolder}", @@ -13,16 +37,20 @@ "configFiles": [ "interface/cmsis-dap.cfg", "target/nrf52.cfg" - ], - "openOCDLaunchCommands": ["adapter speed 2000"], + ], + "openOCDLaunchCommands": [ + "adapter speed 2000" + ], "interface": "swd", "armToolchainPath": "", "svdFile": "${workspaceRoot}/nrf52832.svd", - "preLaunchCommands":["set remotetimeout 60"], + "preLaunchCommands": [ + "set remotetimeout 60" + ], "rttConfig": { "enabled": true, "address": "auto", - "clearSearch": false, // OpenOCD users may have to un-comment this + "clearSearch": false, // OpenOCD users may have to un-comment this "decoders": [ { "port": 0, diff --git a/BadgeFramework/audio_parser_V0.py b/BadgeFramework/audio_parser_V0.py index 5cc6dd7..94164c5 100644 --- a/BadgeFramework/audio_parser_V0.py +++ b/BadgeFramework/audio_parser_V0.py @@ -51,5 +51,4 @@ def main(fn): parser = argparse.ArgumentParser(description='Parser for the audio data obtained from Mingle Midges') parser.add_argument('--fn', required=True,help='Please enter the path to the file') args = parser.parse_args() - main(fn=args.fn) - + main(fn=args.fn) \ No newline at end of file diff --git a/BadgeFramework/badge.py b/BadgeFramework/badge.py index 4a059f6..e0776f8 100644 --- a/BadgeFramework/badge.py +++ b/BadgeFramework/badge.py @@ -3,8 +3,12 @@ import logging import sys import struct -import Queue +import queue as Queue +import utils +from bleak import BleakClient, BLEDevice, BleakGATTCharacteristic +from typing import Union +CONNECTION_RETRY_TIMES = 15 DEFAULT_SCAN_WINDOW = 250 DEFAULT_SCAN_INTERVAL = 1000 @@ -18,6 +22,11 @@ logger = logging.getLogger(__name__) +# logging.basicConfig( +# level=logging.DEBUG, # or logging.INFO for less verbosity +# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" +# ) + # -- Helper methods used often in badge communication -- # We generally define timestamp_seconds to be in number of seconds since UTC epoch @@ -44,8 +53,8 @@ def timestamps_to_time(timestamp_seconds, timestamp_miliseconds): # The 'connection' should already be connected when it is used to initialize this class. # Implements methods that allow for interaction with that badge. class OpenBadge(object): - def __init__(self, connection): - self.connection = connection + def __init__(self, device: Union[BLEDevice, int], mac_address: str = None): + # self.connection = connection self.status_response_queue = Queue.Queue() self.start_microphone_response_queue = Queue.Queue() self.start_scan_response_queue = Queue.Queue() @@ -53,50 +62,90 @@ def __init__(self, connection): self.free_sdc_space_response_queue = Queue.Queue() self.sdc_errase_all_response_queue = Queue.Queue() self.get_imu_data_response_queue = Queue.Queue() + self.rx_queue = Queue.Queue() + + if isinstance(device, BLEDevice): + self.device_id = utils.get_device_id(device) + self.address = device.address + elif isinstance(device, int): + self.device_id = device + self.address = mac_address + else: + raise TypeError + self.client = BleakClient(self.address, disconnected_callback=self.badge_disconnected) + + async def __aenter__(self): + for _ in range(CONNECTION_RETRY_TIMES): + try: + await self.client.connect(timeout=1000) + await self.client.start_notify(utils.RX_CHAR_UUID, self.received_callback) + return self + except Exception as e: + print(e) + raise TimeoutError(f'Failed to connect to device after {CONNECTION_RETRY_TIMES} attempts.') + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.client.disconnect() + + @property + def is_connected(self) -> bool: + return self.client.is_connected + + def badge_disconnected(self, b: BleakClient) -> None: + """disconnection callback""" + print(f"Warning: disconnected badge") + + def received_callback(self, sender: BleakGATTCharacteristic, message: bytearray): + logger.debug("Recieved {}".format(message.hex())) + for b in message: + self.rx_queue.put(b) # Helper function to send a BadgeMessage `command_message` to a device, expecting a response # of class `response_type` that is a subclass of BadgeMessage, or None if no response is expected. - def send_command(self, command_message, response_type): - expected_response_length = response_type.length() if response_type else 0 - + async def send_command(self, command_message, response_type): serialized_command = command_message.serialize_message() logger.debug( "Sending: {}, Raw: {}".format( - command_message, serialized_command.encode("hex") + command_message, serialized_command.hex() ) ) - serialized_response = self.connection.send( - serialized_command, response_len=expected_response_length - ) - - if expected_response_length > 0: - response = response_type.deserialize_message(serialized_response) - logger.info("Recieved response {}".format(response)) - return response - else: - logger.info("No response expected, transmission successful.") - return True - - def send_request(self, request_message): + await self.client.write_gatt_char(utils.TX_CHAR_UUID, serialized_command, response=True) + + + async def send_request(self, request_message): serialized_request = request_message.encode() - # Adding length header: serialized_request_len = struct.pack(" 0: + while True: + while(not self.rx_queue.empty()): + rx_message.append(self.rx_queue.get()) + if(len(rx_message) == rx_bytes_expected): + return rx_message - self.connection.send(serialized_request, response_len=0) + response_rx = await self.client.read_gatt_char(utils.RX_CHAR_UUID) - def receive_response(self): - response_len = struct.unpack("", self.onMouseClick) # Handle mouse click @@ -97,21 +114,22 @@ def onMouseClick(self, event): self.new_window = NewWindow(self.badge, self.free_space, name=self.name, scan_info=self.scan_info, mic_info=self.mic_info) self.new_window.mainloop() - def start_imu(self): + async def start_imu(self): if self.badge_status.imu_status == 0: - self.badge.start_imu() + await self.badge.start_imu() else: print("IMU already started") - def stop_imu(self): - self.badge.stop_imu() + async def stop_imu(self): + await self.badge.stop_imu() - def start_microphone(self, t, mode): + async def start_microphone(self, t, mode): + print('starting microphone for badge: {}'.format(self.name)) if self.badge_status.microphone_status == 1: print("Microphone already started") return - start_mic_response = self.badge.start_microphone(mode) - + start_mic_response = await self.badge.start_microphone(mode) + if (start_mic_response.switch_pos == 2): switch_pos = "HIGH" elif (start_mic_response.switch_pos == 1): switch_pos = "LOW" else: switch_pos = "OFF" @@ -127,17 +145,17 @@ def start_microphone(self, t, mode): try: self.new_window.MicConfig.info = self.mic_info self.new_window.initUI() - except: - pass + except Exception as e: + print('error in updating microphone config: {}'.format(e)) - def stop_microphone(self): - self.badge.stop_microphone() + async def stop_microphone(self): + await self.badge.stop_microphone() - def start_scan(self): + async def start_scan(self): if self.badge_status.scan_status == 1: print("Scanner already started") return - start_scan_response = self.badge.start_scan() + start_scan_response = await self.badge.start_scan() self.scan_info = {'window': start_scan_response.window, 'interval': start_scan_response.interval} try: @@ -146,9 +164,48 @@ def start_scan(self): except: pass - def stop_scan(self): - self.badge.stop_scan() + async def stop_scan(self): + await self.badge.stop_scan() + async def async_init(self): + try: + if self.badge: + self.badge_status = await self.badge.get_status() + self.free_space = (await self.badge.get_free_sdc_space()).free_space + else: + # Use mock data if no badge + self.badge_status = MockStatusResponse() + self.free_space = 0 + except Exception as e: + print(f"Error in async_init: {e}") + # Keep using mock data on error + self.badge_status = MockStatusResponse() + self.free_space = 0 + finally: + # Always update UI + self.update_ui_with_status() + + def update_ui_with_status(self): + """Update all UI elements with actual values from badge_status""" + try: + self.battery.config(text='Battery: {}%'.format(self.badge_status.battery_level)) + self.statusCircle.update_status(self.badge_status.clock_status) + self.IMUCircle.update_status(self.badge_status.imu_status) + self.MicCircle.update_status(self.badge_status.microphone_status) + self.ScanCircle.update_status(getattr(self.badge_status, 'scan_status', False)) + except Exception as e: + print(f"Error updating UI: {e}") + + def button_click_handler(self, async_func, *args): + """Wrapper to run async functions from button clicks""" + try: + if not self.badge: + print("No device connected") + return + self.loop.create_task(async_func(*args)) + except Exception as e: + print(f"Error in button handler: {e}") + class MatplolibFrame(tk.Frame): def __init__(self, parent, time, x, y, z): tk.Frame.__init__(self, parent, relief=tk.RIDGE) @@ -207,7 +264,7 @@ def __init__(self, parent, imu_status): def initUI(self): info = ["acc_fsr (g): 16", "gyr_fsr (dps): 2000", "datarate: 50"] - title = tk.Label(self.top_frame, text='IMU Data', font=tkFont.Font(size=10, weight="bold")) + title = tk.Label(self.top_frame, text='IMU Data', font=tkinter.font.Font(size=10, weight="bold")) title.grid(row=0, column=0, columnspan=2, sticky='w') # Create information header for i, text in enumerate(info): @@ -263,7 +320,7 @@ def initUI(self): top_frame.grid(row=0, column=0, sticky='nsew') table.grid(row=1, column=0, sticky='nsew') - title = tk.Label(top_frame, text='Microphones Data', font=tkFont.Font(size=10, weight="bold")) + title = tk.Label(top_frame, text='Microphones Data', font=tkinter.font.Font(size=10, weight="bold")) title.grid(row=0, column=0, columnspan=2, sticky='w') # Loop over the dictionary and create labels for i, (key, value) in enumerate(self.info.items()): @@ -296,7 +353,7 @@ def initUI(self): top_frame.grid(row=0, column=0, sticky='nsew') table.grid(row=1, column=0, sticky='nsew') - title = tk.Label(top_frame, text='Scanner Data', font=tkFont.Font(size=10, weight="bold")) + title = tk.Label(top_frame, text='Scanner Data', font=tkinter.font.Font(size=10, weight="bold")) title.grid(row=0, column=0, columnspan=2, sticky='w') # Loop over the dictionary and create labels for i, (key, value) in enumerate(self.info.items()): @@ -304,7 +361,7 @@ def initUI(self): label = tk.Label(top_frame, text=label_text) label.grid(row=i+1, column=0, columnspan=2, sticky='w') - table_title = tk.Label(table, text='RSSI', font=tkFont.Font( weight="bold"), relief=tk.RIDGE, width=45) + table_title = tk.Label(table, text='RSSI', font=tkinter.font.Font( weight="bold"), relief=tk.RIDGE, width=45) table_title.grid(row=0, column=0, sticky="nsew") for i, e in enumerate(self.data): label = tk.Label(table, text=str(e), relief=tk.RIDGE, width=45) @@ -333,6 +390,37 @@ def __repr__(self): return str(self.queue) +@dataclass +class MockStatusResponse: + battery_level: int = 0 + clock_status: bool = False + imu_status: bool = False + microphone_status: bool = False + scan_status: bool = False + pdm_data: list = None + scan_data: list = None + + def __post_init__(self): + if self.pdm_data is None: + self.pdm_data = [] + if self.scan_data is None: + self.scan_data = [] + +@dataclass +class MockIMUResponse: + gyr_x: float = 0.0 + gyr_y: float = 0.0 + gyr_z: float = 0.0 + rot_x: float = 0.0 + rot_y: float = 0.0 + rot_z: float = 0.0 + acc_x: float = 0.0 + acc_y: float = 0.0 + acc_z: float = 0.0 + mag_x: float = 0.0 + mag_y: float = 0.0 + mag_z: float = 0.0 + class NewWindow(tk.Toplevel): def __init__(self, badge, free_space, name, scan_info, mic_info): GET_STATUS_MAIN = False @@ -343,16 +431,69 @@ def __init__(self, badge, free_space, name, scan_info, mic_info): self.badge = badge self.mic_data = LimitedQueue(10) self.scan_data = LimitedQueue(7) - self.status = self.badge.get_status() + self.status = MockStatusResponse() # Initialize with mock status self.mic_data.enqueue(self.status.pdm_data) self.scan_data.enqueue(self.status.scan_data) self.free_space = free_space - self.imu_status = self.badge.get_imu_data() + self.imu_status = MockIMUResponse() # Initialize with mock IMU status self.time = 0 self.scan_info = scan_info self.mic_info = mic_info self.initUI() - self.after(1000, self.update_data) + self.loop = asyncio.get_event_loop() + self.loop.create_task(self.async_init()) + self.schedule_update() + + def schedule_update(self): + """Schedule the next update""" + self.loop.create_task(self.update_data()) + self.after(5500, self.schedule_update) + + async def update_data(self): + self.time = self.time + 1 + try: + status = await self.badge.get_status() + imu_status = await self.badge.get_imu_data() + if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): + self.battery.config(text="Available memory: {} MB".format((await self.badge.get_free_sdc_space()).free_space)) + except: + print('Error with request') + try: + await self.badge.connect() + status = await self.badge.get_status() + imu_status = await self.badge.get_imu_data() + if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): + self.battery.config(text="Available memory: {} MB".format((await self.badge.get_free_sdc_space()).free_space)) + except: + print('Failed to reconnect') + return + + if status.microphone_status == 0: + self.MicConfig.data = [] + self.MicConfig.initUI() + else: + pdm_data = status.pdm_data + # update mic + self.mic_data.enqueue(pdm_data) + self.MicConfig.data = self.mic_data.queue + self.MicConfig.initUI() + if status.scan_status == 0: + self.ScanConfig.data = [] + self.ScanConfig.initUI() + else: + scan_data = status.scan_data + # update scan + self.scan_data.enqueue(scan_data) + self.ScanConfig.data = self.scan_data.queue + self.ScanConfig.initUI() + #update imu + self.IMUConfig.imu_status = imu_status + self.IMUConfig.time.enqueue(self.time) + self.IMUConfig.initUI() + # update circles + self.MicCircle.update_status(status.microphone_status) + self.IMUCircle.update_status(status.imu_status) + self.ScanCircle.update_status(status.scan_status) def initUI(self): # Create the layout frames @@ -366,11 +507,11 @@ def initUI(self): right_down.grid(row=1, column=1, sticky='nsew') # header - self.badgeId = Label(left_top, text="Badge Id: {}".format(self.name), font=tkFont.Font(size=10, )) + self.badgeId = Label(left_top, text="Badge Id: {}".format(self.name), font=tkinter.font.Font(size=10, )) self.badgeId.grid(row=0, column=0, columnspan=2, sticky='w') - self.midge = Label(left_top, text="Battery: {}%".format(self.status.battery_level), font=tkFont.Font(size=10)) + self.midge = Label(left_top, text="Battery: {}%".format(self.status.battery_level), font=tkinter.font.Font(size=10)) self.midge.grid(row=1, column=0, columnspan=2, sticky='w') - self.battery = Label(left_top, text="Available memory: {}MB".format(self.free_space), font=tkFont.Font(size=10)) + self.battery = Label(left_top, text="Available memory: {}MB".format(self.free_space), font=tkinter.font.Font(size=10)) self.battery.grid(row=2, column=0, columnspan=2, sticky='w') self.reset_battery = Button(left_top, text='Erase', command=self.badge.sdc_errase_all) self.reset_battery.grid(row=2, column=1, columnspan=2, sticky='w') @@ -399,63 +540,48 @@ def initUI(self): self.MicConfig = MicConfig(right_top, data=self.mic_data.queue, info=self.mic_info) self.MicConfig.grid(row=4, column=2, columnspan=2, padx=10, pady=5) - def update_data(self): - self.time = self.time + 1 + async def async_init(self): try: - status = self.badge.get_status() - time.sleep(0.1) - imu_status = self.badge.get_imu_data() - time.sleep(0.1) - if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): - self.battery.text="Available memory: {} MB".format(self.badge.get_free_sdc_space().free_space) - except: - print('Error with request') - self.badge.connect() - status = self.badge.get_status() - time.sleep(0.1) - imu_status = self.badge.get_imu_data() - time.sleep(0.1) - if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): - self.battery.text="Available memory: {} MB".format(self.badge.get_free_sdc_space().free_space) - - if status.microphone_status == 0: - self.MicConfig.data = [] - self.MicConfig.initUI() - else: - pdm_data = status.pdm_data - # update mic - self.mic_data.enqueue(pdm_data) - self.MicConfig.data = self.mic_data.queue - self.MicConfig.initUI() - if status.scan_status == 0: - self.ScanConfig.data = [] - self.ScanConfig.initUI() - else: - scan_data = status.scan_data - # update scan - self.scan_data.enqueue(scan_data) - self.ScanConfig.data = self.scan_data.queue - self.ScanConfig.initUI() - #update imu - self.IMUConfig.imu_status = imu_status - self.IMUConfig.time.enqueue(self.time) - self.IMUConfig.initUI() - # update circles - self.MicCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.microphone_status)) - self.IMUCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.imu_status)) - self.ScanCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.scan_status)) - - self.after(5500, self.update_data) + if self.badge: + self.status = await self.badge.get_status() + self.free_space = (await self.badge.get_free_sdc_space()).free_space + self.imu_status = await self.badge.get_imu_data() + print(self.status) + else: + # Use mock data if no badge + print('badge detected, using mock data') + self.status = MockStatusResponse() + self.free_space = 0 + self.imu_status = MockIMUResponse() + except Exception as e: + print(f"Error in async_init: {e}") + # Keep using mock data on error + self.status = MockStatusResponse() + self.free_space = 0 + self.imu_status = MockIMUResponse() + finally: + # Always update UI + self.update_ui_with_status() + + def update_ui_with_status(self): + """Update all UI elements with actual values from status""" + try: + self.battery.config(text='Battery: {}%'.format(self.status.battery_level)) + self.IMUCircle.update_status(self.status.imu_status) + self.MicCircle.update_status(self.status.microphone_status) + self.ScanCircle.update_status(self.status.scan_status) + except Exception as e: + print(f"Error updating UI: {e}") def get_badges(): badges = [] - with open('sample_mapping_file.csv', 'r') as f: + with open('BadgeFramework/mappings2.csv', 'r') as f: reader = csv.reader(f) next(reader) for row in reader: - badge = BadgeInterface(row[1]) + badge = BadgeInterface(row[2]) badge.connect() - badges.append({'badge': badge, 'name': row[0], 'address': row[1]}) + badges.append({'badge': badge, 'name': row[1], 'address': row[2]}) return badges class MainApp(tk.Tk): @@ -463,38 +589,54 @@ def __init__(self): tk.Tk.__init__(self) self.title("Lista de dispositivos") self.geometry("1100x700") - self.container_frame = ttk.Frame(self) + self.container_frame = tkinter.ttk.Frame(self) self.container_frame.pack(expand=True, fill="both") self.custom_components = [] self.badges = get_badges() + # Set up asyncio loop + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + for badge in self.badges: - custom_component = CustomComponent(self.container_frame, name=badge['name'], badge=badge['badge'], address=badge['address']) + custom_component = CustomComponent(self.container_frame, name=badge['name'], badge=badge['badge'], address=badge['address'], gui_instance=self) custom_component.pack() # Draw a border between custom components border = tk.Frame(self.container_frame, bg="gray", height=1) border.pack(fill="x") self.custom_components.append(custom_component) - self.after(1000, self.update_data) + # Schedule asyncio loop to run regularly + self.after(100, self.run_asyncio_loop) + self.schedule_update() + + def schedule_update(self): + """Schedule the next update""" + self.loop.create_task(self.update_data()) + self.after(1000, self.schedule_update) - def update_data(self): + async def update_data(self): for badge, custom_component in zip(self.badges, self.custom_components): - try: - badge_status = badge['badge'].get_status() - except: - print('Error with request') - try: - badge['badge'].connect() - except: - badge['badge'].connect() - badge_status = badge['badge'].get_status() + # TODO: fix update data + badge_status = custom_component.badge_status + # try: + # badge_status = await badge['badge'].get_status() + # except: + # print('Error with request') + # try: + # await badge['badge'].connect() + # badge_status = await badge['badge'].get_status() + # except: + # print('Failed to reconnect') + # continue + custom_component.badge_status = badge_status custom_component.battery['text'] = 'Battery: {}%'.format(badge_status.battery_level) - custom_component.statusCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.clock_status)) - custom_component.MicCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.microphone_status)) - custom_component.IMUCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.imu_status)) - custom_component.ScanCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.scan_status)) + custom_component.statusCircle.update_status(badge_status.clock_status) + custom_component.MicCircle.update_status(badge_status.microphone_status) + custom_component.IMUCircle.update_status(badge_status.imu_status) + custom_component.ScanCircle.update_status(badge_status.scan_status) + if (badge_status.microphone_status == 0): custom_component.MicStartMonoButton['state'] = 'normal' custom_component.MicStartStereoButton['state'] = 'normal' @@ -503,23 +645,28 @@ def update_data(self): custom_component.MicStartMonoButton['state'] = 'disabled' custom_component.MicStartStereoButton['state'] = 'disabled' custom_component.MicStopButton['state'] = 'normal' + if (badge_status.imu_status == 0): custom_component.IMUStartButton['state'] = 'normal' custom_component.IMUStopButton['state'] = 'disabled' else: custom_component.IMUStartButton['state'] = 'disabled' custom_component.IMUStopButton['state'] = 'normal' + if (badge_status.scan_status == 0): custom_component.ScanStartButton['state'] = 'normal' custom_component.ScanStopButton['state'] = 'disabled' else: custom_component.ScanStartButton['state'] = 'disabled' custom_component.ScanStopButton['state'] = 'normal' - - if (GET_STATUS_MAIN == True): - print('updating main') - self.after(1000, self.update_data) + + def run_asyncio_loop(self): + """Run a single iteration of the asyncio loop""" + self.loop.call_soon(self.loop.stop) + self.loop.run_forever() + self.after(100, self.run_asyncio_loop) if __name__ == '__main__': + print('starting app') app = MainApp() app.mainloop() diff --git a/BadgeFramework/badge_gui_bleak.py b/BadgeFramework/badge_gui_bleak.py new file mode 100644 index 0000000..173002f --- /dev/null +++ b/BadgeFramework/badge_gui_bleak.py @@ -0,0 +1,308 @@ +import tkinter as tk +from tkinter import ttk +import asyncio +import pandas as pd +from badge import OpenBadge +from datetime import datetime +import sys +from typing import Optional, Dict, Tuple + + +SENSOR_ALL = 0 +SENSOR_MICROPHONE = 1 +SENSOR_IMU = 2 +SENSOR_SCAN = 3 +SENSOR_CHECK_STATUS = 100 +SENSOR_TIMEOUT = 15 + +SENSOR_START = True +SENSOR_STOP = False +CHECK_SYNC = True +CHECK_NO_SYNC = False + +BADGES_ALL = -1000 +USE_ALL = False # True to use all badges in .csv file and False to use only marked badges + +sensors = ['All', 'Mic', 'IMU', 'Scan'] +indicators_long = ['clock', 'microphone', 'imu', 'scan'] +indicators_short = ['Clock', 'Mic', 'IMU', 'Scan'] +sensor_num = len(indicators_short) + + +class RedirectText: + """Class to redirect stdout to a tkinter Text widget.""" + + def __init__(self, text_widget: tk.Text): + self.text_widget = text_widget + self.text_widget.config(state=tk.NORMAL) + + def write(self, string: str): + """Redirects the text to the Text widget.""" + self.text_widget.insert(tk.END, string) + self.text_widget.see(tk.END) # Auto-scroll to the bottom + self.text_widget.update_idletasks() + + def flush(self): + """Handle flush.""" + pass # Needed for compatibility with the print function + + +class BadgeMonitorApp(tk.Tk): + def __init__(self, ): + super().__init__() + + self.title("Badge Status Monitor") + self.badges = pd.read_csv('BadgeFramework/mappings2.csv') + + self.main_frame = ttk.Frame(self) + self.main_frame.grid(row=0, column=0, sticky="nsew") # Use grid for main_frame + + # Make the window resizable in both directions + self.grid_rowconfigure(0, weight=1) + self.grid_columnconfigure(0, weight=1) + + self.canvas = tk.Canvas(self.main_frame, borderwidth=0) + self.frame = ttk.Frame(self.canvas) + self.scrollbar = ttk.Scrollbar(self.main_frame, orient="vertical", command=self.canvas.yview) + self.canvas.configure(yscrollcommand=self.scrollbar.set) + self.scrollbar.grid(row=0, column=0, sticky="ns") + self.canvas.grid(row=0, column=1, sticky="nsew") + + self.bind_mouse_scroll(self.canvas) + + # Make the badge section expand vertically + self.main_frame.grid_rowconfigure(0, weight=1) + self.canvas_frame = self.canvas.create_window((0, 0), window=self.frame, anchor="nw") + self.frame.bind("", self.on_frame_configure) + + self.terminal_frame = ttk.Frame(self.main_frame) + self.terminal_frame.grid(row=0, column=2, sticky="nsew") + self.terminal_text = tk.Text(self.terminal_frame, wrap="word", state="normal", width=40, height=20) + self.terminal_text.pack(fill="both", expand=True) + + self.stdout_redirector = RedirectText(self.terminal_text) + sys.stdout = self.stdout_redirector + + # Set column weights to control space allocation + self.main_frame.grid_columnconfigure(1, weight=3) # Badge area + self.main_frame.grid_columnconfigure(2, weight=1) # Terminal area + + self.timestamp_labels = {} + self.sensor_lights = {} + self.timestamps = {} + self.update_tasks = {} + + badge_label = ttk.Label(self.frame, text=f"ALL BADGES") + badge_label.grid(row=0, column=0, padx=80, pady=5) + check_button = ttk.Button(self.frame, text="Check Status", + command=lambda b=BADGES_ALL, s=SENSOR_CHECK_STATUS, + m=CHECK_NO_SYNC: self.schedule_async_task(b, s, m)) + check_button.grid(row=0, column=1, padx=20, pady=5) + for s_idx, sensor in enumerate(sensors): + s_start_button = ttk.Button(self.frame, text=sensor + " Start", command= + lambda b=BADGES_ALL, s=s_idx, m=SENSOR_START: self.schedule_async_task(b, s, m)) + s_start_button.grid(row=0, column=2 + 2 * s_idx, padx=5, pady=5) + + s_stop_button = ttk.Button(self.frame, text=sensor + " Stop", command= + lambda b=BADGES_ALL, s=s_idx, m=SENSOR_STOP: self.schedule_async_task(b, s, m)) + s_stop_button.grid(row=0, column=3 + 2 * s_idx, padx=5, pady=5) + + sync_button = ttk.Button(self.frame, text="Sync", + command=lambda b=BADGES_ALL, s=SENSOR_CHECK_STATUS, m=CHECK_SYNC: + self.schedule_async_task(b, s, m)) + sync_button.grid(row=0, column=10, padx=5, pady=5) + + # Create rows for each badge + badges_list = list(range(1, len(self.badges) + 5)) + for idx, badge in enumerate(badges_list, start=1): + row_button, row_status = idx * 2 + 1, idx * 2 + 2 + badge_label = ttk.Label(self.frame, text=f"Badge {badge}") + badge_label.grid(row=row_button, column=0, padx=80, pady=5) + + check_button = ttk.Button(self.frame, text="Check", + command=lambda b=badge, s=SENSOR_CHECK_STATUS, m=CHECK_NO_SYNC: + self.schedule_async_task(b, s, m)) + check_button.grid(row=row_button, column=1, padx=20, pady=5) + + for s_idx, sensor in enumerate(sensors): + s_start_button = ttk.Button(self.frame, text=sensor+" Start", command= + lambda b=badge, s=s_idx, m=SENSOR_START: self.schedule_async_task(b, s, m)) + s_start_button.grid(row=row_button, column=2+2*s_idx, padx=5, pady=5) + + s_stop_button = ttk.Button(self.frame, text=sensor+" Stop", command= + lambda b=badge, s=s_idx, m=SENSOR_STOP: self.schedule_async_task(b, s, m)) + s_stop_button.grid(row=row_button, column=3+2*s_idx, padx=5, pady=5) + + sync_button = ttk.Button(self.frame, text="Sync", + command=lambda b=badge, s=SENSOR_CHECK_STATUS, m=CHECK_SYNC: + self.schedule_async_task(b, s, m)) + sync_button.grid(row=row_button, column=10, padx=5, pady=5) + + sensor_light_canvases = [] + for s_idx, sensor in enumerate(indicators_short, start=1): + sensor_light_canvas = tk.Canvas(self.frame, width=20, height=20) + sensor_light_canvas.grid(row=row_status, column=1+2*s_idx, padx=5, pady=5) + sensor_light = sensor_light_canvas.create_oval(5, 5, 15, 15, fill="red") + sensor_light_canvases.append((sensor_light_canvas, sensor_light)) + + sensor_label = ttk.Label(self.frame, text=sensor + ' status:') + sensor_label.grid(row=row_status, column=2*s_idx, padx=5, pady=5) + + timestamp_label = ttk.Label(self.frame, text="Last updated: N/A") + timestamp_label.grid(row=row_status, column=0, padx=5, pady=5) + + elapsed_time_label = ttk.Label(self.frame, text="Elapsed: N/A") + elapsed_time_label.grid(row=row_status, column=1, padx=5, pady=5) + + self.timestamp_labels[badge] = (timestamp_label, elapsed_time_label) + self.sensor_lights[badge] = sensor_light_canvases + + def bind_mouse_scroll(self, widget: tk.Canvas): + """Bind the mouse scroll event to the canvas.""" + # Windows OS uses "", others use "" and "" + widget.bind_all("", self.on_mouse_wheel) + widget.bind_all("", self.on_mouse_wheel) + widget.bind_all("", self.on_mouse_wheel) + + def on_mouse_wheel(self, event: tk.Event): + """Handle the mouse scroll event.""" + if event.num == 4 or event.delta > 0: # Scroll up + self.canvas.yview_scroll(-1, "units") + elif event.num == 5 or event.delta < 0: # Scroll down + self.canvas.yview_scroll(1, "units") + + def on_frame_configure(self, event: tk.Event): + """Reset the scroll region to encompass the inner frame.""" + self.canvas.configure(scrollregion=self.canvas.bbox("all")) + + def schedule_async_task(self, badge_id: int, sensor_idx: int, mode: bool): + if badge_id == BADGES_ALL: + asyncio.create_task(self.async_task_all_badges(sensor_idx, mode, use_all=USE_ALL)) + else: + asyncio.create_task(self.async_task_sensors(badge_id, sensor_idx, mode)) + + async def async_task_all_badges(self, sensor_idx: int, mode: bool, use_all: bool): + for row_id in self.badges.index: + badge_id, use_flag = int(self.badges['Participant Id'][row_id]), bool(self.badges['Use'][row_id]) + if use_flag or use_all: + await self.async_task_sensors(badge_id=badge_id, sensor_idx=sensor_idx, mode=mode) + + async def async_task_sensors(self, badge_id: int, sensor_idx: int, mode: bool): + if sensor_idx == SENSOR_MICROPHONE: + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='microphone') + elif sensor_idx == SENSOR_IMU: + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='imu') + elif sensor_idx == SENSOR_SCAN: + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='scan') + elif sensor_idx == SENSOR_ALL: + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='microphone') + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='imu') + await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='scan') + + if sensor_idx == SENSOR_CHECK_STATUS: + await self.async_check_status(badge_id, mode=mode) + else: + await self.async_check_status(badge_id, mode=CHECK_NO_SYNC) + + async def async_check_status(self, badge_id: int, mode: bool=CHECK_NO_SYNC): + # Call the async function to check the status + statuses, timestamp = await self.async_sensor(badge_id=badge_id, mode=mode, sensor_name='status') + if statuses is None: + return + sensor_statuses = [getattr(statuses, s + '_status') for s in indicators_long] + + timestamp_label, elapsed_time_label = self.timestamp_labels[badge_id] + + sensor_light_canvases = self.sensor_lights[badge_id] + for sensor_idx, (sensor_light_canvas, sensor_light) in enumerate(sensor_light_canvases): + sensor_color = "green" if sensor_statuses[sensor_idx] == 1 else "red" + sensor_light_canvas.itemconfig(sensor_light, fill=sensor_color) + + formatted_time = timestamp.strftime("%Y-%m-%d %H:%M:%S") + timestamp_label.config(text=f"Last updated: {formatted_time}") + + self.timestamps[badge_id] = timestamp + + if badge_id in self.update_tasks: + self.update_tasks[badge_id].cancel() + + task = asyncio.create_task(self.update_elapsed_time(elapsed_time_label, timestamp)) + self.update_tasks[badge_id] = task + + @staticmethod + def get_operation_name(mode_name: str, sensor_name: str) -> str: + if sensor_name in indicators_long: + return f'{mode_name}_{sensor_name}' + elif sensor_name == 'status': + return 'get_status' + elif sensor_name == 'sdc_space': + return 'get_free_sdc_space' + else: + raise ValueError + + async def async_sensor(self, badge_id: int, sensor_name: str, mode: bool) -> Tuple[Optional[Dict], datetime]: + mode_name = 'start' if mode == SENSOR_START else 'stop' + op_name = self.get_operation_name(mode_name, sensor_name) + badge_op_desc = f'Badge {badge_id} {op_name}' + print(f"Executing: {badge_op_desc}...") + + try: + response = await asyncio.wait_for(self.async_sensor_operation(badge_id, op_name, mode), + timeout=SENSOR_TIMEOUT) + print(f'Info: {badge_op_desc} successfully.') + except asyncio.TimeoutError: + print(f"Warning: {badge_op_desc} has timed out! (>{SENSOR_TIMEOUT}s)") + response = None + except Exception as e: + print(e) + response = None + + timestamp = datetime.now() + return response, timestamp + + async def async_sensor_operation(self, badge_id: int, op_name: str, mode: int) -> Dict: + badge_addr = self.get_badge_address(badge_id) + async with OpenBadge(badge_id, badge_addr) as open_badge: + sensor_operation = getattr(open_badge, op_name) + if op_name == 'get_status' and mode == CHECK_SYNC: + return_message = await sensor_operation(t=datetime.now().timestamp()) + else: + return_message = await sensor_operation() + return return_message + + def get_badge_address(self, badge_id: int) -> str: + badge = self.badges[self.badges['Participant Id'] == badge_id] + address = badge['Mac Address'].values[0] + return address + + @staticmethod + async def update_elapsed_time(elapsed_time_label: ttk.Label, last_update_time: datetime): + """Continuously update the elapsed time since the last status update.""" + try: + while True: + await asyncio.sleep(1) # Update every second + now = datetime.now() + elapsed_seconds = (now - last_update_time).total_seconds() + elapsed_time_label.config(text=f"Elapsed: {int(elapsed_seconds)}s") + except asyncio.CancelledError: + pass # Task was cancelled + + def run(self): + self.mainloop() + + +# Main function that starts the tkinter app and runs the event loop +def run_tkinter_async(): + app = BadgeMonitorApp() + + async def main_loop(): + while True: + await asyncio.sleep(0.01) # Non-blocking sleep to allow tkinter to update + app.update_idletasks() + app.update() + + asyncio.run(main_loop()) + + +if __name__ == "__main__": + run_tkinter_async() \ No newline at end of file diff --git a/BadgeFramework/badge_gui_old.py b/BadgeFramework/badge_gui_old.py new file mode 100644 index 0000000..48bd348 --- /dev/null +++ b/BadgeFramework/badge_gui_old.py @@ -0,0 +1,525 @@ +import Tkinter as tk +import ttk +from Tkinter import Label, Button, Canvas +import tkFont +import random +import time +import csv +from badge_interface import * +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +from matplotlib.figure import Figure + +DEFAULT_RADIUS = 20 +GET_STATUS_MAIN = True + +def calculate_color(status): + if status == 1: + return "green" + else: + return "red" + +class Circle(Canvas): + def __init__(self, parent, radius=DEFAULT_RADIUS, status=0): + Canvas.__init__(self, parent, width=2 * radius, height=2 * radius, bd=0, highlightthickness=0) + self.radius = radius + self.status = status + self.color = calculate_color(self.status) + self.create_oval(0, 0, 2 * radius, 2 * radius, fill=self.color) # Draw a circular shape + +class CustomComponent(tk.Frame): + def __init__(self, parent, name, badge, address): + tk.Frame.__init__(self, parent, pady=10) + self.name = name + self.badge = badge + self.address = address + self.badge_status = badge.get_status() + self.free_space = badge.get_free_sdc_space().free_space + self.new_window = {} + self.scan_info = {'window': '', 'interval': ''} + self.mic_info = {'mode': '', 'gain_r': '', 'gain_l': '', 'switch_pos': '', 'pdm_freq': ''} + self.initUI() + + def initUI(self): + # Device info + self.badgeId = Label(self, text="Badge ID: {}".format(self.name)) + self.badgeId.grid(row=0, column=0, padx=10, pady=5) + self.midge = Label(self, text=self.address, font=tkFont.Font(size=12)) + self.midge.grid(row=1, column=0, padx=10, pady=5) + self.battery = Label(self, text='Battery: {}%'.format(self.badge_status.battery_level), relief="solid") + self.battery.grid(row=2, column=0, padx=10, pady=5) + + # Status + self.statusLabel = Label(self, text="Status") + self.statusLabel.grid(row=0, column=1, padx=10, pady=5) + self.statusCircle = Circle(self, status=self.badge_status.clock_status) + self.statusCircle.grid(row=1, column=1, padx=10, pady=5) + + # IMU + self.IMULabel = Label(self, text="IMU") + self.IMULabel.grid(row=0, column=2, columnspan=2, padx=10, pady=5) + self.IMUCircle = Circle(self, status=self.badge_status.imu_status) + self.IMUCircle.grid(row=1, column=2, columnspan=2, padx=10, pady=5) + self.IMUStartButton = Button(self, text="Start", command=self.start_imu) + self.IMUStartButton.grid(row=2, column=2, padx=10, pady=5) + self.IMUStopButton = Button(self, text="Stop", command=self.stop_imu) + self.IMUStopButton.grid(row=2, column=3, padx=10, pady=5) + + # Mic + self.MicLabel = Label(self, text="Microphones") + self.MicLabel.grid(row=0, column=4, columnspan=2, padx=10, pady=5) + self.MicCircle = Circle(self, status=self.badge_status.microphone_status) + self.MicCircle.grid(row=1, column=4, columnspan=2, padx=10, pady=5) + self.MicStartMonoButton = Button(self, text="Start mono", command=lambda: self.start_microphone(t=None, mode=1)) + self.MicStartMonoButton.grid(row=2, column=4, padx=10, pady=5) + self.MicStartStereoButton = Button(self, text="Start stereo", command=lambda: self.start_microphone(t=None, mode=0)) + self.MicStartStereoButton.grid(row=2, column=5, padx=10, pady=5) + self.MicStopButton = Button(self, text="Stop", command=self.badge.stop_microphone) + self.MicStopButton.grid(row=3, column=4, columnspan=2, padx=10, pady=5, sticky="nsew") + + # Scan + self.ScanLabel = Label(self, text="Scanner") + self.ScanLabel.grid(row=0, column=6, columnspan=2, padx=10, pady=5) + self.ScanCircle = Circle(self, status=self.badge_status.scan_status) + self.ScanCircle.grid(row=1, column=6, columnspan=2, padx=10, pady=5) + self.ScanStartButton = Button(self, text="Start", command=self.start_scan) + self.ScanStartButton.grid(row=2, column=6, padx=10, pady=5) + self.ScanStopButton = Button(self, text="Stop", command=self.stop_scan) + self.ScanStopButton.grid(row=2, column=7, padx=10, pady=5) + + self.bind("", self.onMouseClick) # Handle mouse click + + # Change the cursor when hovering + self.bind("", lambda event: self.config(cursor="hand2")) + self.bind("", lambda event: self.config(cursor="")) + + def onMouseClick(self, event): + # Handle mouse click event + self.new_window = NewWindow(self.badge, self.free_space, name=self.name, scan_info=self.scan_info, mic_info=self.mic_info) + self.new_window.mainloop() + + def start_imu(self): + if self.badge_status.imu_status == 0: + self.badge.start_imu() + else: + print("IMU already started") + + def stop_imu(self): + self.badge.stop_imu() + + def start_microphone(self, t, mode): + if self.badge_status.microphone_status == 1: + print("Microphone already started") + return + start_mic_response = self.badge.start_microphone(mode) + + if (start_mic_response.switch_pos == 2): switch_pos = "HIGH" + elif (start_mic_response.switch_pos == 1): switch_pos = "LOW" + else: switch_pos = "OFF" + + if (start_mic_response.mode == 1): mic_mode = "mono" + else: mic_mode = "stereo" + + self.mic_info = {'mode': mic_mode, + 'gain_r (dBm)': start_mic_response.gain_r, + 'gain_l (dBm)': start_mic_response.gain_l, + 'switch_pos': switch_pos, + 'pdm_freq (kHz)': start_mic_response.pdm_freq} + try: + self.new_window.MicConfig.info = self.mic_info + self.new_window.initUI() + except: + pass + + def stop_microphone(self): + self.badge.stop_microphone() + + def start_scan(self): + if self.badge_status.scan_status == 1: + print("Scanner already started") + return + start_scan_response = self.badge.start_scan() + self.scan_info = {'window': start_scan_response.window, + 'interval': start_scan_response.interval} + try: + self.new_window.ScanConfig.info = self.scan_info + self.new_window.ScanConfig.initUI() + except: + pass + + def stop_scan(self): + self.badge.stop_scan() + +class MatplolibFrame(tk.Frame): + def __init__(self, parent, time, x, y, z): + tk.Frame.__init__(self, parent, relief=tk.RIDGE) + self.figure = Figure(figsize=(3, 1.5), dpi=100) + self.plot = self.figure.add_subplot(111) + self.plot.plot(time, x, label='x') + self.plot.plot(time, y, label='y') + self.plot.plot(time, z, label='z') + # Adding the legend + # Place the legend outside the plot + self.plot.legend(fontsize='x-small') + # Set x-axis limits if your data is not displaying correctly + self.plot.set_xlim([min(time), max(time)]) + + # Set x-axis labels + self.plot.set_xticks(time) + self.plot.set_xticklabels(time) + + self.canvas = FigureCanvasTkAgg(self.figure, self) + self.canvas.draw() + + self.canvas.get_tk_widget().grid(row=0, column=0) + +class IMUConfig(tk.Frame): + def __init__(self, parent, imu_status): + tk.Frame.__init__(self, parent, borderwidth=1, relief="flat") + label = Label(self, text="IMU Config") + self.imu_status = imu_status + self.time = LimitedQueue(4) + self.time.enqueue(0) + # gyr + self.gyr_x = LimitedQueue(4) + self.gyr_y = LimitedQueue(4) + self.gyr_z = LimitedQueue(4) + # rot + self.rot_x = LimitedQueue(4) + self.rot_y = LimitedQueue(4) + self.rot_z = LimitedQueue(4) + # acc + self.acc_x = LimitedQueue(4) + self.acc_y = LimitedQueue(4) + self.acc_z = LimitedQueue(4) + # mag + self.mag_x = LimitedQueue(4) + self.mag_y = LimitedQueue(4) + self.mag_z = LimitedQueue(4) + + # Create the layout frames + self.top_frame = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + self.table = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + self.top_frame.grid(row=0, column=0, sticky='nsew') + self.table.grid(row=1, column=0, sticky='nsew') + + self.initUI() + + def initUI(self): + info = ["acc_fsr (g): 16", "gyr_fsr (dps): 2000", "datarate: 50"] + + title = tk.Label(self.top_frame, text='IMU Data', font=tkFont.Font(size=10, weight="bold")) + title.grid(row=0, column=0, columnspan=2, sticky='w') + # Create information header + for i, text in enumerate(info): + label = tk.Label(self.top_frame, text=text, bg='lightgrey') + label.grid(row=i+1, column=0, columnspan=2, sticky='w') + # Create table + # gyr + self.gyr_x.enqueue(self.imu_status.gyr_x) + self.gyr_y.enqueue(self.imu_status.gyr_y) + self.gyr_z.enqueue(self.imu_status.gyr_z) + label = tk.Label(self.table, text="gyr (dps vs time)", relief=tk.RIDGE) + label.grid(row=0, column=0, sticky='nsew') + line_chart = MatplolibFrame(self.table, time=self.time.queue, x=self.gyr_x.queue, y=self.gyr_y.queue, z=self.gyr_z.queue) + line_chart.grid(row=1, column=0) + # rot + self.rot_x.enqueue(self.imu_status.rot_x) + self.rot_y.enqueue(self.imu_status.rot_y) + self.rot_z.enqueue(self.imu_status.rot_z) + label = tk.Label(self.table, text="rot (rad/s vs time)", relief=tk.RIDGE) + label.grid(row=0, column=1, sticky='nsew') + line_chart = MatplolibFrame(self.table, time=self.time.queue, x=self.rot_x.queue, y=self.rot_y.queue, z=self.rot_z.queue) + line_chart.grid(row=1, column=1) + # acc + self.acc_x.enqueue(self.imu_status.acc_x) + self.acc_y.enqueue(self.imu_status.acc_y) + self.acc_z.enqueue(self.imu_status.acc_z) + label = tk.Label(self.table, text="acc (g vs time)", relief=tk.RIDGE) + label.grid(row=2, column=0, sticky='nsew') + line_chart = MatplolibFrame(self.table, time=self.time.queue, x=self.acc_x.queue, y=self.acc_y.queue, z=self.acc_z.queue) + line_chart.grid(row=3, column=0) + # mag + self.mag_x.enqueue(self.imu_status.mag_x) + self.mag_y.enqueue(self.imu_status.mag_y) + self.mag_z.enqueue(self.imu_status.mag_z) + label = tk.Label(self.table, text="mag (uT vs time)", relief=tk.RIDGE) + label.grid(row=2, column=1, sticky='nsew') + line_chart = MatplolibFrame(self.table, time=self.time.queue, x=self.mag_x.queue, y=self.mag_y.queue, z=self.mag_z.queue) + line_chart.grid(row=3, column=1) + + self.config(bg="white", relief="solid") + +class MicConfig(tk.Frame): + def __init__(self, parent, data, info): + tk.Frame.__init__(self, parent, borderwidth=1, relief="flat") + self.data = data + self.info = info + self.initUI() + + def initUI(self): + # Create the layout frames + top_frame = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + table = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + top_frame.grid(row=0, column=0, sticky='nsew') + table.grid(row=1, column=0, sticky='nsew') + + title = tk.Label(top_frame, text='Microphones Data', font=tkFont.Font(size=10, weight="bold")) + title.grid(row=0, column=0, columnspan=2, sticky='w') + # Loop over the dictionary and create labels + for i, (key, value) in enumerate(self.info.items()): + label_text = "{}: {}".format(key, value) + label = tk.Label(top_frame, text=label_text) + label.grid(row=i+1, column=0, columnspan=2, sticky='w') + + for i, e in enumerate(self.data): + label = tk.Label(table, text=str(e), relief=tk.RIDGE, width=45) + label.grid(row=i, column=0, sticky="nsew") + + # Configure row and column weights to make the cells expand + for i in range(len(self.data)): + self.grid_rowconfigure(i, weight=1) + + self.config(bg="white", relief="solid") + +# Scan +class ScanConfig(tk.Frame): + def __init__(self, parent, data, info): + tk.Frame.__init__(self, parent, borderwidth=1, relief="flat") + self.data = data + self.info = info + self.initUI() + + def initUI(self): + # Create the layout frames + top_frame = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + table = tk.Frame(self, padx=5, pady=5, bg='lightgrey') + top_frame.grid(row=0, column=0, sticky='nsew') + table.grid(row=1, column=0, sticky='nsew') + + title = tk.Label(top_frame, text='Scanner Data', font=tkFont.Font(size=10, weight="bold")) + title.grid(row=0, column=0, columnspan=2, sticky='w') + # Loop over the dictionary and create labels + for i, (key, value) in enumerate(self.info.items()): + label_text = "{}: {}".format(key, value) + label = tk.Label(top_frame, text=label_text) + label.grid(row=i+1, column=0, columnspan=2, sticky='w') + + table_title = tk.Label(table, text='RSSI', font=tkFont.Font( weight="bold"), relief=tk.RIDGE, width=45) + table_title.grid(row=0, column=0, sticky="nsew") + for i, e in enumerate(self.data): + label = tk.Label(table, text=str(e), relief=tk.RIDGE, width=45) + label.grid(row=i+1, column=0, sticky="nsew") + + # Configure row and column weights to make the cells expand + for i in range(len(self.data)): + self.grid_rowconfigure(i, weight=1) + + self.config(bg="white", relief="solid") + +class LimitedQueue: + def __init__(self, max_size): + self.queue = [] + self.max_size = max_size + + def enqueue(self, item): + if len(self.queue) >= self.max_size: + self.queue.pop(0) + self.queue.append(item) + + def dequeue(self): + return self.queue.pop(0) if self.queue else None + + def __repr__(self): + return str(self.queue) + + +class NewWindow(tk.Toplevel): + def __init__(self, badge, free_space, name, scan_info, mic_info): + GET_STATUS_MAIN = False + tk.Toplevel.__init__(self) + self.name = name + self.title("New Window for badge: {}".format(name)) + self.geometry("1500x700") + self.badge = badge + self.mic_data = LimitedQueue(10) + self.scan_data = LimitedQueue(7) + self.status = self.badge.get_status() + self.mic_data.enqueue(self.status.pdm_data) + self.scan_data.enqueue(self.status.scan_data) + self.free_space = free_space + self.imu_status = self.badge.get_imu_data() + self.time = 0 + self.scan_info = scan_info + self.mic_info = mic_info + self.initUI() + self.after(1000, self.update_data) + + def initUI(self): + # Create the layout frames + left_top = tk.Frame(self, padx=5, pady=5) + right_top = tk.Frame(self, padx=5, pady=5) + left_down = tk.Frame(self, padx=5, pady=5) + right_down = tk.Frame(self, padx=5, pady=5) + left_top.grid(row=0, column=0, sticky='nsew') + right_top.grid(row=0, column=1, sticky='nsew') + left_down.grid(row=1, column=0, sticky='nsew') + right_down.grid(row=1, column=1, sticky='nsew') + + # header + self.badgeId = Label(left_top, text="Badge Id: {}".format(self.name), font=tkFont.Font(size=10, )) + self.badgeId.grid(row=0, column=0, columnspan=2, sticky='w') + self.midge = Label(left_top, text="Battery: {}%".format(self.status.battery_level), font=tkFont.Font(size=10)) + self.midge.grid(row=1, column=0, columnspan=2, sticky='w') + self.battery = Label(left_top, text="Available memory: {}MB".format(self.free_space), font=tkFont.Font(size=10)) + self.battery.grid(row=2, column=0, columnspan=2, sticky='w') + self.reset_battery = Button(left_top, text='Erase', command=self.badge.sdc_errase_all) + self.reset_battery.grid(row=2, column=1, columnspan=2, sticky='w') + + # IMU + self.IMULabel = Label(left_top, text="IMU") + self.IMULabel.grid(row=4, column=0, padx=10, pady=5) + self.IMUCircle = Circle(left_top, radius=10, status=self.status.imu_status) + self.IMUCircle.grid(row=4, column=1, padx=10, pady=5) + self.IMUConfig = IMUConfig(left_top, imu_status=self.imu_status) + self.IMUConfig.grid(row=5, column=0, columnspan=2, padx=10, pady=5) + + # Scan + self.ScanLabel = Label(right_top, text="Scanner") + self.ScanLabel.grid(row=1, column=2, padx=10, pady=5) + self.ScanCircle = Circle(right_top, radius=10, status=self.status.scan_status) + self.ScanCircle.grid(row=1, column=3, padx=10, pady=5) + self.ScanConfig = ScanConfig(right_top, data=self.scan_data.queue, info=self.scan_info) + self.ScanConfig.grid(row=2, column=2, columnspan=2, padx=10, pady=5) + + # Mic + self.MicLabel = Label(right_top, text="Microphones") + self.MicLabel.grid(row=3, column=2, padx=10, pady=5) + self.MicCircle = Circle(right_top, radius=10, status=self.status.microphone_status) + self.MicCircle.grid(row=3, column=3, padx=10, pady=5) + self.MicConfig = MicConfig(right_top, data=self.mic_data.queue, info=self.mic_info) + self.MicConfig.grid(row=4, column=2, columnspan=2, padx=10, pady=5) + + def update_data(self): + self.time = self.time + 1 + try: + status = self.badge.get_status() + time.sleep(0.1) + imu_status = self.badge.get_imu_data() + time.sleep(0.1) + if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): + self.battery.text="Available memory: {} MB".format(self.badge.get_free_sdc_space().free_space) + except: + print('Error with request') + self.badge.connect() + status = self.badge.get_status() + time.sleep(0.1) + imu_status = self.badge.get_imu_data() + time.sleep(0.1) + if (status.microphone_status == 0 and status.scan_status == 0 and status.imu_status == 0): + self.battery.text="Available memory: {} MB".format(self.badge.get_free_sdc_space().free_space) + + if status.microphone_status == 0: + self.MicConfig.data = [] + self.MicConfig.initUI() + else: + pdm_data = status.pdm_data + # update mic + self.mic_data.enqueue(pdm_data) + self.MicConfig.data = self.mic_data.queue + self.MicConfig.initUI() + if status.scan_status == 0: + self.ScanConfig.data = [] + self.ScanConfig.initUI() + else: + scan_data = status.scan_data + # update scan + self.scan_data.enqueue(scan_data) + self.ScanConfig.data = self.scan_data.queue + self.ScanConfig.initUI() + #update imu + self.IMUConfig.imu_status = imu_status + self.IMUConfig.time.enqueue(self.time) + self.IMUConfig.initUI() + # update circles + self.MicCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.microphone_status)) + self.IMUCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.imu_status)) + self.ScanCircle.create_oval(0, 0, 2 * 10, 2 * 10, fill=calculate_color(status.scan_status)) + + self.after(5500, self.update_data) + +def get_badges(): + badges = [] + with open('sample_mapping_file.csv', 'r') as f: + reader = csv.reader(f) + next(reader) + for row in reader: + badge = BadgeInterface(row[1]) + badge.connect() + badges.append({'badge': badge, 'name': row[0], 'address': row[1]}) + return badges + +class MainApp(tk.Tk): + def __init__(self): + tk.Tk.__init__(self) + self.title("Lista de dispositivos") + self.geometry("1100x700") + self.container_frame = ttk.Frame(self) + self.container_frame.pack(expand=True, fill="both") + self.custom_components = [] + self.badges = get_badges() + + for badge in self.badges: + custom_component = CustomComponent(self.container_frame, name=badge['name'], badge=badge['badge'], address=badge['address']) + custom_component.pack() + # Draw a border between custom components + border = tk.Frame(self.container_frame, bg="gray", height=1) + border.pack(fill="x") + self.custom_components.append(custom_component) + + self.after(1000, self.update_data) + + def update_data(self): + for badge, custom_component in zip(self.badges, self.custom_components): + try: + badge_status = badge['badge'].get_status() + except: + print('Error with request') + try: + badge['badge'].connect() + except: + badge['badge'].connect() + badge_status = badge['badge'].get_status() + custom_component.badge_status = badge_status + custom_component.battery['text'] = 'Battery: {}%'.format(badge_status.battery_level) + custom_component.statusCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.clock_status)) + custom_component.MicCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.microphone_status)) + custom_component.IMUCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.imu_status)) + custom_component.ScanCircle.create_oval(0, 0, 2 * DEFAULT_RADIUS, 2 * DEFAULT_RADIUS, fill=calculate_color(badge_status.scan_status)) + if (badge_status.microphone_status == 0): + custom_component.MicStartMonoButton['state'] = 'normal' + custom_component.MicStartStereoButton['state'] = 'normal' + custom_component.MicStopButton['state'] = 'disabled' + else: + custom_component.MicStartMonoButton['state'] = 'disabled' + custom_component.MicStartStereoButton['state'] = 'disabled' + custom_component.MicStopButton['state'] = 'normal' + if (badge_status.imu_status == 0): + custom_component.IMUStartButton['state'] = 'normal' + custom_component.IMUStopButton['state'] = 'disabled' + else: + custom_component.IMUStartButton['state'] = 'disabled' + custom_component.IMUStopButton['state'] = 'normal' + if (badge_status.scan_status == 0): + custom_component.ScanStartButton['state'] = 'normal' + custom_component.ScanStopButton['state'] = 'disabled' + else: + custom_component.ScanStartButton['state'] = 'disabled' + custom_component.ScanStopButton['state'] = 'normal' + + if (GET_STATUS_MAIN == True): + print('updating main') + self.after(1000, self.update_data) + +if __name__ == '__main__': + app = MainApp() + app.mainloop() \ No newline at end of file diff --git a/BadgeFramework/badge_interface.py b/BadgeFramework/badge_interface.py index 8646eda..ef43700 100644 --- a/BadgeFramework/badge_interface.py +++ b/BadgeFramework/badge_interface.py @@ -1,47 +1,73 @@ -from badge import * -from ble_badge_connection import * - -class BadgeInterface(): - def __init__(self, address): - self.address = address - self.connection = None - self.badge = None - - def connect(self): - self.connection = BLEBadgeConnection.get_connection_to_badge(self.address) - self.connection.connect() - self.badge = OpenBadge(self.connection) - print("Connected!") - - def get_status(self): - return self.badge.get_status() - - def get_free_sdc_space(self): - return self.badge.get_free_sdc_space() - - def start_imu(self): - return self.badge.start_imu() - - def start_microphone(self, mode): - #print("MODE interface ",mode) - return self.badge.start_microphone(mode=mode) - - def stop_microphone(self): - return self.badge.stop_microphone() - - def stop_imu(self): - return self.badge.stop_imu() - - def start_scan(self): - return self.badge.start_scan() - - def stop_scan(self): - return self.badge.stop_scan() - - def get_imu_data(self): - return self.badge.get_imu_data() - - def sdc_errase_all(self): - return self.badge.sdc_errase_all() - +from badge import * +# from ble_badge_connection import * +from dataclasses import dataclass + +# Add a mock status class +@dataclass +class MockStatusResponse: + battery_level: int = 100 + clock_status: bool = False + imu_status: bool = False + microphone_status: bool = False + +class BadgeInterface(): + def __init__(self, address): + self.address = address + self.connection = None + self.badge = None + # Add a mock status that can be updated + self._mock_status = MockStatusResponse() + + def connect(self): + # self.connection = BLEBadgeConnection.get_connection_to_badge(self.address) + # self.connection.connect() + # self.badge = OpenBadge(self.connection) + pass + # print("Connected!") + + async def get_status(self): + try: + async with OpenBadge(0, self.address) as badge: + return await badge.get_status() + except Exception as e: + print(f"Using mock status due to: {e}") + return self._mock_status + + async def get_free_sdc_space(self): + async with OpenBadge(0, self.address) as badge: + return await badge.get_free_sdc_space() + + async def start_imu(self): + async with OpenBadge(0, self.address) as badge: + return await badge.start_imu() + + async def start_microphone(self, mode): + #print("MODE interface ",mode) + async with OpenBadge(0, self.address) as badge: + return await badge.start_microphone(mode=mode) + + async def stop_microphone(self): + async with OpenBadge(0, self.address) as badge: + return await badge.stop_microphone() + + async def stop_imu(self): + async with OpenBadge(0, self.address) as badge: + return await badge.stop_imu() + + async def start_scan(self): + async with OpenBadge(0, self.address) as badge: + return await badge.start_scan() + + async def stop_scan(self): + async with OpenBadge(0, self.address) as badge: + return await badge.stop_scan() + + async def get_imu_data(self): + async with OpenBadge(0, self.address) as badge: + return await badge.get_imu_data() + + async def sdc_errase_all(self): + async with OpenBadge(0, self.address) as badge: + return await badge.sdc_errase_all() + \ No newline at end of file diff --git a/BadgeFramework/badge_protocol.py b/BadgeFramework/badge_protocol.py index aa61d33..8dda673 100644 --- a/BadgeFramework/badge_protocol.py +++ b/BadgeFramework/badge_protocol.py @@ -936,29 +936,29 @@ def decode_internal(self, istream): def decode_clock_status(self, istream): #print("decode_clock_status:", istream.buf) #print("decode_clock_status:", istream.read(1)) - self.clock_status= struct.unpack(' 0: - while True: - while(not self.rx_queue.empty()): - rx_message += self.rx_queue.get() - if(len(rx_message) == rx_bytes_expected): - return rx_message - - self.conn.waitForNotifications(5.0) - - - - # Implements BadgeConnection's send() spec. - def send(self, message, response_len=0): - if not self.is_connected(): - raise RuntimeError("BLEBadgeConnection not connected before send()!") - - rx_message = "" - rx_bytes_expected = response_len - - self.tx.write(message,withResponse=True) - - - if rx_bytes_expected > 0: - while True: - while(not self.rx_queue.empty()): - rx_message += self.rx_queue.get() - if(len(rx_message) == rx_bytes_expected): - return rx_message - - self.conn.waitForNotifications(5.0) - - diff --git a/BadgeFramework/hub_V0.py b/BadgeFramework/hub_V0.py deleted file mode 100644 index 1ce19a4..0000000 --- a/BadgeFramework/hub_V0.py +++ /dev/null @@ -1,316 +0,0 @@ -from __future__ import division, absolute_import, print_function -import logging -import sys -import threading - -from badge import OpenBadge -from ble_badge_connection import BLEBadgeConnection -#from bluepy import * -from bluepy import btle -from bluepy.btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers ,Scanner -from bluepy.btle import BTLEException -import numpy as np -import pandas as pd -import time -import signal,tty,termios - -constant_group_number = 1 - -class Connection(): - def __init__(self,pid,address): - try: - self.connection = BLEBadgeConnection.get_connection_to_badge(address) - self.connection.connect() - self.badge = OpenBadge(self.connection) - self.badge_id = int(pid) - self.mac_address = address - self.group_number = int(constant_group_number) - print ("MY ID:" + str(self.badge_id)) - except: - raise Exception("Could not connect to participant" + str(pid)) - - def set_id_at_start(self): - try: - self.badge.get_status(new_id=self.badge_id, new_group_number=self.group_number) - except Exception as err: - print (err) - - def disconnect(self): - self.connection.disconnect() - - def handle_status_request(self): - try: - out = self.badge.get_status() - return out - except Exception as err: - print (str(err)) - raise Exception("Could not get status for participant") - - def handle_start_microphone_request(self): - try: - self.badge.start_microphone() - except: - raise Exception("Could not start mic for participant" + str(self.badge_id)) - - def handle_stop_microphone_request(self): - try: - self.badge.stop_microphone() - except: - raise Exception("Could not stop mic for participant" + str(self.badge_id)) - - def handle_start_scan_request(self): - try: - self.badge.start_scan() - except: - raise Exception("Could not start scan for participant" + str(self.badge_id)) - - def handle_stop_scan_request(self): - try: - self.badge.stop_scan() - except: - raise Exception("Could not stop scan for participant" + str(self.badge_id)) - - def handle_start_imu_request(self): - try: - self.badge.start_imu() - except: - raise Exception("Could not start IMU for participant" + str(self.badge_id)) - - def handle_stop_imu_request(self): - try: - self.badge.stop_imu() - except: - raise Exception("Could not stop IMU for participant " + str(self.badge_id)) - - def handle_identify_request(self): - try: - self.badge.identify() - except: - raise Exception("Could not identify for participant " + str(self.badge_id)) - - def handle_restart_request(self): - try: - self.badge.restart() - except: - raise Exception("Could not restart for participant " + str(self.badge_id)) - - def handle_get_free_space(self): - try: - print(self.badge.get_free_sdc_space()) - except: - raise Exception("Could not get free space for participant " + str(self.badge_id)) - - def start_recording_all_sensors(self): - self.handle_status_request() - self.handle_start_scan_request() - self.handle_start_microphone_request() - self.handle_start_imu_request() - - def stop_recording_all_sensors(self): - self.handle_stop_scan_request() - self.handle_stop_microphone_request() - self.handle_stop_imu_request() - - def print_help(self): - print("> Available commands:") - print("> status ") - print("> start_all_sensors") - print("> stop_all_sensors") - print("> start_microphone") - print("> stop_microphone") - print("> start_scan") - print("> stop_scan") - print("> start_imu") - print("> stop_imu") - print("> identify [led duration seconds | 'off']") - print("> restart") - print("> get_free_space") - print("> help") - print("> All commands use current system time as transmitted time.") - -################################################################################## -################################################################################## - -def choose_function(connection,input): - chooser = { - "help": connection.print_help, - "status": connection.handle_status_request, - "start_all_sensors": connection.start_recording_all_sensors, - "stop_all_sensors": connection.stop_recording_all_sensors, - "start_microphone": connection.handle_start_microphone_request, - "stop_microphone": connection.handle_stop_microphone_request, - "start_scan": connection.handle_start_scan_request, - "stop_scan": connection.handle_stop_scan_request, - "start_imu": connection.handle_start_imu_request, - "stop_imu": connection.handle_stop_imu_request, - "identify": connection.handle_identify_request, - "restart": connection.handle_restart_request, - "get_free_space": connection.handle_get_free_space, - } - func = chooser.get(input, lambda: "Invalid month") - out = func() - return out - - -def start_recording_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', sensors are not started.') - continue - try: - cur_connection.set_id_at_start() - cur_connection.start_recording_all_sensors() - cur_connection.disconnect() - except Exception as error: - print(error) - cur_connection.disconnect() - -def stop_recording_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', sensors are not stopped.') - continue - try: - cur_connection.stop_recording_all_sensors() - cur_connection.disconnect() - except Exception as error: - print(str(error)) - cur_connection.disconnect() - -def synchronise_and_check_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', cannot synchronise.') - continue - try: - out = cur_connection.handle_status_request() - if out.imu_status == 0: - print ('IMU is not recording for participant ' + str(current_participant)) - if out.microphone_status == 0: - print ('Mic is not recording for participant ' + str(current_participant)) - if out.scan_status == 0: - print ('Scan is not recording for participant ' + str(current_participant)) - if out.clock_status == 0: - print ('Cant synch for participant ' + str(current_participant)) - cur_connection.disconnect() - except Exception as error: - print(error) - cur_connection.disconnect() - -class TimeoutInput(object): - def __init__(self, poll_period=0.05): - self.poll_period = poll_period - - def _getch_nix(self): - from select import select - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - [i, _, _] = select([sys.stdin.fileno()], [], [], self.poll_period) - if i: - ch = sys.stdin.read(1) - else: - ch = '' - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - return ch - - def input(self, prompt=None, timeout=None, - extend_timeout_with_input=True, require_enter_to_confirm=True): - """timeout: float seconds or None (blocking)""" - prompt = prompt or '' - sys.stdout.write(prompt) - sys.stdout.flush() - input_chars = [] - start_time = time.time() - received_enter = False - while (time.time() - start_time) < timeout: - c = self._getch_nix() - if c in ('\n', '\r'): - received_enter = True - break - elif c: - input_chars.append(c) - sys.stdout.write(c) - sys.stdout.flush() - if extend_timeout_with_input: - start_time = time.time() - sys.stdout.write('\n') - sys.stdout.flush() - captured_string = ''.join(input_chars) - if require_enter_to_confirm: - return_string = captured_string if received_enter else '' - else: - return_string = captured_string - return return_string - -if __name__ == "__main__": - df = pd.read_csv('sample_mapping_file.csv') - while True: - print("> Type start to start data collection or stop to finish data collection.") - sys.stdout.write("> ") - command = sys.stdin.readline()[:-1] - if command == "start": - start_recording_all_devices(df) - while True: - ti = TimeoutInput(poll_period=0.05) - s = ti.input(prompt='>Type int if you would like to enter interactive shell.\n'+'>', timeout=10.0, - extend_timeout_with_input=False, require_enter_to_confirm=True) - if s == "int": - print("> Welcome to the interactive shell. Please type the id of the Midge you want to connect.") - print("> Type exit if you would like to stop recording for all devices.") - sys.stdout.write("> ") - command = sys.stdin.readline()[:-1] - if command == "exit": - print("> Stopping the recording of all devices.") - stop_recording_all_devices(df) - print("> Devices are stopped.") - break - command_args = command.split(" ") - current_mac_addr= (df.loc[df['Participant Id'] == int(command)]['Mac Address']).values[0] - try: - cur_connection = Connection(int(command),current_mac_addr) - except Exception as error: - print (str(error)) - continue - print ('> Connected to the badge. For available commands, please type help.') - print ('> ') - while True: - command = sys.stdin.readline()[:-1] - command_args = command.split(" ") - if command == "exit": - cur_connection.disconnect() - break - if command_args[0] in cur_connection.command_handlers: - try: - out = choose_function(cur_connection,command_args[0]) - print (out) - except Exception as error: - print (str(error)) - continue - else: - print("> Command not found!") - cur_connection.print_help() - else: - print('> Synchronisation is starting. Please wait till ends.') - synchronise_and_check_all_devices(df) - print('> Synchronisation is finished.') - elif command == "stop": - print("> Stopping data collection.") - quit(0) - else: - print("> Command not found, please type start or stop to start or stop data collection.") - diff --git a/BadgeFramework/hub_V1.py b/BadgeFramework/hub_V1.py index 39bbd9d..20f688b 100644 --- a/BadgeFramework/hub_V1.py +++ b/BadgeFramework/hub_V1.py @@ -68,4 +68,3 @@ else: print("Command not found, please type start or stop to start or stop data collection.") sys.stdout.flush() - diff --git a/BadgeFramework/hub_connection_V1.py b/BadgeFramework/hub_connection_V1.py deleted file mode 100644 index f0f3625..0000000 --- a/BadgeFramework/hub_connection_V1.py +++ /dev/null @@ -1,132 +0,0 @@ -from badge import OpenBadge -from ble_badge_connection import BLEBadgeConnection -import sys -constant_group_number = 1 - -class Connection(): - def __init__(self,pid,address): - try: - self.connection = BLEBadgeConnection.get_connection_to_badge(address) - self.connection.connect() - self.badge = OpenBadge(self.connection) - self.badge_id = int(pid) - self.mac_address = address - self.group_number = int(constant_group_number) - except Exception as err: - #if (err): - # print (str(err)) - raise Exception("Could not connect to participant" + str(pid)) - - def set_id_at_start(self): - try: - self.badge.get_status(new_id=self.badge_id, new_group_number=self.group_number) - except Exception as err: - print (err) - sys.stdout.flush() - - def disconnect(self): - self.connection.disconnect() - - def handle_status_request(self): - try: - out = self.badge.get_status() - return out - except Exception as err: - print (str(err)) - sys.stdout.flush() - raise Exception("Could not get status for participant"+ str(self.badge_id)) - - def handle_start_microphone_request(self): - try: - out = self.badge.start_microphone() - return out - except: - raise Exception("Could not start mic for participant" + str(self.badge_id)) - - def handle_stop_microphone_request(self): - try: - out = self.badge.stop_microphone() - return out - except: - raise Exception("Could not stop mic for participant" + str(self.badge_id)) - - def handle_start_scan_request(self): - try: - out = self.badge.start_scan() - return out - except: - raise Exception("Could not start scan for participant" + str(self.badge_id)) - - def handle_stop_scan_request(self): - try: - out = self.badge.stop_scan() - return out - except: - raise Exception("Could not stop scan for participant" + str(self.badge_id)) - - def handle_start_imu_request(self): - try: - out = self.badge.start_imu() - return out - except: - raise Exception("Could not start IMU for participant" + str(self.badge_id)) - - def handle_stop_imu_request(self): - try: - out = self.badge.stop_imu() - return out - except: - raise Exception("Could not stop IMU for participant " + str(self.badge_id)) - - def handle_identify_request(self): - try: - out = self.badge.identify() - return out - except: - raise Exception("Could not identify for participant " + str(self.badge_id)) - - def handle_restart_request(self): - try: - out = self.badge.restart() - return out - except Exception as err: - print (str(err)) - print ("Please wait at least 10 seconds to connect back to the device.") - print ("Don't forget to start the recording for the restarted badge.") - #raise Exception("Could not restart for participant " + str(self.badge_id)) - - def handle_get_free_space(self): - try: - out = (self.badge.get_free_sdc_space()) - return out - except: - raise Exception("Could not get free space for participant " + str(self.badge_id)) - - def start_recording_all_sensors(self): - self.handle_status_request() - self.handle_start_scan_request() - self.handle_start_microphone_request() - self.handle_start_imu_request() - - def stop_recording_all_sensors(self): - self.handle_stop_scan_request() - self.handle_stop_microphone_request() - self.handle_stop_imu_request() - - def print_help(self): - print(" Available commands:") - print(" status ") - print(" start_all_sensors") - print(" stop_all_sensors") - print(" start_microphone") - print(" stop_microphone") - print(" start_scan") - print(" stop_scan") - print(" start_imu") - print(" stop_imu") - print(" identify") - print(" restart") - print(" get_free_space") - print(" help") - print(" All commands use current system time as transmitted time.") - sys.stdout.flush() \ No newline at end of file diff --git a/BadgeFramework/hub_utilities.py b/BadgeFramework/hub_utilities.py new file mode 100644 index 0000000..84ae228 --- /dev/null +++ b/BadgeFramework/hub_utilities.py @@ -0,0 +1,173 @@ +from badge import OpenBadge +import sys +import tty +import termios +import logging +import time +import utils +from bleak import BleakScanner, BleakClient, BLEDevice + + +def get_logger(name): + log_format_file = '%(asctime)s %(levelname)5s %(message)s' + log_format_console = '%(message)s' + logging.basicConfig(level=logging.DEBUG, + format=log_format_file, + filename="data_collection.log", + filemode='w') + console = logging.StreamHandler() + console.setLevel(logging.INFO) + console.setFormatter(logging.Formatter(log_format_console)) + logging.getLogger(name).addHandler(console) + return logging.getLogger(name) + + +logger = get_logger("hub_utilities") + + +def choose_function(badge: OpenBadge, user_input): + chooser = { + "help": badge.print_help, + "status": badge.get_status, + "start_all_sensors": badge.start_recording_all_sensors, + "stop_all_sensors": badge.stop_recording_all_sensors, + "start_microphone": badge.start_microphone, + "stop_microphone": badge.stop_microphone, + "start_scan": badge.start_scan, + "stop_scan": badge.stop_scan, + "start_imu": badge.start_imu, + "stop_imu": badge.stop_imu, + "identify": badge.identify, + "restart": badge.restart, + "get_free_space": badge.get_free_sdc_space, + } + func = chooser.get(user_input, lambda: "Invalid command!") + logger.info("Following command is entered: " + user_input + ".") + try: + out = func() + return out + except Exception as error: + logger.info("Error: " + str(error)) + return + + +async def start_recording_all_devices(df): + for _, row in df.iterrows(): + device_id: int = row["Participant Id"] + device_addr: str = row["Mac Address"] + use_current_device: bool = row["Use"] + if not use_current_device: + continue + try: + async with OpenBadge(device_id, device_addr) as open_badge: + # await open_badge.set_id_at_start() + await open_badge.start_recording_all_sensors() + except Exception as error: + logger.info(f"Sensors for midge {device_id} are not started with the following error: {str(error)}") + continue + print('completed') + + +async def stop_recording_all_devices(df): + for _, row in df.iterrows(): + device_id: int = row["Participant Id"] + device_addr: str = row["Mac Address"] + use_current_device: bool = row["Use"] + if not use_current_device: + continue + try: + async with OpenBadge(device_id, device_addr) as open_badge: + await open_badge.stop_recording_all_sensors() + except Exception as error: + logger.info(f"Sensors for midge {str(device_id)} are not stopped with the following error: {str(error)}") + continue + print('completed') + + +# async def synchronise_one_device(df): + + +async def synchronise_and_check_all_devices(df): + for _, row in df.iterrows(): + device_id: int = row["Participant Id"] + device_addr: str = row["Mac Address"] + use_current_device: bool = row["Use"] + if not use_current_device: + continue + try: + async with OpenBadge(device_id, device_addr) as open_badge: + out = await open_badge.get_status() + logger.info("Status received for the following midge:" + str(device_id) + ".") + # TODO This is not actually the timestamp before, find how to get it. + logger.debug("Device timestamp before sync - seconds:" + + str(out.timestamp.seconds) + ", ms:" + + str(out.timestamp.ms) + ".") + error_message = "{} is not recording for participant " + str(device_id) + "." + if out.imu_status == 0: + logger.info(error_message.format("IMU")) + if out.microphone_status == 0: + logger.info(error_message.format("Mic")) + if out.scan_status == 0: + logger.info(error_message.format("Scan")) + if out.clock_status == 0: + logger.info("Cant sync for participant " + str(device_id) + ".") + except Exception as error: + logger.info(f"Status check for midge {str(device_id)} returned the following error: {str(error)}") + # sys.stdout.flush() + continue + print('completed') + + +class timeout_input(object): + def __init__(self, poll_period=0.05): + self.poll_period = poll_period + + def _getch_nix(self): + from select import select + + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + [i, _, _] = select([sys.stdin.fileno()], [], [], self.poll_period) + if i: + ch = sys.stdin.read(1) + else: + ch = "" + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + # pass + return ch + + def input( + self, + prompt=None, + timeout=None, + extend_timeout_with_input=True, + require_enter_to_confirm=True, + ): + prompt = prompt or "" + sys.stdout.write(prompt) + sys.stdout.flush() + input_chars = [] + start_time = time.time() + received_enter = False + while (time.time() - start_time) < timeout: + c = self._getch_nix() + if c in ("\n", "\r"): + received_enter = True + break + elif c: + input_chars.append(c) + sys.stdout.write(c) + sys.stdout.flush() + if extend_timeout_with_input: + start_time = time.time() + sys.stdout.write("\n") + sys.stdout.flush() + captured_string = "".join(input_chars) + if require_enter_to_confirm: + return_string = captured_string if received_enter else "" + else: + return_string = captured_string + return return_string diff --git a/BadgeFramework/hub_utilities_V1.py b/BadgeFramework/hub_utilities_V1.py deleted file mode 100644 index bdc6755..0000000 --- a/BadgeFramework/hub_utilities_V1.py +++ /dev/null @@ -1,134 +0,0 @@ -from hub_connection_V1 import Connection -import signal,sys,tty,termios -import time - -def choose_function(connection,input): - chooser = { - "help": connection.print_help, - "status": connection.handle_status_request, - "start_all_sensors": connection.start_recording_all_sensors, - "stop_all_sensors": connection.stop_recording_all_sensors, - "start_microphone": connection.handle_start_microphone_request, - "stop_microphone": connection.handle_stop_microphone_request, - "start_scan": connection.handle_start_scan_request, - "stop_scan": connection.handle_stop_scan_request, - "start_imu": connection.handle_start_imu_request, - "stop_imu": connection.handle_stop_imu_request, - "identify": connection.handle_identify_request, - "restart": connection.handle_restart_request, - "get_free_space": connection.handle_get_free_space, - } - func = chooser.get(input, lambda: "Invalid command!") - try: - out = func() - return out - except Exception as error: - print(error) - return - -def start_recording_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', sensors are not started.') - continue - try: - cur_connection.set_id_at_start() - cur_connection.start_recording_all_sensors() - cur_connection.disconnect() - except Exception as error: - print(error) - cur_connection.disconnect() - -def stop_recording_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', sensors are not stopped.') - continue - try: - cur_connection.stop_recording_all_sensors() - cur_connection.disconnect() - except Exception as error: - print(str(error)) - cur_connection.disconnect() - -def synchronise_and_check_all_devices(df): - for _, row in df.iterrows(): - current_participant = row['Participant Id'] - current_mac = row['Mac Address'] - try: - cur_connection=Connection(current_participant,current_mac) - except Exception as error: - print(str(error) + ', cannot synchronise.') - sys.stdout.flush() - continue - try: - out = cur_connection.handle_status_request() - if out.imu_status == 0: - print ('IMU is not recording for participant ' + str(current_participant)) - if out.microphone_status == 0: - print ('Mic is not recording for participant ' + str(current_participant)) - if out.scan_status == 0: - print ('Scan is not recording for participant ' + str(current_participant)) - if out.clock_status == 0: - print ('Cant synch for participant ' + str(current_participant)) - sys.stdout.flush() - cur_connection.disconnect() - except Exception as error: - print(error) - sys.stdout.flush() - cur_connection.disconnect() - -class timeout_input(object): - def __init__(self, poll_period=0.05): - self.poll_period = poll_period - - def _getch_nix(self): - from select import select - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - [i, _, _] = select([sys.stdin.fileno()], [], [], self.poll_period) - if i: - ch = sys.stdin.read(1) - else: - ch = '' - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - return ch - - def input(self, prompt=None, timeout=None, - extend_timeout_with_input=True, require_enter_to_confirm=True): - prompt = prompt or '' - sys.stdout.write(prompt) - sys.stdout.flush() - input_chars = [] - start_time = time.time() - received_enter = False - while (time.time() - start_time) < timeout: - c = self._getch_nix() - if c in ('\n', '\r'): - received_enter = True - break - elif c: - input_chars.append(c) - sys.stdout.write(c) - sys.stdout.flush() - if extend_timeout_with_input: - start_time = time.time() - sys.stdout.write('\n') - sys.stdout.flush() - captured_string = ''.join(input_chars) - if require_enter_to_confirm: - return_string = captured_string if received_enter else '' - else: - return_string = captured_string - return return_string \ No newline at end of file diff --git a/BadgeFramework/imu_parser_V0.py b/BadgeFramework/imu_parser_V0.py index db7de64..daa4b68 100644 --- a/BadgeFramework/imu_parser_V0.py +++ b/BadgeFramework/imu_parser_V0.py @@ -193,5 +193,3 @@ def main(fn,acc,mag,gyr,rot,plot,scan): - - diff --git a/BadgeFramework/mappings2.csv b/BadgeFramework/mappings2.csv new file mode 100644 index 0000000..debd935 --- /dev/null +++ b/BadgeFramework/mappings2.csv @@ -0,0 +1,60 @@ +,Participant Id,Mac Address,Data Frame Length,Use +0,1,de:94:80:39:25:be,32.0,0 +1,2,c3:4c:d5:ba:b1:44,32.0,0 +2,3,e5:93:96:ca:ee:c4,,0 +3,4,f9:39:24:a9:04:f1,,0 +4,5,d2:fd:13:bd:81:39,,0 +5,6,de:94:80:39:25:be,24.0,0 +6,7,fb:f5:d5:84:a1:68,,1 +7,8,e2:6e:4e:21:f1:a4,,0 +8,9,f5:40:84:e2:9a:16,,0 +9,10,e8:44:59:c0:39:de,,0 +10,11,d7:11:de:c6:c8:e3,24.0,0 +11,12,fa:24:bd:55:c7:ab,,1 +12,13,d6:12:78:32:80:19,,0 +13,14,e8:03:31:16:ce:3a,24.0,0 +14,15,d8:ae:5b:aa:55:ae,,0 +15,16,f5:ef:4c:9b:55:de,,0 +16,17,c3:2b:78:b5:c2:4a,,0 +17,18,c4:3e:0b:bc:e6:92,,0 +18,19,d9:0d:2e:b7:cc:6b,,1 +19,20,e6:cc:57:a3:6b:57,,0 +20,21,c6:30:71:35:02:5a,,0 +21,22,fb:42:af:eb:ba:3c,,0 +22,23,d2:91:1c:b9:6f:5c,24.0,0 +23,24,c1:a4:56:be:7f:7e,,1 +24,25,f2:26:6c:f5:ca:e5,,1 +25,26,cb:14:eb:9a:5c:a3,,0 +26,27,db:03:30:a6:ee:86,,0 +27,28,fc:3f:62:28:17:b4,,1 +28,29,ce:22:14:de:40:38,,1 +29,30,e3:5d:06:9a:55:0f,,0 +30,31,d4:56:f4:e1:ef:f1,,0 +31,32,d2:4f:7c:01:93:0e,,1 +32,33,da:03:10:bb:61:f5,,1 +33,34,f5:e7:4d:77:4e:74,,1 +34,35,fd:f3:eb:4b:d0:8c,,1 +35,36,dc:bc:ed:19:1f:09,,0 +36,37,c5:61:a9:e9:83:ef,,1 +37,39,e7:03:db:e9:16:3b,,0 +38,40,fe:82:88:fa:36:62,,0 +39,41,dc:a4:f4:7e:45:fb,,0 +40,42,f6:01:ad:d1:18:23,,0 +41,43,e0:c3:d6:2e:ab:44,,0 +42,44,de:7f:7f:a2:9c:f5,,0 +43,45,fe:71:db:20:4f:34,,1 +44,46,dc:11:e6:20:81:d8,,0 +45,47,d9:98:8a:68:f0:89,,0 +46,48,f7:5a:78:fd:21:46,,1 +47,49,e9:59:12:26:a7:63,,0 +48,50,d0:c5:cc:ef:90:e2,,0 +49,51,d0:c5:cc:ef:90:e2,,0 +50,52,d0:c5:cc:ef:90:e2,,0 +51,53,d0:c5:cc:ef:90:e2,,0 +52,54,CB:58:7B:77:44:D6,,1 +53,55,FE:63:F0:A4:C8:F4,,1 +54,56,d0:c5:cc:ef:90:e2,,0 +55,57,D8:52:CD:32:50:B0,,1 +56,58,D1:0E:45:AC:97:DD,,1 +57,59,DD:09:38:B5:ED:8C,,1 +58,60,E4:9C:CC:8C:8F:DA,,1 \ No newline at end of file diff --git a/BadgeFramework/scan_all.py b/BadgeFramework/scan_all.py deleted file mode 100644 index 40a8d4a..0000000 --- a/BadgeFramework/scan_all.py +++ /dev/null @@ -1,24 +0,0 @@ -from bluepy.btle import Scanner, DefaultDelegate - -class ScanDelegate(DefaultDelegate): - def __init__(self): - DefaultDelegate.__init__(self) - - # def handleDiscovery(self, dev, isNewDev, isNewData): - # if isNewDev: - # print "Discovered device", dev.addr - # elif isNewData: - # print "Received new data from", dev.addr - -scanner = Scanner().withDelegate(ScanDelegate()) -devices = scanner.scan(10.0) -device_temp_name = 'HDBDG' -midges = [] - -for dev in devices: - for (adtype, desc, value) in dev.getScanData(): - if desc == 'Complete Local Name' and value == device_temp_name: - midges.append(dev) - -for midge in midges: - print "Device %s, RSSI=%d dB" % (midge.addr, midge.rssi) \ No newline at end of file diff --git a/BadgeFramework/test.py b/BadgeFramework/test.py index d3df97e..db1cb2e 100644 --- a/BadgeFramework/test.py +++ b/BadgeFramework/test.py @@ -3,18 +3,17 @@ import sys import threading import time +from datetime import datetime +import utils +from badge import OpenBadge +from bleak import BleakScanner, BleakClient +import asyncio -from badge import * -from ble_badge_connection import * -from bluepy import * -from bluepy import btle -from bluepy.btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers ,Scanner -from bluepy.btle import BTLEException -def mic_test(badge, mode): +async def mic_test(badge, mode): if (mode==0): - start_stereo = badge.start_microphone(t=None,mode=0) # start mic in stereo mode + start_stereo = await badge.start_microphone(t=None,mode=0) # start mic in stereo mode print(" sensor: mic, stereo mode ") # check if mic was started in stereo mode if (start_stereo.mode==0): @@ -29,22 +28,22 @@ def mic_test(badge, mode): else: print(" mic gain: FAIL ") else: - print(" mic mode: FAIL ") + print(" mic mode: FAIL ") time.sleep(10) # reecording time - mic = badge.get_status() + mic = await badge.get_status() # check if mic was enabled if (mic.microphone_status): - print(" mic enabled: PASS ") + print(" mic enabled: PASS ") else: print(" mic enabled: FAIL ") - badge.stop_microphone() # stop recording - + await badge.stop_microphone() # stop recording + time.sleep(0.5) - mic = badge.get_status() + mic = await badge.get_status() # check if mic was disabled with success if (~mic.microphone_status): print(" mic disabled: PASS ") @@ -55,7 +54,7 @@ def mic_test(badge, mode): print(" sensor: mic, mono mode ") #print("-----------------------------------") - start_mono = badge.start_microphone(t=None,mode=1) + start_mono = await badge.start_microphone(t=None, mode=1) if (start_mono.mode==1): print(" mic mode: PASS ") @@ -70,34 +69,34 @@ def mic_test(badge, mode): print(" mic gain: FAIL ") else: - print(" mic mode: FAIL ") + print(" mic mode: FAIL ") time.sleep(10) # reecording time - mic = badge.get_status() + mic = await badge.get_status() # check if mic was enabled if (mic.microphone_status): - print(" mic enabled: PASS ") + print(" mic enabled: PASS ") else: print(" mic enabled: FAIL ") - badge.stop_microphone() # stop recording - + await badge.stop_microphone() # stop recording + time.sleep(0.5) - mic = badge.get_status() + mic = await badge.get_status() # check if mic was disabled with success if (~mic.microphone_status): print(" mic disabled: PASS ") else: print(" mic disabled: PASS ") -def imu_test(badge): +async def imu_test(badge): print("###################################") print(" sensor: imu ") #print("#-----------------------------------#") - imu_start = badge.start_imu() # start imu + imu_start = await badge.start_imu() # start imu time.sleep(0.1) # safety wait #check if imu self test was done if (imu_start.self_test_done): @@ -105,9 +104,9 @@ def imu_test(badge): else: print(" imu self test: FAIL ") - imu = badge.get_status() # get imu status + imu = await badge.get_status() # get imu status time.sleep(0.1) # safety wait - imu_data = badge.get_imu_data() # get imu data + imu_data = await badge.get_imu_data() # get imu data #print(imu_data) @@ -136,10 +135,10 @@ def imu_test(badge): time.sleep(10) # imu enabled time - badge.stop_imu() + await badge.stop_imu() time.sleep(0.5) # safety wait - imu = badge.get_status() + imu = await badge.get_status() if (~imu.imu_status): print(" imu disabled: PASS ") @@ -149,15 +148,15 @@ def imu_test(badge): -def scan_test(badge): +async def scan_test(badge): print("###################################") print(" sensor: scan ") #print("#-----------------------------------#") - badge.start_scan(window_ms = 250, interval_ms = 1000) ## start scan with default parameters + await badge.start_scan(window_ms = 250, interval_ms = 1000) ## start scan with default parameters time.sleep(10) # scan enabled time - scan = badge.get_status() + scan = await badge.get_status() # check if scan was enabled succesfully if (scan.scan_status): print(" scan enabled: PASS ") @@ -169,19 +168,19 @@ def scan_test(badge): else: print(" scan data: FAIL ") - badge.stop_scan() # stop scan + await badge.stop_scan() # stop scan time.sleep(0.5) # safety wait - scan = badge.get_status() # get scan status + scan =await badge.get_status() # get scan status # check if scan was disabled succesfully if (~scan.scan_status): print(" scan disabled: PASS ") else: print(" scan disabled: FAIL ") -def errase_mem(badge): - errased = badge.sdc_errase_all() # clean sd memory +async def errase_mem(badge): + errased = await badge.sdc_errase_all() # clean sd memory if (errased.done_errase): print(" memory cleaned: PASS ") @@ -198,42 +197,69 @@ def connect(self): return badge -def main(): +async def main(): device_addr = sys.argv[1] - connection = BLEBadgeConnection.get_connection_to_badge(device_addr) - connection.connect() - badge = OpenBadge(connection) - print(" connetced ") - print("###################################") - print(" Midge test ") - - try: - mic_test(badge,0) - except: - print("mic error") - - try: - mic_test(badge,1) - except: - print("mic error") - - try: - scan_test(badge) - except: - print("scan error") - - try: - imu_test(badge) - except: - print("imu error") - - try: - errase_mem(badge) - except: - print("errase memory error") + # In Bleak, my experience is that it is rather prone to disconnection when executing many commands in one + # context manager. Separate each command with context managers increase the rate of success. + + + async with OpenBadge(0, device_addr) as badge: + print(" connetced ") + print("###################################") + print(" Midge test ") + + try: + await mic_test(badge,0) + except: + print("mic error") + + try: + await mic_test(badge,1) + except: + print("mic error") + + try: + await scan_test(badge) + except: + print("scan error") + + try: + await imu_test(badge) + except: + print("imu error") - connection.disconnect() + try: + await errase_mem(badge) + except: + print("errase memory error") + + +async def test_client(): + for i in range(10): + try: + # Measure scanning time + scan_start = time.time() + device = await BleakScanner.find_device_by_address("de:94:80:39:25:be", timeout=10) + scan_time = time.time() - scan_start + + if device is None: + print("Device not found during scan") + continue + + # Measure connection time + print(f"Found device in {scan_time:.2f} seconds") + connect_start = time.time() + async with BleakClient(device, timeout=10) as client: + connect_time = time.time() - connect_start + print(f"Connected in {connect_time:.2f} seconds!") + + # await client.start_notify(utils.RX_CHAR_UUID, callback=lambda sender, data: print(f"Received data: {data}"), timeout=10) + except Exception as e: + total_time = time.time() - scan_start + print(f"Error after {total_time:.2f} seconds: {str(e)}") + if __name__ == "__main__": - main() \ No newline at end of file + # asyncio.run(main()) + asyncio.run(test_client()) \ No newline at end of file diff --git a/BadgeFramework/utils.py b/BadgeFramework/utils.py new file mode 100644 index 0000000..300ab83 --- /dev/null +++ b/BadgeFramework/utils.py @@ -0,0 +1,45 @@ +import uuid +import logging +import pandas +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData +from typing import Optional, Final + +UART_SERVICE_UUID = uuid.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") +TX_CHAR_UUID = uuid.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") +RX_CHAR_UUID = uuid.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") +df_badge = pandas.read_csv("BadgeFramework/mappings2.csv") + + +def get_logger(name): + log_format_file = '%(asctime)s %(levelname)5s %(message)s' + log_format_console = '%(message)s' + logging.basicConfig(level=logging.DEBUG, + format=log_format_file, + filename="data_collection.log", + filemode='w') + console = logging.StreamHandler() + console.setLevel(logging.INFO) + console.setFormatter(logging.Formatter(log_format_console)) + logging.getLogger(name).addHandler(console) + return logging.getLogger(name) + + +def get_mac_address(df, badge_id: int) -> str: + mac_address = df[df['Participant Id'] == badge_id]['Mac Address'] + mac_address = list(mac_address)[0] + return mac_address + + +def is_spcl_midge(device: BLEDevice) -> bool: + return device.name == 'HDBDG' + + +def get_device_id(device: BLEDevice, addr_df: Optional[pandas.DataFrame] = df_badge) -> int: + address = device.address.lower() + try: + badge_id = addr_df[addr_df['Mac Address'] == address]['Participant Id'] + badge_id = badge_id.values[0] + except IndexError: + badge_id = -1000 + return badge_id diff --git a/led/led.c b/led/led.c index 70a6c49..b54d8dd 100644 --- a/led/led.c +++ b/led/led.c @@ -25,9 +25,9 @@ void led_init_success(void) // If initialization was successful, blink the green LED 3 times. for(uint8_t i = 0; i < 3; i++) { nrf_gpio_pin_write(LED, LED_ON); //turn on LED - nrf_delay_ms(100); + nrf_delay_ms(1000); nrf_gpio_pin_write(LED, LED_OFF); //turn off LED - nrf_delay_ms(100); + nrf_delay_ms(1000); } }