"""
Analysis class
"""
from collections import OrderedDict
from datetime import datetime
import logging
import math
import os
import pandas as pd
import time
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import ticker
import numpy as np
from scipy import signal
#from scipy.signal import hilbert
from qtpy.QtCore import QObject, Signal, Slot
from apps4ops.bdbase import h5_storage, utils
from apps4ops.bdbase.enumkind import MsgSeverity
matplotlib.use('Agg')
_pymodule = os.path.basename(__file__)
PROGRESS_THREAD_INIT = 0
PROGRESS_THREAD_START = 1
PROGRESS_THREAD_ABORTING = 2
PROGRESS_THREAD_ABORTED = 3
PROGRESS_THREAD_ERROR = 4
PROGRESS_THREAD_END = 100
class AnalysisProcedure(QObject):
"""
Analysis procedure
"""
trigger_abort = Signal()
def __init__(self, parent=None):
super().__init__(parent)
self.parent = parent
self.settings = self.parent.settings
self.cafe = self.parent.cafe
self.cyca = self.parent.cyca
self.check_status = self.parent.check_status
self.check_status_list = self.parent.check_status_list
self.trigger_progressbar = self.parent.trigger_progressbar
self.daq_timeout = 30
self.daq_ready = 0
self.logging = self.parent.logging
self.logger = self.logging.getLogger(__name__)
self.logger.debug('Logging activated in analysis procedure')
self.abort = False
self.trigger_abort.connect(self.receive_abort)
# Hold PV names and their values
self.pv_value_dict = OrderedDict()
self.pv_dict = {}
        # Package the data as follows:
self.all_data = {}
self.all_data['Input data'] = {}
self.all_data['Ambient data'] = {}
#self.all_data['Application Raw data'] = {}
self.all_data['Processed data'] = {}
self.all_data['Figure data'] = {}
self.all_data['Raw data'] = {}
self.raw_data = {}
self.injector_2 = self.parent.injector_2
self.ring_cyclotron = self.parent.ring_cyclotron
# Declare input parameters
self.input_data = None
self.debug = False
self.log_level = logging.INFO
self.simulation = False
self.accelerator = self.ring_cyclotron
self.harmonic_no = 6
self.injector2_current = 0
self.n_turns = None
# self.t_stepsize = 0.000000019750043 #0.00000002
        self.rf_freq = 50.6328  # MHz (value * 10**6 Hz)
        self.rf_sample = 3.0  # GHz, i.e. GS/s (value * 10**9 Hz)
self.pulse_stepsize = 1/(self.rf_freq*10**6)
self.t_stepsize = 1/(self.rf_sample*10**9)
self.t_interval = math.ceil(self.pulse_stepsize/self.t_stepsize)
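        # With the defaults above: pulse_stepsize = 1/(50.6328 MHz) ~= 19.75 ns
        # (one RF period), t_stepsize = 1/(3.0 GHz) ~= 0.33 ns (one DAQ sample),
        # so t_interval = ceil(19.75/0.33) = 60 samples per RF period.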
self.dt_cable = 44 # ns
self.dn_pickup = -1
self.mod_freq = 500 # GHz
self.duty_cycle = 1 # percentage
        # Turn off DEBUG for matplotlib
mat_logger = logging.getLogger('matplotlib')
mat_logger.setLevel(logging.ERROR)
@Slot()
def receive_abort(self):
"""
Set abort variable to interrupt measurement
"""
self.abort = True
self.trigger_progressbar.emit(int(PROGRESS_THREAD_ABORTING))
print("RECEIVE ABORT...", flush=True)
def aborting(self, line_no):
self.abort = False
#mess = "Measurement aborted"
self.trigger_progressbar.emit(int(PROGRESS_THREAD_ABORTED))
self.parent.trigger_log_message.emit(
MsgSeverity.WARN.name, _pymodule, line_no,
('Measurement procedure aborted in analysis thread'), {})
print("ABORTING...", flush=True)
    #########INITIALIZE THE INPUTS FROM THE GUI#######################
def initialize_input_parameters(self, input_data: dict):
self.input_data = input_data
self.all_data['Input data'] = self.input_data
print('==>Initialize Input Parameters')
print(self.input_data)
print('==>Initialized Input Parameters')
if 'debug' in self.input_data.keys():
self.debug = self.input_data['debug']
if self.debug:
self.logger.debug(f'INPUT DATA to LOG:{self.input_data}')
self.simulation = bool(self.input_data['simulation'])
self.rf_freq = float(self.input_data['freqrf'])
        # 2.5 GHz (GS/s) if oscilloscope data is used
        if self.simulation:
            self.rf_sample = 2.5
            mess = 'Sampling rate changed to 2.5 GHz for oscilloscope data'
self.parent.trigger_log_message.emit(
MsgSeverity.INFO.name, _pymodule, utils.line_no(), mess, {})
else:
self.rf_sample = float(self.input_data['freqsampling'])
try:
self.accelerator = self.input_data['accelerator']
self.harmonic_no = float(
self.input_data[self.accelerator]['harmonic'])
self.dt_cable = float(
self.input_data[self.accelerator]['deltaTcable'])
self.dn_pickup = int(
self.input_data[self.accelerator]['deltaNpickup'])
if self.injector_2 in self.accelerator:
self.mod_freq = float(
self.input_data[self.accelerator]['freqmod']) # * 10**9 GHz
self.duty_cycle = float(
self.input_data[self.accelerator]['dutycycle']) # * 0.01
            self.log_level = self.input_data['loggingLevel']
            self.logger.setLevel(self.logging.getLevelName(self.log_level))
self.logger.info('INPUT PARAMETERS')
self.logger.info(f'Accelerator: {self.accelerator}')
self.logger.info(f'Simulation {self.simulation}')
self.logger.info(
f'RF Frequency (10**6 Hz) {self.rf_freq}')
self.logger.info(
f'RF Sampling (10**9 Hz) {self.rf_sample}')
self.logger.info(f'Harmonic No. {self.harmonic_no}')
self.logger.info(f'dT Cable {self.dt_cable}')
self.logger.info(f'dN Pickup {self.dn_pickup}')
except KeyError as ex:
self.logger.error(f'KeyError {ex}')
except ValueError as ex:
self.logger.error(f'ValueError {ex}')
def measure_and_analyze(self, input_data=None):
        '''This method is initiated by the START button in the Procedure panel
        '''
if input_data is None:
mess = 'No input parameters given; no measurement performed'
self.parent.trigger_log_message.emit(
MsgSeverity.INFO.name, _pymodule, utils.line_no(), mess, {})
return None
# Read the input parameters from the GUI
self.initialize_input_parameters(input_data)
        # Step 1 - Collect ambient data related to the machine
self.all_data['Ambient data'] = self.collect_ambient_data()
self.trigger_progressbar.emit(int(PROGRESS_THREAD_START))
# Step 2 - Perform measurement and return data for processing
self.all_data['Raw data'] = self.measure()
if self.all_data['Raw data'] is None:
self.trigger_progressbar.emit(int(PROGRESS_THREAD_ERROR))
return None
# Step 3 - Process the raw data
self.all_data['Processed data'] = self.process()
# Step 4 - Provide plots
self.all_data['Figure data'] = self.make_figs()
self.trigger_progressbar.emit(int(PROGRESS_THREAD_END))
return self.all_data
def load_hdf_file(self, hdf_filename_loaded):
print(f'load_hdf_file==> {hdf_filename_loaded}', flush=True)
raw_data = h5_storage.loadH5Recursive(hdf_filename_loaded)
self.raw_data = raw_data
return raw_data
def reanalyze(self, all_data):
'''Reanalysis
'''
print('Reanalyze', flush=True)
print(all_data)
input_data = all_data['Input_data']
# Read the input parameters
self.initialize_input_parameters(input_data)
ambient_data = all_data['Ambient_data']
self.raw_data = all_data['Raw_data']
self.all_data['Raw data'] = self.raw_data
ambient_data['Time in seconds'] = int(ambient_data['Time in seconds'])
try:
ambient_data['I_Inj2'] = float(ambient_data['I_Inj2'])
            self.injector2_current = ambient_data['I_Inj2']
except KeyError:
self.injector2_current = 0.0
self.parent.from_hdf = True
self.time_stamp = ambient_data['Time stamp']
self.all_data['Ambient data'] = ambient_data
self.all_data['Processed data'] = self.process(from_hdf5=True)
self.all_data['Figure data'] = self.make_figs()
self.trigger_progressbar.emit(PROGRESS_THREAD_END)
return self.all_data
def collect_ambient_data(self):
'''Collect ambient data and return it as a dictionary.
Also opens PV channels for DAQ
'''
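        # Sequence: build a timestamp, attach the CA context, read the
        # Injector 2 current from the channel-access cache, open the DAQ PVs
        # configured for the selected accelerator in the settings, read their
        # initial values, and start a monitor on the DAQ error counter.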
        # Time in seconds is an integer and can be stored in hdf5
time_in_seconds = time.time()
self.time_stamp = datetime.fromtimestamp(
time_in_seconds).strftime('%a %d-%m-%Y %H:%M:%S')
ambient_data = {
'Time in seconds': int(time_in_seconds),
'Time stamp': self.time_stamp,
'I_Inj2': 0,
}
self.logger.debug(f'Ambient data = {ambient_data}')
# if self.simulation:
# return ambient_data
# EPICS...
# Attach context, open DAQ PV channels
handles = self.cafe.getHandles()[0]
status = self.cafe.attachContext(handles[0])
if status == self.cyca.ECAFE_NULLCONTEXT:
options = {}
options['statusCode'] = (str(status) + ' ' +
self.cafe.getStatusCodeAsString(status))
options['statusInfo'] = self.cafe.getStatusInfo(status)
self.parent.trigger_log_message.emit(
MsgSeverity.ERROR.name, _pymodule, utils.line_no(),
                ('Cannot attach CA context in thread. ' +
                 'Measurement will not be initiated!'), options)
if self.abort:
self.aborting(utils.line_no())
return {}
self.trigger_progressbar.emit(PROGRESS_THREAD_ERROR)
return {}
# Retrieve I_INJ2
mwc2_ist_2 = self.cafe.getCache('MWC2:IST:2')
if mwc2_ist_2 is not None:
ambient_data['I_Inj2'] = mwc2_ist_2
#mA
self.injector2_current = ambient_data['I_Inj2']
pv_list = []
for key, value in self.settings.data['PV'][self.accelerator].items():
self.pv_value_dict[key] = OrderedDict()
self.pv_value_dict[key][value] = 0
self.pv_dict[key] = value
pv_list.append(value)
self.cafe.openPrepare()
handle_list = self.cafe.open(pv_list)
self.cafe.openNowAndWait(1.0)
self.cafe.setGetActionWhenMonitorPolicyAllHandles(
self.cyca.GET_FROM_CACHE)
value_list, status, status_list = self.cafe.getScalarList(handle_list)
if self.debug:
for pv, val, stat in zip(pv_list, value_list, status_list):
print(pv, val, stat)
if status != self.cyca.ICAFE_NORMAL:
self.check_status_list(_pymodule, 'getScalarList',
pv_list, status_list, utils.line_no())
pv_daq_ready = self.pv_dict['daqReady']
self.daq_ready = self.cafe.getCache(pv_daq_ready)
if self.daq_ready is None:
stat = self.cafe.getStatus(pv_daq_ready)
self.check_status(_pymodule, 'getCache', pv_daq_ready, stat,
utils.line_no())
pv_daq_error_count = self.pv_dict['daqErrorCount']
daq_error_count = self.cafe.getCache(pv_daq_error_count)
if daq_error_count is None:
stat = self.cafe.getStatus(pv_daq_error_count)
self.check_status(_pymodule, 'getCache', pv_daq_error_count, stat,
utils.line_no())
# Put values in dictionary for inspection
        for i, dict_key in enumerate(self.pv_value_dict):
self.pv_value_dict[dict_key] = value_list[i]
if self.debug:
print(f'EPICS PVS==> {self.pv_value_dict}', flush=True)
#In GUI
#self.cafe.monitor(pv_daq_ready)
#Not in GUI
self.cafe.monitor(pv_daq_error_count)
return ambient_data
def extract_peak_data(self):
''' Using signal package for peak search
'''
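        # Two-pass search: a coarse height threshold (50.0 for measured
        # waveforms, 0.005 for oscilloscope/simulation data) locates the pulse
        # peaks, then the threshold is tightened to 90% of the smallest peak
        # found and the search is repeated with a smaller minimum distance.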
if not self.simulation:
height = 50.0
else:
height = 0.005
y1_peaks_pre = signal.find_peaks(self.y1_sample, height=height,
distance=10)
##y1_peaks_avg = np.average(y1_peaks_pre[1]['peak_heights'])
min_y1_p = np.min(y1_peaks_pre[1]['peak_heights'])
max_y1_p = np.max(y1_peaks_pre[1]['peak_heights'])
print(f'min and max value of peak {min_y1_p}, {max_y1_p}')
y1_height = min_y1_p * 0.9 # y1_peaks_avg * 0.726667
y2_peaks_pre = signal.find_peaks(self.y2_sample, height=height,
distance=10)
##y2_peaks_avg = np.average(y2_peaks_pre[1]['peak_heights'])
min_y2_p = np.min(y2_peaks_pre[1]['peak_heights'])
max_y2_p = np.max(y2_peaks_pre[1]['peak_heights'])
print(f'min and max value of peak {min_y2_p}, {max_y2_p}')
y2_height = min_y2_p * 0.9 # y2_peaks_avg * 0.566667
print(f'AVG = {y1_height}, {y2_height}', flush=True)
y1_peaks = signal.find_peaks(
self.y1_sample, height=y1_height, distance=5)
y2_peaks = signal.find_peaks(
self.y2_sample, height=y2_height, distance=5)
print((f'PEAKS==> {y1_peaks}, {y2_peaks},' +
f'{len(y1_peaks[0])}, {len(y2_peaks[0])}'), flush=True)
print(y1_peaks[1]['peak_heights'], flush=True)
#import sys
# sys.exit()
self.y1_pulse = (y1_peaks[1]['peak_heights'])
self.y2_pulse = (y2_peaks[1]['peak_heights'])
def measure(self):
''' Enable DAQ and read in the collected data from EPICS
'''
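        # Live measurement: pulse the DAQ trigger PV (set to 8, then back to 0),
        # poll the DAQ-ready PV until it becomes non-zero or daq_timeout
        # expires, discard the run if the DAQ error counter is non-zero, then
        # read the entry/exit waveforms. In simulation mode a stored
        # oscilloscope text file is parsed instead.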
if self.abort:
self.aborting(utils.line_no())
return None
self.parent.from_hdf = False
# Start and Stop Run
        # Collect data and put into numpy array
# Read Data file if simulation
# raw data
self.y1_sample = []
self.y2_sample = []
self.t_sample = []
        # filtered raw data corresponding to max amplitude of pulse
self.y1_pulse = []
self.y2_pulse = []
self.t_pulse = []
def extract_raw_data():
'''Oscilloscope data
'''
t_inc = 0
for entry in self.content[5:]:
entry = entry.replace('\n', '')
val = entry.split('\t')
self.t_sample.append(float(t_inc))
self.y1_sample.append(float(val[1])*(-1))
self.y2_sample.append(float(val[2]))
t_inc += self.t_stepsize
if not self.simulation:
# start DAQ
# Set
pv_daq_trigger = self.pv_dict['daqTrigger']
pv_daq_ready = self.pv_dict['daqReady']
pv_daq_error_count = self.pv_dict['daqErrorCount']
pv_wf_entry = self.pv_dict['wfEntry']
pv_wf_exit = self.pv_dict['wfExit']
pv_wf = [pv_wf_entry, pv_wf_exit]
self.daq_ready = self.cafe.getCache(pv_daq_ready)
stat = self.cafe.set(pv_daq_trigger, 8)
self.check_status(_pymodule, 'set', pv_daq_trigger, stat,
utils.line_no())
time.sleep(0.2)
stat = self.cafe.set(pv_daq_trigger, 0)
self.check_status(_pymodule, 'set', pv_daq_trigger, stat,
utils.line_no())
# Monitor DAQ State
start = time.time()
finished = False
icount = 0
value = 0
while (time.time() - start) < self.daq_timeout:
if self.abort:
self.aborting(utils.line_no())
return None
value = self.cafe.getCache(pv_daq_ready)
print('present cnt', value, flush=True)
if value is None:
stat = self.cafe.getStatus(pv_daq_ready)
self.check_status(_pymodule, 'getCache', pv_daq_ready,
stat, utils.line_no())
elif value != 0:
finished = True
break
time.sleep(1.0)
icount += 1
progress = int(100*icount/self.daq_timeout)
if progress > PROGRESS_THREAD_ERROR:
self.trigger_progressbar.emit(progress)
if not finished:
mess = ('DAQ not completed. Exceeded allowed ' +
f'time limit of {self.daq_timeout}s')
self.parent.trigger_log_message.emit(
MsgSeverity.ERROR.name, _pymodule, utils.line_no(),
mess, {})
return None
daq_error_count = self.cafe.getCache(pv_daq_error_count)
if daq_error_count is None:
stat = self.cafe.getStatus(pv_daq_error_count)
self.check_status(_pymodule, 'getCache', pv_daq_error_count,
stat, utils.line_no())
elif daq_error_count:
mess = ('Results discarded as DAQ reports ' +
f'{daq_error_count} errors')
self.parent.trigger_log_message.emit(
MsgSeverity.ERROR.name, _pymodule, utils.line_no(),
mess, {})
return None
# Read WF from EPICS and fill sample y1_sample, y2_sample
(self.y1_sample, self.y2_sample), status, status_list = \
self.cafe.getCompoundList(pv_wf, cacheFlag=False)
if status != self.cyca.ICAFE_NORMAL:
self.check_status_list(_pymodule, 'getCompoundList',
pv_wf, status_list, utils.line_no())
return None
print(f'y1 sample length = {len(self.y1_sample)}')
print(f'y2 sample length = {len(self.y2_sample)}', flush=True)
series = pd.Series(self.y1_sample)
#self.y1_sample = (series * (-1)).tolist()
self.y1_sample = (series).tolist()
self.t_sample = [None] * len(self.y1_sample)
self.t_sample[0] = 0
t_inc = 0
for i in range(1, len(self.y1_sample)):
t_inc += self.t_stepsize
self.t_sample[i] = t_inc
else:
self.trigger_progressbar.emit(30)
with open(
'/hipa/bd/data/measurements/tina/20240710-223007_2000.txt',
'r', encoding='utf-8') as file:
self.content = file.readlines()
if self.abort:
self.aborting(utils.line_no())
return None
self.trigger_progressbar.emit(60)
extract_raw_data()
self.extract_peak_data()
if self.abort:
self.aborting(utils.line_no())
return None
self.trigger_progressbar.emit(70)
# Fill Raw data here
rawdata = {
'y1': list(self.y1_sample),
'y2': list(self.y2_sample),
't': list(self.t_sample),
}
return rawdata
def unpack_hdf_data(self):
self.y1_sample = self.raw_data['y1']
self.y2_sample = self.raw_data['y2']
self.t_sample = self.raw_data['t']
self.extract_peak_data()
def process(self, from_hdf5=False):
''' Process the collected data
'''
if self.abort:
self.aborting(utils.line_no())
return None
self.trigger_progressbar.emit(95)
if from_hdf5:
self.unpack_hdf_data()
self.mean_amplitude_y1 = np.mean(self.y1_pulse, keepdims=True)
self.mean_amplitude_y2 = np.mean(self.y2_pulse, keepdims=True)
self.std_amplitude_y1 = np.std(self.y1_pulse, keepdims=True)
self.std_amplitude_y2 = np.std(self.y2_pulse, keepdims=True)
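        # Mean-subtract and scale both pulse envelopes; dividing the second one
        # by its standard deviation times its length normalizes the
        # cross-correlation so that two identical pulse trains peak near 1.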
self.normalized_amplitude_envelope_1 = (
self.y1_pulse - self.mean_amplitude_y1)/self.std_amplitude_y1
self.normalized_amplitude_envelope_2 = (
self.y2_pulse - self.mean_amplitude_y2)/(self.std_amplitude_y2*len(self.y2_pulse))
self.corr_full = signal.correlate(
self.normalized_amplitude_envelope_2,
self.normalized_amplitude_envelope_1, mode='full', method='auto')
self.lags_full_array = signal.correlation_lags(
len(self.normalized_amplitude_envelope_2),
len(self.normalized_amplitude_envelope_1), mode='full')
self.lag_full = int(self.lags_full_array[np.argmax(self.corr_full)])
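        # Each correlation sample corresponds to one detected pulse, i.e. one
        # RF period (assuming one peak per period), so the lag is converted to
        # a time delay by multiplying with the RF period (pulse_stepsize).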
#self.delay = self.lag_full * self.t_stepsize*self.t_interval
self.delay = float(self.lag_full * self.pulse_stepsize)
print('lag', self.lag_full)
print('delay', self.delay, flush=True)
print('dTcable', self.dt_cable, flush=True)
print('rf freq', self.rf_freq, flush=True)
print('harmonic', self.harmonic_no, flush=True)
print('dN pickup', self.dn_pickup, flush=True)
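        # n_turns = (delay - dt_cable) * f_RF / harmonic_no + dn_pickup,
        # i.e. the cable-corrected delay divided by the revolution period
        # (harmonic_no / f_RF), plus the pickup offset dn_pickup.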
self.n_turns = (
((self.delay-self.dt_cable*10**(-9))*self.rf_freq*10**6)
/ self.harmonic_no) + self.dn_pickup
print((f'lag = {self.lag_full}, ' +
f'delay = {self.delay*10**6:.3f} \u00B5s ' +
f'nturns = {self.n_turns:.4f}'))
if self.abort:
self.aborting(utils.line_no())
return None
# Fill Processed data here
proc_data = {
'y1': self.y1_pulse.tolist(),
'y2': self.y2_pulse.tolist(),
't_stepsize': self.pulse_stepsize,
'lag': self.lag_full,
'delay': self.delay,
'nturns': self.n_turns
}
return proc_data
def make_figs(self):
''' Figure construction with matplotlib
'''
fig, (ax) = plt.subplots(nrows=2, ncols=1,
figsize=(18, 9), layout='tight')
fig2, (ax2) = plt.subplots(nrows=1, ncols=1, figsize=(18, 9))
fig.patch.set_facecolor('#FAF9F6')
fig2.patch.set_facecolor('#FAF9F6')
ln = 500 # 500
off = 0 # 10000
s = off
e = off + ln
#ax[0].ticklabel_format(useOffset=False, style='plain')
ax[0].plot(self.t_sample[s:e], self.y1_sample[s:e], '.r-', label='')
ax[1].plot(self.t_sample[s:e], self.y2_sample[s:e], '.r-', label='')
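        # Major x-ticks are spaced by one RF period
        # (t_stepsize * t_interval ~= pulse_stepsize).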
ax[0].xaxis.set_major_locator(
ticker.MultipleLocator(self.t_stepsize*self.t_interval))
ax[0].set_xlabel('Time [s]')
ax[0].set_ylabel('Amplitude')
ax[0].set_title('Pulse at Entry')
ax[0].set_facecolor('lightgrey')
# ax[0].legend()
ax[0].grid(visible=True, which='major', axis='both',
linestyle='--', linewidth=0.8)
ax[1].xaxis.set_major_locator(
ticker.MultipleLocator(self.t_stepsize*self.t_interval))
ax[1].set_xlabel('Time [s]')
ax[1].set_ylabel('Amplitude')
ax[1].set_title('Pulse at Exit')
ax[1].set_facecolor('lightgray')
ax[1].grid(visible=True, which='major', axis='both',
linestyle='--', linewidth=0.8)
# ax[1].legend()
ax2.set_title(
f'Cross-correlation between {self.accelerator} Entrance and Exit',
fontsize=16)
ax2.set_ylabel('x-corr', fontsize=14)
ax2.set_xlabel(f't (units of {self.pulse_stepsize*10**9:.2f} ns)',
fontsize=14)
ax2.grid(visible=True, which='major', axis='both', linestyle='--',
linewidth=0.8)
ax2.set_facecolor('#F5F5F5')
ax2.plot(self.lags_full_array[:], self.corr_full[:])
_, _, ymin, ymax = ax2.axis()
line_start = ymin # self.corr_full.min()
line_end = ymax # self.corr_full.max()
ax2.plot([self.lag_full, self.lag_full], [line_start, line_end],
':', color='r')
ax2.set_ylim(line_start, line_end)
text = f'No of Turns = {self.n_turns:0.0f}'
plt.figtext(0.65, 0.82, self.accelerator, weight='bold', fontsize=16)
plt.figtext(0.65, 0.77, text, weight='bold', fontsize=16)
if self.injector2_current != 0:
inj_current_text = f'I Inj2 = {self.injector2_current:.3f} mA'
plt.figtext(0.80, 0.85, inj_current_text, weight='normal',
fontsize=10)
plt.figtext(0.7, 0.72, f'lag = {self.lag_full}', weight='normal',
fontsize=14)
text = f'delay = {self.delay*10**6:.3f} \u00B5s'
plt.figtext(0.7, 0.67, text, weight='normal', fontsize=14)
if self.settings.data['GUI']['showDate'] == 1:
plt.figtext(0.75, 0.12, self.time_stamp, size='small')
fig_data = {'Canvas 1': [fig2], 'Canvas 2': [fig]}
return fig_data