diff --git a/checkbox-support/checkbox_support/dbus/gnome_monitor.py b/checkbox-support/checkbox_support/dbus/gnome_monitor.py index 4a5b6c3c7c..6905780b32 100644 --- a/checkbox-support/checkbox_support/dbus/gnome_monitor.py +++ b/checkbox-support/checkbox_support/dbus/gnome_monitor.py @@ -1,6 +1,7 @@ # Copyright 2024 Canonical Ltd. # Written by: # Paolo Gentili +# Zhongning Li # # This is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3, @@ -21,14 +22,186 @@ - https://gitlab.gnome.org/GNOME/mutter/-/blob/main/tools/get-state.py """ -from collections import namedtuple -from typing import Dict, List, Tuple, Set, Callable, Any -from gi.repository import GLib, Gio import itertools - +from collections import OrderedDict +from enum import IntEnum +from typing import ( + Any, + Callable, + Dict, + List, + Mapping, + NamedTuple, + Optional, + Set, +) from checkbox_support.monitor_config import MonitorConfig +from gi.repository import Gio, GLib # type: ignore + + +class Transform(IntEnum): + NORMAL_0 = 0 # landscape + NORMAL_90 = 1 # portrait right + NORMAL_180 = 2 # landscape flipped + NORMAL_270 = 3 # portrait left + + # The following are listed in the xml file + # but they aren't available in gnome control center + # maybe it's only intended for devices with an accelerometer? + FLIPPED_0 = 4 + FLIPPED_90 = 5 + FLIPPED_180 = 6 + FLIPPED_270 = 7 + + +"""A plain 4-tuple with some basic info about the monitor""" +MonitorInfo = NamedTuple( + "MonitorInfo", + [ + ("connector", str), # HDMI-1, eDP-1, ... + ("vendor", str), # vendor string like BOE, Asus, etc. + ("product", str), + ("serial", str), + ], +) + + +# py3.5 can't use inline type annotations, +# otherwise the _*T types should be merged with their no-underscore conterparts +_MutterDisplayModeT = NamedTuple( + "_MutterDisplayModeT", + [ + ("id", str), + ("width", int), + ("height", int), + ("refresh_rate", float), + ("preferred_scale", float), + ("supported_scales", List[float]), + ("properties", Mapping[str, Any]), + ], +) + + +class MutterDisplayMode(_MutterDisplayModeT): + @property + def is_current(self) -> bool: + return self.properties.get("is-current", False) + + @property + def is_preferred(self) -> bool: + return self.properties.get("is-preferred", False) + + @property + def resolution(self) -> str: + """ + Resolution string, makes this class compatible with the Mode type + !! WARNING: This property does not exist on the original dbus object + !! This is only here for code that expects a string, new code should + !! use the width and height numbers + """ + return "{}x{}".format(self.width, self.height) + + +_PhysicalMonitorT = NamedTuple( + "_PhysicalMonitorT", + [ + ("info", MonitorInfo), + ("modes", List[MutterDisplayMode]), + # See: https://gitlab.gnome.org/GNOME/mutter/-/blob/main/data/ + # dbus-interfaces/org.gnome.Mutter.DisplayConfig.xml#L414 + ("properties", Mapping[str, Any]), + ], +) -Mode = namedtuple("Mode", ["id", "resolution", "is_preferred", "is_current"]) + +class PhysicalMonitor(_PhysicalMonitorT): + + @classmethod + def from_variant(cls, v: GLib.Variant): + # not going to do extensive checks here + # since get_current_state already checked + assert len(v) == 3 + return cls( + MonitorInfo(*v[0]), [MutterDisplayMode(*raw) for raw in v[1]], v[2] + ) + + @property + def is_builtin(self) -> bool: + return self.properties.get("is-builtin", False) + + +_LogicalMonitorT = NamedTuple( + "_LogicalMonitorT", + [ + ("x", int), + ("y", int), + ("scale", float), + ("transform", Transform), + ("is_primary", bool), + ("monitors", List[MonitorInfo]), + ("properties", Mapping[str, Any]), + ], +) + + +class LogicalMonitor(_LogicalMonitorT): + @classmethod + def from_variant(cls, v: GLib.Variant): + assert len(v) == 7 + return cls( + *v[0:5], # the first 5 elements are flat, so just spread them + [MonitorInfo(*m) for m in v[5]], # type: ignore + v[6], # type: ignore + ) + + +_MutterDisplayConfigT = NamedTuple( + "_MutterDisplayConfigT", + [ + ("serial", int), + ("physical_monitors", List[PhysicalMonitor]), + ("logical_monitors", List[LogicalMonitor]), + # technically value type is GLib.Variant + ("properties", Mapping[str, Any]), + ], +) + + +class MutterDisplayConfig(_MutterDisplayConfigT): + """The top level object that represents + the return value of the GetCurrentState dbus call + """ + + @classmethod + def from_variant(cls, v: GLib.Variant): + return cls( + v[0], + [PhysicalMonitor.from_variant(physical) for physical in v[1]], + [LogicalMonitor.from_variant(logical) for logical in v[2]], + v[3], + ) + + @property + def supports_mirroring(self) -> bool: + return self.properties.get("supports-mirroring", False) + + @property + def layout_mode(self) -> Optional[int]: + # only 2 possible layouts + # layout-mode = 2 => physical, 1 => logical + # If the key doesn't exist, then layout mode can't be changed + return self.properties.get("layout-mode", None) + + @property + def supports_changing_layout_mode(self) -> bool: + return self.properties.get("supports-changing-layout-mode", False) + + @property + def global_scale_required(self) -> bool: + return self.properties.get("global-scale-required", False) + + +ResolutionFilter = Callable[[List[MutterDisplayMode]], List[MutterDisplayMode]] class MonitorConfigGnome(MonitorConfig): @@ -43,6 +216,9 @@ class MonitorConfigGnome(MonitorConfig): NAME = "org.gnome.Mutter.DisplayConfig" INTERFACE = "org.gnome.Mutter.DisplayConfig" OBJECT_PATH = "/org/gnome/Mutter/DisplayConfig" + CONFIG_VARIANT_TYPE = GLib.VariantType.new( + "(ua((ssss)a(siiddada{sv})a{sv})a(iiduba(ssss)a{sv})a{sv})" + ) def __init__(self): self._proxy = Gio.DBusProxy.new_for_bus_sync( @@ -56,61 +232,78 @@ def __init__(self): ) def get_connected_monitors(self) -> Set[str]: - """Get list of connected monitors, even if inactive.""" - state = self._get_current_state() - return {monitor for monitor in state[1]} + """ + Get the connector name of each connected monitor, even if inactive. + """ + state = self.get_current_state() + return {monitor.info.connector for monitor in state.physical_monitors} def get_current_resolutions(self) -> Dict[str, str]: """Get current active resolutions for each monitor.""" - state = self._get_current_state() - return { - monitor: mode.resolution - for monitor, modes in state[1].items() - for mode in modes - if mode.is_current - } + state = self.get_current_state() + resolution_map = {} # type: dict[str, str] + + for monitor in state.physical_monitors: + for mode in monitor.modes: + if mode.is_current: + resolution_map[monitor.info.connector] = mode.resolution + return resolution_map def set_extended_mode(self) -> Dict[str, str]: """ Set to extend mode so that each monitor can be displayed at preferred, or if missing, maximum resolution. + - This always arranges the displays in a line - :return configuration: ordered list of applied Configuration + :return configuration: ordered dict of applied Configuration """ - state = self._get_current_state() + state = self.get_current_state() extended_logical_monitors = [] - configuration = {} + # [connector] = resolution + configuration = OrderedDict() # type: dict[str, str] position_x = 0 - for monitor, modes in state[1].items(): + for physical_monitor in state.physical_monitors: try: - target_mode = next(mode for mode in modes if mode.is_preferred) + target_mode = next( + mode + for mode in physical_monitor.modes + if mode.is_preferred + ) except StopIteration: - target_mode = self._get_mode_at_max(modes) + target_mode = self._get_mode_at_max(physical_monitor.modes) + + if type(target_mode) is not MutterDisplayMode: + # if something was changed in _get_mode_at_max + raise TypeError("Unexpected mode:", target_mode) + extended_logical_monitors.append( ( - position_x, - 0, - 1.0, - 0, + position_x, # x + 0, # y + 1.0, # scale + Transform.NORMAL_0, position_x == 0, # first monitor is primary - [(monitor, target_mode.id, {})], + # .id is specific to MutterDisplayMode + [(physical_monitor.info.connector, target_mode.id, {})], ) ) - position_x += int(target_mode.resolution.split("x")[0]) - configuration[monitor] = target_mode.resolution + position_x += int(target_mode.width) + configuration[physical_monitor.info.connector] = ( + target_mode.resolution + ) - self._apply_monitors_config(state[0], extended_logical_monitors) + self._apply_monitors_config(state.serial, extended_logical_monitors) return configuration def cycle( self, resolution: bool = True, transform: bool = False, - resolution_filter: Callable[[List[Mode]], List[Mode]] = None, - action: Callable[..., Any] = None, + resolution_filter: Optional[ResolutionFilter] = None, + post_cycle_action: Optional[Callable[..., Any]] = None, **kwargs ): """ @@ -125,76 +318,98 @@ def cycle( it will take List[Mode] as parameter and return the same data type - action: For extra steps for each cycle, + post_cycle_action: Called after each cycle for each monitor, the string is constructed by [monitor name]_[resolution]_[transform]_. Please note that the delay is needed inside this callback to wait the monitors to response + + kwargs: args for post_cycle_action """ - monitors = [] - modes_list = [] - # ["normal": 0, "left": 1, "inverted": 6, "right": 3] - trans_list = [0, 1, 6, 3] if transform else [0] + connectors = [] # type: list[str] + modes_list = [] # type: list[list[MutterDisplayMode]] + trans_list = ( + ( + Transform.NORMAL_0, + Transform.NORMAL_90, + # preserving original behavior in case something depends on it + Transform.FLIPPED_180, + Transform.NORMAL_270, + ) + if transform + else (Transform.NORMAL_0,) + ) + transformation_name_map = { + Transform.NORMAL_0: "normal", + Transform.NORMAL_270: "left", + Transform.FLIPPED_180: "inverted", + Transform.NORMAL_90: "right", + } # for multiple monitors, we need to create resolution combination - state = self._get_current_state() - for monitor, modes in state[1].items(): - monitors.append(monitor) + state = self.get_current_state() + for monitor in state.physical_monitors: + connectors.append(monitor.info.connector) if resolution_filter: - modes_list.append(resolution_filter(modes)) + modes_list.append(resolution_filter(monitor.modes)) else: - modes_list.append(modes) - mode_combination = list(itertools.product(*modes_list)) + modes_list.append(monitor.modes) - for mode in mode_combination: + for combined_mode in itertools.product(*modes_list): for trans in trans_list: logical_monitors = [] position_x = 0 - uni_string = "" - for monitor, m in zip(monitors, mode): + uni_string = "" # unique string for the current monitor state + for connector, mode in zip(connectors, combined_mode): + transformation_str = transformation_name_map[trans] uni_string += "{}_{}_{}_".format( - monitor, - m.resolution, - { - 0: "normal", - 1: "left", - 3: "right", - 6: "inverted", - }.get(trans), + connector, mode.resolution, transformation_str ) logical_monitors.append( ( - position_x, - 0, - 1.0, - trans, - position_x == 0, # first monitor is primary - [(monitor, m.id, {})], + position_x, # x + 0, # y + 1.0, # scale + trans, # rotation + position_x == 0, # make the first monitor primary + [(connector, mode.id, {})], ) ) - # left and right should convert x and y - xy = 1 if (trans == 1 or trans == 3) else 0 - position_x += int(m.resolution.split("x")[xy]) + + print( + "Setting", + connector, + "to mode:", + mode.id, + "transform:", + transformation_str, + flush=True, + ) # checkbox runtime might buffer this, + # force a flush here so it doesn't look frozen + + x_offset = ( + mode.height + if trans in (Transform.NORMAL_90, Transform.NORMAL_270) + else mode.width + ) # left and right should convert x and y + position_x += x_offset # Sometimes the NVIDIA driver won't update the state. # Get the state before applying to avoid this issue. - state = self._get_current_state() - self._apply_monitors_config(state[0], logical_monitors) - if action: - action(uni_string, **kwargs) + state = self.get_current_state() + self._apply_monitors_config(state.serial, logical_monitors) + + if post_cycle_action is not None: + post_cycle_action(uni_string, **kwargs) + + print("-" * 80, flush=True) # just a divider + if not resolution: break # change back to preferred monitor configuration self.set_extended_mode() - def _get_current_state(self) -> Tuple[str, Dict[str, List[Mode]]]: - """ - Using DBus signal 'GetCurrentState' to get the available monitors - and related modes. - - Check the related DBus XML definition for details over the expected - output data format. - """ - state = self._proxy.call_sync( + def get_current_state(self) -> MutterDisplayConfig: + raw = self._proxy.call_sync( method_name="GetCurrentState", parameters=None, flags=Gio.DBusCallFlags.NO_AUTO_START, @@ -202,23 +417,15 @@ def _get_current_state(self) -> Tuple[str, Dict[str, List[Mode]]]: cancellable=None, ) - return ( - state[0], - { - monitor[0][0]: [ - Mode( - mode[0], - "{}x{}".format(mode[1], mode[2]), - mode[6].get("is-preferred", False), - mode[6].get("is-current", False), - ) - for mode in monitor[1] - ] - for monitor in state[1] - }, - ) + if not raw.get_type().equal(self.CONFIG_VARIANT_TYPE): + raise TypeError( + "DBus GetCurrentState returned unexpected type: " + + str(raw.get_type()) + ) + + return MutterDisplayConfig.from_variant(raw) - def _apply_monitors_config(self, serial: str, logical_monitors: List): + def _apply_monitors_config(self, serial: int, logical_monitors: List): """ Using DBus signal 'ApplyMonitorsConfig' to apply the given monitor configuration. diff --git a/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py b/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py index 10526ec291..9b878a55b9 100644 --- a/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py +++ b/checkbox-support/checkbox_support/dbus/tests/test_gnome_monitor.py @@ -8,13 +8,33 @@ sys.modules["gi"] = MagicMock() sys.modules["gi.repository"] = MagicMock() -from gi.repository import GLib, Gio +from gi.repository import GLib, Gio # type: ignore from checkbox_support.dbus.gnome_monitor import MonitorConfigGnome class MonitorConfigGnomeTests(unittest.TestCase): """This class provides test cases for the MonitorConfig DBus class.""" + class MockGetCurrentStateReturnValue: + + class MockGLibType: + def __init__(self, rv): + self.rv = rv + + def equal(self, o): + return self.rv + + def __init__(self, t, gtrv=True): + self.t = t + self.gtrv = gtrv + super() + + def __getitem__(self, k): + return self.t[k] + + def get_type(self): + return self.MockGLibType(self.gtrv) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") def test_get_connected_monitors(self, mock_dbus_proxy): """ @@ -26,7 +46,7 @@ def test_get_connected_monitors(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -75,6 +95,9 @@ def test_get_connected_monitors(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) monitors = gnome_monitor.get_connected_monitors() self.assertSetEqual(monitors, {"eDP-1", "HDMI-1"}) @@ -89,7 +112,8 @@ def test_get_current_resolution(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + + raw = ( 1, [ ( @@ -138,11 +162,25 @@ def test_get_current_resolution(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) resolutions = gnome_monitor.get_current_resolutions() self.assertEqual( resolutions, {"eDP-1": "1920x1200", "HDMI-1": "2560x1440"} ) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") + def test_bad_input_type(self, mock_dbus_proxy): + mock_proxy = Mock() + mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy + + gnome_monitor = MonitorConfigGnome() + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(tuple(), False) + ) + self.assertRaises(TypeError, gnome_monitor.get_current_state) + @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") def test_set_extended_mode(self, mock_dbus_proxy): """ @@ -155,7 +193,7 @@ def test_set_extended_mode(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -204,6 +242,9 @@ def test_set_extended_mode(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) configuration = gnome_monitor.set_extended_mode() logical_monitors = [ @@ -243,7 +284,8 @@ def test_cycle(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + + raw = ( 1, [ ( @@ -292,6 +334,9 @@ def test_cycle(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) gnome_monitor.cycle() logical_monitors = [ @@ -318,7 +363,10 @@ def test_cycle(self, mock_dbus_proxy): ) @patch("checkbox_support.dbus.gnome_monitor.Gio.DBusProxy") - def test_cycle_no_cycling(self, mock_dbus_proxy): + def test_cycle_no_cycling( + self, + mock_dbus_proxy, + ): """ Test the cycle could get the right monitors configuration (without res and transform change) and send to ApplyMonitorsConfig. @@ -328,7 +376,7 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): mock_dbus_proxy.new_for_bus_sync.return_value = mock_proxy gnome_monitor = MonitorConfigGnome() - mock_proxy.call_sync.return_value = ( + raw = ( 1, [ ( @@ -377,13 +425,20 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): [], {}, ) + mock_proxy.call_sync.return_value = ( + self.MockGetCurrentStateReturnValue(raw) + ) # mock callback - mock_callback = MagicMock() + mock_resolution_filter = MagicMock() + mock_resolution_filter.side_effect = ( + lambda x: x + ) # keep the real mode values + mock_post_cycle_action = MagicMock() gnome_monitor.cycle( resolution=False, transform=False, - resoultion_filter=mock_callback, - action=mock_callback, + resolution_filter=mock_resolution_filter, + post_cycle_action=mock_post_cycle_action, ) logical_monitors = [ @@ -408,8 +463,12 @@ def test_cycle_no_cycling(self, mock_dbus_proxy): timeout_msec=-1, cancellable=None, ) - argument_string = mock_callback.call_args[0][0] + argument_string = mock_post_cycle_action.call_args[0][0] p1 = "HDMI-1_2560x1440_normal_" p2 = "eDP-1_1920x1200_normal_" pattern = re.compile("{}{}|{}{}".format(p1, p2, p2, p1)) assert pattern.match(argument_string) + + +if __name__ == "__main__": + unittest.main()