diff --git a/software/control/camera_andor.py b/software/control/camera_andor.py new file mode 100644 index 00000000..5921ff52 --- /dev/null +++ b/software/control/camera_andor.py @@ -0,0 +1,446 @@ +import time +import numpy as np +import threading +import os + +import pyAndorSDK3 +from pyAndorSDK3 import AndorSDK3 +from control._def import * +import squid.logging + + +# For using in Windows only +package_path = os.path.dirname(pyAndorSDK3.__file__) +library_path = os.path.join(package_path, "libs", "Windows", "64") +pyAndorSDK3.utils.add_library_path(library_path) + + +def get_sn_by_model(model_name): + pass + + +class Camera(object): + def __init__( + self, sn=None, resolution=(2048, 2048), is_global_shutter=False, rotate_image_angle=None, flip_image=None + ): + self.log = squid.logging.get_logger(self.__class__.__name__) + self.cam = None + self.exposure_time = 1 # ms + self.analog_gain = 0 + self.is_streaming = False + self.pixel_format = None + self.is_color = False + self.available_pixel_formats = ["MONO12", "MONO16"] + + self.frame_ID = -1 + self.frame_ID_software = -1 + self.frame_ID_offset_hardware_trigger = 0 + self.timestamp = 0 + self.trigger_mode = None + + self.strobe_delay_us = None + self.line_rate = None # in us + + self.image_locked = False + self.current_frame = None + self.callback_is_enabled = False + self.new_image_callback_external = None + self.stop_waiting = False + self.buffer_queue = [] + + self.GAIN_MAX = 0 + self.GAIN_MIN = 0 + self.GAIN_STEP = 0 + self.EXPOSURE_TIME_MS_MIN = 0.01 + self.EXPOSURE_TIME_MS_MAX = 30000.0 + + self.rotate_image_angle = rotate_image_angle + self.flip_image = flip_image + self.is_global_shutter = is_global_shutter + + self.ROI_offset_x = 0 + self.ROI_offset_y = 0 + self.ROI_width = resolution[0] + self.ROI_height = resolution[1] + + self.OffsetX = 0 + self.OffsetY = 0 + self.Width = resolution[0] + self.Height = resolution[1] + + self.WidthMax = resolution[0] + self.HeightMax = resolution[1] + + def open(self, index=0): + sdk3 = AndorSDK3() + self.cam = sdk3.GetCamera(index) + self.cam.open() + self._initialize_camera() + self.log.info(f"Andor Camera opened. SN: {self.cam.SerialNumber}") + + def open_by_sn(self, sn): + self.open() + + def close(self): + if self.is_streaming: + self.stop_streaming() + + self.disable_callback() + + if self.cam is not None: + self.cam.close() + self.cam = None + + def _initialize_camera(self): + if self.cam is None: + return + # Get exposure time limits + try: + self.EXPOSURE_TIME_MS_MIN = self.cam.min_ExposureTime * 1000 # convert to ms + self.EXPOSURE_TIME_MS_MAX = self.cam.max_ExposureTime * 1000 # convert to ms + self.log.info(f"exposure min: {self.EXPOSURE_TIME_MS_MIN}, max: {self.EXPOSURE_TIME_MS_MAX}") + except: + self.log.error("Could not determine exposure time limits") + + try: + self.line_rate = 1 / self.cam.LineScanSpeed * 1000000 + self.log.info(f"line rate: {self.line_rate} us") + except: + self.log.error("Could not determine line rate") + raise + + def set_callback(self, function): + self.new_image_callback_external = function + + def enable_callback(self): + if self.callback_is_enabled: + return + + if not self.is_streaming: + self.start_streaming() + + self.stop_waiting = False + self.callback_thread = threading.Thread(target=self._wait_and_callback) + self.callback_thread.start() + + self.callback_is_enabled = True + + def _wait_and_callback(self): + while True: + if self.stop_waiting: + break + try: + image = self.read_frame() + if image is not False: + self._on_new_frame(image) + except Exception as e: + self.log.warning(f"Error waiting for frame: {e}") + time.sleep(0.01) + + def _on_new_frame(self, image): + if self.image_locked: + self.log.warning("Last image is still being processed; a frame is dropped") + return + + self.current_frame = image + + self.frame_ID_software += 1 + self.frame_ID += 1 + + # Frame ID for hardware triggered acquisition + if self.trigger_mode == TriggerMode.HARDWARE: + if self.frame_ID_offset_hardware_trigger is None: + self.frame_ID_offset_hardware_trigger = self.frame_ID + self.frame_ID = self.frame_ID - self.frame_ID_offset_hardware_trigger + + self.timestamp = time.time() + self.new_image_callback_external(self) + + def disable_callback(self): + if not self.callback_is_enabled: + return + + was_streaming = self.is_streaming + if self.is_streaming: + self.stop_streaming() + + self.stop_waiting = True + time.sleep(0.2) + if hasattr(self, "callback_thread"): + self.callback_thread.join() + del self.callback_thread + self.callback_is_enabled = False + + if was_streaming: + self.start_streaming() + + def set_analog_gain(self, gain): + pass + + def set_exposure_time(self, exposure_time): + if self.trigger_mode == TriggerMode.SOFTWARE: + exposure_time_s = exposure_time / 1000.0 + elif self.trigger_mode == TriggerMode.HARDWARE: + exposure_time_s = self.strobe_delay_us / 1000000 + exposure_time / 1000 + try: + self.cam.ExposureTime = exposure_time_s + self.exposure_time = exposure_time + except Exception as e: + self.log.error(f"Error setting exposure time: {e}") + raise e + + def set_continuous_acquisition(self): + was_streaming = False + if self.is_streaming: + was_streaming = True + self.stop_streaming() + + try: + self.cam.CycleMode = "Continuous" + self.cam.TriggerMode = "Internal" + self.trigger_mode = TriggerMode.CONTINUOUS + except Exception as e: + self.log.error(f"Error setting continuous acquisition: {e}") + + if was_streaming: + self.start_streaming() + + def set_software_triggered_acquisition(self): + was_streaming = False + if self.is_streaming: + was_streaming = True + self.stop_streaming() + + try: + self.cam.CycleMode = "Continuous" + self.cam.TriggerMode = "Software" + self.trigger_mode = TriggerMode.SOFTWARE + except Exception as e: + self.log.error(f"Error setting software triggered acquisition: {e}") + + if was_streaming: + self.start_streaming() + + def set_hardware_triggered_acquisition(self): + was_streaming = False + if self.is_streaming: + was_streaming = True + self.stop_streaming() + + try: + self.cam.CycleMode = "Continuous" + self.cam.TriggerMode = "External" + self.frame_ID_offset_hardware_trigger = None + self.trigger_mode = TriggerMode.HARDWARE + except Exception as e: + self.log.error(f"Error setting hardware triggered acquisition: {e}") + + if was_streaming: + self.start_streaming() + + def set_pixel_format(self, pixel_format): + was_streaming = False + if self.is_streaming: + was_streaming = True + self.stop_streaming() + + try: + if pixel_format == "MONO12": + self.cam.PixelEncoding = "Mono12" + self.pixel_format = pixel_format + elif pixel_format == "MONO16": + self.cam.PixelEncoding = "Mono16" + self.pixel_format = pixel_format + else: + raise ValueError(f"Invalid pixel format: {pixel_format}") + + self.line_rate = 1 / self.cam.LineScanSpeed * 1000000 + self.calculate_strobe_delay() + if self.trigger_mode == TriggerMode.HARDWARE: + self.set_exposure_time(self.exposure_time) + + except Exception as e: + self.log.error(f"Error setting pixel format: {e}") + + if was_streaming: + self.start_streaming() + + def send_trigger(self): + try: + self.cam.SoftwareTrigger() + except Exception as e: + self.log.error(f"Trigger not sent - error: {e}") + + def read_frame(self): + try: + acq = self.cam.wait_buffer(2000) + self.cam.queue(acq._np_data, self.cam.ImageSizeBytes) + raw = np.asarray(acq._np_data, dtype=np.uint8) + img16 = raw.view("