[WIP] ccracks / ccu4: split ccu() into nv(), he() and flow()

Change-Id: I346330a5f350bf03eefe86c8e890b59afaaaa231
This commit is contained in:
zolliker 2025-03-31 10:45:25 +02:00
parent 44750572d9
commit d231e9ce06
3 changed files with 344 additions and 290 deletions

View File

@ -12,6 +12,6 @@ rack.sensor('Ts', channel='C', calcurve='x186350')
rack.loop('T', channel='B', calcurve='x174786', output_module='htr', target=10)
rack.heater('htr', 1, '100W', 25)
rack.ccu(he=True, n2=True)
rack.hepump()
rack.he()
rack.n2()
rack.flow(min_open_pulse=0.03)

View File

@ -28,6 +28,7 @@ class Rack:
raise ConfigError(f'no rack found in {instpath}')
self.props = {} # dict (<property>, <method>) of value
self.mods = {} # dict (<property>, <method>) of list of <cfg>
self.ccu_uri = {}
def set_props(self, mod, **kwds):
for prop, method in kwds.items():
@ -88,43 +89,47 @@ class Rack:
mod = Mod(name, max_heater=max_heater, resistance=resistance, **kwds)
self.set_props(mod, io='lakeshore', device='lakeshore', description=f'heater({name})')
def ccu(self, ccu_uri=None, ccu_io='ccu_io', he=None, n2=None, **kwds):
Mod = self.modfactory
self.ccu_io = ccu_io
def ccu(self, name=None, ccu_uri=None, ccu_io='ccu_io', args_for_io=None, **kwds):
if args_for_io is None:
args_for_io, kwds = kwds, {}
prev_uri = self.ccu_uri.get(ccu_io)
ccu_uri = ccu_uri or self.config.get('ccu_uri')
# self.devname = ccu_devname
Mod(ccu_io, 'frappy_psi.ccu4.IO',
'comm. to CCU4', uri=ccu_uri)
if he:
if not isinstance(he, str): # e.g. True
he = 'He_lev'
Mod(he, cls='frappy_psi.ccu4.HeLevel',
description='the He Level', io=self.ccu_io)
if n2:
if isinstance(n2, str):
n2 = n2.split(',')
else: # e.g. True
n2 = []
n2, valve, upper, lower = n2 + ['N2_lev', 'N2_valve', 'N2_upper', 'N2_lower'][len(n2):]
print(n2, valve, upper, lower)
Mod(n2, cls='frappy_psi.ccu4.N2Level',
description='the N2 Level', io=ccu_io,
valve=valve, upper=upper, lower=lower)
Mod(valve, cls='frappy_psi.ccu4.N2FillValve',
description='LN2 fill valve', io=ccu_io)
Mod(upper, cls='frappy_psi.ccu4.N2TempSensor',
description='upper LN2 sensor')
Mod(lower, cls='frappy_psi.ccu4.N2TempSensor',
description='lower LN2 sensor')
if prev_uri:
if prev_uri == ccu_uri:
return kwds # already configured
raise ConfigError(f'rack.{name or "ccu"}: ccu_uri {prev_uri} does not match {ccu_uri}')
self.ccu_uri[ccu_io] = ccu_uri
self.modfactory(ccu_io, 'frappy_psi.ccu4.IO', 'comm. to CCU4', uri=ccu_uri, **args_for_io)
return kwds
def hepump(self, hepump_uri=None, hepump_type=None, hepump_io='hepump_io',
def he(self, name='He_lev', ccu_io='ccu_io', **kwds):
self.ccu('he', ccu_io=ccu_io, args_for_io={}, **kwds)
self.modfactory(name, cls='frappy_psi.ccu4.HeLevel',
description='the He Level', io=ccu_io, **kwds)
def n2(self, name='N2_lev', valve='N2_valve', upper='N2_upper', lower='N2_lower', ccu_io='ccu_io', **kwds):
self.ccu('n2', ccu_io=ccu_io, args_for_io={}, **kwds)
Mod = self.modfactory
Mod(name, cls='frappy_psi.ccu4.N2Level',
description='the N2 Level', io=ccu_io,
valve=valve, upper=upper, lower=lower)
Mod(valve, cls='frappy_psi.ccu4.N2FillValve',
description='LN2 fill valve', io=ccu_io)
Mod(upper, cls='frappy_psi.ccu4.N2TempSensor',
description='upper LN2 sensor')
Mod(lower, cls='frappy_psi.ccu4.N2TempSensor',
description='lower LN2 sensor')
def flow(self, hepump_uri=None, hepump_type=None, hepump_io='hepump_io',
hepump='hepump', hepump_mot='hepump_mot', hepump_valve='hepump_valve',
flow_sensor='flow_sensor', pump_pressure='pump_pressure', nv='nv',
ccu_io='ccu_io', **kwds):
"""creates needle valve and pump access if available"""
kwds = self.ccu('flow', ccu_io=ccu_io, args_for_io={}, **kwds)
Mod = self.modfactory
hepump_type = hepump_type or self.config.get('hepump_type', 'no')
Mod(nv, 'frappy_psi.ccu4.NeedleValveFlow', 'flow from flow sensor or pump pressure',
flow_sensor=flow_sensor, pressure=pump_pressure, io=ccu_io)
flow_sensor=flow_sensor, pressure=pump_pressure, io=ccu_io, **kwds)
Mod(pump_pressure, 'frappy_psi.ccu4.Pressure', 'He pump pressure', io=ccu_io)
if hepump_type == 'no':
print('no pump, no flow meter - using flow from pressure alone')

View File

@ -22,13 +22,15 @@
"""drivers for CCU4, the cryostat control unit at SINQ"""
import time
import math
import numpy as np
from frappy.lib.enum import Enum
from frappy.lib import clamp
from frappy.lib import clamp, formatExtendedTraceback
from frappy.lib.interpolation import Interpolation
# the most common Frappy classes can be imported from frappy.core
from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \
Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached, nopoll
from frappy.datatypes import BoolType, EnumType, FloatRange, StructOf, \
StatusType, IntRange, StringType, TupleOf
StatusType, IntRange, StringType, TupleOf, ArrayOf
from frappy.errors import CommunicationFailedError
from frappy.states import HasStates, status_code, Retry
@ -314,6 +316,10 @@ class Pressure(HasFilter, Base, Readable):
return self.filter(self.filter_time, self.command(f=float)) - self.mbar_offset
def Table(miny=None, maxy=None):
return ArrayOf(TupleOf(FloatRange(), FloatRange(miny, maxy)))
class NeedleValveFlow(HasStates, Base, Drivable):
flow_sensor = Attached(Readable, mandatory=False)
pressure = Attached(Pressure, mandatory=False)
@ -325,27 +331,53 @@ class NeedleValveFlow(HasStates, Base, Drivable):
target = Parameter(unit='ln/min')
motor_state = Parameter('motor_state', EnumType(M), default=0)
tolerance = Parameter('tolerance', FloatRange(0), value=0.25, readonly=False)
tolerance2 = Parameter('tolerance limit above 2 lnm', FloatRange(0), value=0.5, readonly=False)
prop = Parameter('proportional term', FloatRange(unit='s/lnm'), readonly=False, default=0.001)
speed = Parameter('speed moving time / passed time', FloatRange())
tolerance = Parameter('tolerance', Table(0), value=[(2,0.1),(4,0.4)], readonly=False)
prop_open = Parameter('proportional term for opening', Table(0), readonly=False, value=[(1,0.05)])
prop_close = Parameter('proportional term for closing', Table(0), readonly=False, value=[(1,0.02)])
deriv = Parameter('min progress time constant', FloatRange(unit='s'),
default=30, readonly=False)
settle = Parameter('time within tolerance before getting quiet', FloatRange(unit='s'),
default=30, readonly=False)
step_factor = Parameter('factor (no progress time) / (min step size)', FloatRange(), default=300, readonly=False)
control_active = Parameter('control active flag', BoolType(), readonly=False, default=1)
min_open_step = Parameter('minimal open step', FloatRange(unit='s'), readonly=False, default=0.06)
min_close_step = Parameter('minimal close step', FloatRange(unit='s'), readonly=False, default=0.05)
raw_open_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.12)
raw_close_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.04)
pollinterval = Parameter(default=5)
min_open_pulse = Parameter('minimal open step', FloatRange(0, unit='s'), readonly=False, default=0.02)
min_close_pulse = Parameter('minimal close step', FloatRange(0, unit='s'), readonly=False, default=0.0)
# raw_open_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.12)
# raw_close_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.04)
pollinterval = Parameter(datatype=FloatRange(1, unit='s'), default=5)
_last_dirchange = 0
_ref_time = 0
_ref_dif = 0
_dir = 0
_rawdir = 0
_step = 0
_speed_sum = 0
_last_era = 0
_value = None
def doPoll(self):
# poll at least every sec, but update value only
# every pollinterval and status when changed
if not self.pollInfo.fast_flag:
self.pollInfo.interval = min(1, self.pollinterval) # reduce internal poll interval
self._value = self.get_value()
self._last.append(self._value)
del self._last[0:-300]
self.read_motor_state()
era = time.time() // self.pollinterval
if era != self._last_era:
self.speed = self._speed_sum / self.pollinterval
self._speed_sum = 0
self.value = self._value
self._last_era = era
self.read_status()
self.cycle_machine()
def get_value(self):
p = self.pressure.read_value() * self.lnm_per_mbar
f = self.flow_sensor.read_value()
return p if self.use_pressure else f
def initModule(self):
self._last = []
if self.pressure:
self.pressure.addCallback('value', self.update_from_pressure)
if self.flow_sensor:
@ -354,29 +386,16 @@ class NeedleValveFlow(HasStates, Base, Drivable):
def update_from_flow(self, value):
if not self.use_pressure:
self.value = value
# self.cycle_machine()
self._value = value
def update_from_pressure(self, value):
if self.use_pressure:
self.value = value * self.lnm_per_mbar
self._value = value * self.lnm_per_mbar
# self.cycle_machine()
# def doPoll(self):
# """only the updates should trigger the machine"""
def read_value(self):
p = self.pressure.read_value() * self.lnm_per_mbar
f = self.flow_sensor.read_value()
# self.log.info('p %g f %g +- %.2g', p, f, self.flow_sensor.stddev)
self.read_motor_state()
return p if self.use_pressure else f
def write_tolerance(self, tolerance):
if hasattr(self.pressure, 'tolerance'):
self.pressure.tolerance = tolerance / self.lnm_per_mbar
if hasattr(self.flow_sensor, 'tolerance'):
self.flow_sensor.tolerance = tolerance
self._value = self.get_value()
return self._value
def read_use_pressure(self):
if self.pressure:
@ -386,239 +405,134 @@ class NeedleValveFlow(HasStates, Base, Drivable):
return False
def write_target(self, value):
self.start_machine(self.change_target, fast_poll=1)
self.log.info('change target')
self.target = value
self.start_machine(self.change_target)
def write_prop_open(self, value):
self._prop_open = Interpolation(value)
return self._prop_open
def write_prop_close(self, value):
self._prop_close = Interpolation(value)
return self._prop_close
def write_tolerance(self, value):
self._tolerance = Interpolation(value)
return self._tolerance
@status_code(BUSY)
def change_target(self, state):
state.in_tol_time = 0
state.last_minstep = {}
state.last_progress = state.now
state.ref_time = 0
state.ref_dif = 0
state.prev_dif = 0 # used?
state.last_close_time = 0
state.last_pulse_time = 0
state.raw_fact = 1
state.raw_step = 0
if abs(self.target - self.value) < self._tolerance():
def change_target(self, sm):
sm.last_progress = sm.now
sm.ref_time = 0
sm.ref_dif = 0
sm.last_pulse_time = 0
sm.no_progress_pulse = (0.1, -0.05)
self.log.info('target %s value %s', self.target, self._value)
if abs(self.target - self._value) < self._tolerance(self._value):
self.log.info('go to at_target')
return self.at_target
return self.raw_control
def start_direction(self, state):
if self.target > self.value:
self._dir = 1
state.minstep = self.min_open_step
else:
self._dir = -1
state.minstep = self.min_close_step
state.prev = []
def perform_pulse(self, state):
tol = self._tolerance()
dif = self.target - self.value
difdir = dif * self._dir
state.last_pulse_time = state.now
if difdir > tol:
step = state.minstep + (difdir - tol) * self.prop
elif difdir > 0:
step = state.minstep * difdir / tol
else:
return
self.log.info('MP %g dif=%g tol=%g', step * self._dir, dif, tol)
self.command(mp=clamp(-1, step * self._dir, 1))
@status_code(BUSY)
def raw_control(self, state):
tol = self._tolerance()
if state.init:
self.start_direction(state)
state.raw_step = self.raw_open_step if self._dir > 0 else -self.raw_close_step
state.raw_fact = 1
# if self.read_motor_state() == M.closed:
# # TODO: also check for flow near lower limit ? but only once after change_target
# self.log.info('start with fast opening')
# state.raw_step = 1
# self._dir = 1
difdir = (self.target - self.value) * self._dir
state.prev.append(difdir)
state.diflim = max(0, difdir - tol * 1)
state.success = 0
self.command(mp=state.raw_step)
self.log.info('first rawstep %g', state.raw_step)
state.last_pulse_time = state.now
state.raw_pulse_cnt = 0
state.cycle_cnt = 0
return Retry
difdir = (self.target - self.value) * self._dir
state.cycle_cnt += 1
state.prev.append(difdir)
del state.prev[:-5]
if state.prev[-1] > max(state.prev[:-1]):
# TODO: use the amount of overshoot to reduce the raw_step
state.cycle_cnt = 0
self.log.info('difference is increasing %s', ' '.join(f'{v:g}' for v in state.prev))
return Retry
if state.cycle_cnt >= 5:
state.cycle_cnt = 0
state.diflim = max(tol, min(state.prev) - tol * 0.5)
state.raw_pulse_cnt += 1
self.command(mp=state.raw_step * state.raw_fact)
self.log.info('rawstep %g', state.raw_step)
if state.raw_pulse_cnt % 5 == 0 and state.raw_pulse_cnt > 5:
state.raw_fact *= 1.25
return Retry
if difdir >= state.diflim:
state.success = max(0, state.success - 1)
return Retry
state.success += 1
if state.success <= 3:
return Retry
if state.raw_pulse_cnt < 3:
state.raw_fact = 1 - (3 - state.raw_pulse_cnt) ** 2 * 0.05
if state.raw_fact != 1:
if self._dir > 0:
self.raw_open_step *= state.raw_fact
self.log.info('raw_open_step %g f=%g', self.raw_open_step, state.raw_fact)
self.min_open_pulse = min(self.min_open_pulse, self.raw_open_step)
else:
self.raw_close_step *= state.raw_fact
self.log.info('raw_close_step %g f=%g', self.raw_close_step, state.raw_fact)
self.min_close_pulse = min(self.min_close_pulse, self.raw_close_step)
self.log.info('go to controlling')
return self.controlling
# @status_code(BUSY)
# def raw_control(self, state):
# tol = self._tolerance()
# if state.init:
# self.start_direction(state)
# if self._dir != self._rawdir:
# self._rawdir = self._dir
# state.first_step = self.first_open_step if self._dir > 0 else -self.first_close_step
# else:
# state.first_step = 0
# state.first_fact = 1
# # if self.read_motor_state() == M.closed:
# # # TODO: also check for flow near lower limit ? but only once after change_target
# # self.log.info('start with fast opening')
# # state.first_step = 1
# # self._dir = 1
# difdir = (self.target - self.value) * self._dir
# state.prev = [difdir]
# state.diflim = max(0, difdir - tol * 0.5)
# state.success = 0
# if state.first_step:
# self.command(mp=state.first_step)
# else:
# self.perform_pulse(state)
# self.log.info('firststep %g', state.first_step)
# state.last_pulse_time = state.now
# state.raw_pulse_cnt = 0
# return Retry
# difdir = (self.target - self.value) * self._dir
# if state.delta(5):
# state.diflim = max(0, min(state.prev) - tol * 0.1)
# state.prev = [difdir]
# state.raw_pulse_cnt += 1
# if state.first_step and state.raw_pulse_cnt % 10 == 0:
# self.command(mp=state.first_step * state.first_fact)
# self.log.info('repeat firststep %g', state.first_step * state.first_fact)
# state.first_fact *= 1.25
# else:
# self.perform_pulse(state)
# return Retry
# state.prev.append(difdir)
# if difdir >= state.diflim:
# state.success = max(0, state.success - 1)
# return Retry
# state.success += 1
# if state.success <= 5:
# return Retry
# if state.first_step:
# if state.raw_pulse_cnt < 3:
# state.first_fact = 1 - (3 - state.raw_pulse_cnt) ** 2 * 0.04
# if state.first_fact != 1:
# if self._dir > 0:
# self.first_open_step *= state.first_fact
# self.log.info('first_open_step %g f=%g', self.first_open_step, state.first_fact)
# else:
# self.first_close_step *= state.first_fact
# self.log.info('first_close_step %g f=%g', self.first_close_step, state.first_fact)
# return self.controlling
def filtered(self, n=60, m=5, nsigma=2):
"""return mean and tolerance, augmented by noise"""
# TODO: better idea: use median over last minute and last value and treat them both
n = len(self._last[-n:])
mean = np.median(self._last[-m:])
tol = self._tolerance(mean)
span = 0
if len(self._last) >= n + m:
# get span over the last n points
span = max(self._last[-n:]) - min(self._last[-n:])
slope = mean - np.median(self._last[-n-m:-n])
# in case there is a slope, subtract it
tol = math.sqrt(tol ** 2 + max(0, span-abs(slope)) ** 2)
self.log.info('filt %d %d %d %g %g', len(self._last), n, m, self._value, span)
m = min(m, n)
narr = np.array(self._last[-n:])
mdif = np.median(np.abs(narr[1:-1] - 0.5 * (narr[:-2] + narr[2:])))
return mean, tol
@status_code(BUSY)
def controlling(self, state):
dif = self.target - self.value
if state.init:
self.start_direction(state)
state.ref_dif = abs(dif)
state.ref_time = state.now
state.in_tol_time = 0
def controlling(self, sm):
tol = self._tolerance(self.target)
dif = np.array([self.target - np.median(self._last[-m:]) for m in (1,5,60)])
if sm.init:
self.log.info('restart controlling')
direction = math.copysign(1, dif[1])
if direction != self._dir:
self.log.info('new dir %g dif=%g', direction, dif[1])
self._dir = direction
self._last_dirchange = sm.now
sm.ref_dif = abs(dif[1])
sm.ref_time = sm.now
difdir = dif * self._dir # negative when overshoot happend
# difdif = dif - state.prev_dif
# state.prev_dif = dif
expected_dif = state.ref_dif * math.exp((state.ref_time - state.now) / self.deriv)
# difdif = dif - self._prev_dif
# self._prev_dif = dif
expected_dif = sm.ref_dif * math.exp((sm.ref_time - sm.now) / self.deriv)
tol = self._tolerance()
if difdir < tol:
# prev_minstep = state.last_minstep.pop(self._dir, None)
# attr = 'min_open_step' if self._dir > 0 else 'min_close_step'
# if prev_minstep is not None:
# # increase minstep
# minstep = getattr(self, attr)
# setattr(self, attr, minstep * 1.1)
# self.log.info('increase %s to %g', attr, minstep)
if difdir > -tol: # within tolerance
delta = state.delta()
state.in_tol_time += delta
if state.in_tol_time > self.settle:
# state.last_minstep.pop(self._dir, None)
self.log.info('at target %g %g', dif, tol)
return self.at_target
if difdir < 0:
return Retry
# self.log.info('minstep=0 dif=%g', dif)
else: # overshoot
self.log.info('overshoot %g', dif)
return self.raw_control
# # overshoot
# prev_minstep = state.last_minstep.pop(self._dir, None)
# if prev_minstep is None:
# minstep = getattr(self, attr) * 0.9
# self.log.info('decrease %s to %g', attr, minstep)
# setattr(self, attr, minstep)
# self.start_step(state, self.target)
# still approaching
if difdir <= expected_dif:
if difdir < expected_dif / 1.25 - tol:
state.ref_time = state.now
state.ref_dif = (difdir + tol) * 1.25
# self.log.info('new ref %g', state.ref_dif)
state.last_progress = state.now
return Retry # progressing: no pulse needed
if state.now < state.last_pulse_time + 2.5:
if np.all(difdir < tol):
if np.all(difdir < -tol):
self.log.info('overshoot %r', dif)
return self.controlling
# within tolerance
self.log.info('at target %r tol %g', dif, tol)
return self.at_target
if np.all(difdir > expected_dif):
# not enough progress
if sm.now > sm.last_progress + self.deriv:
if sm.no_progress_pulse:
pulse = abs(sm.no_progress_pulse[self._dir < 0]) * self._dir
self.log.info('not enough progress %g', pulse)
self.pulse(pulse)
sm.last_progress = sm.now
if sm.now < sm.last_pulse_time + 2.5:
return Retry
# TODO: check motor state for closed / opened ?
difd = min(difdir[:2])
sm.last_pulse_time = sm.now
if self._dir > 0:
minstep = self.min_open_pulse
prop = self._prop_open(self._value)
else:
minstep = self.min_close_pulse
prop = self._prop_close(self._value)
if difd > 0:
if prop * tol > minstep:
# step outside tol is already minstep
step = difd * prop
else:
if difd > tol:
step = (minstep + (difd - tol) * prop)
else:
step = minstep * difd / tol
step *= self._dir
self.log.info('MP %g dif=%g tol=%g', step, difd * self._dir, tol)
self.command(mp=step)
self._speed_sum += step
return Retry
# TODO: check motor state for closed / opened ?
self.perform_pulse(state)
return Retry
def _tolerance(self):
return min(self.tolerance * min(1, self.value / 2), self.tolerance2)
# still approaching
difmax = max(difdir)
if difmax < expected_dif:
sm.ref_time = sm.now
sm.ref_dif = difmax
# self.log.info('new ref %g', sm.ref_dif)
sm.last_progress = sm.now
return Retry # progressing: no pulse needed
@status_code(IDLE)
def at_target(self, state):
dif = self.target - self.value
if abs(dif) > self._tolerance():
state.in_tol_time = 0
self.log.info('unstable %g', dif)
def at_target(self, sm):
tol = self._tolerance(self.target)
dif = np.array([self.target - np.median(self._last[-m:]) for m in (1,5,60)])
if np.all(dif > tol) or np.all(dif < -tol):
return self.unstable
return Retry
@status_code(IDLE, 'unstable')
def unstable(self, state):
difdir = (self.target - self.value) * self._dir
if difdir < 0 or self._dir == 0:
return self.raw_control
return self.controlling(state)
def unstable(self, sm):
sm.no_progress_pulse = None
return self.controlling(sm)
def read_motor_state(self):
return self.command(fm=int)
@ -628,18 +542,19 @@ class NeedleValveFlow(HasStates, Base, Drivable):
"""close valve fully"""
self.command(mp=-60)
self.motor_state = self.command(fm=int)
self.start_machine(self.closing)
self.start_machine(self.closing, fast_poll=0.1)
@status_code(BUSY)
def closing(self, state):
if state.init:
state.start_time = state.now
def closing(self, sm):
if sm.init:
sm.start_time = sm.now
self._speed_sum -= sm.delta()
self.read_motor_state()
if self.motor_state == M.closing:
return Retry
if self.motor_state == M.closed:
return self.final_status(IDLE, 'closed')
if state.now < state.start_time + 1:
if sm.now < sm.start_time + 1:
return Retry
return self.final_status(IDLE, 'fixed')
@ -648,28 +563,162 @@ class NeedleValveFlow(HasStates, Base, Drivable):
"""open valve fully"""
self.command(mp=60)
self.read_motor_state()
self.start_machine(self.opening)
self.start_machine(self.opening, threshold=None)
@status_code(BUSY)
def opening(self, state):
if state.init:
state.start_time = state.now
def opening(self, sm):
if sm.init:
sm.start_time = sm.now
self._speed_sum += sm.dleta()
self.read_motor_state()
if self.motor_state == M.opening:
return Retry
if self.motor_state == M.opened:
return self.final_status(IDLE, 'opened')
if state.now < state.start_time + 1:
if sm.now < sm.start_time + 1:
return Retry
return self.final_status(IDLE, 'fixed')
@Command
def lim_pulse(self):
"""try to open until pressure increases"""
p = self.command(f=float)
self.start_machine(self.lim_open, threshold=0.5,
prev=[p], ref=p, fast_poll=0.1, cnt=0)
@status_code(BUSY)
def lim_open(self, sm):
self.read_motor_state()
if self.motor_state == M.opening:
return Retry
if self.motor_state == M.opened:
return self.final_status(IDLE, 'opened')
press, measured = self.command(f=float, mmp=float)
sm.prev.append(press)
if press > sm.ref + 0.2:
sm.cnt += 1
if sm.cnt > 5 or press > sm.ref + 0.5:
self.log.info('flow increased %g', press)
return self.final_status(IDLE, 'flow increased')
self.log.info('wait count %g', press)
return Retry
sm.cnt = 0
last5 = sm.prev[-5:]
median = sorted(last5)[len(last5) // 2]
if press > median:
# avoid to pulse again after an even small increase
self.log.info('wait %g', press)
return Retry
sm.ref = min(sm.prev[0], median)
if measured:
self._speed_sum += measured
if measured < 0.1:
sm.threshold = round(sm.threshold * 1.1, 2)
elif measured > 0.3:
sm.threshold = round(sm.threshold * 0.9, 2)
self.log.info('measured %g new threshold %g press %g', measured, sm.threshold, press)
else:
self._speed_sum += 1
self.log.info('full pulse')
sm.cnt = 0
self.command(mft=sm.ref + sm.threshold, mp=1)
return Retry
@Command(FloatRange())
def pulse(self, value):
"""perform a motor pulse"""
self.log.info('pulse %g', value)
self.command(mp=value)
self._speed_sum += value
if value > 0:
self.motor_state = M.opening
return self.opening
self.motor_state = M.closing
return self.closing
@Command()
def autopar(self):
"""adjust automatically needle valve parameters"""
self.close()
self.start_machine(self.auto_wait, open_pulse=0.1, close_pulse=0.05,
minflow=self.read_value(), last=None)
return self.auto_wait
def is_stable(self, sm, n, tol=0.01):
"""wait for a stable flow
n: size of buffer
tol: a tolerance
"""
if sm.last is None:
sm.last = []
sm.cnt = 0
v = self.read_value()
sm.last.append(v)
del sm.last[:-n]
dif = v - sm.last[0]
if dif < -tol:
sm.cnt -= 1
elif dif > tol:
sm.cnt += 1
else:
sm.cnt -= clamp(-1, sm.cnt, 1)
if len(sm.last) < n:
return False
return abs(sm.cnt) < n // 2
def is_unstable(self, sm, n, tol=0.01):
"""wait for a stable flow
return 0, -1 or 1
"""
if sm.last is None:
sm.last = []
sm.cnt = 0
v = self.read_value()
prevmax = max(sm.last)
prevmin = min(sm.last)
sm.last.append(v)
del sm.last[:-n]
self.log.info('unstable %g >? %g <? %g', v, prevmax, prevmin)
if v > prevmax + tol:
return 1
if v < prevmin - tol:
return -1
return 0
@status_code(BUSY)
def auto_wait(self, sm):
stable = self.is_stable(sm, 5, 0.01)
if self._value < sm.minflow:
sm.minflow = self._value
if self.read_motor_state() == M.closing or not stable:
return Retry
return self.auto_open
@status_code(BUSY)
def auto_open(self, sm):
stable = self.is_unstable(sm, 5, 0.1)
if stable > 0:
sm.start_time = sm.now
sm.flow_before = sm.last[-1]
self.pulse(sm.open_pulse)
return self.auto_close
if sm.delta(sm.open_pulse * 2) is not None:
self.pulse(sm.open_pulse)
return Retry
@status_code(BUSY)
def auto_open_stable(self, sm):
if self.is_stable(sm, 5, 0.01):
return Retry
return self.auto_close
@status_code(BUSY)
def auto_close(self, sm):
if not self.is_stable(sm, 10, 0.01):
return Retry
self.log.info('before %g pulse %g, flowstep %g', sm.flow_before, sm.open_pulse, sm.last[-1] - sm.flow_before)
self.close()
return self.final_status(IDLE, '')