Source code for pymodaq.control_modules.move_utility_classes

from time import perf_counter
from typing import Union, List, Dict, TYPE_CHECKING
from numbers import Number

from easydict import EasyDict as edict
import numpy as np
from qtpy import QtWidgets
from qtpy.QtCore import QObject, Slot, Signal, QTimer

import pymodaq.utils.daq_utils as utils
import pymodaq.utils.parameter.utils as putils
from pymodaq.utils.parameter import Parameter
from pymodaq.utils.logger import set_logger, get_module_name
from pymodaq.utils.parameter import ioxml

from pymodaq.utils.daq_utils import ThreadCommand, getLineInfo, find_keys_from_val
from pymodaq.utils import config as configmod
from pymodaq.utils.tcp_ip.tcp_server_client import TCPServer, tcp_parameters
from pymodaq.utils.messenger import deprecation_msg
from pymodaq.utils.data import DataActuator
from pymodaq.utils.enums import BaseEnum, enum_checker
from pymodaq.utils.tcp_ip.mysocket import Socket
from pymodaq.utils.tcp_ip.serializer import DeSerializer, Serializer

if TYPE_CHECKING:
    from pymodaq.control_modules.daq_move import DAQ_Move_Hardware

logger = set_logger(get_module_name(__file__))
config = configmod.Config()


class DataActuatorType(BaseEnum):
    """Enum for new or old style holding the value of the actuator"""
    float = 0
    DataActuator = 1


def comon_parameters(epsilon=config('actuator', 'epsilon_default')):
    return [{'title': 'Units:', 'name': 'units', 'type': 'str', 'value': '', 'readonly': True},
            {'title': 'Epsilon:', 'name': 'epsilon', 'type': 'float',
             'value': epsilon,
             'tip': 'Differential Value at which the controller considers it reached the target position'},
            {'title': 'Timeout (s):', 'name': 'timeout', 'type': 'int',
             'value': config('actuator', 'polling_timeout_s')},
            {'title': 'Bounds:', 'name': 'bounds', 'type': 'group', 'children': [
                {'title': 'Set Bounds:', 'name': 'is_bounds', 'type': 'bool', 'value': False},
                {'title': 'Min:', 'name': 'min_bound', 'type': 'float', 'value': 0, 'default': 0},
                {'title': 'Max:', 'name': 'max_bound', 'type': 'float', 'value': 1, 'default': 1}, ]},
            {'title': 'Scaling:', 'name': 'scaling', 'type': 'group', 'children': [
                {'title': 'Use scaling:', 'name': 'use_scaling', 'type': 'bool', 'value': False,
                 'default': False},
                {'title': 'Scaling factor:', 'name': 'scaling', 'type': 'float', 'value': 1., 'default': 1.},
                {'title': 'Offset factor:', 'name': 'offset', 'type': 'float', 'value': 0., 'default': 0.}]}]


MOVE_COMMANDS = ['abs', 'rel', 'home']


class MoveCommand:
    """Utility class to contain a given move type and value

    Attributes
    ----------
    move_type: str
        either:

        * 'abs': performs an absolute action
        * 'rel': performs a relative action
        * 'home': find the actuator's home
    value: float
        the value the move should reach

    """
    def __init__(self, move_type, value=0):
        if move_type not in MOVE_COMMANDS:
            raise ValueError(f'The allowed move types fro an actuator are {MOVE_COMMANDS}')
        self.move_type = move_type
        self.value = value


def comon_parameters_fun(is_multiaxes=False, axes_names=[], axis_names=[], master=True, epsilon=config('actuator', 'epsilon_default')):
    """Function returning the common and mandatory parameters that should be on the actuator plugin level

    Parameters
    ----------
    is_multiaxes: bool
        If True, display the particular settings to define which axis the controller is driving
    axis_names: list of str
        The string identifier of every axis the controller can drive
    master: bool
        If True consider this plugin has to init the controller, otherwise use an already initialized instance
    """
    if axis_names == [] and len(axes_names) != 0:
        axis_names = axes_names

    params = [
                 {'title': 'MultiAxes:', 'name': 'multiaxes', 'type': 'group', 'visible': is_multiaxes, 'children': [
                     {'title': 'is Multiaxes:', 'name': 'ismultiaxes', 'type': 'bool', 'value': is_multiaxes,
                      'default': False},
                     {'title': 'Status:', 'name': 'multi_status', 'type': 'list',
                      'value': 'Master' if master else 'Slave', 'limits': ['Master', 'Slave']},
                     {'title': 'Axis:', 'name': 'axis', 'type': 'list', 'limits': axis_names},
                 ]},
             ] + comon_parameters(epsilon)
    return params


params = [
    {'title': 'Main Settings:', 'name': 'main_settings', 'type': 'group', 'children': [
        {'title': 'Actuator type:', 'name': 'move_type', 'type': 'str', 'value': '', 'readonly': True},
        {'title': 'Actuator name:', 'name': 'module_name', 'type': 'str', 'value': '', 'readonly': True},
        {'title': 'Plugin Config:', 'name': 'plugin_config', 'type': 'bool_push', 'label': 'Show Config', },
        {'title': 'Controller ID:', 'name': 'controller_ID', 'type': 'int', 'value': 0, 'default': 0},
        {'title': 'Refresh value (ms):', 'name': 'refresh_timeout', 'type': 'int',
            'value': config('actuator', 'refresh_timeout_ms')},
        {'title': 'TCP/IP options:', 'name': 'tcpip', 'type': 'group', 'visible': True, 'expanded': False,
         'children': [
             {'title': 'Connect to server:', 'name': 'connect_server', 'type': 'bool_push', 'label': 'Connect',
              'value': False},
             {'title': 'Connected?:', 'name': 'tcp_connected', 'type': 'led', 'value': False},
             {'title': 'IP address:', 'name': 'ip_address', 'type': 'str',
              'value': config('network', 'tcp-server', 'ip')},
             {'title': 'Port:', 'name': 'port', 'type': 'int', 'value': config('network', 'tcp-server', 'port')},
         ]},
    ]},
    {'title': 'Actuator Settings:', 'name': 'move_settings', 'type': 'group'}
]


def main(plugin_file, init=True, title='test'):
    """
    this method start a DAQ_Move object with this defined plugin as actuator
    Returns
    -------

    """
    import sys
    from qtpy import QtWidgets
    from pymodaq.control_modules.daq_move import DAQ_Move
    from pathlib import Path
    app = QtWidgets.QApplication(sys.argv)
    if config('style', 'darkstyle'):
        import qdarkstyle
        app.setStyleSheet(qdarkstyle.load_stylesheet())

    widget = QtWidgets.QWidget()
    prog = DAQ_Move(widget, title=title,)
    widget.show()
    prog.actuator = Path(plugin_file).stem[9:]
    if init:
        prog.init_hardware_ui()

    sys.exit(app.exec_())


[docs]class DAQ_Move_base(QObject): """ The base class to be inherited by all actuator modules This base class implements all necessary parameters and methods for the plugin to communicate with its parent (the DAQ_Move module) Parameters ---------- parent : DAQ_Move_Hardware params_state : Parameter pyqtgraph Parameter instance from which the module will get the initial settings (as defined in the preset) Attributes ---------- move_done_signal: Signal signal represented by a float. Is emitted each time the hardware reached the target position within the epsilon precision (see comon_parameters variable) controller: object the object representing the hardware in the plugin. Used to access hardware functionality settings: Parameter instance representing the hardware settings defined from the params attribute. Modifications on the GUI settings will be transferred to this attribute. It stores at all times the current state of the hardware/plugin settings params: List of dict used to create a Parameter object. Its definition on the class level enable the automatic update of the GUI settings when changing plugins (even in managers mode creation). To be populated on the plugin level as the base class does't represents a real hardware is_multiaxes: bool class level attribute. Defines if the plugin controller controls multiple axes. If True, one has to define a Master instance of this plugin and slave instances of this plugin (all sharing the same controller_ID parameter) current_value: DataActuator stores the current position after each call to the get_actuator_value in the plugin target_value: DataActuator stores the target position the controller should reach within epsilon """ move_done_signal = Signal(DataActuator) is_multiaxes = False stage_names = [] params = [] _controller_units = '' _epsilon = 1 data_actuator_type = DataActuatorType['float'] data_shape = (1, ) # expected shape of the underlying actuator's value (in general a float so shape = (1, )) def __init__(self, parent: 'DAQ_Move_Hardware' = None, params_state: dict = None): QObject.__init__(self) # to make sure this is the parent class self.move_is_done = False self.parent = parent self.stage = None self.status = edict(info="", controller=None, stage=None, initialized=False) self._ispolling = True self.parent_parameters_path = [] # this is to be added in the send_param_status to take into account when the # current class instance parameter list is a child of some other class self.settings = Parameter.create(name='Settings', type='group', children=self.params) if params_state is not None: if isinstance(params_state, dict): self.settings.restoreState(params_state) elif isinstance(params_state, Parameter): self.settings.restoreState(params_state.saveState()) self.settings.sigTreeStateChanged.connect(self.send_param_status) # self.settings.child('multiaxes', # 'axis').sigLimitsChanged.connect(lambda param, # limits: self.send_param_status( # param, [(param, 'limits', None)])) if parent is not None: self._title = parent.title else: self._title = "myactuator" self._current_value = DataActuator(self._title, data=[np.zeros(self.data_shape, dtype=float)]) self._target_value = DataActuator(self._title, data=[np.zeros(self.data_shape, dtype=float)]) self.controller_units = self._controller_units self.poll_timer = QTimer() self.poll_timer.setInterval(config('actuator', 'polling_interval_ms')) self._poll_timeout = config('actuator', 'polling_timeout_s') self.poll_timer.timeout.connect(self.check_target_reached) self.ini_attributes() @property def axis_name(self) -> Union[str, object]: """Get/Set the current axis using its string identifier""" limits = self.settings.child('multiaxes', 'axis').opts['limits'] if isinstance(limits, list): return self.settings['multiaxes', 'axis'] elif isinstance(limits, dict): return find_keys_from_val(limits, val=self.settings['multiaxes', 'axis'])[0] @axis_name.setter def axis_name(self, name: str): limits = self.settings.child('multiaxes', 'axis').opts['limits'] if name in limits: if isinstance(limits, list): self.settings.child('multiaxes', 'axis').setValue(name) elif isinstance(limits, dict): self.settings.child('multiaxes', 'axis').setValue(limits[name]) QtWidgets.QApplication.processEvents() @property def axis_names(self) -> Union[List, Dict]: """ Get/Set the names of all axes controlled by this instrument plugin Returns ------- List of string or dictionary mapping names to integers """ return self.settings.child('multiaxes', 'axis').opts['limits'] @axis_names.setter def axis_names(self, names: Union[List, Dict]): self.settings.child('multiaxes', 'axis').setLimits(names) QtWidgets.QApplication.processEvents() @property def axis_value(self) -> object: """Get the current value selected from the current axis""" return self.settings['multiaxes', 'axis']
[docs] def ini_attributes(self): """ To be subclassed, in order to init specific attributes needed by the real implementation""" self.controller = None
[docs] def ini_stage_init(self, old_controller=None, new_controller=None): """Manage the Master/Slave controller issue First initialize the status dictionnary Then check whether this stage is controlled by a multiaxe controller (to be defined for each plugin) if it is a multiaxes controller then: * if it is Master: init the controller here * if it is Slave: use an already initialized controller (defined in the preset of the dashboard) Parameters ---------- old_controller: object The particular object that allow the communication with the hardware, in general a python wrapper around the hardware library. In case of Slave this one comes from a previously initialized plugin new_controller: object The particular object that allow the communication with the hardware, in general a python wrapper around the hardware library. In case of Master it is the new instance of your plugin controller """ self.status.update(edict(info="", controller=None, initialized=False)) if self.settings['multiaxes', 'ismultiaxes'] and self.settings['multiaxes', 'multi_status'] == "Slave": if old_controller is None: raise Exception('no controller has been defined externally while this axe is a slave one') else: controller = old_controller else: # Master stage controller = new_controller self.controller = controller return controller
@property def current_value(self): if self.data_actuator_type.name == 'float': return self._current_value.value() else: return self._current_value @current_value.setter def current_value(self, value: Union[float, DataActuator]): if not isinstance(value, DataActuator): self._current_value = DataActuator(self._title, data=value) else: self._current_value = value @property def target_value(self): if self.data_actuator_type.name == 'float': return self._target_value.value() else: return self._target_value @target_value.setter def target_value(self, value: Union[float, DataActuator]): if not isinstance(value, DataActuator): self._target_value = DataActuator(self._title, data=value) else: self._target_value = value @property def current_position(self): deprecation_msg('current_position attribute should not be used, use current_value') return self.current_value @current_position.setter def current_position(self, value): self.current_value = value @property def target_position(self): deprecation_msg('target_position attribute should not be used, use target_value') return self.target_value @target_position.setter def target_position(self, value): self.target_value = value @property def controller_units(self): """ Get/Set the units of this plugin""" return self._controller_units @controller_units.setter def controller_units(self, units: str = ''): self._controller_units = units try: self.settings.child('units').setValue(units) except Exception: pass @property def ispolling(self): """ Get/Set the polling status""" return self._ispolling @ispolling.setter def ispolling(self, polling=True): self._ispolling = polling
[docs] def check_bound(self, position: DataActuator) -> DataActuator: """ Check if the current position is within the software bounds Return the new position eventually coerced within the bounds """ if self.settings.child('bounds', 'is_bounds').value(): if position > self.settings.child('bounds', 'max_bound').value(): position = DataActuator(self._title, data=self.settings.child('bounds', 'max_bound').value()) self.emit_status(ThreadCommand('outofbounds', [])) elif position < self.settings.child('bounds', 'min_bound').value(): position = DataActuator(self._title, data=self.settings.child('bounds', 'min_bound').value()) self.emit_status(ThreadCommand('outofbounds', [])) return position
def get_actuator_value(self): if hasattr(self, 'check_position'): deprecation_msg('check_position method in plugins is deprecated, use get_actuator_value',3) return self.check_position() else: raise NotImplementedError def move_abs(self, value: Union[float, DataActuator]): if hasattr(self, 'move_Abs'): deprecation_msg('move_Abs method in plugins is deprecated, use move_abs', 3) self.move_Abs(value) else: raise NotImplementedError def move_rel(self, value: Union[float, DataActuator]): if hasattr(self, 'move_Rel'): deprecation_msg('move_Rel method in plugins is deprecated, use move_rel', 3) self.move_Rel(value) else: raise NotImplementedError def move_home(self, value: Union[float, DataActuator]): if hasattr(self, 'move_Home'): deprecation_msg('move_Home method in plugins is deprecated, use move_home', 3) self.move_Home() else: raise NotImplementedError
[docs] def emit_status(self, status: ThreadCommand): """ Emit the status_sig signal with the given status ThreadCommand back to the main GUI. """ if self.parent is not None: self.parent.status_sig.emit(status) QtWidgets.QApplication.processEvents() else: print(status)
[docs] def emit_value(self, pos: DataActuator): """Convenience method to emit the current actuator value back to the UI""" self.emit_status(ThreadCommand('get_actuator_value', [pos]))
[docs] def commit_settings(self, param: Parameter): """ to subclass to transfer parameters to hardware """
def commit_common_settings(self, param): pass
[docs] def move_done(self, position: DataActuator = None): # the position argument is just there to match some signature of child classes """ | Emit a move done signal transmitting the float position to hardware. | The position argument is just there to match some signature of child classes. =============== ========== ============================================================================= **Arguments** **Type** **Description** *position* float The position argument is just there to match some signature of child classes =============== ========== ============================================================================= """ if position is None: if self.data_actuator_type.name == 'float': position = DataActuator(self._title, data=self.get_actuator_value()) else: position = self.get_actuator_value() if position.name != self._title: # make sure the emitted DataActuator has the name of the real implementation #of the plugin position = DataActuator(self._title, data=position.value()) self.move_done_signal.emit(position) self.move_is_done = True
[docs] def poll_moving(self): """ Poll the current moving. In case of timeout emit the raise timeout Thread command. See Also -------- DAQ_utils.ThreadCommand, move_done """ if 'TCPServer' not in self.__class__.__name__: self.start_time = perf_counter() if self.ispolling: self.poll_timer.start() else: if self.data_actuator_type.name == 'float': self._current_value = DataActuator(data=self.get_actuator_value()) else: self._current_value = self.get_actuator_value() logger.debug(f'Current position: {self._current_value}') self.move_done(self._current_value)
def check_target_reached(self): # if not isinstance(self._current_value, DataActuator): # self._current_value = DataActuator(data=self._current_value) # if not isinstance(self._target_value, DataActuator): # self._target_value = DataActuator(data=self._target_value) logger.debug(f"epsilon value is {self.settings['epsilon']}") logger.debug(f"current_value value is {self._current_value}") logger.debug(f"target_value value is {self._target_value}") if not (self._current_value - self._target_value).abs() < self.settings['epsilon']: logger.debug(f'Check move_is_done: {self.move_is_done}') if self.move_is_done: self.emit_status(ThreadCommand('Move has been stopped', )) logger.info(f'Move has been stopped') self.current_value = self.get_actuator_value() self.emit_value(self._current_value) logger.debug(f'Current value: {self._current_value}') if perf_counter() - self.start_time >= self.settings['timeout']: self.poll_timer.stop() self.emit_status(ThreadCommand('raise_timeout', )) logger.info(f'Timeout activated') else: self.poll_timer.stop() logger.debug(f'Current value: {self._current_value}') self.move_done(self._current_value)
[docs] def send_param_status(self, param, changes): """ Send changes value updates to the gui to update consequently the User Interface The message passing is made via the ThreadCommand "update_settings". """ for param, change, data in changes: path = self.settings.childPath(param) if change == 'childAdded': self.emit_status(ThreadCommand('update_settings', [self.parent_parameters_path + path, [data[0].saveState(), data[1]], change])) # send parameters values/limits back to the GUI. Send kind of a copy back the GUI otherwise the child reference will be the same in both th eUI and the plugin so one of them will be removed elif change == 'value' or change == 'limits' or change == 'options': self.emit_status(ThreadCommand('update_settings', [self.parent_parameters_path + path, data, change])) # send parameters values/limits back to the GUI elif change == 'parent': pass elif change == 'limits': self.emit_status(ThreadCommand('update_settings', [self.parent_parameters_path + path, data, change]))
[docs] def get_position_with_scaling(self, pos: DataActuator) -> DataActuator: """ Get the current position from the hardware with scaling conversion. """ if self.settings['scaling', 'use_scaling']: pos = (pos - self.settings['scaling', 'offset']) * self.settings['scaling', 'scaling'] return pos
[docs] def set_position_with_scaling(self, pos: DataActuator) -> DataActuator: """ Set the current position from the parameter and hardware with scaling conversion. """ if self.settings['scaling', 'use_scaling']: pos = pos / self.settings['scaling', 'scaling'] + self.settings['scaling', 'offset'] return pos
[docs] def set_position_relative_with_scaling(self, pos: DataActuator) -> DataActuator: """ Set the scaled positions in case of relative moves """ if self.settings['scaling', 'use_scaling']: pos = pos / self.settings['scaling', 'scaling'] return pos
[docs] @Slot(edict) def update_settings(self, settings_parameter_dict): # settings_parameter_dict=edict(path=path,param=param) """ Receive the settings_parameter signal from the param_tree_changed method and make hardware updates of modified values. """ path = settings_parameter_dict['path'] param = settings_parameter_dict['param'] change = settings_parameter_dict['change'] apply_settings = True try: self.settings.sigTreeStateChanged.disconnect(self.send_param_status) except Exception: pass if change == 'value': self.settings.child(*path[1:]).setValue(param.value()) # blocks signal back to main UI elif change == 'childAdded': try: child = Parameter.create(name='tmp') child.restoreState(param) param = child self.settings.child(*path[1:]).addChild(child) # blocks signal back to main UI except ValueError: apply_settings = False elif change == 'parent': children = putils.get_param_from_name(self.settings, param.name()) if children is not None: path = putils.get_param_path(children) self.settings.child(*path[1:-1]).removeChild(children) self.settings.sigTreeStateChanged.connect(self.send_param_status) if apply_settings: self.commit_common_settings(param) self.commit_settings(param)
class DAQ_Move_TCP_server(DAQ_Move_base, TCPServer): """ ================= ============================== **Attributes** **Type** *command_server* instance of Signal *x_axis* 1D numpy array *y_axis* 1D numpy array *data* double precision float array ================= ============================== See Also -------- utility_classes.DAQ_TCP_server """ params_client = [] # parameters of a client grabber command_server = Signal(list) data_actuator_type = DataActuatorType['DataActuator'] message_list = ["Quit", "Status", "Done", "Server Closed", "Info", "Infos", "Info_xml", "move_abs", 'move_home', 'move_rel', 'get_actuator_value', 'stop_motion', 'position_is', 'move_done'] socket_types = ["ACTUATOR"] params = comon_parameters() + tcp_parameters def __init__(self, parent=None, params_state=None): """ Parameters ---------- parent params_state """ self.client_type = "ACTUATOR" DAQ_Move_base.__init__(self, parent, params_state) # initialize base class with commom attribute and methods self.settings.child('bounds').hide() self.settings.child('scaling').hide() self.settings.child('epsilon').setValue(1) TCPServer.__init__(self, self.client_type) def command_to_from_client(self, command): sock: Socket = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client 'ACTUATOR' is connected then send it the command if command == 'position_is': pos = DeSerializer(sock).dwa_deserialization() pos = self.get_position_with_scaling(pos) self._current_value = pos self.emit_status(ThreadCommand('get_actuator_value', [pos])) elif command == 'move_done': pos = DeSerializer(sock).dwa_deserialization() pos = self.get_position_with_scaling(pos) self._current_value = pos self.emit_status(ThreadCommand('move_done', [pos])) else: self.send_command(sock, command) def commit_settings(self, param): if param.name() in putils.iter_children(self.settings.child('settings_client'), []): actuator_socket: Socket = [client['socket'] for client in self.connected_clients if client['type'] == 'ACTUATOR'][0] actuator_socket.check_sended_with_serializer('set_info') path = putils.get_param_path(param)[2:] # get the path of this param as a list starting at parent 'infos' actuator_socket.check_sended_with_serializer(path) # send value data = ioxml.parameter_to_xml_string(param) actuator_socket.check_sended_with_serializer(data) def ini_stage(self, controller=None): """ | Initialisation procedure of the detector updating the status dictionnary. | | Init axes from image , here returns only None values (to tricky to di it with the server and not really necessary for images anyway) See Also -------- utility_classes.DAQ_TCP_server.init_server, get_xaxis, get_yaxis """ self.settings.child('infos').addChildren(self.params_client) self.init_server() self.controller = self.serversocket self.settings.child('units').hide() self.settings.child('epsilon').hide() info = 'TCP Server actuator' initialized = True return info, initialized def close(self): """ Should be used to uninitialize hardware. See Also -------- utility_classes.DAQ_TCP_server.close_server """ self.listening = False self.close_server() def move_abs(self, position: DataActuator): """ """ position = self.check_bound(position) self.target_value = position position = self.set_position_with_scaling(position) sock = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client self.client_type is connected then send it the command sock.check_sended_with_serializer('move_abs') sock.check_sended_with_serializer(position) def move_rel(self, position: DataActuator): position = self.check_bound(self.current_value + position) - self.current_value self.target_value = position + self.current_value position = self.set_position_relative_with_scaling(position) sock = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client self.client_type is connected then send it the command sock.check_sended_with_serializer('move_rel') sock.check_sended_with_serializer(position) def move_home(self): """ Make the absolute move to original position (0). See Also -------- move_Abs """ sock = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client self.client_type is connected then send it the command sock.check_sended_with_serializer('move_home') def get_actuator_value(self): """ Get the current hardware position with scaling conversion given by get_position_with_scaling. See Also -------- daq_move_base.get_position_with_scaling, daq_utils.ThreadCommand """ sock = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client self.client_type is connected then send it the command self.send_command(sock, 'get_actuator_value') return self._current_value def stop_motion(self): """ See Also -------- daq_move_base.move_done """ sock = self.find_socket_within_connected_clients(self.client_type) if sock is not None: # if client self.client_type is connected then send it the command self.send_command(sock, 'stop_motion') def stop(self): """ not implemented. """ pass return "" if __name__ == '__main__': test = DAQ_Move_base()