first preliminary spectrumProc.py

This commit is contained in:
2024-08-27 18:49:55 +02:00
parent 690d54de5f
commit 3681fd80d0
3 changed files with 536 additions and 0 deletions

BIN
spectrumProc/FM_data.pickle Normal file

Binary file not shown.

View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
SwissFEL Furka RIXS stectrum processing
https://jira.psi.ch/browse/SFELPHOTON-1232
if an too old scipy version is istalled:
pip install scipy scikit-learn -U --user
"""
import sys, logging, copy
import epics
import numpy as np
import matplotlib.pyplot as plt
import pickle
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
#matplotlib widget
import scipy
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression,RANSACRegressor
_log=logging.getLogger(__name__)
class col:
d = '\033[0m' #default
r = '\033[31m' #red
g = '\033[32m' #green
y = '\033[33m' #yellow
rr= '\033[91m' #red(bright)
gg= '\033[92m' #green(bright)
yy= '\033[93m' #yellow(bright)
b = '\033[1m' #bold
u = '\033[4m' #underline
R = '\033[1;31m' #bold, red
G = '\033[1;32m' #bold, green
Y = '\033[1;33m' #bold, yellow
class logHandler(logging.StreamHandler):
def __init__(self):
logging.StreamHandler.__init__(self)
def emit(self, record):
'''override function of base class'''
try:
msg=self.format(record)
# print(record.__dict__)
if record.levelno<=10:
c=col.g
elif record.levelno<=20:
c=col.y
elif record.levelno<=30:
c=col.yy
elif record.levelno<=40:
c=col.r
else:
c=col.rr+col.b
msg=c+msg+col.d
stream=self.stream
stream.write(msg+self.terminator)
self.flush()
except RecursionError:
raise
except Exception:
self.handleError(record)
#https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.median_abs_deviation.html#r63fe0ba43769-1
#https://github.com/scipy/scipy/blob/v1.14.1/scipy/stats/_stats_py.py#L3466-L3617
#def median_abs_deviation(x, axis=0, center=np.median, scale=1.0, nan_policy='propagate'):
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True'):
'''
Finds the best curvature function. one could use a simple polynomial
fit but this regressor is a bit more advanced because it discards outliers
'''
X = evnts_j_tot
y = evnts_i_tot
MAD = scipy.stats.median_abs_deviation(y)
model = Pipeline([('poly', PolynomialFeatures(degree=2)),
('RANSAC', RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold = MAD*MAD_n))])
model = model.fit(X[:, np.newaxis], y)
c = model.named_steps['RANSAC'].estimator_.coef_
if plot == 'True':
inlier_mask = model.named_steps['RANSAC'].inlier_mask_
plt.figure()
plt.plot(X, y, 'o', markersize = 0.5)
plt.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize = 0.5, )
x_linreg =np.linspace(min(X), max(X))
plt.plot(x_linreg, c[0] + c[1]*x_linreg + c[2]*x_linreg**2 , '-')
plt.ylim(np.mean(y) - MAD*MAD_n*3, np.mean(y) + MAD*MAD_n*3)
return c
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, plot = 'True'):
'''
Applies the curvature correction
'''
row_curv_corr = evnts_i_tot -slope*evnts_j_tot - C_2*evnts_j_tot**2
x =np.linspace(ROI[0][0],ROI[0][1])
if plot == 'True':
plt.figure()
plt.xlim(ROI[0][0], ROI[0][1])
plt.ylim(ROI[1][0], ROI[1][1])
plt.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
# plt.plot(x, intercept + 0*x, '-')
return row_curv_corr
def plot_1d_RIXS(row_curv_corr, ROI=[[0,1800],[0,1800]], pb=2.5, plot = 'True'):
'''
makes a 1D plot according to a specific binning
'''
#pb is pixel binning
#spread is about 2.5 pixels, by comparing with image based data
pxY_num = ROI[1][1]
startY_px = ROI[1][0]
endY_px = ROI[1][0]+ROI[1][1]-1
scf, b = np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
x_pxl = .5*(b[:-1] + b[1:]) #+ B*px_col/2
if plot == 'True':
plt.figure()
plt.xlim(ROI[1][0], ROI[1][1])
plt.plot(x_pxl, scf, '-o', markersize=1)
return x_pxl, scf
class RIXSprocess:
def run(self):
_log.info('...')
with open('FM_data.pickle', 'rb') as handle:
rixs_data = pickle.load(handle)
#plt.ion()
run = 226 #mirror left
#the single photon counting data comes as events, defined by their (i,j) coordinates
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
#plt.ioff()
#plt.show()
run = 225 #no mirror
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
run = 227 #mirror right
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
rixs_data[227].keys()
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[226]['2D']
c = FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True') #compute curvature
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'True') #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
plt.figure()
plt.plot(x,y)
#some real data
runs = [464,461,467]
fig, axs = plt.subplots(1,3, figsize = (8,5))
for i,run in enumerate(runs):
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
axs[i].plot(evnts_j_tot, evnts_i_tot,'o',ms = 0.2, alpha = 0.66)
#we correct using a curvature function obtained before
plt.figure()
for run in runs:
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'False') #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
plt.plot(x,y,label = str(run))
plt.legend()
plt.xlim(250,800)
#plt.ioff()
plt.show()
# if attr in self._pvs:
# return self.get(attr)
# elif attr in self.__dict__:
# return self.__dict__[attr]
if __name__ == '__main__':
import argparse
logging.basicConfig(level=logging.INFO, handlers=[logHandler()], format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
def main():
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
args=parser.parse_args()
_log.info('Arguments:{}'.format(args.__dict__))
if args.mode&0x01:
obj=RIXSprocess()
obj.run()
main()

285
spectrumProc/spectrumProc.py Executable file
View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
SwissFEL Athos spectrometer
- select file 'FM_data.pickle'
- select run '226'
- calibrate
bitmask for simulation:
0x01:
0x02:
0x04:
0x08:
0x10:
0x20:
0x40:
0x80:
"""
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QPushButton, QSlider, QLineEdit,\
QCheckBox, QHBoxLayout, QVBoxLayout, QGroupBox, QGridLayout, QComboBox, QFileDialog
from PyQt5.QtGui import QPainter, QColor, QPen, QBrush, QPolygon, QTransform
import PyQt5.QtCore as QtCore
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
import sys, logging, pickle
import numpy as np
import scipy
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression,RANSACRegressor
_log=logging.getLogger(__name__)
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=None):
'''
Finds the best curvature function. one could use a simple polynomial
fit but this regressor is a bit more advanced because it discards outliers
'''
X=evnts_j_tot
y=evnts_i_tot
MAD=scipy.stats.median_abs_deviation(y)
model=Pipeline([('poly', PolynomialFeatures(degree=2)),
('RANSAC',
RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold=MAD*MAD_n))])
model=model.fit(X[:, np.newaxis], y)
c=model.named_steps['RANSAC'].estimator_.coef_
if wnd:
inlier_mask=model.named_steps['RANSAC'].inlier_mask_
wnd.axes.plot(X, y, 'o', markersize=0.5)
wnd.axes.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize=0.5, )
x_linreg=np.linspace(min(X), max(X))
wnd.axes.plot(x_linreg, c[0]+c[1]*x_linreg+c[2]*x_linreg**2, '-')
wnd.axes.set_ylim(np.mean(y)-MAD*MAD_n*3, np.mean(y)+MAD*MAD_n*3)
return c
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, wnd=None):
'''
Applies the curvature correction
'''
row_curv_corr=evnts_i_tot-slope*evnts_j_tot-C_2*evnts_j_tot**2
x=np.linspace(ROI[0][0], ROI[0][1])
if wnd:
wnd.axes.set_xlim(ROI[0][0], ROI[0][1])
wnd.axes.set_ylim(ROI[1][0], ROI[1][1])
wnd.axes.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
# plt.plot(x, intercept + 0*x, '-')
return row_curv_corr
def plot_1d_RIXS(row_curv_corr, ROI=[[0, 1800], [0, 1800]], pb=2.5, wnd=None):
'''
makes a 1D plot according to a specific binning
'''
# pb is pixel binning
# spread is about 2.5 pixels, by comparing with image based data
pxY_num=ROI[1][1]
startY_px=ROI[1][0]
endY_px=ROI[1][0]+ROI[1][1]-1
scf, b=np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
x_pxl=.5*(b[:-1]+b[1:]) # + B*px_col/2
if wnd:
wnd[0].axes.set_xlim(ROI[1][0], ROI[1][1])
wnd[0].axes.plot(x_pxl, scf, '-o', markersize=1)
wnd[1].axes.plot(x_pxl, scf)
return x_pxl, scf
class MplCanvas(FigureCanvasQTAgg):
def __init__(self, parent=None, width=5, height=4, dpi=100):
#https://www.pythonguis.com/tutorials/plotting-matplotlib/
fig=Figure(figsize=(width, height), dpi=dpi)
self.axes=fig.add_subplot(111)
super(MplCanvas, self).__init__(fig)
class WndMainRIXS(QWidget):
def __init__(self,fn):
super().__init__()
self.initUI(fn)
def initUI(self,fn):
self.setGeometry(100, 100, 350, 350)
app=QApplication.instance()
#dev=app._dev
#self.setWindowTitle(f'{dev._name} main')
self._wDevs=w=QWidget(self)
w.move(10, 10)
lv=QVBoxLayout(w)
for pb,cb in (('load data',self.load_data),
('calibrate',self.calibrate),
('process',self.process)):
w=QPushButton(pb)
w.clicked.connect(cb)
#w.clicked.connect(lambda dummy,pb=pb: self.OnOpenChildWnd(pb))
lv.addWidget(w)
w=QGroupBox('process spectrum data', self)
lv.addWidget(w)
lg=QGridLayout(w)
row=0
t='raw data'
w=QLabel(t);
lg.addWidget(w, row, 0)
self._cbRaw=w=QComboBox(objectName=t);
lg.addWidget(w, row, 1)
#for i, tx in enumerate(tuple(dev._probes.keys())):
# w.insertItem(i, tx)
#w.currentIndexChanged.connect(lambda val, w=w:self.OnEnergyChanged(w,val))
#w=QCheckBox('fix', objectName=t)
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
lg.addWidget(w, row, 2)
row+=1
if fn is not None:
self.load_data(fn)
self.show()
def load_data(self,fn,**kwargs):
if type(fn)!=str:
options=QFileDialog.Options()
options|=QFileDialog.ReadOnly
#filePath, _=QFileDialog.getOpenFileName(self, "Open File", "", "All Files (*);;Text Files (*.txt)", options=options)
fn, _=QFileDialog.getOpenFileName(self, "Open File", "", "pickle Files (*.pickle *.pkl);;All Files (*)", options=options)
if fn is None:
_log.warning('no filename')
return
with open(fn, 'rb') as fh:
app=QApplication.instance()
app._rawData=pickle.load(fh)
_log.info(app._rawData.keys())
w=self._cbRaw
for i, tx in enumerate(tuple(app._rawData.keys())):
w.insertItem(i, str(tx))
w.currentIndexChanged.connect(self.OnRawChanged)
#w=QCheckBox('fix', objectName=t)
#w.clicked.connect(lambda val, w=w:self.OnEnergyChanged(w, val))
def OnRawChanged(self,**kwargs):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
data=app._rawData[int(t)]
n=len(data['2D'][0])
_log.info(f'{t} -> {n} events')
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot=data['2D']
#plt.figure()
#plt.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
#plt.show()
try:
wnd=app._wndRaw
except AttributeError as e:
app._wndRaw=wnd=MplCanvas(self, width=5, height=4, dpi=100)
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
wnd.show()
else:
_log.info('new data')
wnd.axes.cla() # Clear the canvas.
wnd.axes.plot(evnts_i_tot, evnts_j_tot, 'o', ms=0.2, alpha=0.66)
wnd.draw()
def calibrate(self):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
try:
wnd=app._wndCal
except AttributeError as e:
app._wndCal=wnd=MplCanvas(self, width=5, height=4, dpi=100)
wnd.show()
else:
_log.info('new data')
wnd.axes.cla() # Clear the canvas.
data=app._rawData[int(t)]
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
app._calib=c=FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, wnd=wnd) #compute curvature
wnd.draw()
pass
def process(self):
w=self._cbRaw
i=w.currentIndex()
t=w.currentText()
app=QApplication.instance()
try:
wnd=app._wndProc
except AttributeError as e:
wnd=list()
for i in range(3):
wnd.append(MplCanvas(self, width=5, height=4, dpi=100))
app._wndProc=tuple(wnd)
for w in wnd:
w.show()
else:
_log.info('new data')
for w in wnd:
w.axes.cla() # Clear the canvas.
data=app._rawData[int(t)]
c=app._calib
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = data['2D']
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], wnd=wnd[0]) #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, wnd=wnd[1:])
for w in wnd:
w.draw()
# if attr in self._pvs:
# return self.get(attr)
# elif attr in self.__dict__:
# return self.__dict__[attr]
if __name__ == '__main__':
import argparse
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
def main():
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
parser.add_argument("--file", "-f", help="raw data file default=%(default)s", default='FM_data.pickle')
args=parser.parse_args()
_log.info('Arguments:{}'.format(args.__dict__))
app=QApplication(sys.argv)
app._args=args
if args.mode&0x01:
app._wndRIXS=WndMainRIXS(args.file)
sys.exit(app.exec_())
main()