import os
import sys
import datetime
import inspect
import json
import functools
import platform
import time
from packaging import version as version_mod
from pathlib import Path
import traceback
from typing import Any, cast, List, Optional, Tuple
import numpy as np
from pymodaq_utils import logger as logger_module
from pymodaq_utils.config import GlobalConfig as Config
from pymodaq_utils.warnings import deprecation_msg
from serializall import SerializableFactory, SerializableBase
from importlib import metadata
# Module-level alias of importlib.metadata's exception, kept here for use elsewhere.
PackageNotFoundError = metadata.PackageNotFoundError

# Interpreter-version switch: both branches are currently no-ops (``pass``).
if version_mod.parse(platform.python_version()) >= version_mod.parse('3.9'):
    # from version 3.9 the cache decorator is available
    pass
else:
    pass

# Module logger, created from the name that get_module_name() derives from this file's path.
logger = logger_module.set_logger(logger_module.get_module_name(__file__))
# Global configuration object instantiated once at import time.
config = Config()
[docs]
def is_64bits():
    """Tell whether ``sys.maxsize`` is larger than ``2**32``.

    Returns
    -------
    bool
        True when the interpreter's ``sys.maxsize`` exceeds ``2**32``.
    """
    threshold = 2 ** 32
    return sys.maxsize > threshold
[docs]
def timer(func):
    """Decorator printing how long each call of the decorated function took.

    Parameters
    ----------
    func: callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper forwarding all arguments to ``func`` and returning its result.
    """
    @functools.wraps(func)
    def timed_call(*args, **kwargs):
        tic = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - tic
        print(f"Finished {func.__name__!r} in {elapsed:.4f} secs")
        return result

    return timed_call
[docs]
def get_version(package_name='pymodaq'):
    """Return the installed version of a package, as reported by ``importlib.metadata``.

    Parameters
    ----------
    package_name: str
        Name of the installed distribution to query.

    Returns
    -------
    str
        The version string returned by ``metadata.version``.
    """
    installed_version = metadata.version(package_name)
    return installed_version
[docs]
class JsonConverter:
    """Convert simple objects to a JSON description and back.

    An object is described by a JSON dictionary holding its module name, its type name and
    its ``repr``. Only objects whose type name is listed in `trusted_types` are rebuilt.
    """

    def __init__(self):
        super().__init__()

    @classmethod
    def trusted_types(cls):
        """Return the list of type names that `json2object` accepts to rebuild."""
        return [
            'float', 'int', 'str', 'datetime', 'date', 'time', 'tuple', 'list', 'bool', 'bytes',
            'float64',
        ]

    @classmethod
    def istrusted(cls, type_name):
        """Tell whether ``type_name`` belongs to `trusted_types`."""
        allowed = cls.trusted_types()
        return type_name in allowed

    @classmethod
    def object2json(cls, obj):
        """Return a JSON string with the module, type name and ``repr`` of ``obj``."""
        obj_type = type(obj)
        description = {
            'module': obj_type.__module__,
            'type': obj_type.__name__,
            'data': repr(obj),
        }
        return json.dumps(description)

    @classmethod
    def json2object(cls, jsonstring):
        """Rebuild an object from a string produced by `object2json`.

        Returns
        -------
        object
            * the evaluated ``data`` entry when the decoded dictionary has a trusted type,
            * the decoded JSON value when it is not a dictionary or its type is not trusted,
            * ``jsonstring`` itself when anything fails (invalid JSON, missing key, ...).
        """
        try:
            decoded = json.loads(jsonstring)
            if not isinstance(decoded, dict):  # pragma: no cover
                return decoded
            if decoded['type'] not in cls.trusted_types():
                return decoded
            # NOTE(security): the 'type' entry is only a declared name; 'data' is evaluated
            # as Python code, so this must never be fed with strings from an untrusted source.
            return eval(decoded['data'])
        except Exception:
            # best effort: anything that cannot be decoded is handed back unchanged
            return jsonstring
[docs]
def capitalize(string, Nfirst=1):
    """Return ``string`` with its first ``Nfirst`` characters converted to upper case.

    Parameters
    ----------
    string: str
        The text to transform.
    Nfirst: int
        Number of leading characters to convert.

    Returns
    -------
    str
        The upper-cased head followed by the untouched remainder.
    """
    head, tail = string[:Nfirst], string[Nfirst:]
    return head.upper() + tail
[docs]
def uncapitalize(string, Nfirst=1):
    """Return ``string`` with its first ``Nfirst`` characters converted to lower case.

    Parameters
    ----------
    string: str
        The text to transform.
    Nfirst: int
        Number of leading characters to convert.

    Returns
    -------
    str
        The lower-cased head followed by the untouched remainder.
    """
    head, tail = string[:Nfirst], string[Nfirst:]
    return head.lower() + tail
[docs]
def getLineInfo():
    """Return the formatted traceback of the exception currently being handled.

    Returns
    -------
    str
        The concatenated lines of ``traceback.format_tb`` applied to the traceback found in
        ``sys.exc_info()``; an empty string when there is nothing to format.
    """
    current_traceback = sys.exc_info()[2]
    return ''.join(traceback.format_tb(current_traceback))
[docs]
@SerializableFactory.register_decorator()
class ThreadCommand(SerializableBase):
    """Generic object to pass info (command) and data (attribute) between thread or objects using signals

    Parameters
    ----------
    command: str
        The command to be analysed for further action
    attribute: any type
        the attribute related to the command. The actual type and value depend on the command and the situation
    attributes: deprecated, attribute should be used instead
    args: some variables in a sequence
    kwargs: some variables in a dict

    Attributes
    ----------
    command : str
        The command to be analysed for further action
    attribute : any type
        the attribute related to the command. The actual type and value depend on the command and the situation
    args: some variables in a sequence
    kwargs: some variables in a dict

    Raises
    ------
    TypeError
        if ``command`` is not a string
    """

    command: str
    attribute: Any
    args: list
    kwargs: dict

    def __init__(self, command: str, attribute=None, attributes=None, args=(), kwargs: Optional[dict] = None):
        if not isinstance(command, str):
            raise TypeError(f'The command in a Threadcommand object should be a string, not a {type(command)}')
        self.command = command
        if attribute is None and attributes is not None:
            deprecation_msg('ThreadCommand signature changed, use attribute in place of attributes')
            # Fall back on the deprecated keyword. Previously the value was assigned and then
            # immediately overwritten by the (None) ``attribute``, so it was silently lost.
            attribute = attributes
            self.attributes = attributes
        self.attribute = attribute
        self.args = args
        # a fresh dict per instance, never a shared default
        self.kwargs = {} if kwargs is None else kwargs

    def __eq__(self, other: Any) -> bool:
        """Two commands are equal when command, attribute, args and kwargs all compare equal."""
        if not isinstance(other, ThreadCommand):
            return NotImplemented
        return (
            self.command == other.command
            and self.attribute == other.attribute
            and self.args == other.args
            and self.kwargs == other.kwargs
        )

    @staticmethod
    def serialize(obj: "ThreadCommand") -> bytes:  # type: ignore[override]
        """Concatenate the serialized command, attribute, args and kwargs of ``obj``, in that order."""
        serialize_factory = SerializableFactory()
        fields = (obj.command, obj.attribute, obj.args, obj.kwargs)
        return b"".join(serialize_factory.get_apply_serializer(field) for field in fields)

    @staticmethod
    def deserialize(bytes_str: bytes) -> Tuple["ThreadCommand", bytes]:
        """Rebuild a ThreadCommand from bytes produced by `serialize`.

        The four fields are read back in the order they were written: command, attribute,
        args, kwargs.

        Returns
        -------
        tuple(ThreadCommand, bytes)
            the rebuilt command and the bytes left over after the four fields
        """
        serialize_factory = SerializableFactory()
        command, remaining = cast(
            Tuple[str, bytes],
            serialize_factory.get_apply_deserializer(bytes_str=bytes_str, only_object=False),
        )
        attribute, remaining = cast(
            Tuple[Any, bytes], serialize_factory.get_apply_deserializer(remaining, False),
        )
        args, remaining = cast(
            Tuple[list, bytes],
            serialize_factory.get_apply_deserializer(remaining, False),
        )
        kwargs, remaining = cast(
            Tuple[dict, bytes],
            serialize_factory.get_apply_deserializer(remaining, False),
        )
        # args is converted to a tuple, the same type as the constructor default
        return ThreadCommand(command, attribute, args=tuple(args), kwargs=kwargs), remaining

    def __repr__(self):
        return f'Threadcommand: {self.command} with attribute {self.attribute}'
[docs]
def ensure_ndarray(data):
    """Return ``data`` as a numpy array.

    Parameters
    ----------
    data: object
        An ndarray (returned as is), a list (converted with ``np.array``) or any other
        object (wrapped in a one-element list before conversion).

    Returns
    -------
    ndarray
    """
    if isinstance(data, np.ndarray):
        return data
    content = data if isinstance(data, list) else [data]
    return np.array(content)
[docs]
def recursive_find_files_extension(ini_path, ext, paths=None):
    """Recursively collect the paths of the files having a given extension.

    Parameters
    ----------
    ini_path: str or path-like
        Directory where the search starts.
    ext: str
        Extension to look for, without the leading dot (e.g. ``'txt'``).
    paths: list, optional
        List to which the matching paths are appended. A new list is created when omitted,
        so results no longer leak from one call to the next.

    Returns
    -------
    list of str
        ``paths`` completed with the path of every matching file.
    """
    if paths is None:
        paths = []
    with os.scandir(ini_path) as it:
        for entry in it:
            if os.path.splitext(entry.name)[1][1:] == ext and entry.is_file():
                paths.append(entry.path)
            elif entry.is_dir():
                recursive_find_files_extension(entry.path, ext, paths)
    return paths
[docs]
def recursive_find_files(ini_path, exp='make_enum', paths=None,
                         filters=('build',)):
    """Recursively collect the files whose stem contains a given expression.

    Parameters
    ----------
    ini_path: str or Path
        Directory where the search starts.
    exp: str
        Substring that must be present in the file stem (name without suffix).
    paths: list, optional
        List to which the matching paths are appended. A new list is created when omitted,
        so results no longer leak from one call to the next.
    filters: iterable of str
        A file is skipped when any of these substrings appears in its full path.

    Returns
    -------
    list of Path
        ``paths`` completed with every matching file.
    """
    if paths is None:
        paths = []
    for child in Path(ini_path).iterdir():
        if child.is_dir():
            recursive_find_files(child, exp, paths, filters)
        elif exp in child.stem and not any(filt in str(child) for filt in filters):
            paths.append(child)
    return paths
[docs]
def recursive_find_expr_in_files(ini_path, exp='make_enum', paths=None,
                                 filters=('.git', '.idea', '__pycache__', 'build', 'egg', 'documentation', '.tox'),
                                 replace=False, replace_str=''):
    """Recursively look for an expression inside the files of a directory tree.

    Parameters
    ----------
    ini_path: str or Path
        Directory where the search starts.
    exp: str
        Expression to look for in each line of each file.
    paths: list, optional
        List to which the hits are appended. A new list is created when omitted, so results
        no longer leak from one call to the next.
    filters: iterable of str
        Files and directories are skipped when any of these substrings appears in their full path.
    replace: bool
        If True, every occurrence of ``exp`` is replaced by ``replace_str`` and the files
        containing at least one hit are rewritten in place.
    replace_str: str
        Replacement text used when ``replace`` is True.

    Returns
    -------
    list of [Path, int, str]
        One entry per matching line: the file, the 0-based line index and the original line.
    """
    if paths is None:
        paths = []
    for child in Path(ini_path).iterdir():
        if any(filt in str(child) for filt in filters):
            continue
        if child.is_dir():
            recursive_find_expr_in_files(child, exp, paths, filters, replace=replace, replace_str=replace_str)
            continue
        try:
            found = False
            new_lines = []
            with child.open('r') as f:
                for ind, line in enumerate(f):
                    if exp in line:
                        found = True
                        paths.append([child, ind, line])  # the hit keeps the original line
                        if replace:
                            line = line.replace(exp, replace_str)
                    if replace:
                        new_lines.append(line)
            if replace and found:
                with child.open('w') as f:
                    f.write(''.join(new_lines))
        except (OSError, UnicodeError):
            # best effort: files that cannot be opened or decoded as text (binaries,
            # permission issues...) are skipped on purpose
            continue
    return paths
[docs]
def count_lines(ini_path, count=0, filters=('lextab', 'yacctab', 'pycache', 'pyc')):
    """Recursively count the lines of the python files found in a directory tree.

    Parameters
    ----------
    ini_path: str or Path
        Directory where the counting starts.
    count: int
        Initial value of the counter (used to accumulate through the recursion).
    filters: iterable of str
        A file is ignored when any of these substrings appears in its name. The filters are
        now forwarded to the sub-directories too (they were previously reset to the defaults
        below the first level).

    Returns
    -------
    int
        ``count`` plus the number of lines of every retained file whose name contains ``'.py'``.
    """
    for child in Path(ini_path).iterdir():
        if child.is_dir():
            count = count_lines(child, count, filters)
            continue
        if any(filt in child.name for filt in filters):
            continue
        if '.py' not in child.name:
            # not a python file: only report its stem, as before
            print(child.stem)
            continue
        try:
            with child.open('r') as f:
                count += sum(1 for _ in f)
        except (OSError, UnicodeError):
            # best effort: unreadable or undecodable files are skipped on purpose
            continue
    return count
[docs]
def remove_spaces(string):
    """Return ``string`` stripped of all its whitespace.

    Parameters
    ----------
    string: str
        The text to process.

    Returns
    -------
    str
        The whitespace-separated chunks of ``string`` glued back together.
    """
    chunks = string.split()
    return ''.join(chunks)
[docs]
def rint(x):
    """Round with ``numpy.rint`` and return the result as a python ``int``.

    Parameters
    ----------
    x: float or int
        The value to round.

    Returns
    -------
    int
        ``int(np.rint(x))``
    """
    rounded = np.rint(x)
    return int(rounded)
[docs]
def elt_as_first_element(elt_list, match_word='Mock'):
    """Return a list where the first element containing ``match_word`` comes first.

    The other elements keep their relative order. When no element matches, the order is
    unchanged. The input is copied, so the caller's sequence is no longer modified and
    non-list iterables (e.g. tuples) are accepted.

    Parameters
    ----------
    elt_list: iterable of str
        The elements to reorder.
    match_word: str
        Substring identifying the element to put first.

    Returns
    -------
    list of str
        The reordered elements (an empty list for an empty input).

    Raises
    ------
    TypeError
        if ``elt_list`` is not iterable, or if a non-str element is met before a match is found
    """
    if not hasattr(elt_list, '__iter__'):
        raise TypeError('elt_list must be an iterable')
    elements = list(elt_list)  # work on a copy: never alter the caller's object
    if not elements:
        return []
    ind_elt = 0
    for ind, elt in enumerate(elements):
        if not isinstance(elt, str):
            raise TypeError('elt_list must be a list of str')
        if match_word in elt:
            ind_elt = ind
            break
    first = elements.pop(ind_elt)
    return [first, *elements]
[docs]
def elt_as_first_element_dicts(elt_list, match_word='Mock', key='name'):
    """Return a list where the first dict whose ``key`` entry contains ``match_word`` comes first.

    The other dicts keep their relative order. When no dict matches, the order is unchanged.
    The input is copied, so the caller's sequence is no longer modified and non-list
    iterables (e.g. tuples) are accepted.

    Parameters
    ----------
    elt_list: iterable of dict
        The dictionaries to reorder.
    match_word: str
        Value looked for (with ``in``) inside ``elt[key]``.
    key: str
        Key of the entry inspected in each dictionary.

    Returns
    -------
    list of dict
        The reordered dictionaries (an empty list for an empty input).

    Raises
    ------
    TypeError
        if ``elt_list`` is not iterable, or if a non-dict element is met before a match is found
    """
    if not hasattr(elt_list, '__iter__'):
        raise TypeError('elt_list must be an iterable')
    elements = list(elt_list)  # work on a copy: never alter the caller's object
    if not elements:
        return []
    ind_elt = 0
    for ind, elt in enumerate(elements):
        if not isinstance(elt, dict):
            raise TypeError('elt_list must be a list of dicts')
        if match_word in elt[key]:
            ind_elt = ind
            break
    first = elements.pop(ind_elt)
    return [first, *elements]
[docs]
def find_keys_from_val(dict_tmp: dict, val: object):
    """Return the list of the keys of ``dict_tmp`` whose value is equal to ``val``."""
    matching_keys = []
    for key, value in dict_tmp.items():
        if value == val:
            matching_keys.append(key)
    return matching_keys
[docs]
def find_object_if_matched_attr_name_val(obj, attr_name, attr_value):
    """Check if an object has a given attribute set to a given value

    Parameters
    ----------
    obj: object
        the object to be tested
    attr_name: str
        attribute name to look for in the object
    attr_value: object
        value to match

    Returns
    -------
    bool: True if obj has an attribute attr_name equal to attr_value
    """
    if not hasattr(obj, attr_name):
        return False
    if getattr(obj, attr_name) == attr_value:
        return True
    return False
[docs]
def find_objects_in_list_from_attr_name_val(objects: List[object], attr_name: str,
                                            attr_value: object, return_first=True):
    """ Lookup within a list of objects, looking for those having the requested attribute name/value pair

    Parameters
    ----------
    objects: list
        list of objects
    attr_name: str
        attribute name to look for in the object
    attr_value: object
        value to match
    return_first: bool
        if True stop at the first matching object, else collect all the matching objects

    Returns
    -------
    tuple(object, int) or list of tuple(object, int)
        if return_first is True: the first matching object and its index, or (None, -1) if
        nothing matched; otherwise the list of all (object, index) matches, empty if
        nothing matched
    """
    matches = []
    last_match = None  # None also stands for "nothing matched"
    position = -1
    for position, candidate in enumerate(objects):
        if not find_object_if_matched_attr_name_val(candidate, attr_name, attr_value):
            continue
        last_match = candidate
        if return_first:
            break
        matches.append((candidate, position))

    if return_first:
        return (None, -1) if last_match is None else (last_match, position)
    return [] if last_match is None else matches
[docs]
def find_dict_if_matched_key_val(dict_tmp, key, value):
    """Check if a dictionary holds a given key set to a given value

    Parameters
    ----------
    dict_tmp: (dict) the dictionary to be tested
    key: (str) a key string to look for in dict_tmp
    value: (object) any python object

    Returns
    -------
    bool: True if the key/value pair has been found in dict_tmp
    """
    if key not in dict_tmp:
        return False
    if dict_tmp[key] == value:
        return True
    return False
[docs]
def find_dicts_in_list_from_key_val(dicts, key, value):
    """ Lookup within a list of dicts, keeping all those holding the requested key, value pair

    Parameters
    ----------
    dicts: (list) list of dictionaries
    key: (str) specific key to look for in each dict
    value: value to match

    Returns
    -------
    list of dict: all the matching dictionaries, in their original order (empty if none matched)
    """
    return [candidate for candidate in dicts
            if find_dict_if_matched_key_val(candidate, key, value)]
[docs]
def find_dict_in_list_from_key_val(dicts, key, value, return_index=False):
    """ Lookup within a list of dicts, returning the first one holding the requested key, value pair

    Parameters
    ----------
    dicts: (list) list of dictionaries
    key: (str) specific key to look for in each dict
    value: value to match
    return_index: (bool) if True, the index of the matching dict is returned as well

    Returns
    -------
    dict: the first matching dict if found, otherwise None. If return_index is True, a tuple
    (dict, index) is returned instead, being (None, -1) if nothing matched
    """
    for position, candidate in enumerate(dicts):
        if find_dict_if_matched_key_val(candidate, key, value):
            return (candidate, position) if return_index else candidate
    return (None, -1) if return_index else None
[docs]
def get_entrypoints(group='pymodaq.plugins') -> List[metadata.EntryPoint]:
    """ Get the list of entry points registered under a given group

    The ``importlib.metadata`` API evolved over time, so the various call forms are tried in
    turn, from the newest one down to the oldest.

    Parameters
    ----------
    group: str
        the name of the group

    Returns
    -------
    list of EntryPoint
        the entry points discovered for this group
    """
    try:
        found = metadata.entry_points(group=group)
    except TypeError:
        # entry_points() does not accept keyword selection: use the older forms
        try:
            found = metadata.entry_points().select(group=group)
        except AttributeError:
            found = metadata.entry_points().get(group, [])

    # whatever container the API returned (tuple, EntryPoints...), hand back a plain list
    return found if isinstance(found, list) else list(found)
[docs]
def check_vals_in_iterable(iterable1, iterable2):
    """Assert that two sized iterables have the same length and equal elements pairwise.

    Raises
    ------
    AssertionError
        if the lengths differ or if a pair of elements is not equal
    """
    assert len(iterable1) == len(iterable2)
    # converted to lists so the comparison works for any kind of iterable: list, tuple, ndarray...
    pairs = zip(list(iterable1), list(iterable2))
    for left, right in pairs:
        assert left == right
[docs]
def caller_name(skip=2):
    """Get the name of a caller in the format module.class.method

    `skip` specifies how many levels of stack to skip while getting the caller
    name: skip=1 means "who calls me", skip=2 "who calls my caller" etc.

    An empty string is returned if the skipped levels exceed the stack height
    """
    frames = inspect.stack()
    if skip >= len(frames):
        return ''
    frame = frames[skip][0]

    parts = []
    # the module can be None when the frame is executed directly in a console
    owner_module = inspect.getmodule(frame)
    if owner_module:
        parts.append(owner_module.__name__)
    # a local named 'self' is taken as the sign of a method call; static methods cannot be
    # told apart from plain functions this way
    if 'self' in frame.f_locals:
        parts.append(frame.f_locals['self'].__class__.__name__)
    function_name = frame.f_code.co_name
    if function_name != '<module>':  # '<module>' is the top level code, not a function
        parts.append(function_name)
    del frame
    return ".".join(parts)
[docs]
def zeros_aligned(n, align, dtype=np.uint32):
    """
    Get a zero-filled array whose memory starts on an address that is a multiple of align.

    Parameters
    ----------
    n: (int) number of elements of type dtype in the returned array
    align: (int) memory alignment, in bytes
    dtype: (numpy.dtype) type of the stored memory elements

    Returns
    -------
    ndarray: a view of n elements of type dtype, taken from a larger zeroed byte buffer
    """
    element_type = np.dtype(dtype)
    payload_size = n * element_type.itemsize
    # over-allocate by `align` bytes so that an aligned start address always fits in the buffer
    raw = np.zeros(payload_size + align, dtype=np.uint8)
    offset = -raw.ctypes.data % align
    return raw[offset:offset + payload_size].view(element_type)
# ########################
# #File management
[docs]
def get_new_file_name(base_path: Path, base_name: str = 'tttr_data'):
    """Get the next free indexed file name within a dated directory.

    The directory ``base_path/<year>/<yearmonthday>`` (from the current date) is created if
    needed, including ``base_path`` itself. The files it contains whose name starts with
    ``base_name`` are inspected and the returned name uses the highest trailing index found
    plus one (0 if there is none). Files starting with ``base_name`` but not ending with
    digits are ignored instead of raising.

    Parameters
    ----------
    base_path: Path or str
        The root directory under which the dated directories are created.
    base_name: str
        The prefix of the file name.

    Returns
    -------
    str
        The new file name (without suffix) formatted as ``{base_name}_{index:03d}``
    Path
        The dated directory where the file should be stored
    """
    base_path = Path(base_path)

    today = datetime.datetime.now()
    curr_dir = base_path.joinpath(today.strftime('%Y'), today.strftime('%Y%m%d'))
    # parents/exist_ok: create missing parents and tolerate an already existing directory
    curr_dir.mkdir(parents=True, exist_ok=True)

    indexes = []
    for entry in curr_dir.iterdir():
        if not (entry.name.startswith(base_name) and entry.is_file()):
            continue
        suffix = entry.stem[len(base_name):]
        # trailing digits of the stem, whatever their number (also valid beyond 999)
        digits = suffix[len(suffix.rstrip('0123456789')):]
        if digits:
            indexes.append(int(digits))

    index = max(indexes) + 1 if indexes else 0
    return f'{base_name}_{index:03d}', curr_dir
[docs]
def get_module_path(module_name: str) -> Path:
    """ Get the Path of the file of an already imported module, given its name

    The name is usually obtained with the dunder __module__ of an object or class. The
    module is looked up in ``sys.modules``.
    """
    loaded_module = sys.modules[module_name]
    module_file = loaded_module.__file__
    return Path(module_file)
if __name__ == '__main__':
    # Developer scratch area, only executed when this file is run as a script.
    # The search below uses a hard-coded local Windows path and, with replace=False,
    # only lists the lines containing the expression without modifying any file.
    #plugins = get_plugins()  # pragma: no cover
    #extensions = get_extension()
    #models = get_models()
    #count = count_lines('C:\\Users\\weber\\Labo\\Programmes Python\\PyMoDAQ_Git\\pymodaq\src')
    # import license
    # mit = license.find('MIT')
    #
    paths = recursive_find_expr_in_files(r'C:\Users\weber\Labo\ProgrammesPython\PyMoDAQ_Git',
                                         exp="'multiaxes'",
                                         paths=[],
                                         filters=['.git', '.idea', '__pycache__', 'build', 'egg', 'documentation',
                                                  '.tox'],
                                         replace=False,
                                         replace_str="pymodaq.utils")
    #get_version()
    pass
# paths = recursive_find_files('C:\\Users\\weber\\Labo\\Programmes Python\\PyMoDAQ_Git',
# exp='VERSION', paths=[])
# import version
# for file in paths:
# with open(str(file), 'r') as f:
# v = version.Version(f.read())
# v.minor += 1
# v.patch = 0
# with open(str(file), 'w') as f:
# f.write(str(v))
# for file in paths:
# with open(str(file), 'w') as f:
# f.write(mit.render(name='Sebastien Weber', email='sebastien.weber@cemes.fr'))