Creation
This commit is contained in:
138
script/devices/Array10.py
Normal file
138
script/devices/Array10.py
Normal file
@@ -0,0 +1,138 @@
|
||||
import org.zeromq.ZMQ as ZMQ
|
||||
import org.zeromq.ZMQ.Socket as Socket
|
||||
import json
|
||||
|
||||
|
||||
|
||||
class Array10Array(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray):
|
||||
def getSize(self):
|
||||
return 0 if (self.take() is None) else len(self.take())
|
||||
|
||||
def set(self, data):
|
||||
self.onReadout(data)
|
||||
|
||||
|
||||
class Array10Matrix(ReadonlyAsyncRegisterBase, ReadonlyRegisterMatrix):
|
||||
def getWidth(self):
|
||||
return 0 if (self.take() is None) else len(self.take()[0])
|
||||
|
||||
def getHeight(self):
|
||||
return 0 if (self.take() is None) else len(self.take())
|
||||
|
||||
def set(self, data, shape):
|
||||
if (data is not None) and (shape is not None):
|
||||
self.onReadout(Convert.reshape(data, shape))
|
||||
|
||||
|
||||
class Array10(DeviceBase, Readable, Cacheable, Readable.ReadableType):
|
||||
def __init__(self, name, address, modulo_array=1, modulo_matrix=1, mode=ZMQ.PULL):
|
||||
""" Device implementing communication with over Array10.
|
||||
Args:
|
||||
name(str): device name
|
||||
address(str): address of stream
|
||||
modulo_array(int, default=1): if defined create child device that holds the data array subsampled by this value.
|
||||
modulo_matrix(int, default=1): if defined create child device that holds the 2d image subsampled by this value.
|
||||
"""
|
||||
super(Array10, self).__init__(name)
|
||||
self.address = address
|
||||
self.mode = mode
|
||||
self.context = None
|
||||
self.socket = None
|
||||
self.running = False
|
||||
self.header=None
|
||||
self.data=None
|
||||
self.array_dev=None
|
||||
self.matrix_dev=None
|
||||
self.message_count=0
|
||||
|
||||
if modulo_array is not None:
|
||||
self.array_dev = Array10Array(self.name + "_array" )
|
||||
self.addChild(self.array_dev)
|
||||
self.array_dev.modulo = modulo_array
|
||||
else:
|
||||
self.array_dev = None
|
||||
|
||||
if modulo_matrix is not None:
|
||||
self.matrix_dev = Array10Matrix(self.name + "_matrix" )
|
||||
self.addChild(self.matrix_dev)
|
||||
self.matrix_dev.modulo = modulo_matrix
|
||||
else:
|
||||
self.matrix_dev = None
|
||||
|
||||
|
||||
def doInitialize(self):
|
||||
self.stop()
|
||||
self.header=None
|
||||
self.data=None
|
||||
self.message_count=0
|
||||
self.start()
|
||||
|
||||
def start(self):
|
||||
if self.running==False:
|
||||
self.getLogger().info("Starting");
|
||||
self.running = True
|
||||
self.task = fork(self.rx_thread)
|
||||
|
||||
|
||||
def stop(self):
|
||||
if self.running:
|
||||
self.getLogger().info("Stopping");
|
||||
self.running = False
|
||||
ret = join(self.task)
|
||||
|
||||
|
||||
def rx_thread(self):
|
||||
self.getLogger().info("Enter rx thread");
|
||||
try:
|
||||
self.context = ZMQ.context(1)
|
||||
self.socket = self.context.socket(self.mode)
|
||||
self.socket.connect(self.address)
|
||||
while self.running:
|
||||
header = self.socket.recv(ZMQ.NOBLOCK)
|
||||
if (header is not None):
|
||||
try:
|
||||
self.header=json.loads(''.join(chr(i) for i in header))
|
||||
self.data = self.socket.recv()
|
||||
if self.data is not None:
|
||||
self.setCache({"header":self.header, "data":self.data}, None)
|
||||
self.message_count=self.message_count+1
|
||||
if self.array_dev is not None:
|
||||
if self.message_count % self.array_dev.modulo == 0:
|
||||
self.array_dev.set(self.data)
|
||||
if self.matrix_dev is not None:
|
||||
if self.message_count % self.matrix_dev.modulo == 0:
|
||||
shape = self.header.get("shape", None)
|
||||
if shape: shape=(shape[1],shape[0]) #TODO: FIXME
|
||||
self.matrix_dev.set(self.data, shape)
|
||||
continue
|
||||
except Exception as e:
|
||||
print e
|
||||
time.sleep(0.01)
|
||||
except Exception as e:
|
||||
print e
|
||||
finally:
|
||||
if self.socket:
|
||||
self.socket.close()
|
||||
if self.context:
|
||||
self.context.term()
|
||||
self.getLogger().info("Quit rx thread");
|
||||
|
||||
def doClose(self):
|
||||
"""Close the channel.
|
||||
"""
|
||||
self.stop()
|
||||
super(Array10, self).doClose()
|
||||
|
||||
#Readable interface
|
||||
def read(self):
|
||||
self.waitCacheChange(-1)
|
||||
return self.take()
|
||||
|
||||
|
||||
#Testing
|
||||
#add_device(Array10("a10", "tcp://localhost:1234", modulo_array=1, modulo_matrix=10), True)
|
||||
#add_device(a10.array_dev, True)
|
||||
#add_device(a10.matrix_dev, True)
|
||||
#add_device(RegisterMatrixSource("src_a10", a10.matrix_dev), True)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user