Files
eco/utilities_test.py
2025-06-24 15:33:02 +02:00

178 lines
5.7 KiB
Python

from time import sleep
import sys, select
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pathlib import Path
from typing import Any
import numpy as np
import matplotlib.pyplot as plt
from numbers import Number
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self, *args, timeout: float = 1, **kwargs):
self._path = Path(*args, **kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
try:
return future.result(self.timeout)
except TimeoutError:
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self, name: str) -> Any:
return getattr(self._path, name)
def __str__(self) -> str:
return str(self._path)
class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, "_Thread__target"):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(
*self._Thread__args, **self._Thread__kwargs
)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self):
super(PropagatingThread, self).join()
if self.exc:
raise self.exc
return self.ret
def isiter(a):
try:
iter(a)
return True
except TypeError:
return False
def roundto(v,interval):
return np.rint(v/interval)*interval
def linlog_intervals(*args, verbose=True, plot=False):
"""Get linearly and logarithmically spaced arrays from providing limits and intervals or number of intervals.
Example usages:
linlog_intervals(-1e-12,('lin',.1e-12),2e-12,('log',2e-12),1e-6,('lin',4),5e-6)
Args:
*args : limits and specifications of intervals,
limits are numbers,
specifications tuples of strings 'lin' or 'log'
and the definition if interval.
A integer is interpretet as number of intervals within the limits,
a float is interpreted asinterval size,
where in log definition, the size of the first (smallest) interval is matched.
verbose (bool, optional): _description_. Defaults to True.
plot (bool, optional): _description_. Defaults to False.
Raises:
Exception: _description_
Exception: _description_
Exception: _description_
Returns:
numpy ndarray: 1D array of intervals.
"""
limits = []
idefs = []
last_lim = False
for arg in args:
if not last_lim and isinstance(arg,Number):
limits.append(arg)
last_lim = True
elif last_lim and isiter(arg):
idefs.append(arg)
last_lim = False
else:
raise Exception('Limits need to follow interval description and vice versa')
if verbose:
print(limits,idefs)
if not len(limits)==len(idefs)+1:
raise Exception('need exactly one more limit than interval definitions!')
a = []
for i,idef in enumerate(idefs):
tlims = limits[i:i+2]
if np.diff(tlims)<=0:
raise Exception('number limits should increasing!')
if isinstance(a,np.ndarray):
if np.isclose(a[-1],tlims[0]):
a = a[:-1]
a = [a]
if idef[0] == 'lin':
if type(idef[1]) is int:
a.append(np.linspace(*tlims,idef[1]+1))
if type(idef[1]) is float:
a.append(np.arange(tlims[0],tlims[1]+idef[1],idef[1]))
if verbose:
print(f'From {tlims[0]:5g} to {a[-1][-1]:5g}: {len(a[-1])-1} linear intervals of {np.mean(np.diff(a[-1])):5g} size')
if plot:
plt.plot(np.arange(len(np.hstack(a))-len(a[-1]), len(np.hstack(a))),a[-1],'db', mfc='none')
if idef[0] == 'log':
tlims = np.asarray(tlims)
if type(idef[1]) is int:
a.append(np.logspace(*np.log10(tlims),idef[1]+1))
if type(idef[1]) is float:
intlog = np.log10(tlims[0]+idef[1])-np.log10(tlims[0])
a.append(10**np.arange(np.log10(tlims[0]),np.log10(tlims[1]),intlog))
if verbose:
intervals = np.diff(np.log10(a[-1]))
print(f'From {tlims[0]:5g} to {a[-1][-1]:5g}: {len(a[-1])-1} logarithmic intervals between {np.min(intervals):5g} and {np.max(intervals):5g} in sizes.')
if plot:
plt.plot(np.arange(len(np.hstack(a))-len(a[-1]), len(np.hstack(a))),a[-1],'or',mfc='none')
if verbose:
if not np.isclose(a[-1][-1],tlims[1]):
print(f' NB: given limit {tlims[1]:5g} computes off by {tlims[1]-a[-1][-1]:5g} from last element!')
a = np.hstack(a)
if plot:
plt.plot(np.arange(len(a)),a,'.k')
if verbose:
print(f'{len(a)} elements in total.')
return a
# TODO
# _wait_strs = '\|/-\|/-'
# class WaitInput:
# def __init__(self,text,wait_time=5,update_interval=1):
# self.text = text
# self.wait_time=wait_time
# def start(self):
# resttime = self.wait_time
# while resttime>0:
# print(f"You have {resttime} seconds to answer!")
# i, o, e = select.select( [sys.stdin], [], [], 2 )
# if (i):
# print("You said", sys.stdin.readline().strip())
# else:
# print("You said nothing!")