Files
x12sa-eiger/script/devices/Array10.py
gac-x12sa 493c95fa04 Closedown
2023-04-03 14:37:55 +02:00

135 lines
4.8 KiB
Python

import org.zeromq.ZMQ as ZMQ
import org.zeromq.ZMQ.Socket as Socket
import json
class Array10Array(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray):
    """Read-only array register that is fed through set()."""

    def getSize(self):
        """Return the length of the value given by take(), or 0 when it is None."""
        # take() presumably returns the last value handed to onReadout -- confirm
        # against ReadonlyAsyncRegisterBase.
        if self.take() is None:
            return 0
        return len(self.take())

    def set(self, data):
        """Forward data to onReadout() unchanged."""
        self.onReadout(data)
class Array10Matrix(ReadonlyAsyncRegisterBase, ReadonlyRegisterMatrix):
    """Read-only matrix register that is fed through set()."""

    def getWidth(self):
        """Return the length of the first row, or 0 when there is no row to measure."""
        # Read the value once so the checks and the indexing see the same object.
        matrix = self.take()
        # A matrix with zero rows has no first row; indexing [0] would raise.
        if matrix is None or len(matrix) == 0:
            return 0
        return len(matrix[0])

    def getHeight(self):
        """Return the number of rows, or 0 when take() yields None."""
        matrix = self.take()
        return 0 if matrix is None else len(matrix)

    def set(self, data, shape):
        """Reshape data to shape and publish it through onReadout().

        Nothing happens when either data or shape is None.
        """
        if (data is not None) and (shape is not None):
            self.onReadout(Convert.reshape(data, shape))
class Array10(DeviceBase, Readable, Cacheable, Readable.ReadableType):
    """Device that receives header/data message pairs from a ZMQ stream.

    A background task polls the socket. Each message is read as a JSON header
    frame followed by a data frame; the decoded result is stored in the device
    cache and pushed to the child devices devArray and devMatrix.
    """

    def __init__(self, name, address, mode=ZMQ.PULL):
        """ Device implementing communication over Array10.
        Args:
            name(str): device name
            address(str): address of stream
            mode(int, default=ZMQ.PULL): ZMQ socket type used to connect. A ZMQ.SUB
                                         socket subscribes to every message.
        """
        super(Array10, self).__init__(name)
        self.address = address
        self.mode = mode
        self.context = None
        self.socket = None
        self.running = False
        self.task = None
        self.header = None
        self.data = None
        # Filled from the header of each received message.
        self.shape = None
        self.dtype = None
        self.message_count = 0
        self.devArray = Array10Array(self.name + "_array")
        self.addChild(self.devArray)
        self.devMatrix = Array10Matrix(self.name + "_matrix")
        # The matrix child is refreshed on every modulo-th message.
        self.devMatrix.modulo = 1
        self.addChild(self.devMatrix)
        print("Scripted Array10")

    def doInitialize(self):
        """Restart the receiver with cleared header, data and message counter."""
        self.stop()
        self.header = None
        self.data = None
        self.message_count = 0
        self.start()

    def start(self):
        """Fork the receiver task unless it is already flagged as running."""
        if not self.running:
            self.getLogger().info("Starting")
            self.running = True
            self.task = fork(self.rx_thread)

    def stop(self):
        """Ask the receiver loop to exit and wait for its task to finish."""
        if self.running:
            self.getLogger().info("Stopping")
            self.running = False
            join(self.task)

    def rx_thread(self):
        """Receiver loop: connect, poll for messages until stopped, then clean up."""
        # `time` is not imported at the top of this file; import it here so the
        # loop does not depend on the surrounding interpreter namespace.
        import time
        self.getLogger().info("Enter rx thread")
        try:
            self.context = ZMQ.context(1)
            self.socket = self.context.socket(self.mode)
            self.socket.connect(self.address)
            if self.mode == ZMQ.SUB:
                self.socket.subscribe("")
            self.getLogger().info("Running " + str(self.mode) + " " + str(self.address))
            while self.running:
                # Non-blocking poll so that the running flag is checked regularly.
                header = self.socket.recv(ZMQ.NOBLOCK)
                if header is not None:
                    try:
                        if self._process_message(header):
                            # A message was handled: poll again without sleeping.
                            continue
                    except Exception as e:
                        print(e)
                time.sleep(0.01)
        except Exception as e:
            print(e)
        finally:
            if self.socket:
                self.socket.close()
                self.socket = None
            if self.context:
                self.context.term()
                self.context = None
            self.getLogger().info("Quit rx thread")

    def _process_message(self, header):
        """Decode one header frame, read its data frame and publish the result.

        Returns True when a data frame was received and published, False when
        the data frame was None.
        """
        # Mask each value to 0..255: the frame may hold signed bytes, and chr()
        # rejects negative numbers.
        self.header = json.loads(''.join(chr(i & 0xFF) for i in header))
        self.shape = self.header.get("shape", None)
        self.dtype = self.header.get("type", "int8")
        # NOTE(review): this receive blocks; if the data frame never arrives the
        # loop cannot observe `running` being cleared -- confirm the sender
        # always emits header and data together.
        data = self.socket.recv()
        if data is None:
            return False
        self.data = BufferConverter.fromArray(data, Type.fromKey(self.dtype))
        self.setCache({"header": self.header, "data": self.data}, None)
        self.message_count = self.message_count + 1
        self.devArray.set(self.data)
        if self.message_count % self.devMatrix.modulo == 0:
            if self.shape and len(self.shape) < 2:
                # Fewer than two dimensions: there is no second entry to swap,
                # so the matrix child is left untouched for this message.
                return True
            if self.shape:
                # The first two header entries are swapped before reshaping.
                self.shape = (self.shape[1], self.shape[0])  # TODO: FIXME
            self.devMatrix.set(self.data, self.shape)
        return True

    def doClose(self):
        """Close the channel.
        """
        self.stop()
        DeviceBase.doClose(self)

    # Readable interface
    def read(self):
        """Wait for the cache to change, then return the value given by take()."""
        self.waitCacheChange(-1)
        return self.take()
#Testing
#add_device(Array10("a10", "tcp://localhost:1234"), True)
#add_device(a10.devArray, True)
#add_device(a10.devMatrix, True)
#add_device(RegisterMatrixSource("src_a10", a10.devMatrix), True)