diff --git a/.github/workflows/_release.yml b/.github/workflows/_release.yml index 10d8ed8..c771682 100644 --- a/.github/workflows/_release.yml +++ b/.github/workflows/_release.yml @@ -23,7 +23,7 @@ jobs: - name: Create GitHub Release # We pin to the SHA, not the tag, for security reasons. # https://docs.github.com/en/actions/learn-github-actions/security-hardening-for-github-actions#using-third-party-actions - uses: softprops/action-gh-release@c062e08bd532815e2082a85e87e3ef29c3e6d191 # v2.0.8 + uses: softprops/action-gh-release@7b4da11513bf3f43f9999e90eabced41ab8bb048 # v2.2.0 with: prerelease: ${{ contains(github.ref_name, 'a') || contains(github.ref_name, 'b') || contains(github.ref_name, 'rc') }} files: "*" diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml index f652d41..552b29d 100644 --- a/.github/workflows/_test.yml +++ b/.github/workflows/_test.yml @@ -54,7 +54,7 @@ jobs: run: tox -e tests - name: Upload coverage to Codecov - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: name: ${{ inputs.python-version }}/${{ inputs.runs-on }} files: cov.xml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 485b82b..9be1759 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: strategy: matrix: runs-on: ["ubuntu-latest"] # can add windows-latest, macos-latest - python-version: ["3.10", "3.11", "3.12"] + python-version: ["3.11", "3.12"] include: # Include one that runs in the dev environment - runs-on: "ubuntu-latest" diff --git a/.pyproject.toml.swp b/.pyproject.toml.swp deleted file mode 100644 index 6a2f34f..0000000 Binary files a/.pyproject.toml.swp and /dev/null differ diff --git a/docs/tutorials/installation.md b/docs/tutorials/installation.md index 5100480..dc834af 100644 --- a/docs/tutorials/installation.md +++ b/docs/tutorials/installation.md @@ -2,7 +2,7 @@ ## Check your version of python -You will need python 3.10 or later. You can check your version of python by +You will need python 3.11 or later. You can check your version of python by typing into a terminal: ``` diff --git a/pyproject.toml b/pyproject.toml index 9ad70a9..ea2cbfe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,16 +7,21 @@ name = "fastcs-pandablocks" classifiers = [ "Development Status :: 3 - Alpha", "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] description = "A softioc to control a PandABlocks-FPGA." -dependencies = [] # Add project dependencies here, e.g. ["click", "numpy"] +dependencies = [ + "fastcs@git+https://github.com/DiamondLightSource/FastCS@panda-conversion-improvements", + "pandablocks~=0.10.0", + "numpy<2", # until https://github.com/mdavidsaver/p4p/issues/145 is fixed + "pydantic>2", + "h5py", +] dynamic = ["version"] license.file = "LICENSE" readme = "README.md" -requires-python = ">=3.10" +requires-python = ">=3.11" [project.optional-dependencies] dev = [ @@ -37,7 +42,7 @@ dev = [ ] [project.scripts] -fastcs-PandABlocks = "fastcs_pandablocks.__main__:main" +fastcs-pandablocks = "fastcs_pandablocks.__main__:main" [project.urls] GitHub = "https://github.com/PandABlocks-ioc/fastcs-PandABlocks" @@ -114,3 +119,6 @@ lint.select = [ # See https://github.com/DiamondLightSource/python-copier-template/issues/154 # Remove this line to forbid private member access in tests "tests/**/*" = ["SLF001"] +# Ruff wants us to use `(|)` instead of `Union[,]` in types, but pyright warns against +# it. For now we'll use `Union` and turn the ruff error off. +"**" = ["UP007"] diff --git a/src/fastcs_pandablocks/__init__.py b/src/fastcs_pandablocks/__init__.py index a2ffbf3..2c02802 100644 --- a/src/fastcs_pandablocks/__init__.py +++ b/src/fastcs_pandablocks/__init__.py @@ -1,11 +1,59 @@ -"""Top level API. +"""Contains logic relevant to fastcs. Will use `fastcs_pandablocks.panda`.""" -.. data:: __version__ - :type: str +from pathlib import Path - Version number as calculated by https://github.com/pypa/setuptools_scm -""" +from fastcs.backends.epics.backend import EpicsBackend +from fastcs.backends.epics.gui import EpicsGUIFormat +from fastcs.backends.epics.ioc import EpicsIOCOptions +from fastcs.backends.epics.util import EpicsNameOptions, PvNamingConvention from ._version import __version__ +from .gui import PandaGUIOptions +from .panda.controller import PandaController -__all__ = ["__version__"] +DEFAULT_POLL_PERIOD = 0.1 + + +def ioc( + epics_prefix: str, + hostname: str, + screens_directory: Path | None = None, + clear_bobfiles: bool = False, + poll_period: float = DEFAULT_POLL_PERIOD, + naming_convention: PvNamingConvention = PvNamingConvention.CAPITALIZED, + pv_separator: str = ":", +): + name_options = EpicsNameOptions( + pv_naming_convention=naming_convention, pv_separator=pv_separator + ) + epics_ioc_options = EpicsIOCOptions(terminal=True, name_options=name_options) + + controller = PandaController(hostname, poll_period) + backend = EpicsBackend( + controller, pv_prefix=epics_prefix, ioc_options=epics_ioc_options + ) + + if clear_bobfiles and not screens_directory: + raise ValueError("`clear_bobfiles` is True with no `screens_directory`") + + if screens_directory: + if not screens_directory.is_dir(): + raise ValueError( + f"`screens_directory` {screens_directory} is not a directory" + ) + if not clear_bobfiles: + if list(screens_directory.iterdir()): + raise RuntimeError("`screens_directory` is not empty.") + + backend.create_gui( + PandaGUIOptions( + output_path=screens_directory / "output.bob", + file_format=EpicsGUIFormat.bob, + title="PandA", + ) + ) + + backend.run() + + +__all__ = ["__version__", "ioc", "DEFAULT_POLL_PERIOD"] diff --git a/src/fastcs_pandablocks/__main__.py b/src/fastcs_pandablocks/__main__.py index d6291de..3c13d95 100644 --- a/src/fastcs_pandablocks/__main__.py +++ b/src/fastcs_pandablocks/__main__.py @@ -1,23 +1,99 @@ """Interface for ``python -m fastcs_pandablocks``.""" -from argparse import ArgumentParser -from collections.abc import Sequence +import argparse +import logging +from pathlib import Path + +from fastcs.backends.epics.util import PvNamingConvention + +from fastcs_pandablocks import DEFAULT_POLL_PERIOD, ioc from . import __version__ __all__ = ["main"] -def main(args: Sequence[str] | None = None) -> None: +def main(): """Argument parser for the CLI.""" - parser = ArgumentParser() + parser = argparse.ArgumentParser( + description="Connect to the given HOST and create an IOC with the given PREFIX." + ) parser.add_argument( "-v", "--version", action="version", version=__version__, ) - parser.parse_args(args) + + subparsers = parser.add_subparsers(dest="command", required=True) + run_parser = subparsers.add_parser( + "run", help="Run the IOC with the given HOST and PREFIX." + ) + run_parser.add_argument("hostname", type=str, help="The host to connect to.") + run_parser.add_argument("prefix", type=str, help="The prefix for the IOC.") + run_parser.add_argument( + "--screens-dir", + type=str, + help=( + "Provide an existing directory to export generated bobfiles to, if no " + "directory is provided then bobfiles will not be generated." + ), + ) + run_parser.add_argument( + "--clear-bobfiles", + action="store_true", + help=( + "Overwrite existing bobfiles from the given `screens-dir` " + "before generating new ones." + ), + ) + + run_parser.add_argument( + "--log-level", + default="INFO", + choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], + help="Set the logging level.", + ) + run_parser.add_argument( + "--poll-period", + default=DEFAULT_POLL_PERIOD, + type=float, + help="Period in seconds with which to poll the panda.", + ) + run_parser.add_argument( + "--pv-naming-convention", + default=PvNamingConvention.CAPITALIZED.name, + choices=[choice.name for choice in PvNamingConvention], + help="Naming convention of the EPICS PVs.", + ) + run_parser.add_argument( + "--pv-separator", + default=":", + type=str, + help="Separator to use between EPICS PV sections.", + ) + + parsed_args = parser.parse_args() + if parsed_args.command != "run": + return + + # Set the logging level + level = getattr(logging, parsed_args.log_level.upper(), None) + logging.basicConfig(format="%(levelname)s:%(message)s", level=level) + + screens_directory = ( + Path(parsed_args.screens_dir) if parsed_args.screens_dir else None + ) + + ioc( + parsed_args.prefix, + parsed_args.hostname, + screens_directory=screens_directory, + clear_bobfiles=parsed_args.clear_bobfiles, + poll_period=parsed_args.poll_period, + naming_convention=PvNamingConvention(parsed_args.pv_naming_convention), + pv_separator=parsed_args.pv_separator, + ) if __name__ == "__main__": diff --git a/src/fastcs_pandablocks/gui.py b/src/fastcs_pandablocks/gui.py new file mode 100644 index 0000000..c4189f0 --- /dev/null +++ b/src/fastcs_pandablocks/gui.py @@ -0,0 +1,4 @@ +from fastcs.backends.epics.gui import EpicsGUIOptions + + +class PandaGUIOptions(EpicsGUIOptions): ... diff --git a/src/fastcs_pandablocks/handlers.py b/src/fastcs_pandablocks/handlers.py new file mode 100644 index 0000000..820e4ee --- /dev/null +++ b/src/fastcs_pandablocks/handlers.py @@ -0,0 +1,62 @@ +from dataclasses import asdict +from typing import Any + +from fastcs.attributes import Attribute, AttrR, AttrW, Handler, Sender, Updater + +from fastcs_pandablocks.types import PandaName + + +class DefaultFieldSender(Sender): + def __init__(self, panda_name: PandaName): + self.panda_name = panda_name + + async def put(self, controller: Any, attr: AttrW, value: str) -> None: + await controller.put_value_to_panda(self.panda_name, value) + + +class DefaultFieldUpdater(Updater): + #: We update the fields from the top level + update_period = None + + def __init__(self, panda_name: PandaName): + self.panda_name = panda_name + + async def update(self, controller: Any, attr: AttrR) -> None: + pass # TODO: update the attr with the value from the panda + + +class DefaultFieldHandler(DefaultFieldSender, DefaultFieldUpdater, Handler): + def __init__(self, panda_name: PandaName): + super().__init__(panda_name) + + +class EguSender(Sender): + def __init__(self, panda_name: PandaName, attr_to_update: Attribute): + """Update the attr""" + self.panda_name = panda_name + self.attr_to_update = attr_to_update + + async def put(self, controller: Any, attr: AttrW, value: str) -> None: + await controller.put_value_to_panda(self.panda_name, value) + kwargs = asdict(self.attr_to_update.datatype) + kwargs["units"] = value + new_attribute_datatype = type(self.attr_to_update.datatype)(**kwargs) + self.attr_to_update.update_datatype(new_attribute_datatype) + + +class CaptureHandler(Handler): + update_period = float("inf") + + def __init__(self, *args, **kwargs): + pass + + async def update(self, controller: Any, attr: AttrR) -> None: ... + async def put(self, controller: Any, attr: AttrW, value: Any) -> None: ... + + +class DatasetHandler(Handler): + update_period = float("inf") + + def __init__(self, *args, **kwargs): ... # TODO: work dataset + async def update(self, controller: Any, attr: AttrR) -> None: ... + async def put(self, controller: Any, attr: AttrW, value: Any) -> None: ... diff --git a/src/fastcs_pandablocks/panda/__init__.py b/src/fastcs_pandablocks/panda/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fastcs_pandablocks/panda/client_wrapper.py b/src/fastcs_pandablocks/panda/client_wrapper.py new file mode 100644 index 0000000..6f39755 --- /dev/null +++ b/src/fastcs_pandablocks/panda/client_wrapper.py @@ -0,0 +1,81 @@ +""" +This method has a `RawPanda` which handles all the io with the client. +""" + +import asyncio + +from pandablocks.asyncio import AsyncioClient +from pandablocks.commands import ( + ChangeGroup, + GetBlockInfo, + GetChanges, + GetFieldInfo, + Put, +) + +from fastcs_pandablocks.types import ( + PandaName, + RawBlocksType, + RawFieldsType, + RawInitialValuesType, +) + + +class RawPanda: + def __init__(self, hostname: str): + self._client = AsyncioClient(host=hostname) + + async def connect(self): + await self._client.connect() + + async def disconnect(self): + await self._client.close() + + async def introspect( + self, + ) -> tuple[ + RawBlocksType, RawFieldsType, RawInitialValuesType, RawInitialValuesType + ]: + blocks, fields, labels, initial_values = {}, [], {}, {} + + blocks = { + PandaName.from_string(name): block_info + for name, block_info in (await self._client.send(GetBlockInfo())).items() + } + fields = [ + { + PandaName(field=name): field_info + for name, field_info in block_values.items() + } + for block_values in await asyncio.gather( + *[self._client.send(GetFieldInfo(str(block))) for block in blocks] + ) + ] + + field_data = (await self._client.send(GetChanges(ChangeGroup.ALL, True))).values + + for field_name, value in field_data.items(): + if field_name.startswith("*METADATA"): + field_name_without_prefix = field_name.removeprefix("*METADATA.") + if field_name_without_prefix == "DESIGN": + continue # TODO: Handle design. + elif not field_name_without_prefix.startswith("LABEL_"): + raise TypeError( + "Received metadata not corresponding to a `LABEL_`: " + f"{field_name} = {value}." + ) + labels[ + PandaName.from_string( + field_name_without_prefix.removeprefix("LABEL_") + ) + ] = value + else: # Field is a default value + initial_values[PandaName.from_string(field_name)] = value + + return blocks, fields, labels, initial_values + + async def send(self, name: str, value: str): + await self._client.send(Put(name, value)) + + async def get_changes(self) -> dict[str, str]: + return (await self._client.send(GetChanges(ChangeGroup.ALL, False))).values diff --git a/src/fastcs_pandablocks/panda/controller.py b/src/fastcs_pandablocks/panda/controller.py new file mode 100644 index 0000000..aca7808 --- /dev/null +++ b/src/fastcs_pandablocks/panda/controller.py @@ -0,0 +1,125 @@ +import asyncio + +from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW +from fastcs.controller import Controller +from fastcs.wrappers import scan + +from fastcs_pandablocks.types import ( + PandaName, + RawBlocksType, + RawFieldsType, + RawInitialValuesType, +) + +from .client_wrapper import RawPanda +from .fields import FieldController + + +def _parse_introspected_data( + raw_blocks: RawBlocksType, + raw_field_infos: RawFieldsType, + raw_labels: RawInitialValuesType, + raw_initial_values: RawInitialValuesType, +): + block_controllers: dict[PandaName, FieldController] = {} + for (block_name, block_info), field_info in zip( + raw_blocks.items(), raw_field_infos, strict=True + ): + numbered_block_names = ( + [block_name] + if block_info.number in (None, 1) + else [ + block_name + PandaName(block_number=number) + for number in range(1, block_info.number + 1) + ] + ) + for numbered_block_name in numbered_block_names: + block_initial_values = { + key: value + for key, value in raw_initial_values.items() + if key in numbered_block_name + } + label = raw_labels.get(numbered_block_name, None) + block = FieldController( + numbered_block_name, + label=block_info.description or label, + ) + block.make_sub_fields(field_info, block_initial_values) + block_controllers[numbered_block_name] = block + + return block_controllers + + +class PandaController(Controller): + def __init__(self, hostname: str, poll_period: float) -> None: + # TODO https://github.com/DiamondLightSource/FastCS/issues/62 + self.poll_period = poll_period + + self._additional_attributes: dict[str, Attribute] = {} + self._raw_panda = RawPanda(hostname) + self._blocks: dict[PandaName, FieldController] = {} + + self.connected = False + + super().__init__() + + @property + def additional_attributes(self): + return self._additional_attributes + + async def connect(self) -> None: + if self.connected: + # `connect` needs to be called in `initialise`, + # then FastCS will attempt to call it again. + return + await self._raw_panda.connect() + blocks, fields, labels, initial_values = await self._raw_panda.introspect() + self._blocks = _parse_introspected_data(blocks, fields, labels, initial_values) + self.connected = True + + async def initialise(self) -> None: + await self.connect() + for block_name, block in self._blocks.items(): + if block.top_level_attribute is not None: + self._additional_attributes[block_name.attribute_name] = ( + block.top_level_attribute + ) + if block.additional_attributes or block.sub_fields: + self.register_sub_controller(block_name.attribute_name, block) + await block.initialise() + + def get_attribute(self, panda_name: PandaName) -> Attribute: + assert panda_name.block + block_controller = self._blocks[panda_name.up_to_block()] + if panda_name.field is None: + assert block_controller.top_level_attribute is not None + return block_controller.top_level_attribute + + field_controller = block_controller.sub_fields[panda_name.up_to_field()] + if panda_name.sub_field is None: + assert field_controller.top_level_attribute is not None + return field_controller.top_level_attribute + + sub_field_controller = field_controller.sub_fields[panda_name] + assert sub_field_controller.top_level_attribute is not None + return sub_field_controller.top_level_attribute + + async def update_field_value(self, panda_name: PandaName, value: str): + attribute = self.get_attribute(panda_name) + + if isinstance(attribute, AttrW): + await attribute.process(value) + elif isinstance(attribute, (AttrRW | AttrR)): + await attribute.set(value) + else: + raise RuntimeError(f"Couldn't find panda field for {panda_name}.") + + @scan(0.1) + async def update(self): + changes = await self._raw_panda.get_changes() + await asyncio.gather( + *[ + self.update_field_value(PandaName.from_string(raw_panda_name), value) + for raw_panda_name, value in changes.items() + ] + ) diff --git a/src/fastcs_pandablocks/panda/fields.py b/src/fastcs_pandablocks/panda/fields.py new file mode 100644 index 0000000..1e03272 --- /dev/null +++ b/src/fastcs_pandablocks/panda/fields.py @@ -0,0 +1,818 @@ +from __future__ import annotations + +from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW +from fastcs.controller import SubController +from fastcs.datatypes import Bool, Float, Int, String +from pandablocks.responses import ( + BitMuxFieldInfo, + BitOutFieldInfo, + EnumFieldInfo, + ExtOutBitsFieldInfo, + ExtOutFieldInfo, + FieldInfo, + PosMuxFieldInfo, + PosOutFieldInfo, + ScalarFieldInfo, + SubtypeTimeFieldInfo, + TableFieldInfo, + TimeFieldInfo, + UintFieldInfo, +) + +from fastcs_pandablocks.handlers import ( + CaptureHandler, + DatasetHandler, + DefaultFieldHandler, + DefaultFieldSender, + DefaultFieldUpdater, + EguSender, +) +from fastcs_pandablocks.types import ( + PandaName, + RawInitialValuesType, + ResponseType, + WidgetGroup, +) + +# EPICS hardcoded. TODO: remove once we switch to pvxs. +MAXIMUM_DESCRIPTION_LENGTH = 40 + + +def _strip_description(description: str | None) -> str | None: + return None if description is None else description[:MAXIMUM_DESCRIPTION_LENGTH] + + +class FieldController(SubController): + def __init__( + self, + panda_name: PandaName, + label: str | None = None, + ): + self.panda_name = panda_name + self.top_level_attribute: Attribute | None = None + + # Sub fields eg `PGEN.OUT` and `PGEN.TRIGGER` + self.sub_fields: dict[PandaName, FieldController] = {} + + self._additional_attributes: dict[str, Attribute] = {} + + if label is not None: + self._additional_attributes["label"] = AttrR( + String(), + description="Label from metadata.", + initial_value=label, + ) + + super().__init__(search_device_for_attributes=False) + + def make_sub_fields( + self, + field_infos: dict[PandaName, ResponseType], + initial_values: RawInitialValuesType, + ): + for sub_field_name, field_info in field_infos.items(): + full_sub_field_name = self.panda_name + sub_field_name + field_initial_values = { + key: value + for key, value in initial_values.items() + if key in sub_field_name + } + self.sub_fields[full_sub_field_name] = get_field_controller_from_field_info( + full_sub_field_name, field_info, field_initial_values + ) + + async def initialise(self): + for field_name, field in self.sub_fields.items(): + self.register_sub_controller( + field_name.attribute_name, sub_controller=field + ) + await field.initialise() + if field.top_level_attribute: + self._additional_attributes[field_name.attribute_name] = ( + field.top_level_attribute + ) + + @property + def additional_attributes(self) -> dict[str, Attribute]: + """ + Used by the FastCS mapping parser to get attributes since + we're not searching for device attributes. + """ + return self._additional_attributes + + +class TableFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: TableFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name, field_info.description) + + self.top_level_attribute = AttrR( + Float(), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + + +class TimeParamFieldController(FieldController): + # TODO: these `FieldInfo` are the exact same in pandablocks-client. + def __init__( + self, + panda_name: PandaName, + field_info: SubtypeTimeFieldInfo | TimeFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + units_panda_name = panda_name + PandaName(sub_field="units") + initial_units = initial_values[units_panda_name] + + self.top_level_attribute = AttrRW( + Float(units=initial_units), + handler=DefaultFieldHandler(panda_name), + description=_strip_description(field_info.description), + group=WidgetGroup.PARAMETERS.value, + initial_value=float(initial_values[panda_name]), + ) + self._additional_attributes["units"] = AttrW( + String(), + handler=EguSender(units_panda_name, self.top_level_attribute), + group=WidgetGroup.PARAMETERS.value, + allowed_values=field_info.units_labels, + ) + + +class TimeReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: SubtypeTimeFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + units_panda_name = panda_name + PandaName(sub_field="units") + initial_units = initial_values[units_panda_name] + + self.top_level_attribute = AttrR( + Float(units=initial_units), + handler=DefaultFieldUpdater( + panda_name=panda_name, + ), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + initial_value=float(initial_values[panda_name]), + ) + self._additional_attributes["units"] = AttrW( + String(), + handler=EguSender(units_panda_name, self.top_level_attribute), + group=WidgetGroup.OUTPUTS.value, + allowed_values=field_info.units_labels, + ) + + +class TimeWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: SubtypeTimeFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + units_panda_name = panda_name + PandaName(sub_field="units") + initial_units = initial_values[units_panda_name] + + self.top_level_attribute = AttrW( + Float(units=initial_units), + handler=DefaultFieldSender(panda_name), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + self._additional_attributes["units"] = AttrW( + String(), + handler=EguSender(self.top_level_attribute), + group=WidgetGroup.READBACKS.value, + allowed_values=field_info.units_labels, + ) + + +class BitOutFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: BitOutFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Bool(znam="0", onam="1"), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + initial_value=bool(int(initial_values[panda_name])), + ) + + +class PosOutFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: PosOutFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + top_level_attribute = AttrR( + Float(), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + initial_value=bool(int(initial_values[panda_name])), + ) + + scaled = AttrR( + Float(), + group=WidgetGroup.CAPTURE.value, + description="Value with scaling applied.", + ) + + scale = AttrRW( + Float(), + group=WidgetGroup.CAPTURE.value, + handler=DefaultFieldHandler(panda_name), + ) + offset = AttrRW( + Float(), + group=WidgetGroup.CAPTURE.value, + handler=DefaultFieldHandler(panda_name), + ) + + async def updated_scaled_on_offset_change(*_): + await scaled.set(scale.get() * top_level_attribute.get() + offset.get()) + + offset.set_update_callback(updated_scaled_on_offset_change) + + self._additional_attributes.update( + {"scaled": scaled, "scale": scale, "offset": offset} + ) + + self.top_level_attribute = top_level_attribute + self._additional_attributes["capture"] = AttrRW( + String(), + group=WidgetGroup.CAPTURE.value, + handler=CaptureHandler(), + allowed_values=field_info.capture_labels, + ) + self._additional_attributes["dataset"] = AttrRW( + String(), + group=WidgetGroup.CAPTURE.value, + handler=DatasetHandler(), + allowed_values=field_info.capture_labels, + ) + + +class ExtOutFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + field_info: ExtOutFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + self.top_level_attribute = AttrR( + Float(), + description=_strip_description(field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + self._additional_attributes["capture"] = AttrRW( + String(), + group=WidgetGroup.CAPTURE.value, + handler=CaptureHandler(), + allowed_values=field_info.capture_labels, + ) + self._additional_attributes["dataset"] = AttrRW( + String(), + group=WidgetGroup.CAPTURE.value, + handler=DatasetHandler(), + allowed_values=field_info.capture_labels, + ) + + +class _BitsSubFieldController(FieldController): + def __init__(self, panda_name: PandaName, label: str): + super().__init__(panda_name, label=label) + + self.top_level_attribute = AttrR( + Bool(znam="0", onam="1"), + description=_strip_description("Value of the field connected to this bit."), + group=WidgetGroup.OUTPUTS.value, + ) + self._additional_attributes["NAME"] = AttrR( + String(), + description="Name of the field connected to this bit.", + initial_value=label, + ) + + +class ExtOutBitsFieldController(ExtOutFieldController): + def __init__( + self, + panda_name: PandaName, + field_info: ExtOutBitsFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name, field_info, initial_values) + + for bit_number, label in enumerate(field_info.bits, start=1): + if label == "": + continue # Some rows are empty, do not create records. + + sub_field_panda_name = panda_name + PandaName(sub_field=f"bit{bit_number}") + self.sub_fields[sub_field_panda_name] = _BitsSubFieldController( + sub_field_panda_name, label=label + ) + + +class BitMuxFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + bit_mux_field_info: BitMuxFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + self.top_level_attribute = AttrRW( + String(), + description=_strip_description(bit_mux_field_info.description), + handler=DefaultFieldHandler(panda_name), + group=WidgetGroup.INPUTS.value, + initial_value=initial_values[panda_name], + ) + + self._additional_attributes["delay"] = AttrRW( + Int(max=bit_mux_field_info.max_delay, min=0), + description="Clock delay on input.", + handler=DefaultFieldHandler(panda_name), + group=WidgetGroup.INPUTS.value, + ) + + +class PosMuxFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + pos_mux_field_info: PosMuxFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + String(), + description=_strip_description(pos_mux_field_info.description), + group=WidgetGroup.INPUTS.value, + allowed_values=pos_mux_field_info.labels, + initial_value=initial_values[panda_name], + ) + + +class UintParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + uint_param_field_info: UintFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + Float( + max_alarm=uint_param_field_info.max_val, + max=uint_param_field_info.max_val, + min_alarm=0, + min=0, + ), + description=_strip_description(uint_param_field_info.description), + group=WidgetGroup.PARAMETERS.value, + initial_value=float(initial_values[panda_name]), + ) + + +class UintReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + uint_read_field_info: UintFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Float(prec=0, min_alarm=0, max_alarm=uint_read_field_info.max_val), + description=_strip_description(uint_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=float(initial_values[panda_name]), + ) + + +class UintWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + uint_write_field_info: UintFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrW( + Float( + prec=0, + max_alarm=uint_write_field_info.max_val, + max=uint_write_field_info.max_val, + min_alarm=0, + min=0, + ), + description=_strip_description(uint_write_field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + + +class IntParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + int_param_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + Int(), + description=_strip_description(int_param_field_info.description), + group=WidgetGroup.PARAMETERS.value, + initial_value=int(initial_values[panda_name]), + ) + + +class IntReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + int_read_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Int(), + description=_strip_description(int_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=int(initial_values[panda_name]), + ) + + +class IntWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + int_write_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrW( + Int(), + description=_strip_description(int_write_field_info.description), + group=WidgetGroup.PARAMETERS.value, + ) + + +class ScalarParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + scalar_param_field_info: ScalarFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + Float(units=scalar_param_field_info.units), + description=_strip_description(scalar_param_field_info.description), + group=WidgetGroup.PARAMETERS.value, + initial_value=float(initial_values[panda_name]), + ) + + +class ScalarReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + scalar_read_field_info: ScalarFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Float(), + description=_strip_description(scalar_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=float(initial_values[panda_name]), + ) + + +class ScalarWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + scalar_write_field_info: ScalarFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Float(), + description=_strip_description(scalar_write_field_info.description), + group=WidgetGroup.PARAMETERS.value, + ) + + +class BitParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + bit_param_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + Bool(znam="0", onam="1"), + description=_strip_description(bit_param_field_info.description), + group=WidgetGroup.PARAMETERS.value, + # Initial value is string "0"/"1". + # TODO: Equip each read/readwrite field with a converter + initial_value=bool(int(initial_values[panda_name])), + ) + + +class BitReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + bit_read_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + Bool(znam="0", onam="1"), + description=_strip_description(bit_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=bool(int(initial_values[panda_name])), + ) + + +class BitWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + bit_write_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrW( + Bool(znam="0", onam="1"), + description=_strip_description(bit_write_field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + + +class ActionWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + action_write_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrW( + Bool(znam="0", onam="1"), + description=_strip_description(action_write_field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + + +class LutParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + lut_param_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + String(), + description=_strip_description(lut_param_field_info.description), + group=WidgetGroup.PARAMETERS.value, + initial_value=initial_values[panda_name], + ) + + +class LutReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + lut_read_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + String(), + description=_strip_description(lut_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=initial_values[panda_name], + ) + + +class LutWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + lut_read_field_info: FieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrR( + String(), + description=_strip_description(lut_read_field_info.description), + group=WidgetGroup.OUTPUTS.value, + ) + + +class EnumParamFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + enum_param_field_info: EnumFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrRW( + String(), + description=_strip_description(enum_param_field_info.description), + allowed_values=enum_param_field_info.labels, + group=WidgetGroup.PARAMETERS.value, + initial_value=initial_values[panda_name], + ) + + +class EnumReadFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + enum_read_field_info: EnumFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + + # We use a raw string for this since many labels won't fit into + # `mbbIn` fields because of EPICS limitations. + # Since this is read only it doesn't matter. + self.top_level_attribute = AttrR( + String(), + description=_strip_description(enum_read_field_info.description), + group=WidgetGroup.READBACKS.value, + initial_value=initial_values[panda_name], + ) + + +class EnumWriteFieldController(FieldController): + def __init__( + self, + panda_name: PandaName, + enum_write_field_info: EnumFieldInfo, + initial_values: RawInitialValuesType, + ): + super().__init__(panda_name) + self.top_level_attribute = AttrW( + String(), + description=_strip_description(enum_write_field_info.description), + allowed_values=enum_write_field_info.labels, + group=WidgetGroup.OUTPUTS.value, + ) + + +FieldControllerType = ( + TableFieldController + | BitOutFieldController + | PosOutFieldController + | ExtOutFieldController + | ExtOutBitsFieldController + | BitMuxFieldController + | PosMuxFieldController + | UintParamFieldController + | UintReadFieldController + | UintWriteFieldController + | IntParamFieldController + | IntReadFieldController + | IntWriteFieldController + | ScalarParamFieldController + | ScalarReadFieldController + | ScalarWriteFieldController + | BitParamFieldController + | BitReadFieldController + | BitWriteFieldController + | ActionWriteFieldController + | LutParamFieldController + | LutReadFieldController + | LutWriteFieldController + | EnumParamFieldController + | EnumReadFieldController + | EnumWriteFieldController + | TimeParamFieldController + | TimeReadFieldController + | TimeWriteFieldController +) + + +def get_field_controller_from_field_info( + panda_name: PandaName, + field_info: ResponseType, + initial_values: RawInitialValuesType, +) -> FieldControllerType: + match field_info: + case TableFieldInfo(): + return TableFieldController(panda_name, field_info, initial_values) + # Time types + case TimeFieldInfo(subtype=None): + return TimeParamFieldController(panda_name, field_info, initial_values) + case SubtypeTimeFieldInfo(type="param"): + return TimeParamFieldController(panda_name, field_info, initial_values) + case SubtypeTimeFieldInfo(subtype="read"): + return TimeReadFieldController(panda_name, field_info, initial_values) + case SubtypeTimeFieldInfo(subtype="write"): + return TimeWriteFieldController(panda_name, field_info, initial_values) + + # Bit types + case BitOutFieldInfo(): + return BitOutFieldController(panda_name, field_info, initial_values) + case ExtOutBitsFieldInfo(subtype="timestamp"): + return ExtOutFieldController(panda_name, field_info, initial_values) + case ExtOutBitsFieldInfo(): + return ExtOutBitsFieldController(panda_name, field_info, initial_values) + case ExtOutFieldInfo(): + return ExtOutFieldController(panda_name, field_info, initial_values) + case BitMuxFieldInfo(): + return BitMuxFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="param", subtype="bit"): + return BitParamFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="read", subtype="bit"): + return BitReadFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="write", subtype="bit"): + return BitWriteFieldController(panda_name, field_info, initial_values) + + # Pos types + case PosOutFieldInfo(): + return PosOutFieldController(panda_name, field_info, initial_values) + case PosMuxFieldInfo(): + return PosMuxFieldController(panda_name, field_info, initial_values) + + # Uint types + case UintFieldInfo(type="param"): + return UintParamFieldController(panda_name, field_info, initial_values) + case UintFieldInfo(type="read"): + return UintReadFieldController(panda_name, field_info, initial_values) + case UintFieldInfo(type="write"): + return UintWriteFieldController(panda_name, field_info, initial_values) + + # Scalar types + case ScalarFieldInfo(subtype="param"): + return ScalarParamFieldController(panda_name, field_info, initial_values) + case ScalarFieldInfo(type="read"): + return ScalarReadFieldController(panda_name, field_info, initial_values) + case ScalarFieldInfo(type="write"): + return ScalarWriteFieldController(panda_name, field_info, initial_values) + + # Int types + case FieldInfo(type="param", subtype="int"): + return IntParamFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="read", subtype="int"): + return IntReadFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="write", subtype="int"): + return IntWriteFieldController(panda_name, field_info, initial_values) + + # Action types + case FieldInfo( + type="write", + subtype="action", + ): + return ActionWriteFieldController(panda_name, field_info, initial_values) + + # Lut types + case FieldInfo(type="param", subtype="lut"): + return LutParamFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="read", subtype="lut"): + return LutReadFieldController(panda_name, field_info, initial_values) + case FieldInfo(type="write", subtype="lut"): + return LutWriteFieldController(panda_name, field_info, initial_values) + + # Enum types + case EnumFieldInfo(type="param"): + return EnumParamFieldController(panda_name, field_info, initial_values) + case EnumFieldInfo(type="read"): + return EnumReadFieldController(panda_name, field_info, initial_values) + case EnumFieldInfo(type="write"): + return EnumWriteFieldController(panda_name, field_info, initial_values) + case _: + raise ValueError(f"Unknown field type: {type(field_info).__name__}.") diff --git a/src/fastcs_pandablocks/types/__init__.py b/src/fastcs_pandablocks/types/__init__.py new file mode 100644 index 0000000..efaf40d --- /dev/null +++ b/src/fastcs_pandablocks/types/__init__.py @@ -0,0 +1,34 @@ +from enum import Enum + +from ._annotations import ( + RawBlocksType, + RawFieldsType, + RawInitialValuesType, + ResponseType, +) +from ._string_types import ( + EPICS_SEPARATOR, + PANDA_SEPARATOR, + PandaName, +) + + +class WidgetGroup(Enum): + NONE = None + PARAMETERS = "Parameters" + OUTPUTS = "Outputs" + INPUTS = "Inputs" + READBACKS = "Readbacks" + CAPTURE = "Capture" + + +__all__ = [ + "EPICS_SEPARATOR", + "PANDA_SEPARATOR", + "PandaName", + "ResponseType", + "RawBlocksType", + "RawFieldsType", + "RawInitialValuesType", + "WidgetGroup", +] diff --git a/src/fastcs_pandablocks/types/_annotations.py b/src/fastcs_pandablocks/types/_annotations.py new file mode 100644 index 0000000..4d50a33 --- /dev/null +++ b/src/fastcs_pandablocks/types/_annotations.py @@ -0,0 +1,42 @@ +from typing import Union + +from pandablocks.responses import ( + BitMuxFieldInfo, + BitOutFieldInfo, + BlockInfo, + EnumFieldInfo, + ExtOutBitsFieldInfo, + ExtOutFieldInfo, + FieldInfo, + PosMuxFieldInfo, + PosOutFieldInfo, + ScalarFieldInfo, + SubtypeTimeFieldInfo, + TableFieldInfo, + TimeFieldInfo, + UintFieldInfo, +) + +from ._string_types import PandaName + +# Pyright gives us variable not allowed in type expression error +# if we try to use the new (|) syntax +ResponseType = Union[ + BitMuxFieldInfo, + BitOutFieldInfo, + EnumFieldInfo, + ExtOutBitsFieldInfo, + ExtOutFieldInfo, + FieldInfo, + PosMuxFieldInfo, + PosOutFieldInfo, + ScalarFieldInfo, + SubtypeTimeFieldInfo, + TableFieldInfo, + TimeFieldInfo, + UintFieldInfo, +] + +RawBlocksType = dict[PandaName, BlockInfo] +RawFieldsType = list[dict[PandaName, ResponseType]] +RawInitialValuesType = dict[PandaName, str] diff --git a/src/fastcs_pandablocks/types/_string_types.py b/src/fastcs_pandablocks/types/_string_types.py new file mode 100644 index 0000000..ea5bd28 --- /dev/null +++ b/src/fastcs_pandablocks/types/_string_types.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass +from functools import cached_property +from typing import TypeVar + +T = TypeVar("T") + +EPICS_SEPARATOR = ":" +PANDA_SEPARATOR = "." + + +def _extract_number_at_end_of_string(string: str) -> tuple[str, int | None]: + pattern = r"(\D+)(\d+)$" + match = re.match(pattern, string) + if match: + return (match.group(1), int(match.group(2))) + return string, None + + +def _format_with_separator( + separator: str, *sections: tuple[str | None, int | None] | str | None +) -> str: + result = "" + for section in sections: + if isinstance(section, tuple): + section_string, section_number = section + if section_string is not None: + result += f"{separator}{section_string}" + if section_number is not None: + result += f"{section_number}" + elif section is not None: + result += f"{separator}{section}" + + return result.lstrip(separator) + + +def _to_python_attribute_name(string: str): + return string.replace("-", "_").lower() + + +def _choose_sub_name(sub_pv_1: T, sub_pv_2: T) -> T: + if sub_pv_1 is not None and sub_pv_2 is not None: + if sub_pv_1 != sub_pv_2: + raise TypeError( + "Ambiguous pv elements on add " f"{sub_pv_1} and {sub_pv_2}" + ) + return sub_pv_2 or sub_pv_1 + + +@dataclass(frozen=True) +class PandaName: + block: str | None = None + block_number: int | None = None + field: str | None = None + sub_field: str | None = None + + def up_to_block(self) -> PandaName: + return PandaName(block=self.block, block_number=self.block_number) + + def up_to_field(self) -> PandaName: + return self.up_to_block() + PandaName(field=self.field) + + @cached_property + def _string_form(self) -> str: + return _format_with_separator( + PANDA_SEPARATOR, (self.block, self.block_number), self.field, self.sub_field + ) + + def __str__(self) -> str: + return self._string_form + + @classmethod + def from_string(cls, name: str): + split_name = name.split(PANDA_SEPARATOR) + + if split_name == [""]: + return PandaName() + + block, block_number, field, sub_field = None, None, None, None + block, block_number = _extract_number_at_end_of_string(split_name[0]) + field = split_name[1] if len(split_name) > 1 else None + sub_field = split_name[2] if len(split_name) > 2 else None + + return PandaName( + block=block, block_number=block_number, field=field, sub_field=sub_field + ) + + def __add__(self, other: PandaName) -> PandaName: + return PandaName( + block=_choose_sub_name(self.block, other.block), + block_number=_choose_sub_name(self.block_number, other.block_number), + field=_choose_sub_name(self.field, other.field), + sub_field=_choose_sub_name(self.sub_field, other.sub_field), + ) + + @cached_property + def attribute_name(self) -> str: + if self.sub_field: + return _to_python_attribute_name(self.sub_field) + if self.field: + return _to_python_attribute_name(self.field) + if self.block: + return _to_python_attribute_name(self.block) + ( + f"{self.block_number}" if self.block_number is not None else "" + ) + return "" + + def __contains__(self, other: PandaName) -> bool: + for attr in ("block", "block_number", "field", "sub_field"): + sub_value, super_value = getattr(other, attr), getattr(self, attr) + if super_value is None: + break + if sub_value != super_value: + return False + return True diff --git a/tests/test_cli.py b/tests/test_cli.py index c5072bd..381237b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -6,4 +6,4 @@ def test_cli_version(): cmd = [sys.executable, "-m", "fastcs_pandablocks", "--version"] - assert subprocess.check_output(cmd).decode().strip() == __version__ + assert __version__ in subprocess.check_output(cmd).decode().strip() diff --git a/tests/test_introspection.py b/tests/test_introspection.py new file mode 100644 index 0000000..e69de29