Source code for pymodaq_utils.serialize.utils

import numpy as np
from typing import Tuple, Union


[docs] def split_nbytes(bytes_str: bytes, bytes_len: int) -> Tuple[bytes, bytes]: return bytes_str[:bytes_len], bytes_str[bytes_len:]
[docs] def get_int_from_bytes(bytes_str: bytes) -> Tuple[int, bytes]: """ Convert the 4 first bytes into an integer Returns ------- int: the decoded integer bytes: the remaining bytes string if any """ int_bytes, remaining_bytes = split_nbytes(bytes_str, 4) int_obj = bytes_to_int(int_bytes) return int_obj, remaining_bytes
[docs] def bytes_to_string(message: bytes) -> str: return message.decode()
[docs] def bytes_to_int(bytes_string: bytes) -> int: """Convert a bytes of length 4 into an integer""" if not isinstance(bytes_string, bytes): raise TypeError(f'{bytes_string} should be an bytes string, not a {type(bytes_string)}') assert len(bytes_string) == 4 return int.from_bytes(bytes_string, 'big')
[docs] def bytes_to_scalar(data: bytes, dtype: np.dtype) -> complex: """Convert bytes to a scalar given a certain numpy dtype Parameters ---------- data: bytes dtype:np.dtype Returns ------- numbers.Number """ return np.frombuffer(data, dtype=dtype)[0]
[docs] def bytes_to_nd_array(data: bytes, dtype: np.dtype, shape: Tuple[int]) -> np.ndarray: """Convert bytes to a ndarray given a certain numpy dtype and shape Parameters ---------- data: bytes dtype: np.dtype shape: tuple of int Returns ------- np.ndarray """ array = np.frombuffer(data, dtype=dtype) array = array.reshape(tuple(shape)) array = np.atleast_1d(array) # remove singleton dimensions but keeping ndarrays return array
[docs] def int_to_bytes(an_integer: int) -> bytes: """Convert an unsigned integer into a byte array of length 4 in big endian Parameters ---------- an_integer: int Returns ------- bytearray """ if not isinstance(an_integer, int): raise TypeError(f'{an_integer} should be an integer, not a {type(an_integer)}') elif an_integer < 0: raise ValueError('Can only serialize unsigned integer using this method') return an_integer.to_bytes(4, 'big')
[docs] def str_to_bytes(message: str) -> bytes: if not isinstance(message, str): raise TypeError('Can only serialize str object using this method') return message.encode()
[docs] def str_len_to_bytes( message: Union[str, bytes]) -> Tuple[bytes, bytes]: """ Convert a string and its length to two bytes Parameters ---------- message: str the message to convert Returns ------- bytes: message converted as a byte array bytes: length of the message byte array, itself as a byte array of length 4 """ if not isinstance(message, str) and not isinstance(message, bytes): message = str(message) if not isinstance(message, bytes): message = str_to_bytes(message) return message, int_to_bytes(len(message))