#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) | # | | # | Author Thierry Zamofing (thierry.zamofing@psi.ch) | # *-----------------------------------------------------------------------* """ SwissFEL Furka RIXS stectrum processing https://jira.psi.ch/browse/SFELPHOTON-1232 if an too old scipy version is istalled: pip install scipy scikit-learn -U --user """ import sys, logging, copy import epics import numpy as np import matplotlib.pyplot as plt import pickle from matplotlib.ticker import (MultipleLocator, AutoMinorLocator) #matplotlib widget import scipy from sklearn.pipeline import Pipeline from sklearn.preprocessing import PolynomialFeatures from sklearn.linear_model import LinearRegression,RANSACRegressor _log=logging.getLogger(__name__) class col: d = '\033[0m' #default r = '\033[31m' #red g = '\033[32m' #green y = '\033[33m' #yellow rr= '\033[91m' #red(bright) gg= '\033[92m' #green(bright) yy= '\033[93m' #yellow(bright) b = '\033[1m' #bold u = '\033[4m' #underline R = '\033[1;31m' #bold, red G = '\033[1;32m' #bold, green Y = '\033[1;33m' #bold, yellow class logHandler(logging.StreamHandler): def __init__(self): logging.StreamHandler.__init__(self) def emit(self, record): '''override function of base class''' try: msg=self.format(record) # print(record.__dict__) if record.levelno<=10: c=col.g elif record.levelno<=20: c=col.y elif record.levelno<=30: c=col.yy elif record.levelno<=40: c=col.r else: c=col.rr+col.b msg=c+msg+col.d stream=self.stream stream.write(msg+self.terminator) self.flush() except RecursionError: raise except Exception: self.handleError(record) #https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.median_abs_deviation.html#r63fe0ba43769-1 #https://github.com/scipy/scipy/blob/v1.14.1/scipy/stats/_stats_py.py#L3466-L3617 #def median_abs_deviation(x, axis=0, center=np.median, scale=1.0, nan_policy='propagate'): def FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True'): ''' Finds the best curvature function. one could use a simple polynomial fit but this regressor is a bit more advanced because it discards outliers ''' X = evnts_j_tot y = evnts_i_tot MAD = scipy.stats.median_abs_deviation(y) model = Pipeline([('poly', PolynomialFeatures(degree=2)), ('RANSAC', RANSACRegressor(estimator=LinearRegression(fit_intercept=False), residual_threshold = MAD*MAD_n))]) model = model.fit(X[:, np.newaxis], y) c = model.named_steps['RANSAC'].estimator_.coef_ if plot == 'True': inlier_mask = model.named_steps['RANSAC'].inlier_mask_ plt.figure() plt.plot(X, y, 'o', markersize = 0.5) plt.plot(X[inlier_mask], y[inlier_mask], 'o', 'r', markersize = 0.5, ) x_linreg =np.linspace(min(X), max(X)) plt.plot(x_linreg, c[0] + c[1]*x_linreg + c[2]*x_linreg**2 , '-') plt.ylim(np.mean(y) - MAD*MAD_n*3, np.mean(y) + MAD*MAD_n*3) return c def Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, intercept, slope, C_2, plot = 'True'): ''' Applies the curvature correction ''' row_curv_corr = evnts_i_tot -slope*evnts_j_tot - C_2*evnts_j_tot**2 x =np.linspace(ROI[0][0],ROI[0][1]) if plot == 'True': plt.figure() plt.xlim(ROI[0][0], ROI[0][1]) plt.ylim(ROI[1][0], ROI[1][1]) plt.plot(evnts_j_tot, row_curv_corr, 'o', markersize=1) # plt.plot(x, intercept + 0*x, '-') return row_curv_corr def plot_1d_RIXS(row_curv_corr, ROI=[[0,1800],[0,1800]], pb=2.5, plot = 'True'): ''' makes a 1D plot according to a specific binning ''' #pb is pixel binning #spread is about 2.5 pixels, by comparing with image based data pxY_num = ROI[1][1] startY_px = ROI[1][0] endY_px = ROI[1][0]+ROI[1][1]-1 scf, b = np.histogram(row_curv_corr, bins=int(pxY_num/pb), range=(startY_px, endY_px)) x_pxl = .5*(b[:-1] + b[1:]) #+ B*px_col/2 if plot == 'True': plt.figure() plt.xlim(ROI[1][0], ROI[1][1]) plt.plot(x_pxl, scf, '-o', markersize=1) return x_pxl, scf class RIXSprocess: def run(self): _log.info('...') with open('FM_data.pickle', 'rb') as handle: rixs_data = pickle.load(handle) #plt.ion() run = 226 #mirror left #the single photon counting data comes as events, defined by their (i,j) coordinates evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D'] plt.figure() plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66) #plt.ioff() #plt.show() run = 225 #no mirror evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D'] plt.figure() plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66) run = 227 #mirror right evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D'] plt.figure() plt.plot(evnts_i_tot, evnts_j_tot,'o',ms = 0.2, alpha = 0.66) rixs_data[227].keys() evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[226]['2D'] c = FitCurv_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, MAD_n=4, plot = 'True') #compute curvature row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'True') #apply curvatur and extract x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False') plt.figure() plt.plot(x,y) #some real data runs = [464,461,467] fig, axs = plt.subplots(1,3, figsize = (8,5)) for i,run in enumerate(runs): evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D'] axs[i].plot(evnts_j_tot, evnts_i_tot,'o',ms = 0.2, alpha = 0.66) #we correct using a curvature function obtained before plt.figure() for run in runs: evnts_j_tot, evnts_i_tot, ROI, frames_num, ph_num_tot = rixs_data[run]['2D'] row_curv_corr = Curv_Corr_SPC_Cmos(evnts_j_tot, evnts_i_tot, ROI, c[0], c[1], c[2], plot = 'False') #apply curvatur and extract x, y = plot_1d_RIXS(row_curv_corr, ROI=ROI , pb=1, plot = 'False') plt.plot(x,y,label = str(run)) plt.legend() plt.xlim(250,800) #plt.ioff() plt.show() # if attr in self._pvs: # return self.get(attr) # elif attr in self.__dict__: # return self.__dict__[attr] if __name__ == '__main__': import argparse logging.basicConfig(level=logging.INFO, handlers=[logHandler()], format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ') def main(): epilog=__doc__ # +'\nExamples:'+''.join(map(lambda s:cmd+s, exampleCmd))+'\n' parser=argparse.ArgumentParser(epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--mode', '-m', type=lambda x:int(x, 0), help='mode (see bitmasks) default=0x%(default)x', default=1) parser.add_argument("--sim", "-s", type=lambda x: int(x,0), help="simulate devices (see bitmasks) default=0x%(default)x", default=0x01) args=parser.parse_args() _log.info('Arguments:{}'.format(args.__dict__)) if args.mode&0x01: obj=RIXSprocess() obj.run() main()