Files
x06da/script/devices/BarcodeReader.py
gac-S_Changer c85d4b4aa9 Startup
2020-09-08 11:53:11 +02:00

83 lines
2.3 KiB
Python

class BarcodeReader(DeviceBase):
def __init__(self, name, microscan, microscan_cmd):
DeviceBase.__init__(self, name)
self.microscan = microscan
self.microscan_cmd = microscan_cmd
def doInitialize(self):
self.disable()
self.readout = None
self.processing = False
self.task_callable=None
def enable(self):
self.microscan_cmd.write("<H>")
self.setState(State.Ready)
def disable(self):
self.microscan_cmd.write("<I>")
self.setState(State.Disabled)
def get(self,timeout=1.0):
self.state.assertReady()
try:
self.setState(State.Busy)
self.microscan.flush()
ret = self.microscan.waitString(int(timeout * 1000))
if ret is not None:
ret = ret.strip()
self.setCache(ret, None)
return ret
except:
self.setCache(None, None)
return None
finally:
if self.state == State.Busy:
self.setState(State.Ready)
def doUpdate(self):
self.get()
def read(self,timeout=1.0):
if self.processing:
raise Exception("Ongoing read operation")
self.processing = True
try:
initial = self.state
if initial == State.Disabled:
self.enable()
try:
return self.get(timeout)
finally:
if initial == State.Disabled:
self.disable()
finally:
self.processing = False
def _read_task(self, timeout):
global readout
self.readout = self.read(timeout)
return self.readout
def start_read(self, timeout=1.0):
self.readout = None
self.task_callable = fork((self._read_task, (timeout,)))
def get_readout(self):
return self.readout
def wait_readout(self):
if self.task_callable is not None:
join(self.task_callable)
self.task_callable = None
return self.readout
add_device(BarcodeReader("barcode_reader", mscan_pin, mscan_pin_cmd), force = True)
add_device(BarcodeReader("barcode_reader_puck", mscan_puck, mscan_puck_cmd), force = True)