frappy/secop/simulation.py
Markus Zolliker 6b610f1e25 remove irrelevant comments
+ improve error message 'can not convert %r to float'
Change-Id: Idf534a4105086463fd238d7c91317424a809d7ba

Change-Id: I09260dda8deff1e6ee0af2fa38a42a09884a2061
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/26345
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2021-07-06 16:13:33 +02:00

143 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
#
# *****************************************************************************
"""Define Simulation classes"""
import random
from time import sleep
from secop.datatypes import FloatRange
from secop.lib import mkthread
from secop.modules import BasicPoller, Drivable, \
Module, Parameter, Readable, Writable
class SimBase:
pollerClass = BasicPoller
def __new__(cls, devname, logger, cfgdict, dispatcher):
extra_params = cfgdict.pop('extra_params', '') or cfgdict.pop('.extra_params', '')
attrs = {}
if extra_params:
for k in extra_params.split(','):
k = k.strip()
attrs[k] = Parameter('extra_param: %s' % k.strip(),
datatype=FloatRange(),
default=0.0)
def reader(self, pname=k):
self.log.debug('simulated reading %s' % pname)
return self.parameters[pname].value
attrs['read_' + k] = reader
def writer(self, newval, pname=k):
self.log.debug('simulated writing %r to %s' % (newval, pname))
self.parameters[pname].value = newval
return newval
attrs['write_' + k] = writer
return object.__new__(type('SimBase_%s' % devname, (cls,), attrs))
def initModule(self):
self._sim_thread = mkthread(self._sim)
def _sim(self):
try:
if not self.sim():
self.log.info('sim thread running')
while not self.sim():
pass
self.log.info('sim thread ended')
except Exception as e:
self.log.exception(e)
def sim(self):
return True # nothing to do, stop thread
class SimModule(SimBase, Module):
pass
class SimReadable(SimBase, Readable):
def __init__(self, devname, logger, cfgdict, dispatcher):
super().__init__(devname, logger, cfgdict, dispatcher)
self._value = self.parameters['value'].default
def read_value(self):
if 'jitter' in self.parameters:
return self._value + self.jitter * (0.5 - random.random())
return self._value
class SimWritable(SimReadable, Writable):
def read_value(self):
return self.target
def write_target(self, value):
self.value = value
def _hw_wait(self):
pass
class SimDrivable(SimReadable, Drivable):
def sim(self):
while self._value == self.target:
sleep(0.3)
self.status = self.Status.BUSY, 'MOVING'
speed = 0
if 'ramp' in self.accessibles:
speed = self.ramp / 60. # ramp is per minute!
elif 'speed' in self.accessibles:
speed = self.speed
if speed == 0:
self._value = self.target
speed *= 0.3
try:
self.pollParams(0)
except Exception:
pass
while self._value != self.target:
if self._value < self.target - speed:
self._value += speed
elif self._value > self.target + speed:
self._value -= speed
else:
self._value = self.target
sleep(0.3)
try:
self.pollParams(0)
except Exception:
pass
self.status = self.Status.IDLE, ''
return False # keep thread running
def _hw_wait(self):
while self.status[0] == self.Status.BUSY:
sleep(0.3)