# Source code for pymodaq.utils.data

# -*- coding: utf-8 -*-
"""
Created the 28/10/2022

@author: Sebastien Weber
"""
from __future__ import annotations

from abc import ABCMeta, abstractmethod, abstractproperty
import numbers
import numpy as np
from typing import List, Tuple, Union, Any, Callable
from typing import Iterable as IterableType
from collections.abc import Iterable
from collections import OrderedDict
import logging

import warnings
from time import time
import copy

from multipledispatch import dispatch
from pymodaq.utils.enums import BaseEnum, enum_checker
from pymodaq.utils.messenger import deprecation_msg
from pymodaq.utils.daq_utils import find_objects_in_list_from_attr_name_val
from pymodaq.utils.logger import set_logger, get_module_name
from pymodaq.utils.slicing import SpecialSlicersData
from pymodaq.utils import math_utils as mutils
from pymodaq.utils.config import Config
from pymodaq.utils.plotting.plotter.plotter import PlotterFactory

config = Config()
plotter_factory = PlotterFactory()
logger = set_logger(get_module_name(__file__))


def squeeze(data_array: np.ndarray, do_squeeze=True, squeeze_indexes: Tuple[int]=None) -> np.ndarray:
    """Return `data_array` as an array having at least one dimension.

    Parameters
    ----------
    data_array: np.ndarray
        The array to process
    do_squeeze: bool
        If True, length-one dimensions are removed (all of them, or only those listed in squeeze_indexes)
        before the result is promoted to at least 1D. If False the array is only promoted to at least 1D.
    squeeze_indexes: tuple of int
        Forwarded as the `axis` argument of np.squeeze; None means every length-one dimension

    Returns
    -------
    np.ndarray
    """
    reduced = np.squeeze(data_array, axis=squeeze_indexes) if do_squeeze else data_array
    return np.atleast_1d(reduced)


class DataIndexWarning(Warning):
    """Warning category listed in WARNINGS; presumably signals index inconsistencies (TODO confirm usage)"""


class DataTypeWarning(Warning):
    """Warning category emitted when data is not given as a list of numpy arrays and has to be wrapped"""


class DataDimWarning(Warning):
    """Warning category emitted when a specified dimensionality is not coherent with the data shape"""


class DataSizeWarning(Warning):
    """Warning category emitted when the size of an axis is not coherent with the shape of the data"""


WARNINGS = [DataIndexWarning, DataTypeWarning, DataDimWarning, DataSizeWarning]

# The data warnings are only displayed when the module logger is at the DEBUG level, they are silenced otherwise
for warning in WARNINGS:
    warnings.filterwarnings('default' if logging.getLevelName(logger.level) == 'DEBUG' else 'ignore',
                            category=warning)


class DataShapeError(Exception):
    """Raised when numpy arrays that should share the same shape do not"""


class DataLengthError(Exception):
    """Exception presumably related to an inconsistent number of stored arrays (TODO confirm usage)"""


class DataDimError(Exception):
    """Exception presumably related to an inconsistent data dimensionality (TODO confirm usage)"""


class DwaType(BaseEnum):
    """Enum listing kinds of data objects

    NOTE(review): the member names look like the names of the data classes (DataWithAxes, DataRaw, DataActuator, ...),
    confirm against the code using this enum.
    """
    DataWithAxes = 0
    DataRaw = 1
    DataActuator = 2
    DataFromPlugins = 3
    DataCalculated = 4


class DataDim(BaseEnum):
    """Enum for dimensionality representation of data

    Members are ordered by their integer value, comparisons accept anything enum_checker can convert to a DataDim.
    """
    Data0D = 0
    Data1D = 1
    Data2D = 2
    DataND = 3

    def __le__(self, other_dim: 'DataDim'):
        return self.value <= enum_checker(DataDim, other_dim).value

    def __lt__(self, other_dim: 'DataDim'):
        return self.value < enum_checker(DataDim, other_dim).value

    def __ge__(self, other_dim: 'DataDim'):
        return self.value >= enum_checker(DataDim, other_dim).value

    def __gt__(self, other_dim: 'DataDim'):
        return self.value > enum_checker(DataDim, other_dim).value

    @property
    def dim_index(self):
        """int: the integer value of the enum member"""
        return self.value

    @staticmethod
    def from_data_array(data_array: np.ndarray):
        """Get the DataDim corresponding to the shape and size of a numpy array

        A 1D array of size 1 is Data0D, a 1D array of size greater than 1 is Data1D, a 2D array is Data2D, anything
        else is DataND.
        """
        ndim = len(data_array.shape)
        if ndim == 2:
            return DataDim['Data2D']
        if ndim == 1 and data_array.size >= 1:
            return DataDim['Data0D'] if data_array.size == 1 else DataDim['Data1D']
        return DataDim['DataND']
class DataSource(BaseEnum):
    """Enum for source of data"""
    raw = 0  # data as acquired
    calculated = 1  # processed data (for instance from a roi)
class DataDistribution(BaseEnum):
    """Enum for distribution of data"""
    uniform = 0  # data distributed on a regular grid
    spread = 1  # data taken on specific unordered points
class Axis:
    """Object holding info and data about physical axis of some data

    In case the axis's data is linear, store the info as a scale and offset else store the data

    Parameters
    ----------
    label: str
        The label of the axis, for instance 'time' for a temporal axis
    units: str
        The units of the data in the object, for instance 's' for seconds
    data: ndarray
        A 1D ndarray holding the data of the axis
    index: int
        an integer representing the index of the Data object this axis is related to
    scaling: float
        The scaling to apply to a linspace version in order to obtain the proper scaling
    offset: float
        The offset to apply to a linspace/scaled version in order to obtain the proper axis
    size: int
        The size of the axis array (to be specified if data is None)
    spread_order: int
        An integer needed in the case where data has a spread DataDistribution. It refers to the index along the
        data's spread_index dimension

    Examples
    --------
    >>> axis = Axis('myaxis', units='seconds', data=np.array([1,2,3,4,5]), index=0)
    """

    def __init__(self, label: str = '', units: str = '', data: np.ndarray = None, index: int = 0,
                 scaling=None, offset=None, size=None, spread_order: int = 0):
        super().__init__()

        self.iaxis: Axis = SpecialSlicersData(self, False)

        self._size = size
        self._data = None
        self._index = None
        self._label = None
        self._units = None
        self._scaling = scaling
        self._offset = offset

        # the public setters are used on purpose: they validate their input
        self.units = units
        self.label = label
        self.data = data
        self.index = index
        self.spread_order = spread_order
        if (scaling is None or offset is None or size is None) and data is not None:
            # linear data are dropped here and only their scaling/offset/size description is kept
            self.get_scale_offset_from_data(data)

    def copy(self):
        """Get a shallow copy of this axis"""
        return copy.copy(self)

    def as_dwa(self) -> DataWithAxes:
        """Wrap the axis data into a DataRaw object named and labelled after this axis"""
        dwa = DataRaw(self.label, data=[self.get_data()], labels=[f'{self.label}_{self.units}'])
        dwa.create_missing_axes()
        return dwa

    @property
    def label(self) -> str:
        """str: get/set the label of this axis"""
        return self._label

    @label.setter
    def label(self, lab: str):
        if not isinstance(lab, str):
            raise TypeError('label for the Axis class should be a string')
        self._label = lab

    @property
    def units(self) -> str:
        """str: get/set the units for this axis"""
        return self._units

    @units.setter
    def units(self, units: str):
        if not isinstance(units, str):
            raise TypeError('units for the Axis class should be a string')
        self._units = units

    @property
    def index(self) -> int:
        """int: get/set the index this axis corresponds to in a DataWithAxis object"""
        return self._index

    @index.setter
    def index(self, ind: int):
        self._check_index_valid(ind)
        self._index = ind

    @property
    def data(self):
        """np.ndarray: get/set the data of Axis"""
        return self._data

    @data.setter
    def data(self, data: np.ndarray):
        if data is not None:
            self._check_data_valid(data)
            self.get_scale_offset_from_data(data)
            self._size = data.size
        elif self.size is None:
            self._size = 0
        # the array is stored whatever get_scale_offset_from_data did with self._data
        self._data = data

    def get_data(self) -> np.ndarray:
        """Get the axis data: the stored array if any, else the array computed from the scaling, offset and size"""
        return self._data if self._data is not None else self._linear_data(self.size)

    def get_data_at(self, indexes: Union[int, IterableType, slice]) -> np.ndarray:
        """ Get data at specified indexes

        Parameters
        ----------
        indexes: int, slice, ndarray or iterable
            Anything that is not an int, a slice or a ndarray is first converted to a ndarray
        """
        if not isinstance(indexes, (np.ndarray, slice, int)):
            indexes = np.array(indexes)
        return self.get_data()[indexes]

    def get_scale_offset_from_data(self, data: np.ndarray = None):
        """Get the scaling and offset from the axis's data

        If the data is linear, the scaling and offset are extracted and the stored array is dropped

        Parameters
        ----------
        data: ndarray
            If None, the data stored in the object (if any) is used
        """
        if data is None and self._data is not None:
            data = self._data

        if self.is_axis_linear(data):
            if len(data) == 1:
                self._scaling = 1
            else:
                self._scaling = np.mean(np.diff(data))
            self._offset = data[0]
            self._data = None

    def is_axis_linear(self, data=None):
        """Check if the data (or the axis own data if None) is evenly spaced

        Data with fewer than two points is considered linear.
        """
        if data is None:
            data = self.get_data()
        if data is not None:
            steps = np.diff(data)
            if steps.size == 0:
                # nothing to compare: avoids the "mean of empty slice" warning and gives the same answer
                return True
            return np.allclose(steps, np.mean(steps))
        else:
            return False

    @property
    def scaling(self):
        """float: get/set the step between two consecutive points of a linear axis"""
        return self._scaling

    @scaling.setter
    def scaling(self, _scaling: float):
        self._scaling = _scaling

    @property
    def offset(self):
        """float: get/set the value of the first point of a linear axis"""
        return self._offset

    @offset.setter
    def offset(self, _offset: float):
        self._offset = _offset

    @property
    def size(self) -> int:
        """int: get/set the size/length of the 1D ndarray (the setter is ignored if an array is stored)"""
        return self._size

    @size.setter
    def size(self, _size: int):
        if self._data is None:
            self._size = _size

    @staticmethod
    def _check_index_valid(index: int):
        if not isinstance(index, int):
            raise TypeError('index for the Axis class should be a positive integer')
        elif index < 0:
            raise ValueError('index for the Axis class should be a positive integer')

    @staticmethod
    def _check_data_valid(data):
        if not isinstance(data, np.ndarray):
            raise TypeError('data for the Axis class should be a 1D numpy array')
        elif len(data.shape) != 1:
            raise ValueError('data for the Axis class should be a 1D numpy array')

    def _linear_data(self, nsteps: int):
        """create axis data with a linear version using scaling and offset"""
        return self._offset + self._scaling * np.linspace(0, nsteps-1, nsteps)

    def create_linear_data(self, nsteps: int):
        """replace the axis data with a linear version using scaling and offset"""
        self.data = self._linear_data(nsteps)

    @staticmethod
    def create_simple_linear_data(nsteps: int):
        """Get the array 0, 1, ..., nsteps - 1 (as floats)"""
        return np.linspace(0, nsteps-1, nsteps)

    def __len__(self):
        return self.size

    def _compute_slices(self, slices, *ignored, **ignored_also):
        return slices

    def _slicer(self, _slice, *ignored, **ignored_also):
        """Get a sliced deep copy of this axis

        An int gives a single point axis, Ellipsis a plain copy and a slice a sub-axis. For a linear axis (no stored
        array) the slice is applied on the scaling/offset/size description, honoring negative bounds, out of range
        bounds and steps the same way indexing the equivalent array would.
        """
        ax: Axis = copy.deepcopy(self)
        if isinstance(_slice, int):
            ax.data = np.array([ax.get_data()[_slice]])
            return ax
        elif _slice is Ellipsis:
            return ax
        elif isinstance(_slice, slice):
            if ax._data is not None:
                ax.data = ax._data.__getitem__(_slice)
                return ax
            else:
                # normalize the bounds against the axis size (None, negative and too large values)
                start, stop, step = _slice.indices(self.size)
                ax._offset = ax.offset + start * ax.scaling
                ax._scaling = ax.scaling * step
                ax._size = len(range(start, stop, step))
                return ax

    def __getitem__(self, item):
        if hasattr(self, item):
            # for when axis was a dict
            deprecation_msg('attributes from an Axis object should not be fetched using __getitem__')
            return getattr(self, item)

    def __repr__(self):
        return f'{self.__class__.__name__}: <label: {self.label}> - <units: {self.units}> - <index: {self.index}>'

    def __mul__(self, scale: numbers.Real):
        """Get a copy of the axis scaled by a real number (None is returned for any other operand)"""
        if isinstance(scale, numbers.Real):
            ax = copy.deepcopy(self)
            if self.data is not None:
                # not done in place so that an integer array can be scaled by a float
                ax.data = ax.data * scale
            else:
                ax._offset *= scale
                ax._scaling *= scale
            return ax

    def __add__(self, offset: numbers.Real):
        """Get a copy of the axis shifted by a real number (None is returned for any other operand)"""
        if isinstance(offset, numbers.Real):
            ax = copy.deepcopy(self)
            if self.data is not None:
                # not done in place so that an integer array can be shifted by a float
                ax.data = ax.data + offset
            else:
                ax._offset += offset
            return ax

    def __eq__(self, other: Axis):
        if isinstance(other, Axis):
            eq = self.label == other.label
            eq = eq and (self.units == other.units)
            eq = eq and (self.index == other.index)
            if self.data is not None and other.data is not None:
                # arrays of different shapes are different axes (and may not even be comparable by allclose)
                eq = eq and self.data.shape == other.data.shape and np.allclose(self.data, other.data)
            else:
                eq = eq and self.offset == other.offset
                eq = eq and self.scaling == other.scaling
            return eq
        else:
            return False

    def _last_step(self) -> int:
        """Get the index of the last point of the axis (0 for an empty axis)"""
        return max(self.size - 1, 0)

    def mean(self):
        """Get the mean value of the axis"""
        if self._data is not None:
            return np.mean(self._data)
        else:
            # a linear axis spans offset ... offset + (size - 1) * scaling
            return self.offset + self._last_step() / 2 * self.scaling

    def min(self):
        """Get the minimum value of the axis"""
        if self._data is not None:
            return np.min(self._data)
        else:
            return self.offset + (self._last_step() * self.scaling if self.scaling < 0 else 0)

    def max(self):
        """Get the maximum value of the axis"""
        if self._data is not None:
            return np.max(self._data)
        else:
            return self.offset + (self._last_step() * self.scaling if self.scaling > 0 else 0)

    def find_index(self, threshold: float) -> int:
        """find the index of the threshold value within the axis

        Values below the axis minimum give 0, values above the axis maximum give the last index
        """
        if threshold < self.min():
            return 0
        elif threshold > self.max():
            return len(self) - 1
        elif self._data is not None:
            return mutils.find_index(self._data, threshold)[0][0]
        elif self.scaling == 0:
            # constant axis: every point holds the threshold value
            return 0
        else:
            return int((threshold - self.offset) / self.scaling)

    def find_indexes(self, thresholds: IterableType[float]) -> IterableType[int]:
        """Apply find_index on each threshold (a single number is processed as a one element list)"""
        if isinstance(thresholds, numbers.Number):
            thresholds = [thresholds]
        return [self.find_index(threshold) for threshold in thresholds]
class NavAxis(Axis):
    """Deprecated flavour of Axis: behaves as an Axis but emits a deprecation message when created"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        deprecation_msg('NavAxis should not be used anymore, please use Axis object with correct index.'
                        'The navigation index should be specified in the Data object')


class DataLowLevel:
    """Abstract object for all Data Object

    Parameters
    ----------
    name: str
        the identifier of the data

    Attributes
    ----------
    name: str
    timestamp: float
        Time in seconds since epoch. See method time.time()
    """

    def __init__(self, name: str):
        # the timestamp is taken when the object is created, it can be overwritten using the timestamp setter
        self._timestamp = time()
        self._name = name

    @property
    def name(self):
        """Get/Set the identifier of the data"""
        return self._name

    @name.setter
    def name(self, other_name: str):
        self._name = other_name

    @property
    def timestamp(self):
        """Get/Set the timestamp of when the object has been created"""
        return self._timestamp

    @timestamp.setter
    def timestamp(self, timestamp: float):
        """The timestamp of when the object has been created"""
        self._timestamp = timestamp
class DataBase(DataLowLevel):
    """Base object to store homogeneous data and metadata generated by pymodaq's objects.

    To be inherited for real data

    Parameters
    ----------
    name: str
        the identifier of these data
    source: DataSource or str
        Enum specifying if data are raw or processed (for instance from roi)
    dim: DataDim or str
        The identifier of the data type
    distribution: DataDistribution or str
        The distribution type of the data: uniform if distributed on a regular grid or spread if on specific
        unordered points
    data: list of ndarray
        The data the object is storing
    labels: list of str
        The labels of the data nd-arrays
    origin: str
        An identifier of the element where the data originated, for instance the DAQ_Viewer's name. Used when appending
        DataToExport in DAQ_Scan to disintricate from which origin data comes from when scanning multiple detectors.
    kwargs: named parameters
        All other parameters are stored dynamically using the name/value pair. The name of these extra parameters are
        added into the extra_attributes attribute

    Attributes
    ----------
    name: str
        the identifier of these data
    source: DataSource or str
        Enum specifying if data are raw or processed (for instance from roi)
    dim: DataDim or str
        The identifier of the data type
    distribution: DataDistribution or str
        The distribution type of the data: uniform if distributed on a regular grid or spread if on specific
        unordered points
    data: list of ndarray
        The data the object is storing
    labels: list of str
        The labels of the data nd-arrays
    origin: str
        An identifier of the element where the data originated, for instance the DAQ_Viewer's name. Used when appending
        DataToExport in DAQ_Scan to disintricate from which origin data comes from when scanning multiple detectors.
    shape: Tuple[int]
        The shape of the underlying data
    size: int
        The size of the ndarrays stored in the object
    length: int
        The number of ndarrays stored in the object
    extra_attributes: List[str]
        list of string giving identifiers of the attributes added dynamically at the initialization (for instance
        to save extra metadata using the DataSaverLoader

    See Also
    --------
    DataWithAxes, DataFromPlugins, DataRaw, DataSaverLoader

    Examples
    --------
    >>> import numpy as np
    >>> from pymodaq.utils.data import DataBase, DataSource, DataDim, DataDistribution
    >>> data = DataBase('mydata', source=DataSource['raw'], dim=DataDim['Data1D'],
    ...                 distribution=DataDistribution['uniform'],
    ...                 data=[np.array([1.,2.,3.]), np.array([4.,5.,6.])],
    ...                 labels=['channel1', 'channel2'], origin='docutils code')
    >>> data.dim
    <DataDim.Data1D: 1>
    >>> data.source
    <DataSource.raw: 0>
    >>> data.shape
    (3,)
    >>> data.length
    2
    >>> data.size
    3
    """

    def __init__(self, name: str, source: DataSource = None, dim: DataDim = None,
                 distribution: DataDistribution = DataDistribution['uniform'], data: List[np.ndarray] = None,
                 labels: List[str] = None, origin: str = '', **kwargs):

        super().__init__(name=name)
        self._iter_index = 0
        self._shape = None
        self._size = None
        self._data = None
        self._length = None
        self._labels = None
        self._dim = dim
        self._errors = None
        self.origin = origin

        source = enum_checker(DataSource, source)
        self._source = source

        distribution = enum_checker(DataDistribution, distribution)
        self._distribution = distribution

        self.data = data  # dim consistency is actually checked within the setter method

        # the labels are completed from the number of arrays: to be done once the data has been set
        self._check_labels(labels)
        self.extra_attributes = []
        self.add_extra_attribute(**kwargs)

    def as_dte(self, name: str = 'mydte') -> DataToExport:
        """Convenience method to wrap the DataWithAxes object into a DataToExport"""
        return DataToExport(name, data=[self])

    def add_extra_attribute(self, **kwargs):
        """Set each name/value pair as an attribute and register the new names in extra_attributes"""
        for key in kwargs:
            if key not in self.extra_attributes:
                self.extra_attributes.append(key)
            setattr(self, key, kwargs[key])

    def get_full_name(self) -> str:
        """Get the data full name including the origin attribute into the returned value

        Returns
        -------
        str: the name of the data constructed as : origin/name

        Examples
        --------
        d0 = DataBase(name='datafromdet0', origin='det0')
        """
        return f'{self.origin}/{self.name}'

    def __repr__(self):
        return f'{self.__class__.__name__} <{self.name}> <{self.dim}> <{self.source}> <{self.shape}>'

    def __len__(self):
        return self.length

    def __iter__(self):
        # NOTE(review): the object is its own iterator, nested iterations over the same object share their state
        self._iter_index = 0
        return self

    def __next__(self):
        if self._iter_index < len(self):
            self._iter_index += 1
            return self.data[self._iter_index-1]
        else:
            raise StopIteration

    def __getitem__(self, item) -> np.ndarray:
        if (isinstance(item, int) and item < len(self)) or isinstance(item, slice):
            return self.data[item]
        else:
            raise IndexError('The index should be an integer lower than the data length')

    def __setitem__(self, key, value):
        # only an array having the shape of the stored ones can replace one of them
        if isinstance(key, int) and key < len(self) and isinstance(value, np.ndarray) and value.shape == self.shape:
            self.data[key] = value
        else:
            raise IndexError('The index should be an positive integer lower than the data length')

    def __add__(self, other: object):
        if isinstance(other, DataBase) and len(other) == len(self):
            new_data = copy.deepcopy(self)
            for ind_array in range(len(new_data)):
                if self[ind_array].shape != other[ind_array].shape:
                    raise ValueError('The shapes of arrays stored into the data are not consistent')
                new_data[ind_array] = self[ind_array] + other[ind_array]
            return new_data
        elif isinstance(other, numbers.Number) and self.length == 1 and self.size == 1:
            # a number can only be added to a single 0D data, the addition itself produces the copy
            return self + DataActuator(data=other)
        else:
            raise TypeError(f'Could not add a {other.__class__.__name__} or a {self.__class__.__name__} '
                            f'of a different length')

    def __sub__(self, other: object):
        if isinstance(other, DataBase) and len(other) == len(self):
            new_data = copy.deepcopy(self)
            for ind_array in range(len(new_data)):
                # same check as in __add__
                if self[ind_array].shape != other[ind_array].shape:
                    raise ValueError('The shapes of arrays stored into the data are not consistent')
                new_data[ind_array] = self[ind_array] - other[ind_array]
            return new_data
        elif isinstance(other, numbers.Number) and self.length == 1 and self.size == 1:
            return self - DataActuator(data=other)
        else:
            raise TypeError(f'Could not substract a {other.__class__.__name__} or a {self.__class__.__name__} '
                            f'of a different length')

    def __mul__(self, other):
        if isinstance(other, numbers.Number):
            new_data = copy.deepcopy(self)
            for ind_array in range(len(new_data)):
                new_data[ind_array] = self[ind_array] * other
            return new_data
        else:
            raise TypeError(f'Could not multiply a {other.__class__.__name__} and a {self.__class__.__name__} '
                            f'of a different length')

    def __truediv__(self, other):
        if isinstance(other, numbers.Number):
            return self * (1 / other)
        else:
            raise TypeError(f'Could not divide a {other.__class__.__name__} and a {self.__class__.__name__} '
                            f'of a different length')

    def _comparison_common(self, other, operator='__eq__'):
        """Apply a comparison operator between self and another DataBase or a number

        With a DataBase: False if the names, lengths, dims or array shapes differ, else True only if the comparison
        holds for every element of every array. With a number: the comparison is applied on the first array only.

        Raises
        ------
        TypeError: if other is neither a DataBase nor a number
        """
        if isinstance(other, DataBase):
            if not(self.name == other.name and len(self) == len(other)):
                return False
            if self.dim != other.dim:
                return False
            eq = True
            for ind in range(len(self)):
                if self[ind].shape != other[ind].shape:
                    eq = False
                    break
                eq = eq and np.all(getattr(self[ind], operator)(other[ind]))
            # extra attributes are not relevant as they may contain module specific data...
            # eq = eq and (self.extra_attributes == other.extra_attributes)
            # for attribute in self.extra_attributes:
            #     eq = eq and (getattr(self, attribute) == getattr(other, attribute))
            return eq
        elif isinstance(other, numbers.Number):
            return np.all(getattr(self[0], operator)(other))
        else:
            # NOTE(review): also reached by `==` with an unrelated object (for instance None)
            raise TypeError()

    def __eq__(self, other):
        return self._comparison_common(other, '__eq__')

    def __le__(self, other):
        return self._comparison_common(other, '__le__')

    def __lt__(self, other):
        return self._comparison_common(other, '__lt__')

    def __ge__(self, other):
        return self._comparison_common(other, '__ge__')

    def __gt__(self, other):
        return self._comparison_common(other, '__gt__')

    def deepcopy(self):
        """Get a deep copy of this object"""
        return copy.deepcopy(self)

    def average(self, other: 'DataBase', weight: int) -> 'DataBase':
        """ Compute the weighted average between self and other DataBase

        Parameters
        ----------
        other: DataBase
        weight: int
            The weight the 'other' holds with respect to self

        Returns
        -------
        DataBase: the averaged DataBase object
        """
        if isinstance(other, DataBase) and len(other) == len(self) and isinstance(weight, numbers.Number):
            return (other * weight + self) / (weight + 1)
        else:
            raise TypeError(f'Could not average a {other.__class__.__name__} or a {self.__class__.__name__} '
                            f'of a different length')

    def _apply_to_arrays(self, func: Callable) -> 'DataBase':
        """Get a shallow copy of self whose arrays are replaced by the result of func applied on each of them"""
        new_data = copy.copy(self)
        new_data.data = [func(dat) for dat in new_data]
        return new_data

    def abs(self):
        """ Take the absolute value of itself"""
        return self._apply_to_arrays(np.abs)

    def real(self):
        """ Take the real part of itself"""
        return self._apply_to_arrays(np.real)

    def imag(self):
        """ Take the imaginary part of itself"""
        return self._apply_to_arrays(np.imag)

    def flipud(self):
        """Reverse the order of elements along axis 0 (up/down)"""
        return self._apply_to_arrays(np.flipud)

    def fliplr(self):
        """Reverse the order of elements along axis 1 (left/right)"""
        return self._apply_to_arrays(np.fliplr)

    def append(self, data: DataWithAxes):
        """Append the arrays and labels of another data object to this one

        Raises
        ------
        DataShapeError: if one of the arrays to append has not the shape of the stored ones
        """
        for dat in data:
            if dat.shape != self.shape:
                raise DataShapeError('Cannot append those ndarrays, they don\'t have the same shape as self')
        # going through the data setter updates the length
        self.data += data.data
        self.labels.extend(data.labels)

    def pop(self, index: int) -> DataBase:
        """ Returns a copy of self but with data taken at the specified index"""
        dwa = self.deepcopy()
        dwa.data = [dwa.data[index]]
        dwa.labels = [dwa.labels[index]]
        return dwa

    @property
    def shape(self):
        """The shape of the nd-arrays"""
        return self._shape

    def stack_as_array(self, axis=0, dtype=None) -> np.ndarray:
        """ Stack all data arrays in a single numpy array

        Parameters
        ----------
        axis: int
            The new stack axis index, default 0
        dtype: str or np.dtype
            the dtype of the stacked array

        Returns
        -------
        np.ndarray

        See Also
        --------
        :meth:`np.stack`
        """
        return np.stack(self.data, axis=axis, dtype=dtype)

    @property
    def size(self):
        """The size of the nd-arrays"""
        return self._size

    @property
    def dim(self):
        """DataDim: the enum representing the dimensionality of the stored data"""
        return self._dim

    def set_dim(self, dim: Union[DataDim, str]):
        """Addhoc modification of dim independantly of the real data shape, should be used with extra care"""
        self._dim = enum_checker(DataDim, dim)

    @property
    def source(self):
        """DataSource: the enum representing the source of the data"""
        return self._source

    @source.setter
    def source(self, source_type: Union[str, DataSource]):
        """DataSource: the enum representing the source of the data"""
        source_type = enum_checker(DataSource, source_type)
        self._source = source_type

    @property
    def distribution(self):
        """DataDistribution: the enum representing the distribution of the stored data"""
        return self._distribution

    @property
    def length(self):
        """The length of data. This is the length of the list containing the nd-arrays"""
        return self._length

    @property
    def labels(self):
        """list of str: get/set the labels of the nd-arrays (missing labels are generated by the setter)"""
        return self._labels

    @labels.setter
    def labels(self, labels: List['str']):
        self._check_labels(labels)

    def _check_labels(self, labels: List['str']):
        """Store a copy of the labels, completed with 'CHxx' labels if fewer than the number of arrays"""
        if labels is None:
            labels = []
        else:
            labels = labels[:]
        while len(labels) < self.length:
            labels.append(f'CH{len(labels):02d}')
        self._labels = labels

    def get_data_index(self, index: int = 0) -> np.ndarray:
        """Get the data by its index in the list, same as self[index]"""
        return self.data[index]

    @staticmethod
    def _check_data_type(data: List[np.ndarray]) -> List[np.ndarray]:
        """make sure data is a list of nd-arrays

        A single ndarray or a single number is wrapped into a list (with a DataTypeWarning).

        Raises
        ------
        TypeError: if data is None, an empty list, contains something else than ndarrays or starts with a 0D array
        """
        is_valid = True
        if data is None:
            is_valid = False
        if not isinstance(data, list):
            # try to transform the data to regular type
            if isinstance(data, np.ndarray):
                warnings.warn(DataTypeWarning('Your data should be a list of numpy arrays not just a single numpy'
                                              ' array, wrapping them with a list'))
                data = [data]
            elif isinstance(data, numbers.Number):
                warnings.warn(DataTypeWarning('Your data should be a list of numpy arrays not just a single'
                                              ' number, wrapping it with an array and a list'))
                data = [np.array([data])]
            else:
                is_valid = False
        if isinstance(data, list):
            if len(data) == 0:
                is_valid = False
            elif not all(isinstance(dat, np.ndarray) for dat in data):
                # every element is checked so that a wrong one is reported as a TypeError
                is_valid = False
            elif len(data[0].shape) == 0:
                is_valid = False
        if not is_valid:
            raise TypeError('Data should be an non-empty list of non-empty numpy arrays')
        return data

    def check_shape_from_data(self, data: List[np.ndarray]):
        """Store the shape of the first array as the shape of the data"""
        self._shape = data[0].shape

    @staticmethod
    def _dim_from_shape_and_size(shape: Tuple[int], size: int) -> DataDim:
        """Get the DataDim corresponding to the shape and size of an array"""
        if len(shape) == 1 and size == 1:
            dim = DataDim['Data0D']
        elif len(shape) == 1 and size > 1:
            dim = DataDim['Data1D']
        elif len(shape) == 2:
            dim = DataDim['Data2D']
        else:
            dim = DataDim['DataND']
        return dim

    @staticmethod
    def _get_dim_from_data(data: List[np.ndarray]) -> DataDim:
        """Get the dimensionality DataDim of the first array of data, without modifying any object"""
        return DataBase._dim_from_shape_and_size(data[0].shape, data[0].size)

    def get_dim_from_data(self, data: List[np.ndarray]):
        """Get the dimensionality DataDim from data

        Also update the shape, size and length stored in the object
        """
        self.check_shape_from_data(data)
        self._size = data[0].size
        self._length = len(data)
        # the stored shape is used, not data[0].shape, as check_shape_from_data is the one deciding of it
        return self._dim_from_shape_and_size(self._shape, self._size)

    def _check_shape_dim_consistency(self, data: List[np.ndarray]):
        """Process the dim from data or make sure data and DataDim are coherent"""
        dim = self.get_dim_from_data(data)
        if self._dim is None:
            self._dim = dim
        else:
            self._dim = enum_checker(DataDim, self._dim)
            if self._dim != dim:
                warnings.warn(DataDimWarning('The specified dimensionality is not coherent with the data shape, '
                                             'replacing it'))
                self._dim = dim

    def _check_same_shape(self, data: List[np.ndarray]):
        """Check that all nd-arrays have the same shape"""
        for dat in data:
            if dat.shape != self.shape:
                raise DataShapeError('The shape of the ndarrays in data is not the same')

    @property
    def data(self) -> List[np.ndarray]:
        """List[np.ndarray]: get/set (and check) the data the object is storing"""
        return self._data

    @data.setter
    def data(self, data: List[np.ndarray]):
        data = self._check_data_type(data)
        self._check_shape_dim_consistency(data)
        self._check_same_shape(data)
        self._data = data

    def to_dict(self):
        """Get an OrderedDict mapping each label to its array"""
        data_dict = OrderedDict([])
        for ind in range(len(self)):
            data_dict[self.labels[ind]] = self[ind]
        return data_dict
class AxesManagerBase: def __init__(self, data_shape: Tuple[int], axes: List[Axis], nav_indexes=None, sig_indexes=None, **kwargs): self._data_shape = data_shape[:] # initial shape needed for self._check_axis self._axes = axes[:] self._nav_indexes = nav_indexes self._sig_indexes = sig_indexes if sig_indexes is not None else self.compute_sig_indexes() self._check_axis(self._axes) self._manage_named_axes(self._axes, **kwargs) @property def axes(self): return self._axes @axes.setter def axes(self, axes: List[Axis]): self._axes = axes[:] self._check_axis(self._axes) @abstractmethod def _check_axis(self, axes): ... @abstractmethod def get_sorted_index(self, axis_index: int = 0, spread_index=0) -> Tuple[np.ndarray, Tuple[slice]]: """ Get the index to sort the specified axis Parameters ---------- axis_index: int The index along which one should sort the data spread_index: int for spread data only, specifies which spread axis to use Returns ------- np.ndarray: the sorted index from the specified axis tuple of slice: used to slice the underlying data """ ... @abstractmethod def get_axis_from_index_spread(self, index: int, spread_order: int) -> Axis: """in spread mode, different nav axes have the same index (but not the same spread_order integer value) """ ... 
def compute_sig_indexes(self): _shape = list(self._data_shape) indexes = list(np.arange(len(self._data_shape))) for index in self.nav_indexes: if index in indexes: indexes.pop(indexes.index(index)) return tuple(indexes) def _has_get_axis_from_index(self, index: int): """Check if the axis referred by a given data dimensionality index is present Returns ------- bool: True if the axis has been found else False Axis or None: return the axis instance if has the axis else None """ if index > len(self._data_shape) or index < 0: raise IndexError('The specified index does not correspond to any data dimension') for axis in self.axes: if axis.index == index: return True, axis return False, None def _manage_named_axes(self, axes, x_axis=None, y_axis=None, nav_x_axis=None, nav_y_axis=None): """This method make sur old style Data is still compatible, especially when using x_axis or y_axis parameters""" modified = False if x_axis is not None: modified = True index = 0 if len(self._data_shape) == 1 and not self._has_get_axis_from_index(0)[0]: # in case of Data1D the x_axis corresponds to the first data dim index = 0 elif len(self._data_shape) == 2 and not self._has_get_axis_from_index(1)[0]: # in case of Data2D the x_axis corresponds to the second data dim (columns) index = 1 axes.append(Axis(x_axis.label, x_axis.units, x_axis.data, index=index)) if y_axis is not None: if len(self._data_shape) == 2 and not self._has_get_axis_from_index(0)[0]: modified = True # in case of Data2D the y_axis corresponds to the first data dim (lines) axes.append(Axis(y_axis.label, y_axis.units, y_axis.data, index=0)) if nav_x_axis is not None: if len(self.nav_indexes) > 0: modified = True # in case of DataND the y_axis corresponds to the first data dim (lines) axes.append(Axis(nav_x_axis.label, nav_x_axis.units, nav_x_axis.data, index=self._nav_indexes[0])) if nav_y_axis is not None: if len(self.nav_indexes) > 1: modified = True # in case of Data2D the y_axis corresponds to the first data dim (lines) 
axes.append(Axis(nav_y_axis.label, nav_y_axis.units, nav_y_axis.data, index=self._nav_indexes[1])) if modified: self._check_axis(axes) @property def shape(self) -> Tuple[int]: # self._data_shape = self.compute_shape_from_axes() return self._data_shape @abstractmethod def compute_shape_from_axes(self): ... @property def sig_shape(self) -> tuple: return tuple([self.shape[ind] for ind in self.sig_indexes]) @property def nav_shape(self) -> tuple: return tuple([self.shape[ind] for ind in self.nav_indexes]) def append_axis(self, axis: Axis): self._axes.append(axis) self._check_axis([axis]) @property def nav_indexes(self) -> IterableType[int]: return self._nav_indexes @nav_indexes.setter def nav_indexes(self, nav_indexes: IterableType[int]): if isinstance(nav_indexes, Iterable): nav_indexes = tuple(nav_indexes) valid = True for index in nav_indexes: if index not in self.get_axes_index(): logger.warning('Could not set the corresponding nav_index into the data object, not enough' ' Axis declared') valid = False break if valid: self._nav_indexes = nav_indexes else: logger.warning('Could not set the corresponding sig_indexes into the data object, should be an iterable') self.sig_indexes = self.compute_sig_indexes() self.shape @property def sig_indexes(self) -> IterableType[int]: return self._sig_indexes @sig_indexes.setter def sig_indexes(self, sig_indexes: IterableType[int]): if isinstance(sig_indexes, Iterable): sig_indexes = tuple(sig_indexes) valid = True for index in sig_indexes: if index in self._nav_indexes: logger.warning('Could not set the corresponding sig_index into the axis manager object, ' 'the axis is already affected to the navigation axis') valid = False break if index not in self.get_axes_index(): logger.warning('Could not set the corresponding nav_index into the data object, not enough' ' Axis declared') valid = False break if valid: self._sig_indexes = sig_indexes else: logger.warning('Could not set the corresponding sig_indexes into the data object, 
should be an iterable') @property def nav_axes(self) -> List[int]: deprecation_msg('nav_axes parameter should not be used anymore, use nav_indexes') return self._nav_indexes @nav_axes.setter def nav_axes(self, nav_indexes: List[int]): deprecation_msg('nav_axes parameter should not be used anymore, use nav_indexes') self.nav_indexes = nav_indexes def is_axis_signal(self, axis: Axis) -> bool: """Check if an axis is considered signal or navigation""" return axis.index in self._nav_indexes def is_axis_navigation(self, axis: Axis) -> bool: """Check if an axis is considered signal or navigation""" return axis.index not in self._nav_indexes @abstractmethod def get_shape_from_index(self, index: int) -> int: """Get the data shape at the given index""" ... def get_axes_index(self) -> List[int]: """Get the index list from the axis objects""" return [axis.index for axis in self._axes] @abstractmethod def get_axis_from_index(self, index: int, create: bool = False) -> List[Axis]: ... def get_axis_from_index_spread(self, index: int, spread_order: int) -> Axis: """Only valid for Spread data""" ... 
def get_nav_axes(self) -> List[Axis]:
    """Get the navigation axes corresponding to the data

    Use get_axis_from_index for all index in self.nav_indexes, but in spread distribution, one index may
    correspond to multiple nav axes, see Spread data distribution

    Returns
    -------
    List[Axis]: flat list of the axes found (or linearly created) for each navigation index
    """
    # get_axis_from_index returns a list per index; shallow-copy each list then flatten them
    axes_per_index = [copy.copy(self.get_axis_from_index(index, create=True))
                      for index in self.nav_indexes]
    return list(mutils.flatten(axes_per_index))


def get_signal_axes(self):
    """Get the signal axes corresponding to the data

    If no signal index has been set yet, they are deduced from the declared axes that are
    not navigation ones. Axes whose size is 1 or less are not returned.

    Returns
    -------
    list of Axis: the signal axes with more than one element
    """
    if self.sig_indexes is None:
        self._sig_indexes = tuple([int(axis.index) for axis in self.axes
                                   if axis.index not in self.nav_indexes])
    axes = []
    for index in self._sig_indexes:
        axes_tmp = copy.copy(self.get_axis_from_index(index, create=True))
        axes.extend(ax for ax in axes_tmp if ax.size > 1)
    return axes


def is_axis_signal(self, axis: Axis) -> bool:
    """Check if an axis is a signal one, that is its index is not one of the navigation indexes"""
    # the original test was inverted and returned True for navigation axes
    return axis.index not in self._nav_indexes


def is_axis_navigation(self, axis: Axis) -> bool:
    """Check if an axis is a navigation one, that is its index is one of the navigation indexes"""
    # the original test was inverted and returned True for signal axes
    return axis.index in self._nav_indexes


def __repr__(self):
    """String representation, delegated to the distribution specific _get_dimension_str"""
    return self._get_dimension_str()


@abstractmethod
def _get_dimension_str(self):
    """Build the string describing the navigation and signal dimensions, implemented by subclasses"""
    ...
class AxesManagerUniform(AxesManagerBase):
    """Axes manager used when the data distribution is uniform

    At most one Axis object is returned for a given data index.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def compute_shape_from_axes(self):
        """Get the data shape from the length of the axes

        If no axis is declared, the stored data shape is returned instead
        """
        if len(self.axes) != 0:
            shape = []
            for ind in range(len(self.axes)):
                shape.append(len(self.get_axis_from_index(ind, create=True)[0]))
        else:
            shape = self._data_shape
        return tuple(shape)

    def get_shape_from_index(self, index: int) -> int:
        """Get the data shape at the given index

        Raises
        ------
        IndexError: if index is negative or not smaller than the number of data dimensions
        """
        # '>=' and not '>': index == len(shape) is already out of bounds
        if index >= len(self._data_shape) or index < 0:
            raise IndexError('The specified index does not correspond to any data dimension')
        return self._data_shape[index]

    def _check_axis(self, axes: List[Axis]):
        """Check all axis to make sure of their type and make sure their data are properly referring to the data index

        An axis whose size does not match the data shape at its index is reset in place
        (size taken from the data, scaling=1, offset=0) and a DataSizeWarning is emitted.
        The checked list is then stored as the manager axes.

        Raises
        ------
        TypeError: if one element of axes is not an Axis

        See Also
        --------
        :py:meth:`Axis.create_linear_data`
        """
        for ind, axis in enumerate(axes):
            if not isinstance(axis, Axis):
                raise TypeError(f'An axis of {self.__class__.__name__} should be an Axis object')
            if self.get_shape_from_index(axis.index) != axis.size:
                warnings.warn(DataSizeWarning('The size of the axis is not coherent with the shape of the data. '
                                              'Replacing it with a linspaced version: np.array([0, 1, 2, ...])'))
                axis.size = self.get_shape_from_index(axis.index)
                axis.scaling = 1
                axis.offset = 0
                axes[ind] = axis
        self._axes = axes

    def get_axis_from_index(self, index: int, create: bool = False) -> List[Axis]:
        """Get the axis referred by a given data dimensionality index

        If the axis is absent, create a linear one to fit the data shape if parameter create is True

        Parameters
        ----------
        index: int
            The index referring to the data ndarray shape
        create: bool
            If True and the axis referred by index has not been found in axes, create one

        Returns
        -------
        List[Axis] or None: return the list of axis instance if Data has the axis (or it has been created) else None

        See Also
        --------
        :py:meth:`Axis.create_linear_data`
        """
        index = int(index)
        has_axis, axis = self._has_get_axis_from_index(index)
        if not has_axis:
            if create:
                warnings.warn(DataIndexWarning(f'The axis requested with index {index} is not present, '
                                               f'creating a linear one...'))
                axis = Axis(index=index, offset=0, scaling=1)
                axis.size = self.get_shape_from_index(index)
            else:
                warnings.warn(DataIndexWarning(f'The axis requested with index {index} is not present, returning None'))
        return [axis]

    def get_axis_from_index_spread(self, index: int, spread_order: int) -> Axis:
        """in spread mode, different nav axes have the same index (but not
        the same spread_order integer value)

        Not relevant for uniform data: always returns None
        """
        return None

    def get_sorted_index(self, axis_index: int = 0, spread_index=0) -> Tuple[np.ndarray, Tuple[slice]]:
        """ Get the index to sort the specified axis

        The data of the selected axis is reordered in place.

        Parameters
        ----------
        axis_index: int
            The index along which one should sort the data
        spread_index: int
            for spread data only, specifies which spread axis to use

        Returns
        -------
        np.ndarray: the sorted index from the specified axis
        tuple of slice: used to slice the underlying data
        """
        axes = self.get_axis_from_index(axis_index)
        if axes[0] is not None:
            sorted_index = np.argsort(axes[0].get_data())
            axes[0].data = axes[0].get_data()[sorted_index]
            # full slices on the other dimensions: numpy accepts only a single Ellipsis
            # in an index, so one Ellipsis per dimension fails for data with 3 or more dimensions
            slices = tuple(sorted_index if ind == axis_index else slice(None)
                           for ind in range(len(self.shape)))
            return sorted_index, slices
        else:
            return None, None

    def _get_dimension_str(self):
        """Build the '(nav shapes|sig shapes)' string from the data shape"""
        nav_part = ", ".join(str(self._data_shape[nav_index]) for nav_index in self.nav_indexes)
        sig_part = ", ".join(str(self._data_shape[sig_index]) for sig_index in self.sig_indexes)
        return f"({nav_part}|{sig_part})"


class AxesManagerSpread(AxesManagerBase):
    """For this particular data category, some explanation is needed, see example below:

    Examples
    --------
    One take images data (20x30) as a function of 2 parameters, say xaxis and yaxis non-linearly spaced on a regular
    grid.

    data.shape = (150, 20, 30)
    data.nav_indexes = (0,)

    The first dimension (150) corresponds to the navigation (there are 150 non uniform data points taken)
    The  second and third could correspond to signal data, here an image of size (20x30)
    so:
    * nav_indexes is (0, )
    * sig_indexes are (1, 2)

    xaxis = Axis(name=xaxis, index=0, data...) length 150
    yaxis = Axis(name=yaxis, index=0, data...) length 150

    In fact from such a data shape the number of navigation axes in unknown . In our example, they are 2. To somehow
    keep track of some ordering in these navigation axes, one adds an attribute to the Axis object: the spread_order
    xaxis = Axis(name=xaxis, index=0, spread_order=0, data...) length 150
    yaxis = Axis(name=yaxis, index=0, spread_order=1, data...) length 150
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _check_axis(self, axes: List[Axis]):
        """Check all axis to make sure of their type and make sure their data are properly referring to the data index

        Raises
        ------
        TypeError: if one element of axes is not an Axis
        ValueError: if nav_indexes does not hold exactly one index
        DataLengthError: if a navigation axis has a size different from 1 and from the navigation length
        """
        for axis in axes:
            if not isinstance(axis, Axis):
                raise TypeError(f'An axis of {self.__class__.__name__} should be an Axis object')
            elif len(self.nav_indexes) != 1:
                raise ValueError('Spread data should have only one specified index in self.nav_indexes')
            elif axis.index in self.nav_indexes:
                if axis.size != 1 and (axis.size != self._data_shape[self.nav_indexes[0]]):
                    raise DataLengthError('all navigation axes should have the same size')

    def compute_shape_from_axes(self):
        """Get data shape from axes

        First get the nav length from one of the navigation axes
        Then check for signal axes
        """
        if len(self.axes) != 0:
            axes = sorted(self.axes, key=lambda axis: axis.index)
            shape = []
            for axis in axes:
                if axis.index in self.nav_indexes:
                    shape.append(axis.size)
                    break  # all navigation axes share the same length, one is enough
            for axis in axes:
                if axis.index not in self.nav_indexes:
                    shape.append(axis.size)
        else:
            shape = self._data_shape
        return tuple(shape)

    def get_shape_from_index(self, index: int) -> int:
        """Get the data shape at the given index

        Raises
        ------
        IndexError: if index is negative or not smaller than the number of data dimensions
        """
        # '>=' and not '>': index == len(shape) is already out of bounds
        if index >= len(self._data_shape) or index < 0:
            raise IndexError('The specified index does not correspond to any data dimension')
        return self._data_shape[index]

    def get_axis_from_index(self, index: int, create: bool = False) -> List[Axis]:
        """in spread mode, different nav axes have the same index (but not
        the same spread_order integer value) so may return multiple axis

        No possible "linear" creation in this mode except if the index is a signal index
        """
        if index in self.nav_indexes:
            axes = []
            for axis in self.axes:
                if axis.index == index:
                    axes.append(axis)
            return axes
        else:
            index = int(index)
            try:
                has_axis, axis = self._has_get_axis_from_index(index)
            except IndexError:
                return [None]

            if not has_axis and index in self.sig_indexes:
                if create:
                    warnings.warn(DataIndexWarning(f'The axis requested with index {index} is not present, '
                                                   f'creating a linear one...'))
                    axis = Axis(index=index, offset=0, scaling=1)
                    axis.size = self.get_shape_from_index(index)
                else:
                    warnings.warn(DataIndexWarning(f'The axis requested with index {index} is not present, '
                                                   f'returning None'))
            return [axis]

    def get_axis_from_index_spread(self, index: int, spread_order: int) -> Axis:
        """in spread mode, different nav axes have the same index (but not
        the same spread_order integer value)

        Returns the first axis matching both index and spread_order, None if there is no match
        """
        for axis in self.axes:
            if axis.index == index and axis.spread_order == spread_order:
                return axis

    def get_sorted_index(self, axis_index: int = 0, spread_index=0) -> Tuple[np.ndarray, Tuple[slice]]:
        """ Get the index to sort the specified axis

        When axis_index is the navigation index, all navigation axes are reordered in place
        with the sorting of the selected spread axis. Otherwise only the selected axis is reordered.

        Parameters
        ----------
        axis_index: int
            The index along which one should sort the data
        spread_index: int
            for spread data only, specifies which spread axis to use

        Returns
        -------
        np.ndarray: the sorted index from the specified axis
        tuple of slice: used to slice the underlying data
        """
        is_nav = axis_index in self.nav_indexes
        if is_nav:
            axis = self.get_axis_from_index_spread(axis_index, spread_index)
        else:
            axis = self.get_axis_from_index(axis_index)[0]

        if axis is not None:
            sorted_index = np.argsort(axis.get_data())
            # the previous implementation read slices[-1] on an empty list as soon as
            # axis_index was not 0; full slices on the other dimensions are equivalent
            slices = tuple(sorted_index if ind == axis_index else slice(None)
                           for ind in range(len(self.shape)))
            if is_nav:
                for nav_index in self.nav_indexes:
                    for nav_axis in self.get_axis_from_index(nav_index):
                        nav_axis.data = nav_axis.get_data()[sorted_index]
            else:
                # sorting along a signal axis must not reorder the navigation axes
                axis.data = axis.get_data()[sorted_index]
            return sorted_index, slices
        else:
            return None, None

    def _get_dimension_str(self):
        """Build the '(nav length|sig shapes)' string, falling back on the raw data shape on error"""
        try:
            string = "("
            for nav_index in self.nav_indexes:
                string += str(self._data_shape[nav_index]) + ", "
                break  # only one navigation length in spread mode
            string = string.rstrip(", ")
            string += "|"
            for sig_index in self.sig_indexes:
                string += str(self._data_shape[sig_index]) + ", "
            string = string.rstrip(", ")
            string += ")"
        except Exception:
            # best effort representation, used in __repr__ so it should not raise
            string = f'({self._data_shape})'
        return string


class DataWithAxes(DataBase):
    """Data object with Axis objects corresponding to underlying data nd-arrays

    Parameters
    ----------
    axes: list of Axis
        the list of Axis object for proper plotting, calibration ...
    nav_indexes: tuple of int
        highlight which Axis in axes is Signal or Navigation axis depending on the content:
        For instance, nav_indexes = (2,), means that the axis with index 2 in a at least 3D ndarray data is the first
        navigation axis
        For instance, nav_indexes = (3,2), means that the axis with index 3 in a at least 4D ndarray data is the first
        navigation axis while the axis with index 2 is the second navigation Axis. Axes with index 0 and 1 are signal
        axes of 2D ndarray data
    errors: list of ndarray.
        The list should match the length of the data attribute while the ndarrays
        should match the data ndarray
    """

    def __init__(self, *args, axes: List[Axis] = None, nav_indexes: Tuple[int] = (),
                 errors: Iterable[np.ndarray] = None,
                 **kwargs):

        if axes is None:
            # a fresh list per instance: the axes manager stores and appends to this list
            axes = []

        if 'nav_axes' in kwargs:
            deprecation_msg('nav_axes parameter should not be used anymore, use nav_indexes')
            nav_indexes = kwargs.pop('nav_axes')

        # legacy named axes, extracted here and forwarded to the axes manager
        x_axis = kwargs.pop('x_axis', None)
        y_axis = kwargs.pop('y_axis', None)
        nav_x_axis = kwargs.pop('nav_x_axis', None)
        nav_y_axis = kwargs.pop('nav_y_axis', None)

        super().__init__(*args, **kwargs)

        self._axes = axes

        other_kwargs = dict(x_axis=x_axis, y_axis=y_axis, nav_x_axis=nav_x_axis, nav_y_axis=nav_y_axis)

        self.set_axes_manager(self.shape, axes=axes, nav_indexes=nav_indexes, **other_kwargs)

        self.inav: Iterable[DataWithAxes] = SpecialSlicersData(self, True)
        self.isig: Iterable[DataWithAxes] = SpecialSlicersData(self, False)

        self.get_dim_from_data_axes()  # in DataBase, dim is processed from the shape of data, but if axes are provided
        # then use get_dim_from axes
        self._check_errors(errors)

    def _check_errors(self, errors: Iterable[np.ndarray]):
        """ Make sure the errors object is adapted to the len/shape of the dwa object

        Errors are stored only if they are a list/tuple of ndarrays, as many as the data arrays
        and with the same shape as the data. Otherwise they are set to None.

        new in 4.2.0
        """
        check = False
        if errors is None:
            self._errors = None
            return
        if isinstance(errors, (tuple, list)) and len(errors) == len(self):
            if np.all([isinstance(error, np.ndarray) for error in errors]):
                if np.all([error_array.shape == self.shape for error_array in errors]):
                    check = True
                else:
                    logger.warning('All error objects should have the same shape as the data '
                                   'objects')
            else:
                logger.warning('All error objects should be np.ndarray')

        if not check:
            logger.warning('the errors field is incompatible with the structure of the data')
            self._errors = None
        else:
            self._errors = errors

    @property
    def errors(self):
        """ Get/Set the errors bar values as a list of np.ndarray

        new in 4.2.0
        """
        return self._errors

    @errors.setter
    def errors(self, errors: Iterable[np.ndarray]):
        self._check_errors(errors)

    def get_error(self, index):
        """ Get a particular error ndarray at the given index in the list

        new in 4.2.0
        """
        if self._errors is not None:  # because to the initial check we know it is a list of ndarrays
            return self._errors[index]
        else:
            return np.array([0])  # this could be added to any numpy array of any shape

    def errors_as_dwa(self):
        """ Get a dwa from self replacing the data content with the error attribute (if not None)

        New in 4.2.0

        Raises
        ------
        ValueError: if there is no errors in this object
        """
        if self.errors is not None:
            dwa = self.deepcopy_with_new_data(self.errors)
            dwa.name = f'{self.name}_errors'
            dwa.errors = None
            return dwa
        else:
            raise ValueError(f'Cannot create a dwa from a None, should be a list of ndarray')

    def plot(self, plotter_backend: str = config('plotting', 'backend'), *args, viewer=None, **kwargs):
        """ Call a plotter factory and its plot method over the actual data"""
        return plotter_factory.get(plotter_backend).plot(self, *args, viewer=viewer, **kwargs)

    def set_axes_manager(self, data_shape, axes, nav_indexes, **kwargs):
        """Create the axes manager adapted to the data distribution

        Data without navigation indexes are always handled as uniform.

        Raises
        ------
        ValueError: if the distribution is neither 'uniform' nor 'spread'
        """
        if self.distribution.name == 'uniform' or len(nav_indexes) == 0:
            self._distribution = DataDistribution['uniform']
            self.axes_manager = AxesManagerUniform(data_shape=data_shape, axes=axes, nav_indexes=nav_indexes,
                                                   **kwargs)
        elif self.distribution.name == 'spread':
            self.axes_manager = AxesManagerSpread(data_shape=data_shape, axes=axes, nav_indexes=nav_indexes,
                                                  **kwargs)
        else:
            # the message used to reference an undefined name 'data'
            raise ValueError(f'Such a data distribution ({self.distribution}) has no AxesManager')

    def __eq__(self, other):
        """Equality from the parent class, extended with the comparison of the axes and of the errors"""
        is_equal = super().__eq__(other)
        if isinstance(other, DataWithAxes):
            for ind in list(self.nav_indexes) + list(self.sig_indexes):
                axes_self = self.get_axis_from_index(ind)
                axes_other = other.get_axis_from_index(ind)
                if len(axes_other) != len(axes_self):
                    return False
                for ind_ax in range(len(axes_self)):
                    if axes_self[ind_ax] != axes_other[ind_ax]:
                        return False
            if self.errors is None:
                is_equal = is_equal and other.errors is None
            else:
                # guard against other having no errors (or another number of them)
                # before indexing into it
                if other.errors is None or len(other.errors) != len(self.errors):
                    return False
                for ind_error in range(len(self.errors)):
                    if not np.allclose(self.errors[ind_error], other.errors[ind_error]):
                        return False
        return is_equal

    def __repr__(self):
        return f'<{self.__class__.__name__}: {self.name} <len:{self.length}> {self._am}>'

    def sort_data(self, axis_index: int = 0, spread_index=0, inplace=False) -> DataWithAxes:
        """ Sort data along a given axis, default is 0

        Parameters
        ----------
        axis_index: int
            The index along which one should sort the data
        spread_index: int
            for spread data only, specifies which spread axis to use
        inplace: bool
            modify in place or not the data (and its axes)

        Returns
        -------
        DataWithAxes
        """
        if inplace:
            data = self
        else:
            data = self.deepcopy()
        sorted_index, slices = data._am.get_sorted_index(axis_index, spread_index)
        if sorted_index is not None:
            for ind in range(len(data)):
                data.data[ind] = data.data[ind][slices]
        return data

    def transpose(self):
        """replace the data by their transposed version

        Valid only for 2D data
        """
        if self.dim == 'Data2D':
            self.data[:] = [data.T for data in self.data]
            for axis in self.axes:
                # swap the axes indexes to follow the transposition
                axis.index = 0 if axis.index == 1 else 1

    def crop_at_along(self, coordinates_tuple: Tuple):
        """Slice the signal dimensions using, for each element of coordinates_tuple, the indexes
        found by the find_indexes method of the axis of index 0

        NOTE(review): the axis of index 0 is used for every element of coordinates_tuple, confirm
        this is the intent
        """
        slices = []
        for coordinates in coordinates_tuple:
            axis = self.get_axis_from_index(0)[0]
            indexes = axis.find_indexes(coordinates)
            slices.append(slice(indexes))

        return self._slicer(slices, False)

    def mean(self, axis: int = 0) -> DataWithAxes:
        """Process the mean of the data on the specified axis and returns the new data

        Parameters
        ----------
        axis: int

        Returns
        -------
        DataWithAxes
        """
        dat_mean = []
        for dat in self.data:
            mean = np.mean(dat, axis=axis)
            if isinstance(mean, numbers.Number):
                mean = np.array([mean])  # keep at least a 1D array in the data list
            dat_mean.append(mean)
        return self.deepcopy_with_new_data(dat_mean, remove_axes_index=axis)

    def sum(self, axis: int = 0) -> DataWithAxes:
        """Process the sum of the data on the specified axis and returns the new data

        Parameters
        ----------
        axis: int

        Returns
        -------
        DataWithAxes
        """
        dat_sum = []
        for dat in self.data:
            dat_sum.append(np.sum(dat, axis=axis))
        return self.deepcopy_with_new_data(dat_sum, remove_axes_index=axis)

    def interp(self, new_axis_data: Union[Axis, np.ndarray], **kwargs) -> DataWithAxes:
        """Performs linear interpolation for 1D data only.

        For more complex ones, see :py:meth:`scipy.interpolate`

        Parameters
        ----------
        new_axis_data: Union[Axis, np.ndarray]
            The coordinates over which to do the interpolation
        kwargs: dict
            extra named parameters to be passed to the :py:meth:`~numpy.interp` method

        Returns
        -------
        DataWithAxes

        Raises
        ------
        ValueError: if the data are not 1D

        See Also
        --------
        :py:meth:`~numpy.interp`
        :py:meth:`~scipy.interpolate`
        """
        if self.dim != DataDim['Data1D']:
            raise ValueError('For basic interpolation, only 1D data are supported')

        data_interpolated = []
        axis_obj = self.get_axis_from_index(0)[0]
        if isinstance(new_axis_data, np.ndarray):
            new_axis_data = Axis(axis_obj.label, axis_obj.units, data=new_axis_data)

        for dat in self.data:
            data_interpolated.append(np.interp(new_axis_data.get_data(), axis_obj.get_data(), dat,
                                               **kwargs))
        new_data = DataCalculated(f'{self.name}_interp', data=data_interpolated,
                                  axes=[new_axis_data],
                                  labels=self.labels)
        return new_data

    def ft(self, axis: int = 0) -> DataWithAxes:
        """Process the Fourier Transform of the data on the specified axis and returns the new data

        Parameters
        ----------
        axis: int

        Returns
        -------
        DataWithAxes

        See Also
        --------
        :py:meth:`~pymodaq.utils.math_utils.ft`, :py:meth:`~numpy.fft.fft`
        """
        dat_ft = []
        axis_obj = self.get_axis_from_index(axis)[0]
        omega_grid, time_grid = mutils.ftAxis_time(len(axis_obj),
                                                   np.abs(axis_obj.max() - axis_obj.min()))
        for dat in self.data:
            dat_ft.append(mutils.ft(dat, dim=axis))
        new_data = self.deepcopy_with_new_data(dat_ft)
        # work on the axis of the new object, the axis of self is left untouched
        axis_obj = new_data.get_axis_from_index(axis)[0]
        axis_obj.data = omega_grid
        axis_obj.label = f'ft({axis_obj.label})'
        axis_obj.units = f'2pi/{axis_obj.units}'
        return new_data

    def ift(self, axis: int = 0) -> DataWithAxes:
        """Process the inverse Fourier Transform of the data on the specified axis and returns the new data

        Parameters
        ----------
        axis: int

        Returns
        -------
        DataWithAxes

        See Also
        --------
        :py:meth:`~pymodaq.utils.math_utils.ift`, :py:meth:`~numpy.fft.ifft`
        """
        dat_ift = []
        axis_obj = self.get_axis_from_index(axis)[0]
        omega_grid, time_grid = mutils.ftAxis_time(len(axis_obj),
                                                   np.abs(axis_obj.max() - axis_obj.min()))
        for dat in self.data:
            dat_ift.append(mutils.ift(dat, dim=axis))
        new_data = self.deepcopy_with_new_data(dat_ift)
        # as in ft: modify the axis of the returned object, not the one of self
        axis_obj = new_data.get_axis_from_index(axis)[0]
        axis_obj.data = omega_grid
        axis_obj.label = f'ift({axis_obj.label})'
        axis_obj.units = f'2pi/{axis_obj.units}'
        return new_data

    def fit(self, function: Callable, initial_guess: IterableType, data_index: int = None,
            axis_index: int = 0, **kwargs) -> DataCalculated:
        """ Apply 1D curve fitting using the scipy optimization package

        Parameters
        ----------
        function: Callable
            a callable to be used for the fit
        initial_guess: Iterable
            The initial parameters for the fit
        data_index: int
            The index of the data over which to do the fit, if None apply the fit to all
        axis_index: int
            the axis index to use for the fit (if multiple) but there should be only one
        kwargs: dict
            extra named parameters applied to the curve_fit scipy method

        Returns
        -------
        DataCalculated containing the evaluation of the fit on the specified axis

        Raises
        ------
        ValueError: if the data are not 1D

        See Also
        --------
        :py:meth:`~scipy.optimize.curve_fit`
        """
        import scipy.optimize as opt
        if self.dim != DataDim['Data1D']:
            raise ValueError('Integrated fitting only works for 1D data')
        axis = self.get_axis_from_index(axis_index)[0].copy()
        axis_array = axis.get_data()
        if data_index is None:
            datalist_to_fit = self.data
            labels = [f'{label}_fit' for label in self.labels]
        else:
            datalist_to_fit = [self.data[data_index]]
            labels = [f'{self.labels[data_index]}_fit']

        datalist_fitted = []
        fit_coeffs = []
        for data_array in datalist_to_fit:
            popt, pcov = opt.curve_fit(function, axis_array, data_array, p0=initial_guess, **kwargs)
            datalist_fitted.append(function(axis_array, *popt))
            fit_coeffs.append(popt)

        return DataCalculated(f'{self.name}_fit', data=datalist_fitted,
                              labels=labels,
                              axes=[axis], fit_coeffs=fit_coeffs)

    def find_peaks(self, height=None, threshold=None, **kwargs) -> DataToExport:
        """ Apply the scipy find_peaks method to 1D data

        Parameters
        ----------
        height: number or ndarray or sequence, optional
        threshold: number or ndarray or sequence, optional
        kwargs: dict
            extra named parameters applied to the find_peaks scipy method

        Returns
        -------
        DataToExport: one DataCalculated per data array, holding the peak values and the peak indexes

        Raises
        ------
        ValueError: if the data are not 1D

        See Also
        --------
        :py:meth:`~scipy.signal.find_peaks`
        """
        if self.dim != DataDim['Data1D']:
            raise ValueError('Finding peaks only works for 1D data')
        from scipy.signal import find_peaks
        peaks_indices = []
        dte = DataToExport('peaks')
        for ind in range(len(self)):
            peaks, properties = find_peaks(self[ind], height, threshold, **kwargs)
            peaks_indices.append(peaks)

            dte.append(DataCalculated(f'{self.labels[ind]}',
                                      data=[self[ind][peaks_indices[-1]],
                                            peaks_indices[-1]
                                            ],
                                      labels=['peak value', 'peak indexes'],
                                      axes=[Axis('peak position', self.axes[0].units,
                                                 data=self.axes[0].get_data_at(peaks_indices[-1]))])
                       )
        return dte

    def get_dim_from_data_axes(self) -> DataDim:
        """Get the dimensionality DataDim from data taking into account nav indexes
        """
        if len(self.axes) != len(self.shape):
            self._dim = self.get_dim_from_data(self.data)
        else:
            if len(self.nav_indexes) > 0:
                self._dim = DataDim['DataND']
            else:
                if len(self.axes) == 0:
                    self._dim = DataDim['Data0D']
                elif len(self.axes) == 1:
                    self._dim = DataDim['Data1D']
                elif len(self.axes) == 2:
                    self._dim = DataDim['Data2D']
        # any navigation index makes the data ND, whatever was deduced above
        if len(self.nav_indexes) > 0:
            self._dim = DataDim['DataND']
        return self._dim

    @property
    def n_axes(self):
        """Get the number of axes (even if not specified)"""
        return len(self.axes)

    @property
    def axes(self):
        """convenience property to fetch attribute from axis_manager"""
        return self._am.axes

    @axes.setter
    def axes(self, axes: List[Axis]):
        """convenience property to set attribute from axis_manager"""
        self.set_axes_manager(self.shape, axes=axes, nav_indexes=self.nav_indexes)

    def axes_limits(self, axes_indexes: List[int] = None) -> List[Tuple[float, float]]:
        """Get the limits of specified axes (all if axes_indexes is None)"""
        if axes_indexes is None:
            return [(axis.min(), axis.max()) for axis in self.axes]
        else:
            return [(axis.min(), axis.max()) for axis in self.axes if axis.index in axes_indexes]

    @property
    def sig_indexes(self):
        """convenience property to fetch attribute from axis_manager"""
        return self._am.sig_indexes

    @property
    def nav_indexes(self):
        """convenience property to fetch attribute from axis_manager"""
        return self._am.nav_indexes

    @nav_indexes.setter
    def nav_indexes(self, indexes: List[int]):
        """create new axis manager with new navigation indexes"""
        self.set_axes_manager(self.shape, axes=self.axes, nav_indexes=indexes)
        self.get_dim_from_data_axes()

    def get_nav_axes(self) -> List[Axis]:
        """Get the navigation axes from the axes manager"""
        return self._am.get_nav_axes()

    def get_sig_index(self) -> List[Axis]:
        """Get the signal axes from the axes manager (despite its name, returns axes and not indexes)"""
        return self._am.get_signal_axes()

    def get_nav_axes_with_data(self) -> List[Axis]:
        """Get the data's navigation axes making sure there is data in the data field"""
        axes = self.get_nav_axes()
        for axis in axes:
            if axis.get_data() is None:
                axis.create_linear_data(self.shape[axis.index])
        return axes

    def get_axis_indexes(self) -> List[int]:
        """Get all present different axis indexes"""
        return sorted(list(set([axis.index for axis in self.axes])))

    def get_axis_from_index(self, index, create=False):
        """convenience method to fetch the axes at a given index from the axis_manager"""
        return self._am.get_axis_from_index(index, create)

    def get_axis_from_index_spread(self, index: int, spread: int):
        """convenience method to fetch the axis at a given index and spread order from the axis_manager"""
        return self._am.get_axis_from_index_spread(index, spread)

    def get_axis_from_label(self, label: str) -> Axis:
        """Get the axis referred by a given label

        Parameters
        ----------
        label: str
            The label of the axis

        Returns
        -------
        Axis or None: return the axis instance if it has the right label else None
        """
        for axis in self.axes:
            if axis.label == label:
                return axis

    def create_missing_axes(self):
        """Check if given the data shape, some axes are missing to properly define the data
        (especially for plotting)"""
        axes = self.axes[:]
        for index in self.nav_indexes + self.sig_indexes:
            if (len(self.get_axis_from_index(index)) != 0 and
                    self.get_axis_from_index(index)[0] is None):
                axes_tmp = self.get_axis_from_index(index, create=True)
                for ax in axes_tmp:
                    if ax.size > 1:
                        axes.append(ax)
        self.axes = axes

    def _compute_slices(self, slices, is_navigation=True):
        """Compute the total slice to apply to the data

        Filling in Ellipsis when no slicing should be done
        """
        if isinstance(slices, numbers.Number) or isinstance(slices, slice):
            slices = [slices]
        if is_navigation:
            indexes = self._am.nav_indexes
        else:
            indexes = self._am.sig_indexes
        total_slices = []
        slices = list(slices)
        for ind in range(len(self.shape)):
            if ind in indexes:
                total_slices.append(slices.pop(0))
            elif len(total_slices) == 0:
                total_slices.append(Ellipsis)
            elif not (Ellipsis in total_slices and total_slices[-1] is Ellipsis):
                total_slices.append(slice(None))
        total_slices = tuple(total_slices)
        return total_slices

    def check_squeeze(self, total_slices: List[slice], is_navigation: bool):
        """Tell if the sliced data should be squeezed

        Returns False if the first dimension of length 1 in the sliced data is at a position
        listed in the indexes that were not sliced (nav indexes when slicing the signal and vice versa)
        """
        do_squeeze = True
        if 1 in self.data[0][total_slices].shape:
            if not is_navigation and self.data[0][total_slices].shape.index(1) in self.nav_indexes:
                do_squeeze = False
            elif is_navigation and self.data[0][total_slices].shape.index(1) in self.sig_indexes:
                do_squeeze = False
        return do_squeeze

    def _slicer(self, slices, is_navigation=True):
        """Apply a given slice to the data either navigation or signal dimension

        Parameters
        ----------
        slices: tuple of slice or int
            the slices to apply to the data
        is_navigation: bool
            if True apply the slices to the navigation dimension else to the signal ones

        Returns
        -------
        DataWithAxes
            Object of the same type as the initial data, derived from DataWithAxes. But with lower
            data size due to the slicing and with eventually less axes.
        """

        if isinstance(slices, numbers.Number) or isinstance(slices, slice):
            slices = [slices]
        total_slices = self._compute_slices(slices, is_navigation)

        do_squeeze = self.check_squeeze(total_slices, is_navigation)
        new_arrays_data = [squeeze(dat[total_slices], do_squeeze) for dat in self.data]
        tmp_axes = self._am.get_signal_axes() if is_navigation else self._am.get_nav_axes()
        axes_to_append = [copy.deepcopy(axis) for axis in tmp_axes]

        # axes_to_append are the axes to append to the new produced data
        # (basically the ones to keep)

        indexes_to_get = self.nav_indexes if is_navigation else self.sig_indexes
        # indexes_to_get are the indexes of the axes where the slice should be applied

        _indexes = list(self.nav_indexes)
        _indexes.extend(self.sig_indexes)
        lower_indexes = dict(zip(_indexes, [0 for _ in range(len(_indexes))]))
        # lower_indexes will store for each *axis index* how much the index should be reduced
        # because one axis has
        # been removed

        axes = []
        nav_indexes = [] if is_navigation else list(self._am.nav_indexes)
        for ind_slice, _slice in enumerate(slices):
            if ind_slice < len(indexes_to_get):
                ax = self._am.get_axis_from_index(indexes_to_get[ind_slice])
                if len(ax) != 0 and ax[0] is not None:
                    for ind in range(len(ax)):
                        ax[ind] = ax[ind].iaxis[_slice]

                    if not (ax[0] is None or ax[0].size <= 1):  # means the slice kept part of the axis
                        if is_navigation:
                            nav_indexes.append(self._am.nav_indexes[ind_slice])
                        axes.extend(ax)
                    else:
                        for axis in axes_to_append:  # means we removed one of the axes (and data dim),
                            # hence axis index above current index should be lowered by 1
                            if axis.index > indexes_to_get[ind_slice]:
                                lower_indexes[axis.index] += 1
                        for index in indexes_to_get[ind_slice+1:]:
                            lower_indexes[index] += 1

        axes.extend(axes_to_append)
        for axis in axes:
            axis.index -= lower_indexes[axis.index]
        for ind in range(len(nav_indexes)):
            nav_indexes[ind] -= lower_indexes[nav_indexes[ind]]

        if len(nav_indexes) != 0:
            distribution = self.distribution
        else:
            # no navigation axis left: the data can only be uniform
            distribution = DataDistribution['uniform']

        data = DataWithAxes(self.name, data=new_arrays_data, nav_indexes=tuple(nav_indexes), axes=axes,
                            source='calculated', origin=self.origin,
                            labels=self.labels[:],
                            distribution=distribution)
        return data

    def deepcopy_with_new_data(self, data: List[np.ndarray] = None,
                               remove_axes_index: Union[int, List[int]] = None,
                               source: DataSource = 'calculated',
                               keep_dim=False) -> DataWithAxes:
        """deepcopy without copying the initial data (saving memory)

        The new data, may have some axes stripped as specified in remove_axes_index

        Parameters
        ----------
        data: list of numpy ndarray
            The new data
        remove_axes_index: tuple of int
            indexes of the axis to be removed
        source: DataSource
        keep_dim: bool
            if False (the default) will calculate the new dim based on the data shape
            else keep the same (be aware it could lead to issues)

        Returns
        -------
        DataWithAxes or None: None if an error occurred while building the new object (the error is logged)
        """
        # taken outside of the try block so that the finally clause can always restore it
        old_data = self.data
        try:
            self._data = None  # detach the arrays so that deepcopy does not duplicate them
            new_data = self.deepcopy()
            new_data._data = data
            new_data.get_dim_from_data(data)

            if source is not None:
                source = enum_checker(DataSource, source)
                new_data._source = source

            if remove_axes_index is not None:
                if not isinstance(remove_axes_index, Iterable):
                    remove_axes_index = [remove_axes_index]

                lower_indexes = dict(zip(new_data.get_axis_indexes(),
                                         [0 for _ in range(len(new_data.get_axis_indexes()))]))
                # lower_indexes will store for each *axis index* how much the index should be reduced because
                # one axis has been removed

                nav_indexes = list(new_data.nav_indexes)
                sig_indexes = list(new_data.sig_indexes)
                for index in remove_axes_index:
                    for axis in new_data.get_axis_from_index(index):
                        if axis is not None:
                            new_data.axes.remove(axis)

                    if index in new_data.nav_indexes:
                        nav_indexes.pop(nav_indexes.index(index))
                    if index in new_data.sig_indexes:
                        sig_indexes.pop(sig_indexes.index(index))

                    for axis in new_data.axes:
                        if axis.index > index and axis.index not in remove_axes_index:
                            lower_indexes[axis.index] += 1

                for axis in new_data.axes:
                    axis.index -= lower_indexes[axis.index]
                for ind in range(len(nav_indexes)):
                    nav_indexes[ind] -= lower_indexes[nav_indexes[ind]]
                new_data.nav_indexes = tuple(nav_indexes)

            new_data._shape = data[0].shape
            if not keep_dim:
                new_data._dim = self._get_dim_from_data(data)
            return new_data

        except Exception:
            # same outcome as before for the caller (None), but the failure is no longer silent
            logger.exception('Could not create a copy of the data object with the new data')
            return None

        finally:
            self._data = old_data

    @property
    def _am(self) -> AxesManagerBase:
        """Shortcut to the axes manager"""
        return self.axes_manager

    def get_data_dimension(self) -> str:
        """Get the string representation of the axes manager, see its _get_dimension_str method"""
        return str(self._am)

    def get_data_as_dwa(self, index: int = 0) -> DataWithAxes:
        """ Get the underlying data selected from the list at index, returned as a DataWithAxes"""
        return self.deepcopy_with_new_data([self[index]])
class DataRaw(DataWithAxes):
    """Specialized DataWithAxes set with source as 'raw'. To be used for raw data"""

    def __init__(self, *args, **kwargs):
        # the source is forced to 'raw': silently discard the one given by the caller, if any
        kwargs.pop('source', None)
        super().__init__(*args, source=DataSource['raw'], **kwargs)
class DataActuator(DataRaw):
    """DataRaw specialization to be used for raw data generated by actuator plugins

    When omitted, the name defaults to 'actuator' and the data to a single array holding 0.
    A bare number given as data is wrapped into a list holding a one-element array.
    """

    def __init__(self, *args, **kwargs):
        if not args and 'name' not in kwargs:
            args = ['actuator']
        kwargs.setdefault('data', [np.array([0.])])
        if isinstance(kwargs['data'], numbers.Number):
            # useful formatting: a plain number becomes the expected list of arrays
            kwargs['data'] = [np.array([kwargs['data']])]
        super().__init__(*args, **kwargs)

    def __repr__(self):
        shown = self.data[0][0] if self.dim.name == 'Data0D' else self.shape
        return f'<{self.__class__.__name__} ({shown})>'

    def value(self) -> float:
        """Returns the underlying float value (of the first elt in the data list) if this data
        holds only a float otherwise returns a mean of the underlying data"""
        if self.length != 1 or self.size != 1:
            return float(np.mean(self.data))
        return float(self.data[0][0])

    def values(self) -> List[float]:
        """Returns the underlying float value (for each data array in the data list) if this data
        holds only a float otherwise returns a mean of the underlying data"""
        if self.length != 1 or self.size != 1:
            return [float(np.mean(array)) for array in self.data]
        return [float(array[0]) for array in self.data]
class DataFromPlugins(DataRaw):
    """DataRaw specialization to be used for raw data generated by Detector plugins

    It introduces by default two extra attributes, do_plot and do_save. Their presence can be
    checked in the extra_attributes list.

    Parameters
    ----------
    do_plot: bool
        If True the underlying data will be plotted in the DAQViewer
    do_save: bool
        If True the underlying data will be saved

    Attributes
    ----------
    do_plot: bool
        If True the underlying data will be plotted in the DAQViewer
    do_save: bool
        If True the underlying data will be saved
    """

    def __init__(self, *args, **kwargs):
        # backcompatibility: the legacy 'plot'/'save' keywords are renamed and take
        # precedence over a 'do_plot'/'do_save' given at the same time
        if 'plot' in kwargs:
            deprecation_msg("'plot' should not be used anymore as extra_attribute, "
                            "please use 'do_plot'")
            kwargs['do_plot'] = kwargs.pop('plot')
        if 'save' in kwargs:
            deprecation_msg("'save' should not be used anymore as extra_attribute, "
                            "please use 'do_save'")
            kwargs['do_save'] = kwargs.pop('save')

        # both flags are enabled unless the caller said otherwise
        kwargs.setdefault('do_plot', True)
        kwargs.setdefault('do_save', True)
        super().__init__(*args, **kwargs)
class DataCalculated(DataWithAxes):
    """Specialized DataWithAxes set with source as 'calculated'. To be used for processed/calculated data"""

    def __init__(self, *args, axes=None, **kwargs):
        """
        Parameters
        ----------
        axes: list
            the axes forwarded to DataWithAxes, a new empty list if omitted
        """
        # any source given by the caller is discarded: it is forced to 'calculated' below
        kwargs.pop('source', None)
        # a fresh list per call: a `[]` default would be one single list shared by every instance
        if axes is None:
            axes = []
        super().__init__(*args, source=DataSource['calculated'], axes=axes, **kwargs)
class DataFromRoi(DataCalculated):
    """Specialized DataWithAxes set with source as 'calculated'. To be used for processed data from region of
    interest"""

    def __init__(self, *args, axes=None, **kwargs):
        """
        Parameters
        ----------
        axes: list
            the axes forwarded to DataCalculated, a new empty list if omitted
        """
        # a fresh list per call: a `[]` default would be one single list shared by every instance
        if axes is None:
            axes = []
        super().__init__(*args, axes=axes, **kwargs)
class DataToExport(DataLowLevel):
    """Object to store all raw and calculated DataWithAxes data for later exporting, saving, sending signal...

    Includes methods to retrieve data from dim, source...
    Stored data are identified by their name: appending some data whose name (and origin) is already
    present replaces the existing data.

    Parameters
    ----------
    name: str
        The identifier of the exporting object
    data: list of DataWithAxes
        All the raw and calculated data to be exported

    Attributes
    ----------
    name
    timestamp
    data
    """

    def __init__(self, name: str, data: List[DataWithAxes] = [], **kwargs):
        """

        Parameters
        ----------
        name: str
            The identifier of the exporting object
        data: list of DataWithAxes
            The data to store, anything else than a list raises a TypeError
        kwargs:
            every extra named argument is set as an attribute of the object
        """
        super().__init__(name)
        if not isinstance(data, list):
            raise TypeError('Data stored in a DataToExport object should be as a list of objects'
                            ' inherited from DataWithAxis')
        # the data setter fills this list in place with a copy of the given one
        self._data = []
        self.data = data
        for attribute, value in kwargs.items():
            setattr(self, attribute, value)
[docs] def plot(self, plotter_backend: str = config('plotting', 'backend'), *args, **kwargs): """ Call a plotter factory and its plot method over the actual data""" return plotter_factory.get(plotter_backend).plot(self, *args, **kwargs)
[docs] def affect_name_to_origin_if_none(self): """Affect self.name to all DataWithAxes children's attribute origin if this origin is not defined""" for dat in self.data: if dat.origin is None or dat.origin == '': dat.origin = self.name
def __sub__(self, other: object): if isinstance(other, DataToExport) and len(other) == len(self): new_data = copy.deepcopy(self) for ind_dfp in range(len(self)): new_data[ind_dfp] = self[ind_dfp] - other[ind_dfp] return new_data else: raise TypeError(f'Could not substract a {other.__class__.__name__} or a {self.__class__.__name__} ' f'of a different length') def __add__(self, other: object): if isinstance(other, DataToExport) and len(other) == len(self): new_data = copy.deepcopy(self) for ind_dfp in range(len(self)): new_data[ind_dfp] = self[ind_dfp] + other[ind_dfp] return new_data else: raise TypeError(f'Could not add a {other.__class__.__name__} or a {self.__class__.__name__} ' f'of a different length') def __mul__(self, other: object): if isinstance(other, numbers.Number): new_data = copy.deepcopy(self) for ind_dfp in range(len(self)): new_data[ind_dfp] = self[ind_dfp] * other return new_data else: raise TypeError(f'Could not multiply a {other.__class__.__name__} with a {self.__class__.__name__} ' f'of a different length') def __truediv__(self, other: object): if isinstance(other, numbers.Number): return self * (1 / other) else: raise TypeError(f'Could not divide a {other.__class__.__name__} with a {self.__class__.__name__} ' f'of a different length')
[docs] def average(self, other: DataToExport, weight: int) -> DataToExport: """ Compute the weighted average between self and other DataToExport and attributes it to self Parameters ---------- other: DataToExport weight: int The weight the 'other_data' holds with respect to self """ if isinstance(other, DataToExport) and len(other) == len(self): new_data = copy.copy(self) for ind_dfp in range(len(self)): new_data[ind_dfp] = self[ind_dfp].average(other[ind_dfp], weight) return new_data else: raise TypeError(f'Could not average a {other.__class__.__name__} with a {self.__class__.__name__} ' f'of a different length')
[docs] def merge_as_dwa(self, dim: Union[str, DataDim], name: str = None) -> DataRaw: """ attempt to merge filtered dwa into one Only possible if all filtered dwa and underlying data have same shape Parameters ---------- dim: DataDim or str will only try to merge dwa having this dimensionality name: str The new name of the returned dwa """ dim = enum_checker(DataDim, dim) filtered_data = self.get_data_from_dim(dim) if len(filtered_data) != 0: dwa = filtered_data[0].deepcopy() for dwa_tmp in filtered_data[1:]: if dwa_tmp.shape == dwa.shape and dwa_tmp.distribution == dwa.distribution: dwa.append(dwa_tmp) if name is None: name = self.name dwa.name = name return dwa
def __repr__(self): repr = f'{self.__class__.__name__}: {self.name} <len:{len(self)}>\n' for dwa in self: repr += f' * {str(dwa)}\n' return repr def __len__(self): return len(self.data) def __iter__(self): self._iter_index = 0 return self def __next__(self) -> DataWithAxes: if self._iter_index < len(self): self._iter_index += 1 return self.data[self._iter_index-1] else: raise StopIteration def __getitem__(self, item) -> Union[DataWithAxes, DataToExport]: if isinstance(item, int) and 0 <= item < len(self): return self.data[item] elif isinstance(item, slice): return DataToExport(self.name, data=[self[ind] for ind in list(range(len(self))[item])]) else: raise IndexError(f'The index should be a positive integer lower than the data length') def __setitem__(self, key, value: DataWithAxes): if isinstance(key, int) and 0 <= key < len(self) and isinstance(value, DataWithAxes): self.data[key] = value else: raise IndexError(f'The index should be a positive integer lower than the data length')
[docs] def get_names(self, dim: DataDim = None) -> List[str]: """Get the names of the stored DataWithAxes, eventually filtered by dim Parameters ---------- dim: DataDim or str Returns ------- list of str: the names of the (filtered) DataWithAxes data """ if dim is None: return [data.name for data in self.data] else: return [data.name for data in self.get_data_from_dim(dim).data]
[docs] def get_full_names(self, dim: DataDim = None): """Get the ful names including the origin attribute into the returned value, eventually filtered by dim Parameters ---------- dim: DataDim or str Returns ------- list of str: the names of the (filtered) DataWithAxes data constructed as : origin/name Examples -------- d0 = DataWithAxes(name='datafromdet0', origin='det0') """ if dim is None: return [data.get_full_name() for data in self.data] else: return [data.get_full_name() for data in self.get_data_from_dim(dim).data]
[docs] def get_origins(self, dim: DataDim = None): """Get the origins of the underlying data into the returned value, eventually filtered by dim Parameters ---------- dim: DataDim or str Returns ------- list of str: the origins of the (filtered) DataWithAxes data Examples -------- d0 = DataWithAxes(name='datafromdet0', origin='det0') """ if dim is None: return list({dwa.origin for dwa in self.data}) else: return list({dwa.origin for dwa in self.get_data_from_dim(dim).data})
[docs] def get_data_from_full_name(self, full_name: str, deepcopy=False) -> DataWithAxes: """Get the DataWithAxes with matching full name""" if deepcopy: data = self.get_data_from_name_origin(full_name.split('/')[1], full_name.split('/')[0]).deepcopy() else: data = self.get_data_from_name_origin(full_name.split('/')[1], full_name.split('/')[0]) return data
def get_data_from_full_names(self, full_names: List[str], deepcopy=False) -> DataToExport: data = [self.get_data_from_full_name(full_name, deepcopy) for full_name in full_names] return DataToExport(name=self.name, data=data) def get_dim_presents(self) -> List[str]: dims = [] for dim in DataDim.names(): if len(self.get_data_from_dim(dim)) != 0: dims.append(dim) return dims
[docs] def get_data_from_source(self, source: DataSource, deepcopy=False) -> DataToExport: """Get the data matching the given DataSource Returns ------- DataToExport: filtered with data matching the dimensionality """ source = enum_checker(DataSource, source) return self.get_data_from_attribute('source', source, deepcopy=deepcopy)
[docs] def get_data_from_missing_attribute(self, attribute: str, deepcopy=False) -> DataToExport: """ Get the data matching a given attribute value Parameters ---------- attribute: str a string of a possible attribute deepcopy: bool if True the returned DataToExport will contain deepcopies of the DataWithAxes Returns ------- DataToExport: filtered with data missing the given attribute """ if deepcopy: return DataToExport(self.name, data=[dwa.deepcopy() for dwa in self if not hasattr(dwa, attribute)]) else: return DataToExport(self.name, data=[dwa for dwa in self if not hasattr(dwa, attribute)])
[docs] def get_data_from_attribute(self, attribute: str, attribute_value: Any, deepcopy=False) -> DataToExport: """Get the data matching a given attribute value Returns ------- DataToExport: filtered with data matching the attribute presence and value """ selection = find_objects_in_list_from_attr_name_val(self.data, attribute, attribute_value, return_first=False) selection.sort(key=lambda elt: elt[0].name) if deepcopy: data = [sel[0].deepcopy() for sel in selection] else: data = [sel[0] for sel in selection] return DataToExport(name=self.name, data=data)
[docs] def get_data_from_dim(self, dim: DataDim, deepcopy=False) -> DataToExport: """Get the data matching the given DataDim Returns ------- DataToExport: filtered with data matching the dimensionality """ dim = enum_checker(DataDim, dim) return self.get_data_from_attribute('dim', dim, deepcopy=deepcopy)
[docs] def get_data_from_dims(self, dims: List[DataDim], deepcopy=False) -> DataToExport: """Get the data matching the given DataDim Returns ------- DataToExport: filtered with data matching the dimensionality """ data = DataToExport(name=self.name) for dim in dims: data.append(self.get_data_from_dim(dim, deepcopy=deepcopy)) return data
[docs] def get_data_from_sig_axes(self, Naxes: int, deepcopy: bool = False) -> DataToExport: """Get the data matching the given number of signal axes Parameters ---------- Naxes: int Number of signal axes in the DataWithAxes objects Returns ------- DataToExport: filtered with data matching the number of signal axes """ data = DataToExport(name=self.name) for _data in self: if len(_data.sig_indexes) == Naxes: if deepcopy: data.append(_data.deepcopy()) else: data.append(_data) return data
[docs] def get_data_from_Naxes(self, Naxes: int, deepcopy: bool = False) -> DataToExport: """Get the data matching the given number of axes Parameters ---------- Naxes: int Number of axes in the DataWithAxes objects Returns ------- DataToExport: filtered with data matching the number of axes """ data = DataToExport(name=self.name) for _data in self: if len(_data.shape) == Naxes: if deepcopy: data.append(_data.deepcopy()) else: data.append(_data) return data
[docs] def get_data_with_naxes_lower_than(self, n_axes=2, deepcopy: bool = False) -> DataToExport: """Get the data with n axes lower than the given number Parameters ---------- Naxes: int Number of axes in the DataWithAxes objects Returns ------- DataToExport: filtered with data matching the number of axes """ data = DataToExport(name=self.name) for _data in self: if _data.n_axes <= n_axes: if deepcopy: data.append(_data.deepcopy()) else: data.append(_data) return data
[docs] def get_data_from_name(self, name: str) -> DataWithAxes: """Get the data matching the given name""" data, _ = find_objects_in_list_from_attr_name_val(self.data, 'name', name, return_first=True) return data
def get_data_from_names(self, names: List[str]) -> DataToExport: return DataToExport(self.name, data=[dwa for dwa in self if dwa.name in names])
[docs] def get_data_from_name_origin(self, name: str, origin: str = '') -> DataWithAxes: """Get the data matching the given name and the given origin""" if origin == '': data, _ = find_objects_in_list_from_attr_name_val(self.data, 'name', name, return_first=True) else: selection = find_objects_in_list_from_attr_name_val(self.data, 'name', name, return_first=False) selection = [sel[0] for sel in selection] data, _ = find_objects_in_list_from_attr_name_val(selection, 'origin', origin) return data
def index(self, data: DataWithAxes): return self.data.index(data)
[docs] def index_from_name_origin(self, name: str, origin: str = '') -> List[DataWithAxes]: """Get the index of a given DataWithAxes within the list of data""" """Get the data matching the given name and the given origin""" if origin == '': _, index = find_objects_in_list_from_attr_name_val(self.data, 'name', name, return_first=True) else: selection = find_objects_in_list_from_attr_name_val(self.data, 'name', name, return_first=False) data_selection = [sel[0] for sel in selection] index_selection = [sel[1] for sel in selection] _, index = find_objects_in_list_from_attr_name_val(data_selection, 'origin', origin) index = index_selection[index] return index
[docs] def pop(self, index: int) -> DataWithAxes: """return and remove the DataWithAxes referred by its index Parameters ---------- index: int index as returned by self.index_from_name_origin See Also -------- index_from_name_origin """ return self.data.pop(index)
def remove(self, dwa: DataWithAxes): return self.pop(self.data.index(dwa)) @property def data(self) -> List[DataWithAxes]: """List[DataWithAxes]: get the data contained in the object""" return self._data @data.setter def data(self, new_data: List[DataWithAxes]): for dat in new_data: self._check_data_type(dat) self._data[:] = [dat for dat in new_data] # shallow copyto make sure that if the original # list is changed, the change will not be applied in here self.affect_name_to_origin_if_none() @staticmethod def _check_data_type(data: DataWithAxes): """Make sure data is a DataWithAxes object or inherited""" if not isinstance(data, DataWithAxes): raise TypeError('Data stored in a DataToExport object should be objects inherited from DataWithAxis') def deepcopy(self): return DataToExport('Copy', data=[data.deepcopy() for data in self]) @dispatch(list) def append(self, data_list: List[DataWithAxes]): for dwa in data_list: self.append(dwa) @dispatch(DataWithAxes) def append(self, dwa: DataWithAxes): """Append/replace DataWithAxes object to the data attribute Make sure only one DataWithAxes object with a given name is in the list except if they don't have the same origin identifier """ dwa = dwa.deepcopy() self._check_data_type(dwa) obj = self.get_data_from_name_origin(dwa.name, dwa.origin) if obj is not None: self._data.pop(self.data.index(obj)) self._data.append(dwa) @dispatch(object) def append(self, dte: DataToExport): if isinstance(dte, DataToExport): self.append(dte.data)
class DataScan(DataToExport):
    """Specialized DataToExport. To be used for data to be saved """

    def __init__(self, name: str, data: List[DataWithAxes] = [], **kwargs):
        super().__init__(name, data, **kwargs)


class DataToActuators(DataToExport):
    """ Particular case of a DataToExport adding one named parameter to indicate what kind of change
    should be applied to the actuators, absolute or relative

    Attributes
    ----------
    mode: str
        Adds an attribute called mode holding a string describing the type of change:
        relative or absolute

    Parameters
    ---------
    mode: str
        either 'rel' or 'abs' for a relative or absolute change of the actuator's values
    """

    def __init__(self, *args, mode='rel', **kwargs):
        if mode not in ('rel', 'abs'):
            warnings.warn('Incorrect mode for the actuators, switching to default relative mode: rel')
            mode = 'rel'
        # handed over as an extra named argument of the parent constructor
        kwargs['mode'] = mode
        super().__init__(*args, **kwargs)

    def __repr__(self):
        base_repr = super().__repr__()
        return f'{base_repr}: {self.mode}'


if __name__ == '__main__':
    # small demonstration: two ROI lineouts, then a gaussian moving along a navigation axis
    d1 = DataFromRoi(name='Hlineout_', data=[np.zeros((24,))],
                     x_axis=Axis(data=np.zeros((24,)), units='myunits', label='mylabel1'))
    d2 = DataFromRoi(name='Hlineout_', data=[np.zeros((12,))],
                     x_axis=Axis(data=np.zeros((12,)), units='myunits2', label='mylabel2'))

    Nsig = 200
    Nnav = 10
    x = np.linspace(-Nsig/2, Nsig/2-1, Nsig)

    dat = np.zeros((Nnav, Nsig))
    for ind in range(Nnav):
        dat[ind] = mutils.gauss1D(x, 50 * (ind - Nnav / 2), 25 / np.sqrt(2))

    nav_axis = Axis('nav', data=np.linspace(0, Nnav-1, Nnav), index=0)
    sig_axis = Axis('sig', data=x, index=1)
    data = DataRaw('mydata', data=[dat], nav_indexes=(0,), axes=[nav_axis, sig_axis])

    data2 = copy.copy(data)
    # the signal axis is summed out, hence removed from the copy
    data3 = data.deepcopy_with_new_data([np.sum(dat, 1)], remove_axes_index=(1,))

    print('done')