import socket
import datetime
import random
import signal
import time

# TCP port the simulated power switch listens on.
port = 55555

# Current state of each of the eight outlets: 'On', 'Off' or 'Rst'.
outlet_state = ['Off', 'Off', 'Off', 'Off', 'Off', 'Off', 'Off', 'Off']
# Remaining ticks before an outlet in 'Rst' state is switched back 'On'
# (0 means no restart is pending for that outlet).
outlet_restart_time = [0, 0, 0, 0, 0, 0, 0, 0]

socket_listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# A request of exactly this many characters is a status query; any other
# length is treated as an outlet command.
_STATUS_REQUEST_LENGTH = 58
# Position of the ASCII outlet number ('1'..'8') inside a command.
_OUTLET_NUMBER_INDEX = 22
# Position of the letter that selects the action inside a command.
_COMMAND_LETTER_INDEX = 25


def updateSocketRestartTimer():
    """Advance every pending outlet restart countdown by one tick.

    Each positive entry of ``outlet_restart_time`` is decremented; an
    outlet whose countdown reaches zero on this tick is switched to 'On'.
    Outlets with no pending restart are left untouched.
    """
    for index, remaining in enumerate(outlet_restart_time):
        if remaining <= 0:
            continue
        outlet_restart_time[index] = remaining - 1
        if outlet_restart_time[index] == 0:
            outlet_state[index] = 'On'


def readAscii(connexion_client: socket.socket):
    """Read one request from a client and apply or answer it.

    Data is received in chunks until a chunk that is exactly a bare CRLF
    arrives, the peer closes the connection, or the module-level
    ``keepAlive`` flag is cleared.

    Two kinds of request are distinguished by total length:

    * exactly 58 characters: a status query, answered with the HTTP
      status page listing the state of every outlet;
    * any other length: a command. The character at index 22 is the
      outlet number ('1'..'8') and the character at index 25 selects the
      action: 'n' -> 'On', 'f' -> 'Off', 'e' -> 'Rst' with a random
      restart delay of 2 to 5 ticks. No reply is sent for commands.

    Requests that are too short, name an outlet outside 1..8 or carry an
    unknown action letter are ignored.

    Args:
        connexion_client: Connected client socket; only ``recv`` and
            ``sendall`` are used.

    Returns:
        None.
    """
    received = []
    chunk = b""
    while chunk != b'\r\n' and keepAlive:
        chunk = connexion_client.recv(1024)
        if not chunk:
            # recv() returns b'' once the peer has closed the connection;
            # without this check the loop would spin forever.
            break
        received.append(chunk)

    # Decode once, tolerating garbage bytes instead of raising.
    message = b"".join(received).decode(errors="replace")

    if len(message) != _STATUS_REQUEST_LENGTH:
        if len(message) <= _COMMAND_LETTER_INDEX:
            # Too short to contain an outlet number and an action letter.
            return

        outlet_index = ord(message[_OUTLET_NUMBER_INDEX]) - ord('1')
        if outlet_index < 0 or outlet_index > 7:
            # Garbage outlet number: ignore the request.
            return

        action = message[_COMMAND_LETTER_INDEX]
        if action == 'n':
            outlet_state[outlet_index] = 'On'
        elif action == 'f':
            outlet_state[outlet_index] = 'Off'
        elif action == 'e':
            outlet_state[outlet_index] = 'Rst'
            outlet_restart_time[outlet_index] = random.randint(2, 5)
        return

    # Named "timestamp" so the imported ``time`` module is not shadowed.
    timestamp = datetime.datetime.now().strftime('%d %B %Y %H:%M:%S')
    outlet_lines = "".join(
        f"M0:O{number}={state}\r\n"
        for number, state in enumerate(outlet_state, start=1)
    )
    response = "".join([
        "HTTP/1.0 200 OK\r\n",
        "Server: lwIP/1.3.2 (http://www.sics.se/~adam/lwip/)\r\n",
        "Content-type: text/html;charset=iso-8859-1\r\n",
        "Expires: Fri, 01 Jan 2010 00:00:00 GMT\r\n",
        "Pragma: no-cache\r\n",
        "\r\n",
        "\r\n",
        "\r\n",
        # The original literal contained a raw line break before its
        # CRLF; it is kept here as an explicit "\n".
        "\n\r\n",
        "\r\n",
        "\r\n",
        "Hidden Page\r\n",
        timestamp + "\r\n",
        "ePowerSwitch 8M+ R3\r\n",
        "Version: v 1.5.0.1\r\n",
        outlet_lines,
        "\r\n",
        "\r\n",
        " ",
    ])
    # sendall() keeps writing until the whole page is sent; send() may
    # transmit only part of it.
    connexion_client.sendall(bytes(response, "UTF-8"))
# Run flag: the accept loop and the request reader keep going while it
# is true.
keepAlive = True

# Bind to every local interface on the configured port, then start
# accepting connections.
socket_listener.bind(("", port))
socket_listener.listen()
def turnOff(signum=None, frame=None):
    """Stop the server: clear the run flag and close the listening socket.

    This function is installed as a signal handler, and Python calls
    handlers with two positional arguments. Both default to ``None`` so a
    direct ``turnOff()`` call keeps working.

    Args:
        signum: Number of the signal being handled (unused).
        frame: Stack frame that was interrupted (unused).
    """
    # Without the global declaration the assignment would create a local
    # variable and the module-level flag would stay true.
    global keepAlive
    keepAlive = False
    socket_listener.close()
# Ask for a clean shutdown when the process receives SIGTERM.
signal.signal(signal.SIGTERM, turnOff)

# Wake up from accept() once per second so the run flag is re-checked
# even when no client connects. Set before the loop so the very first
# accept() cannot block forever either.
socket_listener.settimeout(1)

while keepAlive:
    try:
        connexion_client, address_client = socket_listener.accept()
    except socket.timeout:
        # No client within the timeout: just re-check the run flag.
        continue
    except OSError:
        # accept() fails once the listening socket has been closed; that
        # is expected during shutdown and a real error otherwise.
        if not keepAlive:
            break
        raise

    # The context manager closes the client socket on every exit path.
    with connexion_client:
        try:
            readAscii(connexion_client)
        except ConnectionError:
            # The client dropped the connection mid-request; serve the
            # next one instead of taking the whole server down.
            continue
        # Restart countdowns advance once per handled request.
        updateSocketRestartTimer()

# Closing an already closed socket is harmless.
socket_listener.close()