Files
frappy/frappy_psi/dilution.py
Markus Zolliker 472ae3f04d [WIP] dil5 improvements
Change-Id: I2b439bf5898601e10448511479bc67afa3edb4d3
2025-06-05 10:16:47 +02:00

408 lines
15 KiB
Python

# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Andrea Plank <andrea.plank@psi.ch>
#
# *****************************************************************************
import time
from frappy.core import Readable, Drivable, Parameter, Attached, FloatRange, \
Command, IDLE, BUSY, WARN, ERROR, Property
from frappy.datatypes import EnumType, IntRange, BoolType, StructOf, StringType
from frappy.states import Retry, Finish, status_code, HasStates
from frappy.lib.enum import Enum
from frappy.errors import ImpossibleError, HardwareError
from frappy.addrparam import AddrParam, AddrMixin
from frappy.lib import formatStatusBits
from frappy.persistent import PersistentMixin, PersistentParam
from frappy_psi.logo import LogoMixin, DigitalActuator
T = Enum( # target states
off = 0,
sorbpumped = 2,
condense = 5,
remove = 7,
remove_and_sorbpump = 9,
remove_and_condense = 10,
manual = 11,
test = 12,
)
V = Enum(T, # value status inherits from target status
sorbpumping=1,
condensing=4,
circulating=6,
removing=8,
)
class Dilution(HasStates, Drivable):
condenseline_pressure = Attached()
condense_valve = Attached()
dump_valve = Attached()
circulate_pump = Attached()
compressor = Attached(mandatory=False)
turbopump = Attached(mandatory=False)
condenseline_valve = Attached()
circuitshort_valve = Attached()
still_pressure = Attached()
value = Parameter('current state', EnumType(T), default=0)
target = Parameter('target state', EnumType(T), default=0)
sorbpumped = Parameter('sorb pump done', BoolType(), default=False)
#ls372 = Attached()
V5 = Attached() #Name noch ändern!!!
p1 = Attached() #Name noch ändern!!!
condensing_p_low = Parameter('Lower limit for condenseline pressure', FloatRange(unit='mbar'))
condensing_p_high = Parameter('Higher limit for condenseline pressure', FloatRange(unit='mbar'))
dump_target = Parameter('low dump pressure limit indicating end of condensation phase',
FloatRange(unit='mbar'), default=20)
end_condense_pressure = Parameter('low condense pressure indicating end of condensation phase',
FloatRange(unit='mbar'), default=500)
turbo_condense_pressure = Parameter('low condense pressure before turbo start',
FloatRange(unit='mbar'), default=900)
turbo_still_pressure = Parameter('low still pressure before turbo start',
FloatRange(unit='mbar'), default=10)
turbo_off_delay = Parameter('wait time after switching turbo off',
FloatRange(unit='s'), default=300)
turbo_off_speed = Parameter('speed to wait for after switching turbo off',
FloatRange(unit='s'), default=60)
end_remove_still_pressure = Parameter('pressure reached before end of remove',
FloatRange(unit='mbar'), default=1e-4)
st = StringType()
valve_set = StructOf(close=st, open=st, check_open=st, check_closed=st)
condense_valves = Parameter('valve to act when condensing', valve_set)
valves_after_remove = Parameter('valve to act after remove', valve_set)
check_after_remove = Parameter('check for manual valves after remove', valve_set)
_start_time = 0
init = True
_warn_manual_work = None
def write_target(self, target):
"""
if (target == Targetstates.SORBPUMP):
if self.value == target:
return self.target
self.start_machine(self.sorbpump)
self.value = Targetstates.SORBPUMP
return self.value
"""
if self.value == self.target:
return target # not sure if this is correct. may be a step wants to be repeated?
self.start_machine(getattr(self, target.name, None))
return target
"""
@status_code(BUSY, 'sorbpump state')
def sorbpump(self, state):
#Heizt Tsorb auf und wartet ab.
if self.init:
self.ls372.write_target(40) #Setze Tsorb auf 40K
self.start_time = self.now
self.init = false
return Retry
if self.now - self.start_time < 2400: # 40 Minuten warten
return Retry
self.ls372.write_target(0)
if self.ls372.read_value() > 10: # Warten bis Tsorb unter 10K
return Retry
return self.condense
"""
@status_code(BUSY, 'start test')
def test(self, state):
"""Nur zum testen, ob UI funktioniert"""
self.init = False
if state.init:
state._start = state.now
return self.wait_test
@status_code(BUSY)
def wait_test(self, state):
if state.now < state.start + 20:
return Retry
return self.final_status(IDLE, 'end test')
@status_code(BUSY)
def condense(self, state):
"""Führt das Kondensationsverfahren durch."""
if state.init:
self.value = V.condensing
self.handle_valves(**self.condense_valves)
return Retry
if self.wait_valves():
return Retry
self.check_valve_result()
return Retry
@status_code(BUSY)
def condensing(self, state):
if self.condenseline_pressure.read_value() < self.condensing_p_low:
self.condense_valve.write_target(1)
elif self.condenseline_pressure.read_value() > self.condensing_p_high:
self.condense_valve.write_target(0)
if self.p1.read_value() > self.dump_target:
return Retry
self.condense_valve.write_target(1)
if self.turbopump is not None:
return self.condense_wait_before_turbo_start
return self.wait_for_condense_line_pressure
@status_code(BUSY)
def wait_for_condense_line_pressure(self, state):
if self.condenseline_pressure.read_value() > self.end_condense_pressure:
return Retry
self.condense_valve.write_target(0)
return self.circulate
@status_code(BUSY, 'condense (wait before starting turbo)')
def condense_wait_before_turbo_start(self, state):
if (self.condenseline_pressure.read_value() > self.turbo_condense_pressure
and self.still_pressure.read_value() > self.turbo_still_pressure):
return Retry
self.turbopump.write_target(1)
return self.wait_for_condense_line_pressure
@status_code(BUSY)
def circulate(self, state):
"""Zirkuliert die Mischung."""
if state.init:
self.handle_valves(**self.condense_valves)
if self.wait_valves():
return Retry
self.check_valve_result()
self.value = V.circulating
return Finish
@status_code(BUSY, 'remove (wait for turbo shut down)')
def remove(self, state):
"""Entfernt die Mischung."""
if state.init:
self.condenseline_valve.write_target(0)
self.dump_valve.write_target(1)
if self.turbopump is not None:
self._start_time = state.now
self.turbopump.write_target(0)
return Retry
if self.turbopump is not None:
self.turbopump.write_target(0)
if (state.now - self._start_time < self.turbo_off_delay
or self.turbopump.read_speed() > self.turbo_off_speed):
return Retry
self.circuitshort_valve.write_target(1)
if self.turbopump is not None:
return self.remove_wait_for_still_pressure
return self.remove_endsequence
@status_code(BUSY, 'remove (wait for still pressure low)')
def remove_wait_for_still_pressure(self, state):
if self.still_pressure.read_value() > self.turbo_still_pressure:
return Retry
self.turbopump.write_target(1)
return self.remove_endsequence
@status_code(BUSY)
def remove_endsequence(self, state):
if self.still_pressure.read_value() > self.end_remove_still_pressure:
return Retry
self.circuitshort_valve.write_target(0)
self.dump_valve.write_target(0)
if self.compressor is not None:
self.compressor.write_target(0)
self.circulate_pump.write_target(0)
return self.close_valves_after_remove
@status_code(BUSY)
def close_valves_after_remove(self, state):
if state.init:
self.handle_valves(**self.valves_after_remove)
if self.wait_valves():
return Retry
self.check_valve_result()
self._warn_manual_work = True
return self.final_status(WARN, 'please check manual valves')
def read_status(self):
status = super().read_status()
if status[0] < ERROR and self._warn_manual_work:
try:
self.handle_valves(**self.check_after_remove)
self._warn_manual_work = False
except ImpossibleError:
return WARN, f'please close manual valves {",".join(self._valves_failed[False])}'
return status
def handle_valves(self, check_closed=(), check_open=(), close=(), open=()):
"""check ot set given valves
raises ImpossibleError, when checks fails
"""
self._valves_to_wait_for = {}
self._valves_failed = {True: [], False: []}
for flag, valves in enumerate([check_closed, check_open]):
for vname in valves.split():
if self.secNode.modules[vname].read_value() != flag:
self._valves_failed[flag].append(vname)
for flag, valves in enumerate([close, open]):
for vname in valves.split():
valve = self.secNode.modules[vname]
valve.write_target(flag)
if valve.isBusy():
self._valves_to_wait_for[vname] = (valve, flag)
elif valve.read_value() != flag:
self._valves_failed[flag].append(vname)
def wait_valves(self):
busy = False
for vname, (valve, flag) in dict(self._valves_to_wait_for.items()):
statuscode = valve.read_status()[0]
if statuscode == BUSY:
busy = True
continue
if valve.read_value() == flag and statuscode == IDLE:
self._valves_to_wait_for.pop(vname)
else:
self._valves_failed[flag].append(vname)
return busy
def check_valve_result(self):
result = []
for flag, valves in self._valves_failed.items():
if valves:
result.append(f"{','.join(valves)} not {'open' if flag else 'closed'}")
if result:
raise ImpossibleError(f"failed: {', '.join(result)}")
class DIL5(Dilution):
condense_valves = {
'close': 'V2 V4 V9',
'check_closed': 'MV10 MV13 MV8 MVB MV2',
'check_open': 'MV1 MV3a MV3b GV1 MV9 MV11 MV12 MV14',
'open': 'V1 V5 compressor pump',
}
valves_after_remove = {
'close': 'V1 V2 V4 V5 V9',
'check_closed': 'MV10 MV13 MV8 MVB MV2',
'open': '',
'check_open': '',
}
check_after_remove = {
'close': '',
'check_closed': 'MV1 MV9 MV10 MV11 MV12',
'open': '',
'check_open': '',
}
class Interlock(LogoMixin, AddrMixin, Readable):
value = AddrParam('interlock state (bitmap)',
IntRange(0, 31), addr='V414', readonly=False)
p5lim = AddrParam('safety limit on p5 to protect forepump',
FloatRange(), value=1300, addr='VW16 VW18', readonly=False)
p2lim = AddrParam('safety limit on p2 to protect compressor',
FloatRange(), value=4000, addr='VW8 VW10', readonly=False)
p1lim = AddrParam('safety limit to protect dump',
FloatRange(), value=1300, addr='VW12 VW14', readonly=False)
p2max = AddrParam('limit pn p2 for mechanism to put mix to dump',
FloatRange(), value=3000, addr='VW20 VW22', readonly=False)
conditions = { # starting with bit 1
'off (p5>p5lim)': {'forepump': False},
'off (p2>p2lim)': {'compressor': False},
'off (p1>p2lim)': {'forepump': False, 'compressor': False},
'open (p2>p2max)': {'V4': True}}
reset_param = Property('addr for reset', StringType(), default='V418.1')
_mismatch = None
_prefix = ''
def doPoll(self):
self.read_status() # this includes read_value
def initialReads(self):
super().initialReads()
self.reset()
@Command
def reset(self):
"""reset the interlock"""
self._prefix = ''
self.set_vm_value(self.reset_param, 1)
for actions in self.conditions.values():
for mname in actions:
self.secNode.modules[mname].reset_fault()
if self.read_value() != 0:
raise HardwareError('can not clear status byte')
self.set_vm_value(self.reset_param, 0)
self.read_status() # update status (this may trigger ERROR again)
def read_status(self):
if self._mismatch is None: # init
self._mismatch = set()
bits = self.read_value()
if bits:
keys = formatStatusBits(bits, self.conditions, 1)
statustext = []
for key in keys:
actions = self.conditions[key]
statustext.append(f"{' and '.join(actions)} {key}")
for module, value in actions.items():
modobj = self.secNode.modules[module]
if modobj.target != value:
self._prefix = 'switched '
modobj.set_fault(value, f'switched {key}')
return ERROR, f"{self._prefix}{', '.join(statustext)}"
if self._mismatch:
return ERROR, f"mismatch on values for {', '.join(self._mismatch)}"
return IDLE, ''
def addressed_read(self, pobj):
values = [self.get_vm_value(a) for a in pobj.addr.split()]
if any(v != values[0] for v in values):
self._mismatch.add(pobj.name)
self.read_status()
else:
self._mismatch.discard(pobj.name)
return values[0]
def addressed_write(self, pobj, value):
for addr in pobj.addr.split():
self.set_vm_value(addr, value)
self.read_status()