from typing import List, Union, Optional
import tempfile
from pathlib import Path
from qtpy import QtWidgets, QtCore
import time
import numpy as np
from pymodaq.utils.data import DataToExport, DataToActuators, DataCalculated, DataActuator
from pymodaq.utils.plotting.data_viewers.viewer0D import Viewer0D
from pymodaq.utils.plotting.data_viewers.viewer import ViewerDispatcher, ViewersEnum
from pymodaq.extensions.bayesian.utils import (get_bayesian_models, BayesianModelGeneric,
BayesianAlgorithm, UtilityKind,
UtilityParameters, StopType, StoppingParameters)
from pymodaq.utils.gui_utils import QLED
from pymodaq.utils.managers.modules_manager import ModulesManager
from pymodaq.utils import gui_utils as gutils
from pymodaq.utils import daq_utils as utils
from pymodaq.utils.parameter import utils as putils
from pymodaq.utils.h5modules.saving import H5Saver
from pymodaq.utils.h5modules.data_saving import DataEnlargeableSaver
from pymodaq.post_treatment.load_and_plot import LoaderPlotter
from pymodaq.extensions.bayesian.utils import BayesianConfig
from pymodaq.utils import config as configmod
from pymodaq.utils.logger import set_logger, get_module_name
EXTENSION_NAME = 'BayesianOptimisation'
CLASS_NAME = 'BayesianOptimisation'
logger = set_logger(get_module_name(__file__))
[docs]class BayesianOptimisation(gutils.CustomApp):
""" PyMoDAQ extension of the DashBoard to perform the optimization of a target signal
taken form the detectors as a function of one or more parameters controlled by the actuators.
"""
command_runner = QtCore.Signal(utils.ThreadCommand)
models = get_bayesian_models()
explored_viewer_name = 'algo/ProbedData'
optimisation_done_signal = QtCore.Signal(DataToExport)
params = [
{'title': 'Main Settings:', 'name': 'main_settings', 'expanded': True, 'type': 'group',
'children': [
{'title': 'Utility Function:', 'name': 'utility', 'expanded': False, 'type': 'group',
'children': [
{'title': 'Kind', 'name': 'kind', 'type': 'list',
'limits': UtilityKind.to_dict_value()},
{'title': 'Kappa:', 'name': 'kappa', 'type': 'slide', 'value': 2.576,
'min': 0.001, 'max': 100, 'subtype': 'log',
'tip': 'Parameter to indicate how closed are the next parameters sampled.'
'Higher value = favors spaces that are least explored.'
'Lower value = favors spaces where the regression function is the '
'highest.'},
{'title': 'Kappa actual:', 'name': 'kappa_actual', 'type': 'float', 'value': 2.576,
'tip': 'Current value of the kappa parameter', 'readonly': True},
{'title': 'xi:', 'name': 'xi', 'type': 'slide', 'value': 0,
'tip': 'Governs the exploration/exploitation tradeoff.'
'Lower prefers exploitation, higher prefers exploration.'},
{'title': 'Kappa decay:', 'name': 'kappa_decay', 'type': 'float', 'value': 0.9,
'tip': 'kappa is multiplied by this factor every iteration.'},
{'title': 'Kappa decay delay:', 'name': 'kappa_decay_delay', 'type': 'int',
'value': 20, 'tip': 'Number of iterations that must have passed before applying '
'the decay to kappa.'},
]},
{'title': 'Stopping Criteria:', 'name': 'stopping', 'expanded': False, 'type': 'group',
'children': [
{'title': 'Niteration', 'name': 'niter', 'type': 'int', 'value': 100, 'min': -1},
{'title': 'Type:', 'name': 'stop_type', 'type': 'list',
'limits': StopType.names()},
{'title': 'Tolerance', 'name': 'tolerance', 'type': 'slide', 'value': 1e-2,
'min': 1e-8, 'max': 1, 'subtype': 'log',},
{'title': 'Npoints', 'name': 'npoints', 'type': 'int', 'value': 5, 'min': 1},
]},
{'title': 'Ini. State', 'name': 'ini_random', 'type': 'int', 'value': 5},
{'title': 'bounds', 'name': 'bounds', 'type': 'group', 'children': []},
]},
{'title': 'Models', 'name': 'models', 'type': 'group', 'expanded': True, 'visible': True,
'children': [
{'title': 'Models class:', 'name': 'model_class', 'type': 'list',
'limits': [d['name'] for d in models]},
{'title': 'Ini Model', 'name': 'ini_model', 'type': 'action', },
{'title': 'Ini Algo', 'name': 'ini_runner', 'type': 'action', 'enabled': False},
{'title': 'Model params:', 'name': 'model_params', 'type': 'group', 'children': []},
]},
{'title': 'Move settings:', 'name': 'move_settings', 'expanded': True, 'type': 'group',
'visible': False, 'children': [
{'title': 'Units:', 'name': 'units', 'type': 'str', 'value': ''}]},
]
def __init__(self, dockarea, dashboard):
super().__init__(dockarea, dashboard)
self.algorithm: Optional[BayesianAlgorithm] = None
self.viewer_fitness: Optional[Viewer0D] = None
self.viewer_observable: Optional[ViewerDispatcher] = None
self.model_class: Optional[BayesianModelGeneric] = None
self._save_main_settings = True
self._modules_manager = ModulesManager(self.dashboard.detector_modules,
self.dashboard.actuators_modules)
self.modules_manager.actuators_changed[list].connect(self.update_actuators)
self.modules_manager.settings.child('data_dimensions').setOpts(expanded=False)
self.modules_manager.settings.child('actuators_positions').setOpts(expanded=False)
self.setup_ui()
self.bayesian_config = BayesianConfig()
self.mainsettings_saver_loader = configmod.ConfigSaverLoader(
self.settings.child('main_settings'), self.bayesian_config)
self.h5temp: H5Saver = None
self.temp_path: tempfile.TemporaryDirectory = None
self.enlargeable_saver: DataEnlargeableSaver = None
self.live_plotter = LoaderPlotter(self.dockarea)
self.enl_index = 0
self.settings.child('models', 'ini_model').sigActivated.connect(
self.get_action('ini_model').trigger)
self.settings.child('models', 'ini_runner').sigActivated.connect(
self.get_action('ini_runner').trigger)
@property
def modules_manager(self) -> ModulesManager:
return self._modules_manager
[docs] def setup_docks(self):
"""
to be subclassed to setup the docks layout
for instance:
self.docks['ADock'] = gutils.Dock('ADock name)
self.dockarea.addDock(self.docks['ADock"])
self.docks['AnotherDock'] = gutils.Dock('AnotherDock name)
self.dockarea.addDock(self.docks['AnotherDock"], 'bottom', self.docks['ADock"])
See Also
########
pyqtgraph.dockarea.Dock
"""
self.docks['settings'] = gutils.Dock('Settings')
self.dockarea.addDock(self.docks['settings'])
splitter = QtWidgets.QSplitter(QtCore.Qt.Vertical)
self.docks['settings'].addWidget(splitter)
splitter.addWidget(self.settings_tree)
splitter.addWidget(self.modules_manager.settings_tree)
self.modules_manager.show_only_control_modules(False)
splitter.setSizes((int(self.dockarea.height() / 2),
int(self.dockarea.height() / 2)))
widget_observable = QtWidgets.QWidget()
widget_observable.setLayout(QtWidgets.QHBoxLayout())
observable_dockarea = gutils.DockArea()
widget_observable.layout().addWidget(observable_dockarea)
self.viewer_observable = ViewerDispatcher(observable_dockarea, direction='bottom')
self.docks['observable'] = gutils.Dock('Observable')
self.dockarea.addDock(self.docks['observable'], 'right', self.docks['settings'])
self.docks['observable'].addWidget(widget_observable)
if len(self.models) != 0:
self.get_set_model_params(self.models[0]['name'])
def get_set_model_params(self, model_name):
self.settings.child('models', 'model_params').clearChildren()
if len(self.models) > 0:
model_class = utils.find_dict_in_list_from_key_val(self.models, 'name', model_name)['class']
params = getattr(model_class, 'params')
self.settings.child('models', 'model_params').addChildren(params)
[docs] def value_changed(self, param):
''' to be subclassed for actions to perform when one of the param's value in self.settings is changed
For instance:
if param.name() == 'do_something':
if param.value():
print('Do something')
self.settings.child('main_settings', 'something_done').setValue(False)
Parameters
----------
param: (Parameter) the parameter whose value just changed
'''
if param.name() == 'model_class':
self.get_set_model_params(param.value())
elif param.name() in putils.iter_children(self.settings.child('models', 'model_params'), []):
if self.model_class is not None:
self.model_class.update_settings(param)
elif param.name() in putils.iter_children(
self.settings.child('main_settings', 'utility'), []):
if param.name() != 'kappa_actual':
self.update_utility_function()
elif param.name() in putils.iter_children(
self.settings.child('main_settings', 'bounds'), []):
self.update_bounds()
elif param.name() in putils.iter_children(
self.settings.child('main_settings', 'stopping'), []):
self.update_stopping_criteria()
if self._save_main_settings and param.name() in putils.iter_children(
self.settings.child('main_settings'), []):
self.mainsettings_saver_loader.save_config()
def update_utility_function(self):
utility_settings = self.settings.child('main_settings', 'utility')
uparams = UtilityParameters(utility_settings['kind'], utility_settings['kappa'],
utility_settings['xi'], utility_settings['kappa_decay'],
utility_settings['kappa_decay_delay'])
self.command_runner.emit(utils.ThreadCommand('utility', uparams))
def get_stopping_parameters(self) -> StoppingParameters:
stopping_settings = self.settings.child('main_settings', 'stopping')
stopping_params = StoppingParameters(stopping_settings['niter'],
stopping_settings['stop_type'],
stopping_settings['tolerance'],
stopping_settings['npoints'])
return stopping_params
def update_stopping_criteria(self):
self.command_runner.emit(utils.ThreadCommand('stopping', self.get_stopping_parameters()))
def update_bounds(self):
bounds = {}
for child in self.settings.child('main_settings', 'bounds').children():
bounds[child.name()] = (child['min'], child['max'])
self.command_runner.emit(utils.ThreadCommand('bounds', bounds))
[docs] def setup_actions(self):
logger.debug('setting actions')
self.add_action('quit', 'Quit', 'close2', "Quit program")
self.add_action('ini_model', 'Init Model', 'ini')
self.add_widget('model_led', QLED, toolbar=self.toolbar)
self.add_action('ini_runner', 'Init the Optimisation Algorithm', 'ini', checkable=True,
enabled=False)
self.add_widget('runner_led', QLED, toolbar=self.toolbar)
self.add_action('run', 'Run Optimisation', 'run2', checkable=True, enabled=False)
self.add_action('gotobest', 'Got to best individual', 'move_contour', enabled=False,
tip='Go to the best individual guessed by the algorithm')
logger.debug('actions set')
[docs] def connect_things(self):
logger.debug('connecting things')
self.connect_action('quit', self.quit, )
self.connect_action('ini_model', self.ini_model)
self.connect_action('ini_runner', self.ini_optimisation_runner)
self.connect_action('run', self.run_optimisation)
self.connect_action('gotobest', self.go_to_best)
def go_to_best(self):
best_individual = self.algorithm.best_individual
actuators = self.modules_manager.selected_actuators_name
dte_act = DataToActuators('best', data=[
DataActuator(actuators[ind], data=float(best_individual[ind])) for ind in range(len(best_individual))
],
mode='abs')
self.modules_manager.connect_actuators(True)
self.modules_manager.move_actuators(dte_act, polling=True)
self.modules_manager.connect_actuators(False)
self.modules_manager.grab_datas()
def quit(self):
self.dockarea.parent().close()
self.clean_h5_temp()
def set_model(self):
model_name = self.settings.child('models', 'model_class').value()
self.model_class = utils.find_dict_in_list_from_key_val(
self.models, 'name', model_name)['class'](self)
self.model_class.ini_model_base()
def ini_temp_file(self):
self.clean_h5_temp()
self.h5temp = H5Saver()
self.temp_path = tempfile.TemporaryDirectory(prefix='pymo')
addhoc_file_path = Path(self.temp_path.name).joinpath('bayesian_temp_data.h5')
self.h5temp.init_file(custom_naming=True, addhoc_file_path=addhoc_file_path)
act_names = [child.name() for child in self.settings.child( 'main_settings',
'bounds').children()]
act_units = [self.modules_manager.get_mod_from_name(act_name, 'act').units for act_name
in act_names]
self.enlargeable_saver = DataEnlargeableSaver(
self.h5temp,
enl_axis_names=act_names,
enl_axis_units=act_units)
def ini_live_plot(self):
self.live_plotter.h5saver = self.h5temp
act_names = [child.name() for child in self.settings.child('main_settings',
'bounds').children()]
act_units = [self.modules_manager.get_mod_from_name(act_name, 'act').units for act_name
in act_names]
if len(act_names) == 1:
viewer_enum = 'Viewer1D'
elif len(act_names) == 2:
viewer_enum = 'Viewer2D'
else:
viewer_enum = 'ViewerND'
viewers = self.live_plotter.prepare_viewers([viewer_enum],
viewers_name=[self.explored_viewer_name])
for viewer in viewers:
if viewer.has_action('crosshair'):
viewer.get_action('crosshair').trigger()
if hasattr(viewer.view, 'collapse_lineout_widgets'):
viewer.view.collapse_lineout_widgets()
if viewer.has_action('sort'):
if not viewer.is_action_checked('sort'):
viewer.get_action('sort').trigger()
if viewer.has_action('scatter'):
if not viewer.is_action_checked('scatter'):
viewer.get_action('scatter').trigger()
QtWidgets.QApplication.processEvents()
win_width = self.dockarea.width()
self.docks['settings'].container().setSizes((int(win_width / 5),
int(2 * win_width / 5),
int(2 * win_width / 5), 10, 10))
def update_actuators(self, actuators: List[str]):
if self.is_action_checked('ini_runner'):
self.get_action('ini_runner').trigger()
QtWidgets.QApplication.processEvents()
self._save_main_settings = False
for child in self.settings.child('main_settings', 'bounds').children():
self.settings.child('main_settings', 'bounds').removeChild(child)
params = []
for actuator in actuators:
params.append({'title': actuator, 'name': actuator, 'type': 'group', 'children': [
{'title': 'min', 'name': 'min', 'type': 'float', 'value': -5},
{'title': 'max', 'name': 'max', 'type': 'float', 'value': 5},
]})
self.settings.child('main_settings', 'bounds').addChildren(params)
self.mainsettings_saver_loader.base_path = [self.model_class.__class__.__name__] + \
self.modules_manager.selected_actuators_name
self.mainsettings_saver_loader.load_config()
self._save_main_settings = True
def format_bounds(self):
bound_dict = {}
for bound in self.settings.child('main_settings', 'bounds').children():
bound_dict.update({bound.name(): (bound['min'], bound['max'])})
return bound_dict
def set_algorithm(self):
self.algorithm = BayesianAlgorithm(
ini_random=self.settings['main_settings', 'ini_random'],
bounds=self.format_bounds(),)
def ini_model(self):
try:
if self.model_class is None:
self.set_model()
self.modules_manager.selected_actuators_name = self.model_class.actuators_name
self.modules_manager.selected_detectors_name = self.model_class.detectors_name
self.enable_controls_opti(True)
self.get_action('model_led').set_as_true()
self.set_action_enabled('ini_model', False)
self.viewer_observable.update_viewers(['Viewer0D', 'Viewer0D'],
['Fitness', 'Individual'])
self.settings.child('models', 'ini_model').setValue(True)
self.settings.child('models', 'ini_runner').setOpts(enabled=True)
self.set_action_enabled('ini_runner', True)
self.mainsettings_saver_loader.base_path = [self.model_class.__class__.__name__] + \
self.modules_manager.selected_actuators_name
self.mainsettings_saver_loader.load_config()
try: # this is correct for Default Model and probably for all models...
self.model_class.settings.child('optimizing_signal', 'data_probe').activate()
except Exception:
pass
except Exception as e:
logger.exception(str(e))
def ini_optimisation_runner(self):
if self.is_action_checked('ini_runner'):
self.set_algorithm()
self.settings.child('models', 'ini_runner').setValue(True)
self.enl_index = 0
self.ini_temp_file()
self.ini_live_plot()
self.runner_thread = QtCore.QThread()
runner = OptimisationRunner(self.model_class, self.modules_manager, self.algorithm,
self.get_stopping_parameters())
self.runner_thread.runner = runner
runner.algo_output_signal.connect(self.process_output)
runner.algo_finished.connect(self.optimisation_done)
self.command_runner.connect(runner.queue_command)
runner.moveToThread(self.runner_thread)
self.runner_thread.start()
self.get_action('runner_led').set_as_true()
self.set_action_enabled('run', True)
self.model_class.runner_initialized()
self.update_utility_function()
else:
if self.is_action_checked('run'):
self.get_action('run').trigger()
QtWidgets.QApplication.processEvents()
self.runner_thread.terminate()
self.get_action('runner_led').set_as_false()
def clean_h5_temp(self):
if self.temp_path is not None:
try:
self.h5temp.close()
self.temp_path.cleanup()
except Exception as e:
logger.exception(str(e))
def optimisation_done(self, dte: DataToExport):
self.go_to_best()
self.optimisation_done_signal.emit(dte)
def process_output(self, dte: DataToExport):
self.enl_index += 1
dwa_kappa = dte.remove(dte.get_data_from_name('kappa'))
self.settings.child('main_settings', 'utility', 'kappa_actual').setValue(
float(dwa_kappa[0][0])
)
dwa_data = dte.remove(dte.get_data_from_name('ProbedData'))
dwa_actuators: DataActuator = dte.remove(dte.get_data_from_name('Actuators'))
self.viewer_observable.show_data(dte)
# dwa_observations = self.algorithm.get_dwa_obervations(
# self.modules_manager.selected_actuators_name)
self.model_class.update_plots()
best_individual = dte.get_data_from_name('Individual')
best_indiv_as_list = [float(best_individual[ind][0]) for ind in range(len(best_individual))]
self.enlargeable_saver.add_data('/RawData', dwa_data,
axis_values=dwa_actuators.values())
if len(best_indiv_as_list) == 1 or (
len(best_indiv_as_list) == 2 and self.enl_index >= 3):
self.update_data_plot(target_at=dwa_actuators.values(),
crosshair_at=best_indiv_as_list)
def update_data_plot(self, target_at=None, crosshair_at=None):
self.live_plotter.load_plot_data(remove_navigation=False,
crosshair_at=crosshair_at,
target_at=target_at)
def enable_controls_opti(self, enable: bool):
pass
def run_optimisation(self):
if self.is_action_checked('run'):
self.get_action('run').set_icon('pause')
self.command_runner.emit(utils.ThreadCommand('start', {}))
QtWidgets.QApplication.processEvents()
QtWidgets.QApplication.processEvents()
self.command_runner.emit(utils.ThreadCommand('run', {}))
else:
self.get_action('run').set_icon('run2')
self.command_runner.emit(utils.ThreadCommand('stop', {}))
self.set_action_enabled('gotobest', True)
QtWidgets.QApplication.processEvents()
class OptimisationRunner(QtCore.QObject):
algo_output_signal = QtCore.Signal(DataToExport)
algo_finished = QtCore.Signal(DataToExport)
def __init__(self, model_class: BayesianModelGeneric, modules_manager: ModulesManager,
algorithm: BayesianAlgorithm, stopping_params: StoppingParameters):
super().__init__()
self.det_done_datas: DataToExport = None
self.input_from_dets: float = None
self.outputs: List[np.ndarray] = []
self.dte_actuators: DataToExport = None
self.stopping_params: StoppingParameters = stopping_params
self.model_class: BayesianModelGeneric = model_class
self.modules_manager: ModulesManager = modules_manager
self.running = True
self.optimisation_algorithm: BayesianAlgorithm = algorithm
self._ind_iter: int = 0
@QtCore.Slot(utils.ThreadCommand)
def queue_command(self, command: utils.ThreadCommand):
"""
"""
if command.command == "run":
self.run_opti(**command.attribute)
elif command.command == "stop":
self.running = False
elif command.command == 'utility':
utility_params: UtilityParameters = command.attribute
self.optimisation_algorithm.set_utility_function(
utility_params.kind,
kappa=utility_params.kappa,
xi=utility_params.xi,
kappa_decay=utility_params.kappa_decay,
kappa_decay_delay=utility_params.kappa_decay_delay)
elif command.command == 'stopping':
self.stopping_params: StoppingParameters = command.attribute
elif command.command == 'bounds':
self.optimisation_algorithm.set_bounds(command.attribute)
def run_opti(self, sync_detectors=True, sync_acts=True):
"""Start the optimisation loop
Parameters
----------
sync_detectors: (bool) if True will make sure all selected detectors (if any) all got their data before calling
the model
sync_acts: (bool) if True will make sure all selected actuators (if any) all reached their target position
before calling the model
"""
self.running = True
converged = False
try:
if sync_detectors:
self.modules_manager.connect_detectors()
if sync_acts:
self.modules_manager.connect_actuators()
self.current_time = time.perf_counter()
logger.info('Optimisation loop starting')
while self.running:
self._ind_iter += 1
next_target = self.optimisation_algorithm.ask()
self.outputs = next_target
self.output_to_actuators: DataToActuators =\
self.model_class.convert_output(
self.outputs,
best_individual=self.optimisation_algorithm.best_individual
)
self.modules_manager.move_actuators(self.output_to_actuators,
self.output_to_actuators.mode,
polling=sync_acts)
# Do the evaluation (measurements)
self.det_done_datas = self.modules_manager.grab_datas()
self.input_from_dets = self.model_class.convert_input(self.det_done_datas)
# Run the algo internal mechanic
self.optimisation_algorithm.tell(float(self.input_from_dets))
dte = DataToExport('algo',
data=[self.individual_as_data(
np.array([self.optimisation_algorithm.best_fitness]),
'Fitness'),
self.individual_as_data(
self.optimisation_algorithm.best_individual,
'Individual'),
DataCalculated('ProbedData',
data=[np.array([self.input_from_dets])],
),
self.output_to_actuators.merge_as_dwa('Data0D',
'Actuators'),
DataCalculated(
'kappa',
data=[
np.array([self.optimisation_algorithm.kappa])])
])
self.algo_output_signal.emit(dte)
self.optimisation_algorithm.update_utility_function()
if self.optimisation_algorithm.stopping(self._ind_iter, self.stopping_params):
converged = True
break
self.current_time = time.perf_counter()
QtWidgets.QApplication.processEvents()
logger.info('Optimisation loop exiting')
self.modules_manager.connect_actuators(False)
self.modules_manager.connect_detectors(False)
if converged:
self.algo_finished.emit(dte)
except Exception as e:
logger.exception(str(e))
@staticmethod
def individual_as_data(individual: np.ndarray, name: str = 'Individual') -> DataCalculated:
return DataCalculated(name, data=[np.atleast_1d(np.squeeze(coordinate)) for coordinate in
np.atleast_1d(np.squeeze(individual))])
def main(init_qt=True):
import sys
from pathlib import Path
from pymodaq.utils.daq_utils import get_set_preset_path
if init_qt: # used for the test suite
app = QtWidgets.QApplication(sys.argv)
import qdarkstyle
app.setStyleSheet(qdarkstyle.load_stylesheet())
from pymodaq.dashboard import DashBoard
win = QtWidgets.QMainWindow()
area = gutils.dock.DockArea()
win.setCentralWidget(area)
win.resize(1000, 500)
win.setWindowTitle('PyMoDAQ Dashboard')
dashboard = DashBoard(area)
daq_scan = None
file = Path(get_set_preset_path()).joinpath(f"{'beam_steering_mock'}.xml")
if file.exists():
dashboard.set_preset_mode(file)
daq_scan = dashboard.load_bayesian()
else:
msgBox = QtWidgets.QMessageBox()
msgBox.setText(f"The default file specified in the configuration file does not exists!\n"
f"{file}\n"
f"Impossible to load the DAQScan Module")
msgBox.setStandardButtons(msgBox.Ok)
ret = msgBox.exec()
if init_qt:
sys.exit(app.exec_())
return dashboard, daq_scan, win
if __name__ == '__main__':
main()