Files
sfbd/interface/slic.py

95 lines
2.7 KiB
Python

import h5py
import numpy as np
import time
from threading import Thread
from PyQt5.QtCore import QObject, pyqtSignal
# TODO:
#   1 - check whether the scan thread is running
#   2 - import of BSread data
from sfbd.interface import getSnap
class SlicScan(QObject):
    """Run a scan of a DAQ object in background threads and report via Qt signals.

    The ``daq`` object handed to :meth:`start` is used through the following
    members: ``scan()``, ``running()``, ``status()`` (unpacked as
    ``istep, nstep``), ``info()``, ``stop()`` and, optionally, an ``auxdata``
    attribute that is merged into the imported data.

    Results are stored on the instance:
      * ``data`` / ``act`` - the two values returned by ``importSlicScan``
      * ``snap``           - the value returned by ``getSnap()`` (if requested)
    """

    siginc = pyqtSignal(int, int)  # signal for increment
    sigterm = pyqtSignal(int)  # signal for termination
    sigsnap = pyqtSignal(bool)

    def __init__(self):
        QObject.__init__(self)
        self.clear()

    def clear(self):
        """Reset the DAQ handle and all stored results to ``None``."""
        self.daq = None
        self.data = None
        self.act = None
        self.snap = None

    def start(self, daq, snap=False):
        """Clear previous results, start monitoring ``daq`` and optionally take a snapshot.

        Parameters:
            daq:  object providing the scan interface described on the class.
            snap: when true, ``getSnap()`` is additionally run in its own thread.
        """
        self.clear()
        self.daq = daq
        Thread(target=self.Tmonitor).start()
        self.startSnap(snap)

    def startSnap(self, snap=False):
        """Start the snapshot thread if ``snap`` is true; otherwise do nothing."""
        if snap:
            Thread(target=self.Tsnap).start()

    def Tsnap(self):
        """Thread body: store the result of ``getSnap()`` and emit ``sigsnap(True)``."""
        self.snap = getSnap()
        self.sigsnap.emit(True)

    def Tmonitor(self):
        """Thread body: launch the scan, poll its progress and import the results.

        Emits ``siginc(istep, nstep)`` whenever ``istep`` increases, once more
        after the scan has finished, and finally ``sigterm(istep == nstep)``.
        """
        # Thread.start() returns None, so the Thread object has to be kept
        # separately to be able to join it later.
        scanner = Thread(target=self.Tscanner)
        scanner.start()
        time.sleep(1)
        last_step = -1
        while self.daq.running():
            istep, nstep = self.daq.status()
            if istep > last_step:
                last_step = istep
                self.siginc.emit(istep, nstep)
            time.sleep(1)
        # wait till scanning thread is done before reading its results
        scanner.join()
        istep, nstep = self.daq.status()
        self.siginc.emit(istep, nstep)
        self.data, self.act = importSlicScan(self.daq.info())
        # importSlicScan returns (None, None) for unusable scan info; skip the
        # merge in that case so that sigterm is still emitted below.
        if self.data is not None and hasattr(self.daq, 'auxdata'):
            self.data.update(self.daq.auxdata)
        self.sigterm.emit(istep == nstep)

    def Tscanner(self):
        """Thread body: run ``daq.scan()``."""
        self.daq.scan()

    def stop(self):
        """Forward a stop request to the DAQ; no-op if no scan was started."""
        if self.daq is not None:
            self.daq.stop()
def importSlicScan(scan_info):
    """Collect the data of all scan steps listed in ``scan_info``.

    Parameters:
        scan_info: dict with the keys ``'scan_files'``, ``'scan_parameters'``
            (whose ``'name'`` entry is indexed with ``[0]``), ``'scan_values'``
            and ``'scan_readbacks'``. The first element of every entry in
            ``'scan_files'`` is opened as an HDF5 file.

    Returns:
        ``(data, actuator)`` - two dicts of numpy arrays. ``data`` holds one
        entry per top-level dataset (or per top-level group containing a
        ``'data'`` member) stacked over the scan files, plus the scan
        readbacks under the actuator name. ``actuator`` holds the scan values
        under the actuator name. ``(None, None)`` is returned when
        ``scan_info`` is not a dict or has no ``'scan_files'`` key.
    """
    if not isinstance(scan_info, dict) or 'scan_files' not in scan_info:
        return None, None

    data = {}
    for sfile in scan_info['scan_files']:
        # The values are copied into numpy arrays by addDatasetToData, so the
        # file can (and should) be closed as soon as it has been read.
        with h5py.File(sfile[0], 'r') as hid:
            for name, h5obj in hid.items():
                if isinstance(h5obj, h5py.Dataset):  # pv channels
                    data[name] = addDatasetToData(data, name, h5obj)
                elif isinstance(h5obj, h5py.Group) and 'data' in h5obj:  # bs read channels
                    data[name] = addDatasetToData(data, name, h5obj['data'])

    act_name = scan_info['scan_parameters']['name'][0]
    actuator = {act_name: np.array(scan_info['scan_values'])}
    data[act_name] = np.array(scan_info['scan_readbacks'])
    return data, actuator
def addDatasetToData(data, name, h5obj):
    """Return the array for ``name`` extended by the full content of ``h5obj``.

    The content of ``h5obj`` (read with ``h5obj[()]``) is wrapped in a new
    leading axis of length one. If ``data`` has no entry for ``name`` yet that
    array is returned as is; otherwise it is appended to ``data[name]`` along
    axis 0. ``data`` itself is not modified.
    """
    sample = np.array([h5obj[()]])
    if name in data:
        return np.append(data[name], sample, axis=0)
    return sample