from time import sleep, time
from typing import List, Optional, Union

from paho.mqtt import client as mqtt_client


class Tessie:
    """Client for the Tessie MQTT control interface.

    Requests ("get ...", "set ...", "cmd ...", "help") are published on a
    single topic and replies are read back from that same topic.

    Note:
        ``waiting`` and ``found`` are *class* attributes and the MQTT
        callbacks are static methods, so this bookkeeping is shared by all
        ``Tessie`` instances in the process. Creating a new instance resets
        both lists.

    Attributes:
        broker: Address of the MQTT broker.
        port: Port of the MQTT broker (1883).
        topic: Topic used for both publishing and subscribing.
        waiting: Names (variables or commands) a reply is expected for.
        found: ``[name, raw_message]`` pairs of replies not yet consumed.
    """

    # Payloads starting with one of these are ignored by on_message. Requests
    # are published on the topic this client is subscribed to, so these are
    # presumably our own requests being delivered back by the broker.
    _REQUEST_PREFIXES = ('get', 'set', 'cmd', 'help')
    # Seconds to wait for a reply before giving up.
    _REPLY_TIMEOUT = 5
    # Pause between polls so waiting does not spin a CPU core.
    _POLL_INTERVAL = 0.001
    # Characters between the name and the value in a reply.
    # NOTE(review): presumably the reply looks like "<name> = <value>" -
    # confirm against the Tessie protocol.
    _VALUE_OFFSET = 3

    waiting: List[str] = []
    found: List[List[str]] = []

    def __init__(self, broker):
        """Connect to ``broker`` and subscribe to the control topic.

        Blocks until the MQTT client reports that it is connected.

        Args:
            broker: Host name or address of the MQTT broker.
        """
        print(broker)
        self.broker = broker
        self.port = 1883
        self.topic = 'ctrlTessie'
        self._client = mqtt_client.Client(
            mqtt_client.CallbackAPIVersion.VERSION1)
        Tessie.waiting = []
        Tessie.found = []
        self._connect_mqtt()
        while not self._client.is_connected():
            # The connection is established by the network thread started in
            # _connect_mqtt; sleep instead of spinning while it does so.
            sleep(0.01)
        self._subscribe()

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """Report the outcome of a connection attempt (paho callback)."""
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}\n")

    def _connect_mqtt(self):
        """Open the broker connection and start paho's network loop."""
        self._client.on_connect = self.on_connect
        self._client.connect(self.broker, self.port)
        self._client.loop_start()

    @staticmethod
    def decode_msg(msg: str):
        """Record ``msg`` as the reply for a waiting name, if one matches.

        A reply matches a waiting name when it starts with that name. When
        several waiting names match (e.g. "Temp" and "Temp_M"), the longest
        one wins, so a left-over shorter name cannot swallow the reply.

        Args:
            msg: Decoded payload of a received message.
        """
        # Build the candidate list first: removing from the list that is
        # being iterated would skip entries.
        candidates = [name for name in Tessie.waiting if msg.startswith(name)]
        if not candidates:
            return
        name = max(candidates, key=len)
        try:
            Tessie.waiting.remove(name)
        except ValueError:
            # The waiter timed out and withdrew the name in the meantime.
            return
        Tessie.found.append([name, msg])

    @staticmethod
    def on_message(client, userdata, msg_recv):
        """Dispatch a message received on the subscribed topic (paho callback).

        Payloads starting with a request keyword are ignored, payloads
        starting with ">" are printed, everything else is handed to
        ``decode_msg``.
        """
        payload = msg_recv.payload.decode()
        if payload.startswith(Tessie._REQUEST_PREFIXES):
            return
        if payload.startswith('>'):
            print(payload)
            return
        Tessie.decode_msg(payload)

    def _subscribe(self):
        """Subscribe to the control topic and install the message callback."""
        self._client.subscribe(self.topic)
        self._client.on_message = self.on_message

    @staticmethod
    def _wait_for_var(var, timeout=_REPLY_TIMEOUT) -> Optional[str]:
        """Wait for the reply recorded under ``var`` and return it.

        Args:
            var: Name the reply was registered under in ``waiting``.
            timeout: Maximum time to wait, in seconds.

        Returns:
            The raw reply message, or ``None`` if none arrived in time.
        """
        deadline = time() + timeout
        while time() <= deadline:
            for entry in list(Tessie.found):
                if entry[0] == var:
                    Tessie.found.remove(entry)
                    return entry[1]
            sleep(Tessie._POLL_INTERVAL)
        # Timed out: withdraw the name so that a late reply is not recorded
        # and later mistaken for the answer to a new request.
        try:
            Tessie.waiting.remove(var)
        except ValueError:
            pass
        return None

    def _publish(self, msg):
        """Publish ``msg`` on the control topic, reporting a failed send."""
        if self._client.publish(self.topic, msg)[0] != 0:
            print(f'Failed to send message: {msg}')

    def _request(self, name, msg) -> Union[str, bool]:
        """Send ``msg`` and return the value of the reply named ``name``.

        Returns:
            The reply with the leading name and separator stripped, or
            ``False`` if no (or an unexpected) reply was received.
        """
        # Discard a stale reply left behind by an earlier request for the
        # same name, so that it cannot be returned as the new answer.
        for entry in list(Tessie.found):
            if entry[0] == name:
                Tessie.found.remove(entry)
        Tessie.waiting.append(name)
        self._publish(msg)
        result = Tessie._wait_for_var(name)
        if not result:
            print('no result')
            return False
        if not result.startswith(name):
            print('wrong result')
            return False
        return result[len(name) + Tessie._VALUE_OFFSET:]

    def get(self, var, args="") -> Union[str, bool]:
        """Request the value of ``var``.

        Args:
            var: Name of the variable.
            args: Text appended verbatim to the request (callers in this
                file pass e.g. " tec 1", including the leading space).

        Returns:
            The value as a string, or ``False`` if the request failed.
        """
        return self._request(var, 'get ' + var + args)

    def set(self, var, data, args=""):
        """Send a request to set ``var`` to ``data``; no reply is awaited."""
        self._publish('set ' + str(var) + ' ' + str(data) + args)

    def cmd(self, cmd, args="", answer=False):
        """Send the command ``cmd``.

        Args:
            cmd: Name of the command.
            args: Text appended verbatim to the request.
            answer: If true, wait for a reply and return it.

        Returns:
            ``None`` if ``answer`` is false; otherwise the reply as a
            string, or ``False`` if the request failed.
        """
        msg = 'cmd ' + cmd + args
        if not answer:
            self._publish(msg)
            return None
        return self._request(cmd, msg)

    def help(self):
        """Publish a "help" request; any ">"-prefixed reply is printed."""
        self._publish('help')


class Valve:
    """One valve, addressed as ``valve<i>``."""

    def __init__(self, tessie: Tessie, i):
        self.name = "valve" + str(i)
        self._tessie = tessie

    def set(self, value):
        """Switch the valve "on" if ``value == 1``, otherwise "off"."""
        state = "on" if value == 1 else "off"
        self._tessie.set(self.name, state)

    def get(self):
        """Return 1 if the valve reports "on", otherwise 0."""
        return 1 if self._tessie.get(self.name) == "on" else 0


class Env:
    """Environment readings.

    The single-value getters convert the reply with ``float``. A failed
    request yields ``False`` from ``Tessie.get``, which ``float`` turns
    into ``0.0``.
    """

    def __init__(self, tessie: Tessie):
        self._tessie = tessie

    def getRH(self):
        """Return the "RH" variable as a float."""
        return float(self._tessie.get("RH"))

    def getDP(self):
        """Return the "DP" variable as a float."""
        return float(self._tessie.get("DP"))

    def getTempAir(self):
        """Return the "Temp" variable as a float."""
        return float(self._tessie.get("Temp"))

    def getTempWater(self):
        """Return the "Temp_W" variable of TEC 8 as a float."""
        return float(self._tessie.get("Temp_W", " tec 8"))

    def getVprobe(self, number):
        """Return the comma-separated ``vprobe<number>`` reply as floats.

        Raises:
            AttributeError: If the request failed, because ``Tessie.get``
                then returns ``False`` instead of a string.
        """
        data = self._tessie.get(f"vprobe{number}")
        # returns [-999.0] if not available
        return [float(x) for x in data.split(",")]


class _TECChannel:
    """Addressing and reply parsing shared by ``TEC`` and ``ConfTEC``."""

    def __init__(self, tessie: Tessie, i):
        self._tessie = tessie
        self.name = " tec " + str(i)
        # Index 0 is treated as a multi-value channel: its replies are
        # parsed as comma-separated lists (Coldbox exposes it as "tecall").
        self.single = i != 0

    def _single(self, data):
        """Return ``data`` as is for a single TEC, else a list of floats.

        A non-string ``data`` (``False`` from a failed request) is returned
        unchanged.
        """
        if self.single or not isinstance(data, str):
            return data
        return [float(x) for x in data.split(",")]


class ConfTEC(_TECChannel):
    """Configuration variables (PID, reference, mode, error) of a TEC."""

    def saveToFlash(self):
        """Run "SaveVariables" and return the reply."""
        return self._tessie.cmd("SaveVariables", self.name, True)

    def getPID(self):
        """Return ``[kp, ki, kd]``."""
        return [self._single(self._tessie.get(var, self.name))
                for var in ("PID_kp", "PID_ki", "PID_kd")]

    def setPID(self, kp: float, ki: float, kd: float):
        """Set the three PID coefficients."""
        self._tessie.set("PID_kp", str(kp), self.name)
        self._tessie.set("PID_ki", str(ki), self.name)
        self._tessie.set("PID_kd", str(kd), self.name)

    def getPIDMinMax(self):
        """Return ``[PID_Min, PID_Max]``."""
        return [self._single(self._tessie.get(var, self.name))
                for var in ("PID_Min", "PID_Max")]

    def setPIDMinMax(self, pidmin, pidmax):
        """Set "PID_Min" and "PID_Max", rounded to one decimal."""
        self._tessie.set("PID_Min", str(round(pidmin, 1)), self.name)
        self._tessie.set("PID_Max", str(round(pidmax, 1)), self.name)

    def setRef(self, ref: float):
        """Set "Ref_U", rounded to three decimals."""
        self._tessie.set("Ref_U", str(round(ref, 3)), self.name)

    def getRef(self):
        """Return "Ref_U"."""
        return self._single(self._tessie.get("Ref_U", self.name))

    def setMode(self, mode: int):
        """Set "Mode"."""
        self._tessie.set("Mode", str(mode), self.name)

    def getMode(self):
        """Return "Mode"."""
        return self._single(self._tessie.get("Mode", self.name))

    def clearError(self):
        """Run "clearError" without waiting for a reply."""
        self._tessie.cmd("clearError", self.name)

    def getError(self):
        """Return "Error"."""
        return self._single(self._tessie.get("Error", self.name))


class TEC(_TECChannel):
    """Runtime control of a TEC; its configuration is available as ``conf``."""

    def __init__(self, tessie: Tessie, i):
        super().__init__(tessie, i)
        self.conf = ConfTEC(tessie, i)

    def pon(self):
        """Run "Power_On" without waiting for a reply."""
        self._tessie.cmd("Power_On", self.name)

    def poff(self):
        """Run "Power_Off" without waiting for a reply."""
        self._tessie.cmd("Power_Off", self.name)

    def getState(self):
        """Return "PowerState"."""
        return self._single(self._tessie.get("PowerState", self.name))

    def getTemp(self):
        """Return "Temp_M"."""
        return self._single(self._tessie.get("Temp_M", self.name))

    def getUI(self):
        """Return ``[Supply_U, Supply_I, Peltier_U, Peltier_I]``."""
        return [self._single(self._tessie.get(var, self.name))
                for var in ("Supply_U", "Supply_I", "Peltier_U", "Peltier_I")]

    def setTemp(self, temp: float):
        """Set "Temp_Set".

        NOTE(review): the value is rounded to a whole number, unlike the
        other setters, which keep decimals - confirm this is intended.
        """
        self._tessie.set("Temp_Set", str(round(temp)), self.name)

    def setVoltage(self, u: float):
        """Set "ControlVoltage_Set", rounded to two decimals."""
        self._tessie.set("ControlVoltage_Set", str(round(u, 2)), self.name)

    def getVoltage(self):
        """Return "ControlVoltage_Set"."""
        return self._single(self._tessie.get("ControlVoltage_Set", self.name))

    def reset(self):
        """Run "Reboot" and return the reply."""
        return self._tessie.cmd("Reboot", self.name, True)

    def loadFromFlash(self):
        """Run "LoadVariables" and return the reply."""
        return self._tessie.cmd("LoadVariables", self.name, True)

    def getSWVersion(self):
        """Run "GetSWVersion" and return the reply."""
        return self._tessie.cmd("GetSWVersion", self.name, True)


class Coldbox:
    """A coldbox: two valves, eight TECs, and the environment readings.

    Attributes:
        valve0, valve1: The two valves.
        tecall: TEC channel 0, whose replies are lists of values.
        tec1 .. tec8: The individual TECs.
        tecs: ``[tec1, ..., tec8]``.
        env: Environment readings.
    """

    def __init__(self, broker):
        self._tessie = Tessie(broker)
        self.valve0 = Valve(self._tessie, 0)
        self.valve1 = Valve(self._tessie, 1)
        self.tecall = TEC(self._tessie, 0)
        self.tecs = [TEC(self._tessie, i) for i in range(1, 9)]
        (self.tec1, self.tec2, self.tec3, self.tec4,
         self.tec5, self.tec6, self.tec7, self.tec8) = self.tecs
        self.env = Env(self._tessie)

    def help(self):
        """Publish a "help" request."""
        self._tessie.help()


def main():
    """Connect to the coldbox and print the temperature of TEC 1."""
    print("Hello World!")
    box = Coldbox('coldbox02.psi.ch')
    print(box.tec1.getTemp())


if __name__ == "__main__":
    main()