Files
EsfRixsApps/spectrumProc/spectrumProc.py
2024-08-29 16:35:14 +02:00

301 lines
9.6 KiB
Python
Executable File

#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
SwissFEL Athos spectrometer data processing
!!! If there is a problem with library or version, try: !!!
!!! /opt/gfa/python-3.8/latest/bin/python /sf/furka/applications/EsfRixsApps/spectrumProc/spectrumProc.py !!!
- select file 'FM_data.pickle'
- select run '226'
- calibrate
bitmask for simulation:
0x01:
0x02:
0x04:
0x08:
0x10:
0x20:
0x40:
0x80:
"""
import os.path
import sys, logging, pickle
_log=logging.getLogger(__name__)
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QSlider, QLineEdit,\
QCheckBox, QHBoxLayout, QVBoxLayout, QGroupBox, QGridLayout, QComboBox, QFileDialog
from PyQt5.QtGui import QPainter, QColor, QPen, QBrush, QPolygon, QTransform
import PyQt5.QtCore as QtCore
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
import numpy as np
import scipy
try:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression,RANSACRegressor
except ModuleNotFoundError as e:
_log.warning(f'sklearn import failed!\n pip install scipy scikit-learn -U --user')
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=None):
'''
Finds the best curvature function. one could use a simple polynomial
fit but this regressor is a bit more advanced because it discards outliers
'''
X=evnts_j_tot
y=evnts_i_tot
MAD=scipy.stats.median_abs_deviation(y)
model=Pipeline([('poly', PolynomialFeatures(degree=2)),
('RANSAC',
RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold=MAD*MAD_n))])
model=model.fit(X[:, np.newaxis], y)
c=model.named_steps['RANSAC'].estimator_.coef_
if wnd:
inlier_mask=model.named_steps['RANSAC'].inlier_mask_
wnd.axes.plot(X, y, 'o', markersize=0.5)
wnd.axes.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize=0.5, )
x_linreg=np.linspace(min(X), max(X))
wnd.axes.plot(x_linreg, c[0]+c[1]*x_linreg+c[2]*x_linreg**2, '-')
wnd.axes.set_ylim(np.mean(y)-MAD*MAD_n*3, np.mean(y)+MAD*MAD_n*3)
return c
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, wnd=None):
'''
Applies the curvature correction
'''
row_curv_corr=evnts_i_tot-slope*evnts_j_tot-C_2*evnts_j_tot**2
x=np.linspace(ROI[0][0], ROI[0][1])
if wnd:
wnd.axes.set_xlim(ROI[0][0], ROI[0][1])
wnd.axes.set_ylim(ROI[1][0], ROI[1][1])
wnd.axes.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
# plt.plot(x, intercept + 0*x, '-')
return row_curv_corr
def plot_1d_RIXS(row_curv_corr, ROI=[[0, 1800], [0, 1800]], pb=2.5, wnd=None):
'''
makes a 1D plot according to a specific binning
'''
# pb is pixel binning
# spread is about 2.5 pixels, by comparing with image based data
pxY_num=ROI[1][1]
startY_px=ROI[1][0]
endY_px=ROI[1][0]+ROI[1][1]-1
scf, b=np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
x_pxl=.5*(b[:-1]+b[1:]) # + B*px_col/2
if wnd:
wnd[0].axes.set_xlim(ROI[1][0], ROI[1][1])
wnd[0].axes.plot(x_pxl, scf, '-o', markersize=1)
wnd[1].axes.plot(x_pxl, scf)
return x_pxl, scf
class MplCanvas(FigureCanvasQTAgg):
def __init__(self, parent=None, width=5, height=4, dpi=100):
#https://www.pythonguis.com/tutorials/plotting-matplotlib/
fig=Figure(figsize=(width, height), dpi=dpi)
self.axes=fig.add_subplot(111)
super(MplCanvas, self).__init__(fig)
class WndMainRIXS(QWidget):
def __init__(self,fn):
super().__init__()
self.initUI(fn)
def initUI(self,fn):
self.setGeometry(100, 100, 350, 350)
app=QApplication.instance()
#dev=app._dev
#self.setWindowTitle(f'{dev._name} main')
self._wDevs=w=QWidget(self)
w.move(10, 10)
lv=QVBoxLayout(w)
for pb,cb in (('load data',self.load_data),
('calibrate',self.calibrate),
('process',self.process)):
w=QPushButton(pb)
w.clicked.connect(cb)
#w.clicked.connect(lambda dummy,pb=pb: self.OnOpenChildWnd(pb))
lv.addWidget(w)
w=QGroupBox('process spectrum data', self)
lv.addWidget(w)
lg=QGridLayout(w)
row=0
t='raw data'
w=QLabel(t);
lg.addWidget(w, row, 0)
self._cbRaw=w=QComboBox(objectName=t);
lg.addWidget(w, row, 1)
#for i, tx in enumerate(tuple(dev._probes.keys())):
# w.insertItem(i, tx)
#w.currentIndexChanged.connect(lambda val, w=w:self.OnEnergyChanged(w,val))
#w=QCheckBox('fix', objectName=t)
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
lg.addWidget(w, row, 2)
row+=1
if fn is not None:
self.load_data(fn)
self.show()
def load_data(self,fn,**kwargs):
if type(fn)!=str:
options=QFileDialog.Options()
options|=QFileDialog.ReadOnly
#filePath, _=QFileDialog.getOpenFileName(self, "Open File", "", "All Files (*);;Text Files (*.txt)", options=options)
fn, _=QFileDialog.getOpenFileName(self, "Open File", "", "pickle Files (*.pickle *.pkl);;All Files (*)", options=options)
if fn is None:
_log.warning('no filename')
return
with open(fn, 'rb') as fh:
app=QApplication.instance()
app._rawData=pickle.load(fh)
#_log.info(app._rawData.keys())
#run=app._rawData[226]
#_log.info(run.keys())
#raw2D=run['2D']
#_log.info(f'{type(raw2D)}:{len(raw2D)} {raw2D}')
#line=raw2D[0]
#_log.info(f'{type(line)}: {line}')
#_log.info(raw2D)
#np.shape(raw2D)
w=self._cbRaw
for i, tx in enumerate(tuple(app._rawData.keys())):
w.insertItem(i, str(tx))
w.currentIndexChanged.connect(self.OnRawChanged)
#w=QCheckBox('fix', objectName=t)
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
def OnRawChanged(self,**kwargs):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
data=app._rawData[int(t)]
n=len(data['2D'][0])
_log.info(f'{t} -> {n} events')
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot=data['2D']
#plt.figure()
#plt.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
#plt.show()
try:
wnd=app._wndRaw
except AttributeError as e:
app._wndRaw=wnd=MplCanvas(self, width=5, height=4, dpi=100)
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
wnd.show()
else:
_log.info('new data')
wnd.axes.cla() # Clear the canvas.
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
wnd.draw()
def calibrate(self):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
try:
wnd=app._wndCal
except AttributeError as e:
app._wndCal=wnd=MplCanvas(self, width=5, height=4, dpi=100)
wnd.show()
else:
_log.info('new data')
wnd.axes.cla() # Clear the canvas.
data=app._rawData[int(t)]
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
app._calib=c=FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=wnd) #compute curvature
wnd.draw()
pass
def process(self):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
try:
wnd=app._wndProc
except AttributeError as e:
wnd=list()
for i in range(3):
wnd.append(MplCanvas(self, width=5, height=4, dpi=100))
app._wndProc=tuple(wnd)
for w in wnd:
w.show()
else:
_log.info('new data')
for w in wnd:
w.axes.cla() # Clear the canvas.
data=app._rawData[int(t)]
c=app._calib
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], wnd=wnd[0]) #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, wnd=wnd[1:])
for w in wnd:
w.draw()
# if attr in self._pvs:
# return self.get(attr)
# elif attr in self.__dict__:
# return self.__dict__[attr]
if __name__ == '__main__':
import argparse
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
def main():
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
fn=os.path.join(os.path.dirname(os.path.relpath(os.path.realpath(__file__))),'FM_data.pickle')
#fn=os.path.join(os.path.dirname(os.path.realpath(__file__)),'FM_data.pickle')
parser.add_argument("--file", "-f", help="raw data file default=%(default)s", default=fn)
args=parser.parse_args()
_log.info('Arguments:{}'.format(args.__dict__))
app=QApplication(sys.argv)
app._args=args
if args.mode&0x01:
app._wndRIXS=WndMainRIXS(args.file)
sys.exit(app.exec_())
main()