# -*- coding: utf-8 -*-
"""
Created on Wed Jan 10 16:54:14 2018
@author: Weber Sébastien
"""
from __future__ import annotations
from importlib import import_module
import os
from pathlib import Path
import sys
from typing import List, Tuple, Union, Optional
import time
from easydict import EasyDict as edict
import numpy as np
from qtpy import QtWidgets
from qtpy.QtCore import Qt, QObject, Slot, QThread, Signal
from pymodaq_data.data import DataToExport, Axis, DataDistribution
from pymodaq.utils.data import DataFromPlugins
from pymodaq_utils.logger import set_logger, get_module_name
from pymodaq.control_modules.utils import ParameterControlModule
from pymodaq_gui.utils.file_io import select_file
from pymodaq_gui.utils.widgets.lcd import LCD
from pymodaq_utils.config import Config, get_set_local_dir
from pymodaq_gui.h5modules.browsing import browse_data
from pymodaq_gui.h5modules.saving import H5Saver
from pymodaq.utils.h5modules import module_saving
from pymodaq_data.h5modules.backends import Node, SaveType
from pymodaq_utils.utils import ThreadCommand
from pymodaq_gui.parameter import ioxml, Parameter
from pymodaq_gui.parameter import utils as putils
from pymodaq.control_modules.viewer_utility_classes import params as daq_viewer_params
from pymodaq_utils import utils
from pymodaq_utils.warnings import deprecation_msg
from pymodaq_gui.utils import DockArea, Dock
from pymodaq_gui.utils.utils import mkQApp
from pymodaq.utils.gui_utils import get_splash_sc
from pymodaq.control_modules.daq_viewer_ui import DAQ_Viewer_UI
from pymodaq.control_modules.utils import (DET_TYPES, get_viewer_plugins, DAQTypesEnum,
DetectorError)
from pymodaq.control_modules.thread_commands import (ThreadStatus, ThreadStatusViewer, ControlToHardwareViewer,
UiToMainViewer)
from pymodaq_gui.plotting.data_viewers.viewer import ViewerBase
from pymodaq_gui.plotting.data_viewers import ViewersEnum
from pymodaq_utils.enums import enum_checker
from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base
from pymodaq.utils.leco.pymodaq_listener import ViewerActorListener, LECOClientCommands, LECOViewerCommands
from pymodaq.utils.config import Config as ControlModulesConfig
logger = set_logger(get_module_name(__file__))
config_utils = Config()
config = ControlModulesConfig()
local_path = get_set_local_dir()
[docs]
class DAQ_Viewer(ParameterControlModule):
""" Main PyMoDAQ class to drive detectors
Qt object and generic UI to drive actuators. The class is giving you full functionality to select (daq_detector),
initialize detectors (init_hardware), grab or snap data (grab_data) and save them (save_new, save_current). If
a DockArea is given as parent widget, the full User Interface (DAQ_Viewer_UI) is loaded allowing easy control of the
instrument.
Attributes
----------
grab_done_signal: Signal[DataToExport]
Signal emitted when the data from the plugin (and eventually from the data viewers) has been received. To be
used by connected objects.
custom_sig: Signal[ThreadCommand]
use this to propagate info/data coming from the hardware plugin to another object
overshoot_signal: Signal[bool]
This signal is emitted when some 0D data from the plugin is higher than the overshoot threshold set in the
settings
See Also
--------
ControlModule, DAQ_Viewer_UI, ParameterManager
Notes
-----
A particular signal from the 2D DataViewer is directly connected to the plugin: ROI_select_signal. The position and
size of the corresponding ROI is then directly transferred to a plugin function named `ROISelect` that you have to
create if one want to receive infos from the ROI
"""
settings_name = 'daq_viewer_settings'
custom_sig = Signal(ThreadCommand) # particular case where DAQ_Viewer is used for a custom module
grab_done_signal = Signal(DataToExport)
overshoot_signal = Signal(bool)
data_saved = Signal()
grab_status = Signal(bool)
params = daq_viewer_params + [
{'title': 'Saver Settings:', 'name': 'saver_settings', 'type': 'group',
'visible': False, 'children': H5Saver.get_params_for_save_type(SaveType.detector)}]
listener_class = ViewerActorListener
ui: Optional[DAQ_Viewer_UI]
def __init__(
self,
parent: Optional[DockArea] = None,
title: str = "Testing",
daq_type=config("viewer", "daq_type"),
dock_settings=None,
dock_viewer=None,
**kwargs,
):
self.logger = set_logger(f'{logger.name}.{title}')
self.logger.info(f'Initializing DAQ_Viewer: {title}')
super().__init__(**kwargs)
daq_type = enum_checker(DAQTypesEnum, daq_type)
self._daq_type: DAQTypesEnum = daq_type
self._viewer_types: List[ViewersEnum] = []
self._viewers: List[ViewerBase] = []
self.override_grab_from_extension = False # boolean allowing an extension to tell to init a grab or not
# (see DataMixer for reasons and use case in ModulesManager and dashboard method add_det_from_extension)
if isinstance(parent, DockArea):
self.dockarea = parent
else:
self.dockarea = None
self.parent = parent
if parent is not None:
self.ui = DAQ_Viewer_UI(parent, title, daq_type=daq_type,
dock_settings=dock_settings,
dock_viewer=dock_viewer)
else:
self.ui = None
if self.ui is not None:
QtWidgets.QApplication.processEvents()
self.ui.add_setting_tree(self.settings_tree)
self.ui.command_sig.connect(self.process_ui_cmds)
self.viewers = self.ui.viewers
self._viewer_types = self.ui.viewer_types
self.splash_sc = get_splash_sc()
self._title = title
self._module_and_data_saver: Union[None,
module_saving.DetectorSaver,
module_saving.DetectorTimeSaver,
module_saving.DetectorExtendedSaver] = None
self._h5saver_continuous: Optional[H5Saver] = None
self._ind_continuous_grab = 0
self.settings.child('main_settings', 'DAQ_type').setValue(self.daq_type.name)
self._detectors: List[str] = [det_dict['name'] for det_dict in DET_TYPES[self.daq_type.name]]
if len(self._detectors) > 0: # will be 0 if no valid plugins are installed
self._detector: str = self._detectors[0]
else:
raise DetectorError('No detected Detector')
self.settings.child('main_settings', 'detector_type').setValue(self._detector)
for hidden_param in ('custom_name',
'current_scan_name',
'current_scan_path',
'current_h5_file',
'new_file',
'base_name'):
self.settings.child('saver_settings', hidden_param).setOpts(visible=False)
self._grabing: bool = False
self._do_bkg: bool = False
self._take_bkg: bool = False
self._grab_done: bool = False
self._start_grab_time: float = 0. # used for the refreshing rate
self._received_data: int = 0
self._lcd: Optional[LCD] = None
self._bkg: Optional[DataToExport] = None # buffer to store background
self._save_file_pathname: Optional[Path] = None # to store last active path, will be an Path object
self._snapshot_pathname: Optional[Path] = None
self._data_to_save_export: Optional[DataToExport] = None
self._do_save_data: bool = False
self._set_setting_tree() # to activate parameters of default Mock detector
self.grab_done_signal.connect(self._save_export_data)
self.update_plugin_config()
def __repr__(self):
return f'{self.__class__.__name__}: {self.title} ({self.daq_type}/{self.detector}'
[docs]
def setup_continuous_saving(self, init: bool = True):
"""Configure the objects dealing with the continuous saving mode"""
if init:
self.module_and_data_saver = module_saving.DetectorSaver(self)
self._h5saver_continuous = H5Saver(save_type='detector')
self._h5saver_continuous.settings.child('do_save').sigValueChanged.connect(self._init_continuous_save)
else:
self._h5saver_continuous.close_file()
[docs]
def process_ui_cmds(self, cmd: utils.ThreadCommand):
"""Process commands sent by actions done in the ui
See pymodaq.control_modules.thread_commands.UiToMainViewer
Parameters
----------
cmd: ThreadCommand
Possible values are:
* init
* quit
* grab
* snap
* stop
* show_log
* detector_changed
* daq_type_changed
* save_current
* save_new
* do_bkg
* take_bkg
* viewers_changed
* show_config
"""
if cmd.command == UiToMainViewer.INIT:
self.init_hardware(cmd.attribute[0])
elif cmd.command == UiToMainViewer.QUIT:
self.quit_fun()
elif cmd.command == UiToMainViewer.STOP:
self.stop()
elif cmd.command == UiToMainViewer.SHOW_LOG:
self.show_log()
elif cmd.command == UiToMainViewer.GRAB:
self.grab_data(cmd.attribute, snap_state=False)
elif cmd.command == UiToMainViewer.SNAP:
self.grab_data(False, snap_state=True)
elif cmd.command == UiToMainViewer.SAVE_NEW:
self.save_new()
elif cmd.command == UiToMainViewer.SAVE_CURRENT:
self.save_current()
elif cmd.command == UiToMainViewer.OPEN:
self.load_data()
elif cmd.command == UiToMainViewer.DETECTOR_CHANGED:
if cmd.attribute != '':
self.detector_changed_from_ui(cmd.attribute)
elif cmd.command == UiToMainViewer.DAQ_TYPE_CHANGED:
if cmd.attribute != '':
self.daq_type_changed_from_ui(cmd.attribute)
elif cmd.command == UiToMainViewer.TAKE_BKG:
self.take_bkg()
elif cmd.command == UiToMainViewer.DO_BKG:
self.do_bkg = cmd.attribute
elif cmd.command == UiToMainViewer.VIEWERS_CHANGED:
self._viewer_types: List[ViewersEnum] = cmd.attribute['viewer_types']
self.viewers = cmd.attribute['viewers']
elif cmd.command == UiToMainViewer.SHOW_CONFIG:
self.config = self.show_config(self.config)
self.ui.config = self.config
@property
def bkg(self) -> DataToExport:
"""Get the background data object"""
return self._bkg
@property
def viewer_docks(self) -> List[Dock]:
"""list of Viewer Docks from the UI"""
if self.ui is not None:
return self.ui.viewer_docks
@property
def viewers_docks(self) -> List[Dock]:
"""list of Viewer Docks from the UI, for back compatibility"""
deprecation_msg('viewers_docks is a deprecated property use viewer_docks instead')
return self.viewer_docks
@property
def master(self) -> bool:
""" Get/Set programmatically the Master/Slave status of a detector"""
if self.initialized_state:
return self.settings['detector_settings', 'controller_status'] == 'Master'
else:
return True
@master.setter
def master(self, is_master: bool):
if self.initialized_state:
self.settings.child('detector_settings', 'controller_status').setValue(
'Master' if is_master else 'Slave')
[docs]
def daq_type_changed_from_ui(self, daq_type: DAQTypesEnum):
""" Apply changes from the selection of a different DAQTypesEnum in the UI
Parameters
----------
daq_type: DAQTypesEnum
"""
daq_type = enum_checker(DAQTypesEnum, daq_type)
self._daq_type = daq_type
self.settings.child('main_settings', 'DAQ_type').setValue(daq_type.name)
self.detectors = [det_dict['name'] for det_dict in DET_TYPES[daq_type.name]]
self.detector = self.detectors[0]
@property
def daq_type(self) -> DAQTypesEnum:
"""Get/Set the daq_type as a DAQTypesEnum
Update the detector property with the list of available detectors of a given daq_type
"""
return self._daq_type
@daq_type.setter
def daq_type(self, daq_type: DAQTypesEnum):
daq_type = enum_checker(DAQTypesEnum, daq_type)
self._daq_type = daq_type
if self.ui is not None:
self.ui.daq_type = daq_type
self.settings.child('main_settings', 'DAQ_type').setValue(daq_type.name)
self.detectors = [det_dict['name'] for det_dict in DET_TYPES[daq_type.name]]
self.detector = self.detectors[0]
@property
def daq_types(self) -> List[str]:
"""List of available DAQ_TYPES as keys of the DAQTypesEnum"""
return DAQTypesEnum.names()
def detector_changed_from_ui(self, detector: str):
self._detector = detector
self.update_plugin_config()
self._set_setting_tree()
@property
def detector(self) -> str:
""":obj:`str`: Get/Set the currently selected detector among available detectors"""
return self._detector
@detector.setter
def detector(self, det: str):
if det not in self.detectors:
raise ValueError(f'{det} is not a valid Detector: {self.detectors}')
self._detector = det
self.update_plugin_config()
if self.ui is not None:
self.ui.detector = det
self._set_setting_tree()
@property
def Naverage(self):
return self.settings['main_settings', 'Naverage']
@Naverage.setter
def Naverage(self, ngrab: int):
if ngrab >= 1:
self.settings.child('main_settings', 'Naverage').setValue(ngrab)
def update_plugin_config(self):
parent_module = utils.find_dict_in_list_from_key_val(DET_TYPES[self.daq_type.name], 'name', self.detector)
mod = import_module(parent_module['module'].__package__.split('.')[0])
if hasattr(mod, 'config'):
self.plugin_config = mod.config
def detectors_changed_from_ui(self, detectors: List[str]):
self._detectors = detectors
@property
def detectors(self) -> str:
""":obj:`list` of :obj:`str`: List of available detectors of the current daq_type (DAQTypesEnum)"""
return self._detectors
@detectors.setter
def detectors(self, detectors):
self._detectors = detectors
if self.ui is not None:
self.ui.detectors = detectors
@property
def grab_state(self):
""":obj:`bool`: Get the current grabbing status"""
return self._grabing
@property
def do_bkg(self) -> bool:
""":obj:`bool`: Get/Set if background subtraction should be done"""
return self._do_bkg
@do_bkg.setter
def do_bkg(self, doit: bool):
self._do_bkg = doit
@property
def viewers(self) -> List[ViewerBase]:
""":obj:`list`: Get/Set the Viewers (instances of real implementation of ViewerBase class) from the UI"""
if self.ui is not None:
return self._viewers
@viewers.setter
def viewers(self, viewers: List[ViewerBase]):
for viewer in self._viewers:
try:
viewer.data_to_export_signal.disconnect()
except:
pass
for ind_viewer, viewer in enumerate(viewers):
viewer.data_to_export_signal.connect(self._get_data_from_viewer)
viewer.roi_select_signal.connect(
lambda roi_info: self.command_hardware.emit(
ThreadCommand(ControlToHardwareViewer.ROI_SELECT,
dict(roi_info=roi_info,
ind_viewer=ind_viewer))))
viewer.crosshair_dragged.connect(
lambda crosshair_info: self.command_hardware.emit(
ThreadCommand(ControlToHardwareViewer.CROSSHAIR,
dict(crosshair_info=crosshair_info,
ind_viewer=ind_viewer))))
self._viewers = viewers
[docs]
def quit_fun(self):
""" Quit the application, closing the hardware and other modules """
# insert anything that needs to be closed before leaving
if self._initialized_state: # means initialized
self.init_hardware(False)
self.quit_signal.emit()
if self._lcd is not None:
try:
self._lcd.parent.close()
except Exception as e:
self.logger.exception(str(e))
try:
if self.ui is not None:
self.ui.close()
except Exception as e:
self.logger.exception(str(e))
if __name__ == '__main__':
self.parent.close()
# #####################################
# Methods for running the acquisition
[docs]
def init_hardware(self, do_init=True):
""" Init the selected detector
Parameters
----------
do_init: bool
If True, create a DAQ_Detector instance and move it into a separated thread, connected its signals/slots
to the DAQ_Viewer object (self)
If False, force the instrument to close and kill the Thread (still not done properly in some cases)
"""
if not do_init:
try:
self.command_hardware.emit(ThreadCommand(ControlToHardwareViewer.CLOSE))
QtWidgets.QApplication.processEvents()
if self.ui is not None:
self.ui.detector_init = False
except Exception as e:
self.logger.exception(str(e))
else:
try:
hardware = DAQ_Detector(self._title, self.settings, self.detector)
self._hardware_thread = QThread()
if self.config('viewer', 'viewer_in_thread'):
hardware.moveToThread(self._hardware_thread)
self.command_hardware[ThreadCommand].connect(hardware.queue_command)
hardware.data_detector_sig[DataToExport].connect(self.show_data)
hardware.data_detector_temp_sig[DataToExport].connect(self.show_temp_data)
hardware.status_sig[ThreadCommand].connect(self.thread_status)
self._update_settings_signal[edict].connect(hardware.update_settings)
self._hardware_thread.hardware = hardware
if self.config('viewer', 'viewer_in_thread'):
self._hardware_thread.start()
self.command_hardware.emit(ThreadCommand(ControlToHardwareViewer.INI_DETECTOR,
attribute=[
self.settings.child('detector_settings').saveState(),
self.controller]))
if self.ui is not None:
for dock in self.ui.viewer_docks:
dock.setEnabled(True)
except Exception as e:
self.logger.exception(str(e))
[docs]
def snap(self, send_to_tcpip=False):
""" Launch a single grab """
self.grab_data(False, snap_state=True, send_to_tcpip=send_to_tcpip)
[docs]
def grab(self, send_to_tcpip=False):
""" Launch a continuous grab """
if self.ui is not None:
self.manage_ui_actions('grab', 'setChecked', not self._grabing)
self.grab_data(not self._grabing, snap_state=False, send_to_tcpip=send_to_tcpip)
[docs]
def snapshot(self, pathname=None, dosave=False, send_to_tcpip=False):
"""Do one single grab (snap) and eventually save the data.
Parameters
----------
pathname: str or Path object
The path where to save data
dosave: bool
Do save or just grab data
send_to_tcpip: bool
If True, send the grabbed data through the TCP/IP pipe
"""
try:
self._do_save_data = dosave
if pathname is None:
raise (ValueError("filepathanme has not been defined in snapshot"))
self._save_file_pathname = pathname
self.grab_data(grab_state=False, send_to_tcpip=send_to_tcpip, snap_state=True)
except Exception as e:
self.logger.exception(str(e))
[docs]
def grab_data(self, grab_state=False, send_to_tcpip=False, snap_state=False):
""" Generic method to grab or snap data from the selected (and initialized) detector
Parameters
----------
grab_state: bool
Defines the grab status: if True: do live grabbing if False stops the grab
send_to_tcpip: bool
If True, send the grabbed data through the TCP/IP pipe
snap_state: bool
if True performs a single grab
"""
self._grabing = grab_state
self._send_to_tcpip = send_to_tcpip
self._grab_done = False
if self.ui is not None:
self.ui.data_ready = False
self._start_grab_time = time.perf_counter()
if snap_state:
self.update_status(f'{self._title}: Snap')
self.command_hardware.emit(
ThreadCommand(ControlToHardwareViewer.SINGLE,
dict(Naverage=self.settings['main_settings', 'Naverage'])))
else:
if not grab_state:
self.update_status(f'{self._title}: Stop Grab')
self.command_hardware.emit(ThreadCommand(ControlToHardwareViewer.STOP_GRAB, ))
else:
self.thread_status(ThreadCommand(ThreadStatusViewer.UPDATE_CHANNELS, ))
self.update_status(f'{self._title}: Continuous Grab')
self.command_hardware.emit(
ThreadCommand(ControlToHardwareViewer.GRAB,
dict(Naverage=self.settings['main_settings', 'Naverage'])))
[docs]
def take_bkg(self):
""" Do a snap and store data to be used as background into an attribute: `self._bkg`
The content of the bkg will be saved if data is further saved with do_bkg property set to True
"""
self._take_bkg = True
self.grab_data(snap_state=True)
[docs]
def stop_grab(self):
""" Stop the current continuous grabbing and unchecked the stop button of the UI
See Also
--------
:meth:`stop`
"""
if self.ui is not None:
self.manage_ui_actions('grab', 'setChecked', False)
self.stop()
[docs]
def stop(self):
""" Stop the current continuous grabbing """
self.update_status(f'{self._title}: Stop Grab')
self.command_hardware.emit(ThreadCommand(ControlToHardwareViewer.STOP_GRAB, ))
self._grabing = False
@Slot()
def _raise_timeout(self):
""" Print the "timeout occurred" error message in the status bar via the update_status method.
"""
self.update_status("Timeout occurred", log_type="log")
[docs]
@staticmethod
def load_data():
"""Opens a H5 file in the H5Browser module
Convenience static method.
"""
browse_data()
[docs]
def save_current(self):
"""Save current data into a h5file"""
self._do_save_data = True
self._save_file_pathname = select_file(start_path=self._save_file_pathname, save=True,
ext='h5') # see daq_utils
self._save_export_data(self._data_to_save_export)
[docs]
def save_new(self):
"""Snap data and save them into a h5file"""
self._do_save_data = True
self._save_file_pathname = select_file(start_path=self._save_file_pathname, save=True,
ext='h5') # see daq_utils
self.snapshot(pathname=self._save_file_pathname, dosave=True)
def _init_continuous_save(self):
""" Initialize the continuous saving H5Saver object
Update the module_and_data_saver attribute as :class:`DetectorTimeSaver` object
"""
if self._h5saver_continuous.settings.child('do_save').value():
self.settings.child('saver_settings', 'base_name').setValue('Data')
self.settings.child('saver_settings', 'N_saved').show()
self.settings.child('saver_settings', 'N_saved').setValue(0)
self.module_and_data_saver.h5saver = self._h5saver_continuous
self._h5saver_continuous.init_file(update_h5=True)
self.module_and_data_saver = module_saving.DetectorTimeSaver(self)
self.module_and_data_saver.h5saver = self._h5saver_continuous
self.module_and_data_saver.get_set_node()
self.grab_done_signal.connect(self.append_data)
else:
self._do_continuous_save = False
self.settings.child('saver_settings', 'N_saved').hide()
self.grab_done_signal.disconnect(self.append_data)
try:
self._h5saver_continuous.close()
except Exception as e:
self.logger.exception(str(e))
[docs]
def append_data(self, dte: DataToExport = None,
where: Union[Node, str] = None,
**kwargs):
"""Appends current DataToExport to a DetectorTimeSaver
Method to be used when performing continuous saving into a h5file (continuous mode or DAQ_Logger)
Parameters
----------
dte: DataToExport
not really used
where: Node or str
kwargs: dict
See Also
--------
:class:`DetectorTimeSaver`
"""
if dte is None:
dte = self._data_to_save_export
init_step = kwargs.pop('init_step', None)
if init_step is None:
init_step = self.settings['saver_settings', 'N_saved'] == 0
self._add_data_to_saver(dte,
init_step=init_step,
where=where,
**kwargs)
self.settings.child('saver_settings', 'N_saved').setValue(self.settings['saver_settings', 'N_saved'] + 1)
[docs]
def insert_data(self, indexes: Tuple[int], where: Union[Node, str] = None,
distribution=DataDistribution['uniform']):
"""Insert DataToExport to a DetectorExtendedSaver at specified indexes
Method to be used when saving into an already initialized array within a h5file (DAQ_Scan for instance)
Parameters
----------
indexes: tuple(int)
The indexes within the extended array where to place these data
where: Node or str
distribution: DataDistribution enum
See Also
--------
DAQ_Scan, DetectorExtendedSaver
"""
self._add_data_to_saver(self._data_to_save_export, init_step=np.all(np.array(indexes) == 0), where=where,
indexes=indexes, distribution=distribution)
def _add_data_to_saver(self, dte: DataToExport, init_step=False, where=None, **kwargs):
"""Adds DataToExport data to the current node using the declared module_and_data_saver
Filters the data to be saved by DataSource as specified in the current H5Saver (see self.module_and_data_saver)
Parameters
----------
dte: DataToExport
The data to be saved
init_step: bool
If True, means this is the first step of saving (if multisaving), then save background if any and a png image
kwargs: dict
Other named parameters to be passed as is to the module_and_data_saver
See Also
--------
DetectorSaver, DetectorTimeSaver, DetectorExtendedSaver
"""
if dte is not None:
detector_node = self.module_and_data_saver.get_set_node(where)
dte = dte if not self.module_and_data_saver.h5saver.settings['save_raw_only'] else \
dte.get_data_from_source('raw') # filters depending on the source: raw or calculated
dte = DataToExport(name=dte.name, data= # filters depending on the extra argument 'save'
[dwa for dwa in dte if ('do_save' not in dwa.extra_attributes) or
('do_save' in dwa.extra_attributes and dwa.do_save)])
self.module_and_data_saver.add_data(detector_node, dte, **kwargs)
if init_step:
if self._do_bkg and self._bkg is not None:
self.module_and_data_saver.add_bkg(detector_node, self._bkg)
def _save_data(self, path=None, dte: DataToExport = None):
"""Private. Practical implementation to save data into a h5file altogether with metadata, axes, background...
Parameters
----------
path: Path
where to save the data as returned from browse_file for instance
dte: DataToExport
See Also
--------
browse_file, _get_data_from_viewers
"""
if path is not None:
path = Path(path)
h5saver = H5Saver(save_type='detector')
h5saver.init_file(update_h5=True, custom_naming=False, addhoc_file_path=path)
self.module_and_data_saver = module_saving.DetectorSaver(self)
self.module_and_data_saver.h5saver = h5saver
self._add_data_to_saver(dte, init_step=True)
if self.ui is not None:
(root, filename) = os.path.split(str(path))
filename, ext = os.path.splitext(filename)
image_path = os.path.join(root, filename + '.png')
self.dockarea.parent().grab().save(image_path)
h5saver.close_file()
self.data_saved.emit()
@Slot(DataToExport)
def _save_export_data(self, data: DataToExport):
"""Auxiliary method (Slot) to receive all data (raw and processed from rois) and save them
Parameters
----------
data: DataToExport
See Also
--------
_save_data
"""
if self._do_save_data:
self._save_data(self._save_file_pathname, data)
self._do_save_data = False
def _get_data_from_viewer(self, data: DataToExport):
"""Get all data emitted by the current viewers
Each viewer *data_to_export_signal* is connected to this slot. The collected data is stored in another
DataToExport `self._data_to_save_export` for further processing. All raw data are also stored in this attribute.
When all viewers have emitted this signal, the collected data are emitted with the
`grab_done_signal` signal.
Parameters
---------_
data: DataToExport
All data collected from the viewers
"""
if self._data_to_save_export is not None: # means that somehow data are not initialized so no further procsessing
self._received_data += 1
if len(data) != 0:
for dat in data:
dat.origin = f'{self.title} - {dat.origin}' if dat.origin is not None else f'{self.title}'
self._data_to_save_export.append(data)
if self._received_data == len(self.viewers):
self._grab_done = True
self.grab_done_signal.emit(self._data_to_save_export)
@property
def current_data(self) -> DataToExport:
""" Get the current data stored internally"""
return self._data_to_save_export
[docs]
@Slot(DataToExport)
def show_temp_data(self, data: DataToExport):
"""Send data to their dedicated viewers but those will not emit processed data signal
Slot receiving data from plugins emitted with the `data_grabed_signal_temp`
Parameters
----------
data: list of DataFromPlugins
"""
self._init_show_data(data)
if self.ui is not None:
self.set_data_to_viewers(data, temp=True)
[docs]
def show_data(self, dte: DataToExport):
""" Receive data from plugins and send them to their dedicated viewers
Slot receiving data from plugins emitted with the `data_grabed_signal`
Process the data as specified in the settings, display them into the dedicated data viewers depending on the
settings:
* create a container (DataToExport `_data_to_save_export`) with info from this DAQ_Viewer (title), a timestamp...
* call `_process_data`
* do background subtraction if any
* check refresh time (if set in the settings) to send or not data to data viewers
* either send to the data viewers (if refresh time is ok and/or show data option in settings is set)
* either
* send grab_done_signal (to the slot _save_export_data ) to save the data
Parameters
----------
dte: DataToExport
See Also
--------
_init_show_data, _process_data
"""
try:
dte = dte.deepcopy()
if self.settings['main_settings', 'tcpip', 'tcp_connected'] and self._send_to_tcpip:
self._command_tcpip.emit(ThreadCommand('data_ready', dte))
if self.settings['main_settings', 'leco', 'leco_connected'] and self._send_to_tcpip:
self._command_tcpip.emit(ThreadCommand('data_ready', dte))
if self.ui is not None:
self.ui.data_ready = True
if self.settings['main_settings', 'live_averaging']:
self.settings.child('main_settings', 'N_live_averaging').setValue(self._ind_continuous_grab)
_current_data = dte.deepcopy()
self._ind_continuous_grab += 1
if self._ind_continuous_grab > 1:
self._data_to_save_export = \
_current_data.average(self._data_to_save_export, self._ind_continuous_grab)
else:
for dwa in dte:
dwa.origin = self._title
if self.settings['main_settings', 'dynamic'] != 'as_is':
arrays = []
for data_array in dwa:
arrays.append(data_array.astype(self.settings['main_settings', 'dynamic'], copy=False))
dwa.data = arrays
self._data_to_save_export = DataToExport(self._title, control_module='DAQ_Viewer', data=dte.data)
if self._take_bkg:
self._bkg = self._data_to_save_export.deepcopy()
self._take_bkg = False
if self._grabing: # if live
refresh_time = self.settings['main_settings', 'refresh_time']
refresh = time.perf_counter() - self._start_grab_time > refresh_time / 1000
if refresh:
self._start_grab_time = time.perf_counter()
else:
refresh = True # if single
if self.ui is not None and self.settings.child('main_settings', 'show_data').value() and refresh:
self._received_data = 0 # so that data send back from viewers can be properly counted
data_to_plot = self._data_to_save_export.get_data_from_attribute('do_plot', True, deepcopy=True)
data_to_plot.append(self._data_to_save_export.get_data_from_missing_attribute('do_plot', deepcopy=True))
# process bkg if needed
if self.do_bkg and self._bkg is not None:
data_to_plot -= self._bkg
self._init_show_data(data_to_plot)
self.set_data_to_viewers(data_to_plot)
else:
self._grab_done = True
self.grab_done_signal.emit(self._data_to_save_export)
except Exception as e:
self.logger.exception(str(e))
def _init_show_data(self, dte: DataToExport):
"""Processing before showing data
* process the data to check if they overshoot
* check the data dimensionality to update the dedicated viewers
Parameters
----------
dte: DataToExport
See Also
--------
_process_overshoot
"""
self._process_overshoot(dte)
self._viewer_types = [ViewersEnum(dwa.dim.name) for dwa in dte if
('do_plot' not in dwa.extra_attributes) or
('do_plot' in dwa.extra_attributes and dwa.do_plot)]
if self.ui is not None:
if self.ui.viewer_types != self._viewer_types:
self.ui.update_viewers(self._viewer_types)
[docs]
def set_data_to_viewers(self, dte: DataToExport, temp=False):
"""Process data dimensionality and send appropriate data to their data viewers
Parameters
----------
dte: DataToExport
temp: bool
if True notify the data viewers to display data as temporary (meaning not exporting processed data from roi)
See Also
--------
ViewerBase, Viewer0D, Viewer1D, Viewer2D
"""
for ind, dwa in enumerate(dte):
if ('do_plot' not in dwa.extra_attributes) or \
('do_plot' in dwa.extra_attributes and dwa.do_plot):
self.viewers[ind].title = dwa.name
self.viewer_docks[ind].setTitle(self._title + ' ' + dwa.name)
if temp:
self.viewers[ind].show_data_temp(dwa)
else:
self.viewers[ind].show_data(dwa)
[docs]
def value_changed(self, param: Parameter):
"""ParameterManager subclassed method. Process events from value changed by user in the UI Settings
Parameters
----------
param: Parameter
a given parameter whose value has been changed by user
"""
super().value_changed(param=param)
path = self.settings.childPath(param)
if param.name() == 'DAQ_type':
self.settings.child('saver_settings', 'do_save').setValue(False)
self.settings.child('main_settings', 'axes').show(param.value() == 'DAQ2D')
elif param.name() == 'show_averaging':
self.settings.child('main_settings', 'live_averaging').setValue(False)
self._update_settings_signal.emit(edict(path=path, param=param, change='value'))
elif param.name() == 'live_averaging':
self.settings.child('main_settings', 'show_averaging').setValue(False)
if param.value():
self.settings.child('main_settings', 'N_live_averaging').show()
self._ind_continuous_grab = 0
self.settings.child('main_settings', 'N_live_averaging').setValue(0)
else:
self.settings.child('main_settings', 'N_live_averaging').hide()
#self._update_settings_signal.emit(edict(path=path, param=param, change='value'))
elif param.name() in putils.iter_children(self.settings.child('main_settings', 'axes'), []):
if self.daq_type.name == "DAQ2D":
if param.name() == 'use_calib':
if param.value() != 'None':
params = ioxml.XML_file_to_parameter(
os.path.join(local_path, 'camera_calibrations', param.value() + '.xml'))
param_obj = Parameter.create(name='calib', type='group', children=params)
self.settings.child('main_settings', 'axes').restoreState(
param_obj.child('axes').saveState(), addChildren=False, removeChildren=False)
self.settings.child('main_settings', 'axes').show()
else:
for viewer in self.viewers:
viewer.x_axis, viewer.y_axis = self.get_scaling_options()
elif param.name() == 'continuous_saving_opt':
self.settings.child('saver_settings').setOpts(visible=param.value())
elif param.name() == 'wait_time':
self.command_hardware.emit(ThreadCommand(ControlToHardwareViewer.UPDATE_WAIT_TIME,
[param.value()]))
elif param.name() in putils.iter_children(self.settings.child('saver_settings'), []):
try:
if param.name() == 'do_save':
self.setup_continuous_saving(param.value())
self._h5saver_continuous.settings.child(*path[1:]).setValue(param.value())
except KeyError:
pass
self._update_settings(param=param)
[docs]
def child_added(self, param, data):
""" Adds a child in the settings attribute
Parameters
----------
param: Parameter
the parameter where child will be added
data: Parameter
the child parameter
"""
if param.name() not in putils.iter_children(self.settings.child('main_settings'), []):
self._update_settings_signal.emit(edict(path=putils.get_param_path(param)[1:], param=data[0],
change='childAdded'))
[docs]
def param_deleted(self, param):
""" Remove a child from the settings attribute
Parameters
----------
param: Parameter
a given parameter whose value has been changed by user
"""
if param.name() not in putils.iter_children(self.settings.child('main_settings'), []):
self._update_settings_signal.emit(edict(path=['detector_settings'], param=param, change='parent'))
def _set_setting_tree(self):
"""Apply the specific settings of the selected detector (plugin)
Remove previous ones and load on the fly the new ones
See Also
--------
pymodaq.control_modules.utils:get_viewer_plugins
"""
try:
if len(self.settings.child('detector_settings').children()) > 0:
for child in self.settings.child('detector_settings').children():
child.remove()
det_params, _class = get_viewer_plugins(self.daq_type.name, self.detector)
self.settings.child('detector_settings').addChildren(det_params.children())
self.settings.child('main_settings', 'module_name').setValue(self._title)
except Exception as e:
self.logger.exception(str(e))
def _process_overshoot(self, dte: DataToExport):
"""Compare data value (0D) to the given overshoot setting
"""
if self.settings.child('main_settings', 'overshoot', 'stop_overshoot').value():
for dwa in dte:
for data_array in dwa.data:
if np.any(data_array >= self.settings['main_settings', 'overshoot',
'overshoot_value']):
self.overshoot_signal.emit(True)
[docs]
def get_scaling_options(self):
"""Create axes scaling options depending on the ('main_settings', 'axes') settings
Returns
-------
Tuple[Axis]
"""
scaled_xaxis = Axis(label=self.settings['main_settings', 'axes', 'xaxis', 'xlabel'],
units=self.settings['main_settings', 'axes', 'xaxis', 'xunits'],
offset=self.settings['main_settings', 'axes', 'xaxis', 'xoffset'],
scaling=self.settings['main_settings', 'axes', 'xaxis', 'xscaling'])
scaled_yaxis = Axis(label=self.settings['main_settings', 'axes', 'yaxis', 'ylabel'],
units=self.settings['main_settings', 'axes', 'yaxis', 'yunits'],
offset=self.settings['main_settings', 'axes', 'yaxis', 'yoffset'],
scaling=self.settings['main_settings', 'axes', 'yaxis', 'yscaling'])
return scaled_xaxis, scaled_yaxis
[docs]
def thread_status(self, status: ThreadCommand):
"""Get back info (using the ThreadCommand object) from the hardware
And re-emit this ThreadCommand using the custom_sig signal if it should be used in a higher level module
Commands valid for all control modules are defined in the parent class, here are described only the specific
ones
Parameters
----------
status: ThreadCommand
The info returned from the hardware, the command (str) can be either:
* ini_detector: update the status with "detector initialized" value and init state if attribute not null.
* grab : emit grab_status(True)
* grab_stopped: emit grab_status(False)
* init_lcd: display a LCD panel
* lcd: display on the LCD panel, the content of the attribute
* stop: stop the grab
"""
super().thread_status(status, 'detector')
if status.command == ThreadStatusViewer.INI_DETECTOR:
self.update_status("detector initialized: " + str(status.attribute['initialized']))
if self.ui is not None:
self.ui.detector_init = status.attribute['initialized']
if status.attribute['initialized']:
self.controller = status.attribute['controller']
self._initialized_state = True
else:
self._initialized_state = False
self.init_signal.emit(self._initialized_state)
elif status.command == ThreadStatusViewer.GRAB:
self.grab_status.emit(True)
elif status.command == ThreadStatusViewer.GRAB_STOPPED:
self.grab_status.emit(False)
elif status.command == ThreadStatusViewer.INI_LCD:
if self._lcd is not None:
try:
self._lcd.parent.close()
except Exception as e:
self.logger.exception(str(e))
# lcd module
lcd = QtWidgets.QWidget()
self._lcd = LCD(lcd, **status.attribute)
lcd.setVisible(True)
QtWidgets.QApplication.processEvents()
elif status.command == ThreadStatusViewer.LCD:
"""status.attribute should be a list of numpy arrays of shape (1,)"""
self._lcd.setvalues(status.attribute)
elif status.command == ThreadStatusViewer.STOP:
self.stop_grab()
def connect_tcp_ip(self):
super().connect_tcp_ip(params_state=self.settings.child('detector_settings'),
client_type="GRABBER")
[docs]
@Slot(ThreadCommand)
def process_tcpip_cmds(self, status: ThreadCommand) -> None:
"""Receive commands from the TCP Server (if connected) and process them
Parameters
----------
status: ThreadCommand
Possible commands are:
* 'Send Data: to trigger a snapshot
* 'connected': show that connection is ok
* 'disconnected': show that connection is not OK
* 'Update_Status': update a status command
* 'set_info': receive settings from the server side and update them on this side
See Also
--------
connect_tcp_ip, TCPServer
"""
if super().process_tcpip_cmds(status=status) is None:
return
if 'Send Data' in status.command:
self.snapshot('', send_to_tcpip=True)
elif status.command == LECOViewerCommands.GRAB:
self.grab(send_to_tcpip=True)
elif status.command ==LECOViewerCommands.SNAP:
self.snap( send_to_tcpip=True)
elif status.command == LECOViewerCommands.STOP:
self.stop()
elif status.command == LECOClientCommands.LECO_CONNECTED:
self.settings.child('main_settings', 'leco', 'leco_connected').setValue(True)
elif status.command == LECOClientCommands.LECO_DISCONNECTED:
self.settings.child('main_settings', 'leco', 'leco_connected').setValue(False)
[docs]
class DAQ_Detector(QObject):
""" Worker class to control the instrument plugin
Attributes
----------
detector: real instance of the instrument plugin class
controller: DAQ_Viewer_base
wrapper object used to control a given instrument in the instrument plugin
controller_adress: int
unique integer used to identify a controller shared among multiple instrument plugins
"""
status_sig = Signal(ThreadCommand)
data_detector_sig = Signal(DataToExport)
data_detector_temp_sig = Signal(DataToExport)
def __init__(self, title, settings_parameter, detector_name):
super().__init__()
self.waiting_for_data = False
self.controller = None
self.logger = set_logger(f'{logger.name}.{title}.detector')
self._title = title
self.detector_name = detector_name
self.detector: DAQ_Viewer_base = None
self.controller_adress: int = None
self.grab_state = False
self.single_grab = False
self.datas: DataToExport = None
self.ind_average = 0
self.Naverage = 1
self.average_done = False
self.hardware_averaging = False
self.show_averaging = False
self.wait_time = settings_parameter['main_settings', 'wait_time']
self.daq_type = DAQTypesEnum[settings_parameter['main_settings', 'DAQ_type']]
@property
def title(self):
return self._title
[docs]
def update_settings(self, settings_parameter_dict):
""" Apply a Parameter serialized as a dict to the instrument plugin class or to self
Parameters
----------
settings_parameter_dict: dict
dictionary serializing a Parameter object
Examples
--------
If the parameter is of the form ('detector_settings', 'xxx') then the parameter is sent to the instrument
plugin class.
"""
path = settings_parameter_dict['path']
param = settings_parameter_dict['param']
if path[0] == 'main_settings':
if hasattr(self, path[-1]):
setattr(self, path[-1], param.value())
elif path[0] == 'detector_settings':
self.detector.update_settings(settings_parameter_dict)
[docs]
def queue_command(self, command: ThreadCommand):
"""Transfer command from the main module to the hardware module
Parameters
----------
command: ThreadCommand
The specific (or generic) command (str) to pass to the hardware, either:
* ini_detector
* close
* grab
* single
* stop_grab
* stop_all
* update_scanner
* move_at_navigator
* update_wait_time
* get_axis
* any string that the hardware is able to understand
"""
if command.command == ControlToHardwareViewer.INI_DETECTOR:
status = self.ini_detector(*command.attribute)
self.status_sig.emit(ThreadCommand(ThreadStatusViewer.INI_DETECTOR, status))
elif command.command == ControlToHardwareViewer.CLOSE:
status = self.close()
self.status_sig.emit(ThreadCommand(ThreadStatus.CLOSE, [status, 'log']))
elif command.command == ControlToHardwareViewer.GRAB:
self.single_grab = False
self.grab_state = True
self.grab_data(**command.attribute)
elif command.command == ControlToHardwareViewer.SINGLE:
self.single_grab = True
self.grab_state = True
self.single(**command.attribute)
elif command.command == ControlToHardwareViewer.STOP_GRAB:
self.grab_state = False
self.detector.stop()
QtWidgets.QApplication.processEvents()
self.status_sig.emit(ThreadCommand(ThreadStatus.UPDATE_STATUS, 'Stopping grab'))
elif command.command == ControlToHardwareViewer.UPDATE_SCANNER: # may be deprecated
self.detector.update_scanner(command.attribute[0])
elif command.command == ControlToHardwareViewer.UPDATE_WAIT_TIME:
self.wait_time = command.attribute[0]
else: # custom commands for particular plugins
if hasattr(self.detector, command.command):
cmd = getattr(self.detector, command.command)
if isinstance(command.attribute, list):
cmd(*command.attribute)
elif isinstance(command.attribute, dict):
cmd(**command.attribute)
else:
cmd(command.attribute)
[docs]
def ini_detector(self, params_state=None, controller=None):
""" Initialize an instrument plugin class and tries to apply preset settings
When the instrument is initialized from the Dashboard using a Preset, tries to apply the preset
settings to the instrument instance
Parameters
----------
params_state: dict
controller: wrapper
"""
try:
# status="Not initialized"
status = edict(initialized=False, info="", x_axis=None, y_axis=None)
det_params, class_ = get_viewer_plugins(self.daq_type.name, self.detector_name)
self.detector: DAQ_Viewer_base = class_(self, params_state)
try:
self.detector.dte_signal.connect(self.data_ready)
self.detector.dte_signal_temp.connect(self.emit_temp_data)
infos = self.detector.ini_detector(controller)
status.controller = self.detector.controller
except Exception as e:
logger.exception("Hardware couldn't be initialized", exc_info=e)
infos = str(e), False
status.controller = None
if isinstance(infos, edict):
status.update(infos)
else:
status.info = infos[0]
status.initialized = infos[1]
self.hardware_averaging = class_.hardware_averaging # to check if averaging can be done directly by
# the hardware or done here software wise
return status
except Exception as e:
self.logger.exception(str(e))
return status
[docs]
def emit_temp_data(self, data: DataToExport):
""" Convenience method to export temporary data using the data_detector_temp_sig Signal
Parameters
----------
data: DataToExport
"""
self.data_detector_temp_sig.emit(data)
[docs]
def data_ready(self, data: DataToExport):
""" Process the data received from the instrument plugin class
Processing here is eventual software averaging if it was not possible in the instrument plugin class
Parameters
----------
data: DataToExport
"""
do_averaging = self.Naverage > 1 and not self.hardware_averaging
if do_averaging: # to execute if the averaging has to be done software wise
self.ind_average += 1
if self.ind_average == 1:
self.datas = data.deepcopy()
else:
self.datas = data.average(self.datas, self.ind_average)
if self.show_averaging:
self.emit_temp_data(self.datas)
if self.ind_average == self.Naverage:
self.average_done = True
self.data_detector_sig.emit(self.datas)
self.ind_average = 0
else:
self.average_done = True # expected to make sure the single_grab stop by itself
self.data_detector_sig.emit(data)
self.waiting_for_data = False
[docs]
def single(self, Naverage=1, *args, **kwargs):
""" Convenience function to grab a single set of data
Parameters
----------
Naverage: int
The number of data to average before displaying
kwargs: optional named arguments
"""
self.grab_data(Naverage, live=False, **kwargs)
[docs]
def grab_data(self, Naverage=1, live=True, **kwargs):
""" General method to grab data from the instrument plugin class
Will check if the plugin class can do hardware averaging (if NAverage > 1) and and live_mode, otherwise
do both software wise here
Parameters
----------
Naverage: int
The number of data to average
live: bool
Try to run the instrument plugin class grabbing in live mode
kwargs: optional named arguments passed to the grab_data method of the instrument plugin class
"""
try:
self.ind_average = 0
self.Naverage = Naverage
if Naverage > 1:
self.average_done = False
self.waiting_for_data = False
# for live mode:two possibilities: either snap one data and regrab softwarewise
# (while True) or if self.detector.live_mode_available is True all data is continuously
# emitted from the plugin
if self.detector.live_mode_available:
kwargs['wait_time'] = self.wait_time
else:
kwargs['wait_time'] = 0
self.status_sig.emit(ThreadCommand('grab'))
while True:
try:
if not self.waiting_for_data:
self.waiting_for_data = True
self.detector.grab_data(Naverage, live=live, **kwargs)
QtWidgets.QApplication.processEvents()
if self.single_grab:
if self.hardware_averaging:
break
else:
if self.average_done:
break
else:
QThread.msleep(self.wait_time) # if in grab mode apply a waiting time
# after acquisition
if not self.grab_state:
break # if not in grab mode breaks the while loop
if self.detector.live_mode_available and (not self.hardware_averaging and
self.average_done):
break # if live can be done in the plugin breaks the while loop except
# if average is asked but not done hardware wise
except Exception as e:
self.logger.exception(str(e))
self.status_sig.emit(ThreadCommand('grab_stopped'))
except Exception as e:
self.logger.exception(str(e))
[docs]
def close(self):
""" Call the close method of the instrument plugin class
"""
if self.detector is not None and self.detector.controller is not None:
status = self.detector.close()
return status
def prepare_docks(area, title):
""" Static method to init docks to be used within a DAQ_Viewer
Parameters
----------
area
title
Returns
-------
"""
dock_settings = Dock(title + " settings", size=(150, 250))
dock_viewer = Dock(title + " viewer", size=(350, 350))
area.addDock(dock_settings)
area.addDock(dock_viewer, 'right', dock_settings)
return dict(dock_settings=dock_settings, dock_viewer=dock_viewer)
def main(init_qt=True, init_det=False):
""" Method called to start the DAQ_Viewer in standalone mode"""
if init_qt: # used for the test suite
app = mkQApp("PyMoDAQ Viewer")
win = QtWidgets.QMainWindow()
area = DockArea()
win.setCentralWidget(area)
win.resize(1000, 500)
win.show()
title = "Testing"
viewer = DAQ_Viewer(area, title="Testing", daq_type=config('viewer', 'daq_type'),
**prepare_docks(area, title))
if init_det:
viewer.init_hardware_ui(init_det)
if init_qt:
sys.exit(app.exec_())
return viewer, win
if __name__ == '__main__':
main(init_det=False)