301 lines
9.6 KiB
Python
Executable File
301 lines
9.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# *-----------------------------------------------------------------------*
|
|
# | |
|
|
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
|
|
# | |
|
|
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
|
# *-----------------------------------------------------------------------*
|
|
|
|
"""
|
|
SwissFEL Athos spectrometer data processing
|
|
|
|
!!! If there is a problem with library or version, try: !!!
|
|
!!! /opt/gfa/python-3.8/latest/bin/python /sf/furka/applications/EsfRixsApps/spectrumProc/spectrumProc.py !!!
|
|
|
|
- select file 'FM_data.pickle'
|
|
- select run '226'
|
|
- calibrate
|
|
|
|
bitmask for simulation:
|
|
0x01:
|
|
0x02:
|
|
0x04:
|
|
0x08:
|
|
0x10:
|
|
0x20:
|
|
0x40:
|
|
0x80:
|
|
|
|
"""
|
|
import os.path
|
|
import sys, logging, pickle
|
|
_log=logging.getLogger(__name__)
|
|
|
|
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QSlider, QLineEdit,\
|
|
QCheckBox, QHBoxLayout, QVBoxLayout, QGroupBox, QGridLayout, QComboBox, QFileDialog
|
|
from PyQt5.QtGui import QPainter, QColor, QPen, QBrush, QPolygon, QTransform
|
|
import PyQt5.QtCore as QtCore
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg
|
|
from matplotlib.figure import Figure
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import scipy
|
|
try:
|
|
from sklearn.pipeline import Pipeline
|
|
from sklearn.preprocessing import PolynomialFeatures
|
|
from sklearn.linear_model import LinearRegression,RANSACRegressor
|
|
except ModuleNotFoundError as e:
|
|
_log.warning(f'sklearn import failed!\n pip install scipy scikit-learn -U --user')
|
|
|
|
|
|
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=None):
|
|
'''
|
|
Finds the best curvature function. one could use a simple polynomial
|
|
fit but this regressor is a bit more advanced because it discards outliers
|
|
'''
|
|
X=evnts_j_tot
|
|
y=evnts_i_tot
|
|
|
|
MAD=scipy.stats.median_abs_deviation(y)
|
|
|
|
model=Pipeline([('poly', PolynomialFeatures(degree=2)),
|
|
('RANSAC',
|
|
RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold=MAD*MAD_n))])
|
|
|
|
model=model.fit(X[:, np.newaxis], y)
|
|
c=model.named_steps['RANSAC'].estimator_.coef_
|
|
|
|
if wnd:
|
|
inlier_mask=model.named_steps['RANSAC'].inlier_mask_
|
|
|
|
wnd.axes.plot(X, y, 'o', markersize=0.5)
|
|
wnd.axes.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize=0.5, )
|
|
|
|
x_linreg=np.linspace(min(X), max(X))
|
|
wnd.axes.plot(x_linreg, c[0]+c[1]*x_linreg+c[2]*x_linreg**2, '-')
|
|
wnd.axes.set_ylim(np.mean(y)-MAD*MAD_n*3, np.mean(y)+MAD*MAD_n*3)
|
|
return c
|
|
|
|
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, wnd=None):
|
|
'''
|
|
Applies the curvature correction
|
|
'''
|
|
row_curv_corr=evnts_i_tot-slope*evnts_j_tot-C_2*evnts_j_tot**2
|
|
x=np.linspace(ROI[0][0], ROI[0][1])
|
|
|
|
if wnd:
|
|
wnd.axes.set_xlim(ROI[0][0], ROI[0][1])
|
|
wnd.axes.set_ylim(ROI[1][0], ROI[1][1])
|
|
wnd.axes.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
|
|
# plt.plot(x, intercept + 0*x, '-')
|
|
return row_curv_corr
|
|
|
|
def plot_1d_RIXS(row_curv_corr, ROI=[[0, 1800], [0, 1800]], pb=2.5, wnd=None):
|
|
'''
|
|
makes a 1D plot according to a specific binning
|
|
'''
|
|
# pb is pixel binning
|
|
# spread is about 2.5 pixels, by comparing with image based data
|
|
pxY_num=ROI[1][1]
|
|
startY_px=ROI[1][0]
|
|
endY_px=ROI[1][0]+ROI[1][1]-1
|
|
|
|
scf, b=np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
|
|
x_pxl=.5*(b[:-1]+b[1:]) # + B*px_col/2
|
|
|
|
if wnd:
|
|
wnd[0].axes.set_xlim(ROI[1][0], ROI[1][1])
|
|
wnd[0].axes.plot(x_pxl, scf, '-o', markersize=1)
|
|
|
|
wnd[1].axes.plot(x_pxl, scf)
|
|
|
|
return x_pxl, scf
|
|
|
|
|
|
class MplCanvas(FigureCanvasQTAgg):
|
|
|
|
def __init__(self, parent=None, width=5, height=4, dpi=100):
|
|
#https://www.pythonguis.com/tutorials/plotting-matplotlib/
|
|
fig=Figure(figsize=(width, height), dpi=dpi)
|
|
self.axes=fig.add_subplot(111)
|
|
super(MplCanvas, self).__init__(fig)
|
|
|
|
|
|
class WndMainRIXS(QWidget):
|
|
def __init__(self,fn):
|
|
super().__init__()
|
|
self.initUI(fn)
|
|
|
|
def initUI(self,fn):
|
|
self.setGeometry(100, 100, 350, 350)
|
|
app=QApplication.instance()
|
|
#dev=app._dev
|
|
#self.setWindowTitle(f'{dev._name} main')
|
|
self._wDevs=w=QWidget(self)
|
|
w.move(10, 10)
|
|
lv=QVBoxLayout(w)
|
|
|
|
for pb,cb in (('load data',self.load_data),
|
|
('calibrate',self.calibrate),
|
|
('process',self.process)):
|
|
w=QPushButton(pb)
|
|
w.clicked.connect(cb)
|
|
#w.clicked.connect(lambda dummy,pb=pb: self.OnOpenChildWnd(pb))
|
|
lv.addWidget(w)
|
|
|
|
w=QGroupBox('process spectrum data', self)
|
|
lv.addWidget(w)
|
|
lg=QGridLayout(w)
|
|
row=0
|
|
|
|
t='raw data'
|
|
w=QLabel(t);
|
|
lg.addWidget(w, row, 0)
|
|
self._cbRaw=w=QComboBox(objectName=t);
|
|
lg.addWidget(w, row, 1)
|
|
#for i, tx in enumerate(tuple(dev._probes.keys())):
|
|
# w.insertItem(i, tx)
|
|
#w.currentIndexChanged.connect(lambda val, w=w:self.OnEnergyChanged(w,val))
|
|
#w=QCheckBox('fix', objectName=t)
|
|
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
|
|
lg.addWidget(w, row, 2)
|
|
row+=1
|
|
|
|
if fn is not None:
|
|
self.load_data(fn)
|
|
|
|
self.show()
|
|
|
|
def load_data(self,fn,**kwargs):
|
|
if type(fn)!=str:
|
|
options=QFileDialog.Options()
|
|
options|=QFileDialog.ReadOnly
|
|
#filePath, _=QFileDialog.getOpenFileName(self, "Open File", "", "All Files (*);;Text Files (*.txt)", options=options)
|
|
fn, _=QFileDialog.getOpenFileName(self, "Open File", "", "pickle Files (*.pickle *.pkl);;All Files (*)", options=options)
|
|
if fn is None:
|
|
_log.warning('no filename')
|
|
return
|
|
with open(fn, 'rb') as fh:
|
|
app=QApplication.instance()
|
|
app._rawData=pickle.load(fh)
|
|
#_log.info(app._rawData.keys())
|
|
#run=app._rawData[226]
|
|
#_log.info(run.keys())
|
|
#raw2D=run['2D']
|
|
#_log.info(f'{type(raw2D)}:{len(raw2D)} {raw2D}')
|
|
#line=raw2D[0]
|
|
#_log.info(f'{type(line)}: {line}')
|
|
#_log.info(raw2D)
|
|
#np.shape(raw2D)
|
|
w=self._cbRaw
|
|
for i, tx in enumerate(tuple(app._rawData.keys())):
|
|
w.insertItem(i, str(tx))
|
|
w.currentIndexChanged.connect(self.OnRawChanged)
|
|
#w=QCheckBox('fix', objectName=t)
|
|
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
|
|
|
|
def OnRawChanged(self,**kwargs):
|
|
w=self._cbRaw
|
|
i=w.currentIndex()
|
|
t=w.currentText()
|
|
app=QApplication.instance()
|
|
data=app._rawData[int(t)]
|
|
n=len(data['2D'][0])
|
|
_log.info(f'{t} -> {n} events')
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot=data['2D']
|
|
#plt.figure()
|
|
#plt.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
|
|
#plt.show()
|
|
try:
|
|
wnd=app._wndRaw
|
|
except AttributeError as e:
|
|
app._wndRaw=wnd=MplCanvas(self, width=5, height=4, dpi=100)
|
|
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
|
|
wnd.show()
|
|
else:
|
|
_log.info('new data')
|
|
wnd.axes.cla() # Clear the canvas.
|
|
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
|
|
wnd.draw()
|
|
|
|
def calibrate(self):
|
|
w=self._cbRaw
|
|
i=w.currentIndex()
|
|
t=w.currentText()
|
|
app=QApplication.instance()
|
|
try:
|
|
wnd=app._wndCal
|
|
except AttributeError as e:
|
|
app._wndCal=wnd=MplCanvas(self, width=5, height=4, dpi=100)
|
|
wnd.show()
|
|
else:
|
|
_log.info('new data')
|
|
wnd.axes.cla() # Clear the canvas.
|
|
|
|
data=app._rawData[int(t)]
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
|
|
app._calib=c=FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=wnd) #compute curvature
|
|
wnd.draw()
|
|
|
|
pass
|
|
|
|
def process(self):
|
|
w=self._cbRaw
|
|
i=w.currentIndex()
|
|
t=w.currentText()
|
|
app=QApplication.instance()
|
|
try:
|
|
wnd=app._wndProc
|
|
except AttributeError as e:
|
|
wnd=list()
|
|
for i in range(3):
|
|
wnd.append(MplCanvas(self, width=5, height=4, dpi=100))
|
|
app._wndProc=tuple(wnd)
|
|
for w in wnd:
|
|
w.show()
|
|
else:
|
|
_log.info('new data')
|
|
for w in wnd:
|
|
w.axes.cla() # Clear the canvas.
|
|
|
|
data=app._rawData[int(t)]
|
|
c=app._calib
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
|
|
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], wnd=wnd[0]) #apply curvatur and extract
|
|
|
|
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, wnd=wnd[1:])
|
|
for w in wnd:
|
|
w.draw()
|
|
|
|
|
|
# if attr in self._pvs:
|
|
# return self.get(attr)
|
|
# elif attr in self.__dict__:
|
|
# return self.__dict__[attr]
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
|
|
|
|
|
|
def main():
|
|
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
|
|
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
|
|
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
|
|
fn=os.path.join(os.path.dirname(os.path.relpath(os.path.realpath(__file__))),'FM_data.pickle')
|
|
#fn=os.path.join(os.path.dirname(os.path.realpath(__file__)),'FM_data.pickle')
|
|
parser.add_argument("--file", "-f", help="raw data file default=%(default)s", default=fn)
|
|
args=parser.parse_args()
|
|
_log.info('Arguments:{}'.format(args.__dict__))
|
|
|
|
app=QApplication(sys.argv)
|
|
app._args=args
|
|
|
|
if args.mode&0x01:
|
|
app._wndRIXS=WndMainRIXS(args.file)
|
|
|
|
sys.exit(app.exec_())
|
|
|
|
main() |