6.8. Making a PyMoDAQ Component Available Through LECO
This page is a developer guide explaining how to expose an existing PyMoDAQ component
over the PyMoDAQ integrated LECO network. The DashBoard class is used as a running example
throughout.
A second section covers the scripting side: once a component is LECO-enabled, how to control it from a plain Python script without a PyMoDAQ GUI.
See also
LECO communication for the end-user guide (connecting the UI, starting the Coordinator, using the LECODirector plugins).
Communication With Devices Outside of PyMoDAQ for communicating with non-Python devices over LECO.
6.8.1. Architecture
Fig. 6.12 Architecture of LECO in PyMoDAQ and of the scripting layer. A plain Python
script instantiates a wrapper (the Director side) that starts its own Listener and Director,
connects to the LECO Coordinator, and communicates with the running
PyMoDAQ component (the Actor side) over the network.
6.8.1.1. Part 1 – LECO-Enabling a Component (PyMoDAQ Side)
The process has five steps:
Inherit from
LECOComponentMixinand implement its abstract methods.Define the command and RPC-method name enumerations.
Register incoming RPC methods in
ActorHandler.Add outgoing message handling in
queue_command.Implement
process_leco_commandson the component.
6.8.2. Step 1 – Inherit LECOComponentMixin
Your class must also be a QObject (or inherit from one), because
_leco_commands_signal is a Qt Signal. Pass the listener class to use
to LECOComponentMixin.__init__. Also the QObject inheritance must
be before the LECOComponentMixin one, and the constructor call after.
Note
QObject.__init__ calls super() so it follows the class MRO.
If LECOComponentMixin appears before QObject in the inheritance
order, its __init__ may be invoked without the required
listener_class argument, leading to initialization errors.
from pymodaq.utils.leco.pymodaq_listener import LECOComponentMixin, ActorListener
from pymodaq_utils.utils import ThreadCommand
from qtpy.QtCore import Signal
class MyComponent(SomeQObjectBase, LECOComponentMixin):
_leco_commands_signal = Signal(ThreadCommand) # required
def __init__(self, ...):
LECOComponentMixin.__init__(self, MyComponentActorListener)
SomeQObjectBase.__init__(self, ...)
Then implement the three abstract methods:
def get_leco_name(self) -> str:
return "my_component" # unique name on the LECO network, static or defined by constructor
def get_leco_host_port(self) -> tuple[str, int]:
return "localhost", 12300 # Coordinator address, generally from some settings or configuration
def process_leco_commands(self, status: ThreadCommand) -> None:
... # see Step 5
Then connect_leco(True) can be user when ready to join the network (e.g. at startup or
on a button click) and connect_leco(False) to disconnect.
For example, DashBoard inherits both CustomApp and
LECOComponentMixin, passes DashboardActorListener, and calls
connect_leco(True) automatically during do_things_after_ui_setup.
Warning
All listeners (DashboardActorListener, MoveActorListener, …)
are actually aliases for ActorListener
6.8.3. Step 2 – Define Command Enumerations
Two sets of names are needed:
RPC method names (strings exchanged over the network) – defined in
pymodaq.utils.leco.rpc_method_definitionsasStrEnumsubclasses.ThreadCommand names (used internally to cross the Qt thread boundary) – defined in
pymodaq.utils.leco.pymodaq_listener.
However, they are generally the same names.
For each new component you need one enum for incoming commands (Director → Actor, i.e. what your component receives) and one for outgoing commands (Actor → Director, i.e. what your component sends back). It seems to add some complexity, but actually helps to understand what each component is doing:
# in rpc_method_definitions.py
class MyComponentMethods(StrEnum):
DO_SOMETHING = "do_something" # incoming: Director calls this on the Actor
GET_INFO = "get_info"
class MyComponentDirectorMethods(StrEnum):
SOMETHING_DONE = "something_done" # outgoing: Actor calls this on the Director
SEND_INFO = "send_info"
# in pymodaq_listener.py
class LECOMyComponentCommands(StrEnum):
# incoming (RPC → ThreadCommand)
DO_SOMETHING = "do_something"
GET_INFO = "get_info"
# outgoing (ThreadCommand → RPC)
SOMETHING_DONE = "something_done"
SEND_INFO = "send_info"
Dashboard example – DashboardMethods
contains GET_DEVICES, GET_CONFIGURATIONS, APPLY_CONFIGURATION,
GET_PRESETS, APPLY_PRESET.
DashboardDirectorMethods
contains SEND_DEVICES, SEND_CONFIGURATIONS, SEND_PRESETS,
APPLIED_CONFIGURATION_DONE, APPLIED_PRESET_DONE.
Both mirror each other in
LECODashboardCommands.
6.8.4. Step 3 – Register Incoming RPC Methods in ActorHandler
Modify ActorHandler to
register new RPC methods. Use register_binary_rpc_method when the argument needs binary
(de)serialisation and register_rpc_method otherwise. Each method body emits a
ThreadCommand on self.signals.cmd_signal to hand the call over to the
Qt thread.
class ActorHandler:
def register_rpc_methods(self) -> None:
...
self.register_rpc_method(self.do_something,
name=MyComponentMethods.DO_SOMETHING)
self.register_rpc_method(self.get_info,
name=MyComponentMethods.GET_INFO)
def do_something(self, value: str) -> None:
self.signals.cmd_signal.emit(
ThreadCommand(LECOMyComponentCommands.DO_SOMETHING, attribute=value)
)
def get_info(self) -> None:
self.signals.cmd_signal.emit(ThreadCommand(LECOMyComponentCommands.GET_INFO))
class MyActorListener(ActorListener):
def __init__(self, name, **kwargs):
super().__init__(name, handler_class=MyActorHandler, **kwargs)
Dashboard example – get_devices,
get_configurations, apply_configuration, get_presets,
apply_preset were added to ActorHandler. Each simply emits the matching
LECODashboardCommands on cmd_signal:
def get_devices(self):
self.signals.cmd_signal.emit(ThreadCommand(LECODashboardCommands.GET_DEVICES))
def apply_preset(self, preset: str):
self.signals.cmd_signal.emit(
ThreadCommand(LECODashboardCommands.APPLY_PRESET, attribute=preset)
)
...
6.8.5. Step 4 – Handle Outgoing Messages in queue_command
ActorListener.queue_command is the outgoing channel: it receives
ThreadCommand objects emitted on _leco_commands_signal and translates
them into ask_rpc calls sent to the Director. Add elif branches for
your new commands:
# in your MyActorListener (or patched into ActorListener)
def queue_command(self, command: ThreadCommand) -> None:
...
elif command.command == LECOMyComponentCommands.SOMETHING_DONE:
self.send_rpc_message_to_remote(
method=MyComponentDirectorMethods.SOMETHING_DONE,
result=command.attribute, # plain JSON value
)
elif command.command == LECOMyComponentCommands.SEND_INFO:
# binary payload example
self.send_rpc_message_to_remote(
method=MyComponentDirectorMethods.SEND_INFO,
**binary_serialization_to_kwargs(command.attribute, data_key="data"),
)
else:
super().queue_command(command)
Dashboard example – the relevant branches in
queue_command():
elif command.command == LECODashboardCommands.SEND_DEVICES:
self.send_rpc_message_to_remote(
method=DashboardDirectorMethods.SEND_DEVICES,
**binary_serialization_to_kwargs(command.attribute, data_key="data"),
)
elif command.command == LECODashboardCommands.SEND_CONFIGURATIONS:
self.send_rpc_message_to_remote(
method=DashboardDirectorMethods.SEND_CONFIGURATIONS,
configurations=command.attribute, # plain list, no serialisation needed
)
elif command.command == LECODashboardCommands.APPLIED_PRESET_DONE:
self.send_rpc_message_to_remote(
method=DashboardDirectorMethods.APPLIED_PRESET_DONE,
done=command.attribute,
)
6.8.6. Step 5 – Implement process_leco_commands
This slot runs in the Qt thread and is called whenever the background
ActorListener delivers an incoming command. Dispatch on
status.command and either:
Return a result immediately – emit the reply command directly on
_leco_commands_signal.Start a process – trigger an internal action; emit the reply command once the action completes.
def process_leco_commands(self, status: ThreadCommand) -> None:
if status.command == LECOMyComponentCommands.DO_SOMETHING:
# start a process; the reply will be emitted when it finishes
self.do_something_async(status.attribute) # will later emit SOMETHING_DONE
elif status.command == LECOMyComponentCommands.GET_INFO:
# return result immediately
info = self.compute_info()
self._leco_commands_signal.emit(
ThreadCommand(LECOMyComponentCommands.SEND_INFO, info)
)
Dashboard example:
def process_leco_commands(self, status: ThreadCommand) -> None:
if status.command == LECODashboardCommands.GET_DEVICES:
# returns immediately
devices = {
'actuators': [m.get_leco_name() for m in self.actuators_modules],
'detectors': [m.get_leco_name() for m in self.detector_modules],
}
self._leco_commands_signal.emit(
ThreadCommand(LECODashboardCommands.SEND_DEVICES, devices)
)
elif status.command == LECODashboardCommands.GET_PRESETS:
# returns immediately
self._leco_commands_signal.emit(
ThreadCommand(LECODashboardCommands.SEND_PRESETS,
self.preset_manager.entries)
)
elif status.command == LECODashboardCommands.APPLY_PRESET:
# asynchronous: preset loading triggers a callback that later emits
# APPLIED_PRESET_DONE once the preset has been loaded
preset = status.attribute
self._scripted_preset_load = True
self.preset_manager.execute_entry_base(preset)
...
...
def do_things_after_preset(self, preset_name: str):
...
# returns the loaded preset value here
if self._scripted_preset_load:
self._scripted_preset_load = False
self._leco_commands_signal.emit(ThreadCommand(LECODashboardCommands.APPLIED_PRESET_DONE, True))
Once all of that is done, your component is available through LECO with the sets or methods you defined.
6.8.6.1. Part 2 – Scripting LECO-Enabled Components
Once a component is LECO-enabled you can control it from a plain Python script
without any PyMoDAQ GUI. The scripting utilities live in
pymodaq.scripting.
6.8.7. Class Hierarchy
The scripting layer is built around a two-tier design:
Low-level wrappers (in
pymodaq.scripting.utils) manage the LECO plumbing – listener, communicator, director, and callbacks.Public device classes (in
pymodaq.scripting.devices) expose a clean Python API and hide theFuture-resolution callbacks from the user.
The inheritance tree is:
LECOBaseWrapper
├── LECODeviceWrapper # adds settings management
│ ├── LECOActuatorWrapper # move commands & position callbacks
│ └── LECODetectorWrapper # snap/grab commands & data callbacks
└── LECODashboardWrapper # preset/config/device-list commands
Device[WrapperT] # generic public interface
├── Actuator # wraps LECOActuatorWrapper
├── Detector # wraps LECODetectorWrapper
└── Dashboard # wraps LECODashboardWrapper
6.8.8. LECOBaseWrapper
LECOBaseWrapper provides all the common LECO
infrastructure for every scripting wrapper:
Creates a
pyleco.utils.listener.Listenerregistered on the LECO network underscripting_<device_name>.Obtains a
communicatorfrom the listener and wraps it in apyleco.directors.director.Directorpointed at the target Actor.Registers a
_cleanshutdown hook viaatexit.register().
class LECOBaseWrapper:
def __init__(self, device_name: str, **kwargs) -> None:
self._device_name = device_name
self._listener = Listener(name=self.leco_name, timeout=None)
self._listener.start_listen()
self._communicator = self._listener.get_communicator()
self._director = Director(actor=self.name,
communicator=self._communicator, **kwargs)
atexit.register(self._clean)
@cached_property
def leco_name(self) -> str:
return f'scripting_{self.name}'
@cached_property
def name(self) -> str:
return self._device_name
def _clean(self):
self.sign_out()
self._listener.close()
self._director.close()
self._communicator.close()
def sign_out(self):
self._director.ask_rpc('sign_out', actor='COORDINATOR')
def set_remote_name(self):
self._director.ask_rpc('set_remote_name', name=self.leco_name)
Key points:
leco_nameisscripting_<name>– the name this wrapper registers on the LECO network.set_remote_name()must be called before any RPC that expects an asynchronous reply, so the Actor knows where to send the answer.sign_out()is public and can be called explicitly;_clean()is called automatically at interpreter exit viaatexit.register().
6.8.9. LECODeviceWrapper
LECODeviceWrapper extends
LECOBaseWrapper with settings management
shared by actuators and detectors. In its __init__ it registers two
callbacks on the listener:
set_director_settings(plain RPC) – receives the XML settings bytes and resolves the_settings_future.set_director_info(binary RPC) – placeholder for device-specific information (overridden by subclasses as needed).
class LECODeviceWrapper(LECOBaseWrapper):
def __init__(self, device: str, **kwargs) -> None:
self._base_settings: Optional[Element] = None
self._settings_future: Optional[Future[Element]] = None
super().__init__(device, **kwargs)
self._listener.register_rpc_method(self.set_director_settings)
self._listener.register_binary_rpc_method(self.set_director_info,
accept_binary_input=True)
def get_settings(self) -> Future[Element]:
future = Future()
self._settings_future = future
self.set_remote_name()
self._director.ask_rpc("get_settings")
return future
def set_settings(self, settings: Element):
# Diffs the modified tree against the original and sends only changes.
if self._base_settings is not None:
for (path, modified) in compare_xml_trees(self._base_settings, settings):
... # serialises and sends each changed parameter
...
6.8.10. Writing a New Scripting Wrapper
To expose a new LECO-enabled component via scripting, subclass
LECOBaseWrapper (or
LECODeviceWrapper if settings support is
needed) and follow these steps:
Declare ``Future`` attributes for each asynchronous reply your wrapper expects.
Register Director-side callbacks on
self._listenerin__init__. Useregister_rpc_methodfor plain JSON replies andregister_binary_rpc_methodfor binary-serialised PyMoDAQ objects.Write command methods that create a
Future, callself.set_remote_name(), thenself._director.ask_rpc(<command_name>), and return theFuture.Write callback methods that deserialize the reply and resolve the matching
Future.
Minimal skeleton:
from concurrent.futures import Future, InvalidStateError
from serializall import SerializableFactory
from pymodaq.scripting.utils import LECOBaseWrapper
sf = SerializableFactory()
class MyComponentWrapper(LECOBaseWrapper):
def __init__(self, actor_name: str = "my_component", **kwargs) -> None:
self._some_result_future: Optional[Future] = None
super().__init__(actor_name, **kwargs)
# 2 – register Director-side callbacks
self._listener.register_rpc_method(self.something_done)
# for binary replies:
# self._listener.register_binary_rpc_method(self.some_data,
# accept_binary_input=True)
# 3 – command methods
def do_something(self, value: str) -> Future[str]:
future = Future()
self._some_result_future = future
self.set_remote_name()
# value should be serialized using serializall for complex values
self._director.ask_rpc("do_something", value=value)
return future
# 4 – callback methods
def something_done(self, result: str) -> None:
try:
self._some_result_future.set_result(result)
self._some_result_future = None
except (InvalidStateError, AttributeError):
pass
Binary replies (e.g. a DataToExport) require register_binary_rpc_method
and deserialization:
# in __init__:
self._listener.register_binary_rpc_method(self.set_data, accept_binary_input=True)
# callback:
def set_data(self, data=None, additional_payload=None) -> None:
value = sf.get_apply_deserializer(additional_payload[0])
try:
self._snap_future.set_result(value)
self._snap_future = None
except (InvalidStateError, AttributeError):
pass
Sending binary data to the Actor (e.g. move_abs with a DataActuator):
def move_abs(self, value: DataActuator) -> Future[DataActuator]:
future = Future()
self._move_done_future = future
self.set_remote_name()
self._director.ask_rpc(
"move_abs",
position=None,
additional_payload=[sf.get_apply_serializer(value)],
)
return future
6.8.11. The Device Public Interface
End-user scripts should not deal with the low-level wrappers directly.
Instead, a Device subclass wraps a wrapper
and exposes only the user-facing methods, hiding the Future-resolution
callbacks:
from abc import ABC
from typing import Generic, TypeVar
from pymodaq.scripting.utils import LECOBaseWrapper
WrapperT = TypeVar('WrapperT', bound=LECOBaseWrapper)
class Device(ABC, Generic[WrapperT]):
def __init__(self, wrapper: WrapperT) -> None:
self._wrapper = wrapper
@property
def leco_name(self): return self._wrapper.leco_name
@property
def name(self) -> str: return self._wrapper.name
def sign_out(self) -> None: self._wrapper.sign_out()
class MyComponent(Device[MyComponentWrapper]):
def __init__(self, actor_name: str = "my_component", **kwargs) -> None:
super().__init__(MyComponentWrapper(actor_name, **kwargs))
def do_something(self, value: str) -> Future[str]:
return self._wrapper.do_something(value)
The existing Actuator,
Detector, and
Dashboard classes in
pymodaq.scripting.devices follow exactly this pattern.
6.8.12. Using the Public Classes in a Script
With the public classes in place, scripts are straightforward. Calling
.result() on a Future blocks until the Actor replies:
from pymodaq.scripting import Actuator, Detector, Dashboard
# connect to a running Dashboard and load a preset
dashboard = Dashboard()
dashboard.apply_preset('default').result()
# get all loaded modules as Actuator / Detector instances
devices = dashboard.get_scripting_devices()
theta = devices['actuators']['Angle']
camera = devices['detectors']['Camera']
# blocking move, then sync snap
pos = theta.move_abs('90°').result()
frame = camera.snap().result()
get_scripting_devices() calls get_devices() on the Dashboard and
automatically wraps the returned names in Actuator
and Detector instances, so the caller
never has to manage LECO names manually.