Files
x12sa-eiger/script/devices/Array10.py
gac-x12sa 06906eb57e
2022-12-21 17:15:40 +01:00

159 lines
6.0 KiB
Python

import org.zeromq.ZMQ as ZMQ
import org.zeromq.ZMQ.Socket as Socket
import java.nio.ByteBuffer as ByteBuffer
import ch.psi.bsread.converter.MatlabByteConverter as Converter
import ch.psi.bsread.message.ChannelConfig as ChannelConfig
import ch.psi.bsread.message.Type as Type
import json
class Array10Array(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray):
    """Read-only child register that exposes the most recent data as a flat array."""

    def getSize(self):
        """Return the length of the cached array, or 0 when nothing is cached yet."""
        if self.take() is None:
            return 0
        return len(self.take())

    def set(self, data):
        """Publish ``data`` as the new readout of this register."""
        self.onReadout(data)
class Array10Matrix(ReadonlyAsyncRegisterBase, ReadonlyRegisterMatrix):
    """Read-only child register that exposes the most recent data as a 2D matrix."""

    def getWidth(self):
        """Return the number of columns (length of the first row).

        Returns 0 when nothing is cached yet or when the cached matrix has
        no rows, instead of failing on the row-0 lookup.
        """
        # Read the cache once so the checks and the lookup see the same value.
        matrix = self.take()
        if matrix is None or len(matrix) == 0:
            return 0
        return len(matrix[0])

    def getHeight(self):
        """Return the number of rows, or 0 when nothing is cached yet."""
        matrix = self.take()
        if matrix is None:
            return 0
        return len(matrix)

    def set(self, data, shape):
        """Reshape ``data`` to ``shape`` and publish it as the new readout.

        The call is ignored when either argument is None.
        """
        if data is None or shape is None:
            return
        self.onReadout(Convert.reshape(data, shape))
class Array10(DeviceBase, Readable, Cacheable, Readable.ReadableType):
    """Device that receives an Array10 stream over ZeroMQ.

    Each message is read with two consecutive ``recv`` calls: first a JSON
    header (the keys "shape" and "type" are used), then the binary payload.
    The latest header/payload pair is stored in the device cache and, if
    configured, forwarded to the array and matrix child devices.
    """

    # Delay in seconds between polls when no message was processed.
    POLL_INTERVAL = 0.01

    def __init__(self, name, address, modulo_array=1, modulo_matrix=1, mode=ZMQ.PULL):
        """Device implementing communication over Array10.

        Args:
            name(str): device name
            address(str): address of stream
            modulo_array(int, default=1): if not None, create a child device that
                holds the data array, updated on every modulo_array-th message.
            modulo_matrix(int, default=1): if not None, create a child device that
                holds the 2d image, updated on every modulo_matrix-th message.
            mode(default=ZMQ.PULL): ZeroMQ socket type; ZMQ.SUB subscribes to
                all messages.
        """
        super(Array10, self).__init__(name)
        self.address = address
        self.mode = mode
        self.context = None
        self.socket = None
        self.running = False
        # Defined up front so they can be read before the first start/message.
        self.task = None
        self.shape = None
        self.dtype = None
        self.bb = None
        self.header = None
        self.data = None
        self.array_dev = None
        self.matrix_dev = None
        self.message_count = 0
        self.converter = Converter()
        if modulo_array is not None:
            self.array_dev = Array10Array(self.name + "_array")
            self.addChild(self.array_dev)
            self.array_dev.modulo = modulo_array
        if modulo_matrix is not None:
            self.matrix_dev = Array10Matrix(self.name + "_matrix")
            self.addChild(self.matrix_dev)
            self.matrix_dev.modulo = modulo_matrix

    def doInitialize(self):
        """Restart the receiver thread with cleared header, data and message counter."""
        self.stop()
        self.header = None
        self.data = None
        self.message_count = 0
        self.start()

    def start(self):
        """Launch the receiver thread unless it is already flagged as running."""
        if not self.running:
            self.getLogger().info("Starting")
            self.running = True
            # fork() is expected to be provided by the hosting script environment.
            self.task = fork(self.rx_thread)

    def stop(self):
        """Ask the receiver thread to exit and wait for it to finish."""
        if self.running:
            self.getLogger().info("Stopping")
            self.running = False
            # join() is expected to be provided by the hosting script environment.
            join(self.task)

    @staticmethod
    def _decode_header(raw):
        """Parse the JSON header from a sequence of byte values."""
        # Java bytes are signed; mask them so values >= 0x80 do not make chr() fail.
        return json.loads(''.join(chr(b & 0xFF) for b in raw))

    def _convert_payload(self, payload):
        """Convert the raw payload according to self.dtype ("int8" is passed through)."""
        if self.dtype == "int8":
            return payload
        buf = ByteBuffer.wrap(payload)
        self.bb = buf
        # Turn the header type name into the enum spelling, e.g. "uint16" -> "UInt16".
        self.dtype = self.dtype.capitalize()
        if self.dtype[0] == "U":
            self.dtype = self.dtype[:1] + self.dtype[1:].capitalize()
        cfg = ChannelConfig("", Type.valueOf(self.dtype), [len(payload)], 1, 0)
        return self.converter.getValue(None, None, cfg, buf, None)

    def _publish(self, data):
        """Store data, update the cache and forward it to the child devices."""
        self.data = data
        self.setCache({"header": self.header, "data": self.data}, None)
        self.message_count = self.message_count + 1
        if self.array_dev is not None:
            if self.message_count % self.array_dev.modulo == 0:
                self.array_dev.set(self.data)
        if self.matrix_dev is not None:
            if self.message_count % self.matrix_dev.modulo == 0:
                # TODO: FIXME - shape may need to be swapped: (shape[1], shape[0])
                self.matrix_dev.set(self.data, self.shape)

    def _handle_message(self, raw_header):
        """Process one message given its raw header; return True if data was published."""
        self.header = self._decode_header(raw_header)
        self.shape = self.header.get("shape", None)
        self.dtype = self.header.get("type", "int8")
        # NOTE(review): this recv() blocks until the payload arrives.
        payload = self.socket.recv()
        if payload is None:
            return False
        self._publish(self._convert_payload(payload))
        return True

    def rx_thread(self):
        """Receiver loop: connect, poll for messages until stopped, then clean up."""
        self.getLogger().info("Enter rx thread")
        try:
            self.context = ZMQ.context(1)
            self.socket = self.context.socket(self.mode)
            self.socket.connect(self.address)
            if self.mode == ZMQ.SUB:
                self.socket.subscribe("")
            self.getLogger().info("Running " + str(self.mode) + " " + str(self.address))
            while self.running:
                processed = False
                # Non-blocking poll so the loop can notice a stop request.
                raw_header = self.socket.recv(ZMQ.NOBLOCK)
                if raw_header is not None:
                    try:
                        processed = self._handle_message(raw_header)
                    except Exception as e:
                        print(e)
                if not processed:
                    # Idle (or failed) iteration: back off briefly before polling again.
                    time.sleep(self.POLL_INTERVAL)
        except Exception as e:
            print(e)
        finally:
            try:
                if self.socket is not None:
                    self.socket.close()
            finally:
                # Terminate the context even if closing the socket failed.
                if self.context is not None:
                    self.context.term()
            self.getLogger().info("Quit rx thread")

    def doClose(self):
        """Close the channel."""
        self.stop()
        DeviceBase.doClose(self)

    # Readable interface
    def read(self):
        """Wait for the cache to change, then return the cached value."""
        self.waitCacheChange(-1)
        return self.take()
#Testing
#add_device(Array10("a10", "tcp://localhost:1234", modulo_array=1, modulo_matrix=10), True)
#add_device(a10.array_dev, True)
#add_device(a10.matrix_dev, True)
#add_device(RegisterMatrixSource("src_a10", a10.matrix_dev), True)