# -*- coding: utf-8 -*-
"""
Created on Fri Aug 30 12:21:56 2019
@author: Weber
"""
from collections import OrderedDict
import select
from typing import List
import socket
from threading import Timer
import numpy as np
from qtpy.QtCore import QObject, Signal, Slot, QThread
from qtpy import QtWidgets
from pymodaq.utils.parameter import utils as putils
from pymodaq.utils.parameter import ioxml
from pymodaq.utils.daq_utils import getLineInfo, ThreadCommand
from pymodaq.utils.data import DataFromPlugins, DataActuator
from pymodaq.utils import math_utils as mutils
from pymodaq.utils.config import Config
from pymodaq.utils.parameter import Parameter
from pymodaq.utils.data import DataToExport
from pymodaq.utils.tcp_ip.mysocket import Socket
from pymodaq.utils.tcp_ip.serializer import Serializer, DeSerializer
from pymodaq.utils.managers.parameter_manager import ParameterManager
# Package configuration object; the default server port and IP below are read from it.
config = Config()

# Parameter-tree description of the TCP/IP server settings. It is passed as the
# ``children`` of the 'settings' group created in MockServer.__init__.
tcp_parameters = [
    {
        'title': 'Port:',
        'name': 'port_id',
        'type': 'int',
        'value': config('network', 'tcp-server', 'port'),
    },
    {
        'title': 'IP:',
        'name': 'socket_ip',
        'type': 'str',
        'value': config('network', 'tcp-server', 'ip'),
    },
    # Filled by TCPServer.read_infos / read_info_xml with the settings sent by the client.
    {
        'title': 'Settings PyMoDAQ Client:',
        'name': 'settings_client',
        'type': 'group',
        'children': [],
    },
    # Filled by TCPServer.read_info with free-form (name, value) strings.
    {
        'title': 'Infos Client:',
        'name': 'infos',
        'type': 'group',
        'children': [],
    },
    # Refreshed from TCPServer.set_connected_clients_table (client type -> address).
    {
        'title': 'Connected clients:',
        'name': 'conn_clients',
        'type': 'table',
        'value': {},
        'header': ['Type', 'adress'],
    },
]
class TCPClientTemplate:
    """Skeleton of a TCP socket client: connection set-up plus a polling loop.

    The hook methods (`post_init`, `not_connected`, `ready_to_read`, `ready_to_write`,
    `ready_with_error`, `process_error_in_polling`) all raise NotImplementedError here and
    must be provided by a concrete subclass.
    """

    def __init__(self, ipaddress="192.168.1.62", port=6341, client_type=""):
        """Create a socket client

        Parameters
        ----------
        ipaddress: (str) the IP address of the server
        port: (int) the port where to communicate with the server
        client_type: (str) should be one of the accepted client_type by the TCPServer instance (within pymodaq it is
            either 'GRABBER' or 'ACTUATOR')
        """
        super().__init__()

        self.ipaddress = ipaddress
        self.port = port
        self._socket: Socket = None
        self._deserializer: DeSerializer = None
        self.connected = False
        self.client_type = client_type
        # Created but never started in this class: init_connection calls poll_connection directly.
        self.timer = Timer(0.1, self.poll_connection)

    @property
    def socket(self) -> "Socket":
        """The wrapped socket currently in use, or None before any connection attempt."""
        return self._socket

    @socket.setter
    def socket(self, sock: "Socket"):
        # Keep the deserializer bound to the same socket so reads always target the live connection.
        self._socket = sock
        self._deserializer = DeSerializer(sock)

    def close(self):
        """Close the underlying socket if one was created."""
        if self.socket is not None:
            self.socket.close()

    def _connect_socket(self):
        """Create a fresh INET/STREAM socket and connect it to (ipaddress, port)."""
        self.socket = Socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        # connect to the TCP server configured on this client
        self.socket.connect((self.ipaddress, self.port))

    def init_connection(self, extra_commands=None):
        """Init the socket connection then call the post_init method where to place custom
        initialization.

        On success this call only returns once the polling loop has stopped. If the server
        refuses the connection, the socket that was just created is closed and
        `not_connected` is invoked with the error.

        Parameters
        ----------
        extra_commands: (list) optional commands forwarded to `post_init`; defaults to an empty list
        """
        if extra_commands is None:
            extra_commands = []
        try:
            self._connect_socket()
            self.post_init(extra_commands)
            self.connected = True
            self.poll_connection()

        except ConnectionRefusedError as e:
            self.connected = False
            # Release the socket created by _connect_socket: without this every refused
            # attempt leaves an open file descriptor behind.
            try:
                self.close()
            except OSError:
                # best-effort cleanup, the refusal itself is what gets reported
                pass
            self.not_connected(e)

    def poll_connection(self):
        """Poll the socket and dispatch to the ready_* hooks until an exception occurs.

        `select` is called with a zero timeout, so each iteration is a non-blocking poll;
        Qt events are processed between iterations. Any exception is handed to
        `process_error_in_polling` and ends the loop.
        """
        while True:
            try:
                ready_to_read, ready_to_write, in_error = \
                    select.select([self.socket.socket], [self.socket.socket], [self.socket.socket], 0)

                if len(ready_to_read) != 0:
                    self.ready_to_read()

                if len(in_error) != 0:
                    self.ready_with_error()

                if len(ready_to_write) != 0:
                    self.ready_to_write()

                QtWidgets.QApplication.processEvents()

            except Exception as e:
                self.process_error_in_polling(e)
                break

    def not_connected(self, e: ConnectionRefusedError):
        """Called when the server refused the connection"""
        raise NotImplementedError

    def ready_to_read(self):
        """Do stuff (like read data) when messages arrive through the socket"""
        raise NotImplementedError

    def ready_to_write(self):
        """Send stuff into the socket"""
        raise NotImplementedError

    def ready_with_error(self):
        """Error in the socket communication"""
        raise NotImplementedError

    def process_error_in_polling(self, e: Exception):
        """Called with the exception that interrupted the polling loop"""
        raise NotImplementedError

    def post_init(self, extra_commands=None):
        """To implement in a real object implementation"""
        raise NotImplementedError
class TCPClient(TCPClientTemplate, QObject):
    """
    PyQt5 object initializing a TCP socket client. Can be used by any module but is a builtin functionality of all
    actuators and detectors of PyMoDAQ

    The module should init TCPClient, move it in a thread and communicate with it using a custom signal connected to
    TCPClient.queue_command slot. The module should also connect TCPClient.cmd_signal to one of its methods in order
    to get info/data back from the client

    The client itself communicates with a TCP server, it is best to use a server object subclassing the TCPServer
    class defined within this python module

    Parameters
    ----------
    params_state: (dict) state of the Parameter settings of the module instantiating this client and wishing to
                         export its settings to the server. Obtained from param.saveState() where param is an
                         instance of Parameter object, see pyqtgraph.parametertree::Parameter
    """
    cmd_signal = Signal(ThreadCommand)  # signal to connect with a module slot in order to start communication back
    params = []

    def __init__(self, ipaddress="192.168.1.62", port=6341, params_state=None,
                 client_type="GRABBER"):
        """Create a socket client particularly fit to be used with PyMoDAQ's TCPServer

        Parameters
        ----------
        ipaddress: (str) the IP address of the server
        port: (int) the port where to communicate with the server
        params_state: (dict) state of the Parameter settings of the module instantiating this client and wishing to
                             export its settings to the server. Obtained from param.saveState() where param is an
                             instance of Parameter object, see pyqtgraph.parametertree::Parameter
        client_type: (str) should be one of the accepted client_type by the TCPServer instance (within pymodaq it is
                           either 'GRABBER' or 'ACTUATOR')
        """
        QObject.__init__(self)
        TCPClientTemplate.__init__(self, ipaddress, port, client_type)

        self.settings = Parameter.create(name='Settings', type='group', children=self.params)
        if params_state is not None:
            # accept either a saved state dict or a live Parameter
            if isinstance(params_state, dict):
                self.settings.restoreState(params_state)
            elif isinstance(params_state, Parameter):
                self.settings.restoreState(params_state.saveState())

    def send_data(self, data: DataToExport):
        """Send the 'Done' message followed by the serialized data.

        Raises
        ------
        TypeError: if data is not a DataToExport object
        """
        if not isinstance(data, DataToExport):
            raise TypeError('should send a DataToExport object')
        if self.socket is not None:
            self.socket.check_sended_with_serializer('Done')
            self.socket.check_sended_with_serializer(data)

    def send_infos_xml(self, infos: str):
        """Send the 'Infos' message followed by the given xml string."""
        if self.socket is not None:
            self.socket.check_sended_with_serializer('Infos')
            self.socket.check_sended_with_serializer(infos)

    def send_info_string(self, info_to_display, value_as_string):
        """Send the 'Info' message followed by the info name and its value (converted to str if needed)."""
        if self.socket is not None:
            self.socket.check_sended_with_serializer('Info')
            self.socket.check_sended_with_serializer(info_to_display)  # the actual info to display as a string
            if not isinstance(value_as_string, str):
                value_as_string = str(value_as_string)
            self.socket.check_sended_with_serializer(value_as_string)

    @Slot(ThreadCommand)
    def queue_command(self, command=ThreadCommand):
        """
        when this TCPClient object is within a thread, the corresponding module communicate with it with signal and slots
        from module to client: module_signal to queue_command slot
        from client to module: self.cmd_signal to a module slot

        Raises
        ------
        DeprecationWarning: for the 'x_axis' and 'y_axis' commands
        IOError: for any command that is not handled here
        """
        if command.command == "ini_connection":
            self.init_connection()

        elif command.command == "quit":
            try:
                self.socket.close()
            except Exception:
                # deliberate best effort: the socket may be None or already closed
                pass
            finally:
                self.cmd_signal.emit(ThreadCommand('disconnected'))

        elif command.command == 'update_connection':
            self.ipaddress = command.attribute['ipaddress']
            self.port = command.attribute['port']

        elif command.command == 'data_ready':
            self.data_ready(command.attribute)

        elif command.command == 'send_info':
            if self.socket is not None:
                path = command.attribute['path']
                param = command.attribute['param']

                self.socket.check_sended_with_serializer('Info_xml')
                self.socket.check_sended_with_serializer(path)

                # send value
                data = ioxml.parameter_to_xml_string(param)
                self.socket.check_sended_with_serializer(data)

        elif command.command == 'position_is':
            if self.socket is not None:
                self.socket.check_sended_with_serializer('position_is')
                self.socket.check_sended_with_serializer(command.attribute[0])

        elif command.command == 'move_done':
            if self.socket is not None:
                self.socket.check_sended_with_serializer('move_done')
                self.socket.check_sended_with_serializer(command.attribute[0])

        elif command.command == 'x_axis':
            raise DeprecationWarning('Getting axis though TCPIP is deprecated use the data objects directly')

        elif command.command == 'y_axis':
            raise DeprecationWarning('Getting axis though TCPIP is deprecated use the data objects directly')

        else:
            raise IOError('Unknown TCP client command')

    def not_connected(self, e):
        """Flag the client as disconnected and report the connection error through cmd_signal."""
        self.connected = False
        self.cmd_signal.emit(ThreadCommand('disconnected'))
        self.cmd_signal.emit(ThreadCommand('Update_Status', [getLineInfo() + str(e), 'log']))

    def ready_to_read(self):
        """Read the incoming message string and hand it to get_data."""
        message = self._deserializer.string_deserialization()
        self.get_data(message)

    def ready_to_write(self):
        """Nothing is sent from the polling loop; sending is driven by queue_command."""
        pass

    def ready_with_error(self):
        """Flag the client as disconnected and notify the module."""
        self.connected = False
        self.cmd_signal.emit(ThreadCommand('disconnected'))

    def process_error_in_polling(self, e: Exception):
        """Report the error that stopped the polling loop, try to notify the server, then close the socket.

        Everything here is best effort. The socket is closed even when sending 'Quit' fails,
        otherwise a broken connection would leave the socket open.
        """
        try:
            self.cmd_signal.emit(ThreadCommand('Update_Status', [getLineInfo() + str(e), 'log']))
            self.socket.check_sended_with_serializer('Quit')
        except Exception:  # pragma: no cover
            pass
        finally:
            try:
                self.socket.close()
            except Exception:  # pragma: no cover
                pass
            # the polling loop is over and the socket closed: keep the flag consistent with
            # ready_with_error and not_connected
            self.connected = False

    def post_init(self, extra_commands=None):
        """Announce the connection, send the client type and settings to the server, then emit extra commands.

        Parameters
        ----------
        extra_commands: (list) ThreadCommand objects emitted through cmd_signal; other items are ignored
        """
        self.cmd_signal.emit(ThreadCommand('connected'))
        self.socket.check_sended_with_serializer(self.client_type)

        self.send_infos_xml(ioxml.parameter_to_xml_string(self.settings))
        for command in extra_commands or []:
            if isinstance(command, ThreadCommand):
                self.cmd_signal.emit(command)

    def get_data(self, message: str):
        """Turn a message received from the server into a ThreadCommand emitted through cmd_signal.

        For 'set_info' the attribute is [path, param_xml]; for 'move_abs'/'move_rel' it is [position].
        Any other message is emitted without further reading from the socket.

        Parameters
        ----------
        message: (str) the message string read from the socket
        """
        if self.socket is not None:
            messg = ThreadCommand(message)

            if message == 'set_info':
                path = self._deserializer.list_deserialization()
                param_xml = self._deserializer.string_deserialization()
                messg.attribute = [path, param_xml]

            elif message == 'move_abs' or message == 'move_rel':
                position = self._deserializer.dwa_deserialization()
                messg.attribute = [position]

            self.cmd_signal.emit(messg)

    def data_ready(self, data: DataToExport):
        """Forward the data to send_data."""
        self.send_data(data)
class TCPServer(QObject):
    """
    Abstract class to be used as inherited by DAQ_Viewer_TCP or DAQ_Move_TCP

    The methods below use `self.settings`, `self.socket_types` and `self.message_list`, none of
    which is created in `__init__`: the inheriting class has to provide them.
    """

    def __init__(self, client_type='GRABBER'):
        QObject.__init__(self)
        self.serversocket: Socket = None
        # list of dict(socket=..., type=...), the server socket itself being stored with type 'server'
        self.connected_clients = []
        self.listening = True
        # True while listen_client is running, so that timerEvent does not re-enter it
        self.processing = False
        self.client_type = client_type

    def close_server(self):
        """
            close the current opened server.
            Update the settings tree consequently.

            See Also
            --------
            set_connected_clients_table, daq_utils.ThreadCommand
        """
        server_socket = self.find_socket_within_connected_clients('server')
        self.remove_client(server_socket)

    def init_server(self):
        """Create the server socket, bind it to the configured ip/port, listen and start the polling timer.

        Raises
        ------
        ConnectionError: if the socket cannot be bound
        """
        self.emit_status(ThreadCommand("Update_Status", [
            "Started new server for {:s}:{:d}".format(self.settings['socket_ip'],
                                                      self.settings['port_id']), 'log']))

        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serversocket = Socket(serversocket)
        # bind the socket to the configured host and port
        try:
            self.serversocket.bind(
                (self.settings['socket_ip'], self.settings['port_id']))
        except socket.error as msg:
            # formatting (rather than concatenating) also copes with a missing strerror
            error_message = f'Bind failed. Error Code : {msg.errno} Message {msg.strerror}'
            self.emit_status(ThreadCommand("Update_Status", [error_message, 'log']))
            # the socket is unusable: release it instead of leaking it
            try:
                self.serversocket.close()
            except OSError:
                pass
            raise ConnectionError(error_message) from msg

        self.serversocket.listen(1)
        self.connected_clients.append(dict(socket=self.serversocket, type='server'))
        self.settings.child('conn_clients').setValue(self.set_connected_clients_table())

        self.timer = self.startTimer(100)  # Timer event fired every 100ms

    def timerEvent(self, event):
        """
            Called by set timers.
            If the process is free, start the listen_client function.

            =============== ==================== ==============================================
            **Parameters**   **Type**              **Description**

            *event*          QTimerEvent object    Containing id from timer issuing this event
            =============== ==================== ==============================================

            See Also
            --------
            listen_client
        """
        if not self.processing:
            self.listen_client()

    def find_socket_within_connected_clients(self, client_type) -> Socket:
        """
            Find a socket from a connected client with socket type corresponding.

            =============== =========== ================================
            **Parameters**    **Type**    **Description**
            *client_type*      string     The corresponding client type
            =============== =========== ================================

            Returns
            -------
            Socket
                the socket of the last registered client of that type, None if there is none
        """
        res = None
        for socket_dict in self.connected_clients:
            if socket_dict['type'] == client_type:
                res = socket_dict['socket']
        return res

    def find_socket_type_within_connected_clients(self, sock):
        """
            Find a socket type from a connected client with socket content corresponding.

            =============== =========== ===================================
            **Parameters**    **Type**   **Description**
            *sock*            Socket     The socket to look for.
            =============== =========== ===================================

            Returns
            -------
            str
                the type registered with the last matching socket, None if the socket is unknown
        """
        res = None
        for socket_dict in self.connected_clients:
            if socket_dict['socket'] == sock:
                res = socket_dict['type']
        return res

    def set_connected_clients_table(self):
        """Build the mapping client type -> socket address used to fill the 'conn_clients' setting.

        Returns
        -------
        OrderedDict
            one entry per client type; the address is a placeholder string when it cannot be read
        """
        con_clients = OrderedDict()
        for socket_dict in self.connected_clients:
            try:
                address = str(socket_dict['socket'].getsockname())
            except Exception:
                address = "unconnected invalid socket"
            con_clients[socket_dict['type']] = address
        return con_clients

    @Slot(list)
    def print_status(self, status):
        """
            Print the given status.

            =============== ============= ================================================
            **Parameters**    **Type**       **Description**
            *status*          string list    a string list representing the status socket
            =============== ============= ================================================
        """
        print(status)

    def remove_client(self, sock):
        """Unregister the given socket, refresh the clients table, close it and log the disconnection.

        Does nothing if the socket is not among the connected clients.
        """
        sock_type = self.find_socket_type_within_connected_clients(sock)
        if sock_type is not None:
            self.connected_clients.remove(dict(socket=sock, type=sock_type))
            self.settings.child('conn_clients').setValue(self.set_connected_clients_table())
            try:
                sock.close()
            except Exception:
                # best effort: the peer may already have closed the connection
                pass
            self.emit_status(ThreadCommand("Update_Status", ['Client ' + sock_type + ' disconnected', 'log']))

    def select(self, rlist, wlist=(), xlist=(), timeout=0):
        """
        Implements the select method, https://docs.python.org/3/library/select.html

        Parameters
        ----------
        rlist: (list) wait until ready for reading
        wlist: (list) wait until ready for writing
        xlist: (list) wait for an "exceptional condition"
        timeout: (float) optional timeout argument specifies a time-out as a floating point number in seconds.
                When the timeout argument is omitted the function blocks until at least one file descriptor is ready.
                A time-out value of zero specifies a poll and never blocks.

        Returns
        -------
        list: readable sockets
        list: writable sockets
        list: sockets with error pending
        """
        # select works on the raw sockets; the results are wrapped again into Socket objects
        read_sockets, write_sockets, error_sockets = select.select([sock.socket for sock in rlist],
                                                                   [sock.socket for sock in wlist],
                                                                   [sock.socket for sock in xlist],
                                                                   timeout)

        return ([Socket(sock) for sock in read_sockets], [Socket(sock) for sock in write_sockets],
                [Socket(sock) for sock in error_sockets])

    def listen_client(self):
        """
            Server function.
            Used to connect or listen incoming message from a client.

            Any exception is reported through emit_status. The `processing` flag is always
            released at the end, otherwise one failure would stop timerEvent from ever
            polling again.
        """
        self.processing = True
        try:
            read_sockets, write_sockets, error_sockets = self.select(
                [client['socket'] for client in self.connected_clients], [],
                [client['socket'] for client in self.connected_clients],
                0)
            for sock in error_sockets:
                self.remove_client(sock)

            for sock in read_sockets:
                QThread.msleep(100)
                if sock == self.serversocket:  # New connection
                    # means a new socket (client) try to reach the server
                    (client_socket, address) = self.serversocket.accept()
                    DAQ_type = DeSerializer(client_socket).string_deserialization()
                    if DAQ_type not in self.socket_types:
                        self.emit_status(ThreadCommand("Update_Status", [DAQ_type + ' is not a valid type', 'log']))
                        client_socket.close()
                        break

                    self.connected_clients.append(dict(socket=client_socket, type=DAQ_type))
                    self.settings.child('conn_clients').setValue(self.set_connected_clients_table())
                    self.emit_status(ThreadCommand("Update_Status",
                                                   [DAQ_type + ' connected with ' + address[0] + ':' + str(address[1]),
                                                    'log']))
                    QtWidgets.QApplication.processEvents()

                else:  # Some incoming message from a client
                    # Data received from client, process it
                    try:
                        message = DeSerializer(sock).string_deserialization()
                        if message in ['Done', 'Info', 'Infos', 'Info_xml', 'position_is', 'move_done']:
                            self.process_cmds(message, command_sock=None)
                        elif message == 'Quit':
                            raise Exception("socket disconnect by user")
                        else:
                            self.process_cmds(message, command_sock=sock)

                    # client disconnected (or its message could not be processed), so remove it from the socket list
                    except Exception:
                        self.remove_client(sock)

        except Exception as e:
            self.emit_status(ThreadCommand("Update_Status", [str(e), 'log']))
        finally:
            self.processing = False

    def send_command(self, sock: Socket, command="move_at"):
        """
            Send one of the message contained in self.message_list toward a socket with identity socket_type.
            A command that is not in self.message_list is reported through emit_status and not sent.

            =============== =========== ==========================
            **Parameters**    **Type**    **Description**
            *sock*             Socket     The current socket
            *command*         string      The command as a string
            =============== =========== ==========================

            See Also
            --------
            utility_classes.DAQ_Viewer_base.emit_status, daq_utils.ThreadCommand, message_to_bytes
        """
        if command not in self.message_list:
            self.emit_status(
                ThreadCommand("Update_Status", [f'Command: {command} is not in the specified list: {self.message_list}',
                                                'log']))
            return

        if sock is not None:
            sock.check_sended_with_serializer(command)

    def emit_status(self, status):
        """Print the status; meant to be overridden by the inheriting class."""
        print(status)

    def read_data(self, sock):
        """No-op placeholder for the inheriting class."""
        pass

    def send_data(self, sock, data):
        """No-op placeholder for the inheriting class."""
        pass

    def command_done(self, command_sock):
        """No-op placeholder, called by process_cmds when the 'Done' command is received."""
        pass

    def command_to_from_client(self, command):
        """No-op placeholder, called by process_cmds for every command it does not handle itself."""
        pass

    def process_cmds(self, command, command_sock=None):
        """
            Process the given command.

            A command that is not in self.message_list is reported through emit_status and ignored.
        """
        if command not in self.message_list:
            self.emit_status(
                ThreadCommand("Update_Status", [f'Command: {command} is not in the specified list: {self.message_list}',
                                                'log']))
            return

        if command == 'Done':  # means the given socket finished grabbing data and is ready to send them
            self.command_done(command_sock)

        elif command == "Infos":
            # replace entirely the client settings information on the server widget
            # should be done as the init of the client module
            try:
                sock = self.find_socket_within_connected_clients(self.client_type)
                if sock is not None:  # if client self.client_type is connected then send it the command
                    self.read_infos(sock)

            except Exception as e:
                self.emit_status(ThreadCommand("Update_Status", [str(e), 'log']))

        elif command == 'Info_xml':
            # update the state of one of the client settings on the server widget
            sock = self.find_socket_within_connected_clients(self.client_type)
            if sock is not None:
                self.read_info_xml(sock)

        elif command == "Info":
            # add a custom info (as a string value) in the server widget settings. To be used if the client is not a
            # PyMoDAQ's module
            try:
                sock = self.find_socket_within_connected_clients(self.client_type)
                if sock is not None:  # if client self.client_type is connected then send it the command
                    self.read_info(sock)
            except Exception as e:
                self.emit_status(ThreadCommand("Update_Status", [str(e), 'log']))

        else:
            self.command_to_from_client(command)

    def read_infos(self, sock: Socket = None, infos=''):
        """Replace the whole 'settings_client' group with the settings described by an xml string.

        The xml string is read from `sock` when one is given, otherwise `infos` is used.
        """
        if sock is not None:
            infos = DeSerializer(sock).string_deserialization()
        params = ioxml.XML_string_to_parameter(infos)

        param_state = {'title': 'Infos Client:', 'name': 'settings_client', 'type': 'group', 'children': params}
        self.settings.child('settings_client').restoreState(param_state)

    def read_info_xml(self, sock: Socket, path=('settings', 'apathinsettings'), param_xml=''):
        """Update one parameter of the 'settings_client' group from its path and xml description.

        Path and xml string are read from `sock` when one is given, otherwise the `path` and
        `param_xml` arguments are used. The first element of the path is skipped.

        Raises
        ------
        Exception: if the xml cannot be converted or if the path does not exist in the settings
        """
        if sock is not None:
            deser = DeSerializer(sock)
            path = deser.list_deserialization()
            param_xml = deser.string_deserialization()
        try:
            param_dict = ioxml.XML_string_to_parameter(param_xml)[0]
        except Exception as e:
            raise Exception(f'Invalid xml structure for TCP server settings: {str(e)}') from e
        try:
            param_here = self.settings.child('settings_client', *path[1:])
        except Exception as e:
            raise Exception(f'Invalid path for TCP server settings: {str(e)}') from e
        param_here.restoreState(param_dict)

    def read_info(self, sock: Socket = None, test_info='an_info', test_value=''):
        """
        if the client is not from PyMoDAQ it can use this method to display some info into the server widget

        The info name and value are read from `sock` when one is given, otherwise `test_info`
        and `test_value` are used. The value is stored as a 'str' child of the 'infos' group,
        created on first use.
        """
        # first get the info type
        if sock is None:
            info = test_info
            data = test_value
        else:
            deser = DeSerializer(sock)
            info = deser.string_deserialization()
            data = deser.string_deserialization()

        if info not in putils.iter_children(self.settings.child('infos'), []):
            self.settings.child('infos').addChild({'name': info, 'type': 'str', 'value': data})
        else:
            self.settings.child('infos', info).setValue(data)
class MockServer(TCPServer):
    """TCPServer whose settings tree is built from the module-level tcp_parameters."""
    params = []

    def __init__(self, client_type='GRABBER'):
        super().__init__(client_type)
        settings_tree = Parameter.create(children=tcp_parameters, type='group', name='settings')
        self.settings = settings_tree
class MockDataGrabber:
    """Generate noisy mock data of dimensionality '0D', '1D' or '2D'."""

    def __init__(self, grabber_dim='2D'):
        super().__init__()
        self.Nx, self.Ny = 256, 128
        # one axis value per pixel: 0 .. N-1
        self.x_axis = np.linspace(0, self.Nx - 1, self.Nx)
        self.y_axis = np.linspace(0, self.Ny - 1, self.Ny)
        self.grabber_dim = grabber_dim

    def grab(self) -> List[DataFromPlugins]:
        """Return a one-element list holding freshly generated data.

        '0D' gives a single random number, '1D' a gauss1D profile over x_axis plus noise and
        '2D' a gauss2D profile of shape (Ny, Nx) plus noise. Any other value of grabber_dim
        gives None.
        """
        dim = self.grabber_dim
        if dim == '0D':
            samples = np.array([np.random.rand()])
        elif dim == '1D':
            profile = mutils.gauss1D(self.x_axis, 128, 25)
            samples = profile + np.random.rand(self.Nx)
        elif dim == '2D':
            profile = mutils.gauss2D(self.x_axis, 128, 65,
                                     self.y_axis, 60, 10)
            samples = profile + np.random.rand(self.Ny, self.Nx)
        else:
            return None
        return [DataFromPlugins(data=[samples])]
class Grabber(QObject):
    """Minimal example of a module sending grabbed data to a TCP server through a TCPClient.

    Parameters
    ----------
    grab_method: (callable) called without argument each time data should be grabbed
    """
    command_tcpip = Signal(ThreadCommand)  # connected to TCPClient.queue_command in connect_tcp_ip

    def __init__(self, grab_method=None):
        super().__init__()
        self.send_to_tcpip = False
        self.grab_method = grab_method

    def connect_tcp_ip(self, ip='localhost', port=6341):
        """Create a TCPClient, move it to its own QThread, wire the signals and open the connection."""
        self.tcpclient_thread = QThread()

        tcpclient = TCPClient(ip, port=port)
        tcpclient.moveToThread(self.tcpclient_thread)
        # keep a reference on the client so that it lives as long as its thread object
        self.tcpclient_thread.tcpclient = tcpclient
        tcpclient.cmd_signal.connect(self.process_tcpip_cmds)

        self.command_tcpip[ThreadCommand].connect(tcpclient.queue_command)

        self.tcpclient_thread.start()
        # NOTE: init_connection is called directly here; on a successful connection it only
        # returns once the client polling loop has stopped, so the flag below is set afterwards.
        tcpclient.init_connection()
        self.send_to_tcpip = True

    def snapshot(self, info='', send_to_tcpip=True):
        """Grab data once and send them; both arguments are currently unused."""
        self.grab_data()

    def grab_data(self):
        """
            Call grab_method and emit the result through command_tcpip as a 'data_ready' ThreadCommand.

            TCPClient.send_data only accepts DataToExport objects, so a plain list returned by
            grab_method (as MockDataGrabber.grab does) is wrapped into one first. Any other
            return value is forwarded unchanged.

            See Also
            --------
            daq_utils.ThreadCommand, TCPClient.send_data
        """
        data = self.grab_method()
        if isinstance(data, list):
            data = DataToExport('Grabber', data=data)
        self.command_tcpip.emit(ThreadCommand('data_ready', data))

    @Slot(ThreadCommand)
    def process_tcpip_cmds(self, status):
        """Handle the commands emitted by the TCPClient: grab on 'Send Data', print status updates."""
        if 'Send Data' in status.command:
            self.snapshot('', send_to_tcpip=True)
        elif status.command == 'connected':
            # nothing to update in this minimal example
            pass

        elif status.command == 'disconnected':
            # nothing to update in this minimal example
            pass

        elif status.command == 'Update_Status':
            print(status)
if __name__ == '__main__':  # pragma: no cover
    import sys

    # Demo: a mock 2D data source feeding a Grabber that connects to the default server address.
    qt_app = QtWidgets.QApplication(sys.argv)
    demo_grabber = Grabber(MockDataGrabber('2D').grab)
    demo_grabber.connect_tcp_ip()
    sys.exit(qt_app.exec_())