Files
REQubit-control/PLE/DAQCountingFunctions.py
2026-06-03 12:14:02 +02:00

252 lines
7.7 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Fri Nov 8 21:45:30 2024
@author: shen_t2
"""
import nidaqmx
# %% CW-PLE-CD
def ext_samp_clk_trig_count_task(counting_rate, samps_per_chan):
task = nidaqmx.Task("SPD_CountEdges")
# CountEdges.xxx == task.channels.xxx
CountEdges = task.ci_channels.add_ci_count_edges_chan(counter = "Dev1/ctr0",
edge = nidaqmx.constants.Edge.RISING,
initial_count = 0,
count_direction = nidaqmx.constants.CountDirection.COUNT_UP)
# SPD Channel
CountEdges.ci_count_edges_term = "/Dev1/PFI0"
# Timing Channel
task.timing.cfg_samp_clk_timing(rate = counting_rate, # Hz per channel
source = "/Dev1/PFI4",
sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
# Trigger Channel: NOT available for this task!
print("------------------------------")
print("Task has been created:\n")
print(task)
print(CountEdges)
print("\nSPD Channel: " + CountEdges.ci_count_edges_term)
print(CountEdges.ci_count_edges_active_edge)
print(CountEdges.ci_count_edges_dir)
print(CountEdges.ci_count_edges_initial_cnt)
return task
def gen_trig_src_task(counting_rate, pulse_duty_cycle = 0.5):
task = nidaqmx.Task("Sample_clock_source")
# GenPulses.xxx == task.channels.xxx
GenPulses = task.co_channels.add_co_pulse_chan_freq(counter = "Dev1/ctr1",
freq = counting_rate, # Hz
duty_cycle = pulse_duty_cycle)
# Output Channel
GenPulses.co_pulse_term = "/Dev1/PFI7"
# Timing Setting
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.CONTINUOUS)
# Trigger Channel
task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/PFI3",
trigger_edge = nidaqmx.constants.Edge.RISING)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(GenPulses)
print("\nOutput Channel: " + GenPulses.co_pulse_term)
return task
# %% Pulsed / TTL gated detection
def pulse_gated_count_task(samps_per_chan):
task = nidaqmx.Task("pulse_gated_SPD_counting")
# PulseWidth.xxx == task.channels.xxx
PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0",
min_val = 0.0,
max_val = 2e7,
units = nidaqmx.constants.TimeUnits.TICKS,
starting_edge = nidaqmx.constants.Edge.FALLING)
# SPD Channel
PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0"
# Pulsed TTL Channel (from PFI7 to PFI4)
PulseWidth.ci_pulse_width_term = "/Dev1/PFI4"
# Timing
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(PulseWidth)
print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src)
print(PulseWidth.ci_ctr_timebase_active_edge)
print(PulseWidth.ci_dup_count_prevention)
print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term)
print(PulseWidth.ci_pulse_width_units)
print(PulseWidth.ci_pulse_width_starting_edge)
return task
def arbitaryTTL_gated_count_task(samps_per_chan):
task = nidaqmx.Task("arbitaryTTL_gated_SPD_counting")
# PulseWidth.xxx == task.channels.xxx
PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0",
min_val = 0.0,
max_val = 2e7,
units = nidaqmx.constants.TimeUnits.TICKS,
starting_edge = nidaqmx.constants.Edge.RISING)
# SPD Channel
PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0"
# arbitary TTL Channel (from RFSoC)
PulseWidth.ci_pulse_width_term = "/Dev1/PFI1"
# Timing
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(PulseWidth)
print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src)
print(PulseWidth.ci_ctr_timebase_active_edge)
print(PulseWidth.ci_dup_count_prevention)
print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term)
print(PulseWidth.ci_pulse_width_units)
print(PulseWidth.ci_pulse_width_starting_edge)
return task
# %% correct counting time
def correct_cps(DAQ_counts, DAQ_counting_time, SAPD_dead_time):
# DAQ_counts <- np.array
corrected_time = DAQ_counting_time - SAPD_dead_time * DAQ_counts
corrected_counts = DAQ_counts / corrected_time
return corrected_counts
# %% main
if __name__ == '__main__':
import numpy as np
import matplotlib.pyplot as plt
AOM_ON_time = 0.1 # s
DAQ_counting_time = 0.1 # s
DAQ_counting_rate = 1 / (AOM_ON_time + DAQ_counting_time) # Hz
DAQ_samps_per_chan = 50
DAQ_loop_per_chan = 1
try:
task_count.close()
task_clock.close()
except:
pass
finally:
print("------------------------------")
print("Tasks have been cleared.\n")
raw_counts_all = []
# CW_CD
# task_count = ext_samp_clk_trig_count_task(DAQ_counting_rate, DAQ_samps_per_chan)
# task_clock = gen_trig_src_task(DAQ_counting_rate)
# CW_GatedD
task_count = pulse_gated_count_task(DAQ_samps_per_chan)
task_clock = gen_trig_src_task(DAQ_counting_rate,
pulse_duty_cycle = AOM_ON_time * DAQ_counting_rate)
for ii in range(DAQ_loop_per_chan):
task_clock.start()
task_count.start()
raw_counts_accumulated = task_count.read(nidaqmx.constants.READ_ALL_AVAILABLE, nidaqmx.constants.WAIT_INFINITELY)
task_count.stop()
task_clock.stop()
raw_counts_accumulated = np.array(raw_counts_accumulated)
raw_counts = raw_counts_accumulated[1:] - raw_counts_accumulated[:-1]
raw_counts_all.append(raw_counts)
#######################################
# real-time plot
plt.figure(1, figsize=[9,6], dpi=100)
plt.clf()
plt.plot(raw_counts, '.-r', label='loop {:}'.format(ii) )
plt.grid()
plt.legend(loc=1)
# plt.title(plot_title)
# plt.xlabel(plot_xlabel)
# plt.ylabel('SAPD counts ({:}, raw)'.format(ODFilter) )
plt.show()
plt.pause(0.1)
task_count.close()
task_clock.close()
print("------------------------------")
print("ALL END. (NO ERRORS)\n")