diff --git a/checkbox-support/checkbox_support/camera_pipelines.py b/checkbox-support/checkbox_support/camera_pipelines.py new file mode 100644 index 0000000000..51fe4e5bd2 --- /dev/null +++ b/checkbox-support/checkbox_support/camera_pipelines.py @@ -0,0 +1,436 @@ +# This file is part of Checkbox. +# +# Copyright 2025 Canonical Ltd. +# Written by: +# Zhongning Li +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see . + + +from collections import OrderedDict +import tempfile +import gi +import typing as T +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) +logging.basicConfig( + format="%(asctime)s %(levelname)s - %(message)s\n", + datefmt="%m/%d %H:%M:%S", +) +logger.setLevel(logging.DEBUG) + +from gi.repository import GObject # type: ignore # noqa: E402 + +Gtk = None + +gi.require_version("Gst", "1.0") +from gi.repository import Gst # type: ignore # noqa: E402 + +gi.require_version("GLib", "2.0") +from gi.repository import GLib # type: ignore # noqa: E402 + +TimeoutCallback = T.Callable[[], None] +PipelineQuitHandler = T.Callable[[Gst.Message], bool] + + +def get_launch_line(device: Gst.Device) -> T.Optional[str]: + """Get the gst-device-monitor launch line for a device. + - Useful for pipelines that don't need to do anything special + while the pipeline is running + - This basically re-implements the one in the cli + https://github.com/GStreamer/gst-plugins-base/blob/master/tools/gst-device-monitor.c#L46 # noqa: E501 + + :param device: the device given by Gst.DeviceMonitor + :return: the gst-launch-1.0 launchline. Note that this only starts with the + element name, not "gst-launch-1.0" like you would see in the cli + """ + ignored_prop_names = set( + ["name", "parent", "direction", "template", "caps"] + ) # type: set[str] + element = device.create_element() + if element is None: + return None + + factory = element.get_factory() + if factory is None: + return None + + factory_name = factory.get_name() + if factory_name is None: + return None + + pure_element = Gst.ElementFactory.make(factory_name, None) + if pure_element is None: + return None + + launch_line_components = [factory_name] # type: list[str] + for prop in element.list_properties(): + if prop.name in ignored_prop_names: + continue + # eliminate all default properties and non-read-writable props + read_and_writable = ( + prop.flags & GObject.PARAM_READWRITE == GObject.PARAM_READWRITE + ) + if not read_and_writable: + continue + + default_value = pure_element.get_property(prop.name) + actual_value = element.get_property(prop.name) + + if ( + actual_value is not None + and default_value is not None + and Gst.value_compare(default_value, actual_value) + == Gst.VALUE_EQUAL + ): + continue + + if actual_value is None: + continue + + # now we only have the non-default, non-null values + try: + serialized_value = Gst.value_serialize(actual_value) + except Exception: + # unserializable values sometimes can throw + # such as non-int32 integers + continue + if not serialized_value: + continue # ignore non-serializable ones + + launch_line_components.append( + "{}={}".format(prop.name, serialized_value) + ) + + # example: pipewiresrc target-object=49 + return " ".join(launch_line_components) + + +def elem_to_str( + element: Gst.Element, exclude: T.List[str] = ["parent", "client-name"] +) -> str: + """Prints an element to string + + :param element: GStreamer element + :param exclude: property names to exclude + :return: String representation. This is a best guess for debug purposes, + not 100% accurate since there can be arbitrary objects in properties. + """ + properties = element.list_properties() # list[GObject.GParamSpec] + element_name = element.get_factory().get_name() # type: ignore + + prop_strings = [] # type: list[str] + + for prop in properties: + if prop.name in exclude: + continue + + try: + prop_value = element.get_property(prop.name) + except TypeError: + logger.debug( + "Property {} is unreadable in {}, ignored.".format( + prop.name, element_name + ) # not every property is readable, ignore unreadable ones + ) + continue + + if prop_value is None: + continue + + try: + serialized_value = Gst.value_serialize(prop_value) + except Exception: + # unserializable values sometimes can throw + # such as non-int32 integers + continue + if not serialized_value: + continue + + prop_strings.append( + "{}={}".format(prop.name, serialized_value) + ) # handle native python types + + return "{} {}".format( + element_name, " ".join(prop_strings) + ) # libcamerasrc name=cam_name location=p.jpeg + + +def gst_msg_handler( + _: Gst.Bus, + msg: Gst.Message, + pipeline: Gst.Pipeline, + custom_quit_handler: T.Optional[PipelineQuitHandler], + loop: GLib.MainLoop, + timeout_sources: T.List[GLib.Source] = [], +): + should_quit = False + + if custom_quit_handler: + # has the lowest precedence, ERROR and EOS will always take over + should_quit = custom_quit_handler(msg) + + if msg.type == Gst.MessageType.WARNING: + logger.warning(Gst.Message.parse_warning(msg)) + + if msg.type == Gst.MessageType.EOS: + logger.debug("Received EOS.") + should_quit = True + + if msg.type == Gst.MessageType.ERROR: + logger.error( + "Pipeline encountered an error, stopping. " + + str(Gst.Message.parse_error(msg)) + ) + should_quit = True + + if should_quit: + for timeout in timeout_sources: + # if the pipeline is terminated early, remove all timers asap + # because loop.quit() won't remove/stop those + # that are already scheduled => segfault (EOS on null pipeline) + # See: https://docs.gtk.org/glib/method.MainLoop.quit.html + timeout.destroy() + + # NOTE: setting NULL can be slow on certain encoders + # NOTE: it's also possible to block infinitely here + pipeline.set_state(Gst.State.NULL) + loop.quit() + + +def run_pipeline( + pipeline: Gst.Pipeline, + run_n_seconds: T.Optional[int] = None, + intermediate_calls: T.List[T.Tuple[int, TimeoutCallback]] = [], + custom_quit_handler: T.Optional[PipelineQuitHandler] = None, +): + """Run a GStreamer pipeline and handle Gst messages (blocking) + + :param pipeline: the pipeline to run + :param run_n_seconds: Number of seconds to run the pipeline before + sending EOS, defaults to None + - If None, only register the EOS handler + :param intermediate_calls: functions to run while the pipeline is running + - Each element is a (delay, callback) tuple + - Delay is the number of seconds to wait + (relative to the start of the pipeline) before calling the callback + :param custom_quit_handler: quit the pipeline if this function returns true + - Has lowest precedence + :raises RuntimeError: if the source element did not transition to playing + state in 5s after set_state(PLAYING) is called + """ + loop = GLib.MainLoop() + timeout_sources = [] # type: list[GLib.Source] + + if run_n_seconds is not None and run_n_seconds < 1: + raise ValueError("run_n_seconds must be >= 1 if specified") + + if run_n_seconds: + + def send_eos(): + logger.debug("Sending EOS.") + pipeline.send_event(Gst.Event.new_eos()) + + eos_timeout_id = GLib.timeout_add_seconds(run_n_seconds, send_eos) + # get the actual source object, so we can call .destroy() later. + # Removing a timeout by id will cause warnings if it doesn't exist, + # but destroying an already destroyed source is ok + # See: https://docs.gtk.org/glib/method.Source.destroy.html + # and: https://docs.gtk.org/glib/type_func.Source.remove.html + timeout_sources.append( + loop.get_context().find_source_by_id(eos_timeout_id) + ) + + for delay, intermediate_call in intermediate_calls: + if run_n_seconds is not None and delay > run_n_seconds: + raise ValueError( + "Delay for each call must be smaller than total run seconds, " + " (Got delay = {} for {}, run_n_seconds = {})".format( + delay, intermediate_call.__name__, run_n_seconds + ) + ) + + timeout_id = GLib.timeout_add_seconds(delay, intermediate_call) + timeout_sources.append( + loop.get_context().find_source_by_id(timeout_id) + ) + + bus = pipeline.get_bus() + bus.add_signal_watch() + bus.connect( + "message", + gst_msg_handler, + pipeline, + custom_quit_handler, + loop, + timeout_sources, + ) + + pipeline.set_state(Gst.State.PLAYING) + + # this does not necessarily mean that the pipeline has the PLAYING state + # it just means that set_state didn't hang + logger.info("[ OK ] Pipeline is playing!") + loop.run() + + +def msg_is_multifilesink_save(msg: Gst.Message) -> bool: + """Returns true when multifilesink saves a buffer + + :param msg: the GstMessage object + :return: whether msg is a multifilesink save message + """ + if msg.type == Gst.MessageType.ELEMENT: + struct = msg.get_structure() + return ( + struct is not None + and struct.get_name() == "GstMultiFileSink" + and struct.has_field("filename") + ) + else: + return False + + +def take_photo( + source: Gst.Element, + file_path: Path, + delay_seconds: int, + caps: T.Optional[Gst.Caps] = None, +): + """Take a photo using the source element + + :param source: The camera source element + :param caps: Which capability to use for the source + - If None, no caps filter will be inserted between source and decoder + :param file_path: the path to the photo + :param delay_seconds: number of seconds to keep the source "open" + before taking the photo + """ + + # dict order is not guaranteed on python < 3.7 + str_elements = OrderedDict( + ( + ("caps", 'capsfilter name=source-caps caps="{}"'), + ("decoder", "decodebin"), + ("converter", "videoconvert name=converter"), + ("photo-valve", "valve name=photo-valve drop=True"), + ("encoder", "jpegenc"), + ( + "sink", + "multifilesink post-messages=True location={}".format( + str(file_path) + ), + ), + ) + ) + head_elem_name = "source-caps" + + # using empty string as null values here + # they are filtered out at parse_launch + if caps: + if not caps.is_fixed(): + raise ValueError('"{}" is not fixed.'.format(caps.to_string())) + + str_elements["caps"] = str_elements["caps"].format(caps.to_string()) + mime_type = caps.get_structure(0).get_name() + + if mime_type == "image/jpeg": + # decodebin has a clock problem with pipewiresrc + # that outputs image/jpeg + str_elements["decoder"] = "jpegdec" + elif mime_type == "video/x-raw": + # don't need a decoder for raw + str_elements["decoder"] = "" + elif mime_type == "video/x-bayer": + # bayer2rgb is not considered a decoder + # so decodebin can't automatically find this + str_elements["decoder"] = "bayer2rgb" + # else case is using decodebin as a fallback + else: + # decode bin doesn't work with video/x-raw + str_elements["caps"] = str_elements["decoder"] = "" + head_elem_name = "converter" + + delay_seconds = max(delay_seconds, 0) + if delay_seconds == 0: + str_elements["photo-valve"] = "" + + partial = " ! ".join(elem for elem in str_elements.values() if elem) + pipeline = Gst.parse_launch(partial) + + if type(pipeline) is not Gst.Pipeline: + raise TypeError( + "Unexpected return type from parse_launch: Got {}".format( + type(pipeline) + ) + ) + + head_elem = pipeline.get_by_name(head_elem_name) + + # parse the partial pipeline, then get head element by name + # NOTE: this assertion only applies to the default python binding + # if the python3-gst-1.0 package is installed, .add() always return None + if not pipeline.add(source): + raise RuntimeError( + "Could not add source element {} to the pipeline".format( + elem_to_str(source) + ) + ) + + if not head_elem or not source.link(head_elem): + raise RuntimeError( + "Could not link source element to {}".format(head_elem) + ) + + if delay_seconds == 0: + intermediate_calls = [] + logger.info( + "Created photo pipeline with no delay. " + + '"{} ! {}"'.format(elem_to_str(source), partial) + ) + else: + valve = pipeline.get_by_name("photo-valve") + assert valve + + def open_valve(): + logger.debug("Opening valve!") + valve.set_property("drop", False) + + intermediate_calls = [(delay_seconds, open_valve)] + logger.info( + "Created photo pipeline with {} second delay. ".format( + delay_seconds + ) + + '"{} ! {}"'.format(elem_to_str(source), partial) + ) + + run_pipeline( + pipeline, + intermediate_calls=intermediate_calls, + custom_quit_handler=msg_is_multifilesink_save, + ) + + # NOTE: reaching here just means the pipeline successfully stopped + # not necessarily stopped gracefully + + logger.info( + "[ OK ] Photo pipeline for this capability: {}".format( + caps.to_string() if caps else "device default" + ) + + " has finished!" + ) + + # unparent the source, so that this function can be called again + source.unparent() diff --git a/checkbox-support/checkbox_support/tests/test_camera_pipelines.py b/checkbox-support/checkbox_support/tests/test_camera_pipelines.py new file mode 100644 index 0000000000..efab34b06c --- /dev/null +++ b/checkbox-support/checkbox_support/tests/test_camera_pipelines.py @@ -0,0 +1,600 @@ +from contextlib import suppress +import unittest as ut +from unittest.mock import MagicMock, patch +import sys +from pathlib import Path + +sys.modules["gi"] = MagicMock() +sys.modules["gi.repository"] = MagicMock() +import checkbox_support.camera_pipelines as cam # noqa: E402 + + +class TestPipelineLogic(ut.TestCase): + def test_run_pipeline_assertions(self): + pipeline = MagicMock() + + self.assertRaises(ValueError, lambda: cam.run_pipeline(pipeline, 0)) + + self.assertRaises( + ValueError, + lambda: cam.run_pipeline( + pipeline, + 5, + [(1, lambda: None), (0, lambda: None), (6, lambda: None)], + ), + ) + + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.GLib") + @patch("checkbox_support.camera_pipelines.Gst") + def test_run_pipeline_happy_path( + self, mock_Gst: MagicMock, mock_GLib: MagicMock, mock_logger + ): + pipeline = MagicMock() + + mock_timeout_sources = (MagicMock(), MagicMock(), MagicMock()) + mock_open_valve_fn = MagicMock(name="mock_open_valve") + mock_eos_signal_obj = MagicMock() + mock_eos_message = MagicMock(type=mock_Gst.MessageType.EOS) + mock_main_loop = MagicMock() + + mock_GLib.timeout_add_seconds.side_effect = mock_timeout_sources + pipeline.get_child_by_index(0).get_state.return_value = ( + mock_Gst.StateChangeReturn.SUCCESS, + ) + mock_Gst.Event.new_eos.return_value = mock_eos_signal_obj + + cam.run_pipeline( + pipeline, + 5, + intermediate_calls=[(3, mock_open_valve_fn)], + ) + + self.assertEqual( + mock_GLib.timeout_add_seconds.call_count, + # -1 because the last one is in the msg handler + len(mock_timeout_sources) - 1, + ) + + # first call, first pair, 2nd element + real_eos_handler = mock_GLib.timeout_add_seconds.call_args_list[0][0][ + 1 + ] + real_eos_handler() + pipeline.send_event.assert_called_with(mock_eos_signal_obj) + + # now pretend eos has been triggered + + cam.gst_msg_handler( + MagicMock(), + mock_eos_message, + pipeline, + None, # no custom_quit handler + mock_main_loop, + mock_timeout_sources, # type: ignore + ) + pipeline.set_state.assert_called_with(mock_Gst.State.NULL) + for mock_timeout in mock_timeout_sources: + # check everything has been destroyed + mock_timeout.destroy.assert_called_once_with() + + self.assertTrue(mock_main_loop.quit.called) + + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_x_raw( + self, mock_Gst: MagicMock, mock_run_pipeline, mock_logger + ): + mock_caps = MagicMock() + mock_caps.to_string.return_value = "video/x-raw,width=1280,height=720" + # video/x-raw doesn't need a decoder + mock_caps.get_structure(0).get_name.return_value = "video/x-raw" + with suppress(TypeError): + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=2, # with delay, valve should be inserted + ) + # -1 is taking the most recent call + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="video/x-raw,width=1280,height=720"', + "videoconvert name=converter", + "valve name=photo-valve drop=True", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + with suppress(TypeError): + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=0, # no delay -> no valve + ) + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="video/x-raw,width=1280,height=720"', + "videoconvert name=converter", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + @patch("checkbox_support.camera_pipelines.type") + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_image_jpeg( + self, + mock_Gst: MagicMock, + mock_run_pipeline, + mock_logger, + mock_type: MagicMock, + ): + mock_caps = MagicMock() + # jpeg caps should be handled by jpegdec + mock_caps.to_string.return_value = "image/jpeg,width=1280,height=720" + mock_caps.get_structure(0).get_name.return_value = "image/jpeg" + + mock_type.return_value = mock_Gst.Pipeline + + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=0, # no delay -> no valve + ) + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="image/jpeg,width=1280,height=720"', + "jpegdec", + "videoconvert name=converter", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="image/jpeg,width=1280,height=720"', + "jpegdec", + "videoconvert name=converter", + "valve name=photo-valve drop=True", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + @patch("checkbox_support.camera_pipelines.type") + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_x_bayer( + self, + mock_Gst: MagicMock, + mock_run_pipeline, + mock_logger, + mock_type: MagicMock, + ): + mock_caps = MagicMock() + mock_caps.to_string.return_value = ( + "video/x-bayer,width=1280,height=720,format=rggb" + ) + mock_caps.get_structure(0).get_name.return_value = "video/x-bayer" + mock_type.return_value = mock_Gst.Pipeline + + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="video/x-bayer,width=1280,height=720,format=rggb"', + "bayer2rgb", # this element should be inserted for bayer + "videoconvert name=converter", + "valve name=photo-valve drop=True", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + @patch("checkbox_support.camera_pipelines.type") + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_no_caps( + self, + mock_Gst: MagicMock, + mock_run_pipeline, + mock_logger, + mock_type: MagicMock, + ): + mock_type.return_value = mock_Gst.Pipeline + cam.take_photo( + MagicMock(), + caps=None, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "videoconvert name=converter", + "valve name=photo-valve drop=True", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + cam.take_photo( + MagicMock(), + caps=None, + file_path=Path("some/path"), + delay_seconds=0, + ) + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "videoconvert name=converter", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + @patch("checkbox_support.camera_pipelines.type") + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_unknown_caps( + self, + mock_Gst: MagicMock, + mock_run_pipeline, + mock_logger, + mock_type: MagicMock, + ): + mock_caps = MagicMock() + mock_caps.to_string.return_value = ( + "video/x-theora,width=1280,height=720" + ) + mock_caps.get_structure(0).get_name.return_value = "video/x-theora" + mock_type.return_value = mock_Gst.Pipeline + cam.take_photo( + MagicMock(), + caps=mock_caps, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + + parse_launch_arg = mock_Gst.parse_launch.call_args_list[-1][0][0] + self.assertEqual( + parse_launch_arg, + " ! ".join( + [ + "capsfilter name=source-caps " + 'caps="video/x-theora,width=1280,height=720"', + "decodebin", + "videoconvert name=converter", + "valve name=photo-valve drop=True", + "jpegenc", + "multifilesink post-messages=True location=some/path", + ] + ), + ) + + @patch("checkbox_support.camera_pipelines.type") + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.run_pipeline") + @patch("checkbox_support.camera_pipelines.Gst") + def test_pipeline_build_step_runtime_checks( + self, + mock_Gst: MagicMock, + mock_run_pipeline, + mock_logger, + mock_type: MagicMock, + ): + mock_type.return_value = mock_Gst.Pipeline + mock_pipeline = MagicMock() + mock_Gst.parse_launch.return_value = mock_pipeline + mock_pipeline.add.return_value = False + + with self.assertRaises(RuntimeError): + cam.take_photo( + MagicMock(), + caps=None, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + mock_pipeline.reset_mock() + mock_pipeline.add.return_value = True + mock_source = MagicMock() + mock_source.link.return_value = False + with self.assertRaises(RuntimeError): + cam.take_photo( + mock_source, + caps=None, + file_path=Path("some/path"), + delay_seconds=3, # with delay + ) + + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.Gst") + def test_custom_quit_has_lowest_precedence( + self, mock_Gst: MagicMock, mock_logger: MagicMock + ): + mock_message = MagicMock() + mock_message.type = mock_Gst.MessageType.ERROR + mock_loop = MagicMock() + + cam.gst_msg_handler( + MagicMock(), + mock_message, + MagicMock(), + lambda *args: False, # always quit when there's an error msg + # even if the handler says no + mock_loop, + [], + ) + self.assertTrue(mock_loop.quit.called) + + # warnings should be produced + mock_loop.reset_mock() + mock_message.type = mock_Gst.MessageType.WARNING + cam.gst_msg_handler( + MagicMock(), + mock_message, + MagicMock(), + cam.msg_is_multifilesink_save, # no error/EOS + mock_loop, + [], + ) + self.assertTrue(mock_logger.warning.called) + + # Now suppose we have a random element message + mock_loop.reset_mock() + mock_message.type = mock_Gst.MessageType.ELEMENT + + cam.gst_msg_handler( + MagicMock(), + mock_message, + MagicMock(), + cam.msg_is_multifilesink_save, + mock_loop, + [], + ) + self.assertFalse(mock_loop.quit.called) + + # finally the multifilesink save message + + mock_loop.reset_mock() + mock_message.type = mock_Gst.MessageType.ELEMENT + mock_struct = MagicMock() + mock_message.get_structure.return_value = mock_struct + mock_struct.get_name.return_value = "GstMultiFileSink" + mock_struct.has_field.return_value = True + cam.gst_msg_handler( + MagicMock(), + mock_message, + MagicMock(), + cam.msg_is_multifilesink_save, + mock_loop, + [], + ) + self.assertTrue(mock_loop.quit.called) + + +class TestUtilFunctions(ut.TestCase): + class MockElement: + name = "someelement0" + some_int_value = 1 + unreadable = "reading this will raise an exception" + # above properties should be printed + + def list_properties(self): + props = [] + for p in dir(self): + if not p.startswith("__") and not callable(getattr(self, p)): + mock_prop = MagicMock() + mock_prop.name = p + props.append(mock_prop) + return props + + def get_factory(self): + mock = MagicMock() + mock.get_name.return_value = "someelement" + return mock + + def get_property(self, prop_name: str): + if prop_name == "unreadable": + raise TypeError("unreadable prop") + return getattr(self, prop_name) + + @patch("checkbox_support.camera_pipelines.Gst") + def test_get_launch_line_null_and_exceptionchecks( + self, mock_Gst: MagicMock + ): + device = MagicMock() + device.create_element.return_value = None + self.assertIsNone(cam.get_launch_line(device)) + + mock_elem = MagicMock() + device.create_element.return_value = mock_elem + mock_elem.get_factory.return_value = None + self.assertIsNone(cam.get_launch_line(device)) + + mock_factory = MagicMock() + mock_elem.get_factory.return_value = mock_factory + mock_factory.get_name.return_value = None + self.assertIsNone(cam.get_launch_line(device)) + + mock_factory.get_name.return_value = "someelement" + mock_Gst.ElementFactory.make.return_value = None + self.assertIsNone(cam.get_launch_line(device)) + + @patch("checkbox_support.camera_pipelines.GObject") + @patch("checkbox_support.camera_pipelines.Gst") + def test_get_launch_line_happy_path( + self, mock_Gst: MagicMock, mock_GObject: MagicMock + ): + device = MagicMock() + mock_elem = MagicMock() + mock_factory = MagicMock() + mock_pure_elem = MagicMock() + device.create_element.return_value = mock_elem + mock_elem.get_factory.return_value = mock_factory + mock_factory.get_name.return_value = "someelement" + mock_Gst.ElementFactory.make.side_effect = ( + lambda name, _: name == "someelement" and mock_pure_elem + ) + mock_GObject.PARAM_READWRITE = 1 + + prop1 = MagicMock() + prop1.name = "prop1" + + ignored_prop = MagicMock() + ignored_prop.name = "parent" + + unreadable_prop = MagicMock() + unreadable_prop.name = "unreadable" + unreadable_prop.flags.__and__.return_value = ( + False # can be anything != PARAM_READWRITE + ) + + unserializable_prop = MagicMock() + unserializable_prop.name = "unserializable_prop" + unserializable_value = "unserializable_value" + unserializable_prop.flags.__and__.return_value = True + + panic_prop = MagicMock() + panic_prop.name = "panic_prop" + panic_value = "something that causes serialize to panic" + + def panic(v): + if v == panic_value: + raise ValueError(panic_value) + if v == unserializable_value: + return None + return v + + mock_Gst.value_serialize.side_effect = panic + + mock_elem.list_properties.return_value = [ + prop1, + ignored_prop, + unreadable_prop, + unserializable_prop, + panic_prop, + ] + prop1.flags.__and__.return_value = mock_GObject.PARAM_READWRITE + # this check will pass even if the value causes panic + panic_prop.flags.__and__.return_value = mock_GObject.PARAM_READWRITE + + mock_pure_elem.get_property.return_value = 0 + + def dummy_values(prop_name): + if prop_name == unserializable_prop.name: + return unserializable_value + elif prop_name == panic_prop.name: + return panic_value + else: + return 1 + + mock_elem.get_property.side_effect = dummy_values + mock_Gst.value_compare.side_effect = lambda x, y: x == y + mock_Gst.VALUE_EQUAL = True + + self.assertEqual(cam.get_launch_line(device), "someelement prop1=1") + + @patch("checkbox_support.camera_pipelines.logger") + @patch("checkbox_support.camera_pipelines.Gst") + def test_elem_to_str( + self, + mock_Gst: MagicMock, + mock_logger: MagicMock, + ): + mock_Gst.value_serialize = lambda x: x + elem = self.MockElement() # type: cam.Gst.Element # type: ignore + self.assertEqual( + cam.elem_to_str(elem), + "someelement name=someelement0 some_int_value=1", + ) + + elem2 = self.MockElement() # type: cam.Gst.Element # type: ignore + setattr(elem2, "parent", "someparentelem") + self.assertEqual( # parent should be omitted + cam.elem_to_str(elem2), + "someelement name=someelement0 some_int_value=1", + ) + + elem3 = self.MockElement() # type: cam.Gst.Element # type: ignore + setattr(elem3, "parent", "someparentelem") + # unserializable value + mock_Gst.value_serialize = lambda x: ( + x is None if x == elem3.some_int_value else x # type: ignore + ) + self.assertEqual( # parent should be omitted + cam.elem_to_str(elem3), + "someelement name=someelement0", + ) + + elem4 = self.MockElement() + setattr(elem4, "parent", "someparentelem") + + # panic value + def panic(v): + if v == elem4.some_int_value: + raise ValueError() + else: + return v + + mock_Gst.value_serialize = panic + self.assertEqual( + cam.elem_to_str(elem3), + "someelement name=someelement0", + ) + + +if __name__ == "__main__": + ut.main()