# -*- coding: utf-8 -*-
"""
Created the 29/07/2022
@author: Sebastien Weber
"""
from __future__ import annotations
import numbers
from importlib import import_module
from numbers import Number
import sys
from typing import List, Union, Optional, Dict, TypeVar, TYPE_CHECKING
import numpy as np
from qtpy.QtCore import QObject, Signal, QThread, Slot, Qt, QTimer
from qtpy import QtWidgets
from easydict import EasyDict as edict
from pymodaq_utils.logger import set_logger, get_module_name
from pymodaq_utils.utils import find_keys_from_val
from pymodaq_utils import utils
from pymodaq.utils.gui_utils import get_splash_sc
from pymodaq_utils import config as config_mod
from pymodaq.utils.exceptions import ActuatorError
from pymodaq_utils.warnings import deprecation_msg
from pymodaq.utils.data import DataToExport, DataActuator
from pymodaq_data.h5modules.backends import Node, SaveType
from pymodaq_gui.h5modules.saving import H5Saver
from pymodaq_gui.parameter import ioxml, Parameter
from pymodaq_gui.parameter import utils as putils
from pymodaq_gui.utils.utils import mkQApp
from pymodaq.utils.h5modules import module_saving
from pymodaq.control_modules.utils import ParameterControlModule
from pymodaq.control_modules.thread_commands import (
ThreadStatus,
ThreadStatusMove,
ControlToHardwareMove,
UiToMainMove,
)
from pymodaq.control_modules.move_utility_classes import (
ThreadCommand,
MoveCommand,
DAQ_Move_base,
DataActuatorType,
check_units,
DataUnitError,
)
from pymodaq.control_modules.move_utility_classes import params as daq_move_params
from pymodaq.utils.leco.pymodaq_listener import (
MoveActorListener,
LECOMoveCommands,
LECOCommands,
)
from pymodaq.utils.daq_utils import get_plugins
from pymodaq import Q_, Unit
from pymodaq.control_modules.daq_move_ui.factory import ActuatorUIFactory
from pymodaq.utils.config import Config as ControlModulesConfig
if TYPE_CHECKING:
from pymodaq.control_modules.daq_move_ui.ui_base import DAQ_Move_UI_Base
local_path = config_mod.get_set_local_dir()
sys.path.append(str(local_path))
logger = set_logger(get_module_name(__file__))
config_utils = config_mod.Config()
config = ControlModulesConfig()
HardwareController = TypeVar("HardwareController")
DAQ_Move_Actuators = get_plugins("daq_move")
ACTUATOR_TYPES = [mov["name"] for mov in DAQ_Move_Actuators]
if len(ACTUATOR_TYPES) == 0:
raise ActuatorError("No installed Actuator")
STATUS_WAIT_TIME = 1000
[docs]
class DAQ_Move(ParameterControlModule):
"""Main PyMoDAQ class to drive actuators
Qt object and generic UI to drive actuators.
Attributes
----------
init_signal: Signal[bool]
This signal is emitted when the chosen actuator is correctly initialized
move_done_signal: Signal[str, DataActuator]
This signal is emitted when the chosen actuator finished its action. It gives the actuator's name and current
value
bounds_signal: Signal[bool]
This signal is emitted when the actuator reached defined limited boundaries.
See Also
--------
:class:`ControlModule`, :class:`ParameterManager`
"""
settings_name = "daq_move_settings"
move_done_signal = Signal(DataActuator)
current_value_signal = Signal(DataActuator)
bounds_signal = Signal(bool)
params = daq_move_params + [
{'title': 'Saver Settings:', 'name': 'saver_settings', 'type': 'group',
'visible': False, 'children': H5Saver.get_params_for_save_type(SaveType.actuator)}]
listener_class = MoveActorListener
ui: Optional[DAQ_Move_UI_Base]
def __init__(
self, parent=None, title="DAQ Move", ui_identifier: Optional[str] = None, **kwargs
) -> None:
"""
Parameters
----------
parent: QWidget or None
parent: QWidget or None
if it is a valid QWidget, it will hold the user interface to drive it
title: str
The unique (should be unique) string identifier for the underlying actuator
"""
self.logger = set_logger(f"{logger.name}.{title}")
self.logger.info(f"Initializing DAQ_Move: {title}")
super().__init__(action_list=("save", "update"), **kwargs)
if not (
ui_identifier is not None and ui_identifier in ActuatorUIFactory.keys()
):
ui_identifier = config("actuator", "ui")
self.settings.child("main_settings", "ui_type").setValue(ui_identifier)
self.settings.child("main_settings", "ui_type").setOpts(readonly=True)
DAQ_Move_UI = ActuatorUIFactory.get(ui_identifier)
self.parent = parent
if parent is not None:
self.ui = DAQ_Move_UI(parent, title)
else:
self.ui = None
if self.ui is not None:
self.ui.actuators = ACTUATOR_TYPES
self.ui.set_settings_tree(self.settings_tree)
self.ui.command_sig.connect(self.process_ui_cmds)
self.splash_sc = get_splash_sc()
self._title = title
if len(ACTUATOR_TYPES) > 0: # will be 0 if no valid plugins are installed
self.actuator = kwargs.get("actuator", ACTUATOR_TYPES[0])
self._module_and_data_saver: module_saving.ActuatorTimeSaver = None
for hidden_param in ('custom_name',
'current_scan_name',
'current_scan_path',
'current_h5_file',
'new_file',
'base_name'):
self.settings.child('saver_settings', hidden_param).setOpts(visible=False)
self._move_done_bool = True
self._current_value = DataActuator(title, units=self.units)
self._target_value = DataActuator(title, units=self.units)
self._relative_value = DataActuator(title, units=self.units)
self._refresh_timer = QTimer(self)
self._refresh_timer.timeout.connect(self.get_actuator_value)
def process_ui_cmds(self, cmd: utils.ThreadCommand):
"""Process commands sent by actions done in the ui
Parameters
----------
cmd: ThreadCommand
Possible values are :
* init
* quit
* get_value
* loop_get_value
* find_home
* stop
* move_abs
* move_rel
* show_log
* actuator_changed
* rel_value
* show_config
"""
if cmd.command == UiToMainMove.INIT:
self.init_hardware(cmd.attribute[0])
elif cmd.command == UiToMainMove.QUIT:
self.quit_fun()
elif cmd.command == UiToMainMove.GET_VALUE:
self.get_actuator_value()
elif cmd.command == UiToMainMove.LOOP_GET_VALUE:
self.get_continuous_actuator_value(cmd.attribute)
elif cmd.command == UiToMainMove.FIND_HOME:
self.move_home()
elif cmd.command == UiToMainMove.STOP:
self.stop_motion()
elif cmd.command == UiToMainMove.MOVE_ABS:
data_act: DataActuator = cmd.attribute
if (
not Unit(data_act.units).is_compatible_with(self.units)
and data_act.units != ""
):
data_act.force_units(self.units)
self.move_abs(data_act)
elif cmd.command == UiToMainMove.MOVE_REL:
data_act: DataActuator = cmd.attribute
if (
not Unit(data_act.units).is_compatible_with(self.units)
and data_act.units != ""
):
data_act.force_units(self.units)
self.move_rel(data_act)
elif cmd.command == UiToMainMove.SHOW_LOG:
self.show_log()
elif cmd.command == UiToMainMove.SHOW_CONFIG:
self.config = self.show_config(self.config)
self.ui.config = self.config
elif cmd.command == UiToMainMove.ACTUATOR_CHANGED:
self.actuator = cmd.attribute
elif cmd.command == UiToMainMove.REL_VALUE:
self._relative_value = cmd.attribute
@property
def master(self) -> bool:
"""Get/Set programmatically the Master/Slave status of an actuator"""
if self.initialized_state:
return (
self.settings["move_settings", "multiaxes", "multi_status"] == "Master"
)
else:
return True
@master.setter
def master(self, is_master: bool):
if self.initialized_state:
self.settings.child("move_settings", "multiaxes", "multi_status").setValue(
"Master" if is_master else "Slave"
)
def append_data(
self, dte: Optional[DataToExport] = None, where: Union[Node, str, None] = None
):
"""Appends current DataToExport to an ActuatorEnlargeableSaver
Parameters
----------
dte: DataToExport, optional
where: Node or str
See Also
--------
ActuatorEnlargeableSaver
"""
if dte is None:
dte = DataToExport(name=self.title, data=[self._current_value])
self._add_data_to_saver(dte, where=where)
self.settings.child('saver_settings', 'N_saved').setValue(self.settings['saver_settings', 'N_saved'] + 1)
def _add_data_to_saver(self, data: DataToExport, where=None, **kwargs):
"""Adds DataToExport data to the current node using the declared module_and_data_saver
Filters the data to be saved by DataSource as specified in the current H5Saver (see self.module_and_data_saver)
Parameters
----------
data: DataToExport
The data to be saved
kwargs: dict
Other named parameters to be passed as is to the module_and_data_saver
See Also
--------
DetectorSaver, DetectorEnlargeableSaver, DetectorExtendedSaver
"""
# todo: test this for logging
node = self.module_and_data_saver.get_set_node(where)
self.module_and_data_saver.add_data(node, data, **kwargs)
[docs]
def stop_motion(self):
"""Stop any motion"""
try:
self.command_hardware.emit(ThreadCommand(ControlToHardwareMove.STOP_MOTION))
except Exception as e:
self.logger.exception(str(e))
[docs]
def move(self, move_command: MoveCommand):
"""Generic method to trigger the correct action on the actuator
Parameters
----------
move_command: MoveCommand
MoveCommand with move_type attribute either:
* 'abs': performs an absolute action
* 'rel': performs a relative action
* 'home': find the actuator's home
See Also
--------
:meth:`move_abs`, :meth:`move_rel`, :meth:`move_home`, :class:`..utility_classes.MoveCommand`
"""
if move_command.move_type == "abs":
self.move_abs(move_command.value)
elif move_command.move_type == "rel":
self.move_rel(move_command.value)
elif move_command.move_type == "home":
self.move_home(move_command.value)
[docs]
def move_abs(self, value: Union[DataActuator, numbers.Number], send_to_tcpip=False):
"""Move the connected hardware to the absolute value
Returns nothing but the move_done_signal will be send once the action is done
Parameters
----------
value: ndarray
The value the actuator should reach
send_to_tcpip: bool
if True, this position is send through the TCP/IP communication canal
"""
try:
if isinstance(value, Number):
value = DataActuator(
self.title, data=[np.array([value])], units=self.units
)
self._send_to_tcpip = send_to_tcpip
if value != self._current_value:
if self.ui is not None:
self.ui.move_done = False
self._move_done_bool = False
self._target_value = value
self.update_status("Moving")
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.RESET_STOP_MOTION)
)
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.MOVE_ABS, attribute=[value])
)
except Exception as e:
self.logger.exception(str(e))
[docs]
def move_home(self, send_to_tcpip=False):
"""Move the connected actuator to its home value (if any)
Parameters
----------
send_to_tcpip: bool
if True, this position is send through the TCP/IP communication canal
"""
self._send_to_tcpip = send_to_tcpip
try:
if self.ui is not None:
self.ui.move_done = False
self._move_done_bool = False
self.update_status("Moving")
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.RESET_STOP_MOTION)
)
self.command_hardware.emit(ThreadCommand(ControlToHardwareMove.MOVE_HOME))
except Exception as e:
self.logger.exception(str(e))
[docs]
def move_rel(
self, rel_value: Union[DataActuator, numbers.Number], send_to_tcpip=False
):
"""Move the connected hardware to the relative value
Returns nothing but the move_done_signal will be send once the action is done
Parameters
----------
value: float
The relative value the actuator should reach
send_to_tcpip: bool
if True, this position is send through the TCP/IP communication canal
"""
try:
if isinstance(rel_value, Number):
rel_value = DataActuator(
self.title, data=[np.array([rel_value])], units=self.units
)
self._send_to_tcpip = send_to_tcpip
if self.ui is not None:
self.ui.move_done = False
self._move_done_bool = False
self._target_value = self._current_value + rel_value
self.update_status("Moving")
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.RESET_STOP_MOTION)
)
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.MOVE_REL, attribute=[rel_value])
)
except Exception as e:
self.logger.exception(str(e))
def move_rel_p(self):
self.move_rel(self._relative_value)
def move_rel_m(self):
self.move_rel(-self._relative_value)
[docs]
def quit_fun(self):
"""Programmatic quitting of the current instance of DAQ_Move
Des-init the actuator then close the UI parent widget
"""
# insert anything that needs to be closed before leaving
if self._initialized_state:
self.init_hardware(False)
self.quit_signal.emit()
if self.ui is not None:
self.ui.close()
# self.parent.close()
def init_hardware(self, do_init=True):
"""Init or desinit the selected instrument plugin class"""
if not do_init:
try:
self.command_hardware.emit(ThreadCommand(ControlToHardwareMove.CLOSE))
if self.ui is not None:
self.ui.actuator_init = False
except Exception as e:
self.logger.exception(str(e))
else:
try:
hardware = DAQ_Move_Hardware(
self._actuator_type, self._current_value, self._title
)
self._hardware_thread = QThread()
hardware.moveToThread(self._hardware_thread)
self.command_hardware[ThreadCommand].connect(hardware.queue_command)
hardware.status_sig[ThreadCommand].connect(self.thread_status)
self._update_settings_signal[edict].connect(hardware.update_settings)
self._hardware_thread.hardware = hardware
self._hardware_thread.start()
self.command_hardware.emit(
ThreadCommand(
ControlToHardwareMove.INI_STAGE,
attribute=[
self.settings.child("move_settings").saveState(),
self.controller,
],
)
)
except Exception as e:
self.logger.exception(str(e))
@property
def initialized_state(self):
"""bool: status of the actuator's initialization (init or not)"""
return self._initialized_state
@property
def move_done_bool(self):
"""bool: status of the actuator's status (done or not)"""
return self._move_done_bool
def value_changed(self, param: Parameter):
"""Apply changes of value in the settings"""
super().value_changed(param=param)
path = self.settings.childPath(param)
if param.name() == "refresh_timeout":
self._refresh_timer.setInterval(param.value())
elif param.name() == 'continuous_saving_opt':
self.settings.child('saver_settings').setOpts(visible=param.value())
elif param.name() in putils.iter_children(self.settings.child('saver_settings'), []):
if param.name() == 'do_save':
self.setup_continuous_saving(param.value())
self.h5saver.settings.child(*path[1:]).setValue(param.value())
self._update_settings(param=param)
def setup_continuous_saving(self, init: bool = True):
"""Configure the objects dealing with the continuous saving mode"""
if init:
self.module_and_data_saver = module_saving.ActuatorTimeSaver(self)
self.module_and_data_saver.h5saver = self.h5saver
self.h5saver.settings.child('do_save').sigValueChanged.connect(self._init_continuous_save)
else:
self.h5saver.close_file()
def _init_continuous_save(self):
""" Initialize the continuous saving H5Saver object
Update the module_and_data_saver attribute as :class:`DetectorTimeSaver` object
"""
if self.settings.child('saver_settings', 'do_save').value():
self.settings.child('saver_settings', 'base_name').setValue('Data')
self.settings.child('saver_settings', 'N_saved').show()
self.settings.child('saver_settings', 'N_saved').setValue(0)
self.h5saver.init_file(update_h5=True)
else:
self.settings.child('saver_settings', 'N_saved').hide()
def param_deleted(self, param):
"""Apply deletion of settings"""
if param.name() not in putils.iter_children(
self.settings.child("main_settings"), []
):
self._update_settings_signal.emit(
edict(path=["move_settings"], param=param, change="parent")
)
def child_added(self, param, data):
"""Apply addition of settings"""
path = self.settings.childPath(param)
if "main_settings" not in path:
self._update_settings_signal.emit(
edict(path=path, param=data[0].saveState(), change="childAdded")
)
def raise_timeout(self):
"""Update status with "Timeout occurred" statement and change the timeout flag."""
self.update_status("Timeout occurred")
self.wait_position_flag = False
[docs]
@Slot(ThreadCommand)
def thread_status(
self, status: ThreadCommand
): # general function to get datas/infos from all threads back to the main
"""Get back info (using the ThreadCommand object) from the hardware
And re-emit this ThreadCommand using the custom_sig signal if it should be used in a higher level module
Commands valid for all control modules are defined in the parent class, here are described only the specific
ones
Parameters
----------
status: ThreadCommand
Possible values are:
* **ini_stage**: obtains info from the initialization
* **get_actuator_value**: update the UI current value
* **move_done**: update the UI current value and emits the move_done signal
* **outofbounds**: emits the bounds_signal signal with a True argument
* **set_allowed_values**: used to change the behaviour of the spinbox controlling absolute values (see
:meth:`daq_move_ui.set_abs_spinbox_properties`
* stop: stop the motion
"""
super().thread_status(status, "move")
if status.command == ThreadStatusMove.INI_STAGE:
self.update_status(
f"Stage initialized: {status.attribute['initialized']} "
f"info: {status.attribute['info']}"
)
if status.attribute["initialized"]:
self.controller = status.attribute["controller"]
if self.ui is not None:
self.ui.actuator_init = True
self._initialized_state = True
else:
self._initialized_state = False
if self._initialized_state:
self.get_actuator_value()
self.init_signal.emit(self._initialized_state)
elif (
status.command == ThreadStatusMove.GET_ACTUATOR_VALUE
or status.command == "check_position"
):
data_act = self._check_data_type(status.attribute)
if self.ui is not None:
self.ui.display_value(data_act)
if self.ui.has_action("show_graph") and self.ui.is_action_checked(
"show_graph"
):
self.ui.show_data(DataToExport(name=self.title, data=[data_act]))
self._current_value = data_act
if self.settings['saver_settings', 'do_save']:
self.append_data()
self.current_value_signal.emit(self._current_value)
if (
self.settings["main_settings", "tcpip", "tcp_connected"]
and self._send_to_tcpip
):
self._command_tcpip.emit(ThreadCommand("position_is", data_act))
if (
self.settings["main_settings", "leco", "leco_connected"]
and self._send_to_tcpip
):
self._command_tcpip.emit(
ThreadCommand(LECOMoveCommands.POSITION, data_act)
)
elif status.command == ThreadStatusMove.MOVE_DONE:
data_act = self._check_data_type(status.attribute)
if self.ui is not None:
self.ui.display_value(data_act)
self.ui.move_done = True
self._current_value = data_act
self._move_done_bool = True
self.move_done_signal.emit(data_act)
if (
self.settings.child("main_settings", "tcpip", "tcp_connected").value()
and self._send_to_tcpip
):
self._command_tcpip.emit(ThreadCommand("move_done", data_act))
if (
self.settings.child("main_settings", "leco", "leco_connected").value()
and self._send_to_tcpip
):
self._command_tcpip.emit(
ThreadCommand(LECOMoveCommands.MOVE_DONE, data_act)
)
elif status.command == ThreadStatusMove.OUT_OF_BOUNDS:
logger.warning(f"The Actuator {self.title} has reached its defined bounds")
self.bounds_signal.emit(True)
elif status.command == ThreadStatusMove.SET_ALLOWED_VALUES:
if self.ui is not None:
self.ui.set_abs_spinbox_properties(**status.attribute)
elif status.command == ThreadStatusMove.STOP:
self.stop_motion()
elif status.command == ThreadStatusMove.UNITS:
self.units = status.attribute
def _check_data_type(
self, data_act: Union[list, np.ndarray, Number, DataActuator]
) -> DataActuator:
"""Make sure the data is a DataActuator
Mostly to make sure DAQ_Move is backcompatible with old style plugins
"""
if isinstance(data_act, list): # backcompatibility
if isinstance(data_act[0], Number):
data_act = DataActuator(
data=[np.atleast_1d(val) for val in data_act], units=self.units
)
elif isinstance(data_act[0], np.ndarray):
data_act = DataActuator(data=data_act, units=self.units)
elif isinstance(data_act[0], DataActuator):
data_act = data_act[0]
else:
raise TypeError("Unknown data type")
elif isinstance(data_act, np.ndarray): # backcompatibility
data_act = DataActuator(data=[data_act], units=self.units)
data_act.name = (
self.title
) # for the DataActuator name to be the title of the DAQ_Move
if (
not Unit(self.units).is_compatible_with(Unit(data_act.units))
and data_act.units == ""
): # this happens if the units have not been specified in
# the plugin
data_act.force_units(self.units)
return data_act
[docs]
def get_actuator_value(self):
"""Get the current actuator value via the "get_actuator_value" command send to the hardware
Returns nothing but the `move_done_signal` will be send once the action is done
"""
try:
self.command_hardware.emit(
ThreadCommand(ControlToHardwareMove.GET_ACTUATOR_VALUE)
)
except Exception as e:
self.logger.exception(str(e))
def grab(self):
if self.ui is not None:
self.manage_ui_actions("refresh_value", "setChecked", False)
self.get_continuous_actuator_value(False)
def stop_grab(self):
"""Stop value polling. Mandatory
First uncheck the ui action if ui is not None, then stop the polling
"""
if self.ui is not None:
self.manage_ui_actions("refresh_value", "setChecked", False)
self.get_continuous_actuator_value(False)
[docs]
def get_continuous_actuator_value(self, get_value=True):
"""Start the continuous getting of the actuator's value
Parameters
----------
get_value: bool
if True start the timer to periodically fetch the actuator's value, else stop it
Notes
-----
The current timer period is set by the refresh value *'refresh_timeout'* in the actuator main settings.
"""
if get_value:
self._refresh_timer.setInterval(
self.settings["main_settings", "refresh_timeout"]
)
self._refresh_timer.start()
else:
self._refresh_timer.stop()
@property
def actuator(self):
"""str: the selected actuator's type
Returns
-------
"""
return self._actuator_type
@actuator.setter
def actuator(self, act_type):
if act_type in ACTUATOR_TYPES:
self._actuator_type = act_type
self.update_plugin_config()
if self.ui is not None:
self.ui.actuator = act_type
self.update_settings()
else:
raise ActuatorError(
f"{act_type} is an invalid actuator, should be within {ACTUATOR_TYPES}"
)
@property
def actuators(self) -> List[str]:
"""Get the list of possible actuators"""
return ACTUATOR_TYPES
def update_plugin_config(self):
parent_module = utils.find_dict_in_list_from_key_val(
DAQ_Move_Actuators, "name", self.actuator
)
mod = import_module(parent_module["module"].__package__.split(".")[0])
if hasattr(mod, "config"):
self.plugin_config = mod.config
@property
def units(self):
"""Get/Set the units for the controller"""
return self.settings["move_settings", "units"]
@units.setter
def units(self, unit: str):
self.settings.child("move_settings", "units").setValue(unit)
if self.ui is not None and config("actuator", "display_units"):
unit = self.get_unit_to_display(unit)
self.ui.set_unit_as_suffix(unit)
self.ui.set_unit_prefix(
config("actuator", "siprefix")
and (unit != "" or config("actuator", "siprefix_even_without_units"))
)
@property
def axis_names(self) -> Union[List, Dict]:
""" Get the names of all possible axis"""
return self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
@property
def axis_name(self) -> str:
""" Get/Set the current axis"""
limits = self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
if isinstance(limits, list):
return self.settings['move_settings', 'multiaxes', 'axis']
elif isinstance(limits, dict):
return find_keys_from_val(limits,
val=self.settings['move_settings', 'multiaxes', 'axis'])[0]
@axis_name.setter
def axis_name(self, name: str):
""" Get/Set the current axis"""
limits = self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
if name in limits:
if isinstance(limits, list):
self.settings.child('move_settings', 'multiaxes', 'axis').setValue(name)
elif isinstance(limits, dict):
self.settings.child('move_settings', 'multiaxes', 'axis').setValue(limits[name])
@property
def axis_names(self) -> Union[List, Dict]:
""" Get the names of all possible axis"""
return self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
@property
def axis_name(self) -> str:
""" Get/Set the current axis"""
limits = self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
if isinstance(limits, list):
return self.settings['move_settings', 'multiaxes', 'axis']
elif isinstance(limits, dict):
return find_keys_from_val(limits,
val=self.settings['move_settings', 'multiaxes', 'axis'])[0]
@axis_name.setter
def axis_name(self, name: str):
""" Get/Set the current axis"""
limits = self.settings.child('move_settings', 'multiaxes', 'axis').opts['limits']
if name in limits:
if isinstance(limits, list):
self.settings.child('move_settings', 'multiaxes', 'axis').setValue(name)
elif isinstance(limits, dict):
self.settings.child('move_settings', 'multiaxes', 'axis').setValue(limits[name])
@staticmethod
def get_unit_to_display(unit: str) -> str:
"""Get the unit to be displayed in the UI
If the controller units are in mm the displayed unit will be m
because m is the base unit, then the user could ask for mm, km, µm...
only issue is when the usual displayed unit is not the base one, then add cases below
Parameters
----------
unit: str
Returns
-------
str: the unit to be displayed on the ui
"""
if ("°" in unit or "degree" in unit) and not "°C" in unit:
# special cas as pint base unit for angles are radians
return "°"
elif "°C" in unit:
return "°C"
else:
for key in config("actuator", "allowed_units"):
if key in unit:
return config("actuator", "allowed_units", key)
return str(Q_(1, unit).to_base_units().units)
def update_settings(self):
self.settings.child("main_settings", "move_type").setValue(self._actuator_type)
self.settings.child("main_settings", "module_name").setValue(self._title)
try:
for child in self.settings.child("move_settings").children():
child.remove()
parent_module = utils.find_dict_in_list_from_key_val(
DAQ_Move_Actuators, "name", self._actuator_type
)
class_ = getattr(
getattr(parent_module["module"], "daq_move_" + self._actuator_type),
"DAQ_Move_" + self._actuator_type,
)
params = getattr(class_, "params")
move_params = Parameter.create(
name="move_settings", type="group", children=params
)
self.settings.child("move_settings").addChildren(move_params.children())
except Exception as e:
self.logger.exception(str(e))
def connect_tcp_ip(self):
super().connect_tcp_ip(
params_state=self.settings.child("move_settings"), client_type="ACTUATOR"
)
def connect_leco(self, connect: bool) -> None:
super().connect_leco(connect)
@Slot(ThreadCommand)
def process_tcpip_cmds(self, status: ThreadCommand) -> None:
if super().process_tcpip_cmds(status=status) is None:
return
if LECOMoveCommands.MOVE_ABS == status.command:
self.move_abs(status.attribute, send_to_tcpip=True)
elif LECOMoveCommands.MOVE_REL == status.command:
self.move_rel(status.attribute, send_to_tcpip=True)
elif LECOMoveCommands.MOVE_HOME == status.command:
self.move_home(send_to_tcpip=True)
elif "check_position" in status.command:
deprecation_msg(
"check_position is deprecated, you should use get_actuator_value"
)
self._send_to_tcpip = True
self.get_actuator_value()
elif LECOMoveCommands.GET_ACTUATOR_VALUE in status.command:
self._send_to_tcpip = True
self.get_actuator_value()
elif status.command == LECOMoveCommands.STOP:
self.stop_motion()
class DAQ_Move_Hardware(QObject):
"""
================== ========================
**Attributes** **Type**
*status_sig* instance of Signal
*hardware* ???
*actuator_type* string
*current_position* float
*target_value* float
*hardware_adress* string
*axis_address* string
*motion_stoped* boolean
================== ========================
"""
status_sig = Signal(ThreadCommand)
def __init__(self, actuator_type, position: DataActuator, title="actuator"):
super().__init__()
self.logger = set_logger(f"{logger.name}.{title}.actuator")
self._title = title
self.hardware: Optional[DAQ_Move_base] = None
self.actuator_type = actuator_type
self.hardware_adress = None
self.axis_address = None
self.motion_stoped = False
@property
def title(self):
return self._title
def close(self):
"""
Uninitialize the stage closing the hardware.
"""
if self.hardware is not None and self.hardware.controller is not None:
self.hardware.close()
return "Stage uninitialized"
def get_actuator_value(self):
"""Get the current position checking the hardware value."""
if self.hardware is not None:
pos = self.hardware.get_actuator_value()
if self.hardware.data_actuator_type == DataActuatorType.float:
pos = DataActuator(self._title, data=pos, units=self.hardware.axis_unit)
return pos
def check_position(self):
"""Get the current position checking the hardware position (deprecated)"""
deprecation_msg("check_position is deprecated, use get_actuator_value")
pos = self.hardware.get_actuator_value()
return pos
def ini_stage(self, params_state=None, controller: Optional[HardwareController] = None) -> edict:
"""
Init a stage updating the hardware and sending an hardware move_done signal.
=============== =================================== ==========================================================================================================================
**Parameters** **Type** **Description**
*params_state* ordered dictionary list The parameter state of the hardware class composed by a list representing the tree to keep a temporary save of the tree
*controller* one or many instance of DAQ_Move The controller id of the hardware
*stage* instance of DAQ_Move Defining axes and motors
=============== =================================== ==========================================================================================================================
See Also
--------
DAQ_utils.ThreadCommand, DAQ_Move
"""
status = edict(initialized=False, info="")
try:
parent_module = utils.find_dict_in_list_from_key_val(
DAQ_Move_Actuators, "name", self.actuator_type
)
class_ = getattr(
getattr(parent_module["module"], "daq_move_" + self.actuator_type),
"DAQ_Move_" + self.actuator_type,
)
self.hardware = class_(self, params_state)
assert self.hardware is not None
try:
infos = self.hardware.ini_stage(
controller
) # return edict(info="", controller=, stage=)
except Exception as e:
logger.exception("Hardware couldn't be initialized", exc_info=e)
infos = str(e), False
if isinstance(infos, edict): # following old plugin templating
status.update(infos)
deprecation_msg(
"Returns from init_stage should now be a string and a boolean,"
" see pymodaq_plugins_template",
stacklevel=3,
)
else:
status.info = infos[0]
status.initialized = infos[1]
status.controller = self.hardware.controller
self.hardware.move_done_signal.connect(self.move_done)
if status.initialized:
self.status_sig.emit(
ThreadCommand(
ThreadStatusMove.GET_ACTUATOR_VALUE, self.get_actuator_value()
)
)
return status
except Exception as e:
self.logger.exception(str(e))
return status
def move_abs(self, position: DataActuator, polling: bool = True) -> None:
"""
"""
assert self.hardware is not None
position = check_units(position, self.hardware.axis_unit)
self.hardware.move_is_done = False
self.hardware.ispolling = polling
if self.hardware.data_actuator_type == self.hardware.data_actuator_type.float:
self.hardware.move_abs(
position.units_as(self.hardware.axis_unit).value()
) # convert to plugin controller current axis units
else:
position.units = (
self.hardware.axis_unit
) # convert to plugin controller current axis units
self.hardware.move_abs(position)
self.hardware.poll_moving()
def move_rel(self, rel_position: DataActuator, polling: bool = True) -> None:
"""
"""
assert self.hardware is not None
rel_position = check_units(rel_position, self.hardware.axis_unit)
self.hardware.move_is_done = False
self.hardware.ispolling = polling
if self.hardware.data_actuator_type.name == 'float':
self.hardware.move_rel(rel_position.units_as(self.hardware.axis_unit).value())
else:
rel_position.units = (
self.hardware.axis_unit
) # convert to plugin current axis units
self.hardware.move_rel(rel_position)
self.hardware.poll_moving()
@Slot(float)
def Move_Stoped(self, pos):
"""
Send a "move_done" Thread Command with the given position as an attribute.
See Also
--------
DAQ_utils.ThreadCommand
"""
self.status_sig.emit(ThreadCommand(ThreadStatusMove.MOVE_DONE, pos))
def move_home(self):
"""
Make the hardware move to the init position.
"""
assert self.hardware is not None
self.hardware.move_is_done = False
self.hardware.move_home()
@Slot(DataActuator)
def move_done(self, pos: DataActuator):
"""Send the move_done signal back to the main class"""
self._current_value = pos
self.status_sig.emit(
ThreadCommand(command=ThreadStatusMove.MOVE_DONE, attribute=pos)
)
@Slot(ThreadCommand)
def queue_command(self, command: ThreadCommand):
"""Interpret command send by DAQ_Move class
* **ini_stage** command, init a stage from command attribute.
* **close** command, unitinalise the stage closing hardware and emitting the corresponding status signal
* **move_abs** command, call the move_Abs method with position from command attribute
* **move_rel** command, call the move_Rel method with the relative position from the command attribute.
* **move_home** command, call the move_home method
* **get_actuator_value** command, get the current position from the check_position method
* **Stop_motion** command, stop any motion via the stop_Motion method
* **reset_stop_motion** command, set the motion_stopped attribute to false
Parameters
----------
command: ThreadCommand
Possible commands are:
* **ini_stage** command, init a stage from command attribute.
* **close** command, unitinalise the stage closing hardware and emitting the corresponding status signal
* **move_abs** command, call the move_abs method with position from command attribute
* **move_rel** command, call the move_rel method with the relative position from the command attribute.
* **move_home** command, call the move_home method
* **get_actuator_value** command, get the current position from the check_position method
* **stop_motion** command, stop any motion via the stop_Motion method
* **reset_stop_motion** command, set the motion_stopped attribute to false
"""
try:
logger.debug(f"Threadcommand {command.command} sent to {self.title}")
if command.command == ControlToHardwareMove.INI_STAGE:
status: edict = self.ini_stage(*command.attribute)
self.status_sig.emit(
ThreadCommand(command=ThreadStatusMove.INI_STAGE, attribute=status)
)
elif command.command == ControlToHardwareMove.CLOSE:
status = self.close()
self.status_sig.emit(
ThreadCommand(command=ThreadStatus.CLOSE, attribute=[status])
)
elif command.command == ControlToHardwareMove.MOVE_ABS:
self.move_abs(*command.attribute)
elif command.command == ControlToHardwareMove.MOVE_REL:
self.move_rel(*command.attribute)
elif command.command == ControlToHardwareMove.MOVE_HOME:
self.move_home()
elif command.command == ControlToHardwareMove.GET_ACTUATOR_VALUE:
pos = self.get_actuator_value()
self.status_sig.emit(
ThreadCommand(ThreadStatusMove.GET_ACTUATOR_VALUE, pos)
)
elif command.command == ControlToHardwareMove.STOP_MOTION:
self.stop_motion()
elif command.command == ControlToHardwareMove.RESET_STOP_MOTION:
self.motion_stoped = False
else: # custom commands for particular plugins (see spectrometer module 'get_spectro_wl' for instance)
if hasattr(self.hardware, command.command):
cmd = getattr(self.hardware, command.command)
if isinstance(command.attribute, list):
cmd(*command.attribute)
elif isinstance(command.attribute, dict):
cmd(**command.attribute)
except Exception as e:
self.logger.exception(str(e))
def stop_motion(self):
"""
stop hardware motion with motion_stopped attribute updtaed to True and a status signal sended with an "update_status" Thread Command
See Also
--------
DAQ_utils.ThreadCommand, stop_motion
"""
self.status_sig.emit(
ThreadCommand(command="Update_Status", attribute=["Motion stoping", "log"])
)
self.motion_stoped = True
assert self.hardware is not None
if self.hardware is not None and self.hardware.controller is not None:
self.hardware.stop_motion()
self.hardware.poll_timer.stop()
@Slot(edict)
def update_settings(self, settings_parameter_dict):
"""
Update settings of hardware with dictionary parameters in case of "Move_Settings" path, else update attribute with dictionnary parameters.
========================= =========== ======================================================
**Parameters** **Type** **Description**
*settings_parameter_dict* dictionary Dictionary containing the path and linked parameter
========================= =========== ======================================================
See Also
--------
update_settings
"""
# settings_parameter_dict = edict(path=path,param=param)
path = settings_parameter_dict["path"]
param = settings_parameter_dict["param"]
if path[0] == "main_settings":
if hasattr(self, path[-1]):
setattr(self, path[-1], param.value())
elif path[0] == "move_settings":
if self.hardware is not None:
self.hardware.update_settings(settings_parameter_dict)
def main(init_qt=True):
if init_qt: # used for the test suite
app = mkQApp("PyMoDAQ Move")
widget = QtWidgets.QWidget()
prog = DAQ_Move(widget, title="test")
widget.show()
if init_qt:
sys.exit(app.exec_())
return prog, widget
if __name__ == "__main__":
main()