-adding automatic test through python

This commit is contained in:
2026-06-09 09:30:37 +02:00
parent 5e575a020c
commit f93a1a8ee9
6 changed files with 71 additions and 35 deletions
+8 -10
View File
@@ -31,19 +31,14 @@ def readAscii(connexion_client : socket):
message += chunk.decode()
if len(message) != 58 :
print(message)
outlet_index = ord(message[22]) - ord('1')
print(outlet_state[outlet_index])
match message[25]:
case 'n':
outlet_state[outlet_index] = 'On'
print("powering on " + str(outlet_index))
case 'f':
outlet_state[outlet_index] = 'Off'
print("powering off " + str(outlet_index))
case 'e':
outlet_state[outlet_index] = 'Rst'
print("restarting " + str(outlet_index + 1))
outlet_restart_time[outlet_index] = random.randint(2,5)
if len(message) == 58:
time = datetime.datetime.now().strftime('%d %B %Y %H:%M:%S')
@@ -91,11 +86,14 @@ def turnOff():
socket_listener.close()
while(keepAlive):
connexion_client, address_client = socket_listener.accept()
socket_listener.settimeout(1)
readAscii(connexion_client)
updateSocketRestartTimer()
connexion_client.close()
connexion_client = 0
try:
connexion_client, address_client = socket_listener.accept()
socket_listener.settimeout(1)
readAscii(connexion_client)
updateSocketRestartTimer()
finally:
connexion_client.close()
socket_listener.close()
Regular → Executable
View File
+4 -1
View File
@@ -13,5 +13,8 @@ export EPOWERSWITCH_SOCKET_NUMBER=8
export HOST_NAME="localhost"
export HOST_PORT="55555"
iocsh st.cmd
echo "${PWD}/st.cmd"
/usr/local/bin/procServ -L - -f -i ^D^C 20001 "${PWD}/st.cmd"
+22
View File
@@ -0,0 +1,22 @@
from caproto.sync.client import read, write
from time import sleep
class CaprotoWrapper:
def raw_read(pvname):
return read(pvname, timeout=5).data[0].decode('utf-8')
def get_pv_value(pvname, expected_value):
timeout = 10
while(expected_value != CaprotoWrapper.raw_read(pvname) and timeout > 0):
timeout -=1
sleep(0.5)
return CaprotoWrapper.raw_read(pvname)
def set_pv_value(pvname, value):
write(pvname, value, notify=True)
timeout = 10
while(value != CaprotoWrapper.raw_read(pvname) and timeout > 0):
timeout -= 1
sleep(0.5)
+36
View File
@@ -0,0 +1,36 @@
import subprocess
import sys
import signal
from caproto_wrapper import CaprotoWrapper
def is_ioc_running():
return "" != CaprotoWrapper.raw_read("ePowerSwitch_set_outlet_1")
def start_iocshell():
res = subprocess.Popen(["./startioc.sh"])
while not is_ioc_running():
pass
return res
def start_sim():
return subprocess.Popen(["python", "/home/ponsin_h/ePowerSwitchStreamDevice/sim/ePowerSwitch8_sim.py"])
def start_pytest():
return subprocess.Popen(["pytest","tests"])
sim_process = start_sim()
iocsh_process = start_iocshell()
pytest_process = start_pytest()
exit_code = 0
try:
exit_code = pytest_process.wait()
finally:
sim_process.send_signal(sig=signal.SIGTERM)
iocsh_process.send_signal(sig=signal.SIGTERM)
pytest_process.send_signal(sig=signal.SIGTERM)
sys.exit(exit_code)
+1 -24
View File
@@ -1,30 +1,7 @@
from caproto.threading.client import Context
from caproto.sync.client import read, write
from time import sleep
from caproto_wrapper import CaprotoWrapper
delay = 1
class CaprotoWrapper:
def raw_read(pvname):
return read(pvname).data[0].decode('utf-8')
def get_pv_value(pvname, expected_value):
timeout = 10
while(expected_value != CaprotoWrapper.raw_read(pvname) and timeout > 0):
timeout -=1
sleep(0.5)
return CaprotoWrapper.raw_read(pvname)
def set_pv_value(pvname, value):
write(pvname, value, notify=True)
timeout = 10
while(value != CaprotoWrapper.raw_read(pvname) and timeout > 0):
timeout -= 1
sleep(0.5)
def test_turn_on():
CaprotoWrapper.set_pv_value("ePowerSwitch_set_outlet_1", "On")