PC_Coldbox/coldbox.py
2024-12-05 13:41:23 +01:00

322 lines
10 KiB
Python

from paho.mqtt import client as mqtt_client
from time import sleep, time
class Tessie:
    """
    Client for the Tessie MQTT communication system.

    Requests are published as plain text (``get <var>``, ``set <var> <value>``,
    ``cmd <name>``, ``help``) on a single topic. Every message received on that
    topic goes through `on_message`, which drops request echoes and hands the
    rest to `decode_msg` to be matched against the names in `waiting`.

    Attributes:
        broker (str): The address of the MQTT broker.
        port (int): The port number of the MQTT broker.
        topic (str): The topic used for both publishing and subscribing.
        _client (mqtt_client.Client): The MQTT client object.
        waiting (list[str]): Names for which a reply is still awaited.
        found (list[list[str]]): ``[name, message]`` pairs of matched replies.

    `waiting` and `found` live on the class rather than the instance because
    the MQTT callbacks are static methods. They are therefore shared by all
    instances and are reset whenever a new instance is created.
    """

    # Defaults so the static callbacks work even before an instance exists.
    waiting = []
    found = []

    # Pause between polls, so waiting does not spin a CPU core at 100 %.
    _POLL_INTERVAL = 0.005

    def __init__(self, broker):
        """Connect to `broker`, wait until connected, then subscribe."""
        print(broker)
        self.broker = broker
        self.port = 1883
        self.topic = 'ctrlTessie'
        # No client id is passed to the constructor (the original code only
        # assigned one to an unused local), so that stays unchanged here.
        self._client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1)
        Tessie.waiting = []
        Tessie.found = []
        self._connect_mqtt()
        # The connection is completed by the network loop started in
        # _connect_mqtt(); poll for it, yielding the CPU between checks.
        while not self._client.is_connected():
            sleep(Tessie._POLL_INTERVAL)
        self._subscribe()

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """MQTT connect callback: report success or the failure code `rc`."""
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # Previously rc was passed as a second print() argument, which
            # printed the raw "%d" placeholder instead of formatting it.
            print(f"Failed to connect, return code {rc}\n")

    def _connect_mqtt(self):
        """Register the connect callback, connect and start the network loop."""
        self._client.on_connect = self.on_connect
        self._client.connect(self.broker, self.port)
        self._client.loop_start()

    @staticmethod
    def _is_reply_to(msg, var):
        """
        Return True if `msg` begins with the name `var` as a whole word.

        A bare prefix test would let a waiter for ``Temp`` swallow a
        ``Temp_M ...`` message, so the character following the name must not
        continue an identifier (letter, digit or underscore).
        """
        if not msg.startswith(var):
            return False
        rest = msg[len(var):]
        return not rest or not (rest[0].isalnum() or rest[0] == '_')

    @staticmethod
    def decode_msg(msg: str):
        """Move every waiting name that `msg` answers from `waiting` to `found`."""
        # Iterate over a snapshot: removing from the list being iterated
        # silently skips the element after each removal.
        for var in list(Tessie.waiting):
            if not Tessie._is_reply_to(msg, var):
                continue
            try:
                Tessie.waiting.remove(var)
            except ValueError:
                # The waiter timed out and withdrew the name in the meantime.
                continue
            Tessie.found.append([var, msg])

    @staticmethod
    def on_message(client, userdata, msg_recv):
        """
        MQTT message callback.

        Messages starting with ``get``/``set``/``cmd``/``help`` are ignored,
        messages starting with ``>`` are printed, and everything else is
        passed to `decode_msg`.
        """
        # Decode once; undecodable bytes are replaced rather than raising
        # inside the callback.
        text = msg_recv.payload.decode(errors="replace")
        if text.startswith(('get', 'set', 'cmd', 'help')):
            return
        if text.startswith('>'):
            print(text)
            return
        Tessie.decode_msg(text)

    def _subscribe(self):
        """Subscribe to the topic and register the message callback."""
        self._client.subscribe(self.topic)
        self._client.on_message = self.on_message

    @staticmethod
    def _take_reply(var):
        """Remove and return the first stored message for `var`, or None."""
        for entry in Tessie.found:
            if entry[0] == var:
                Tessie.found.remove(entry)
                return entry[1]
        return None

    @staticmethod
    def _wait_for_var(var, timeout=5):
        """
        Wait up to `timeout` seconds for a message matched to `var`.

        Returns:
            The full message text, or None if nothing arrived in time.
        """
        start_time = time()
        while True:
            reply = Tessie._take_reply(var)
            if reply is not None:
                return reply
            if time() - start_time > timeout:
                break
            sleep(Tessie._POLL_INTERVAL)
        # Withdraw the request. Otherwise a late reply would be stored in
        # `found` and returned as the answer to the NEXT request for `var`.
        try:
            Tessie.waiting.remove(var)
        except ValueError:
            pass
        # Pick up a reply that arrived just as the deadline passed.
        return Tessie._take_reply(var)

    def _publish(self, msg):
        """Publish `msg` on the topic and report a non-zero return code."""
        if self._client.publish(self.topic, msg)[0] != 0:
            print(f'Failed to send message: {msg}')

    @staticmethod
    def _await_reply(name):
        """
        Wait for the reply to `name` and strip its leading name.

        Returns:
            The reply text after the name, or False if no (or an unexpected)
            reply was received.
        """
        result = Tessie._wait_for_var(name)
        if not result:
            print('no result')
            return False
        if not result.startswith(name):
            print('wrong result')
            return False
        # Skip the name plus a 3-character separator.
        # NOTE(review): presumably " = " -- confirm against the firmware.
        return result[len(name) + 3:]

    def get(self, var, args="") -> "str | bool":
        """
        Request the value of `var` and wait for the reply.

        Args:
            var: Variable name.
            args: Text appended verbatim to the request (e.g. ``" tec 1"``).

        Returns:
            The value as a string, or False if no valid reply was received.
        """
        msg = 'get ' + var + args
        # Register before publishing so a fast reply is not missed.
        Tessie.waiting.append(var)
        self._publish(msg)
        return Tessie._await_reply(var)

    def set(self, var, data, args=""):
        """Publish a request to set `var` to `data`; no reply is awaited."""
        self._publish('set ' + str(var) + ' ' + str(data) + args)

    def cmd(self, cmd, args="", answer=False):
        """
        Publish the command `cmd`.

        Args:
            cmd: Command name.
            args: Text appended verbatim to the request.
            answer: If True, wait for a reply and return it.

        Returns:
            None when `answer` is False; otherwise the reply text, or False
            if no valid reply was received.
        """
        msg = 'cmd ' + cmd + args
        if answer:
            Tessie.waiting.append(cmd)
        self._publish(msg)
        if answer:
            return Tessie._await_reply(cmd)

    def help(self):
        """Publish a ``help`` request; replies starting with '>' are printed."""
        self._publish('help')
class Valve:
    """One valve of the coldbox, addressed by name as ``valve<i>``."""

    def __init__(self, tessie: Tessie, i):
        """Remember the Tessie client and derive the valve name from `i`."""
        self._tessie = tessie
        self.name = f"valve{i!s}"

    def set(self, value):
        """Send "on" when `value` equals 1, otherwise "off"."""
        state = "off"
        if value == 1:
            state = "on"
        self._tessie.set(self.name, state)

    def get(self):
        """
        Return 1 if the valve reports "on", else 0.

        Any other reply -- including the False that Tessie.get returns when
        no reply arrives -- is reported as 0.
        """
        reported_on = self._tessie.get(self.name) == "on"
        if reported_on:
            return 1
        return 0
class Env:
    """
    Environment readings of the coldbox (humidity, dew point, temperatures,
    voltage probes), read through a Tessie client.

    Tessie.get returns False when no valid reply arrives. ``float(False)`` is
    ``0.0``, which would be indistinguishable from a real measurement, so a
    failed read is reported as NaN instead.
    """

    def __init__(self, tessie: "Tessie"):
        self._tessie = tessie

    @staticmethod
    def _to_float(reply):
        """Convert a reply string to float; a failed read (False) becomes NaN."""
        if reply is False:
            return float("nan")
        return float(reply)

    def getRH(self):
        """Return the value of "RH" as a float (NaN if the read failed)."""
        return self._to_float(self._tessie.get("RH"))

    def getDP(self):
        """Return the value of "DP" as a float (NaN if the read failed)."""
        return self._to_float(self._tessie.get("DP"))

    def getTempAir(self):
        """Return the value of "Temp" as a float (NaN if the read failed)."""
        return self._to_float(self._tessie.get("Temp"))

    def getTempWater(self):
        """Return "Temp_W" of TEC 8 as a float (NaN if the read failed)."""
        return self._to_float(self._tessie.get("Temp_W", " tec 8"))

    def getVprobe(self, number):
        """
        Return the comma-separated values of ``vprobe<number>`` as floats.

        Returns:
            A list of floats ([-999.0] if not available), or ``[nan]`` if the
            read itself failed.
        """
        data = self._tessie.get(f"vprobe{number}")
        if data is False:
            # Previously this crashed with AttributeError on False.split().
            return [float("nan")]
        return [float(x) for x in data.split(",")]
class ConfTEC:
    """
    Configuration access (PID parameters, reference, mode, errors) for the
    TEC addressed as ``" tec <i>"``.

    For ``i == 0`` replies are parsed as comma-separated lists of floats;
    for any other `i` the reply string is returned unchanged.
    NOTE(review): TEC 0 presumably addresses all TECs at once -- confirm.
    """

    def __init__(self, tessie: "Tessie", i):
        self._tessie = tessie
        self.name = " tec " + str(i)
        # _single() reads this flag; it was never set here before, so every
        # getter raised AttributeError.
        self.single = i != 0

    def _single(self, data):
        """
        Return `data` unchanged for a single TEC, else parse it as a list of
        floats. A failed read (non-string, e.g. False) is passed through.
        """
        if self.single or not isinstance(data, str):
            return data
        return [float(x) for x in data.split(",")]

    def _read(self, var):
        """Read `var` for this TEC and post-process the reply."""
        return self._single(self._tessie.get(var, self.name))

    def saveToFlash(self):
        """Send "SaveVariables" and return the reply."""
        return self._tessie.cmd("SaveVariables", self.name, True)

    def getPID(self):
        """Return ``[kp, ki, kd]``."""
        return [self._read("PID_kp"), self._read("PID_ki"), self._read("PID_kd")]

    def setPID(self, kp: float, ki: float, kd: float):
        """Set the three PID coefficients."""
        self._tessie.set("PID_kp", str(kp), self.name)
        self._tessie.set("PID_ki", str(ki), self.name)
        self._tessie.set("PID_kd", str(kd), self.name)

    def getPIDMinMax(self):
        """Return ``[PID_Min, PID_Max]``."""
        return [self._read("PID_Min"), self._read("PID_Max")]

    def setPIDMinMax(self, pidmin, pidmax):
        """Set the PID output limits, rounded to one decimal place."""
        self._tessie.set("PID_Min", str(round(pidmin, 1)), self.name)
        self._tessie.set("PID_Max", str(round(pidmax, 1)), self.name)

    def setRef(self, ref: float):
        """Set "Ref_U", rounded to three decimal places."""
        self._tessie.set("Ref_U", str(round(ref, 3)), self.name)

    def getRef(self):
        """Return "Ref_U"."""
        return self._read("Ref_U")

    def setMode(self, mode: int):
        """Set "Mode"."""
        self._tessie.set("Mode", str(mode), self.name)

    def getMode(self):
        """Return "Mode"."""
        return self._read("Mode")

    def clearError(self):
        """Send "clearError"; no reply is awaited."""
        self._tessie.cmd("clearError", self.name)

    def getError(self):
        """Return "Error"."""
        return self._read("Error")
class TEC:
    """
    Control of the TEC addressed as ``" tec <i>"``; its configuration is
    available as `conf`.

    For ``i == 0`` replies are parsed as comma-separated lists of floats;
    for any other `i` the reply string is returned unchanged.
    NOTE(review): TEC 0 presumably addresses all TECs at once -- confirm.
    """

    def __init__(self, tessie: "Tessie", i):
        self._tessie = tessie
        self.conf = ConfTEC(tessie, i)
        self.name = " tec " + str(i)
        self.single = i != 0

    def _single(self, data):
        """
        Return `data` unchanged for a single TEC, else parse it as a list of
        floats.

        Tessie.get returns False when no valid reply arrives. That value is
        passed through in both modes; previously the multi-value mode crashed
        with AttributeError on ``False.split``.
        """
        if self.single or not isinstance(data, str):
            return data
        return [float(x) for x in data.split(",")]

    def _read(self, var):
        """Read `var` for this TEC and post-process the reply."""
        return self._single(self._tessie.get(var, self.name))

    def pon(self):
        """Send "Power_On"; no reply is awaited."""
        self._tessie.cmd("Power_On", self.name)

    def poff(self):
        """Send "Power_Off"; no reply is awaited."""
        self._tessie.cmd("Power_Off", self.name)

    def getState(self):
        """Return "PowerState"."""
        return self._read("PowerState")

    def getTemp(self):
        """Return "Temp_M"."""
        return self._read("Temp_M")

    def getUI(self):
        """Return ``[Supply_U, Supply_I, Peltier_U, Peltier_I]``."""
        return [self._read("Supply_U"),
                self._read("Supply_I"),
                self._read("Peltier_U"),
                self._read("Peltier_I")]

    def setTemp(self, temp: float):
        """Set "Temp_Set"; the value is rounded to an integer before sending."""
        self._tessie.set("Temp_Set", str(round(temp)), self.name)

    def setVoltage(self, u: float):
        """Set "ControlVoltage_Set", rounded to two decimal places."""
        self._tessie.set("ControlVoltage_Set", str(round(u, 2)), self.name)

    def getVoltage(self):
        """Return "ControlVoltage_Set"."""
        return self._read("ControlVoltage_Set")

    def reset(self):
        """Send "Reboot" and return the reply."""
        return self._tessie.cmd("Reboot", self.name, True)

    def loadFromFlash(self):
        """Send "LoadVariables" and return the reply."""
        return self._tessie.cmd("LoadVariables", self.name, True)

    def getSWVersion(self):
        """Send "GetSWVersion" and return the reply."""
        return self._tessie.cmd("GetSWVersion", self.name, True)
class Coldbox:
    """
    Top-level handle for one coldbox.

    Creates a single Tessie client for `broker` and exposes, all sharing that
    client: `valve0`/`valve1`, `tecall` (TEC 0), `tec1` .. `tec8`, the list
    `tecs` holding `tec1` .. `tec8` in order, and `env`.
    """

    def __init__(self, broker):
        link = Tessie(broker)
        self._tessie = link
        self.valve0 = Valve(link, 0)
        self.valve1 = Valve(link, 1)
        self.tecall = TEC(link, 0)
        numbered = [TEC(link, index) for index in range(1, 9)]
        (self.tec1, self.tec2, self.tec3, self.tec4,
         self.tec5, self.tec6, self.tec7, self.tec8) = numbered
        self.env = Env(link)
        self.tecs = numbered

    def help(self):
        """Forward a help request to the Tessie client."""
        self._tessie.help()
def main():
    """Connect to the coldbox at 'coldbox02.psi.ch' and print TEC 1's temperature."""
    print("Hello World!")
    coldbox = Coldbox('coldbox02.psi.ch')
    temperature = coldbox.tec1.getTemp()
    print(temperature)


if __name__ == "__main__":
    main()