53 lines
2.0 KiB
Python
53 lines
2.0 KiB
Python
from time import time, sleep
|
|
|
|
from pyscan.config import max_time_tolerance
|
|
|
|
# Weight given to the newly computed sleep time when it is blended with the
# previous iteration's sleep time in TimePositioner.get_generator; the
# previous value receives the remaining (1 - smoothing_factor) weight.
smoothing_factor = 0.95
|
|
|
|
|
|
class TimePositioner(object):
    """
    Positioner that paces a fixed number of measurements at a regular time interval.

    Iterating over get_generator() sleeps as needed between iterations and yields the
    timestamp at which each measurement should begin.
    """

    def __init__(self, time_interval, n_intervals, tolerance=None):
        """
        Time interval at which to read data.
        :param time_interval: Time interval in seconds.
        :param n_intervals: How many intervals to measure. Values below 1 are raised to 1.
        :param tolerance: By how many seconds the (smoothed) time to sleep may be negative before the
                          requested interval is considered unachievable. None, or any value below
                          max_time_tolerance, is replaced with max_time_tolerance.
        """
        self.time_interval = time_interval

        # The tolerance cannot be lower than the configured max_time_tolerance.
        if tolerance is None or tolerance < max_time_tolerance:
            tolerance = max_time_tolerance
        self.tolerance = tolerance

        # Minimum one measurement.
        if n_intervals < 1:
            n_intervals = 1
        self.n_intervals = n_intervals

    def get_generator(self):
        """
        Generate the measurement start timestamps, one per interval.

        Before each yield, the time elapsed since the previous yield is subtracted from the
        requested interval and the generator sleeps for the (smoothed) remainder.
        :return: Generator yielding the time() value taken right after each sleep.
        :raises ValueError: If the smoothed time to sleep is more negative than the tolerance,
                            i.e. the consumer is too slow for the requested time interval.
        """
        measurement_time_start = time()
        last_time_to_sleep = 0

        for _ in range(self.n_intervals):
            measurement_time_stop = time()
            # How much time did the measurement take.
            measurement_time = measurement_time_stop - measurement_time_start

            time_to_sleep = self.time_interval - measurement_time
            # Use the smoothing factor to attenuate variations in the measurement time.
            time_to_sleep = (smoothing_factor * time_to_sleep) + ((1 - smoothing_factor) * last_time_to_sleep)

            # Time to sleep is negative beyond the tolerance (more time has elapsed than the interval allows):
            # we cannot achieve the requested time interval. Compare against the instance tolerance so that
            # the value passed to the constructor is actually honoured.
            if time_to_sleep < (-1 * self.tolerance):
                raise ValueError("The requested time interval cannot be achieved. Last iteration took %.2f seconds, "
                                 "but a %.2f seconds time interval was set." % (measurement_time, self.time_interval))

            # Sleep only if time to sleep is positive.
            if time_to_sleep > 0:
                sleep(time_to_sleep)

            # Remember the smoothed value: it feeds the smoothing of the next iteration.
            last_time_to_sleep = time_to_sleep
            measurement_time_start = time()

            # Return the timestamp at which the measurement should begin.
            yield measurement_time_start