135 lines
4.8 KiB
Python
135 lines
4.8 KiB
Python
import org.zeromq.ZMQ as ZMQ
|
|
import org.zeromq.ZMQ.Socket as Socket
|
|
import json
|
|
|
|
|
|
class Array10Array(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray):
|
|
def getSize(self):
|
|
return 0 if (self.take() is None) else len(self.take())
|
|
|
|
def set(self, data):
|
|
self.onReadout(data)
|
|
|
|
|
|
class Array10Matrix(ReadonlyAsyncRegisterBase, ReadonlyRegisterMatrix):
|
|
def getWidth(self):
|
|
return 0 if (self.take() is None) else len(self.take()[0])
|
|
|
|
def getHeight(self):
|
|
return 0 if (self.take() is None) else len(self.take())
|
|
|
|
def set(self, data, shape):
|
|
if (data is not None) and (shape is not None):
|
|
self.onReadout(Convert.reshape(data, shape))
|
|
|
|
|
|
class Array10(DeviceBase, Readable, Cacheable, Readable.ReadableType):
|
|
def __init__(self, name, address, mode=ZMQ.PULL):
|
|
""" Device implementing communication with over Array10.
|
|
Args:
|
|
name(str): device name
|
|
address(str): address of stream
|
|
modulo_array(int, default=1): if defined create child device that holds the data array subsampled by this value.
|
|
modulo_matrix(int, default=1): if defined create child device that holds the 2d image subsampled by this value.
|
|
"""
|
|
super(Array10, self).__init__(name)
|
|
self.address = address
|
|
self.mode = mode
|
|
self.context = None
|
|
self.socket = None
|
|
self.running = False
|
|
self.header=None
|
|
self.data=None
|
|
self.devArray=None
|
|
self.devMatrix=None
|
|
self.message_count=0
|
|
|
|
self.devArray = Array10Array(self.name + "_array" )
|
|
self.addChild(self.devArray)
|
|
self.devMatrix = Array10Matrix(self.name + "_matrix" )
|
|
self.devMatrix.modulo = 1
|
|
self.addChild(self.devMatrix)
|
|
print "Scripted Array10"
|
|
|
|
|
|
def doInitialize(self):
|
|
self.stop()
|
|
self.header=None
|
|
self.data=None
|
|
self.message_count=0
|
|
self.start()
|
|
|
|
def start(self):
|
|
if self.running==False:
|
|
self.getLogger().info("Starting");
|
|
self.running = True
|
|
self.task = fork(self.rx_thread)
|
|
|
|
|
|
def stop(self):
|
|
if self.running:
|
|
self.getLogger().info("Stopping");
|
|
self.running = False
|
|
ret = join(self.task)
|
|
|
|
|
|
def rx_thread(self):
|
|
self.getLogger().info("Enter rx thread");
|
|
try:
|
|
self.context = ZMQ.context(1)
|
|
self.socket = self.context.socket(self.mode)
|
|
self.socket.connect(self.address)
|
|
if self.mode == ZMQ.SUB:
|
|
self.socket.subscribe("")
|
|
self.getLogger().info("Running " + str(self.mode) + " " + str(self.address))
|
|
first = True
|
|
while self.running:
|
|
header = self.socket.recv(ZMQ.NOBLOCK)
|
|
if (header is not None):
|
|
try:
|
|
self.header=json.loads(''.join(chr(i) for i in header))
|
|
self.shape = self.header.get("shape", None)
|
|
self.dtype = self.header.get("type", "int8")
|
|
data = self.socket.recv()
|
|
if data is not None:
|
|
self.data=BufferConverter.fromArray(data, Type.fromKey(self.dtype))
|
|
self.setCache({"header":self.header, "data":self.data}, None)
|
|
self.message_count=self.message_count+1
|
|
self.devArray.set(self.data)
|
|
if self.message_count % self.devMatrix.modulo == 0:
|
|
#if self.shape: self.shape=(self.shape[1],self.shape[0]) #TODO: FIXME
|
|
self.devMatrix.set(self.data, self.shape)
|
|
continue
|
|
except Exception as e:
|
|
print e
|
|
time.sleep(0.01)
|
|
except Exception as e:
|
|
print e
|
|
finally:
|
|
if self.socket:
|
|
self.socket.close()
|
|
if self.context:
|
|
self.context.term()
|
|
self.getLogger().info("Quit rx thread");
|
|
|
|
def doClose(self):
|
|
"""Close the channel.
|
|
"""
|
|
self.stop()
|
|
#super(Array10, self).doClose()
|
|
DeviceBase.doClose(self)
|
|
|
|
#Readable interface
|
|
def read(self):
|
|
self.waitCacheChange(-1)
|
|
return self.take()
|
|
|
|
|
|
#Testing
|
|
#add_device(Array10("a10", "tcp://localhost:1234", modulo_array=1, modulo_matrix=10), True)
|
|
#add_device(a10.devArray, True)
|
|
#add_device(a10.devMatrix, True)
|
|
#add_device(RegisterMatrixSource("src_a10", a10.devMatrix), True)
|
|
|
|
|