129 lines
5.0 KiB
Python
129 lines
5.0 KiB
Python
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# M. Zolliker <markus.zolliker@psi.ch>
|
|
#
|
|
# *****************************************************************************
|
|
|
|
from frappy.core import Attached, Command, EnumType, FloatRange, \
|
|
Drivable, Parameter, BUSY, IDLE, ERROR
|
|
|
|
|
|
class Valve(Drivable):
|
|
motor = Attached(Drivable) # refers to motor module
|
|
value = Parameter('valve state',
|
|
EnumType(closed=0, open=1, undefined=2),
|
|
default=2)
|
|
status = Parameter() # inherit properties from Drivable
|
|
target = Parameter('valve target',
|
|
EnumType(closed=0, open=1),
|
|
readonly=False)
|
|
# TODO: convert to properties after tests
|
|
open_pos = Parameter('target position for open state', FloatRange(), readonly=False, default=80)
|
|
mid_pos = Parameter('position for changing speed', FloatRange(), readonly=False, default=5)
|
|
fast_speed = Parameter('normal speed', FloatRange(), readonly=False, default=40)
|
|
slow_speed = Parameter('reduced speed', FloatRange(), readonly=False, default=10)
|
|
__motor_target = None
|
|
__status = IDLE, ''
|
|
__value = 'undefined'
|
|
__drivestate = 0 # 2 when driving to intermediate target or on retry, 1 when driving to final target, 0 when idle
|
|
|
|
def doPoll(self):
|
|
mot = self.motor
|
|
motpos = mot.read_value()
|
|
scode, stext = mot.read_status()
|
|
drivestate = self.__drivestate
|
|
if scode >= ERROR:
|
|
if self.__drivestate and self.__remaining_tries > 0:
|
|
drivestate = 2
|
|
self.__remaining_tries -= 1
|
|
mot.reset()
|
|
mot.write_speed(self.slow_speed)
|
|
self.__status = BUSY, f'retry {self._action}'
|
|
else:
|
|
self.__status = ERROR, f'valve motor: {stext}'
|
|
elif scode < BUSY:
|
|
if self.__motor_target is not None and mot.target != self.__motor_target:
|
|
self.__status = ERROR, 'motor was driven directly'
|
|
elif drivestate == 2:
|
|
self.goto(self.target)
|
|
drivestate = 1
|
|
else:
|
|
if -3 < motpos < 3:
|
|
self.__value = 'closed'
|
|
self.__status = IDLE, ''
|
|
elif self.open_pos * 0.5 < motpos < self.open_pos * 1.5:
|
|
self.__value = 'open'
|
|
self.__status = IDLE, ''
|
|
else:
|
|
self.__status = ERROR, 'undefined'
|
|
if self.__drivestate and not self.isBusy(self.__status):
|
|
drivestate = 0
|
|
self.__motor_target = None
|
|
self.setFastPoll(False)
|
|
self.__drivestate = drivestate
|
|
self.read_status()
|
|
self.read_value()
|
|
|
|
def read_status(self):
|
|
return self.__status
|
|
|
|
def read_value(self):
|
|
if self.read_status()[0] >= BUSY:
|
|
return 'undefined'
|
|
return self.__value
|
|
|
|
def goto(self, target):
|
|
"""go to open, closed or intermediate position
|
|
|
|
the intermediate position is targeted when a speed change is needed
|
|
|
|
return 2 when a retry is needed, 1 else
|
|
"""
|
|
mot = self.motor
|
|
if target: # 'open'
|
|
self._action = 'opening'
|
|
if True or mot.value > self.mid_pos:
|
|
mot.write_speed(self.fast_speed)
|
|
self.__motor_target = mot.write_target(self.open_pos)
|
|
return 1
|
|
mot.write_speed(self.slow_speed)
|
|
self.__motor_target = mot.write_target(self.mid_pos)
|
|
return 2
|
|
self._action = 'closing'
|
|
if mot.value > self.mid_pos * 2:
|
|
mot.write_speed(self.fast_speed)
|
|
self.__motor_target = mot.write_target(self.mid_pos)
|
|
return 2
|
|
mot.write_speed(self.slow_speed)
|
|
self.__motor_target = mot.write_target(0)
|
|
return 1
|
|
|
|
def write_target(self, target):
|
|
self.__remaining_tries = 5
|
|
self.__drivestate = self.goto(target)
|
|
self.__status = BUSY, self._action
|
|
self.read_status()
|
|
self.read_value()
|
|
self.setFastPoll(True)
|
|
|
|
@Command() # python decorator to mark it as a command
|
|
def stop(self):
|
|
"""stop the motor -> value might get undefined"""
|
|
self.__drivestate = 0
|
|
self.motor.stop()
|