commit e63cc1e4e42607a0c444280d26cb288042b4dae6 Author: pique_n Date: Thu Dec 22 14:02:12 2022 +0100 init diff --git a/coldbox.py b/coldbox.py new file mode 100644 index 0000000..0f9d1f6 --- /dev/null +++ b/coldbox.py @@ -0,0 +1,308 @@ +from paho.mqtt import client as mqtt_client +import time +from time import sleep + + +class Tessie: + """ + This class represents a client for the Tessie MQTT communication system. + + Attributes: + broker (str): The address of the MQTT broker. + port (int): The port number of the MQTT broker. + topic (str): The topic to subscribe to. + _client (mqtt_client.Client): The MQTT client object. + waiting (List[str]): A list of variables that are waiting to be received. + found (List[str, str]): A list of received variables and their corresponding values. + + Methods: + __init__(self): + Initializes the Tessie client by setting the attributes, connecting to the MQTT broker, and subscribing to the topic. + on_connect(client, userdata, flags, rc): + A callback function that is called when the client successfully connects to the MQTT broker. + _connect_mqtt(self): + Connects to the MQTT broker. + decode_msg(msg: str): + Decodes a received message and stores it in the `found` list if it matches a waiting variable. + on_message(client, userdata, msg_recv): + A callback function that is called when a message is received on the subscribed topic. + _subscribe(self): + Subscribes to the specified topic. + _wait_for_var(var): + Waits for a specified variable to be received and returns its value. + get(self, var, args=""): + Sends a request to get the value of a specified variable. Returns the value of the variable. + set(self, var, data, args=""): + Sends a request to set the value of a specified variable to a specified value. + cmd(self, cmd, args=""): + Sends a command to the Tessie system. + help(self): + Sends a request for help on the Tessie system. + """ + + def __init__(self): + self.broker = 'coldbox01.psi.ch' + self.port = 1883 + self.topic = 'ctrlTessie' + client_id = 'Python Tessie Client' + self._client = mqtt_client.Client(client_id) + Tessie.waiting = [] + Tessie.found = [] + self._connect_mqtt() + while not self._client.is_connected(): + pass + self._subscribe() + + @staticmethod + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Connected to MQTT Broker!") + else: + print("Failed to connect, return code %d\n", rc) + + def _connect_mqtt(self): + # Set Connecting Client ID + self._client.on_connect = self.on_connect + self._client.connect(self.broker, self.port) + self._client.loop_start() + + @staticmethod + def decode_msg(msg: str): + for var in Tessie.waiting: + if msg.startswith(var): + Tessie.waiting.remove(var) + Tessie.found.append([var, msg]) + + @staticmethod + def on_message(client, userdata, msg_recv): + if msg_recv.payload.decode().startswith('get'): + return + if msg_recv.payload.decode().startswith('set'): + return + if msg_recv.payload.decode().startswith('cmd'): + return + if msg_recv.payload.decode().startswith('help'): + return + if msg_recv.payload.decode().startswith('>'): + print(msg_recv.payload.decode()) + return + # print('recv: ' + msg_recv.payload.decode()) + Tessie.decode_msg(msg_recv.payload.decode()) + + def _subscribe(self): + self._client.subscribe(self.topic) + self._client.on_message = self.on_message + + @staticmethod + def _wait_for_var(var): + timeout = 5 + start_time = time.time() + while True: + if time.time() - start_time > timeout: + # timeout reached, exit the loop + break + for msg in Tessie.found: + if msg[0] == var: + Tessie.found.remove(msg) + return msg[1] + + def get(self, var, args="") -> str: + msg = 'get ' + var + args + Tessie.waiting.append(var) + # print('send ' + msg) + if self._client.publish(self.topic, msg)[0] != 0: + print(f'Failed to send message: {msg}') + result = Tessie._wait_for_var(var) + if not result: + print('no result') + return False + if not result.startswith(var): + print('wrong result') + return False + result = result[len(var) + 3:] + # print(result) + return result + + def set(self, var, data, args=""): + msg = 'set ' + str(var) + ' ' + str(data) + args + # print('send ' + msg) + if self._client.publish(self.topic, msg)[0] != 0: + print(f'Failed to send message: {msg}') + + def cmd(self, cmd, args="", answer=False): + msg = 'cmd ' + cmd + args + # print('send ' + msg) + if answer: + Tessie.waiting.append(cmd) + if self._client.publish(self.topic, msg)[0] != 0: + print(f'Failed to send message: {msg}') + if answer: + result = Tessie._wait_for_var(cmd) + if not result: + print('no result') + return False + if not result.startswith(cmd): + print('wrong result') + return False + result = result[len(cmd) + 3:] + # print(result) + return result + + def help(self): + msg = 'help' + if self._client.publish(self.topic, msg)[0] != 0: + print(f'Failed to send message: {msg}') + + +class Valve: + def __init__(self, tessie: Tessie, i): + self.name = "valve" + str(i) + self._tessie = tessie + + def set(self, value): + self._tessie.set(self.name, "on" if value == 1 else "off") + + def get(self): + return 1 if self._tessie.get(self.name) == "on" else 0 + + +class Env: + def __init__(self, tessie: Tessie): + self._tessie = tessie + + def getRH(self): + return float(self._tessie.get("RH")) + + def getDP(self): + return float(self._tessie.get("DP")) + + def getTempAir(self): + return float(self._tessie.get("Temp")) + + def getTempWater(self): + return float(self._tessie.get("Temp_W", " tec 8")) + + +class ConfTEC: + def __init__(self, tessie: Tessie, i): + self._tessie = tessie + self.name = " tec " + str(i) + + def saveToFlash(self): + return self._tessie.cmd("SaveVariables", self.name, True) + + def getPID(self): + return [[float(x) for x in self._tessie.get("PID_kp", self.name).split(",")], + [float(x) for x in self._tessie.get("PID_ki", self.name).split(",")], + [float(x) for x in self._tessie.get("PID_kd", self.name).split(",")]] + + def setPID(self, kp: float, ki: float, kd: float): + self._tessie.set("PID_kp", str(kp), self.name) + self._tessie.set("PID_ki", str(ki), self.name) + self._tessie.set("PID_kd", str(kd), self.name) + + def getPIDMinMax(self): + return [[float(x) for x in self._tessie.get("PID_Min", self.name).split(",")], + [float(x) for x in self._tessie.get("PID_Max", self.name).split(",")]] + + def setPIDMinMax(self, pidmin, pidmax): + self._tessie.set("PID_Min", str(round(pidmin, 1)), self.name) + self._tessie.set("PID_Max", str(round(pidmax, 1)), self.name) + + def setRef(self, ref: float): + self._tessie.set("Ref_U", str(round(ref, 3)), self.name) + + def getRef(self): + return [float(x) for x in self._tessie.get("Ref_U", self.name).split(",")] + + def setMode(self, mode: int): + self._tessie.set("Mode", str(mode), self.name) + + def getMode(self): + return [int(x) for x in self._tessie.get("Mode", self.name).split(",")] + + def clearError(self): + self._tessie.cmd("clearError", self.name) + + def getError(self): + return [hex(x) for x in self._tessie.get("Error", self.name).split(",")] + + +class TEC: + def __init__(self, tessie: Tessie, i): + self._tessie = tessie + self.conf = ConfTEC(tessie, i) + self.name = " tec " + str(i) + + def pon(self): + self._tessie.cmd("Power_On", self.name) + + def poff(self): + self._tessie.cmd("Power_Off", self.name) + + def getState(self): + return [bool(x) for x in self._tessie.get("PowerState", self.name).split(",")] + + def getTemp(self): + return [float(x) for x in self._tessie.get("Temp_M", self.name).split(",")] + + def getUI(self): + return [[float(x) for x in self._tessie.get("Supply_U", self.name).split(",")], + [float(x) for x in self._tessie.get("Supply_I", self.name).split(",")], + [float(x) for x in self._tessie.get("Peltier_U", self.name).split(",")], + [float(x) for x in self._tessie.get("Peltier_I", self.name).split(",")]] + + def setTemp(self, temp: float): + self._tessie.set("Temp_Set", str(round(temp)), self.name) + + def setVoltage(self, u: float): + self._tessie.set("ControlVoltage_Set", str(round(u, 2)), self.name) + + def getVoltage(self): + return [float(x) for x in self._tessie.get("ControlVoltage_Set", self.name).split(",")] + + def reset(self): + return self._tessie.cmd("Reboot", self.name, True) + + def loadFromFlash(self): + return self._tessie.cmd("LoadVariables", self.name, True) + + def getSWVersion(self): + return self._tessie.cmd("GetSWVersion", self.name, True) + + +class Coldbox: + def __init__(self): + self._tessie = Tessie() + self.valve0 = Valve(self._tessie, 0) + self.valve1 = Valve(self._tessie, 1) + self.tecall = TEC(self._tessie, 0) + self.tec1 = TEC(self._tessie, 1) + self.tec2 = TEC(self._tessie, 2) + self.tec3 = TEC(self._tessie, 3) + self.tec4 = TEC(self._tessie, 4) + self.tec5 = TEC(self._tessie, 5) + self.tec6 = TEC(self._tessie, 6) + self.tec7 = TEC(self._tessie, 7) + self.tec8 = TEC(self._tessie, 8) + self.env = Env(self._tessie) + + def help(self): + self._tessie.help() + + +def main(): + print("Hello World!") + + box = Coldbox() + print(box.tec1.getTemp()) + print(box.tec1.getTemp()) + box.tec1.setVoltage(1.5) + print(box.tec1.getVoltage()) + box.tec1.setVoltage(2) + + print(box.tec1.getTemp()) + + +if __name__ == "__main__": + main()