130 lines
3.5 KiB
Python
Executable File
130 lines
3.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# TODO might be better to use PyEpics or Caproto or something
|
|
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
import time
|
|
|
|
from queue import Queue, Empty
|
|
from subprocess import Popen, PIPE, run
|
|
from threading import Thread
|
|
|
|
|
|
os.environ['EPICS_BASE'] = '/usr/local/epics/base-7.0.7'
|
|
os.environ['EPICS_HOST_ARCH'] = 'linux-x86_64'
|
|
os.environ['PARENT_PATH'] = str(pathlib.Path(__file__).parent.resolve())
|
|
|
|
|
|
def queue_output(pipe, queue):
|
|
while True:
|
|
line = pipe.readline()
|
|
|
|
if not line:
|
|
break
|
|
|
|
queue.put(line.decode('ascii').rstrip())
|
|
pipe.close()
|
|
|
|
def get_piped_output(proc):
|
|
stdqueue = Queue()
|
|
stdthread = Thread(target=queue_output, args=(proc.stdout, stdqueue))
|
|
stdthread.daemon = True
|
|
stdthread.start()
|
|
|
|
errqueue = Queue()
|
|
errthread = Thread(target=queue_output, args=(proc.stderr, errqueue))
|
|
errthread.daemon = True
|
|
errthread.start()
|
|
|
|
return stdqueue, errqueue
|
|
|
|
def getState(instr, name):
|
|
result = run(['caget', f'{instr}{name}:STATUS'], stdout=PIPE)
|
|
state = result.stdout.decode('ascii').rstrip().split()[1]
|
|
print(f'Currently in state {state}')
|
|
return state
|
|
|
|
def getCount(instr, name, ch):
|
|
result = run(['caget', f'{instr}{name}:M{ch}'], stdout=PIPE)
|
|
count = int(result.stdout.decode('ascii').rstrip().split()[1])
|
|
print(f'M{ch} == {count}')
|
|
return count
|
|
|
|
def presetTime(instr, name, time):
|
|
print(f'Starting count for {time} seconds')
|
|
run(['caput', f'{instr}{name}:PRESET-TIME', f'{time}'])
|
|
|
|
def presetCount(instr, name, count):
|
|
print(f'Starting count until channel 1 reaches {count}')
|
|
run(['caput', f'{instr}{name}:PRESET-COUNT', f'{count}'])
|
|
|
|
def testCanCount(instr, name):
|
|
# Check in Idle State
|
|
assert getState(instr, name) == 'Idle', 'Not in valid state'
|
|
|
|
# Start Time Based Count and Check that Status Changes
|
|
assert getCount(instr, name, 1) == 0, "Erroneous nonzero starting count value"
|
|
presetTime(instr, name, 5)
|
|
time.sleep(1)
|
|
assert getState(instr, name) == 'Counting', 'Didn\'t start counting'
|
|
time.sleep(5)
|
|
assert getState(instr, name) == 'Idle', 'Didn\'t finish counting'
|
|
assert getCount(instr, name, 1) > 0, 'No events were counted'
|
|
|
|
# Start Monitor Based Count and Check that Status Changes
|
|
presetAmount = 100
|
|
presetCount(instr, name, presetAmount)
|
|
time.sleep(1)
|
|
assert getState(instr, name) == 'Counting', 'Didn\'t start counting'
|
|
assert getCount(instr, name, 1) < presetAmount
|
|
while getState(instr, name) != 'Idle':
|
|
time.sleep(1)
|
|
assert getCount(instr, name, 1) == presetAmount, 'Counted events not equal to preset'
|
|
assert getCount(instr, name, 2) > 0
|
|
|
|
def test(instr, name):
|
|
|
|
# TODO pass instr and name to script
|
|
proc = Popen([f'{os.environ["PARENT_PATH"]}/ioc.sh'], stdout=PIPE, stderr=PIPE, shell=False)
|
|
|
|
try:
|
|
stdqueue, errqueue = get_piped_output(proc)
|
|
|
|
while True:
|
|
line = stdqueue.get()
|
|
print(line)
|
|
|
|
if 'iocRun: All initialization complete' in line:
|
|
break # IOC is now running
|
|
|
|
#time.sleep(20)
|
|
|
|
print("IOC Initialisation Complete")
|
|
print("Starting Tests")
|
|
|
|
testCanCount(instr, name)
|
|
|
|
print("Success")
|
|
proc.kill()
|
|
proc.wait()
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Failure: {e}", file=sys.stderr)
|
|
proc.kill()
|
|
proc.wait()
|
|
return False
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
print("Starting Test")
|
|
|
|
# Test V2
|
|
if test('SQ:TEST:', 'CB_TEST'):
|
|
exit(0)
|
|
else:
|
|
exit(1)
|