rework Epics support/testing with LS336

second approach, better fitting what was agreed upon so far.
- pv_names are local to SEC-node, so not exporting via json and marking
  them 'private'
- 2 devices for 2 temperature control loops, not one 'monster' device
  which handles everything.
- read_status implemented
- write_target also updates the status (may be sensible to go to the core?)
- provide working stubs in case epics is not installed (-> testing possible)
- tested with the stubs.
- tests with real epics.

found problems:
in EpicsTempCtrl(EpicsDriveable) the read/write_<paramname> methods from
EpicsDriveable needed to be reimplemented. This should not be needed!

Change-Id: I9e4eeaff83114131d117c8f04fba758dfe22237b
This commit is contained in:
Erik Dahlbäck
2017-01-18 14:06:00 +01:00
committed by Erik Dahlbäck
parent c0fd2e9907
commit 502d0f152a
4 changed files with 258 additions and 27 deletions

View File

@ -1,5 +1,9 @@
[equipment] [equipment]
# set SEC-node properties
id=cryo_7 id=cryo_7
description = short description
This is a very long description providing all the glory details in all the glory details about the stuff we are describing
[interface tcp] [interface tcp]
interface=tcp interface=tcp

View File

@ -1,11 +1,59 @@
[server] [equipment]
bindto=0.0.0.0 id=see_demo_equipment
bindport=10767
[client]
connectto=0.0.0.0
port=10767
interface = tcp interface = tcp
framing=eol framing=eol
encoding=text encoding=text
[device epicspv] [interface testing]
class=devices.epics.EPICS_PV interface=tcp
sensor="test_sensor" bindto=0.0.0.0
max_rpm="very high" bindport=10767
# protocol to use for this interface
framing=eol
encoding=demo
[device tc1]
class=devices.demo.CoilTemp
sensor="X34598T7"
[device tc2]
class=devices.demo.CoilTemp
sensor="X39284Q8'
[device sensor1]
class=devices.epics.EpicsReadable
epics_version="v4"
.group="Lakeshore336"
value_pv="DEV:KRDG1"
[device loop1]
class=devices.epics.EpicsTempCtrl
epics_version="v4"
.group="Lakeshore336"
value_pv="DEV:KRDG1"
target_pv="DEV:SETP_S1"
heaterrange_pv="DEV:RANGE_S1"
[device sensor2]
class=devices.epics.EpicsReadable
epics_version="v4"
.group="Lakeshore336"
value_pv="DEV:KRDG2"
[device loop2]
class=devices.epics.EpicsTempCtrl
epics_version="v4"
.group="Lakeshore336"
value_pv="DEV:KRDG2"
target_pv="DEV:SETP_S2"
heaterrange_pv="DEV:RANGE_S2"

View File

@ -240,7 +240,14 @@ class Device(object):
# make local copies of PARAMS # make local copies of PARAMS
params = {} params = {}
for k, v in self.PARAMS.items()[:]: for k, v in self.PARAMS.items()[:]:
params[k] = PARAM(v) #params[k] = PARAM(v)
# PARAM: type(v) -> PARAM
# type(v)(v) -> PARAM(v)
# EPICS_PARAM: type(v) -> EPICS_PARAM
# type(v)(v) -> EPICS_PARAM(v)
param_type = type(v)
params[k] = param_type(v)
self.PARAMS = params self.PARAMS = params
# check and apply properties specified in cfgdict # check and apply properties specified in cfgdict
@ -293,7 +300,9 @@ class Device(object):
cfgdict[k] = v.default cfgdict[k] = v.default
# replace CLASS level PARAM objects with INSTANCE level ones # replace CLASS level PARAM objects with INSTANCE level ones
self.PARAMS[k] = PARAM(self.PARAMS[k]) #self.PARAMS[k] = PARAM(self.PARAMS[k])
param_type = type(self.PARAMS[k])
self.PARAMS[k] = param_type(self.PARAMS[k])
# now 'apply' config: # now 'apply' config:
# pass values through the validators and store as attributes # pass values through the validators and store as attributes
@ -385,4 +394,3 @@ class Driveable(Readable):
"""if implemented should pause the module """if implemented should pause the module
use start to continue movement use start to continue movement
""" """

View File

@ -17,35 +17,206 @@
# #
# Module authors: # Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> # Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Erik Dahlbäck <erik.dahlback@esss.se>
# ***************************************************************************** # *****************************************************************************
"""testing devices"""
import random import random
from secop.devices.core import Readable, Driveable, PARAM from secop.lib.parsing import format_time
from secop.validators import enum, vector, floatrange, validator_to_str
from secop.devices.core import Readable, Device, Driveable, PARAM
from secop.protocol import status
try:
from pvaccess import Channel #import EPIVSv4 functionallity, PV access
except ImportError:
class Channel(object):
def __init__(self, pv_name):
self.pv_name = pv_name
self.value = 0.0
def get(self):
return self
def getDouble(self):
return self.value
def put(self, value):
try:
self.value = value
self.value = float(value)
except (TypeError, ValueError):
pass
try: try:
from epics import PV from epics import PV
except ImportError: except ImportError:
PV = None class PV(object):
def __init__(self, pv_name):
self.pv_name = pv_name
self.value = 0.0
class EPICS_PV(Driveable):
"""pyepics test device."""
class EpicsReadable(Readable):
"""EpicsDriveable handles a Driveable interfacing to EPICS v4"""
# Commmon PARAMS for all EPICS devices
PARAMS = { PARAMS = {
'sensor': PARAM("Sensor number or calibration id", 'value': PARAM('EPICS generic value', validator=float,
validator=str, readonly=True), default=300.0,),
'max_rpm': PARAM("Maximum allowed rpm", 'epics_version': PARAM("EPICS version used, v3 or v4",
validator=str, readonly=True), validator=str,),
# 'private' parameters: not remotely accessible
'value_pv': PARAM('EPICS pv_name of value', validator=str,
default="unset", export=False),
'status_pv': PARAM('EPICS pv_name of status', validator=str,
default="unset", export=False),
} }
def read_value(self, maxage=0): # Generic read and write functions
p1 = PV('testpv.VAL') def _read_pv(self, pv_name):
return p1.value if self.epics_version == 'v4':
pv_channel = Channel(pv_name)
# TODO: cannot handle read of string (is there a .getText() or .getString() ?)
return_value = pv_channel.get().getDouble()
else: # Not EPICS v4
# TODO: fix this, it does not work
pv = PV(pv_name + ".VAL")
return_value = pv.value
return return_value
def write_target(self, target): def _write_pv(self, pv_name, write_value):
p1 = PV('test.VAL') #self.log.info('Write value = %s from EPICS PV = %s' %(write_value, pv_name))
p1.value = target # try to convert value to float
try:
write_value = float(write_value)
except (TypeError, ValueError):
# can not convert to float, force to string
write_value = str(write_value)
if self.epics_version == 'v4':
pv_channel = Channel(pv_name)
pv_channel.put(write_value)
else: # Not EPICS v4
pv = PV(pv_name + ".VAL")
pv.value = write_value
def read_value(self, maxage=0):
return self._read_pv(self.value_pv)
def read_status(self, maxage=0):
# XXX: comparison may need to be a little unsharp
# XXX: Hardware may have it's own idea about the status: how to obtain?
if self.status_pv != 'unset':
# XXX: how to map an unknown type+value to an valid status ???
return status.UNKNOWN, self._read_pv(self.status_pv)
# status_pv is unset
return (status.OK, 'no pv set')
class EpicsDriveable(Driveable):
"""EpicsDriveable handles a Driveable interfacing to EPICS v4"""
# Commmon PARAMS for all EPICS devices
PARAMS = {
'target': PARAM('EPICS generic target', validator=float,
default=300.0, readonly=False),
'value': PARAM('EPICS generic value', validator=float,
default=300.0,),
'epics_version': PARAM("EPICS version used, v3 or v4",
validator=str,),
# 'private' parameters: not remotely accessible
'target_pv': PARAM('EPICS pv_name of target', validator=str,
default="unset", export=False),
'value_pv': PARAM('EPICS pv_name of value', validator=str,
default="unset", export=False),
'status_pv': PARAM('EPICS pv_name of status', validator=str,
default="unset", export=False),
}
# Generic read and write functions
def _read_pv(self, pv_name):
if self.epics_version == 'v4':
pv_channel = Channel(pv_name)
# TODO: cannot handle read of string (is there a .getText() or .getString() ?)
return_value = pv_channel.get().getDouble()
else: # Not EPICS v4
# TODO: fix this, it does not work
pv = PV(pv_name + ".VAL")
return_value = pv.value
return return_value
def _write_pv(self, pv_name, write_value):
#self.log.info('Write value = %s from EPICS PV = %s' %(write_value, pv_name))
# try to convert value to float
try:
write_value = float(write_value)
except (TypeError, ValueError):
# can not convert to float, force to string
write_value = str(write_value)
if self.epics_version == 'v4':
pv_channel = Channel(pv_name)
pv_channel.put(write_value)
else: # Not EPICS v4
pv = PV(pv_name + ".VAL")
pv.value = write_value
def read_target(self, maxage=0):
return self._read_pv(self.target_pv)
def write_target(self, write_value):
self._write_pv(self.target_pv, write_value)
def read_value(self, maxage=0):
return self._read_pv(self.value_pv)
def read_status(self, maxage=0):
# XXX: comparison may need to be a little unsharp
# XXX: Hardware may have it's own idea about the status: how to obtain?
if self.status_pv != 'unset':
# XXX: how to map an unknown type+value to an valid status ???
return status.UNKNOWN, self._read_pv(self.status_pv)
# status_pv is unset, derive status from equality of value + target
return (status.OK, '') if self.read_value() == self.read_target() else \
(status.BUSY, 'Moving')
"""Temperature control loop"""
# should also derive from secop.core.temperaturecontroller, once its features are agreed upon
class EpicsTempCtrl(EpicsDriveable):
PARAMS = {
# TODO: restrict possible values with oneof validator
'heaterrange': PARAM('Heater range', validator=str,
default='Off', readonly=False,),
'tolerance': PARAM('allowed deviation between value and target',
validator=floatrange(1e-6,1e6), default=0.1,
readonly=False,),
# 'private' parameters: not remotely accessible
'heaterrange_pv': PARAM('EPICS pv_name of heater range',
validator=str, default="unset", export=False,),
}
def read_target(self, maxage=0):
return self._read_pv(self.target_pv)
def write_target(self, write_value):
# send target to HW
self._write_pv(self.target_pv, write_value)
# update our status
self.read_status()
def read_value(self, maxage=0):
return self._read_pv(self.value_pv)
def read_status(self, maxage=0):
# XXX: comparison may need to collect a history to detect oscillations
at_target = abs(self.read_value(maxage) - self.read_target(maxage)) \
<= self.tolerance
return (status.OK, 'at Target') if at_target else (status.BUSY, 'Moving')
# TODO: add support for strings over epics pv
#def read_heaterrange(self, maxage=0):
# return self._read_pv(self.heaterrange_pv)
# TODO: add support for strings over epics pv
#def write_heaterrange(self, range_value):
# self._write_pv(self.heaterrange_pv, range_value)