# -*- coding: utf-8 -*-
"""
Created the 15/11/2022
@author: Sebastien Weber
"""
from typing import Union, Dict
from enum import Enum
from pathlib import Path
import numpy as np
import importlib
from importlib import metadata
import pickle
from pymodaq_utils.logger import set_logger, get_module_name
from pymodaq_utils.config import GlobalConfig as Config
from pymodaq_utils.utils import capitalize, JsonConverter
from pymodaq_utils import utils
from pymodaq_utils.enums import BaseEnum, enum_checker
# Module-level configuration and logger shared by the whole module.
config = Config()
logger = set_logger(get_module_name(__file__))

# Names of the hdf5 backends that could actually be imported, in detection order.
backends_available = []

# default backend
is_tables = True
try:
    import tables
    backends_available.append('tables')
except Exception as e:  # pragma: no cover
    # Broad on purpose: a broken binary install can raise more than ImportError.
    logger.warning(str(e))
    is_tables = False

# other possibility
is_h5py = True
try:
    import h5py
    backends_available.append('h5py')
except Exception as e:  # pragma: no cover
    logger.warning(str(e))
    is_h5py = False

# this one is to be used for remote reading/writing towards a HSDS server (or h5serv), see HDFGroup
is_h5pyd = True
try:
    import h5pyd
    backends_available.append('h5pyd')
except Exception as e:  # pragma: no cover
    logger.warning(str(e))
    is_h5pyd = False

if not (is_tables or is_h5py or is_h5pyd):
    # logger.exception() is only meaningful inside an except block (it appends the
    # active traceback); here there is none, so log a plain error instead.
    logger.error('No valid hdf5 backend has been installed, please install either pytables or h5py')

# Backends for which Single-Writer/Multiple-Reader mode can be requested.
SWMR_CAPABLE_BACKENDS = frozenset({'h5py'})
[docs]
class NodeError(Exception):
    """Raised when a node cannot be retrieved from the h5 file."""
[docs]
class SaveType(BaseEnum):
    """Enumeration of the kinds of saving contexts known to this module."""

    scan = 0
    detector = 1
    logger = 2
    custom = 3
    actuator = 4
    optimizer = 5
[docs]
class GroupType(BaseEnum):
    """Enumeration of the group node types; the member name is what gets stored
    (lower-cased) in a group's ``type`` attribute by ``H5Backend.add_group``."""

    detector = 0
    actuator = 1
    data = 2
    ch = 3
    scan = 4
    external_h5 = 5
    data_dim = 6
    data_logger = 7
[docs]
class InvalidExport(Exception):
    """Exception type signalling an invalid export."""
[docs]
def check_mandatory_attrs(attr_name, attr):
    """Decode the value of a mandatory attribute when it is stored as bytes.

    Used for cross compatibility between backends: the 'TITLE', 'CLASS' and
    'EXTDIM' attributes may come back as binary values.

    Parameters
    ----------
    attr_name: str
        name of the attribute
    attr:
        raw attribute value

    Returns
    -------
    the decoded string if ``attr_name`` is one of the mandatory attributes and
    ``attr`` is a bytes object, otherwise ``attr`` unchanged
    """
    is_mandatory = attr_name in ('TITLE', 'CLASS', 'EXTDIM')
    if is_mandatory and isinstance(attr, bytes):
        return attr.decode()
    return attr
[docs]
def get_attr(node, attr_name, backend='tables'):
    """Read one attribute, or all of them, from a raw backend node.

    Parameters
    ----------
    node:
        raw node object (pytables node if ``backend`` is 'tables', h5py-like otherwise)
    attr_name: str or None
        name of the attribute to read; if None every attribute is read
    backend: str
        'tables' selects the pytables attribute API, anything else the ``attrs`` mapping

    Returns
    -------
    the attribute value passed through ``JsonConverter.json2object``, or a dict
    mapping every attribute name to such a value when ``attr_name`` is None
    """
    use_tables = backend == 'tables'
    raw_attrs = node._v_attrs if use_tables else node.attrs

    if attr_name is not None:
        value = check_mandatory_attrs(attr_name, raw_attrs[attr_name])
        return JsonConverter.json2object(value)

    names = raw_attrs._v_attrnames if use_tables else raw_attrs.keys()
    collected = {}
    for key in names:
        value = check_mandatory_attrs(key, raw_attrs[key])
        collected[key] = JsonConverter.json2object(value)
    return collected
[docs]
def set_attr(node, attr_name, attr_value, backend='tables'):
    """Write an attribute on a raw backend node, serialised with ``JsonConverter.object2json``.

    Parameters
    ----------
    node:
        raw node object (pytables node if ``backend`` is 'tables', h5py-like otherwise)
    attr_name: str
        name of the attribute
    attr_value:
        value to serialise and store
    backend: str
        'tables' selects the pytables attribute API, anything else the ``attrs`` mapping
    """
    serialised = JsonConverter.object2json(attr_value)
    target = node._v_attrs if backend == 'tables' else node.attrs
    target[attr_name] = serialised
[docs]
class InvalidGroupType(Exception):
    """Exception type signalling an invalid group type."""
[docs]
class InvalidSave(Exception):
    """Exception type signalling an invalid save operation."""
[docs]
class InvalidGroupDataType(Exception):
    """Exception type signalling an invalid group data type."""
[docs]
class InvalidDataType(Exception):
    """Exception type signalling an invalid data type."""
[docs]
class InvalidDataDimension(Exception):
    """Exception type signalling an invalid data dimension."""
[docs]
class InvalidScanType(Exception):
    """Exception type signalling an invalid scan type."""
[docs]
class Node(object):
    """Backend-agnostic wrapper around a raw node of an hdf5 file.

    Parameters
    ----------
    node:
        raw backend node, or another Node instance (which is then unwrapped)
    backend: str
        'tables' selects the pytables API; any other value selects the h5py-like API
    """

    def __init__(self, node, backend):
        # Unwrap to avoid nesting wrappers if one calls Node(Node(...)).
        self._node = node.node if isinstance(node, Node) else node
        self.backend = backend
        self._attrs = Attributes(self, backend)

    def __str__(self):
        """Return '<path> (<class name>) <title repr>'."""
        return "%s (%s) %r" % (self.path, self.__class__.__name__, self.attrs['TITLE'])

    @property
    def node(self):
        """The wrapped raw backend node."""
        return self._node

    def __eq__(self, other):
        """Two wrappers are equal when their raw nodes compare equal.

        Objects that do not expose a ``node`` attribute (strings, raw backend
        nodes, None, ...) are not comparable: NotImplemented is returned so that
        Python falls back to its default behaviour instead of raising
        AttributeError.
        """
        try:
            other_node = other.node
        except AttributeError:
            return NotImplemented
        return self.node == other_node

    @property
    def parent_node(self) -> 'GROUP':
        """Wrapper of the parent node, or None if this node is the root ('/')."""
        if self.path == '/':
            return None
        backends_module = importlib.import_module('.backends', 'pymodaq_data.h5modules')
        raw_parent = self.node._v_parent if self.backend == 'tables' else self.node.parent
        # The wrapper class is looked up by the name stored in the parent's CLASS attribute.
        klass = get_attr(raw_parent, 'CLASS', self.backend)
        wrapper_cls = getattr(backends_module, klass)
        return wrapper_cls(raw_parent, self.backend)

    @property
    def h5file(self):
        """The raw file object this node belongs to."""
        if self.backend == 'tables':
            return self.node._v_file
        return self.node.file

    def to_h5_backend(self) -> 'H5Backend':
        """Create an H5Backend using the same backend and bound to this node's file."""
        h5_backend = H5Backend(self.backend)
        h5_backend.h5file = self.h5file
        return h5_backend

    def set_attr(self, key, value):
        """Set attribute ``key`` to ``value`` on this node."""
        self.attrs[key] = value

    def get_attr(self, item):
        """Return the value of attribute ``item`` of this node."""
        return self.attrs[item]

    @property
    def attrs(self):
        """The Attributes accessor bound to this node."""
        return self._attrs

    @property
    def name(self):
        """return node name
        """
        if self.backend == 'tables':
            return self._node._v_name
        full_path = self._node.name
        if full_path == '/':
            return full_path
        # h5py-like nodes only expose their full path: keep the last component.
        return full_path.split('/')[-1]

    @property
    def title(self):
        """Value of the 'TITLE' attribute."""
        return self.attrs['TITLE']

    @property
    def path(self):
        """return node path

        Returns
        -------
        str : full path of the node
        """
        if self.backend == 'tables':
            return self._node._v_pathname
        return self._node.name
[docs]
class GROUP(Node):
    """Node wrapper for a group, i.e. a node that can hold children nodes."""

    def __init__(self, node, backend):
        super().__init__(node, backend)

    def __str__(self):
        """Return a short string representation of the group.
        """
        return "%s (%s) %r" % (self.path, self.__class__.__name__, self.attrs['TITLE'])

    def __repr__(self):
        """Return a detailed string representation of the group.
        """
        described = ', '.join(
            '%r (%s)' % (child_name, child.__class__.__name__)
            for child_name, child in self.children().items()
        )
        return "%s\n children := %s" % (str(self), '[%s]' % described)

    def children(self) -> Dict[str, Node]:
        """Get a dict containing all children node hanging from self with their name as keys

        Returns
        -------
        dict: keys are children node names, values are the children nodes

        See Also
        --------
        children_name
        """
        backends_module = importlib.import_module('.backends', 'pymodaq_data.h5modules')
        raw_children = self.node._v_children if self.backend == 'tables' else self.node
        wrapped = {}
        for child_name, child in raw_children.items():
            klass = get_attr(child, 'CLASS', self.backend)
            # Array-like classes are resolved by name in the backends module,
            # everything else is wrapped as a GROUP.
            wrapper_cls = getattr(backends_module, klass) if 'ARRAY' in klass else GROUP
            wrapped[child_name] = wrapper_cls(child, self.backend)
        return wrapped

    def get_child(self, name: str) -> Node:
        """Return the child node called ``name`` (KeyError if there is none)."""
        return self.children()[name]

    def children_name(self):
        """Gets the sorted list of children name hanging from self

        Returns
        -------
        list: list of name of the children
        """
        raw_children = self.node._v_children if self.backend == 'tables' else self.node
        return sorted(raw_children.keys())

    def remove_children(self):
        """Remove every child hanging from this group."""
        for child_name, child in self.children().items():
            if self.backend == 'tables':
                child.node._f_remove(recursive=True)
            else:
                del self.node[child_name]
[docs]
class CARRAY(Node):
    """Node wrapper for an array node.

    Parameters
    ----------
    node:
        raw backend array node, or a Node wrapping one
    backend: str
        'tables' selects the pytables API; any other value selects the h5py-like API
    """

    def __init__(self, node, backend):
        super().__init__(node, backend)
        # Use the node unwrapped by Node.__init__: if a Node wrapper was passed in,
        # storing it directly would make `array` a wrapper instead of the raw array
        # (breaking e.g. `nrows` and making append() update the shape twice).
        self._array = self._node

    @property
    def array(self):
        """The raw backend array object."""
        return self._array

    def __repr__(self):
        """This provides more metainfo in addition to standard __str__"""
        return "%s\nshape := %s\ndtype := %s" % (self, str(self.attrs['shape']), self.attrs['dtype'])

    def __getitem__(self, item):
        return self._array.__getitem__(item)

    def __setitem__(self, key, value):
        self._array.__setitem__(key, value)

    def read(self):
        """Return the whole content of the array."""
        if self.backend == 'tables':
            return self._array.read()
        return self._array[:]

    def __len__(self):
        if self.backend == 'tables':
            return self.array.nrows
        return len(self.array)
[docs]
class EARRAY(CARRAY):
    """Array node enlargeable along its first dimension."""

    def __init__(self, array, backend):
        super().__init__(array, backend)

    def append(self, data: np.ndarray, expand=True):
        """ appends a ndarray after the current data in the enlargeable array

        Considering the shape length of the enlargeable array is n+1
        The data to append could be:

        * a single element (without the enlargeable shape index that is always the first
          index, that is of shape length n). In that case the first index of the enlargeable array
          is increased by one.
        * an ensemble of elements (a ndarray) of shape length of (n+1).

        Parameters
        ----------
        data: np.ndarray
            the data array to append to the enlargeable node
        expand: bool
            If True the data array will have its shape expanded by one dim

        Raises
        ------
        TypeError
            if ``data`` is not a numpy ndarray
        """
        if not isinstance(data, np.ndarray):
            raise TypeError('The appended object should be a ndarray')

        stored_shape = self.attrs['shape']
        if len(stored_shape) > 1 and data.shape == stored_shape[1:]:
            # A single element: give it a leading axis of length one.
            data = data.reshape([1] + list(data.shape))
            n_added = 1
        else:
            n_added = data.shape[0]

        if expand and len(data.shape) == 1 and data.shape != (1, ):
            data = np.expand_dims(data, 1)

        self.append_backend(data)

        swmr_running = self.backend == 'h5py' and self._node.file.swmr_mode
        if not swmr_running:
            # While SWMR is running the shape attribute update is deferred
            # until after SWMR ends.
            updated = list(self.attrs['shape'])
            updated[0] += n_added
            self.attrs['shape'] = tuple(updated)

    def append_backend(self, data):
        """Write ``data`` at the end of the underlying backend array."""
        if self.backend == 'tables':
            self.array.append(data)
            return
        n_new = data.shape[0]
        start = self.array.len()
        stop = start + n_new
        self.array.resize(stop, axis=0)
        # Reshape to the target slice shape in case expand_dims added an extra dim
        slice_shape = (n_new,) + tuple(self.attrs['shape'][1:])
        self.array[start:stop] = data.reshape(slice_shape)
[docs]
class VLARRAY(EARRAY):
    """Enlargeable array whose rows are variable-length elements."""

    def __init__(self, array, backend):
        super().__init__(array, backend)

    def append(self, data):
        """Append one element and increase the first index of the 'shape' attribute by one."""
        self.append_backend(data)
        swmr_running = self.backend == 'h5py' and self._node.file.swmr_mode
        if swmr_running:
            # defer shape attr update until after SWMR ends
            return
        updated = list(self.attrs['shape'])
        updated[0] += 1
        self.attrs['shape'] = tuple(updated)

    def append_backend(self, data):
        """Append one variable-length element.

        VLARRAY in h5py stores variable-length arrays as individual elements
        (one per row). EARRAY.append_backend uses data.shape[0] as the number of
        new rows, which would split one VL element into many rows, hence this
        override.
        """
        if self.backend == 'tables':
            self.array.append(data)
            return
        row = self.array.len()
        self.array.resize(row + 1, axis=0)
        self.array[row] = data
[docs]
class StringARRAY(VLARRAY):
    """Variable-length array whose elements are pickled objects stored as uint8 buffers."""

    def __init__(self, array, backend):
        super().__init__(array, backend)

    def __getitem__(self, item):
        """Return the decoded element(s) selected by ``item``.

        A selection of several rows (a list, or an object-dtype ndarray) is decoded
        element by element, like :meth:`read`; a single row is decoded directly.
        """
        raw = super().__getitem__(item)
        several_rows = isinstance(raw, list) or (isinstance(raw, np.ndarray) and raw.dtype == object)
        if several_rows:
            return [self.array_to_string(row) for row in raw]
        return self.array_to_string(raw)

    def read(self):
        """Return the list of all decoded elements."""
        return [self.array_to_string(row) for row in super().read()]

    def append(self, string):
        """Pickle ``string`` and append it as one element."""
        super().append(self.string_to_array(string))

    def array_to_string(self, array):
        """Unpickle one stored element.

        NOTE(security): pickle.loads executes arbitrary code embedded in the payload;
        only read files coming from a trusted source.
        """
        return pickle.loads(array)

    def string_to_array(self, string):
        """Pickle ``string`` into a 1D uint8 array."""
        return np.frombuffer(pickle.dumps(string), np.uint8)
[docs]
class Attributes(object):
    """Dict-like accessor to the attributes of a Node.

    Parameters
    ----------
    node:
        wrapper object exposing the raw backend node as its ``node`` attribute
    backend: str
        'tables' selects the pytables API; any other value selects the h5py-like API
    """

    def __init__(self, node, backend='tables'):
        self._node = node
        self.backend = backend

    @staticmethod
    def _normalize_key(key):
        """The lower case 'title' key is an alias of the 'TITLE' attribute."""
        return key.upper() if key == 'title' else key

    def __getitem__(self, item):
        return get_attr(self._node.node, self._normalize_key(item), backend=self.backend)

    def __setitem__(self, key, value):
        set_attr(self._node.node, self._normalize_key(key), value, backend=self.backend)

    def __contains__(self, item):
        """True if an attribute called ``item`` exists (same aliasing as __getitem__)."""
        return self._normalize_key(item) in self.attrs_name

    def __iter__(self):
        """Iterate over the attribute names.

        A fresh iterator is returned at each call, so nested or concurrent loops
        over the same object do not disturb each other. The index used by
        ``__next__`` is still reset here for code calling ``next()`` directly on
        this object after ``iter()``.
        """
        self._iter_index = 0
        return iter(self.attrs_name)

    def __next__(self):
        names = self.attrs_name
        if self._iter_index < len(names):
            self._iter_index += 1
            return names[self._iter_index - 1]
        raise StopIteration

    def __len__(self):
        return len(self.attrs_name)

    def get(self, key, default=None):
        """Return the attribute value for key, or default if not present."""
        if key in self:
            return self[key]
        return default

    def to_dict(self) -> dict:
        """Returns attributes name/value as a dict"""
        return {name: self[name] for name in self.attrs_name}

    @property
    def node(self):
        """The wrapper object whose attributes are exposed."""
        return self._node

    @property
    def attrs_name(self):
        """List of the names of the attributes currently stored on the node."""
        if self.backend == 'tables':
            return list(self.node.node._v_attrs._v_attrnames)
        return list(self.node.node.attrs.keys())

    def __str__(self):
        """The string representation for this object."""
        if self.backend == 'tables':
            pathname = self._node.node._v_pathname
        else:
            pathname = self._node.node.name
        return "%s.attrs (%s), %s attributes" % (pathname, self.__class__.__name__, len(self))

    def __repr__(self):
        names = self.attrs_name
        if not names:
            return str(self)
        described = ['%s := %s' % (name, str(self[name])) for name in names]
        return "%s:\n %s" % (str(self), '[%s]' % (',\n '.join(described)))
[docs]
class H5Backend:
def __init__(self, backend='tables'):
    """Initialise an H5Backend with no open file, then select ``backend``.

    Parameters
    ----------
    backend: str
        name of the backend, forwarded to :meth:`set_backend`
    """
    # The state must exist before set_backend() runs: it closes any open file first.
    self._h5file = self.file_path = self.compression = None
    self._swmr_mode = self._swmr_enabled = False
    self.set_backend(backend)
[docs]
def set_backend(self, backend: str):
    """Switch the active backend, closing any open file first.

    Updates both ``self.backend`` (the name string) and ``self.h5_library``
    (the imported module), which both need to be consistent for file operations.

    Parameters
    ----------
    backend: str
        One of ``'tables'``, ``'h5py'``, or ``'h5pyd'``.

    Raises
    ------
    ImportError
        if the module of the requested backend could not be imported
    ValueError
        if ``backend`` is not one of the known names
    """
    if hasattr(self, '_h5file'):
        self.close_file()

    # The module names are only touched once their import is known to have succeeded.
    if backend == 'tables':
        if not is_tables:
            raise ImportError('the pytables module is not present')
        library = tables
    elif backend == 'h5py':
        if not is_h5py:
            raise ImportError('the h5py module is not present')
        library = h5py
    elif backend == 'h5pyd':
        if not is_h5pyd:
            raise ImportError('the h5pyd module is not present')
        library = h5pyd
    else:
        raise ValueError(f"Unknown backend: {backend!r}. Must be one of {backends_available}")

    self.h5_library = library
    self.backend = backend
@property
def h5file(self):
    """The raw file object currently bound to this backend (None if there is none)."""
    return self._h5file

@h5file.setter
def h5file(self, file):
    # Keep the stored path in sync with the file object being bound.
    path = file.filename
    self.file_path = path
    self._h5file = file
@property
def filename(self):
    """The ``filename`` attribute of the bound file object."""
    handle = self._h5file
    return handle.filename
[docs]
def isopen(self):
    """Tell whether a file is bound to this backend and currently open.

    Returns
    -------
    bool: False when no file is bound, otherwise the open state reported by the
        file object (``isopen`` for 'tables', ``id.valid`` for 'h5py', presence
        of ``id.http_conn`` for any other backend)
    """
    handle = self._h5file
    if handle is None:
        return False
    if self.backend == 'tables':
        return bool(handle.isopen)
    if self.backend == 'h5py':
        return bool(handle.id.valid)
    return handle.id.http_conn is not None
[docs]
def close_file(self):
    """Flush data and close the h5file

    Errors are logged as warnings and never propagated. Whatever happens, the
    reference to the file object is released and the SWMR-enabled flag is reset.
    """
    try:
        # Only an open file needs (and supports) flushing and closing.
        if self._h5file is not None and self.isopen():
            try:
                self.flush()
            finally:
                # Close even if the flush failed, otherwise the handle would leak
                # once the reference is dropped below.
                self._h5file.close()
    except Exception as e:
        logger.warning(f"Error closing h5file: {e}")
    finally:
        self._swmr_enabled = False
        self._h5file = None  # Release the file handle reference
[docs]
def open_file(self, fullpathname, mode='r', title='PyMoDAQ file', swmr_mode=False, **kwargs):
    """Open (or create) an h5 file with the active backend.

    Parameters
    ----------
    fullpathname: str or Path
        path of the file
    mode: str
        opening mode forwarded to the backend library
    title: str
        title of the file, written when the file is created (mode 'w')
    swmr_mode: bool
        request SWMR support; only acted upon with the 'h5py' backend
        ('latest' libver in mode 'w', ``swmr=True`` in mode 'r')
    kwargs:
        extra keyword arguments forwarded to the backend library

    Returns
    -------
    the raw file object
    """
    self.file_path = fullpathname
    self._swmr_mode = swmr_mode
    creating = mode == 'w'
    with_tables = self.backend == 'tables'

    if with_tables:
        self._h5file = self.h5_library.open_file(str(fullpathname), mode=mode, title=title, **kwargs)
    else:
        if swmr_mode and self.backend == 'h5py':
            if creating:
                kwargs['libver'] = 'latest'
            elif mode == 'r':
                kwargs['swmr'] = True
        self._h5file = self.h5_library.File(str(fullpathname), mode=mode, **kwargs)

    if creating:
        root_attrs = self.root().attrs
        if not with_tables:
            # pytables receives the title through open_file(), the others need it written
            root_attrs['TITLE'] = title
        try:
            root_attrs['pymodaq_version'] = utils.get_version('pymodaq')
        except importlib.metadata.PackageNotFoundError:
            root_attrs['pymodaq_version'] = '0.0.0'
        root_attrs['pymodaq_data_version'] = utils.get_version('pymodaq_data')
        if swmr_mode and not with_tables:
            root_attrs['swmr_compatible'] = True

    return self._h5file
[docs]
def save_file_as(self, filenamepath='h5copy.txt'):
    """Copy the current file to ``filenamepath`` (only available with the 'tables' backend).

    Raises
    ------
    Warning
        if the backend is not 'tables'
    """
    if self.backend != 'tables':
        raise Warning(f'Not possible to copy the file with the "{self.backend}" backend')
    destination = str(filenamepath)
    self.h5file.copy_file(destination)
[docs]
def root(self):
    """Return the root of the open file wrapped as a GROUP."""
    # With pytables the root is the node at '/'; otherwise the file object itself is used.
    raw_root = self._h5file.get_node('/') if self.backend == 'tables' else self._h5file
    return GROUP(raw_root, self.backend)
[docs]
def get_attr(self, node, attr_name=None):
    """Read attribute ``attr_name`` of ``node``, or all its attributes as a dict if it is None.

    ``node`` may be a Node wrapper or a raw backend node.
    """
    raw = node.node if isinstance(node, Node) else node
    return get_attr(raw, attr_name, self.backend)
[docs]
def set_attr(self, node, attr_name, attr_value):
    """Write attribute ``attr_name`` with value ``attr_value`` on ``node``.

    ``node`` may be a Node wrapper or a raw backend node.
    """
    raw = node.node if isinstance(node, Node) else node
    return set_attr(raw, attr_name, attr_value, self.backend)
[docs]
def has_attr(self, node, attr_name):
    """Tell whether ``node`` has an attribute called ``attr_name``."""
    existing_names = self.get_node(node).attrs.attrs_name
    return attr_name in existing_names
[docs]
def flush(self):
    """Flush the bound file object, if there is one."""
    handle = self._h5file
    if handle is None:
        return
    handle.flush()
[docs]
def enable_swmr(self):
    """Activate SWMR mode on the open h5py file.

    Must be called after all groups/datasets have been created.
    Idempotent: does nothing if already enabled.

    Raises
    ------
    RuntimeError
        if the backend is not h5py or the file was not opened with swmr_mode=True
    """
    if self._swmr_enabled:
        return
    if self.backend != 'h5py':
        raise RuntimeError('SWMR mode is only supported with the h5py backend')
    if not self._swmr_mode:
        raise RuntimeError('File was not opened with swmr_mode=True')

    handle = self._h5file
    if not handle.swmr_mode:
        # Mark file as being written with SWMR (for readers to detect)
        # Write as plain bool (not JSON-serialized) so raw h5py readers can detect it
        self.root().node.attrs['swmr_active'] = True
        handle.swmr_mode = True
    # Otherwise h5py already has SWMR active (flag got out of sync): only
    # resynchronise the flag, without switching the file mode again.
    self._swmr_enabled = True
@property
def is_swmr_active(self):
    """Return True if SWMR mode is currently active on the file."""
    enabled = self._swmr_enabled
    return enabled
@property
def is_swmr_compatible(self):
    """Return True if the open file was created with SWMR support."""
    try:
        flag = self.root().attrs['swmr_compatible']
        compatible = bool(flag)
    except Exception:
        # No open file, missing attribute, ...: reported as not compatible.
        compatible = False
    return compatible
@property
def is_swmr_capable(self):
    """Return True if the current backend supports SWMR mode."""
    active_backend = self.backend
    return active_backend in SWMR_CAPABLE_BACKENDS
[docs]
def reconcile_swmr_attrs(self):
    """Walk all EARRAY/VLARRAY nodes and update attrs['shape'] from actual data.

    Called after SWMR is ended (file closed and reopened in 'a' mode) to fix
    deferred attribute writes that were skipped during SWMR.
    """
    root = self.root()
    if 'swmr_active' in root.attrs.attrs_name:
        # Clear the swmr_active flag now that writing is complete
        # Write as plain bool (not JSON-serialized) so raw h5py readers can detect it
        root.node.attrs['swmr_active'] = False

    enlargeable_classes = ('EARRAY', 'VLARRAY')
    for node in self.walk_nodes('/'):
        if 'CLASS' not in node.attrs:
            continue
        if node.attrs['CLASS'] in enlargeable_classes:
            node.attrs['shape'] = node.node.shape
[docs]
def finalize_swmr(self, keep_open=False):
    """End SWMR by closing the file, reopening in 'a' mode, and reconciling deferred attrs.

    After SWMR mode, attrs['shape'] on EARRAY/VLARRAY nodes may be stale.
    This method closes the file (ending SWMR), reopens it in append mode,
    and updates all deferred attributes.

    Parameters
    ----------
    keep_open : bool
        If True, leaves the file open in 'a' mode after reconciling.
        If False (default), closes the file after reconciling.
    """
    # Remember the location before closing the file.
    reopen_path = Path(self.file_path)
    self.close_file()
    self.open_file(reopen_path, mode='a')
    self.reconcile_swmr_attrs()
    if keep_open:
        return
    self.close_file()
[docs]
def define_compression(self, compression, compression_opts):
    """Define compression library and level of compression

    Parameters
    ----------
    compression: (str) either gzip and zlib are supported here as they are compatible
        but zlib is used by pytables while gzip is used by h5py
    compression_opts (int) : 0 to 9  0: None, 9: maximum compression
    """
    if self.backend == 'tables':
        # pytables names the library 'zlib'
        complib = 'zlib' if compression == 'gzip' else compression
        self.compression = self.h5_library.Filters(complevel=compression_opts, complib=complib)
    else:
        # h5py names the same library 'gzip'
        codec = 'gzip' if compression == 'zlib' else compression
        self.compression = dict(compression=codec, compression_opts=compression_opts)
[docs]
def get_set_group(self, where, name, title='', **kwargs):
    """Retrieve or create (if absent) a node group

    Parameters
    ----------
    where: str or node
        path or parent node instance
    name: str
        group node name
    title: str
        node title (only used when the group is created)

    Keyword Arguments:
        any other metadata related to this node (for example: origin); written,
        only when the group is created, for keys not already present on it

    Returns
    -------
    group: group node
    """
    if isinstance(where, Node):
        where = where.node

    if name in self.get_children(where):
        group = self.get_node(where, name)
    elif self.backend == 'tables':
        group = self._h5file.create_group(where, name, title)
        for key, value in kwargs.items():
            # Membership test on the attribute set: hasattr() would also match
            # plain Python attributes of the AttributeSet object.
            if key not in group._v_attrs:
                group._v_attrs[key] = value
    else:
        group = self.get_node(where).node.create_group(name)
        group.attrs['TITLE'] = title
        group.attrs['CLASS'] = 'GROUP'
        for key, value in kwargs.items():
            # The h5py attribute manager is a mapping: hasattr() looks at its Python
            # attributes (keys, items, get, ...) and not at the stored HDF5
            # attributes, so a metadata key such as 'keys' was silently dropped.
            if key not in group.attrs:
                group.attrs[key] = value
    return GROUP(group, self.backend)
[docs]
def get_group_by_title(self, where, title):
    """Return the first child group of ``where`` whose 'TITLE' attribute equals ``title``.

    Parameters
    ----------
    where: str or node
        path or parent node instance
    title: str
        title to look for

    Returns
    -------
    GROUP or None if no child group has this title
    """
    if isinstance(where, Node):
        where = where.node
    parent = self.get_node(where).node
    for child_name in self.get_children(parent):
        candidate = parent[child_name]
        if 'TITLE' not in self.get_attr(candidate):
            continue
        if self.get_attr(candidate, 'TITLE') == title and self.get_attr(candidate, 'CLASS') == 'GROUP':
            return GROUP(candidate, self.backend)
    return None
[docs]
def is_node_in_group(self, where, name):
    """
    Check if a given node with name is in the group defined by where (comparison on lower case strings)

    Parameters
    ----------
    where: (str or node)
        path or parent node instance
    name: (str)
        group node name

    Returns
    -------
    bool
        True if node exists, False otherwise
    """
    parent = where.node if isinstance(where, Node) else where
    wanted = name.lower()
    lowered_names = [child_name.lower() for child_name in self.get_children(parent)]
    return wanted in lowered_names
[docs]
def get_node(self, where, name=None) -> Node:
    """Retrieve a node and wrap it in the wrapper class matching its CLASS attribute.

    Parameters
    ----------
    where: str or node
        path, raw node or Node wrapper (of the node itself, or of its parent if
        ``name`` is given)
    name: str, optional
        name of the child of ``where`` to retrieve

    Returns
    -------
    GROUP, CARRAY, EARRAY, VLARRAY or StringARRAY. None is returned for an
    array-like CLASS that is none of 'CARRAY', 'EARRAY', 'VLARRAY'.

    Raises
    ------
    NodeError
        if the backend fails to retrieve the node or if it does not exist
    """
    if isinstance(where, Node):
        where = where.node
    try:
        if self.backend == 'tables':
            node = self._h5file.get_node(where, name)
        elif name is None:
            node = self._h5file.get(where) if isinstance(where, str) else where
        elif isinstance(where, str):
            where += f'/{name}'
            node = self._h5file.get(where)
        else:
            where = where.get(name)
            node = where
    except Exception as e:
        raise NodeError(str(e)) from e
    if node is None:
        raise NodeError(f'Node {where} (name={name}) does not exist')

    attrs = self.get_attr(node)
    if 'CLASS' in attrs:
        klass = attrs['CLASS']
    else:
        # Infer the missing class and persist it on the node.
        klass = self._infer_class(node)
        if self.backend == 'tables':
            node._v_attrs['CLASS'] = klass
        else:
            node.attrs['CLASS'] = klass

    if 'ARRAY' not in klass:
        return GROUP(node, self.backend)
    if klass == 'CARRAY':
        return CARRAY(node, self.backend)
    if klass == 'EARRAY':
        return EARRAY(node, self.backend)
    if klass == 'VLARRAY':
        # A node lacking 'subdtype' (e.g. one whose CLASS had to be inferred) is a
        # plain variable-length array; do not fail with KeyError on it.
        if attrs.get('subdtype', '') == 'string':
            return StringARRAY(node, self.backend)
        return VLARRAY(node, self.backend)
    # Unknown array-like class: no wrapper available.
    return None
def _infer_class(self, node) -> str:
    """Infer the CLASS attribute of a node that is missing it

    PyTables always writes a CLASS attribute on every node, but a node written
    through h5py/h5pyd can be missing it (for instance an interrupted write).
    Falling back to 'GROUP' regardless of the actual object type would mislabel
    datasets and break further tree traversal, so infer the class from the
    underlying object instead.

    Returns
    -------
    str: one of 'GROUP', 'VLARRAY', 'EARRAY' or 'CARRAY'
    """
    if self.backend == 'tables':
        return 'GROUP'
    if isinstance(node, self.h5_library.Group):
        return 'GROUP'
    # From here on the node is treated as a dataset.
    if node.dtype.kind == 'O':
        return 'VLARRAY'
    resizable_first_axis = node.maxshape and node.maxshape[0] is None
    return 'EARRAY' if resizable_first_axis else 'CARRAY'
[docs]
def get_node_name(self, node):
    """return node name

    Parameters
    ----------
    node (str or node instance), see h5py and pytables documentation on nodes

    Returns
    -------
    str: name of the node
    """
    target = node.node if isinstance(node, Node) else node
    return self.get_node(target).name
[docs]
def get_node_path(self, node):
    """return node path

    Parameters
    ----------
    node (str or node instance), see h5py and pytables documentation on nodes

    Returns
    -------
    str : full path of the node
    """
    target = node.node if isinstance(node, Node) else node
    return self.get_node(target).path
[docs]
def get_parent_node(self, node):
    """Return the wrapped parent of ``node``, or None if ``node`` is the root.

    Parameters
    ----------
    node: str or node
        path, raw node or Node wrapper

    Returns
    -------
    the parent node wrapper as returned by :meth:`get_node`, or None for the root
    """
    # Normalise first: comparing a str path or a raw node against the root wrapper
    # raised AttributeError, since only wrappers expose a ``node`` attribute.
    wrapped = self.get_node(node)
    if wrapped.path == '/':
        return None
    raw = wrapped.node
    raw_parent = raw._v_parent if self.backend == 'tables' else raw.parent
    return self.get_node(raw_parent)
[docs]
def get_children(self, where):
    """Get a dict containing all children node hanging from where with their name as keys and types among Node,
    CARRAY, EARRAY, VLARRAY or StringARRAY

    Parameters
    ----------
    where (str or node instance)
        see h5py and pytables documentation on nodes, and Node objects of this module

    Returns
    -------
    dict: keys are children node names, values are the children nodes

    See Also
    --------
    :meth:`.GROUP.children_name`
    """
    parent = self.get_node(where)  # return a node object in case where is a string
    if isinstance(parent, Node):
        parent = parent.node
    backends_module = importlib.import_module('.backends', 'pymodaq_data.h5modules')

    def wrap(raw_child, klass):
        # Array-like classes are resolved by name in the backends module,
        # everything else is wrapped as a GROUP.
        wrapper_cls = getattr(backends_module, klass) if 'ARRAY' in klass else GROUP
        return wrapper_cls(raw_child, self.backend)

    found = {}
    if self.backend == 'tables':
        for child_name, child in parent._v_children.items():
            found[child_name] = wrap(child, get_attr(child, 'CLASS', self.backend))
        return found

    for child_name, child in parent.items():
        try:
            klass = get_attr(child, 'CLASS', self.backend)
        except KeyError:
            # Node without a CLASS attribute: infer it and persist it on the node.
            klass = self._infer_class(child)
            child.attrs['CLASS'] = klass
        found[child_name] = wrap(child, klass)
    return found
[docs]
def walk_nodes(self, where):
    """Iterate over ``where`` and then over the children of every group found below it.

    Parameters
    ----------
    where: str or node
        starting node

    Yields
    ------
    node wrappers: first ``where`` itself, then the children of each group
    yielded by :meth:`walk_groups`
    """
    start = self.get_node(where)  # return a node object in case where is a string
    yield start
    for group in self.walk_groups(start):
        yield from self.get_children(group).values()
[docs]
def walk_groups(self, where):
    """Iterate over the groups of the tree starting at ``where`` (included).

    Nothing is yielded if ``where`` is not a group (CLASS attribute different
    from 'GROUP').

    Parameters
    ----------
    where: str or node
        starting node

    Yields
    ------
    group node wrappers
    """
    start = self.get_node(where)  # return a node object in case where is a string
    if start.attrs['CLASS'] != 'GROUP':
        return

    if self.backend == 'tables':
        for raw_group in self.h5file.walk_groups(start.node):
            yield self.get_node(raw_group)
        return

    # Other backends: explicit traversal, each group is yielded when discovered.
    yield start
    pending = [start]
    while pending:
        current = pending.pop()
        subgroups = [child for child in self.get_children(current).values()
                     if child.attrs['CLASS'] == 'GROUP']
        for subgroup in subgroups:
            pending.append(subgroup)
            yield subgroup
[docs]
def read(self, array, *args, **kwargs):
    """Return the whole content of ``array`` (a CARRAY wrapper or a raw backend array).

    Extra positional and keyword arguments are accepted but not used.
    """
    raw = array.array if isinstance(array, CARRAY) else array
    return raw.read() if self.backend == 'tables' else raw[:]
[docs]
def create_carray(self, where, name, obj=None, title=''):
    """Create an array node from existing data.

    Parameters
    ----------
    where: str or node
        parent node where to create the array
    name: str
        name of the array
    obj: array
        data to save; its ``dtype`` and ``shape`` are stored as attributes
    title: str
        node title attribute

    Returns
    -------
    CARRAY

    Raises
    ------
    ValueError
        if ``obj`` is None
    """
    if isinstance(where, Node):
        where = where.node
    title = str(title)
    if obj is None:
        raise ValueError('Data to be saved as carray cannot be None')
    dtype = obj.dtype

    if self.backend == 'tables':
        raw_array = self._h5file.create_carray(where, name, obj=obj, title=title, filters=self.compression)
        array = CARRAY(raw_array, self.backend)
    else:
        options = {} if self.compression is None else self.compression
        dataset = self.get_node(where).node.create_dataset(name, data=obj, **options)
        array = CARRAY(dataset, self.backend)
        # direct writing using h5py to be compatible with pytable automatic class writing as binary
        array.array.attrs['TITLE'] = title
        array.array.attrs['CLASS'] = 'CARRAY'

    array.attrs['shape'] = obj.shape
    array.attrs['dtype'] = dtype.name
    array.attrs['subdtype'] = ''
    array.attrs['backend'] = self.backend
    return array
[docs]
def create_earray(self, where, name, dtype, data_shape=None, title=''):
    """create enlargeable arrays from data with a given shape and of a given type. The array is enlargeable along
    the first dimension

    Parameters
    ----------
    where: str or node
        parent node where to create the array
    name: str
        name of the array
    dtype:
        anything accepted by ``np.dtype``
    data_shape: sequence of int, optional
        shape of one element, i.e. the dimensions following the enlargeable one
    title: str
        node title attribute

    Returns
    -------
    EARRAY
    """
    if isinstance(where, Node):
        where = where.node
    title = str(title)
    dtype = np.dtype(dtype)
    trailing_dims = [] if data_shape is None else list(data_shape)
    shape = tuple([0] + trailing_dims)

    if self.backend == 'tables':
        atom = self.h5_library.Atom.from_dtype(dtype)
        raw_array = self._h5file.create_earray(where, name, atom, shape=shape, title=title,
                                               filters=self.compression)
        array = EARRAY(raw_array, self.backend)
    else:
        # None on the first axis makes the dataset resizable along it
        maxshape = tuple([None] + trailing_dims)
        options = {} if self.compression is None else self.compression
        dataset = self.get_node(where).node.create_dataset(name, shape=shape, dtype=dtype,
                                                           maxshape=maxshape, **options)
        array = EARRAY(dataset, self.backend)
        # direct writing using h5py to be compatible with pytable automatic class writing as binary
        array.array.attrs['TITLE'] = title
        array.array.attrs['CLASS'] = 'EARRAY'
        array.array.attrs['EXTDIM'] = 0

    array.attrs['shape'] = shape
    array.attrs['dtype'] = dtype.name
    array.attrs['subdtype'] = ''
    array.attrs['backend'] = self.backend
    return array
[docs]
def create_vlarray(self, where, name, dtype, title=''):
    """create variable data length and type and enlargeable 1D arrays

    Parameters
    ----------
    where: (str) group location in the file where to create the array node
    name: (str) name of the array
    dtype: (dtype) numpy dtype style, for particular case of strings, use dtype='string'
    title: (str) node title attribute (written in capitals)

    Returns
    -------
    array: StringARRAY if dtype is 'string', VLARRAY otherwise
    """
    if isinstance(where, Node):
        where = where.node
    title = str(title)

    # Only a real str can be the 'string' marker; this also avoids comparing a
    # numpy dtype with a string that is not a valid dtype name.
    if isinstance(dtype, str) and dtype == 'string':
        dtype = np.dtype(np.uint8)
        subdtype = 'string'
    else:
        dtype = np.dtype(dtype)
        subdtype = ''
    wrapper_cls = StringARRAY if subdtype == 'string' else VLARRAY

    if self.backend == 'tables':
        atom = self.h5_library.Atom.from_dtype(dtype)
        raw_array = self._h5file.create_vlarray(where, name, atom, title=title, filters=self.compression)
        array = wrapper_cls(raw_array, self.backend)
    else:
        if self.backend == 'h5py':
            dt = self.h5_library.vlen_dtype(dtype)
        else:
            # special_dtype only accepts keyword arguments: the base type of a
            # variable-length type is given through ``vlen``.
            dt = self.h5_library.special_dtype(vlen=dtype)
        options = {} if self.compression is None else self.compression
        dataset = self.get_node(where).node.create_dataset(name, (0,), dtype=dt, maxshape=(None,), **options)
        array = wrapper_cls(dataset, self.backend)
        # direct writing using h5py to be compatible with pytable automatic class writing as binary
        array.array.attrs['TITLE'] = title
        array.array.attrs['CLASS'] = 'VLARRAY'
        array.array.attrs['EXTDIM'] = 0

    array.attrs['shape'] = (0,)
    array.attrs['dtype'] = dtype.name
    array.attrs['subdtype'] = subdtype
    array.attrs['backend'] = self.backend
    return array
[docs]
def add_group(self, group_name, group_type: Union[GroupType, str], where, title='', metadata=None) -> GROUP:
    """
    Add a node in the h5 file tree of the group type

    If a child called ``group_name`` already exists in ``where`` it is returned as is.
    Otherwise the group is retrieved or created under the capitalized name and its
    'type', metadata and 'backend' attributes are written.

    Parameters
    ----------
    group_name: (str) a custom name for this group
    group_type: str or GroupType enum
        one of the possible values of GroupType, should be enforced by higher level modules not here
    where: (str or node) parent node where to create the new group
    title: (str) node title
    metadata: (dict) extra metadata to be saved with this new group node

    Returns
    -------
    (node): newly created group node
    """
    extra_attrs = {} if metadata is None else metadata
    if isinstance(where, Node):
        where = where.node
    if isinstance(group_type, Enum):
        group_type = group_type.name

    if group_name in self.get_children(self.get_node(where)):
        return self.get_node(where, group_name)

    node = self.get_set_group(where, utils.capitalize(group_name), title)
    node.attrs['type'] = group_type.lower()
    for key in extra_attrs:
        node.attrs[key] = extra_attrs[key]
    node.attrs['backend'] = self.backend
    return node