Files
EsfRixsApps/spectrumProc/sample_RIXSprocess.py

251 lines
7.6 KiB
Python
Executable File

#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
SwissFEL Furka RIXS stectrum processing
https://jira.psi.ch/browse/SFELPHOTON-1232
if an too old scipy version is istalled:
pip install scipy scikit-learn -U --user
"""
import sys, logging, copy
import epics
import numpy as np
import matplotlib.pyplot as plt
import pickle
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
#matplotlib widget
import scipy
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression,RANSACRegressor
_log=logging.getLogger(__name__)
class col:
d = '\033[0m' #default
r = '\033[31m' #red
g = '\033[32m' #green
y = '\033[33m' #yellow
rr= '\033[91m' #red(bright)
gg= '\033[92m' #green(bright)
yy= '\033[93m' #yellow(bright)
b = '\033[1m' #bold
u = '\033[4m' #underline
R = '\033[1;31m' #bold, red
G = '\033[1;32m' #bold, green
Y = '\033[1;33m' #bold, yellow
class logHandler(logging.StreamHandler):
def __init__(self):
logging.StreamHandler.__init__(self)
def emit(self, record):
'''override function of base class'''
try:
msg=self.format(record)
# print(record.__dict__)
if record.levelno<=10:
c=col.g
elif record.levelno<=20:
c=col.y
elif record.levelno<=30:
c=col.yy
elif record.levelno<=40:
c=col.r
else:
c=col.rr+col.b
msg=c+msg+col.d
stream=self.stream
stream.write(msg+self.terminator)
self.flush()
except RecursionError:
raise
except Exception:
self.handleError(record)
#https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.median_abs_deviation.html#r63fe0ba43769-1
#https://github.com/scipy/scipy/blob/v1.14.1/scipy/stats/_stats_py.py#L3466-L3617
#def median_abs_deviation(x, axis=0, center=np.median, scale=1.0, nan_policy='propagate'):
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True'):
'''
Finds the best curvature function. one could use a simple polynomial
fit but this regressor is a bit more advanced because it discards outliers
'''
X = evnts_j_tot
y = evnts_i_tot
MAD = scipy.stats.median_abs_deviation(y)
model = Pipeline([('poly', PolynomialFeatures(degree=2)),
('RANSAC', RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold = MAD*MAD_n))])
model = model.fit(X[:, np.newaxis], y)
c = model.named_steps['RANSAC'].estimator_.coef_
if plot == 'True':
inlier_mask = model.named_steps['RANSAC'].inlier_mask_
plt.figure()
plt.plot(X, y, 'o', markersize = 0.5)
plt.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize = 0.5, )
x_linreg =np.linspace(min(X), max(X))
plt.plot(x_linreg, c[0] + c[1]*x_linreg + c[2]*x_linreg**2 , '-')
plt.ylim(np.mean(y) - MAD*MAD_n*3, np.mean(y) + MAD*MAD_n*3)
return c
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, plot = 'True'):
'''
Applies the curvature correction
'''
row_curv_corr = evnts_i_tot -slope*evnts_j_tot - C_2*evnts_j_tot**2
x =np.linspace(ROI[0][0],ROI[0][1])
if plot == 'True':
plt.figure()
plt.xlim(ROI[0][0], ROI[0][1])
plt.ylim(ROI[1][0], ROI[1][1])
plt.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
# plt.plot(x, intercept + 0*x, '-')
return row_curv_corr
def plot_1d_RIXS(row_curv_corr, ROI=[[0,1800],[0,1800]], pb=2.5, plot = 'True'):
'''
makes a 1D plot according to a specific binning
'''
#pb is pixel binning
#spread is about 2.5 pixels, by comparing with image based data
pxY_num = ROI[1][1]
startY_px = ROI[1][0]
endY_px = ROI[1][0]+ROI[1][1]-1
scf, b = np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
x_pxl = .5*(b[:-1] + b[1:]) #+ B*px_col/2
if plot == 'True':
plt.figure()
plt.xlim(ROI[1][0], ROI[1][1])
plt.plot(x_pxl, scf, '-o', markersize=1)
return x_pxl, scf
class RIXSprocess:
def run(self):
_log.info('...')
with open('FM_data.pickle', 'rb') as handle:
rixs_data = pickle.load(handle)
#plt.ion()
run = 226 #mirror left
#the single photon counting data comes as events, defined by their (i,j) coordinates
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
#plt.ioff()
#plt.show()
run = 225 #no mirror
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
run = 227 #mirror right
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
plt.figure()
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
rixs_data[227].keys()
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[226]['2D']
c = FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True') #compute curvature
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'True') #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
plt.figure()
plt.plot(x,y)
#some real data
runs = [464,461,467]
fig, axs = plt.subplots(1,3, figsize = (8,5))
for i,run in enumerate(runs):
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
axs[i].plot(evnts_j_tot, evnts_i_tot,'o',ms = 0.2, alpha = 0.66)
#we correct using a curvature function obtained before
plt.figure()
for run in runs:
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'False') #apply curvatur and extract
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
plt.plot(x,y,label = str(run))
plt.legend()
plt.xlim(250,800)
#plt.ioff()
plt.show()
# if attr in self._pvs:
# return self.get(attr)
# elif attr in self.__dict__:
# return self.__dict__[attr]
if __name__ == '__main__':
import argparse
logging.basicConfig(level=logging.INFO, handlers=[logHandler()], format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
def main():
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
args=parser.parse_args()
_log.info('Arguments:{}'.format(args.__dict__))
if args.mode&0x01:
obj=RIXSprocess()
obj.run()
main()