""" Created on Tue Nov 26 15:42:43 2019 @author: tartarotti_d-adm """ import sys import atexit import signal import time import numpy as np import ctypes as ct from numpy import sqrt, arctan2, sin, cos import scipy.signal #from pylab import * #ADQAPI = ct.cdll.LoadLibrary("ADQAPI.dll") ADQAPI = ct.cdll.LoadLibrary("libadq.so.0") #For different trigger modes SW_TRIG = 1 EXT_TRIG_1 = 2 #This external trigger does not work if the level of the trigger is very close to 0.5V. Now we have it close to 3V, and it works EXT_TRIG_2 = 7 EXT_TRIG_3 = 8 LVL_TRIG = 3 INT_TRIG = 4 LVL_FALLING = 0 LVL_RISING = 1 #samples_per_record=16384 ADQ_TRANSFER_MODE_NORMAL = 0x00 ADQ_CHANNELS_MASK = 0x3 #f_LO = 40 def butter_lowpass(cutoff, sr, order=5): nyq = 0.5 * sr normal_cutoff = cutoff / nyq b, a = scipy.signal.butter(order, normal_cutoff, btype = 'low', analog = False) return b, a class Adq(object): max_number_of_channels = 2 samp_freq = 2 #ndecimate = 50 # decimation ratio (2GHz / 40 MHz) ndecimate = 50 def __init__(self, number_of_records, samples_per_record, bw_cutoff): self.number_of_records = number_of_records self.samples_per_record = samples_per_record self.bw_cutoff = bw_cutoff ADQAPI.ADQAPI_GetRevision() # Manually set return type from some ADQAPI functions ADQAPI.CreateADQControlUnit.restype = ct.c_void_p ADQAPI.ADQ_GetRevision.restype = ct.c_void_p ADQAPI.ADQ_GetPtrStream.restype = ct.POINTER(ct.c_int16) ADQAPI.ADQControlUnit_FindDevices.argtypes = [ct.c_void_p] # Create ADQControlUnit self.adq_cu = ct.c_void_p(ADQAPI.CreateADQControlUnit()) ADQAPI.ADQControlUnit_EnableErrorTrace(self.adq_cu, 3, '.') self.adq_num = 1 # Find ADQ devices ADQAPI.ADQControlUnit_FindDevices(self.adq_cu) n_of_ADQ = ADQAPI.ADQControlUnit_NofADQ(self.adq_cu) if n_of_ADQ != 1: raise ValueError('number of ADQs must be 1, not %d' % n_of_ADQ) rev = ADQAPI.ADQ_GetRevision(self.adq_cu, self.adq_num) revision = ct.cast(rev,ct.POINTER(ct.c_int)) print('\nConnected to ADQ #1') # Print revision information print('FPGA Revision: {}'.format(revision[0])) if (revision[1]): print('Local copy') else : print('SVN Managed') if (revision[2]): print('Mixed Revision') else : print('SVN Updated') print('') ADQ_CLOCK_INT_INTREF = 0 #internal clock source ADQ_CLOCK_EXT_REF = 1 #internal clock source, external reference ADQ_CLOCK_EXT_CLOCK = 2 #External clock source ADQAPI.ADQ_SetClockSource(self.adq_cu, self.adq_num, ADQ_CLOCK_EXT_REF); ########################## # Test pattern #ADQAPI.ADQ_SetTestPatternMode(self.adq_cu, self.adq_num, 4) ########################## # Sample skip #ADQAPI.ADQ_SetSampleSkip(self.adq_cu, self.adq_num, 1) ########################## # Set trig mode self.trigger = EXT_TRIG_1 #trigger = LVL_TRIG success = ADQAPI.ADQ_SetTriggerMode(self.adq_cu, self.adq_num, self.trigger) if (success == 0): print('ADQ_SetTriggerMode failed.') if (self.trigger == LVL_TRIG): success = ADQAPI.ADQ_SetLvlTrigLevel(self.adq_cu, self.adq_num, -100) if (success == 0): print('ADQ_SetLvlTrigLevel failed.') success = ADQAPI.ADQ_SetTrigLevelResetValue(self.adq_cu, self.adq_num, 1000) if (success == 0): print('ADQ_SetTrigLevelResetValue failed.') success = ADQAPI.ADQ_SetLvlTrigChannel(self.adq_cu, self.adq_num, 1) if (success == 0): print('ADQ_SetLvlTrigChannel failed.') success = ADQAPI.ADQ_SetLvlTrigEdge(self.adq_cu, self.adq_num, LVL_RISING) if (success == 0): print('ADQ_SetLvlTrigEdge failed.') elif (self.trigger == EXT_TRIG_1) : success = ADQAPI.ADQ_SetExternTrigEdge(self.adq_cu, self.adq_num,2) if (success == 0): print('ADQ_SetLvlTrigEdge failed.') # success = ADQAPI.ADQ_SetTriggerThresholdVoltage(self.adq_cu, self.adq_num, trigger, ct.c_double(0.2)) # if (success == 0): # print('SetTriggerThresholdVoltage failed.') print("CHANNEL:"+str(ct.c_int(ADQAPI.ADQ_GetLvlTrigChannel(self.adq_cu, self.adq_num)))) self.setup_target_buffers() atexit.register(self.deletecu) signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) def setup_target_buffers(self): # Setup target buffers for data self.target_buffers=(ct.POINTER(ct.c_int16 * self.samples_per_record * self.number_of_records) * self.max_number_of_channels)() for bufp in self.target_buffers: bufp.contents = (ct.c_int16 * self.samples_per_record * self.number_of_records)() def deletecu(self): # Only disarm trigger after data is collected ADQAPI.ADQ_DisarmTrigger(self.adq_cu, self.adq_num) ADQAPI.ADQ_MultiRecordClose(self.adq_cu, self.adq_num); # Delete ADQControlunit ADQAPI.DeleteADQControlUnit(self.adq_cu) print('ADQ closed') def start(self): """start data acquisition""" # samples_per_records = samples_per_record/number_of_records # Change number of pulses to be acquired acording to how many records are taken # Start acquisition ADQAPI.ADQ_MultiRecordSetup(self.adq_cu, self.adq_num, self.number_of_records, self.samples_per_record) ADQAPI.ADQ_DisarmTrigger(self.adq_cu, self.adq_num) ADQAPI.ADQ_ArmTrigger(self.adq_cu, self.adq_num) def getdata(self): """wait for aquisition to be finished and get data""" #start = time.time() while(ADQAPI.ADQ_GetAcquiredAll(self.adq_cu,self.adq_num) == 0): time.sleep(0.001) #if (self.trigger == SW_TRIG): # ADQAPI.ADQ_SWTrig(self.adq_cu, self.adq_num) #mid = time.time() status = ADQAPI.ADQ_GetData(self.adq_cu, self.adq_num, self.target_buffers, self.samples_per_record * self.number_of_records, 2, 0, self.number_of_records, ADQ_CHANNELS_MASK, 0, self.samples_per_record, ADQ_TRANSFER_MODE_NORMAL); #print(time.time()-mid,mid-start) if not status: raise ValueError('no succesS from ADQ_GetDATA') # Now this is an array with all records, but the time is artificial data = [] for ch in range(2): onedim = np.frombuffer(self.target_buffers[ch].contents, dtype=np.int16) data.append(onedim.reshape(self.number_of_records, self.samples_per_record) / float(2**14)) # 14 bits ADC return data def acquire(self): self.start() return self.getdata() def sinW(self,sig,freq,ti,tf): # sig: signal array # freq # ti, tf: initial and end time si = int(ti * self.samp_freq) nperiods = freq * (tf - ti) n = int(round(max(2, int(nperiods)) / nperiods * (tf-ti) * self.samp_freq)) self.nperiods = n t = np.arange(si, len(sig)) / self.samp_freq t = t[:n] self.pulselen = n / self.samp_freq sig = sig[si:si+n] a = 2*np.sum(sig*np.cos(2*np.pi*freq*t))/len(sig) b = 2*np.sum(sig*np.sin(2*np.pi*freq*t))/len(sig) return a, b def mix(self, sigin, sigout, freq, ti, tf): # sigin, sigout: signal array, incomping, output # freq # ti, tf: initial and end time if sigin a, b = self.sinW(sigin, freq, ti, tf) phase = arctan2(a,b) * 180 / np.pi amp = sqrt(a**2 + b**2) a, b = a/amp, b/amp #si = int(ti * self.samp_freq) t = np.arange(len(sigout)) / self.samp_freq wave1 = sigout * (a * cos(2*np.pi*freq*t) + b * sin(2*np.pi*freq*t)) wave2 = sigout * (a * sin(2*np.pi*freq*t) - b * cos(2*np.pi*freq*t)) return wave1, wave2 def averageiq(self, data, freq, ti, tf): '''Average over records''' iorq = np.array([self.mix(data[0][i], data[1][i], freq, ti, tf) for i in range(self.number_of_records)]) # iorq = np.array([self.mix(data[0][:], data[1][:], freq, ti, tf)]) return iorq.sum(axis=0) / self.number_of_records def filtro(self, iorq, cutoff): b, a = butter_lowpass(cutoff, self.samp_freq*1e9) #ifi = np.array(scipy.signal.filtfilt(b,a,iorq[0])) #qf = np.array(scipy.signal.filtfilt(b,a,iorq[1])) iqf = [scipy.signal.filtfilt(b,a,iorq[i]) for i in np.arange(len(iorq))] return iqf def box(self, iorq, ti, tf): si = int(self.samp_freq * ti) sf = int(self.samp_freq * tf) bxa = [sum(iorq[i][si:sf])/(sf-si) for i in np.arange(len(iorq))] return bxa def gates_and_curves(self, data, freq, pulse, roi): """return iq values of rois and prepare plottable curves for iq""" self.ndecimate = int(round(2E9/freq)) times = [] times.append(('aviq', time.time())) iq = self.averageiq(data,freq*1e-9,*pulse) times.append(('filtro', time.time())) iqf = self.filtro(iq,self.bw_cutoff) m = len(iqf[0]) // self.ndecimate ll = m * self.ndecimate iqf = [iqfx[0:ll] for iqfx in iqf] times.append(('iqdec', time.time())) iqd = np.average(np.resize(iqf, (2, m, self.ndecimate)), axis=2) t_axis = np.arange(m) * self.ndecimate / self.samp_freq pulsig = np.abs(data[0][0]) times.append(('pulsig', time.time())) pulsig = np.average(np.resize(pulsig, (m, self.ndecimate)), axis=1) self.curves = (t_axis, iqd[0], iqd[1], pulsig) #print(times) return [self.box(iqf,*r) for r in roi]