#!/usr/bin/env python3 # TODO might be better to use PyEpics or Caproto or something import os import pathlib import sys import time from queue import Queue, Empty from subprocess import Popen, PIPE, run from threading import Thread os.environ['EPICS_BASE'] = '/usr/local/epics/base-7.0.7' os.environ['EPICS_HOST_ARCH'] = 'linux-x86_64' os.environ['PARENT_PATH'] = str(pathlib.Path(__file__).parent.resolve()) def queue_output(pipe, queue): while True: line = pipe.readline() if not line: break queue.put(line.decode('ascii').rstrip()) pipe.close() def get_piped_output(proc): stdqueue = Queue() stdthread = Thread(target=queue_output, args=(proc.stdout, stdqueue)) stdthread.daemon = True stdthread.start() errqueue = Queue() errthread = Thread(target=queue_output, args=(proc.stderr, errqueue)) errthread.daemon = True errthread.start() return stdqueue, errqueue def getState(prefix, name): result = run(['caget', f'{prefix}:{name}:STATUS'], stdout=PIPE) state = result.stdout.decode('ascii').rstrip().split()[1] print(f'Currently in state {state}') return state def getCount(prefix, name, ch): result = run(['caget', f'{prefix}:{name}:M{ch}'], stdout=PIPE) count = int(result.stdout.decode('ascii').rstrip().split()[1]) print(f'M{ch} == {count}') return count def presetTime(prefix, name, time): print(f'Starting count for {time} seconds') run(['caput', f'{prefix}:{name}:PRESET-TIME', f'{time}']) def presetCount(prefix, name, count): print(f'Starting count until channel 1 reaches {count}') run(['caput', f'{prefix}:{name}:PRESET-COUNT', f'{count}']) def testCanCount(prefix, name): # Check in Idle State assert getState(prefix, name) == 'Idle', 'Not in valid state' # Start Time Based Count and Check that Status Changes assert getCount(prefix, name, 1) == 0, "Erroneous nonzero starting count value" presetTime(prefix, name, 5) time.sleep(1) assert getState(prefix, name) == 'Counting', 'Didn\'t start counting' time.sleep(5) assert getState(prefix, name) == 'Idle', 'Didn\'t finish counting' assert getCount(prefix, name, 1) > 0, 'No events were counted' # Start Monitor Based Count and Check that Status Changes presetAmount = 100 presetCount(prefix, name, presetAmount) time.sleep(1) assert getState(prefix, name) == 'Counting', 'Didn\'t start counting' assert getCount(prefix, name, 1) < presetAmount while getState(prefix, name) != 'Idle': time.sleep(1) assert getCount(prefix, name, 1) == presetAmount, 'Counted events not equal to preset' assert getCount(prefix, name, 2) > 0 def test(prefix, name): # TODO pass prefix and name to script proc = Popen([f'{os.environ["PARENT_PATH"]}/st.cmd'], stdout=PIPE, stderr=PIPE, shell=False) try: stdqueue, errqueue = get_piped_output(proc) while True: line = stdqueue.get() print(line) if 'iocRun: All initialization complete' in line: break # IOC is now running testCanCount(prefix, name) print("Success") proc.kill() proc.wait() return True except Exception as e: print(f"Failure: {e}", file=sys.stderr) proc.kill() proc.wait() return False if __name__ == '__main__': print("Starting Test") # Test V2 if test('SQ:TEST', 'CB_TEST'): exit(0) else: exit(1)