Files
dev/script/cpy/TestOphydWrapper.py
2024-06-10 10:44:16 +02:00

94 lines
2.9 KiB
Python

class ReaderWrapper():
def __init__(self, dev):
self.dev=dev
self.name=dev.getName()
self.parent = None
try:
self.source = self.dev.getChannelName()
except:
self.source = "Unknown"
try:
try:
self.shape = [self.dev.getHeight(), self.dev.getWidth()]
self.shape_str = "["+str(self.shape[0])+"]"+"["+str(self.shape[1])+"]"
except:
self.shape = [self.dev.getSize()]
self.shape_str = "["+str(self.shape[0])+"]"
except:
self.shape = []
self.shape_str=""
try:
self.precision = self.dev.getPrecision()
except:
self.precision = None
try:
v = self.dev.take()
if v is None: raise Exception("")
except:
v = self.dev.read()
if type(v) is str:
self.dtype = 'string'
elif type(v) is int:
self.dtype = 'integer'
elif type(v) is bool:
self.dtype = 'boolean'
else:
#try:
# len(v)
# self.dtype = 'array'
#except:
# self.dtype = 'number'
self.dtype = 'number'
self.description = {self.name: { \
'dtype':self.dtype , \
'shape': self.shape, \
'source': self.source, \
'precision': self.precision \
}}
self.cfg_description = {"shape_str":{"dtype":"string", 'shape':(len(self.shape_str),), "source":""}, }
self.configuration = {"shape_str":{"value":self.shape_str, "timestamp":time.time()}}
self.name = self.name+self.shape_str
def read(self):
if is_main_thread():
value = self.dev.read()
else:
global __return__,__exception__
__return__ = __exception__ = None
handler.queue.append(("read", (self.dev)))
while (__return__ is None) and (__exception__ is None):
time.sleep(0.01)
if __exception__ is not None:
raise __exception__
value=__return__
if True: #Lself.dtype == "array":
value = numpy.array(value)
return {self.name:{"value":value, "timestamp":time.time()}}
def describe(self):
self.description = {self.name: { \
'dtype':self.dtype , \
'shape': self.shape, \
'source': self.source, \
'precision': self.precision \
}}
return self.description
def describe_configuration(self):
#print(3)
return self.cfg_description
def read_configuration(self):
#print(4)
return self.configuration
det4=ReaderWrapper(get_device("arr"))
dets = [det4,]
RE(count(dets, num=5, delay=0.5))