Source code for pymodaq.control_modules.daq_move

# -*- coding: utf-8 -*-
"""
Created the 29/07/2022

@author: Sebastien Weber
"""

from __future__ import annotations

import numbers
from importlib import import_module
from numbers import Number
import sys
from typing import List, Tuple, Union
import numpy as np

from qtpy.QtCore import QObject, Signal, QThread, Slot, Qt, QTimer
from qtpy import QtWidgets

from easydict import EasyDict as edict

from pymodaq.utils.logger import set_logger, get_module_name
from pymodaq.control_modules.utils import ControlModule
from pymodaq.utils.parameter import ioxml
from pymodaq.control_modules.daq_move_ui import DAQ_Move_UI, ThreadCommand
from pymodaq.utils.managers.parameter_manager import ParameterManager, Parameter
from pymodaq.control_modules.move_utility_classes import MoveCommand, DAQ_Move_base
from pymodaq.utils.tcp_ip.tcp_server_client import TCPClient
from pymodaq.control_modules.move_utility_classes import params as daq_move_params
from pymodaq.utils import daq_utils as utils
from pymodaq.utils.parameter import utils as putils
from pymodaq.utils.gui_utils import get_splash_sc
from pymodaq.utils import config as config_mod
from pymodaq.utils.exceptions import ActuatorError
from pymodaq.utils.messenger import deprecation_msg
from pymodaq.utils.h5modules import module_saving
from pymodaq.utils.data import DataRaw, DataToExport, DataFromPlugins, DataActuator
from pymodaq.utils.h5modules.backends import Node


# Append PyMoDAQ's local directory (as returned by get_set_local_dir) to the import path.
local_path = config_mod.get_set_local_dir()
sys.path.append(local_path)

logger = set_logger(get_module_name(__file__))
config = config_mod.Config()

# Plugins registered under the 'daq_move' key and the list of their names.
DAQ_Move_Actuators = utils.get_plugins('daq_move')
ACTUATOR_TYPES = [plugin['name'] for plugin in DAQ_Move_Actuators]
if not ACTUATOR_TYPES:
    # Importing this module is refused when there is no actuator plugin at all.
    raise ActuatorError('No installed Actuator')


STATUS_WAIT_TIME = 1000


class DAQ_Move(ParameterManager, ControlModule):
    """Main PyMoDAQ class to drive actuators

    Qt object and generic UI to drive actuators.

    Attributes
    ----------
    init_signal: Signal[bool]
        This signal is emitted when the chosen actuator is correctly initialized
    move_done_signal: Signal[DataActuator]
        This signal is emitted when the chosen actuator finished its action. It carries the
        actuator's current value (whose name is set to the module title)
    current_value_signal: Signal[DataActuator]
        This signal is emitted each time the actuator's value has been read back from the hardware
    bounds_signal: Signal[bool]
        This signal is emitted when the actuator reached defined limited boundaries.

    See Also
    --------
    :class:`ControlModule`, :class:`ParameterManager`
    """
    settings_name = 'daq_move_settings'

    move_done_signal = Signal(DataActuator)
    current_value_signal = Signal(DataActuator)
    _update_settings_signal = Signal(edict)
    bounds_signal = Signal(bool)

    params = daq_move_params

    def __init__(self, parent=None, title="DAQ Move"):
        """

        Parameters
        ----------
        parent: QWidget or None
            if it is a valid QWidget, it will hold the user interface to drive it
        title: str
            The unique (should be unique) string identifier for the underlying actuator
        """
        self.logger = set_logger(f'{logger.name}.{title}')
        self.logger.info(f'Initializing DAQ_Move: {title}')

        QObject.__init__(self)
        ParameterManager.__init__(self, action_list=('save', 'update'))
        ControlModule.__init__(self)

        self.parent = parent
        if parent is not None:
            self.ui = DAQ_Move_UI(parent, title)
        else:
            self.ui: DAQ_Move_UI = None

        if self.ui is not None:
            self.ui.actuators = ACTUATOR_TYPES
            self.ui.set_settings_tree(self.settings_tree)
            self.ui.command_sig.connect(self.process_ui_cmds)

        self.splash_sc = get_splash_sc()
        self._title = title
        if len(ACTUATOR_TYPES) > 0:  # will be 0 if no valid plugins are installed
            self.actuator = ACTUATOR_TYPES[0]

        self.module_and_data_saver = module_saving.ActuatorSaver(self)

        self._move_done_bool = True
        self._current_value = DataActuator(title)
        # These two must be real assignments: a bare annotation
        # (``self._x: DataActuator(title)``) creates no attribute, which made
        # move_rel_p/move_rel_m fail with AttributeError before any 'rel_value' command.
        self._target_value = DataActuator(title)
        self._relative_value = DataActuator(title)

        self._refresh_timer = QTimer()
        self._refresh_timer.timeout.connect(self.get_actuator_value)

    def process_ui_cmds(self, cmd: utils.ThreadCommand):
        """Process commands sent by actions done in the ui

        Parameters
        ----------
        cmd: ThreadCommand
            Possible values are :
            * init
            * quit
            * get_value
            * loop_get_value
            * find_home
            * stop
            * move_abs
            * move_rel
            * show_log
            * actuator_changed
            * rel_value
            * show_config
        """
        if cmd.command == 'init':
            self.init_hardware(cmd.attribute[0])
        elif cmd.command == 'quit':
            self.quit_fun()
        elif cmd.command == 'get_value':
            self.get_actuator_value()
        elif cmd.command == 'loop_get_value':
            self.get_continuous_actuator_value(cmd.attribute)
        elif cmd.command == 'find_home':
            self.move_home()
        elif cmd.command == 'stop':
            self.stop_motion()
        elif cmd.command == 'move_abs':
            self.move_abs(cmd.attribute)
        elif cmd.command == 'move_rel':
            self.move_rel(cmd.attribute)
        elif cmd.command == 'show_log':
            self.show_log()
        elif cmd.command == 'show_config':
            self.config = self.show_config(self.config)
            self.ui.config = self.config
        elif cmd.command == 'actuator_changed':
            self.actuator = cmd.attribute
        elif cmd.command == 'rel_value':
            self._relative_value = cmd.attribute

    def append_data(self, dte: DataToExport = None, where: Union[Node, str] = None):
        """Appends current DataToExport to an ActuatorEnlargeableSaver

        Parameters
        ----------
        dte: DataToExport or None
            The data to append. If None, a DataToExport named after the module title and holding the
            current actuator value is built
        where: Node or str
            Forwarded to the module_and_data_saver to get/set the node where data are added

        See Also
        --------
        ActuatorEnlargeableSaver
        """
        if dte is None:
            dte = DataToExport(name=self.title, data=[self._current_value])
        self._add_data_to_saver(dte, where=where)
        # todo: test this for logging

    def _add_data_to_saver(self, data: DataToExport, where=None, **kwargs):
        """Adds DataToExport data to the current node using the declared module_and_data_saver

        Parameters
        ----------
        data: DataToExport
            The data to be saved
        where: Node or str
            Passed to the saver's get_set_node method to retrieve the node receiving the data
        kwargs: dict
            Other named parameters to be passed as is to the module_and_data_saver

        See Also
        --------
        DetectorSaver, DetectorEnlargeableSaver, DetectorExtendedSaver
        """
        # todo: test this for logging
        node = self.module_and_data_saver.get_set_node(where)
        self.module_and_data_saver.add_data(node, data, **kwargs)

    def stop_motion(self):
        """Stop any motion

        Sends the "stop_motion" command to the hardware; any exception is logged, not raised.
        """
        try:
            self.command_hardware.emit(ThreadCommand(command="stop_motion"))
        except Exception as e:
            self.logger.exception(str(e))

    def move(self, move_command: MoveCommand):
        """Generic method to trigger the correct action on the actuator

        Parameters
        ----------
        move_command: MoveCommand
            MoveCommand with move_type attribute either:

            * 'abs': performs an absolute action
            * 'rel': performs a relative action
            * 'home': find the actuator's home

        See Also
        --------
        :meth:`move_abs`, :meth:`move_rel`, :meth:`move_home`, :class:`..utility_classes.MoveCommand`
        """
        if move_command.move_type == 'abs':
            self.move_abs(move_command.value)
        elif move_command.move_type == 'rel':
            self.move_rel(move_command.value)
        elif move_command.move_type == 'home':
            # move_home has no target value: its only parameter is send_to_tcpip, so the command's
            # value must not be forwarded (it would be interpreted as that flag).
            self.move_home()

    def move_abs(self, value: Union[DataActuator, numbers.Number], send_to_tcpip=False):
        """Move the connected hardware to the absolute value

        Returns nothing but the move_done_signal will be send once the action is done

        Parameters
        ----------
        value: DataActuator or Number
            The value the actuator should reach. A plain number is wrapped into a DataActuator
            named after the module title
        send_to_tcpip: bool
            if True, this position is send through the TCP/IP communication canal
        """
        try:
            if isinstance(value, Number):
                value = DataActuator(self.title, data=[np.array([value])])
            self._send_to_tcpip = send_to_tcpip
            if value != self._current_value:
                # only flag the move as pending when the target differs from the current value
                if self.ui is not None:
                    self.ui.move_done = False
                self._move_done_bool = False
            self._target_value = value
            self.update_status("Moving")
            self.command_hardware.emit(ThreadCommand(command="reset_stop_motion"))
            self.command_hardware.emit(ThreadCommand(command="move_abs", attribute=[value]))

        except Exception as e:
            self.logger.exception(str(e))

    def move_home(self, send_to_tcpip=False):
        """Move the connected actuator to its home value (if any)

        Parameters
        ----------
        send_to_tcpip: bool
            if True, this position is send through the TCP/IP communication canal
        """
        self._send_to_tcpip = send_to_tcpip
        try:
            if self.ui is not None:
                self.ui.move_done = False
            self._move_done_bool = False
            self.update_status("Moving")
            self.command_hardware.emit(ThreadCommand(command="reset_stop_motion"))
            self.command_hardware.emit(ThreadCommand(command="move_home"))

        except Exception as e:
            self.logger.exception(str(e))

    def move_rel(self, rel_value: Union[DataActuator, numbers.Number], send_to_tcpip=False):
        """Move the connected hardware to the relative value

        Returns nothing but the move_done_signal will be send once the action is done

        Parameters
        ----------
        rel_value: DataActuator or Number
            The relative value the actuator should move by. A plain number is wrapped into a
            DataActuator named after the module title
        send_to_tcpip: bool
            if True, this position is send through the TCP/IP communication canal
        """
        try:
            if isinstance(rel_value, Number):
                rel_value = DataActuator(self.title, data=[np.array([rel_value])])
            self._send_to_tcpip = send_to_tcpip
            if self.ui is not None:
                self.ui.move_done = False
            self._move_done_bool = False
            self._target_value = self._current_value + rel_value
            self.update_status("Moving")
            self.command_hardware.emit(ThreadCommand(command="reset_stop_motion"))
            self.command_hardware.emit(ThreadCommand(command="move_rel", attribute=[rel_value]))

        except Exception as e:
            self.logger.exception(str(e))

    def move_rel_p(self):
        """Relative move by plus the stored relative value"""
        self.move_rel(self._relative_value)

    def move_rel_m(self):
        """Relative move by minus the stored relative value"""
        self.move_rel(-self._relative_value)

    def quit_fun(self):
        """Programmatic quitting of the current instance of DAQ_Move

        Des-init the actuator then close the UI parent widget
        """
        # insert anything that needs to be closed before leaving

        if self._initialized_state:
            self.init_hardware(False)
        self.quit_signal.emit()
        if self.ui is not None:
            self.ui.close()
        # self.parent.close()

    def init_hardware(self, do_init=True):
        """Init or desinit the selected instrument plugin class

        Parameters
        ----------
        do_init: bool
            if True, create a DAQ_Move_Hardware object, move it to a new QThread, connect the
            command/status/settings signals and send it the "ini_stage" command.
            If False, send the "close" command to the hardware.
        """
        if not do_init:
            try:
                self.command_hardware.emit(ThreadCommand(command="close"))
                if self.ui is not None:
                    self.ui.actuator_init = False
            except Exception as e:
                self.logger.exception(str(e))
        else:
            try:
                hardware = DAQ_Move_Hardware(self._actuator_type, self._current_value, self._title)
                self._hardware_thread = QThread()
                hardware.moveToThread(self._hardware_thread)

                self.command_hardware[ThreadCommand].connect(hardware.queue_command)
                hardware.status_sig[ThreadCommand].connect(self.thread_status)
                self._update_settings_signal[edict].connect(hardware.update_settings)

                # keep a reference to the worker object on the thread object
                self._hardware_thread.hardware = hardware
                self._hardware_thread.start()
                self.command_hardware.emit(
                    ThreadCommand(command="ini_stage",
                                  attribute=[self.settings.child('move_settings').saveState(),
                                             self.controller]))
            except Exception as e:
                self.logger.exception(str(e))

    @property
    def initialized_state(self):
        """bool: status of the actuator's initialization (init or not)"""
        return self._initialized_state

    @property
    def move_done_bool(self):
        """bool: status of the actuator's status (done or not)"""
        return self._move_done_bool

    def value_changed(self, param):
        """ Apply changes of value in the settings"""
        name = param.name()
        if name == 'connect_server':
            if param.value():
                self.connect_tcp_ip()
            else:
                self._command_tcpip.emit(ThreadCommand('quit', ))

        elif name in ('ip_address', 'port'):
            # name must be *called*: comparing the bound method ``param.name`` with a string is
            # always False, so port changes were never propagated.
            self._command_tcpip.emit(
                ThreadCommand('update_connection',
                              dict(ipaddress=self.settings.child('main_settings', 'tcpip',
                                                                 'ip_address').value(),
                                   port=self.settings.child('main_settings', 'tcpip',
                                                            'port').value())))
        elif name == 'refresh_timeout':
            self._refresh_timer.setInterval(param.value())

        elif name == 'plugin_config':
            self.show_config(self.plugin_config)

        path = self.settings.childPath(param)
        if path is not None:
            if 'main_settings' not in path:
                # settings outside of main_settings are forwarded to the hardware object
                self._update_settings_signal.emit(edict(path=path, param=param, change='value'))
                if self.settings.child('main_settings', 'tcpip', 'tcp_connected').value():
                    self._command_tcpip.emit(ThreadCommand('send_info',
                                                           dict(path=path, param=param)))

    def param_deleted(self, param):
        """ Apply deletion of settings """
        if param.name() not in putils.iter_children(self.settings.child('main_settings'), []):
            self._update_settings_signal.emit(edict(path=['move_settings'], param=param,
                                                    change='parent'))

    def child_added(self, param, data):
        """ Apply addition of settings """
        path = self.settings.childPath(param)
        if 'main_settings' not in path:
            self._update_settings_signal.emit(edict(path=path, param=data[0].saveState(),
                                                    change='childAdded'))

    def raise_timeout(self):
        """ Update status with "Timeout occurred" statement and change the timeout flag.
        """
        self.update_status("Timeout occurred")
        self.wait_position_flag = False

    @Slot(ThreadCommand)
    def thread_status(self, status: ThreadCommand):  # general function to get datas/infos from all threads back to the main
        """Get back info (using the ThreadCommand object) from the hardware

        And re-emit this ThreadCommand using the custom_sig signal if it should be used in a higher
        level module

        Commands valid for all control modules are defined in the parent class, here are described
        only the specific ones

        Parameters
        ----------
        status: ThreadCommand
            Possible values are:

            * **ini_stage**: obtains info from the initialization
            * **get_actuator_value** (or the deprecated **check_position**): update the UI current
              value and emits the current_value_signal
            * **move_done**: update the UI current value and emits the move_done signal
            * **outofbounds**: emits the bounds_signal signal with a True argument
            * **set_allowed_values**: used to change the behaviour of the spinbox controlling
              absolute values (see :meth:`daq_move_ui.set_abs_spinbox_properties`)
            * stop: stop the motion
        """

        super().thread_status(status, 'move')

        if status.command == "ini_stage":
            # status.attribute[0]=edict(initialized=bool,info="", controller=)
            self.update_status("Stage initialized: {:} info: {:}".format(
                status.attribute[0]['initialized'], status.attribute[0]['info']))
            if status.attribute[0]['initialized']:
                self.controller = status.attribute[0]['controller']
                if self.ui is not None:
                    self.ui.actuator_init = True
                self._initialized_state = True
            else:
                self._initialized_state = False
            if self._initialized_state:
                self.get_actuator_value()
            self.init_signal.emit(self._initialized_state)

        elif status.command == "get_actuator_value" or status.command == 'check_position':
            data_act: DataActuator = status.attribute[0]
            data_act.name = self.title  # for the DataActuator name to be the title of the DAQ_Move
            if self.ui is not None:
                self.ui.display_value(data_act)
                if self.ui.is_action_checked('show_graph'):
                    self.ui.show_data(DataToExport(name=self.title, data=[data_act]))
            self._current_value = data_act
            self.current_value_signal.emit(self._current_value)
            if self.settings['main_settings', 'tcpip', 'tcp_connected'] and self._send_to_tcpip:
                self._command_tcpip.emit(ThreadCommand('position_is', status.attribute))

        elif status.command == "move_done":
            data_act: DataActuator = status.attribute[0]
            data_act.name = self.title  # for the DataActuator name to be the title of the DAQ_Move
            if self.ui is not None:
                self.ui.display_value(data_act)
                self.ui.move_done = True
            self._current_value = data_act
            self._move_done_bool = True
            self.move_done_signal.emit(data_act)
            if self.settings.child('main_settings', 'tcpip', 'tcp_connected').value() \
                    and self._send_to_tcpip:
                self._command_tcpip.emit(ThreadCommand('move_done', status.attribute))

        elif status.command == 'outofbounds':
            self.bounds_signal.emit(True)

        elif status.command == 'set_allowed_values':
            if self.ui is not None:
                self.ui.set_abs_spinbox_properties(**status.attribute)

        elif status.command == 'stop':
            self.stop_motion()

    def get_actuator_value(self):
        """Get the current actuator value via the "get_actuator_value" command send to the hardware

        Returns nothing: the value comes back through :meth:`thread_status`, which emits the
        `current_value_signal`
        """
        try:
            self.command_hardware.emit(ThreadCommand(command="get_actuator_value"))

        except Exception as e:
            self.logger.exception(str(e))

    def grab(self):
        """Uncheck the 'refresh_value' ui action (if any ui) and stop the value polling

        NOTE(review): this body is the same as :meth:`stop_grab` (polling is stopped, not
        started); behaviour kept as is, to be confirmed against the ControlModule contract.
        """
        if self.ui is not None:
            self.manage_ui_actions('refresh_value', 'setChecked', False)
        self.get_continuous_actuator_value(False)

    def stop_grab(self):
        """Stop value polling. Mandatory

        First uncheck the ui action if ui is not None, then stop the polling
        """
        if self.ui is not None:
            self.manage_ui_actions('refresh_value', 'setChecked', False)
        self.get_continuous_actuator_value(False)

    def get_continuous_actuator_value(self, get_value=True):
        """Start the continuous getting of the actuator's value

        Parameters
        ----------
        get_value: bool
            if True start the timer to periodically fetch the actuator's value, else stop it

        Notes
        -----
        The current timer period is set by the refresh value *'refresh_timeout'* in the actuator
        main settings.
        """
        if get_value:
            self._refresh_timer.setInterval(self.settings['main_settings', 'refresh_timeout'])
            self._refresh_timer.start()
        else:
            self._refresh_timer.stop()

    @property
    def actuator(self):
        """str: the selected actuator's type

        Setting it updates the plugin configuration, the ui (if any) and the settings tree.

        Raises
        ------
        ActuatorError
            when set to a type that is not in ACTUATOR_TYPES
        """
        return self._actuator_type

    @actuator.setter
    def actuator(self, act_type):
        if act_type in ACTUATOR_TYPES:
            self._actuator_type = act_type
            self.update_plugin_config()
            if self.ui is not None:
                self.ui.actuator = act_type
            self.update_settings()
        else:
            raise ActuatorError(f'{act_type} is an invalid actuator, should be within {ACTUATOR_TYPES}')

    def update_plugin_config(self):
        """Store in `plugin_config` the `config` attribute of the selected plugin's top-level package

        Nothing is set if that package has no `config` attribute.
        """
        parent_module = utils.find_dict_in_list_from_key_val(DAQ_Move_Actuators, 'name',
                                                             self.actuator)
        mod = import_module(parent_module['module'].__package__.split('.')[0])
        if hasattr(mod, 'config'):
            self.plugin_config = mod.config

    @property
    def units(self):
        """The value of the 'units' entry of the move_settings"""
        return self.settings['move_settings', 'units']

    def update_settings(self):
        """Refresh the settings tree for the currently selected actuator type

        Sets the 'move_type' and 'module_name' main settings, then replaces the children of
        'move_settings' with the `params` declared by the selected plugin class. Any exception
        raised while doing the replacement is logged, not raised.
        """
        self.settings.child('main_settings', 'move_type').setValue(self._actuator_type)
        self.settings.child('main_settings', 'module_name').setValue(self._title)
        try:
            for child in self.settings.child('move_settings').children():
                child.remove()
            parent_module = utils.find_dict_in_list_from_key_val(DAQ_Move_Actuators, 'name',
                                                                 self._actuator_type)
            class_ = getattr(getattr(parent_module['module'], 'daq_move_' + self._actuator_type),
                             'DAQ_Move_' + self._actuator_type)
            params = getattr(class_, 'params')
            move_params = Parameter.create(name='move_settings', type='group', children=params)

            self.settings.child('move_settings').addChildren(move_params.children())

        except Exception as e:
            self.logger.exception(str(e))

    def connect_tcp_ip(self):
        """Create a TCPClient in its own QThread if the 'connect_server' setting is checked

        The client is built from the 'ip_address' and 'port' settings and the 'move_settings'
        parameter; its connection is initialized when the thread starts.
        """
        if self.settings.child('main_settings', 'tcpip', 'connect_server').value():
            self._tcpclient_thread = QThread()

            tcpclient = TCPClient(self.settings.child('main_settings', 'tcpip', 'ip_address').value(),
                                  self.settings.child('main_settings', 'tcpip', 'port').value(),
                                  self.settings.child('move_settings'), client_type="ACTUATOR")
            tcpclient.moveToThread(self._tcpclient_thread)
            # keep a reference to the worker object on the thread object
            self._tcpclient_thread.tcpclient = tcpclient
            tcpclient.cmd_signal.connect(self.process_tcpip_cmds)

            self._command_tcpip[ThreadCommand].connect(tcpclient.queue_command)
            self._tcpclient_thread.started.connect(tcpclient.init_connection)

            self._tcpclient_thread.start()

    @Slot(ThreadCommand)
    def process_tcpip_cmds(self, status):
        """Process the commands received from the TCP/IP client

        Parameters
        ----------
        status: ThreadCommand
            Move commands ('move_abs', 'move_rel', 'move_home') and value requests
            ('get_actuator_value', deprecated 'check_position') are matched as substrings of the
            command; 'connected', 'disconnected', 'Update_Status' and 'set_info' are matched
            exactly.
        """
        if 'move_abs' in status.command:
            self.move_abs(status.attribute[0], send_to_tcpip=True)

        elif 'move_rel' in status.command:
            self.move_rel(status.attribute[0], send_to_tcpip=True)

        elif 'move_home' in status.command:
            self.move_home(send_to_tcpip=True)

        elif 'check_position' in status.command:
            deprecation_msg('check_position is deprecated, you should use get_actuator_value')
            self._send_to_tcpip = True
            self.command_hardware.emit(ThreadCommand('get_actuator_value', ))

        elif 'get_actuator_value' in status.command:
            self._send_to_tcpip = True
            self.command_hardware.emit(ThreadCommand('get_actuator_value', ))

        elif status.command == 'connected':
            self.settings.child('main_settings', 'tcpip', 'tcp_connected').setValue(True)

        elif status.command == 'disconnected':
            self.settings.child('main_settings', 'tcpip', 'tcp_connected').setValue(False)

        elif status.command == 'Update_Status':
            self.thread_status(status)

        elif status.command == 'set_info':
            # attribute[1] is the XML description of the parameter, attribute[0] its path
            param_dict = ioxml.XML_string_to_parameter(status.attribute[1])[0]
            param_tmp = Parameter.create(**param_dict)
            param = self.settings.child('move_settings', *status.attribute[0][1:])
            param.restoreState(param_tmp.saveState())
class DAQ_Move_Hardware(QObject):
    """Worker object wrapping the actuator plugin instance and executing the DAQ_Move commands

    Attributes
    ----------
    status_sig: Signal[ThreadCommand]
        Signal used to send information back to the DAQ_Move object
    hardware: DAQ_Move_base or None
        The plugin instance, created in :meth:`ini_stage`
    actuator_type: str
        Name of the actuator plugin to instantiate
    current_position: DataActuator
        Last known value of the actuator
    hardware_adress: None
    axis_address: None
    motion_stoped: bool
        Set to True by :meth:`stop_motion`, reset by the "reset_stop_motion" command
    """
    status_sig = Signal(ThreadCommand)

    def __init__(self, actuator_type, position: DataActuator, title='actuator'):
        super().__init__()
        self.logger = set_logger(f'{logger.name}.{title}.actuator')
        self._title = title
        self.hardware: DAQ_Move_base = None
        self.actuator_type = actuator_type
        self.current_position: DataActuator = position
        self._target_value: DataActuator = None
        self.hardware_adress = None
        self.axis_address = None
        self.motion_stoped = False

    @property
    def title(self):
        """str: the title given at construction"""
        return self._title

    def close(self):
        """
            Uninitialize the stage closing the hardware.

        """
        self.hardware.close()

        return "Stage uninitialized"

    def get_actuator_value(self):
        """Get the current position checking the hardware value.

        Returns
        -------
        DataActuator: the value returned by the plugin, wrapped in a DataActuator when the plugin
        declares a 'float' data_actuator_type, else returned as is
        """
        pos = self.hardware.get_actuator_value()
        if self.hardware.data_actuator_type.name == 'float':
            return DataActuator(self._title, data=pos)
        else:
            return pos

    def check_position(self):
        """Get the current position checking the hardware position (deprecated)

        Returns the plugin's value as is (no DataActuator wrapping).
        """
        deprecation_msg('check_position is deprecated, use get_actuator_value')
        pos = self.hardware.get_actuator_value()
        return pos

    def ini_stage(self, params_state=None, controller=None):
        """Instantiate the plugin class and initialize it

        Parameters
        ----------
        params_state: object
            The saved state of the 'move_settings' parameter, given to the plugin constructor
        controller: object
            Given to the plugin's ini_stage method

        Returns
        -------
        edict: with keys `initialized` (bool) and `info` (str), plus `controller` when the plugin
        could be instantiated

        See Also
        --------
        DAQ_utils.ThreadCommand, DAQ_Move
        """

        status = edict(initialized=False, info="")
        try:
            parent_module = utils.find_dict_in_list_from_key_val(DAQ_Move_Actuators, 'name',
                                                                 self.actuator_type)
            class_ = getattr(getattr(parent_module['module'], 'daq_move_' + self.actuator_type),
                             'DAQ_Move_' + self.actuator_type)
            self.hardware = class_(self, params_state)
            try:
                infos = self.hardware.ini_stage(controller)  # return edict(info="", controller=, stage=)
            except Exception as e:
                # use the instance logger, as every other method of this class does
                self.logger.exception('Hardware couldn\'t be initialized' + str(e))
                infos = str(e), False

            if isinstance(infos, edict):  # following old plugin templating
                status.update(infos)
                deprecation_msg('Returns from init_stage should now be a string and a boolean,'
                                ' see pymodaq_plugins_template', stacklevel=3)
            else:
                status.info = infos[0]
                status.initialized = infos[1]
            status.controller = self.hardware.controller
            self.hardware.move_done_signal.connect(self.move_done)

            return status
        except Exception as e:
            self.logger.exception(str(e))
            return status

    def move_abs(self, position: DataActuator, polling=True):
        """Ask the plugin to reach the absolute position then start polling

        Parameters
        ----------
        position: DataActuator
            The target; its `value()` is given to plugins declaring a 'float' data_actuator_type
        polling: bool
            Stored in the plugin's `ispolling` attribute
        """
        # if isinstance(position, Number):
        #     position = float(position)  # because it may be a numpy float and could cause issues
        #     # see https://github.com/pythonnet/pythonnet/issues/1833
        self._target_value = position
        self.hardware.move_is_done = False
        self.hardware.ispolling = polling
        if self.hardware.data_actuator_type.name == 'float':
            self.hardware.move_abs(position.value())
        else:
            self.hardware.move_abs(position)
        self.hardware.poll_moving()

    def move_rel(self, rel_position: DataActuator, polling=True):
        """Ask the plugin to move by the relative position then start polling

        Parameters
        ----------
        rel_position: DataActuator
            The relative move; its `value()` is given to plugins declaring a 'float'
            data_actuator_type
        polling: bool
            Stored in the plugin's `ispolling` attribute
        """
        self.hardware.move_is_done = False
        self._target_value = self.current_position + rel_position
        self.hardware.ispolling = polling
        if self.hardware.data_actuator_type.name == 'float':
            self.hardware.move_rel(rel_position.value())
        else:
            self.hardware.move_rel(rel_position)
        self.hardware.poll_moving()

    @Slot(float)
    def Move_Stoped(self, pos):
        """
            Send a "move_done" Thread Command with the given position as an attribute.

            See Also
            --------
            DAQ_utils.ThreadCommand
        """
        self.status_sig.emit(ThreadCommand("move_done", [pos]))

    def move_home(self):
        """
            Make the hardware move to the init position.

        """
        self.hardware.move_is_done = False
        self._target_value = 0
        self.hardware.move_home()

    @Slot(DataActuator)
    def move_done(self, pos: DataActuator):
        """Send the move_done signal back to the main class

        Parameters
        ----------
        pos: DataActuator
            The value reached by the actuator
        """
        self._current_value = pos
        # current_position is what move_rel reads to compute its target: keep it up to date,
        # otherwise it stays at the value given to the constructor.
        self.current_position = pos
        self.status_sig.emit(ThreadCommand(command="move_done", attribute=[pos]))

    @Slot(ThreadCommand)
    def queue_command(self, command: ThreadCommand):
        """Interpret command send by DAQ_Move class

        Any exception raised while executing the command is logged, not raised.

        Parameters
        ----------
        command: ThreadCommand
            Possible commands are:

            * **ini_stage** command, init a stage from command attribute and emit the resulting
              status.
            * **close** command, unitinalise the stage closing hardware and emitting the
              corresponding status signal
            * **move_abs** command, call the move_abs method with position from command attribute
            * **move_rel** command, call the move_rel method with the relative position from the
              command attribute.
            * **move_home** command, call the move_home method
            * **get_actuator_value** command, get the current position from the get_actuator_value
              method and emit it
            * **stop_motion** command, stop any motion via the stop_motion method
            * **reset_stop_motion** command, set the motion_stoped attribute to false
            * any other command is looked up as a method of the plugin and called with the
              command attribute
        """
        try:
            if command.command == "ini_stage":
                status = self.ini_stage(
                    *command.attribute)  # return edict(initialized=bool,info="", controller=, stage=)
                self.status_sig.emit(ThreadCommand(command=command.command,
                                                   attribute=[status, 'log']))

            elif command.command == "close":
                status = self.close()
                self.status_sig.emit(ThreadCommand(command=command.command, attribute=[status]))

            elif command.command == "move_abs":
                self.move_abs(*command.attribute)

            elif command.command == "move_rel":
                self.move_rel(*command.attribute)

            elif command.command == "move_home":
                self.move_home()

            elif command.command == "get_actuator_value":
                pos = self.get_actuator_value()
                self.status_sig.emit(ThreadCommand('get_actuator_value', [pos]))

            elif command.command == "stop_motion":
                self.stop_motion()

            elif command.command == "reset_stop_motion":
                self.motion_stoped = False

            else:  # custom commands for particular plugins (see spectrometer module 'get_spectro_wl' for instance)
                if hasattr(self.hardware, command.command):
                    cmd = getattr(self.hardware, command.command)
                    cmd(*command.attribute)
        except Exception as e:
            self.logger.exception(str(e))

    def stop_motion(self):
        """Stop the hardware motion

        Emits an "Update_Status" Thread Command, sets the motion_stoped attribute to True, stops
        the plugin motion and its poll timer.

        See Also
        --------
        DAQ_utils.ThreadCommand, stop_motion
        """
        self.status_sig.emit(ThreadCommand(command="Update_Status",
                                           attribute=["Motion stoping", 'log']))
        self.motion_stoped = True
        self.hardware.stop_motion()
        self.hardware.poll_timer.stop()

    @Slot(edict)
    def update_settings(self, settings_parameter_dict):
        """Apply a settings change sent by the DAQ_Move object

        Parameters
        ----------
        settings_parameter_dict: edict
            Dictionary containing at least the `path` and the linked `param`. For a
            'main_settings' path, the attribute of this object named as the last path element
            (if it exists) is set to the parameter value; for a 'move_settings' path the
            dictionary is forwarded to the plugin's update_settings.

        See Also
        --------
        update_settings
        """
        # settings_parameter_dict = edict(path=path,param=param)
        path = settings_parameter_dict['path']
        param = settings_parameter_dict['param']
        if path[0] == 'main_settings':
            if hasattr(self, path[-1]):
                setattr(self, path[-1], param.value())

        elif path[0] == 'move_settings':
            self.hardware.update_settings(settings_parameter_dict)


def main(init_qt=True):
    """Create a DAQ_Move with its ui in a fresh QWidget and show it

    Parameters
    ----------
    init_qt: bool
        if True, create the QApplication (with the dark style if configured) and run its event
        loop, exiting the interpreter when it ends. If False (used for the test suite), the
        caller is expected to manage the Qt application.

    Returns
    -------
    tuple: the DAQ_Move object and its parent widget (only reached when init_qt is False)
    """
    if init_qt:  # used for the test suite
        app = QtWidgets.QApplication(sys.argv)
        if config('style', 'darkstyle'):
            import qdarkstyle
            app.setStyleSheet(qdarkstyle.load_stylesheet(qdarkstyle.DarkPalette))

    widget = QtWidgets.QWidget()
    prog = DAQ_Move(widget, title="test")
    widget.show()

    if init_qt:
        sys.exit(app.exec_())
    return prog, widget


if __name__ == '__main__':
    main()