Source code for pymodaq.control_modules.utils

# -*- coding: utf-8 -*-
"""
Created the 03/10/2022

@author: Sebastien Weber
"""
from random import randint
from typing import Optional, Type, Union
from easydict import EasyDict as edict

from qtpy.QtCore import Signal, QObject, Qt, Slot, QThread

from pymodaq.control_modules.thread_commands import ThreadStatus
from pymodaq_utils.utils import ThreadCommand, find_dict_in_list_from_key_val
from pymodaq_utils.config import Config
from pymodaq_utils.enums import BaseEnum
from pymodaq_utils.logger import get_base_logger, set_logger, get_module_name

from pymodaq_gui.parameter import Parameter, ioxml
from pymodaq_gui.parameter.utils import ParameterWithPath
from pymodaq_gui.managers.parameter_manager import ParameterManager
from pymodaq_gui.plotting.data_viewers import ViewersEnum
from pymodaq_gui.h5modules.saving import H5Saver

from pymodaq.utils.tcp_ip.tcp_server_client import TCPClient
from pymodaq.utils.exceptions import DetectorError
from pymodaq.utils.leco.pymodaq_listener import ActorListener, LECOClientCommands, LECOCommands

from pymodaq.utils.daq_utils import get_plugins
from pymodaq.utils.h5modules.module_saving import DetectorSaver, ActuatorSaver
from pymodaq.utils.config import Config as ControlModulesConfig


class DAQTypesEnum(BaseEnum):
    """enum relating a given DAQType and a viewer type
    See Also
    --------
    pymodaq.utils.plotting.data_viewers.viewer.ViewersEnum
    """
    DAQ0D = 'Viewer0D'
    DAQ1D = 'Viewer1D'
    DAQ2D = 'Viewer2D'
    DAQND = 'ViewerND'

    def to_data_type(self):
        return ViewersEnum[self.value].value

    def to_viewer_type(self):
        return self.value

    def to_daq_type(self):
        return self.name

    def increase_dim(self, ndim: int):
        dim = self.get_dim()
        if dim != 'N':
            dim_as_int = int(dim) + ndim
            if dim_as_int > 2:
                dim = 'N'
            else:
                dim = str(dim_as_int)
        else:
            dim = 'N'
        return DAQTypesEnum(f'Viewer{dim}D')

    def get_dim(self):
        return self.value.split('Viewer')[1].split('D')[0]


DAQ_TYPES = DAQTypesEnum

DET_TYPES = {'DAQ0D': get_plugins('daq_0Dviewer'),
             'DAQ1D': get_plugins('daq_1Dviewer'),
             'DAQ2D': get_plugins('daq_2Dviewer'),
             'DAQND': get_plugins('daq_NDviewer'),
             }

if len(DET_TYPES['DAQ0D']) == 0:
    raise DetectorError('No installed Detector')


config_utils = Config()
config = ControlModulesConfig()
logger = set_logger(get_module_name(__file__))


class ViewerError(Exception):
    pass


def get_viewer_plugins(daq_type, det_name):
    parent_module = find_dict_in_list_from_key_val(DET_TYPES[daq_type], 'name', det_name)
    match_name = daq_type.lower()
    match_name = f'{match_name[0:3]}_{match_name[3:].upper()}viewer_'
    obj = getattr(getattr(parent_module['module'], match_name + det_name),
                  f'{match_name[0:7].upper()}{match_name[7:]}{det_name}')
    params = getattr(obj, 'params')
    det_params = Parameter.create(name='Det Settings', type='group', children=params)
    return det_params, obj


[docs] class ControlModule(QObject): """Abstract Base class common to both DAQ_Move and DAQ_Viewer control modules Attributes ---------- init_signal : Signal[bool] This signal is emitted when the chosen hardware is correctly initialized command_hardware : Signal[ThreadCommand] This signal is used to communicate with the instrument plugin within a separate thread command_tcpip : Signal[ThreadCommand] This signal is used to communicate through the TCP/IP Network quit_signal : Signal[] This signal is emitted when the user requested to stop the module """ init_signal = Signal(bool) command_hardware = Signal(ThreadCommand) _command_tcpip = Signal(ThreadCommand) quit_signal = Signal() _update_settings_signal = Signal(edict) status_sig = Signal(str) custom_sig = Signal(ThreadCommand) ui = None def __init__(self): super().__init__() self._title = "" self.config = config # the hardware controller instance set after initialization and to be used by other modules if they share the # same controller self.controller = None self._initialized_state = False self._send_to_tcpip = False self._tcpclient_thread = None self._hardware_thread = None self.plugin_config: Optional[Config] = None self._h5saver: Optional[H5Saver] = None self._module_and_data_saver = None def __repr__(self): return f'{self.__class__.__name__}: {self.title}' def create_new_file(self, new_file: bool): if new_file: self.close_file() self.module_and_data_saver.h5saver = self.h5saver return True @property def h5saver(self): if self._h5saver is None: self._h5saver = H5Saver(backend=config_utils('general', 'hdf5_backend')) if self._h5saver.h5_file is None: self._h5saver.init_file(update_h5=True) if not self._h5saver.isopen(): self._h5saver.init_file(addhoc_file_path=self._h5saver.settings['current_h5_file']) return self._h5saver @h5saver.setter def h5saver(self, h5saver_temp: H5Saver): self._h5saver = h5saver_temp def close_file(self): self.h5saver.close_file() @property def module_and_data_saver(self): if self._module_and_data_saver.h5saver is None or not self._module_and_data_saver.h5saver.isopen(): self._module_and_data_saver.h5saver = self.h5saver return self._module_and_data_saver @module_and_data_saver.setter def module_and_data_saver(self, mod: Union[DetectorSaver, ActuatorSaver]): self._module_and_data_saver = mod self._module_and_data_saver.h5saver = self.h5saver def custom_command(self, command: str, **kwargs): self.command_hardware.emit(ThreadCommand(command, kwargs))
[docs] def thread_status(self, status: ThreadCommand, control_module_type='detector'): """Get back info (using the ThreadCommand object) from the hardware And re-emit this ThreadCommand using the custom_sig signal if it should be used in a higher level module Parameters ---------- status: ThreadCommand The info returned from the hardware, the command (str) can be either: * Update_Status: display messages and log info (deprecated) * update_status: display info on the UI status bar * close: close the current thread and delete corresponding attribute on cascade. * update_settings: Update the "detector setting" node in the settings tree. * update_main_settings: update the "main setting" node in the settings tree * raise_timeout: * show_splash: Display the splash screen with attribute as message * close_splash * show_config: display the plugin configuration """ if status.command == "Update_Status": # legacy if len(status.attribute) > 1: self.update_status(status.attribute[0], log=status.attribute[1]) else: self.update_status(status.attribute[0]) elif status.command == ThreadStatus.UPDATE_STATUS: self.update_status(status.attribute) elif status.command == ThreadStatus.CLOSE: try: self.update_status(status.attribute[0]) self._hardware_thread.quit() terminated = self._hardware_thread.wait(5000) if not terminated: self._hardware_thread.terminate() self._hardware_thread.wait() self.update_status('thread is locked?!', 'log') except Exception as e: logger.exception(f'Wrong call to the "close" command: \n{str(e)}') self._initialized_state = False self.init_signal.emit(self._initialized_state) elif status.command == ThreadStatus.UPDATE_MAIN_SETTINGS: # this is a way for the plugins to update main settings of the ui (solely values, limits and options) try: if status.attribute[2] == 'value': self.settings.child('main_settings', *status.attribute[0]).setValue(status.attribute[1]) elif status.attribute[2] == 'limits': self.settings.child('main_settings', *status.attribute[0]).setLimits(status.attribute[1]) elif status.attribute[2] == 'options': self.settings.child('main_settings', *status.attribute[0]).setOpts(**status.attribute[1]) except Exception as e: logger.exception(f'Wrong call to the "update_main_settings" command: \n{str(e)}') elif status.command == ThreadStatus.UPDATE_SETTINGS: # using this the settings shown in the UI for the plugin reflects the real plugin settings try: self.settings.sigTreeStateChanged.disconnect( self.parameter_tree_changed) # any changes on the detcetor settings will update accordingly the gui except Exception as e: logger.exception(str(e)) try: if status.attribute[2] == 'value': self.settings.child(f'{control_module_type}_settings', *status.attribute[0]).setValue(status.attribute[1]) elif status.attribute[2] == 'limits': self.settings.child(f'{control_module_type}_settings', *status.attribute[0]).setLimits(status.attribute[1]) elif status.attribute[2] == 'options': self.settings.child(f'{control_module_type}_settings', *status.attribute[0]).setOpts(**status.attribute[1]) elif status.attribute[2] == 'childAdded': child = Parameter.create(name='tmp') child.restoreState(status.attribute[1][0]) self.settings.child(f'{control_module_type}_settings', *status.attribute[0]).addChild(status.attribute[1][0]) except Exception as e: logger.exception(f'Wrong call to the "update_settings" command: \n{str(e)}') self.settings.sigTreeStateChanged.connect(self.parameter_tree_changed) elif status.command == ThreadStatus.UPDATE_UI: try: if self.ui is not None: if hasattr(self.ui, status.attribute): getattr(self.ui, status.attribute)(*status.args, **status.kwargs) except Exception as e: logger.info(f'Wrong call to the "update_ui" command: \n{str(e)}') elif status.command == ThreadStatus.RAISE_TIMEOUT: self.raise_timeout() elif status.command == ThreadStatus.SHOW_SPLASH: self.settings_tree.setEnabled(False) self.splash_sc.show() self.splash_sc.raise_() self.splash_sc.showMessage(status.attribute, color=Qt.white) elif status.command == ThreadStatus.CLOSE_SPLASH: self.splash_sc.close() self.settings_tree.setEnabled(True) self.custom_sig.emit(status) # to be used if needed in custom application connected to this module
@property def module_type(self): """str: Get the module type, either DAQ_Move or DAQ_viewer""" return type(self).__name__ @property def initialized_state(self): """bool: Check if the module is initialized""" return self._initialized_state @property def title(self): """str: get the title of the module""" return self._title
[docs] def grab(self): """Programmatic entry to grab data from detectors or current value from actuator""" raise NotImplementedError
[docs] def stop_grab(self): """Programmatic entry to stop data grabbing from detectors or current value polling from actuator""" raise NotImplementedError
def _add_data_to_saver(self, *args, **kwargs): raise NotImplementedError def append_data(self, *args, **kwargs): raise NotImplementedError def insert_data(self, *args, **kwargs): raise NotImplementedError
[docs] def quit_fun(self): """Programmatic entry to quit the control module""" raise NotImplementedError
[docs] def init_hardware(self, do_init=True): """Programmatic entry to initialize/deinitialize the control module Parameters ---------- do_init : bool if True initialize the selected hardware else deinitialize it See Also -------- :meth:`init_hardware_ui` """ raise NotImplementedError
[docs] def init_hardware_ui(self, do_init=True): """Programmatic entry to simulate a click on the user interface init button Parameters ---------- do_init : bool if True initialize the selected hardware else deinitialize it Notes ----- This method should be preferred to :meth:`init_hardware` """ if self.ui is not None: self.ui.do_init(do_init)
[docs] def show_log(self): """Open the log file in the default text editor""" import webbrowser webbrowser.open(get_base_logger(logger).handlers[0].baseFilename)
[docs] def show_config(self, config: Config) -> Config: """ Display in a tree the current configuration""" if config is not None: from pymodaq_gui.utils.widgets.tree_toml import TreeFromToml config_tree = TreeFromToml(config) config_tree.show_dialog() return ControlModulesConfig()
[docs] def update_status(self, txt: str, log=True): """Display a message in the ui status bar and eventually log the message Parameters ---------- txt : str message to display log : bool if True, log the message in the logger """ if self.ui is not None: self.ui.display_status(txt) self.status_sig.emit(txt) if log: logger.info(txt)
[docs] def manage_ui_actions(self, action_name: str, attribute: str, value): """Method to manage actions for the UI (if any). Will try to apply the given value to the given attribute of the corresponding action Parameters ---------- action_name: str attribute: method signature or attribute value: object actual type and value depend on the triggered attribute Examples -------- >>>manage_ui_actions('quit', 'setEnabled', False) # will disable the quit action (button) on the UI """ if self.ui is not None: if self.ui.has_action(action_name): action = self.ui.get_action(action_name) if hasattr(action, attribute): attr = getattr(action, attribute) if callable(attr): attr(value) else: attr = value
class ParameterControlModule(ParameterManager, ControlModule): """Base class for a control module with parameters.""" _update_settings_signal = Signal(edict) listener_class: Type[ActorListener] = ActorListener def __init__(self, **kwargs): ParameterManager.__init__(self, action_list=('save', 'update')) ControlModule.__init__(self) def value_changed(self, param: Parameter) -> Optional[Parameter]: """ParameterManager subclassed method. Process events from value changed by user in the UI Settings Parameters ---------- param: Parameter a given parameter whose value has been changed by user """ if param.name() == 'plugin_config': self.show_config(self.plugin_config) elif param.name() == 'connect_server': if param.value(): self.connect_tcp_ip() else: self._command_tcpip.emit(ThreadCommand('quit', )) elif param.name() == 'ip_address' or param.name == 'port': self._command_tcpip.emit( ThreadCommand('update_connection', dict(ipaddress=self.settings['main_settings', 'tcpip', 'ip_address'], port=self.settings['main_settings', 'tcpip', 'port']))) elif param.name() == 'connect_leco_server': self.connect_leco(param.value()) elif param.name() == "name": name = param.value() try: self._leco_client.name = name except AttributeError: pass else: # not handled return param def _update_settings(self, param: Parameter): # I do not understand what it does path = self.settings.childPath(param) if path is not None: if 'main_settings' not in path: self._update_settings_signal.emit(edict(path=path, param=param, change='value')) if self.settings.child('main_settings', 'tcpip', 'tcp_connected').value(): self._command_tcpip.emit(ThreadCommand('send_info', dict(path=path, param=param))) if self.settings.child('main_settings', 'leco', 'leco_connected').value(): self._command_tcpip.emit( ThreadCommand(LECOCommands.SEND_INFO, ParameterWithPath(param, path))) def connect_tcp_ip(self, params_state=None, client_type: str = "GRABBER") -> None: """Init a TCPClient in a separated thread to communicate with a distant TCp/IP Server Use the settings: ip_address and port to specify the connection See Also -------- TCPServer """ if self.settings.child('main_settings', 'tcpip', 'connect_server').value(): self._tcpclient_thread = QThread() tcpclient = TCPClient(self.settings.child('main_settings', 'tcpip', 'ip_address').value(), self.settings.child('main_settings', 'tcpip', 'port').value(), params_state=params_state, client_type=client_type) tcpclient.moveToThread(self._tcpclient_thread) self._tcpclient_thread.tcpclient = tcpclient tcpclient.cmd_signal.connect(self.process_tcpip_cmds) self._command_tcpip[ThreadCommand].connect(tcpclient.queue_command) self._tcpclient_thread.started.connect(tcpclient.init_connection) self._tcpclient_thread.start() def get_leco_name(self) -> str: name = self.settings["main_settings", "leco", "leco_name"] if name == '': # take the module name as alternative name = self.settings["main_settings", "module_name"] if name == '': # a name is required, invent one name = f"viewer_{randint(0, 10000)}" name = self.settings.child("main_settings", "leco", "leco_name").setValue(name) return name def get_leco_host_port(self) -> tuple: host = self.settings["main_settings", "leco", "host"] port = self.settings["main_settings", "leco", "port"] if host == '': # take the localhost as default host = 'localhost' if port == '': # take the default port as 12300 port = 12300 return (host, port) def connect_leco(self, connect: bool) -> None: if connect: name = self.get_leco_name() host, port = self.get_leco_host_port() try: self._leco_client.name = name except AttributeError: self._leco_client = self.listener_class(name=name, host=host, port=port) self._leco_client.cmd_signal.connect(self.process_tcpip_cmds) self._command_tcpip[ThreadCommand].connect(self._leco_client.queue_command) self._leco_client.start_listen() # self._leco_client.cmd_signal.emit(ThreadCommand(LECOCommands.SET_INFO, attribute=["detector_settings", ""])) else: self._command_tcpip.emit(ThreadCommand(LECOCommands.QUIT, )) try: self._command_tcpip[ThreadCommand].disconnect(self._leco_client.queue_command) except TypeError: pass # already disconnected @Slot(ThreadCommand) def process_tcpip_cmds(self, status: ThreadCommand) -> Optional[ThreadCommand]: if status.command == 'connected': self.settings.child('main_settings', 'tcpip', 'tcp_connected').setValue(True) elif status.command == 'disconnected': self.settings.child('main_settings', 'tcpip', 'tcp_connected').setValue(False) elif status.command == LECOClientCommands.LECO_CONNECTED: self.settings.child('main_settings', 'leco', 'leco_connected').setValue(True) elif status.command == LECOClientCommands.LECO_DISCONNECTED: self.settings.child('main_settings', 'leco', 'leco_connected').setValue(False) elif status.command == 'Update_Status': self.thread_status(status) elif status.command == 'set_info': """ The Director sent a parameter to be updated""" path_in_settings = status.attribute.path if 'move' in self.__class__.__name__.lower(): common_param = 'move_settings' else: common_param = 'detector_settings' if common_param in path_in_settings: param = self.settings.child(*path_in_settings) elif 'settings_client' in path_in_settings: param = self.settings.child(common_param, *path_in_settings[1:]) else: param = self.settings.child(common_param, *path_in_settings) param.setValue(status.attribute.parameter.value()) elif status.command == LECOCommands.GET_SETTINGS: """ The Director requested the content of the actuator settings""" if 'move' in self.__class__.__name__.lower(): common_param = 'move_settings' else: common_param = 'detector_settings' self._command_tcpip.emit( ThreadCommand(LECOCommands.SET_DIRECTOR_SETTINGS, ioxml.parameter_to_xml_string( self.settings.child(common_param)))) else: # not handled return status