251 lines
7.6 KiB
Python
Executable File
251 lines
7.6 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# *-----------------------------------------------------------------------*
|
|
# | |
|
|
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
|
|
# | |
|
|
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
|
# *-----------------------------------------------------------------------*
|
|
|
|
"""
|
|
SwissFEL Furka RIXS stectrum processing
|
|
|
|
https://jira.psi.ch/browse/SFELPHOTON-1232
|
|
|
|
if an too old scipy version is istalled:
|
|
pip install scipy scikit-learn -U --user
|
|
"""
|
|
|
|
import sys, logging, copy
|
|
import epics
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import pickle
|
|
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
|
|
#matplotlib widget
|
|
import scipy
|
|
from sklearn.pipeline import Pipeline
|
|
from sklearn.preprocessing import PolynomialFeatures
|
|
from sklearn.linear_model import LinearRegression,RANSACRegressor
|
|
|
|
_log=logging.getLogger(__name__)
|
|
|
|
class col:
|
|
d = '\033[0m' #default
|
|
r = '\033[31m' #red
|
|
g = '\033[32m' #green
|
|
y = '\033[33m' #yellow
|
|
rr= '\033[91m' #red(bright)
|
|
gg= '\033[92m' #green(bright)
|
|
yy= '\033[93m' #yellow(bright)
|
|
b = '\033[1m' #bold
|
|
u = '\033[4m' #underline
|
|
R = '\033[1;31m' #bold, red
|
|
G = '\033[1;32m' #bold, green
|
|
Y = '\033[1;33m' #bold, yellow
|
|
|
|
|
|
class logHandler(logging.StreamHandler):
|
|
def __init__(self):
|
|
logging.StreamHandler.__init__(self)
|
|
|
|
def emit(self, record):
|
|
'''override function of base class'''
|
|
try:
|
|
msg=self.format(record)
|
|
# print(record.__dict__)
|
|
if record.levelno<=10:
|
|
c=col.g
|
|
elif record.levelno<=20:
|
|
c=col.y
|
|
elif record.levelno<=30:
|
|
c=col.yy
|
|
elif record.levelno<=40:
|
|
c=col.r
|
|
else:
|
|
c=col.rr+col.b
|
|
msg=c+msg+col.d
|
|
stream=self.stream
|
|
stream.write(msg+self.terminator)
|
|
self.flush()
|
|
except RecursionError:
|
|
raise
|
|
except Exception:
|
|
self.handleError(record)
|
|
|
|
#https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.median_abs_deviation.html#r63fe0ba43769-1
|
|
#https://github.com/scipy/scipy/blob/v1.14.1/scipy/stats/_stats_py.py#L3466-L3617
|
|
#def median_abs_deviation(x, axis=0, center=np.median, scale=1.0, nan_policy='propagate'):
|
|
|
|
|
|
def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True'):
|
|
|
|
'''
|
|
Finds the best curvature function. one could use a simple polynomial
|
|
fit but this regressor is a bit more advanced because it discards outliers
|
|
'''
|
|
|
|
X = evnts_j_tot
|
|
y = evnts_i_tot
|
|
|
|
|
|
MAD = scipy.stats.median_abs_deviation(y)
|
|
|
|
model = Pipeline([('poly', PolynomialFeatures(degree=2)),
|
|
('RANSAC', RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold = MAD*MAD_n))])
|
|
|
|
model = model.fit(X[:, np.newaxis], y)
|
|
c = model.named_steps['RANSAC'].estimator_.coef_
|
|
|
|
if plot == 'True':
|
|
inlier_mask = model.named_steps['RANSAC'].inlier_mask_
|
|
|
|
plt.figure()
|
|
plt.plot(X, y, 'o', markersize = 0.5)
|
|
plt.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize = 0.5, )
|
|
|
|
|
|
x_linreg =np.linspace(min(X), max(X))
|
|
plt.plot(x_linreg, c[0] + c[1]*x_linreg + c[2]*x_linreg**2 , '-')
|
|
plt.ylim(np.mean(y) - MAD*MAD_n*3, np.mean(y) + MAD*MAD_n*3)
|
|
|
|
return c
|
|
|
|
def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, plot = 'True'):
|
|
|
|
|
|
'''
|
|
Applies the curvature correction
|
|
'''
|
|
|
|
row_curv_corr = evnts_i_tot -slope*evnts_j_tot - C_2*evnts_j_tot**2
|
|
x =np.linspace(ROI[0][0],ROI[0][1])
|
|
|
|
if plot == 'True':
|
|
plt.figure()
|
|
plt.xlim(ROI[0][0], ROI[0][1])
|
|
plt.ylim(ROI[1][0], ROI[1][1])
|
|
plt.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1)
|
|
# plt.plot(x, intercept + 0*x, '-')
|
|
|
|
return row_curv_corr
|
|
|
|
def plot_1d_RIXS(row_curv_corr, ROI=[[0,1800],[0,1800]], pb=2.5, plot = 'True'):
|
|
|
|
'''
|
|
makes a 1D plot according to a specific binning
|
|
'''
|
|
|
|
#pb is pixel binning
|
|
#spread is about 2.5 pixels, by comparing with image based data
|
|
pxY_num = ROI[1][1]
|
|
startY_px = ROI[1][0]
|
|
endY_px = ROI[1][0]+ROI[1][1]-1
|
|
|
|
scf, b = np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px))
|
|
x_pxl = .5*(b[:-1] + b[1:]) #+ B*px_col/2
|
|
|
|
|
|
if plot == 'True':
|
|
plt.figure()
|
|
plt.xlim(ROI[1][0], ROI[1][1])
|
|
plt.plot(x_pxl, scf, '-o', markersize=1)
|
|
|
|
return x_pxl, scf
|
|
|
|
|
|
class RIXSprocess:
|
|
|
|
def run(self):
|
|
_log.info('...')
|
|
with open('FM_data.pickle', 'rb') as handle:
|
|
rixs_data = pickle.load(handle)
|
|
|
|
#plt.ion()
|
|
|
|
run = 226 #mirror left
|
|
#the single photon counting data comes as events, defined by their (i,j) coordinates
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
|
|
plt.figure()
|
|
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
|
|
#plt.ioff()
|
|
#plt.show()
|
|
|
|
run = 225 #no mirror
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
|
|
plt.figure()
|
|
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
|
|
|
|
run = 227 #mirror right
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
|
|
plt.figure()
|
|
plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66)
|
|
|
|
|
|
rixs_data[227].keys()
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[226]['2D']
|
|
c = FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True') #compute curvature
|
|
|
|
|
|
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'True') #apply curvatur and extract
|
|
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
|
|
plt.figure()
|
|
plt.plot(x,y)
|
|
|
|
|
|
#some real data
|
|
runs = [464,461,467]
|
|
|
|
fig, axs = plt.subplots(1,3, figsize = (8,5))
|
|
for i,run in enumerate(runs):
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
|
|
axs[i].plot(evnts_j_tot, evnts_i_tot,'o',ms = 0.2, alpha = 0.66)
|
|
|
|
#we correct using a curvature function obtained before
|
|
plt.figure()
|
|
for run in runs:
|
|
evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D']
|
|
row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'False') #apply curvatur and extract
|
|
x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False')
|
|
plt.plot(x,y,label = str(run))
|
|
plt.legend()
|
|
plt.xlim(250,800)
|
|
|
|
#plt.ioff()
|
|
plt.show()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if attr in self._pvs:
|
|
# return self.get(attr)
|
|
# elif attr in self.__dict__:
|
|
# return self.__dict__[attr]
|
|
|
|
if __name__ == '__main__':
|
|
import argparse
|
|
logging.basicConfig(level=logging.INFO, handlers=[logHandler()], format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
|
|
|
|
|
|
def main():
|
|
epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n'
|
|
parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1)
|
|
parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01)
|
|
args=parser.parse_args()
|
|
_log.info('Arguments:{}'.format(args.__dict__))
|
|
|
|
if args.mode&0x01:
|
|
obj=RIXSprocess()
|
|
obj.run()
|
|
|
|
main() |