# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Enrico Faulhaber
#
# *****************************************************************************
"""Define Simulation classes"""

# TODO: rework after syntax change!

import random
from time import sleep

from secop.datatypes import FloatRange
from secop.lib import mkthread
from secop.modules import BasicPoller, Drivable, \
    Module, Parameter, Readable, Writable

# Length of one simulation step in seconds.  The same value is used for the
# sleep between steps and for scaling the per-second speed to a per-step
# increment, so both stay consistent when it is changed.
_SIM_TICK = 0.3


class SimBase:
    """Mixin providing configurable extra parameters and a simulation thread.

    Subclasses override :meth:`sim` to implement the simulated behaviour.
    """

    pollerClass = BasicPoller

    def __new__(cls, devname, logger, cfgdict, dispatcher):
        """Create an instance of a per-device subclass of ``cls``.

        The comma separated names found under ``extra_params`` (or
        ``.extra_params``) in ``cfgdict`` are removed from the config and
        turned into float parameters with matching ``read_<name>`` and
        ``write_<name>`` methods on a dynamically created subclass named
        ``SimBase_<devname>``.  Empty names (e.g. from a trailing comma)
        are ignored.
        """
        extra_params = cfgdict.pop('extra_params', '') or \
            cfgdict.pop('.extra_params', '')
        attrs = {}
        if extra_params:
            for k in extra_params.split(','):
                k = k.strip()
                if not k:
                    # 'a,b,' or 'a,,b' must not create a parameter named ''
                    continue
                attrs[k] = Parameter('extra_param: %s' % k,
                                     datatype=FloatRange(),
                                     default=0.0)

                # pname is bound as a default argument so that every
                # generated method keeps its own parameter name instead of
                # the loop variable's final value.
                def reader(self, pname=k):
                    self.log.debug('simulated reading %s' % pname)
                    return self.parameters[pname].value
                attrs['read_' + k] = reader

                def writer(self, newval, pname=k):
                    self.log.debug('simulated writing %r to %s' % (newval, pname))
                    self.parameters[pname].value = newval
                    return newval
                attrs['write_' + k] = writer

        # The new type derives from cls, so the returned object is still an
        # instance of cls and Python goes on to call __init__ on it.
        return object.__new__(type('SimBase_%s' % devname, (cls,), attrs))

    def initModule(self):
        """Hand :meth:`_sim` to ``mkthread`` and keep the result."""
        self._sim_thread = mkthread(self._sim)

    def _sim(self):
        """Call :meth:`sim` repeatedly until it returns a true value.

        Any exception raised by :meth:`sim` is logged and ends the loop.
        """
        try:
            if not self.sim():
                self.log.info('sim thread running')
                while not self.sim():
                    pass
                self.log.info('sim thread ended')
        except Exception as e:
            self.log.exception(e)

    def sim(self):
        """Perform one simulation cycle.

        Return a true value to stop the simulation loop, a false value to
        be called again.  The base implementation has nothing to simulate.
        """
        return True  # nothing to do, stop thread


class SimModule(SimBase, Module):
    """Simulated plain module."""
    # def __init__(self, devname, logger, cfgdict, dispatcher):
    #     SimBase.__init__(self, cfgdict)
    #     Module.__init__(self, devname, logger, cfgdict, dispatcher)


class SimReadable(SimBase, Readable):
    """Simulated readable whose value starts at the 'value' default."""

    def __init__(self, devname, logger, cfgdict, dispatcher):
        super().__init__(devname, logger, cfgdict, dispatcher)
        # internal simulated value, initialised from the parameter default
        self._value = self.parameters['value'].default

    def read_value(self):
        """Return the simulated value, with noise if 'jitter' is a parameter.

        The noise is uniformly distributed within +/- jitter/2.
        """
        if 'jitter' in self.parameters:
            return self._value + self.jitter * (0.5 - random.random())
        return self._value


class SimWritable(SimReadable, Writable):
    """Simulated writable whose value follows the target."""

    def read_value(self):
        """Return the current target as value."""
        return self.target

    def write_target(self, value):
        """Store the new target as value."""
        self.value = value

    def _hw_wait(self):
        """Nothing to wait for."""


class SimDrivable(SimReadable, Drivable):
    """Simulated drivable moving its value towards the target step by step."""

    def _poll_quietly(self):
        """Call pollParams(0) on a best-effort basis.

        A failing poll must not stop the simulation, so the error is only
        logged at debug level instead of being raised.
        """
        try:
            self.pollParams(0)
        except Exception as e:
            self.log.debug('simulated poll failed: %r' % (e,))

    def sim(self):
        """Wait for a target change, then move the value to the target.

        The speed is taken from 'ramp' (per minute) or else from 'speed'
        if one of them is among the accessibles; a speed of zero makes the
        value jump to the target at once.  Status is BUSY while moving and
        IDLE afterwards.

        Always returns False, so the simulation loop keeps running.
        """
        while self._value == self.target:
            sleep(_SIM_TICK)
        self.status = self.Status.BUSY, 'MOVING'

        speed = 0
        if 'ramp' in self.accessibles:
            speed = self.ramp / 60.  # ramp is per minute!
        elif 'speed' in self.accessibles:
            speed = self.speed
        if speed == 0:
            self._value = self.target
        # Increment per step.  Only the magnitude is used: a negative step
        # would move the value away from the target and never terminate.
        step = abs(speed) * _SIM_TICK

        self._poll_quietly()

        while self._value != self.target:
            if self._value < self.target - step:
                self._value += step
            elif self._value > self.target + step:
                self._value -= step
            else:
                # closer than one step: land exactly on the target
                self._value = self.target
            sleep(_SIM_TICK)
            self._poll_quietly()

        self.status = self.Status.IDLE, ''
        return False  # keep thread running

    def _hw_wait(self):
        """Block until the status is no longer BUSY."""
        while self.status[0] == self.Status.BUSY:
            sleep(_SIM_TICK)