mlz/demo: move old examples to Attached

change very early version of module attachments in GarfieldMagnet and
MagnetigField to use Attached

Change-Id: I616ad17bc72cd93d86e1b3e3609543cfe90edcd8
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32250
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
This commit is contained in:
Alexander Zaft 2023-10-04 12:43:07 +02:00 committed by Markus Zolliker
parent ec664d9268
commit ae2a731161
4 changed files with 84 additions and 104 deletions

View File

@ -74,10 +74,10 @@ Mod('currentsource',
Mod('mf',
'frappy_mlz.amagnet.GarfieldMagnet',
'magnetic field module, handling polarity switching and stuff',
subdev_currentsource = 'currentsource',
subdev_enable = 'enable',
subdev_polswitch = 'polarity',
subdev_symmetry = 'symmetry',
currentsource = 'currentsource',
enable = 'enable',
polswitch = 'polarity',
symmetry = 'symmetry',
target = Param(unit='T'),
value = Param(unit='T'),
userlimits = (-0.35, 0.35),

View File

@ -41,6 +41,6 @@ Mod('label',
'frappy_demo.modules.Label',
'some label indicating the state of the magnet `mf`.',
system = 'Cryomagnet MX15',
subdev_mf = 'mf',
subdev_ts = 'ts',
mf = 'mf',
ts = 'ts',
)

View File

@ -16,6 +16,7 @@
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
"""testing devices"""
@ -28,9 +29,8 @@ import time
from frappy.datatypes import ArrayOf, BoolType, EnumType, \
FloatRange, IntRange, StringType, StructOf, TupleOf
from frappy.lib.enum import Enum
from frappy.modules import Drivable
from frappy.modules import Drivable, Readable, Attached
from frappy.modules import Parameter as SECoP_Parameter
from frappy.modules import Readable
from frappy.properties import Property
@ -119,9 +119,7 @@ class MagneticField(Drivable):
default=1, datatype=EnumType(persistent=1, hold=0),
readonly=False,
)
heatswitch = Parameter('name of heat switch device',
datatype=StringType(), export=False,
)
heatswitch = Attached(Switch, description='name of heat switch device')
Status = Enum(Drivable.Status, PERSIST=PERSIST, PREPARE=301, RAMPING=302, FINISH=303)
@ -130,7 +128,6 @@ class MagneticField(Drivable):
def initModule(self):
super().initModule()
self._state = Enum('state', idle=1, switch_on=2, switch_off=3, ramp=4).idle
self._heatswitch = self.DISPATCHER.get_module(self.heatswitch)
_thread = threading.Thread(target=self._thread)
_thread.daemon = True
_thread.start()
@ -165,10 +162,10 @@ class MagneticField(Drivable):
if self.target != self.value:
self.log.debug('got new target -> switching heater on')
self._state = self._state.enum.switch_on
self._heatswitch.write_target('on')
self.heatswitch.write_target('on')
if self._state == self._state.enum.switch_on:
# wait until switch is on
if self._heatswitch.read_value() == 'on':
if self.heatswitch.read_value() == 'on':
self.log.debug('heatswitch is on -> ramp to %.3f',
self.target)
self._state = self._state.enum.ramp
@ -178,7 +175,7 @@ class MagneticField(Drivable):
if self.mode:
self.log.debug('at field -> switching heater off')
self._state = self._state.enum.switch_off
self._heatswitch.write_target('off')
self.heatswitch.write_target('off')
else:
self.log.debug('at field -> hold')
self._state = self._state.enum.idle
@ -189,7 +186,7 @@ class MagneticField(Drivable):
self.value += step
if self._state == self._state.enum.switch_off:
# wait until switch is off
if self._heatswitch.read_value() == 'off':
if self.heatswitch.read_value() == 'off':
self.log.debug('heatswitch is off at %.3f', self.value)
self._state = self._state.enum.idle
self.read_status() # update async
@ -269,12 +266,8 @@ class Label(Readable):
system = Parameter("Name of the magnet system",
datatype=StringType(), export=False,
)
subdev_mf = Parameter("name of subdevice for magnet status",
datatype=StringType(), export=False,
)
subdev_ts = Parameter("name of subdevice for sample temp",
datatype=StringType(), export=False,
)
mf = Attached(MagneticField, description="subdevice for magnet status")
ts = Attached(SampleTemp, description="subdevice for sample temp")
value = Parameter("final value of label string", default='',
datatype=StringType(),
)
@ -282,18 +275,16 @@ class Label(Readable):
def read_value(self):
strings = [self.system]
dev_ts = self.DISPATCHER.get_module(self.subdev_ts)
if dev_ts:
strings.append(f"at {dev_ts.read_value():.3f} {dev_ts.parameters['value'].datatype.unit}")
if self.ts:
strings.append(f"at {self.ts.read_value():.3f} {self.ts.parameters['value'].datatype.unit}")
else:
strings.append('No connection to sample temp!')
dev_mf = self.DISPATCHER.get_module(self.subdev_mf)
if dev_mf:
mf_stat = dev_mf.read_status()
mf_mode = dev_mf.mode
mf_val = dev_mf.value
mf_unit = dev_mf.parameters['value'].datatype.unit
if self.mf:
mf_stat = self.mf.read_status()
mf_mode = self.mf.mode
mf_val = self.mf.value
mf_unit = self.mf.parameters['value'].datatype.unit
if mf_stat[0] == self.Status.IDLE:
state = 'Persistent' if mf_mode else 'Non-persistent'
else:

View File

@ -17,6 +17,7 @@
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
@ -28,10 +29,10 @@
import math
from frappy.datatypes import ArrayOf, FloatRange, StringType, StructOf, TupleOf
from frappy.datatypes import ArrayOf, FloatRange, StructOf, TupleOf
from frappy.errors import ConfigError, DisabledError
from frappy.lib.sequence import SequencerMixin, Step
from frappy.modules import Drivable, Parameter
from frappy.modules import Drivable, Parameter, Attached
class GarfieldMagnet(SequencerMixin, Drivable):
@ -47,19 +48,12 @@ class GarfieldMagnet(SequencerMixin, Drivable):
the symmetry setting selects which.
"""
# attached submodules
currentsource = Attached(description='(bipolar) Powersupply')
enable = Attached(description='Switch to set for on/off')
polswitch = Attached(description='Switch to set for polarity')
symmetry = Attached(description='Switch to read for symmetry')
# parameters
subdev_currentsource = Parameter('(bipolar) Powersupply',
datatype=StringType(),
readonly=True, export=False)
subdev_enable = Parameter('Switch to set for on/off',
datatype=StringType(), readonly=True,
export=False)
subdev_polswitch = Parameter('Switch to set for polarity',
datatype=StringType(), readonly=True,
export=False)
subdev_symmetry = Parameter('Switch to read for symmetry',
datatype=StringType(), readonly=True,
export=False)
userlimits = Parameter('User defined limits of device value',
datatype=TupleOf(FloatRange(unit='$'),
FloatRange(unit='$')),
@ -111,7 +105,7 @@ class GarfieldMagnet(SequencerMixin, Drivable):
Note: This may be overridden in derived classes.
"""
# binary search/bisection
maxcurr = self._currentsource.abslimits[1]
maxcurr = self.currentsource.abslimits[1]
mincurr = -maxcurr
maxfield = self._current2field(maxcurr)
minfield = -maxfield
@ -143,26 +137,21 @@ class GarfieldMagnet(SequencerMixin, Drivable):
def initModule(self):
super().initModule()
self._enable = self.DISPATCHER.get_module(self.subdev_enable)
self._symmetry = self.DISPATCHER.get_module(self.subdev_symmetry)
self._polswitch = self.DISPATCHER.get_module(self.subdev_polswitch)
self._currentsource = self.DISPATCHER.get_module(
self.subdev_currentsource)
self.init_sequencer(fault_on_error=False, fault_on_stop=False)
self._symmetry.read_value()
self.symmetry.read_value()
def read_calibration(self):
try:
try:
return self.calibrationtable[self._symmetry.value]
return self.calibrationtable[self.symmetry.value]
except KeyError:
return self.calibrationtable[self._symmetry.value.name]
return self.calibrationtable[self.symmetry.value.name]
except KeyError:
minslope = min(entry[0]
for entry in self.calibrationtable.values())
self.log.error(
'unconfigured calibration for symmetry %r',
self._symmetry.value)
self.symmetry.value)
return [minslope, 0, 0, 0, 0]
def _checkLimits(self, limits):
@ -182,22 +171,22 @@ class GarfieldMagnet(SequencerMixin, Drivable):
return limits
def read_abslimits(self):
maxfield = self._current2field(self._currentsource.abslimits[1])
maxfield = self._current2field(self.currentsource.abslimits[1])
# limit to configured value (if any)
maxfield = min(maxfield, max(self.accessibles['abslimits'].default))
return -maxfield, maxfield
def read_ramp(self):
# This is an approximation!
return self.calibration[0] * abs(self._currentsource.ramp)
return self.calibration[0] * abs(self.currentsource.ramp)
def write_ramp(self, newramp):
# This is an approximation!
self._currentsource.ramp = float(newramp) / self.calibration[0]
self.currentsource.ramp = float(newramp) / self.calibration[0]
def _get_field_polarity(self):
sign = int(self._polswitch.read_value())
if self._enable.read_value():
sign = int(self.polswitch.read_value())
if self.enable.read_value():
return sign
return 0
@ -210,35 +199,35 @@ class GarfieldMagnet(SequencerMixin, Drivable):
return
if current_pol == 0:
# safe to switch
self._polswitch.write_target(
self.polswitch.write_target(
'+1' if polarity > 0 else str(polarity))
return
if self._currentsource.value < 0.1:
self._polswitch.write_target('0')
if self.currentsource.value < 0.1:
self.polswitch.write_target('0')
return
# unsafe to switch, go to safe state first
self._currentsource.write_target(0)
self.currentsource.write_target(0)
def read_value(self):
return self._current2field(
self._currentsource.read_value() *
self.currentsource.read_value() *
self._get_field_polarity())
def readHwStatus(self):
# called from SequencerMixin.read_status if no sequence is running
if self._enable.value == 'Off':
if self.enable.value == 'Off':
return self.Status.WARN, 'Disabled'
if self._enable.read_status()[0] != self.Status.IDLE:
return self._enable.status
if self._polswitch.value in ['0', 0]:
return self.Status.IDLE, 'Shorted, ' + self._currentsource.status[1]
if self._symmetry.value in ['short', 0]:
return self._currentsource.status[
0], 'Shorted, ' + self._currentsource.status[1]
return self._currentsource.read_status()
if self.enable.read_status()[0] != self.Status.IDLE:
return self.enable.status
if self.polswitch.value in ['0', 0]:
return self.Status.IDLE, 'Shorted, ' + self.currentsource.status[1]
if self.symmetry.value in ['short', 0]:
return self.currentsource.status[
0], 'Shorted, ' + self.currentsource.status[1]
return self.currentsource.read_status()
def write_target(self, target):
if target != 0 and self._symmetry.read_value() in ['short', 0]:
if target != 0 and self.symmetry.read_value() in ['short', 0]:
raise DisabledError(
'Symmetry is shorted, please select another symmetry first!')
@ -251,7 +240,7 @@ class GarfieldMagnet(SequencerMixin, Drivable):
seq.append(Step('preparing', 0, self._prepare_ramp))
seq.append(Step('recover', 0, self._recover))
if current_polarity != wanted_polarity:
if self._currentsource.read_value() > 0.1:
if self.currentsource.read_value() > 0.1:
# switching only allowed if current is low enough -> ramp down
# first
seq.append(
@ -281,54 +270,54 @@ class GarfieldMagnet(SequencerMixin, Drivable):
# steps for the sequencing
def _prepare_ramp(self, store, *args):
store.old_window = self._currentsource.window
self._currentsource.window = 1
store.old_window = self.currentsource.window
self.currentsource.window = 1
def _finish_ramp(self, store, *args):
self._currentsource.window = max(store.old_window, 10)
self.currentsource.window = max(store.old_window, 10)
def _recover(self, store):
# check for interlock
if self._currentsource.read_status()[0] != self.Status.ERROR:
if self.currentsource.read_status()[0] != self.Status.ERROR:
return
# recover from interlock
ramp = self._currentsource.ramp
self._polswitch.write_target('0') # short is safe...
self._polswitch._hw_wait()
self._enable.write_target('On') # else setting ramp won't work
self._enable._hw_wait()
self._currentsource.ramp = 60000
self._currentsource.target = 0
self._currentsource.ramp = ramp
ramp = self.currentsource.ramp
self.polswitch.write_target('0') # short is safe...
self.polswitch._hw_wait()
self.enable.write_target('On') # else setting ramp won't work
self.enable._hw_wait()
self.currentsource.ramp = 60000
self.currentsource.target = 0
self.currentsource.ramp = ramp
# safe state.... if anything of the above fails, the tamperatures may
# be too hot!
def _ramp_current(self, store, target):
if abs(self._currentsource.value - target) <= 0.05:
if abs(self.currentsource.value - target) <= 0.05:
# done with this step if no longer BUSY
return self._currentsource.read_status()[0] == 'BUSY'
if self._currentsource.status[0] != 'BUSY':
if self._enable.status[0] == 'ERROR':
self._enable.reset()
self._enable.read_status()
self._enable.write_target('On')
self._enable._hw_wait()
self._currentsource.write_target(target)
return self.currentsource.read_status()[0] == 'BUSY'
if self.currentsource.status[0] != 'BUSY':
if self.enable.status[0] == 'ERROR':
self.enable.reset()
self.enable.read_status()
self.enable.write_target('On')
self.enable._hw_wait()
self.currentsource.write_target(target)
return True # repeat
def _ramp_current_cleanup(self, store, step_was_busy, target):
# don't cleanup if step finished
if step_was_busy:
self._currentsource.write_target(self._currentsource.read_value())
self._currentsource.window = max(store.old_window, 10)
self.currentsource.write_target(self.currentsource.read_value())
self.currentsource.window = max(store.old_window, 10)
def _set_polarity(self, store, target):
if self._polswitch.read_status()[0] == self.Status.BUSY:
if self.polswitch.read_status()[0] == self.Status.BUSY:
return True
if int(self._polswitch.value) == int(target):
if int(self.polswitch.value) == int(target):
return False # done with this step
if self._polswitch.read_value() != 0:
self._polswitch.write_target(0)
if self.polswitch.read_value() != 0:
self.polswitch.write_target(0)
else:
self._polswitch.write_target(target)
self.polswitch.write_target(target)
return True # repeat