Files
tina/src/analysis.py
2024-07-03 16:17:55 +02:00

362 lines
12 KiB
Python

"""
Analysis class
"""
from datetime import datetime
import inspect
import os
from statistics import mean
import time
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy import signal
from scipy.signal import chirp, hilbert
from qtpy.QtCore import QObject, Signal, Slot
from apps4ops.bdbase.utils import _line
from apps4ops.bdbase.enumkind import MsgSeverity
matplotlib.use('Agg')
_pymodule = os.path.basename(__file__)
PROGRESS_BAR_THREAD_INIT = 0
PROGRESS_BAR_THREAD_START = 1
PROGRESS_BAR_THREAD_ABORTING = 2
PROGRESS_BAR_THREAD_ABORTED = 3
PROGRESS_BAR_THREAD_ERROR = 4
PROGRESS_BAR_THREAD_END = 100
class AnalysisProcedure(QObject):
"""
Analysis procedure
"""
trigger_abort = Signal()
def __init__(self, parent=None):
super(AnalysisProcedure, self).__init__(parent)
self.parent = parent
self.settings = self.parent.settings
self.cafe = self.parent.cafe
self.cyca = self.parent.cyca
self.logging = self.parent.logging
self.logger = self.logging.getLogger(__name__)
self.logger.debug("Logging activated in analysis procedure")
self.abort = False
self.raw_data = {}
self.trigger_abort.connect(self.receive_abort)
#Declare input parameters
self.input_data = None
self.debug = False
self.simulation = True
self.facility = None
self.maxmin = None
self.maximize = None
self.N_events = None
self.N_points = None
self.checkbox = None
@Slot()
def receive_abort(self):
"""
Set abort variable to interrupt measurement
"""
self.abort = True
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_ABORTING)
def aborting(self, line_no):
self.abort = False
mess = "Measurement aborted"
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_ABORTED)
#########INITIALIZE THE INPUTS FOM THE GUI#######################
def initialize_input_parameters(self, input_data: dict):
self.input_data = input_data
if 'debug' in self.input_data.keys():
self.debug = self.input_data['debug']
if self.debug:
self.logger.debug("INPUT DATA to LOG:{0}".format(self.input_data))
self.simulation = bool(self.input_data['simulation'])
print("INPUT:", self.input_data)
#self.facility = self.input_data['facility']
try:
self.accelerator_selected = self.input_data['qtabdata']
self.harmonic_no = float(
self.input_data[self.accelerator_selected]['harmonic'])
self.rf_fref = float(
self.input_data[self.accelerator_selected]['freqrf'])
self.dTcable = int(
self.input_data[self.accelerator_selected]['deltaTcable'])
self.dNpickup = int(
self.input_data[self.accelerator_selected]['deltaNpickup'])
print("logging info level==>", self.logger.getEffectiveLevel(),
flush=True)
self.loglevel = self.input_data['loggingLevel']
self.logger.setLevel(self.logging.getLevelName(self.loglevel))
print("logging info level==>", self.logger.getEffectiveLevel(),
flush=True)
self.logger.debug("INPUT PARAMETERS")
self.logger.debug("Accelerator: {0}".format(
self.accelerator_selected))
self.logger.debug("Simulation {0}".format(self.simulation))
self.logger.debug("Harmonic No. {0}".format(self.harmonic_no))
self.logger.debug("RF Frequency (Hz) {0}".format(self.rf_freq))
self.logger.debug("dT Cable {0}".format(self.dTcable))
self.logger.debug("dN Pickup {0}".format(self.dNpickup))
except KeyError as ex:
self.logger.debug("KeyError {0}".format(ex))
except ValueError as ex:
self.logger.debug("ValueError {0}".format(ex))
except Exception as ex:
self.logger.debug("Exception {0}".format(ex))
def measure_and_analyze(self, input_data=None):
'''
This method is initiated by the START button in Procedure panel
'''
if input_data is None:
mess = "No input parameters given; no measurement performed"
self.parent.trigger_log_message.emit(
MsgSeverity.INFO.name, _pymodule, _line(), mess, {})
return None
#Read the input parameters from the GUI
self.initialize_input_parameters(input_data)
#Step 1 - Collect ambient data relate to the machine
ambient_data = self.collect_ambient_data()
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_START)
#Step 2 - Perform measurement and return data for processing
self.raw_data = self.measure()
print("raw data", self.raw_data)
if self.raw_data is None:
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_ERROR)
return None
#Step 3 - Process the raw data
proc_data = self.process(ambient_data)
#Step 4 - Provide plots
fig_data = self.make_figs(ambient_data, proc_data)
#Step 5 - Package to all_data dictionary
all_data = self.combine_data(ambient_data, proc_data, fig_data)
print("all data", all_data)
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_END)
return all_data
def reanalyze(self, all_data):
input_data = all_data['Input data']
ambient_data = all_data['Ambient data']
self.raw_data = all_data['Raw data']
proc_data = self.process(ambient_data, from_hdf5=True)
fig_data = self.make_figs(ambient_data, proc_data)
all_data_new = self.combine_data(ambient_data, proc_data,
fig_data)
return(all_data_new)
def collect_ambient_data(self):
"""Collect ambient data and return it as a dictionary
"""
# Time in seconds in an integer and can be stored in hdf5
time_in_seconds = time.time()
time_stamp = datetime.fromtimestamp(
time_in_seconds).strftime('%a %d-%m-%Y %H:%M:%S')
#EPICS...
handles = self.cafe.getHandles()[0]
status = self.cafe.attachContext(handles[0])
if status == self.cyca.ECAFE_NULLCONTEXT:
options = {}
options['statusCode'] = (str(status) + " " +
self.cafe.getStatusCodeAsString(status))
options['statusInfo'] = self.cafe.getStatusInfo(status)
self.parent.trigger_log_message.emit(
MsgSeverity.ERROR.name, _pymodule, _line(),
("Cannot attach CA context in thread " +
"Scan will not be initiated!"), _options)
if self.abort:
self.aborting(_line())
return {}
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_ERROR)
return {}
ambient_data = {
'Time in seconds': time_in_seconds,
'Time stamp': time_stamp,
}
self.logger.debug("{0}".format(ambient_data))
return ambient_data
def measure(self):
for i in range (1, 100):
if i%10 == 0:
self.parent.trigger_progressbar.emit(i)
time.sleep(0.1)
if self.abort:
self.aborting(_line())
return None
#Fill Raw data here
raw_data = {
'test': 'testing'
}
return raw_data
def process(self, ambient_data, from_hdf5=False):
self.parent.trigger_progressbar.emit(PROGRESS_BAR_THREAD_START)
####envelope
duration = 1.0
fs = 400.0 #400.0
samples = int(fs*duration)
t = np.arange(samples) / fs
#We create a chirp of which the frequency increases from
#20 Hz to 100 Hz and apply an amplitude modulation.
signal_chirp = chirp(t, 20.0, t[-1], 100.0)
signal_chirp *= (1.0 + 0.5 * np.sin(2.0*np.pi*3.0*t))
'''
The amplitude envelope is given by magnitude of the analytic signal.
The instantaneous frequency can be obtained by differentiating the
instantaneous phase in respect to time. The instantaneous phase
corresponds to the phase angle of the analytic signal.
'''
analytic_signal = hilbert(signal_chirp)
self.amplitude_envelope = np.abs(analytic_signal)
instantaneous_phase = np.unwrap(np.angle(analytic_signal))
self.instantaneous_frequency = (
np.diff(instantaneous_phase) / (2.0*np.pi) * fs)
self.signal_chirp = signal_chirp
self.t = t
###cross-correlation
rng = np.random.default_rng()
self.sig = np.repeat([0., 1., 1., 0., 1., 0., 0., 1.], 128)
self.sig_noise = self.sig + rng.standard_normal(len(self.sig))
analytic = hilbert(self.sig_noise)
self.correlation_envelope = np.abs(analytic)
##self.sig = signal_chirp
#self.corr = signal.correlate(self.sig_noise, np.ones(128), mode='same') / 128
self.corr = signal.correlate(self.correlation_envelope, np.ones(128), mode='same') / 128
self.clock = np.arange(64, len(self.sig), 128)
nturns = 180
proc_data = {"nturns": nturns}
return proc_data
def make_figs(self, ambient_data, proc_data):
fig, (ax_orig, ax_envelope, ax_noise, ax_corr) = plt.subplots(4, 1, sharex=True) #True
ax_orig.plot(self.sig)
ax_orig.plot(self.clock, self.sig[self.clock], 'ro')
ax_orig.set_title('Original signal')
ax_orig.plot(self.t, self.signal_chirp, label='signal')
#ax_orig.plot(self.t, self.amplitude_envelope, label='envelope')
#ax_orig.set_xlabel("t (s)")
#ax_orig.set_ylabel("Amplitude")
#ax_orig.legend()
ax_noise.plot(self.sig_noise)
ax_noise.set_title('Signal with noise')
ax_corr.plot(self.corr)
ax_corr.plot(self.clock, self.corr[self.clock], 'ro')
ax_corr.axhline(0.5, ls=':')
ax_corr.set_title('Cross-correlated with rectangular pulse')
ax_orig.margins(0, 0.1)
ax_envelope.plot(self.correlation_envelope)
ax_envelope.set_title('Envelope signal')
fig.tight_layout()
#fig_data = {'Canvas 1': [fig]}
fig2, (ax0, ax1) = plt.subplots(nrows=2)
ax0.plot(self.t, self.signal_chirp, label='signal')
ax0.plot(self.t, self.amplitude_envelope, label='envelope')
ax0.set_xlabel("t (s)")
ax0.set_ylabel("Amplitude")
ax0.legend()
ax1.plot(self.t[1:], self.instantaneous_frequency)
ax1.set_xlabel("t (s)")
ax1.set_ylabel("f (Hz)")
ax1.set_ylim(0.0, 120.0)
#fig2.tight_layout()
fig_data = {'Canvas 1': [fig, fig2]}
'''
# Data for plotting
t = np.arange(0.0, 2.0, 0.01)
s = 1 + np.sin(2 * np.pi * t)
fig, ax = plt.subplots() #figsize=(0.3, 0.2))
ax.plot(t, s)
ax.set(xlabel='Time (s)', ylabel='Voltage (mV)',
title='Sinusoidal Wave-1')
ax.grid()
t = np.arange(0.0, 3.0, 0.01)
s = 1 + np.sin(3 * np.pi * t)
fig2, ax = plt.subplots()
ax.plot(t, s)
ax.set(xlabel='Time (s)', ylabel='Voltage (mV)',
title='Sinusoidal Wave-2')
ax.grid()
fig3, ax = plt.subplots()
ax.plot(t, s)
ax.set(xlabel='Time (s)', ylabel='Voltage (mV)',
title='Sinusoidal Wave-3')
ax.grid()
fig_data = {'Canvas 1': [fig, fig2]}
fig_data['Canvas 2'] = fig3
'''
return fig_data
def combine_data(self, ambient_data, proc_data, fig_data):
all_data = {'Input data': self.input_data}
all_data['Ambient data'] = ambient_data
all_data['Application Raw data'] = self.raw_data
all_data['Processed data'] = proc_data
all_data['Figure data'] = fig_data
all_data['Raw data'] = {}
return(all_data)