frappy/frappy_psi/logo.py
l_samenv 92a8dfac5d fixed heater level
- should be percent
- round to integer before writing to logo
2024-06-11 13:47:36 +02:00

134 lines
4.3 KiB
Python

# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Andrea Plank <andrea.plank@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from ast import literal_eval
from threading import RLock
import snap7
from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, StringType,IDLE, BUSY, WARN, ERROR,Writable, Drivable, BoolType, IntRange, Communicator
from frappy.errors import CommunicationFailedError
class IO(Communicator):
    """Communicator exchanging VM values with a LOGO PLC through snap7.

    The connection is opened lazily, on the first call to
    :meth:`communicate`.
    """

    tcap_client = Property('tcap_client', IntRange())
    tsap_server = Property('tcap_server', IntRange())
    ip_address = Property('numeric ip address', StringType())
    # connected snap7 client; stays None until a connect attempt succeeded
    _plc = None

    def _init(self):
        """Connect to the PLC unless a connected client is already cached.

        The client is stored on the instance only after the connection has
        been confirmed. A failed attempt therefore leaves ``_plc`` unset and
        the next call tries to connect again, instead of reusing a client
        that never got connected.

        :raises CommunicationFailedError: when the client reports that it
            is not connected after the connect call
        """
        if self._plc:
            return
        plc = snap7.logo.Logo()
        plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
        if not plc.get_connected():
            raise CommunicationFailedError(
                f'can not connect to LOGO at {self.ip_address}')
        # create the lock before publishing the client: communicate() uses
        # self._lock as soon as it sees a cached client
        self._lock = RLock()
        self._plc = plc

    def communicate(self, cmd):
        """Read a VM address, optionally writing a value to it first.

        :param cmd: either ``'<vm_address>'`` (read only) or
            ``'<vm_address> <value>'``, where ``<value>`` is the text of a
            Python literal; it is written before the address is read back
        :return: the content of the address as returned by the client's
            ``read``
        """
        self._init()
        with self._lock:
            parts = cmd.split(maxsplit=1)
            address = parts[0]
            if len(parts) == 2:
                # literal_eval accepts plain literals only, no expressions
                self._plc.write(address, literal_eval(parts[1]))
            return self._plc.read(address)
class Snap7Mixin(HasIO):
    """Mixin giving a module read/write access to VM addresses via its io."""

    ioclass = IO

    def get_vm_value(self, vm_address):
        """Return the content of ``vm_address`` as delivered by the io."""
        request = vm_address
        return self.io.communicate(request)

    def set_vm_value(self, vm_address, value):
        """Write ``value`` to ``vm_address`` and return the io's reply.

        Address and value are joined by a single space into one command
        string for the io module.
        """
        request = f'{vm_address} {value}'
        return self.io.communicate(request)
class Pressure(Snap7Mixin, Readable):
    """Readable whose value is the content of a single VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
    pollinterval = 0.5

    def read_value(self):
        """Fetch the current content of ``vm_address``."""
        reading = self.get_vm_value(self.vm_address)
        return reading

    def read_status(self):
        """Report IDLE with an empty status text, unconditionally."""
        return (IDLE, '')
class Valve(Snap7Mixin, Drivable):
    """Boolean drivable using separate VM addresses for reading and writing."""

    vm_address_input = Property('VM address input', datatype=StringType())
    vm_address_output = Property('VM address output', datatype=StringType())
    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())

    def read_value(self):
        """Fetch the state from ``vm_address_input``."""
        state = self.get_vm_value(self.vm_address_input)
        return state

    def write_target(self, target):
        """Write ``target`` to ``vm_address_output`` and return the reply."""
        reply = self.set_vm_value(self.vm_address_output, target)
        return reply

    def read_status(self):
        """Report IDLE with an empty status text, unconditionally."""
        return (IDLE, '')
class FluidMachines(Snap7Mixin, Drivable):
    """Boolean drivable reading and writing one and the same VM address."""

    vm_address_output = Property('VM address output', datatype=StringType())
    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())

    def read_value(self):
        """Fetch the state from ``vm_address_output``."""
        state = self.get_vm_value(self.vm_address_output)
        return state

    def write_target(self, target):
        """Write ``target`` to ``vm_address_output`` and return the reply."""
        reply = self.set_vm_value(self.vm_address_output, target)
        return reply

    def read_status(self):
        """Report IDLE with an empty status text, unconditionally."""
        return (IDLE, '')
class TempSensor(Snap7Mixin, Readable):
    """Readable whose value is the content of a single VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))

    def read_value(self):
        """Fetch the current content of ``vm_address``."""
        reading = self.get_vm_value(self.vm_address)
        return reading

    def read_status(self):
        """Report IDLE with an empty status text, unconditionally."""
        return (IDLE, '')
class HeaterParam(Snap7Mixin, Writable):
    """Writable whose VM content is ten times the value given in '%'."""

    vm_address = Property('VM address output', datatype=StringType())
    target = Parameter('Heater target', datatype=FloatRange(unit='%'))
    value = Parameter('Heater Param', datatype=FloatRange(unit='%'))

    def read_value(self):
        """Fetch the content of ``vm_address`` and scale it by 0.1."""
        raw = self.get_vm_value(self.vm_address)
        return raw * 0.1

    def write_target(self, target):
        """Write ten times ``target``, rounded to an integer.

        :return: the reply of the write, scaled by 0.1
        """
        raw_target = round(target * 10)
        raw_reply = self.set_vm_value(self.vm_address, raw_target)
        return raw_reply * 0.1

    def read_status(self):
        """Report IDLE with an empty status text, unconditionally."""
        return (IDLE, '')