frappy_psi.ionopimax redesign

Change-Id: I46b62522c24ad9f0352ba7a784d39ffd1cb79ef3
This commit is contained in:
zolliker 2025-04-15 09:00:30 +02:00
parent d0c063c60b
commit ca6fd1dd5e

View File

@ -18,60 +18,59 @@
# Jael Celia Lorenzana <jael-celia.lorenzana@psi.ch> # Jael Celia Lorenzana <jael-celia.lorenzana@psi.ch>
# ***************************************************************************** # *****************************************************************************
import os """support for iono pi max from Sfera Labs
from glob import glob
supports also the smaller model iono pi
"""
from pathlib import Path
from frappy.core import Readable, Writable, Parameter, BoolType, StringType,\ from frappy.core import Readable, Writable, Parameter, BoolType, StringType,\
EnumType, FloatRange, Property, TupleOf, ERROR, IDLE EnumType, FloatRange, Property, TupleOf, ERROR, IDLE
from frappy.errors import ConfigError, OutOfRangeError from frappy.errors import ConfigError, OutOfRangeError, ProgrammingError
from math import log from math import log
basepaths = '/sys/class/ionopimax', '/sys/class/ionopi'
class Base: class Base:
addr = Property('address', StringType()) addr = Property('address', StringType())
_devpath = None _devpath = None
devclass = None _devclass = None
_status = None
def initModule(self): def initModule(self):
super().initModule() super().initModule()
# candidates = glob(f'/sys/class/iono*/*/{self.addr}') candidates = list(Path('/sys/class').glob(f'ionopi*/*/{self.addr}'))
# if not candidates: if not candidates:
# raise ConfigError(f'can not find path for {self.addr}') raise ConfigError(f'can not find path for {self.addr}')
for basepath in basepaths: if len(candidates) > 1:
for devclass in ([self.devclass] if isinstance(self.devclass, str) else self.devclass): raise ProgrammingError(f"ambiguous paths {','.join(candidates)}")
devpath = f'{basepath}/{devclass}' self._devpath = candidates[0].parent
if os.path.exists(f'{devpath}/{self.addr}'): self._devclass = candidates[0].parent.name
self._devpath = devpath
return
else:
self.log.info('%s does not exist', devpath)
else:
raise ConfigError(f'device path for {self.devclass} not found {devpath}')
def read(self, addr, scale=None): def read(self, addr, scale=None):
with open(f'{self._devpath}/{addr}') as f: with open(self._devpath / addr) as f:
result = f.read() result = f.read()
if scale: if scale:
return float(result) / scale return float(result) / scale
return result return result.strip()
def write(self, addr, value, scale=None): def write(self, addr, value, scale=None):
value = str(round(value * scale)) if scale else str(value) value = str(round(value * scale)) if scale else str(value)
with open(f'{self._devpath}/{addr}', 'w') as f: with open(self._devpath / addr, 'w') as f:
f.write(value) f.write(value)
def read_status(self):
if self.status is None:
return IDLE, ''
return self._status
class DigitalInput(Base, Readable): class DigitalInput(Base, Readable):
value = Parameter('input state', BoolType()) value = Parameter('input state', BoolType())
true_level = Property('level representig True', EnumType(low=0, high=1), default=1) true_level = Property('level representing True', EnumType(low=0, high=1), default=1)
devclass = 'digital_in', 'digital_io'
def initModule(self): def initModule(self):
super().initModule() super().initModule()
self.log.info('devpath %r', self._devpath) if self._devclass == 'digital_io'
if self.addr.startswith('dt'): self.write(f'{self.addr}_mode', 'inp')
self.write(f'{self.addr}_mode','inp')
def read_value(self): def read_value(self):
return self.read(self.addr, 1) == self.true_level return self.read(self.addr, 1) == self.true_level
@ -79,7 +78,24 @@ class DigitalInput(Base, Readable):
class DigitalOutput(DigitalInput, Writable): class DigitalOutput(DigitalInput, Writable):
target = Parameter('output state', BoolType(), readonly=False) target = Parameter('output state', BoolType(), readonly=False)
devclass = 'digital_out', 'relay'
def read_value(self):
reply = self.read(self.addr)
try:
self._status = None
self.read_status()
value = int(reply)
except ValueError:
if reply == 'S':
if self.addr.startswith('oc'):
self._status = ERROR, 'short circuit'
else:
self._status = ERROR, 'fault while closed'
value = 0
else:
self._status = ERROR, 'fault while open'
value = 1
return value == self.true_level
def write_target(self, value): def write_target(self, value):
self.write(self.addr, value == self.true_level, 1) self.write(self.addr, value == self.true_level, 1)
@ -87,9 +103,8 @@ class DigitalOutput(DigitalInput, Writable):
class AnalogInput(Base, Readable): class AnalogInput(Base, Readable):
value = Parameter('analog value', FloatRange()) value = Parameter('analog value', FloatRange())
rawrange = Property('raw range(electronic)', TupleOf(FloatRange(),FloatRange())) rawrange = Property('raw range (electronic)', TupleOf(FloatRange(),FloatRange()))
valuerange = Property('value range(physical)', TupleOf(FloatRange(),FloatRange())) valuerange = Property('value range (physical)', TupleOf(FloatRange(),FloatRange()))
devclass = 'analog_in'
def read_value(self): def read_value(self):
x0, x1 = self.rawrange x0, x1 = self.rawrange
@ -127,15 +142,14 @@ class CurrentInput(AnalogInput):
def read_value(self): def read_value(self):
result = super().read_value() result = super().read_value()
if self.x > 0.021: if self.x > 0.021:
self.status = ERROR, 'sensor broken' self.status = ERROR, 'sensor fault'
raise OutOfRangeError('sensor broken') raise OutOfRangeError('sensor fault')
self.status = IDLE, '' self.status = IDLE, ''
return result return result
class AnalogOutput(AnalogInput, Writable): class AnalogOutput(AnalogInput, Writable):
target = Parameter('outputvalue', FloatRange()) target = Parameter('outputvalue', FloatRange())
devclass = 'analog_out'
def write_target(self, value): def write_target(self, value):
x0, x1 = self.rawrange x0, x1 = self.rawrange
@ -156,7 +170,6 @@ class VoltageOutput(AnalogOutput):
class VoltagePower(Base, Writable): class VoltagePower(Base, Writable):
devclass = 'power_out'
target = Parameter(datatype=FloatRange(0, 24.5, unit='V'), default=12) target = Parameter(datatype=FloatRange(0, 24.5, unit='V'), default=12)
addr = 'vso' addr = 'vso'