Adding sim and test

This commit is contained in:
2024-11-05 09:29:38 +01:00
parent 29e8aa9e85
commit 897e54901f
17 changed files with 465 additions and 161 deletions

124
test/test.py Executable file
View File

@ -0,0 +1,124 @@
#!/usr/bin/env python3
# TODO might be better to use PyEpics or Caproto or something
import os
import pathlib
import sys
import time
from queue import Queue, Empty
from subprocess import Popen, PIPE, run
from threading import Thread
os.environ['EPICS_BASE'] = '/usr/local/epics/base-7.0.7'
os.environ['EPICS_HOST_ARCH'] = 'linux-x86_64'
os.environ['PARENT_PATH'] = str(pathlib.Path(__file__).parent.resolve())
def queue_output(pipe, queue):
while True:
line = pipe.readline()
if not line:
break
queue.put(line.decode('ascii').rstrip())
pipe.close()
def get_piped_output(proc):
stdqueue = Queue()
stdthread = Thread(target=queue_output, args=(proc.stdout, stdqueue))
stdthread.daemon = True
stdthread.start()
errqueue = Queue()
errthread = Thread(target=queue_output, args=(proc.stderr, errqueue))
errthread.daemon = True
errthread.start()
return stdqueue, errqueue
def getState(prefix, name):
result = run(['caget', f'{prefix}:{name}:STATUS'], stdout=PIPE)
state = result.stdout.decode('ascii').rstrip().split()[1]
print(f'Currently in state {state}')
return state
def getCount(prefix, name, ch):
result = run(['caget', f'{prefix}:{name}:M{ch}'], stdout=PIPE)
count = int(result.stdout.decode('ascii').rstrip().split()[1])
print(f'M{ch} == {count}')
return count
def presetTime(prefix, name, time):
print(f'Starting count for {time} seconds')
run(['caput', f'{prefix}:{name}:PRESET-TIME', f'{time}'])
def presetCount(prefix, name, count):
print(f'Starting count until channel 1 reaches {count}')
run(['caput', f'{prefix}:{name}:PRESET-COUNT', f'{count}'])
def testCanCount(prefix, name):
# Check in Idle State
assert getState(prefix, name) == 'Idle', 'Not in valid state'
# Start Time Based Count and Check that Status Changes
assert getCount(prefix, name, 1) == 0, "Erroneous nonzero starting count value"
presetTime(prefix, name, 5)
time.sleep(1)
assert getState(prefix, name) == 'Counting', 'Didn\'t start counting'
time.sleep(5)
assert getState(prefix, name) == 'Idle', 'Didn\'t finish counting'
assert getCount(prefix, name, 1) > 0, 'No events were counted'
# Start Monitor Based Count and Check that Status Changes
presetAmount = 100
presetCount(prefix, name, presetAmount)
time.sleep(1)
assert getState(prefix, name) == 'Counting', 'Didn\'t start counting'
assert getCount(prefix, name, 1) < presetAmount
while getState(prefix, name) != 'Idle':
time.sleep(1)
assert getCount(prefix, name, 1) == presetAmount, 'Counted events not equal to preset'
assert getCount(prefix, name, 2) > 0
def test(prefix, name):
# TODO pass prefix and name to script
proc = Popen([f'{os.environ["PARENT_PATH"]}/st.cmd'], stdout=PIPE, stderr=PIPE, shell=False)
try:
stdqueue, errqueue = get_piped_output(proc)
while True:
line = stdqueue.get()
print(line)
if line == 'iocRun: All initialization complete':
break # IOC is now running
testCanCount(prefix, name)
print("Success")
proc.kill()
proc.wait()
return True
except Exception as e:
print(f"Failure: {e}", file=sys.stderr)
proc.kill()
proc.wait()
return False
if __name__ == '__main__':
print("Starting Test")
# Test V2
if test('SQ:TEST', 'CB_TEST'):
exit(0)
else:
exit(1)