import org.zeromq.ZMQ as ZMQ import org.zeromq.ZMQ.Socket as Socket import json class Array10Array(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray): def getSize(self): return 0 if (self.take() is None) else len(self.take()) def set(self, data): self.onReadout(data) class Array10Matrix(ReadonlyAsyncRegisterBase, ReadonlyRegisterMatrix): def getWidth(self): return 0 if (self.take() is None) else len(self.take()[0]) def getHeight(self): return 0 if (self.take() is None) else len(self.take()) def set(self, data, shape): if (data is not None) and (shape is not None): self.onReadout(Convert.reshape(data, shape)) class Array10(DeviceBase, Readable, Cacheable, Readable.ReadableType): def __init__(self, name, address, modulo_array=1, modulo_matrix=1, mode=ZMQ.PULL): """ Device implementing communication with over Array10. Args: name(str): device name address(str): address of stream modulo_array(int, default=1): if defined create child device that holds the data array subsampled by this value. modulo_matrix(int, default=1): if defined create child device that holds the 2d image subsampled by this value. """ super(Array10, self).__init__(name) self.address = address self.mode = mode self.context = None self.socket = None self.running = False self.header=None self.data=None self.array_dev=None self.matrix_dev=None self.message_count=0 if modulo_array is not None: self.array_dev = Array10Array(self.name + "_array" ) self.addChild(self.array_dev) self.array_dev.modulo = modulo_array else: self.array_dev = None if modulo_matrix is not None: self.matrix_dev = Array10Matrix(self.name + "_matrix" ) self.addChild(self.matrix_dev) self.matrix_dev.modulo = modulo_matrix else: self.matrix_dev = None def doInitialize(self): self.stop() self.header=None self.data=None self.message_count=0 self.start() def start(self): if self.running==False: self.getLogger().info("Starting"); self.running = True self.task = fork(self.rx_thread) def stop(self): if self.running: self.getLogger().info("Stopping"); self.running = False ret = join(self.task) def rx_thread(self): self.getLogger().info("Enter rx thread"); try: self.context = ZMQ.context(1) self.socket = self.context.socket(self.mode) self.socket.connect(self.address) while self.running: header = self.socket.recv(ZMQ.NOBLOCK) if (header is not None): try: self.header=json.loads(''.join(chr(i) for i in header)) self.data = self.socket.recv() if self.data is not None: self.setCache({"header":self.header, "data":self.data}, None) self.message_count=self.message_count+1 if self.array_dev is not None: if self.message_count % self.array_dev.modulo == 0: self.array_dev.set(self.data) if self.matrix_dev is not None: if self.message_count % self.matrix_dev.modulo == 0: shape = self.header.get("shape", None) if shape: shape=(shape[1],shape[0]) #TODO: FIXME self.matrix_dev.set(self.data, shape) continue except Exception as e: print e time.sleep(0.01) except Exception as e: print e finally: if self.socket: self.socket.close() if self.context: self.context.term() self.getLogger().info("Quit rx thread"); def doClose(self): """Close the channel. """ self.stop() super(Array10, self).doClose() #Readable interface def read(self): self.waitCacheChange(-1) return self.take() #Testing #add_device(Array10("a10", "tcp://localhost:1234", modulo_array=1, modulo_matrix=10), True) #add_device(a10.array_dev, True) #add_device(a10.matrix_dev, True) #add_device(RegisterMatrixSource("src_a10", a10.matrix_dev), True)