import os
import sys
import datetime
import importlib
import inspect
import json
import functools
import platform
import re
import time
import warnings
from packaging import version as version_mod
from pathlib import Path
import pkgutil
import traceback
import platform
from typing import Union, List
from typing import Iterable as IterableType
from collections.abc import Iterable
import numpy as np
from qtpy import QtCore
from qtpy.QtCore import QLocale
from pymodaq.utils import logger as logger_module
from pymodaq.utils.config import get_set_preset_path, Config
from pymodaq.utils.messenger import deprecation_msg
from pymodaq.utils.qvariant import QVariant
if version_mod.parse(platform.python_version()) >= version_mod.parse('3.8'): # from version 3.8 this feature is included in the
# standard lib
from importlib import metadata
else:
import importlib_metadata as metadata
if version_mod.parse(platform.python_version()) >= version_mod.parse('3.9'):
# from version 3.9 the cache decorator is available
from functools import cache
else:
from functools import lru_cache as cache
if version_mod.parse(platform.python_version()) >= version_mod.parse('3.9'):
# from version 3.9 the cache decorator is available
from functools import cache
else:
from functools import lru_cache as cache
logger = logger_module.set_logger(logger_module.get_module_name(__file__))
plot_colors = [(255, 255, 255), (255, 0, 0), (0, 255, 0), (0, 0, 255), (14, 207, 189), (207, 14, 166), (207, 204, 14)]
config = Config()
def is_64bits():
return sys.maxsize > 2**32
def timer(func):
"""Print the runtime of the decorated function"""
@functools.wraps(func)
def wrapper_timer(*args, **kwargs):
start_time = time.perf_counter() # 1
value = func(*args, **kwargs)
end_time = time.perf_counter() # 2
run_time = end_time - start_time # 3
print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
return value
return wrapper_timer
def get_version():
"""Obtain pymodaq version from the VERSION file
Follows the layout from the packaging tool hatch, hatchling
"""
DEFAULT_PATTERN = r'(?i)^(__version__|VERSION) *= *([\'"])v?(?P<version>.+?)\2'
with open(str(Path(__file__).parent.parent.joinpath('resources/VERSION')), 'r') as fvers:
contents = fvers.read().strip()
match = re.search(DEFAULT_PATTERN, contents, flags=re.MULTILINE)
groups = match.groupdict()
if 'version' not in groups:
raise ValueError('no group named `version` was defined in the pattern')
return groups['version']
def copy_preset(): # pragma: no cover
path = get_set_preset_path().joinpath('preset_default.xml')
if not path.exists(): # copy the preset_default from pymodaq folder and create one in pymodad's local folder
with open(str(Path(__file__).parent.parent.joinpath('resources/preset_default.xml')), 'r') as file:
path.write_text(file.read())
def set_qt_backend():
backend_present = True
if config('qtbackend', 'backend').lower() not in [mod.lower() for mod in sys.modules]:
backend_present = False
logger.warning(f"The chosen Qt backend ({config('qtbackend', 'backend')}) has not been installed...\n"
f"Trying another...")
backends = config('qtbackend', 'backends')
backends.pop(config('qtbackend', 'backend'))
for backend in backends:
if backend.lower() in [mod.lower() for mod in sys.modules]:
backend_present = True
break
if backend_present:
os.environ['QT_API'] = config('qtbackend', 'backend')
logger.info('************************')
logger.info(f"{config('qtbackend', 'backend')} Qt backend loaded")
logger.info('************************')
else:
msg = f"No Qt backend could be found in your system, please install either pyqt5/6 or pyside2/6." \
f"pyqt5 is still preferred, while pyqt6 should mostly work."
logger.critical(msg)
warnings.warn(msg, FutureWarning, 3)
print(msg.upper())
class JsonConverter:
def __init__(self):
super().__init__()
@classmethod
def trusted_types(cls):
return ['float', 'int', 'str', 'datetime', 'date', 'time', 'tuple', 'list', 'bool', 'bytes',
'float64']
@classmethod
def istrusted(cls, type_name):
return type_name in cls.trusted_types()
@classmethod
def object2json(cls, obj):
dic = dict(module=type(obj).__module__, type=type(obj).__name__, data=repr(obj))
return json.dumps(dic)
@classmethod
def json2object(cls, jsonstring):
try:
dic = json.loads(jsonstring)
if isinstance(dic, dict):
if dic['type'] in cls.trusted_types():
return eval(dic['data'])
else:
return dic
else: # pragma: no cover
return dic
except Exception:
return jsonstring
def decode_data(encoded_data):
"""
Decode QbyteArrayData generated when drop items in table/tree/list view
Parameters
----------
encoded_data: QByteArray
Encoded data of the mime data to be dropped
Returns
-------
data: list
list of dict whose key is the QtRole in the Model, and the value a QVariant
"""
data = []
ds = QtCore.QDataStream(encoded_data, QtCore.QIODevice.ReadOnly)
while not ds.atEnd():
row = ds.readInt32()
col = ds.readInt32()
map_items = ds.readInt32()
item = {}
for ind in range(map_items):
key = ds.readInt32()
#TODO check this is fine
value = QVariant()
#value = None
ds >> value
item[QtCore.Qt.ItemDataRole(key)] = value.value()
data.append(item)
return data
#############################
def capitalize(string, Nfirst=1):
"""
Returns same string but with first Nfirst letters upper
Parameters
----------
string: (str)
Nfirst: (int)
Returns
-------
str
"""
return string[:Nfirst].upper() + string[Nfirst:]
def uncapitalize(string, Nfirst=1):
return string[:Nfirst].lower() + string[Nfirst:]
def getLineInfo():
"""get information about where the Exception has been triggered"""
tb = sys.exc_info()[2]
res = ''
for t in traceback.format_tb(tb):
res += t
return res
[docs]class ThreadCommand:
"""Generic object to pass info (command) and data (attribute) between thread or objects using signals
Parameters
----------
command: str
The command to be analysed for further action
attribute: any type
the attribute related to the command. The actual type and value depend on the command and the situation
attributes: deprecated, attribute should be used instead
Attributes
----------
command : str
The command to be analysed for further action
attribute : any type
the attribute related to the command. The actual type and value depend on the command and the situation
"""
def __init__(self, command: str, attribute=None, attributes=None):
if not isinstance(command, str):
raise TypeError(f'The command in a Threadcommand object should be a string, not a {type(command)}')
self.command = command
if attribute is None and attributes is not None:
deprecation_msg('ThreadCommand signature changed, use attribute in place of attribute')
self.attribute = attributes
self.attributes = attributes
self.attribute = attribute
def __repr__(self):
return f'Threadcommand: {self.command} with attribute {self.attribute}'
def ensure_ndarray(data):
"""
Make sure data is returned as a numpy array
Parameters
----------
data
Returns
-------
ndarray
"""
if not isinstance(data, np.ndarray):
if isinstance(data, list):
data = np.array(data)
else:
data = np.array([data])
return data
def setLocale():
"""
defines the Locale to use to convert numbers to strings representation using language/country conventions
Default is English and US
"""
language = getattr(QLocale, config('style', 'language'))
country = getattr(QLocale, config('style', 'country'))
QLocale.setDefault(QLocale(language, country))
def recursive_find_files_extension(ini_path, ext, paths=[]):
with os.scandir(ini_path) as it:
for entry in it:
if os.path.splitext(entry.name)[1][1:] == ext and entry.is_file():
paths.append(entry.path)
elif entry.is_dir():
recursive_find_files_extension(entry.path, ext, paths)
return paths
def recursive_find_files(ini_path, exp='make_enum', paths=[],
filters=['build']):
for child in Path(ini_path).iterdir():
if child.is_dir():
recursive_find_files(child, exp, paths, filters)
else:
if exp in child.stem:
if not any([filt in str(child) for filt in filters]):
paths.append(child)
return paths
def recursive_find_expr_in_files(ini_path, exp='make_enum', paths=[],
filters=['.git', '.idea', '__pycache__', 'build', 'egg', 'documentation', '.tox'],
replace=False, replace_str=''):
for child in Path(ini_path).iterdir():
if not any(filt in str(child) for filt in filters):
if child.is_dir():
recursive_find_expr_in_files(child, exp, paths, filters, replace=replace, replace_str=replace_str)
else:
try:
found = False
with child.open('r') as f:
replacement = ''
for ind, line in enumerate(f):
if exp in line:
found = True
paths.append([child, ind, line])
if replace:
replacement += line.replace(exp, replace_str)
else:
if replace:
replacement += line
if replace and found:
with child.open('w') as f:
f.write(replacement)
except Exception:
pass
return paths
def count_lines(ini_path, count=0, filters=['lextab', 'yacctab','pycache', 'pyc']):
# if Path(ini_path).is_file():
# with Path(ini_path).open('r') as f:
# count += len(f.readlines())
# return count
for child in Path(ini_path).iterdir():
if child.is_dir():
count = count_lines(child, count)
else:
try:
if not any([filt in child.name for filt in filters]):
if '.py' in child.name:
with child.open('r') as f:
count += len(f.readlines())
else:
print(child.stem)
except Exception:
pass
return count
def remove_spaces(string):
"""
return a string without any white spaces in it
Parameters
----------
string
Returns
-------
"""
return ''.join(string.split())
def rint(x):
"""
almost same as numpy rint function but return an integer
Parameters
----------
x: (float or integer)
Returns
-------
nearest integer
"""
return int(np.rint(x))
def elt_as_first_element(elt_list, match_word='Mock'):
if not hasattr(elt_list, '__iter__'):
raise TypeError('elt_list must be an iterable')
if elt_list:
ind_elt = 0
for ind, elt in enumerate(elt_list):
if not isinstance(elt, str):
raise TypeError('elt_list must be a list of str')
if match_word in elt:
ind_elt = ind
break
plugin_match = elt_list[ind_elt]
elt_list.remove(plugin_match)
plugins = [plugin_match]
plugins.extend(elt_list)
else:
plugins = []
return plugins
def elt_as_first_element_dicts(elt_list, match_word='Mock', key='name'):
if not hasattr(elt_list, '__iter__'):
raise TypeError('elt_list must be an iterable')
if elt_list:
ind_elt = 0
for ind, elt in enumerate(elt_list):
if not isinstance(elt, dict):
raise TypeError('elt_list must be a list of dicts')
if match_word in elt[key]:
ind_elt = ind
break
plugin_match = elt_list[ind_elt]
elt_list.remove(plugin_match)
plugins = [plugin_match]
plugins.extend(elt_list)
else:
plugins = []
return plugins
def find_keys_from_val(dict_tmp: dict, val: object):
"""Returns the keys from a dict if its value is matching val"""
return [k for k, v in dict_tmp.items() if v == val]
def find_object_if_matched_attr_name_val(obj, attr_name, attr_value):
"""check if an attribute key/value pair match in a given object
Parameters
----------
obj: object
attr_name: str
attribute name to look for in the object
attr_value: object
value to match
Returns
-------
bool: True if the key/value pair has been found in dict_tmp
"""
if hasattr(obj, attr_name):
if getattr(obj, attr_name) == attr_value:
return True
return False
def find_objects_in_list_from_attr_name_val(objects: List[object], attr_name: str,
attr_value: object, return_first=True):
""" lookup within a list of objects. Look for the objects within the list which has the correct attribute name,
value pair
Parameters
----------
objects: list
list of objects
attr_name: str
attribute name to look for in the object
attr_value: object
value to match
return_first: bool
if True return the first objects found in the list else all the objects matching
Returns
-------
list of tuple(object, int): object and index or list of object and indexes
"""
selection = []
obj = None
for ind, obj_tmp in enumerate(objects):
if find_object_if_matched_attr_name_val(obj_tmp, attr_name, attr_value):
obj = obj_tmp
if return_first:
break
else:
selection.append((obj_tmp, ind))
if obj is None:
if return_first:
return None, -1
else:
return []
else:
if return_first:
return obj, ind
else:
return selection
def find_dict_if_matched_key_val(dict_tmp, key, value):
"""
check if a key/value pair match in a given dictionnary
Parameters
----------
dict_tmp: (dict) the dictionnary to be tested
key: (str) a key string to look for in dict_tmp
value: (object) any python object
Returns
-------
bool: True if the key/value pair has been found in dict_tmp
"""
if key in dict_tmp:
if dict_tmp[key] == value:
return True
return False
def find_dict_in_list_from_key_val(dicts, key, value, return_index=False):
""" lookup within a list of dicts. Look for the dict within the list which has the correct key, value pair
Parameters
----------
dicts: (list) list of dictionnaries
key: (str) specific key to look for in each dict
value: value to match
Returns
-------
dict: if found otherwise returns None
"""
for ind, dict_tmp in enumerate(dicts):
if find_dict_if_matched_key_val(dict_tmp, key, value):
if return_index:
return dict_tmp, ind
else:
return dict_tmp
if return_index:
return None, -1
else:
return None
def get_entrypoints(group='pymodaq.plugins') -> List[metadata.EntryPoint]:
""" Get the list of modules defined from a group entry point
Because of evolution in the package, one or another of the forms below may be deprecated.
We start from the newer way down to the older
Parameters
----------
group: str
the name of the group
"""
try:
discovered_entrypoints = metadata.entry_points(group=group)
except TypeError:
try:
discovered_entrypoints = metadata.entry_points().select(group=group)
except AttributeError:
discovered_entrypoints = metadata.entry_points().get(group, [])
if isinstance(discovered_entrypoints, tuple): # API for python > 3.8
discovered_entrypoints = list(discovered_entrypoints)
if not isinstance(discovered_entrypoints, list):
discovered_entrypoints = list(discovered_entrypoints)
return discovered_entrypoints
@cache
def get_instrument_plugins(): # pragma: no cover
"""
Get plugins names as a list
Parameters
----------
plugin_type: (str) plugin type either 'daq_0Dviewer', 'daq_1Dviewer', 'daq_2Dviewer', 'daq_NDviewer' or 'daq_move'
module: (module) parent module of the plugins
Returns
-------
"""
plugins_import = []
discovered_plugins = []
discovered_plugins_all = get_entrypoints(group='pymodaq.plugins') # old naming of the instrument plugins
discovered_plugins_all.extend(get_entrypoints(group='pymodaq.instruments')) # new naming convention
for entry in discovered_plugins_all:
if entry.name not in [ent.name for ent in discovered_plugins]:
discovered_plugins.append(entry)
logger.debug(f'Found {len(discovered_plugins)} installed plugins, trying to import them')
viewer_types = ['0D', '1D', '2D', 'ND']
for entrypoint in discovered_plugins:
#print(f'Looking for valid instrument plugins in package: {module.value}')
plugin_list = []
try:
try:
movemodule = importlib.import_module(f'{entrypoint.value}.daq_move_plugins', entrypoint.value)
plugin_list.extend([{'name': mod[len('daq_move') + 1:],
'module': movemodule,
'parent_module': importlib.import_module(entrypoint.value),
'type': 'daq_move'}
for mod in [mod[1] for mod in pkgutil.iter_modules([str(movemodule.path.parent)])]
if 'daq_move' in mod])
except ModuleNotFoundError:
pass
viewer_modules = {}
for vtype in viewer_types:
try:
viewer_modules[vtype] = importlib.import_module(f'{entrypoint.value}.daq_viewer_plugins.plugins_{vtype}',
entrypoint.value)
plugin_list.extend([{'name': mod[len(f'daq_{vtype}viewer') + 1:],
'module': viewer_modules[vtype],
'parent_module': importlib.import_module(entrypoint.value),
'type': f'daq_{vtype}viewer'}
for mod in [mod[1] for mod in pkgutil.iter_modules([str(viewer_modules[vtype].path.parent)])]
if f'daq_{vtype}viewer' in mod])
except ModuleNotFoundError:
pass
# check if modules are importable
for mod in plugin_list:
try:
plugin_type = mod['type']
if plugin_type == 'daq_move':
submodule = mod['module']
importlib.import_module(f'{submodule.__package__}.daq_move_{mod["name"]}')
else:
submodule = mod['module']
importlib.import_module(f'{submodule.__package__}.daq_{plugin_type[4:6]}viewer_{mod["name"]}')
plugins_import.append(mod)
except Exception as e: # pragma: no cover
"""If an error is generated at the import, then exclude this plugin"""
logger.debug(f'Impossible to import Instrument plugin {mod["name"]}'
f' from module: {mod["parent_module"].__package__}')
except Exception as e: # pragma: no cover
logger.debug(str(e))
# add utility plugin for PID
try:
pidmodule = importlib.import_module('pymodaq.extensions.pid')
mod = [{'name': 'PID',
'module': pidmodule,
'parent_module': pidmodule,
'type': 'daq_move'}]
importlib.import_module(f'{pidmodule.__package__}.daq_move_PID')
plugins_import.extend(mod)
except Exception as e:
logger.debug(f'Impossible to import PID utility plugin: {str(e)}')
return plugins_import
def get_plugins(plugin_type='daq_0Dviewer'): # pragma: no cover
"""
Get plugins names as a list
Parameters
----------
plugin_type: (str) plugin type either 'daq_0Dviewer', 'daq_1Dviewer', 'daq_2Dviewer', 'daq_NDviewer' or 'daq_move'
module: (module) parent module of the plugins
Returns
-------
"""
return [plug for plug in get_instrument_plugins() if plug['type'] == plugin_type]
def check_vals_in_iterable(iterable1, iterable2):
assert len(iterable1) == len(iterable2)
iterable1 = list(iterable1) # so the assertion below is valid for any kind of iterable, list, tuple, ndarray...
iterable2 = list(iterable2)
for val1, val2 in zip(iterable1, iterable2):
assert val1 == val2
def caller_name(skip=2):
"""Get a name of a caller in the format module.class.method
`skip` specifies how many levels of stack to skip while getting caller
name. skip=1 means "who calls me", skip=2 "who calls my caller" etc.
An empty string is returned if skipped levels exceed stack height
"""
stack = inspect.stack()
start = 0 + skip
if len(stack) < start + 1:
return ''
parentframe = stack[start][0]
name = []
module = inspect.getmodule(parentframe)
# `modname` can be None when frame is executed directly in console
# TODO(techtonik): consider using __main__
if module:
name.append(module.__name__)
# detect classname
if 'self' in parentframe.f_locals:
# I don't know any way to detect call from the object method
# XXX: there seems to be no way to detect static method call - it will
# be just a function call
name.append(parentframe.f_locals['self'].__class__.__name__)
codename = parentframe.f_code.co_name
if codename != '<module>': # top level usually
name.append(codename) # function or a method
del parentframe
return ".".join(name)
def zeros_aligned(n, align, dtype=np.uint32):
"""
Get aligned memory array wih alignment align.
Parameters
----------
n: (int) length in dtype bytes of memory
align: (int) memory alignment
dtype: (numpy.dtype) type of the stored memory elements
Returns
-------
"""
dtype = np.dtype(dtype)
nbytes = n * dtype.itemsize
buff = np.zeros(nbytes + align, dtype=np.uint8)
start_index = -buff.ctypes.data % align
return buff[start_index:start_index + nbytes].view(dtype)
# ########################
# #File management
def get_new_file_name(base_path=Path(config('data_saving', 'h5file', 'save_path')), base_name='tttr_data'):
if isinstance(base_path, str):
base_path = Path(base_path)
today = datetime.datetime.now()
date = today.strftime('%Y%m%d')
year = today.strftime('%Y')
year_dir = base_path.joinpath(year)
if not year_dir.is_dir():
year_dir.mkdir()
curr_dir = base_path.joinpath(year, date)
if not curr_dir.is_dir():
curr_dir.mkdir()
files = []
for entry in curr_dir.iterdir():
if entry.name.startswith(base_name) and entry.is_file():
files.append(entry.stem)
files.sort()
if not files:
index = 0
else:
index = int(files[-1][-3:]) + 1
file = f'{base_name}_{index:03d}'
return file, curr_dir
if __name__ == '__main__':
#paths = recursive_find_expr_in_files('C:\\Users\\weber\\Labo\\Programmes Python\\PyMoDAQ_Git', 'visa')
# for p in paths:
# print(str(p))
# v = get_version()
# pass
#plugins = get_plugins() # pragma: no cover
#extensions = get_extension()
#models = get_models()
#count = count_lines('C:\\Users\\weber\\Labo\\Programmes Python\\PyMoDAQ_Git\\pymodaq\src')
# import license
# mit = license.find('MIT')
#
# paths = recursive_find_expr_in_files(r'C:\Users\weber\Labo\Programmes Python\PyMoDAQ_Git',
# exp="cfunc",
# paths=[],
# filters=['.git', '.idea', '__pycache__', 'build', 'egg', 'documentation',
# '.tox',],
# replace=False,
# replace_str="pymodaq.utils")
#get_version()
get_instrument_plugins()
#get_plugins('daq_move')
#get_plugins('daq_0Dviewer')
pass
# paths = recursive_find_files('C:\\Users\\weber\\Labo\\Programmes Python\\PyMoDAQ_Git',
# exp='VERSION', paths=[])
# import version
# for file in paths:
# with open(str(file), 'r') as f:
# v = version.Version(f.read())
# v.minor += 1
# v.patch = 0
# with open(str(file), 'w') as f:
# f.write(str(v))
# for file in paths:
# with open(str(file), 'w') as f:
# f.write(mit.render(name='Sebastien Weber', email='sebastien.weber@cemes.fr'))