Files
sinqDAQ/sim/counterbox_sim.py
2024-11-05 09:29:38 +01:00

148 lines
3.9 KiB
Python

#!/usr/bin/env python3
import re
import socket
import sys
import time
from random import randrange
# Runtime configuration, read positionally from the command line:
#   counterbox_sim.py <port> <total_channels>
# A missing argument raises IndexError and a non-integer one raises
# ValueError at import time; there is no usage message.
HOST = "127.0.0.1" # Loopback address the server socket is bound to
PORT = int(sys.argv[1]) # TCP port to listen on (first CLI argument)
TOTAL_CH = int(sys.argv[2]) # Number of simulated channels (second CLI argument)
class CounterBox:
    """In-memory model of a multi-channel counter with time/count presets.

    A measurement is started with ``startTimePreset`` or
    ``startCountPreset``. The model has no background thread: counts and
    the running/idle status only advance when ``getRunTime`` is called.

    Attributes:
        total_channels: number of channels held in ``counts``.
        counts: current count of every channel, indexed by channel.
        status: 1 while a preset is running, 0 once it finished (or before
            any preset was started).
        countmode: ``'time'`` or ``'count'``; selects the stop condition.
        presettime: target duration in seconds for ``'time'`` mode.
        presetcount: target monitor count for ``'count'`` mode.
        starttime: ``time.time()`` stamp taken at the last reset.
        elapsed: run time reported by the last ``getRunTime`` call.
        monitor: index of the channel used as monitor in ``'count'`` mode.
    """

    def __init__(self, total_channels):
        """Create an idle counter box with ``total_channels`` zeroed channels."""
        self.total_channels = total_channels
        self.counts = self._zeroedChannels()
        self.status = 0
        self.countmode = 'time'
        self.presettime = 0
        self.presetcount = 0
        self.starttime = 0
        self.elapsed = 0
        self.monitor = 0

    def _zeroedChannels(self):
        """Return a fresh list holding one zero per channel."""
        return [0] * self.total_channels

    def resetCounts(self):
        """Zero all channels and restart the run clock from now."""
        self.counts = self._zeroedChannels()
        self.starttime = time.time()
        self.elapsed = 0

    def getStatus(self):
        """Return the stored status flag (1 = running, 0 = idle)."""
        return self.status

    def getCounts(self):
        """Return the list of per-channel counts (the internal list itself)."""
        return self.counts

    def getMonitorCount(self):
        """Return the count of the channel selected by ``self.monitor``."""
        return self.counts[self.monitor]

    def updateCounts(self):
        """Add a random increment in ``range(5)`` to every channel in place."""
        for channel in range(self.total_channels):
            self.counts[channel] += randrange(5)

    def _advanceTimePreset(self, measured):
        """Progress a time-preset run given the freshly measured run time."""
        if measured < self.presettime:
            # Still inside the preset window: keep counting.
            self.updateCounts()
            self.elapsed = measured
            return
        # Preset reached: report exactly the preset and go idle.
        self.elapsed = self.presettime
        self.status = 0

    def _advanceCountPreset(self, measured):
        """Progress a count-preset run given the freshly measured run time."""
        if self.getMonitorCount() < self.presetcount:
            self.updateCounts()
            self.elapsed = measured
        # Checked again after the update so the run stops on the same call
        # that pushed the monitor over the preset.
        if self.getMonitorCount() >= self.presetcount:
            # Clamp the monitor so it never reports more than the preset.
            self.counts[self.monitor] = self.presetcount
            self.status = 0

    def getRunTime(self):
        """Advance the simulation one step and return the run time.

        Returns:
            The elapsed run time in seconds (rounded to milliseconds while
            running; frozen at its final value once the preset is reached).

        Raises:
            Exception: if ``countmode`` is neither ``'time'`` nor ``'count'``.
        """
        measured = round(time.time() - self.starttime, 3)
        mode = self.countmode
        if mode == 'time':
            self._advanceTimePreset(measured)
        elif mode == 'count':
            self._advanceCountPreset(measured)
        else:
            raise Exception("Invalid State")
        return self.elapsed

    def startTimePreset(self, presettime):
        """Start a run that stops after ``presettime`` seconds (ms precision)."""
        self.countmode = 'time'
        self.status = 1
        self.presettime = round(presettime, 3)
        self.resetCounts()

    def startCountPreset(self, presetcount):
        """Start a run that stops once the monitor reaches ``presetcount``."""
        self.countmode = 'count'
        self.status = 1
        self.presetcount = presetcount
        self.resetCounts()
# Preset commands, compiled once instead of being re-evaluated (twice) for
# every received command.
_TIME_PRESET_CMD = re.compile(r'TP (\d+(\.\d+)?)')
_MONITOR_PRESET_CMD = re.compile(r'MP (\d+)')

# Single-client server: accept one connection, answer its commands until the
# client disconnects, then exit.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    conn, addr = s.accept()
    with conn:

        def send(data: str):
            """Send ``data`` followed by the '\\r' terminator.

            An empty ``data`` sends the bare terminator.
            """
            payload = f'{data}\r' if data else '\r'
            return conn.sendall(payload.encode())

        def receive():
            """Read one command from the client.

            Returns:
                The command with trailing whitespace (including the
                terminator) removed, or ``None`` when the peer closed the
                connection. A blank command is returned as ``''`` so it is
                not mistaken for a disconnect.
            """
            # NOTE(review): each recv() is treated as exactly one command;
            # TCP does not guarantee that framing - confirm clients send one
            # command per packet.
            raw = conn.recv(1024)
            if not raw:
                return None
            # Non-ASCII bytes are replaced rather than raising, so garbage
            # input is answered as a bad command instead of crashing the
            # simulator.
            return raw.decode('ascii', errors='replace').rstrip()

        counterbox = CounterBox(TOTAL_CH)

        while True:
            data = receive()
            if data is None:  # Client disconnected
                break  # Close server

            time_preset = _TIME_PRESET_CMD.fullmatch(data)
            monitor_preset = _MONITOR_PRESET_CMD.fullmatch(data)

            try:
                if data == 'RMT 1':
                    send('')
                elif data == 'ECHO 2':
                    send('Counterbox')  # Sends some sort of info command
                elif data == 'RA':
                    # Run time first: getRunTime() is what advances the
                    # counts, so it must be called before reading them.
                    runtime = counterbox.getRunTime()
                    fields = [runtime] + counterbox.getCounts()
                    send(' '.join(map(str, fields)))
                elif data == 'RS':
                    send(str(counterbox.getStatus()))
                elif time_preset:
                    counterbox.startTimePreset(float(time_preset.group(1)))
                    send('')
                elif monitor_preset:
                    counterbox.startCountPreset(int(monitor_preset.group(1)))
                    send('')
                else:
                    send('?2')  # Bad command
            except Exception:
                # Deliberately not a bare ``except:`` so Ctrl-C and
                # SystemExit still stop the simulator.
                send('?OV')