# -*- coding: utf-8 -*-
"""
Created the 20/10/2023
@author: Sebastien Weber
"""
import numbers
from typing import Tuple, List, Union, TYPE_CHECKING, Iterable
import numpy as np
from pymodaq.utils import data as data_mod
from pymodaq.utils.data import DataWithAxes, DataToExport, Axis, DwaType
if TYPE_CHECKING:
from pymodaq.utils.tcp_ip.mysocket import Socket
class SocketString:
    """In-memory stand-in for a Socket, reading from a bytes string instead of a connection

    Only the two reading methods below are implemented. Each read consumes the returned bytes
    from the front of the stored bytes string.

    Parameters
    ----------
    bytes_string: bytes
        The content that successive reads will consume

    See Also
    --------
    :class:`~pymodaq.utils.tcp_ip.mysocket.Socket`
    """

    def __init__(self, bytes_string: bytes):
        self._bytes_string = bytes_string

    def check_received_length(self, length: int) -> bytes:
        """Pop the first `length` bytes from the underlying bytes string

        Counterpart of the socket method making sure that all expected bytes have been received.
        Here nothing has to be waited for: the content is simply sliced out of the stored bytes.

        Parameters
        ----------
        length: int
            The number of bytes to read

        Returns
        -------
        bytes
            The bytes that were read. Shorter than `length` if fewer bytes were left.
        """
        remaining = self._bytes_string
        # split into what is handed back and what stays available for the next read
        head, self._bytes_string = remaining[:length], remaining[length:]
        return head

    def get_first_nbytes(self, length: int) -> bytes:
        """Read the first N bytes of the underlying bytes string

        Parameters
        ----------
        length: int
            The number of bytes to read

        Returns
        -------
        bytes
            the read bytes string
        """
        return self.check_received_length(length)
class Serializer:
    """Serialize python objects, numpy arrays and PyMoDAQ data objects to bytes

    Supported objects are bytes, booleans, numbers, strings, numpy arrays,
    :class:`~pymodaq.utils.data.Axis`, :class:`~pymodaq.utils.data.DataWithAxes` (and
    sub-flavours), :class:`~pymodaq.utils.data.DataToExport` and lists of those.

    Parameters
    ----------
    obj: object, optional
        The object that :meth:`to_bytes` will serialize

    Notes
    -----
    Every ``*_serialization`` method returns the bytes it produced: this returned value is the
    result to use. Most of them also append their output to the private ``_bytes_string``
    buffer; as these methods call one another, that buffer ends up holding repeated fragments.
    """

    def __init__(self, obj: Union[int, str, numbers.Number, list, np.ndarray,
                                  'Axis', 'DataWithAxes', 'DataToExport'] = None):
        self._bytes_string = b''
        self._obj = obj

    def to_bytes(self):
        """ Generic method to obtain the bytes string from various objects

        Compatible objects are:

        * :class:`bytes`
        * :class:`bool`
        * :class:`numbers.Number`
        * :class:`str`
        * :class:`numpy.ndarray`
        * :class:`~pymodaq.utils.data.Axis`
        * :class:`~pymodaq.utils.data.DataWithAxes` and sub-flavours
        * :class:`~pymodaq.utils.data.DataToExport`
        * :class:`list` of any objects above

        Returns
        -------
        bytes or None
            The serialized object, or None if the object is of none of the types above
        """
        obj = self._obj
        if isinstance(obj, bool):
            # bool is a numbers.Number (through int): it has to be tested before the generic
            # scalar branch, otherwise this branch can never be reached
            return self.scalar_serialization(int(obj))
        elif isinstance(obj, bytes):
            return self.bytes_serialization(obj)
        elif isinstance(obj, numbers.Number):
            return self.scalar_serialization(obj)
        elif isinstance(obj, str):
            return self.string_serialization(obj)
        elif isinstance(obj, np.ndarray):
            return self.ndarray_serialization(obj)
        elif isinstance(obj, Axis):
            return self.axis_serialization(obj)
        elif obj.__class__.__name__ in DwaType.names():
            return self.dwa_serialization(obj)
        elif isinstance(obj, DataToExport):
            return self.dte_serialization(obj)
        elif isinstance(obj, list):
            return self.list_serialization(obj)
        # unsupported type (including the default None object): nothing to serialize
        return None

    @staticmethod
    def int_to_bytes(an_integer: int) -> bytes:
        """Convert an unsigned integer into a bytes string of length 4 in big endian

        Parameters
        ----------
        an_integer: int

        Returns
        -------
        bytes
            The 4 bytes encoding the integer

        Raises
        ------
        TypeError
            if `an_integer` is not an int
        ValueError
            if `an_integer` is negative
        """
        if not isinstance(an_integer, int):
            raise TypeError(f'{an_integer} should be an integer, not a {type(an_integer)}')
        elif an_integer < 0:
            raise ValueError('Can only serialize unsigned integer using this method')
        return an_integer.to_bytes(4, 'big')

    @staticmethod
    def str_to_bytes(message: str) -> bytes:
        """Encode a string into bytes using the default encoding of :meth:`str.encode`

        Raises
        ------
        TypeError
            if `message` is not a str
        """
        if not isinstance(message, str):
            raise TypeError('Can only serialize str object using this method')
        return message.encode()

    @classmethod
    def str_len_to_bytes(cls, message: str) -> Tuple[bytes, bytes]:
        """ Convert a string and its length to two bytes strings

        Parameters
        ----------
        message: str
            the message to convert. A bytes object is used as is, any other object is first
            converted using :class:`str`

        Returns
        -------
        bytes: message converted as a byte array
        bytes: length of the message byte array, itself as a byte array of length 4
        """
        if not isinstance(message, (str, bytes)):
            message = str(message)
        if not isinstance(message, bytes):
            message = cls.str_to_bytes(message)
        return message, cls.int_to_bytes(len(message))

    def _int_serialization(self, int_obj: int) -> bytes:
        """Serialize an unsigned integer, used internally to encode the length of messages

        For serializing integers coming from outside, use :meth:`scalar_serialization`
        """
        bytes_string = self.int_to_bytes(int_obj)
        self._bytes_string += bytes_string
        return bytes_string

    def bytes_serialization(self, bytes_string_in: bytes) -> bytes:
        """Prefix a bytes string with its length encoded on 4 bytes

        Parameters
        ----------
        bytes_string_in: bytes

        Returns
        -------
        bytes: the length prefix followed by the unchanged input bytes
        """
        return self.int_to_bytes(len(bytes_string_in)) + bytes_string_in

    def string_serialization(self, string: str) -> bytes:
        """ Convert a string into a bytes message together with the info to convert it back

        Parameters
        ----------
        string: str

        Returns
        -------
        bytes: the encoded string length (4 bytes) followed by the encoded string
        """
        cmd_bytes, cmd_length_bytes = self.str_len_to_bytes(string)
        bytes_string = cmd_length_bytes + cmd_bytes
        self._bytes_string += bytes_string
        return bytes_string

    def scalar_serialization(self, scalar: numbers.Number) -> bytes:
        """ Convert a scalar into a bytes message together with the info to convert it back

        Parameters
        ----------
        scalar: numbers.Number

        Returns
        -------
        bytes: the total bytes message to serialize the scalar

        Raises
        ------
        TypeError
            if `scalar` is not a numbers.Number

        Notes
        -----
        The bytes sequence is constructed as:

        * serialize the numpy data type of the scalar as a string
        * serialize the number of bytes of the scalar
        * the bytes of the scalar
        """
        if not isinstance(scalar, numbers.Number):
            raise TypeError(f'{scalar} should be an integer or a float, not a {type(scalar)}')
        # go through a one element array to let numpy pick the data type and the bytes layout
        scalar_array = np.array([scalar])
        data_type = scalar_array.dtype.descr[0][1]
        data_bytes = scalar_array.tobytes()

        bytes_string = b''
        bytes_string += self.string_serialization(data_type)
        bytes_string += self._int_serialization(len(data_bytes))
        bytes_string += data_bytes
        self._bytes_string += bytes_string
        return bytes_string

    def ndarray_serialization(self, array: np.ndarray) -> bytes:
        """ Convert a ndarray into a bytes message together with the info to convert it back

        Parameters
        ----------
        array: np.ndarray

        Returns
        -------
        bytes: the total bytes message to serialize the array

        Raises
        ------
        TypeError
            if `array` is not a numpy array

        Notes
        -----
        The bytes sequence is constructed as:

        * serialize data type
        * serialize data length (number of bytes of the flattened array)
        * serialize data shape length
        * serialize all values of the shape as integers converted to bytes
        * the flattened array as bytes
        """
        if not isinstance(array, np.ndarray):
            raise TypeError(f'{array} should be an numpy array, not a {type(array)}')
        array_type = array.dtype.descr[0][1]
        array_shape = array.shape
        array_bytes = array.reshape(array.size).tobytes()

        bytes_string = b''
        bytes_string += self.string_serialization(array_type)
        bytes_string += self._int_serialization(len(array_bytes))
        bytes_string += self._int_serialization(len(array_shape))
        for shape_elt in array_shape:
            bytes_string += self._int_serialization(shape_elt)
        bytes_string += array_bytes
        self._bytes_string += bytes_string
        return bytes_string

    def object_type_serialization(self, obj: Union['Axis', 'DataToExport', 'DataWithAxes']) -> bytes:
        """ Serialize the class name of an object as a string

        Applies to Data object from the pymodaq.utils.data module
        """
        return self.string_serialization(obj.__class__.__name__)

    def axis_serialization(self, axis: 'Axis') -> bytes:
        """ Convert an Axis object into a bytes message together with the info to convert it back

        Parameters
        ----------
        axis: Axis

        Returns
        -------
        bytes: the total bytes message to serialize the Axis

        Raises
        ------
        TypeError
            if `axis` is not an Axis

        Notes
        -----
        The bytes sequence is constructed as:

        * serialize the type: 'Axis'
        * serialize the axis label
        * serialize the axis units
        * serialize the axis array
        * serialize the axis index
        * serialize the axis spread_order
        """
        if not isinstance(axis, Axis):
            raise TypeError(f'{axis} should be an Axis, not a {type(axis)}')

        bytes_string = b''
        bytes_string += self.object_type_serialization(axis)
        bytes_string += self.string_serialization(axis.label)
        bytes_string += self.string_serialization(axis.units)
        bytes_string += self.ndarray_serialization(axis.get_data())
        bytes_string += self.scalar_serialization(axis.index)
        bytes_string += self.scalar_serialization(axis.spread_order)
        self._bytes_string += bytes_string
        return bytes_string

    def list_serialization(self, list_object: List) -> bytes:
        """ Convert a list of objects into a bytes message together with the info to convert it back

        Parameters
        ----------
        list_object: list
            the list could contain booleans, scalars, strings, ndarrays, Axis objects,
            DataWithAxes objects or other lists of those

        Returns
        -------
        bytes: the total bytes message to serialize the list of objects

        Raises
        ------
        TypeError
            if `list_object` is not a list or if one of its elements cannot be serialized

        Notes
        -----
        The bytes sequence is constructed as:

        * the length of the list

        Then for each object:

        * a string tag identifying the type of the object
        * the serialization adapted to this object
        """
        if not isinstance(list_object, list):
            raise TypeError(f'{list_object} should be a list, not a {type(list_object)}')

        bytes_string = b''
        bytes_string += self._int_serialization(len(list_object))
        for obj in list_object:
            bytes_string += self.type_and_object_serialization(obj)
        self._bytes_string += bytes_string
        return bytes_string

    def type_and_object_serialization(self, obj):
        """Serialize an object preceded by a string tag identifying its type

        The tag is one of 'bool', 'dwa', 'axis', 'array', 'string', 'scalar' or 'list'.

        Parameters
        ----------
        obj: bool, DataWithAxes, Axis, np.ndarray, str, numbers.Number or list

        Returns
        -------
        bytes: the serialized tag followed by the serialized object

        Raises
        ------
        TypeError
            if the type of `obj` is none of the above
        """
        bytes_string = b''
        if isinstance(obj, bool):
            # must come before the numbers.Number test: bool is a Number (through int) and
            # would otherwise be tagged as a 'scalar' and never as a 'bool'
            bytes_string += self.string_serialization('bool')
            bytes_string += self.scalar_serialization(int(obj))
        elif isinstance(obj, DataWithAxes):
            bytes_string += self.string_serialization('dwa')
            bytes_string += self.dwa_serialization(obj)
        elif isinstance(obj, Axis):
            bytes_string += self.string_serialization('axis')
            bytes_string += self.axis_serialization(obj)
        elif isinstance(obj, np.ndarray):
            bytes_string += self.string_serialization('array')
            bytes_string += self.ndarray_serialization(obj)
        elif isinstance(obj, str):
            bytes_string += self.string_serialization('string')
            bytes_string += self.string_serialization(obj)
        elif isinstance(obj, numbers.Number):
            bytes_string += self.string_serialization('scalar')
            bytes_string += self.scalar_serialization(obj)
        elif isinstance(obj, list):
            bytes_string += self.string_serialization('list')
            bytes_string += self.list_serialization(obj)
        else:
            raise TypeError(
                f'the element {obj} type cannot be serialized into bytes, only numpy arrays'
                f', strings, or scalars (int or float)')
        return bytes_string

    def dwa_serialization(self, dwa: 'DataWithAxes') -> bytes:
        """ Convert a DataWithAxes into a bytes string

        Parameters
        ----------
        dwa: DataWithAxes

        Returns
        -------
        bytes: the total bytes message to serialize the DataWithAxes

        Raises
        ------
        TypeError
            if `dwa` is not a DataWithAxes

        Notes
        -----
        The bytes sequence is constructed as:

        * serialize the string type: the class name of the object, e.g. 'DataWithAxes'
        * serialize the timestamp
        * serialize the name
        * serialize the source enum as a string
        * serialize the dim enum as a string
        * serialize the distribution enum as a string
        * serialize the list of numpy arrays
        * serialize the list of labels
        * serialize the origin
        * serialize the nav_index tuple as a list of int
        * serialize the list of axis
        * serialize the errors attributes (None, sent as an empty list, or list(np.ndarray))
        * serialize the list of names of extra attributes
        * serialize the extra attributes
        """
        if not isinstance(dwa, DataWithAxes):
            raise TypeError(f'{dwa} should be a DataWithAxes, not a {type(dwa)}')

        bytes_string = b''
        bytes_string += self.object_type_serialization(dwa)
        bytes_string += self.scalar_serialization(dwa.timestamp)
        bytes_string += self.string_serialization(dwa.name)
        bytes_string += self.string_serialization(dwa.source.name)
        bytes_string += self.string_serialization(dwa.dim.name)
        bytes_string += self.string_serialization(dwa.distribution.name)
        bytes_string += self.list_serialization(dwa.data)
        bytes_string += self.list_serialization(dwa.labels)
        bytes_string += self.string_serialization(dwa.origin)
        bytes_string += self.list_serialization(list(dwa.nav_indexes))
        bytes_string += self.list_serialization(dwa.axes)
        # a local variable is needed here: forcing dwa.errors = [] would be internally
        # modified as None again
        errors = [] if dwa.errors is None else dwa.errors
        bytes_string += self.list_serialization(errors)
        bytes_string += self.list_serialization(dwa.extra_attributes)
        for attribute in dwa.extra_attributes:
            bytes_string += self.type_and_object_serialization(getattr(dwa, attribute))
        self._bytes_string += bytes_string
        return bytes_string

    def dte_serialization(self, dte: 'DataToExport') -> bytes:
        """ Convert a DataToExport into a bytes string

        Parameters
        ----------
        dte: DataToExport

        Returns
        -------
        bytes: the total bytes message to serialize the DataToExport

        Raises
        ------
        TypeError
            if `dte` is not a DataToExport

        Notes
        -----
        The bytes sequence is constructed as:

        * serialize the string type: 'DataToExport'
        * serialize the timestamp
        * serialize the name
        * serialize the list of DataWithAxes
        """
        if not isinstance(dte, DataToExport):
            raise TypeError(f'{dte} should be a DataToExport, not a {type(dte)}')

        bytes_string = b''
        bytes_string += self.object_type_serialization(dte)
        bytes_string += self.scalar_serialization(dte.timestamp)
        bytes_string += self.string_serialization(dte.name)
        bytes_string += self.list_serialization(dte.data)
        self._bytes_string += bytes_string
        return bytes_string
class DeSerializer:
    """Used to DeSerialize bytes to python objects, numpy arrays and PyMoDAQ Axis, DataWithAxes
    and DataToExport objects

    Parameters
    ----------
    bytes_string: bytes or Socket
        the bytes string to deserialize into an object: int, float, string, arrays, list, Axis,
        DataWithAxes... A bytes object is wrapped into a SocketString.
        Could also be a Socket object reading bytes from the network having a
        `get_first_nbytes` method

    See Also
    --------
    :py:class:`~pymodaq.utils.tcp_ip.serializer.SocketString`
    :py:class:`~pymodaq.utils.tcp_ip.mysocket.Socket`
    """

    def __init__(self, bytes_string: Union[bytes, 'Socket'] = None):
        if isinstance(bytes_string, bytes):
            bytes_string = SocketString(bytes_string)
        # from here on, only the `get_first_nbytes` method of this object is used
        self._bytes_string = bytes_string

    @staticmethod
    def bytes_to_string(message: bytes) -> str:
        """Decode bytes into a str using the default encoding of :meth:`bytes.decode`"""
        return message.decode()

    @staticmethod
    def bytes_to_int(bytes_string: bytes) -> int:
        """Convert a bytes of length 4 (big endian) into an integer

        Raises
        ------
        TypeError
            if `bytes_string` is not a bytes object
        AssertionError
            if `bytes_string` is not 4 bytes long
        """
        if not isinstance(bytes_string, bytes):
            raise TypeError(f'{bytes_string} should be an bytes string, not a {type(bytes_string)}')
        # NOTE(review): assert statements are skipped when python runs with -O; kept as an
        # assert so that the raised exception type stays the same
        assert len(bytes_string) == 4
        return int.from_bytes(bytes_string, 'big')

    @staticmethod
    def bytes_to_scalar(data: bytes, dtype: np.dtype) -> numbers.Number:
        """Convert bytes to a scalar given a certain numpy dtype

        Parameters
        ----------
        data: bytes
        dtype:np.dtype

        Returns
        -------
        numbers.Number
            The first element decoded from `data`, as a numpy scalar
        """
        return np.frombuffer(data, dtype=dtype)[0]

    @staticmethod
    def bytes_to_nd_array(data: bytes, dtype: np.dtype, shape: Tuple[int]) -> np.ndarray:
        """Convert bytes to a ndarray given a certain numpy dtype and shape

        Parameters
        ----------
        data: bytes
        dtype: np.dtype
        shape: tuple of int

        Returns
        -------
        np.ndarray
            The decoded array, with at least one dimension
        """
        array = np.frombuffer(data, dtype=dtype)
        array = array.reshape(tuple(shape))
        # an empty shape gives a 0D array: promote it to 1D so that an indexable ndarray is
        # always returned
        array = np.atleast_1d(array)
        return array

    def _int_deserialization(self) -> int:
        """Convert the first four bytes into an unsigned integer to be used internally

        For the deserialization of integers coming from outside, use
        :meth:`scalar_deserialization`
        """
        return self.bytes_to_int(self._bytes_string.get_first_nbytes(4))

    def string_deserialization(self) -> str:
        """Convert bytes into a str object

        Convert first the first four bytes into an int encoding the length of the string to
        decode

        Returns
        -------
        str: the decoded string
        """
        string_len = self._int_deserialization()
        return self._bytes_string.get_first_nbytes(string_len).decode()

    def scalar_deserialization(self) -> numbers.Number:
        """Convert bytes into a numbers.Number object

        Get first the data type from a string deserialization, then the data length and finally
        convert this length of bytes into a number

        Returns
        -------
        numbers.Number: the decoded number, as a python float, int, complex or bool for the
            floating, (un)signed integer, complex and boolean numpy data types respectively
        """
        data_type = self.string_deserialization()
        data_len = self._int_deserialization()
        number = self.bytes_to_scalar(self._bytes_string.get_first_nbytes(data_len), data_type)

        # numpy hands back its own scalar types: cast to the builtin type matching the kind of
        # the data type (unsigned integers, complex and booleans included)
        kind = np.dtype(data_type).kind
        if kind == 'f':
            number = float(number)
        elif kind in ('i', 'u'):
            number = int(number)
        elif kind == 'c':
            number = complex(number)
        elif kind == 'b':
            number = bool(number)
        return number

    def boolean_deserialization(self) -> bool:
        """Convert bytes into a boolean object

        Deserialize a scalar (see :meth:`scalar_deserialization`) and cast it to a bool

        Returns
        -------
        bool: the decoded boolean
        """
        return bool(self.scalar_deserialization())

    def ndarray_deserialization(self) -> np.ndarray:
        """Convert bytes into a numpy ndarray object

        Read in this order: the data type (string), the number of bytes of the data, the number
        of dimensions, the length of each dimension and finally the data themselves

        Returns
        -------
        ndarray: the decoded numpy array, with at least one dimension
        """
        ndarray_type = self.string_deserialization()
        ndarray_len = self._int_deserialization()
        shape_len = self._int_deserialization()
        shape = [self._int_deserialization() for _ in range(shape_len)]
        return self.bytes_to_nd_array(self._bytes_string.get_first_nbytes(ndarray_len),
                                      ndarray_type, shape)

    def object_deserialization(self):
        """Deserialize an object preceded by a string tag identifying its type

        The tag is first read as a string, then the matching deserialization method is applied.
        Known tags are 'scalar', 'string', 'array', 'dwa', 'axis', 'bool' and 'list'.

        Returns
        -------
        object: the decoded object, or None (after printing a message) if the tag is unknown
        """
        obj_type = self.string_deserialization()
        elt = None
        if obj_type == 'scalar':
            elt = self.scalar_deserialization()
        elif obj_type == 'string':
            elt = self.string_deserialization()
        elif obj_type == 'array':
            elt = self.ndarray_deserialization()
        elif obj_type == 'dwa':
            elt = self.dwa_deserialization()
        elif obj_type == 'axis':
            elt = self.axis_deserialization()
        elif obj_type == 'bool':
            elt = self.boolean_deserialization()
        elif obj_type == 'list':
            elt = self.list_deserialization()
        else:
            # NOTE(review): no bytes are consumed for an unknown tag, so what is read next is
            # probably not aligned with the stream anymore
            print(f'invalid object type {obj_type}')
        return elt

    def list_deserialization(self) -> list:
        """Convert bytes into a list of objects

        Read first the length of the list, then each element using
        :meth:`object_deserialization`: every element carries its own type tag, so the elements
        can be of different types.

        Returns
        -------
        list: the decoded list
        """
        list_len = self._int_deserialization()
        return [self.object_deserialization() for _ in range(list_len)]

    def axis_deserialization(self) -> 'Axis':
        """Convert bytes into an Axis object

        Read in this order: the class name, the label, the units, the data array, the index and
        the spread order

        Returns
        -------
        Axis: the decoded Axis

        Raises
        ------
        TypeError
            if the serialized class name is not 'Axis'
        """
        class_name = self.string_deserialization()
        if class_name != Axis.__name__:
            raise TypeError(f'Attempting to deserialize an Axis but got the bytes for a {class_name}')
        axis_label = self.string_deserialization()
        axis_units = self.string_deserialization()
        axis_array = self.ndarray_deserialization()
        axis_index = self.scalar_deserialization()
        axis_spread_order = self.scalar_deserialization()

        return Axis(axis_label, axis_units, data=axis_array, index=axis_index,
                    spread_order=axis_spread_order)

    def dwa_deserialization(self) -> 'DataWithAxes':
        """Convert bytes into a DataWithAxes object

        Read in this order: the class name, the timestamp, the name, the source, the dim, the
        distribution, the data, the labels, the origin, the navigation indexes, the axes, the
        errors, the names of the extra attributes and finally the extra attributes themselves

        Returns
        -------
        DataWithAxes: the decoded DataWithAxes, an instance of the class whose name was
            serialized

        Raises
        ------
        TypeError
            if the serialized class name is not one of the DwaType names
        """
        class_name = self.string_deserialization()
        if class_name not in DwaType.names():
            raise TypeError(f'Attempting to deserialize a DataWithAxes flavor but got the bytes for a {class_name}')
        timestamp = self.scalar_deserialization()
        dwa_class = getattr(data_mod, class_name)

        # the fields are read one statement at a time as they have to be consumed in the exact
        # order they have been serialized
        name = self.string_deserialization()
        source = self.string_deserialization()
        dim = self.string_deserialization()
        distribution = self.string_deserialization()
        data = self.list_deserialization()
        labels = self.list_deserialization()
        origin = self.string_deserialization()
        nav_indexes = tuple(self.list_deserialization())
        axes = self.list_deserialization()
        dwa = dwa_class(name,
                        source=source,
                        dim=dim,
                        distribution=distribution,
                        data=data,
                        labels=labels,
                        origin=origin,
                        nav_indexes=nav_indexes,
                        axes=axes,
                        )

        errors = self.list_deserialization()
        if len(errors) != 0:
            # an empty list stands for errors being None: leave the attribute untouched
            dwa.errors = errors
        dwa.extra_attributes = self.list_deserialization()
        for attribute in dwa.extra_attributes:
            setattr(dwa, attribute, self.object_deserialization())

        dwa.timestamp = timestamp
        return dwa

    def dte_deserialization(self) -> 'DataToExport':
        """Convert bytes into a DataToExport object

        Read in this order: the class name, the timestamp, the name and the list of data

        Returns
        -------
        DataToExport: the decoded DataToExport

        Raises
        ------
        TypeError
            if the serialized class name is not 'DataToExport'
        """
        class_name = self.string_deserialization()
        if class_name != DataToExport.__name__:
            raise TypeError(f'Attempting to deserialize a DataToExport but got the bytes for a {class_name}')
        timestamp = self.scalar_deserialization()
        name = self.string_deserialization()
        data = self.list_deserialization()
        dte = DataToExport(name,
                           data=data,
                           )
        dte.timestamp = timestamp
        return dte