# -*- coding: utf-8 -*- """ Created on Fri Nov 8 21:45:30 2024 @author: shen_t2 """ import nidaqmx # %% CW-PLE-CD def ext_samp_clk_trig_count_task(counting_rate, samps_per_chan): task = nidaqmx.Task("SPD_CountEdges") # CountEdges.xxx == task.channels.xxx CountEdges = task.ci_channels.add_ci_count_edges_chan(counter = "Dev1/ctr0", edge = nidaqmx.constants.Edge.RISING, initial_count = 0, count_direction = nidaqmx.constants.CountDirection.COUNT_UP) # SPD Channel CountEdges.ci_count_edges_term = "/Dev1/PFI0" # Timing Channel task.timing.cfg_samp_clk_timing(rate = counting_rate, # Hz per channel source = "/Dev1/PFI4", sample_mode = nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan = samps_per_chan) # Trigger Channel: NOT available for this task! print("------------------------------") print("Task has been created:\n") print(task) print(CountEdges) print("\nSPD Channel: " + CountEdges.ci_count_edges_term) print(CountEdges.ci_count_edges_active_edge) print(CountEdges.ci_count_edges_dir) print(CountEdges.ci_count_edges_initial_cnt) return task def gen_trig_src_task(counting_rate, pulse_duty_cycle = 0.5): task = nidaqmx.Task("Sample_clock_source") # GenPulses.xxx == task.channels.xxx GenPulses = task.co_channels.add_co_pulse_chan_freq(counter = "Dev1/ctr1", freq = counting_rate, # Hz duty_cycle = pulse_duty_cycle) # Output Channel GenPulses.co_pulse_term = "/Dev1/PFI7" # Timing Setting task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.CONTINUOUS) # Trigger Channel task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/PFI3", trigger_edge = nidaqmx.constants.Edge.RISING) print("------------------------------") print("Task has been created:\n") print(task) print(GenPulses) print("\nOutput Channel: " + GenPulses.co_pulse_term) return task # %% Pulsed / TTL gated detection def pulse_gated_count_task(samps_per_chan): task = nidaqmx.Task("pulse_gated_SPD_counting") # PulseWidth.xxx == task.channels.xxx PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0", min_val = 0.0, max_val = 2e7, units = nidaqmx.constants.TimeUnits.TICKS, starting_edge = nidaqmx.constants.Edge.FALLING) # SPD Channel PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0" # Pulsed TTL Channel (from PFI7 to PFI4) PulseWidth.ci_pulse_width_term = "/Dev1/PFI4" # Timing task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan = samps_per_chan) print("------------------------------") print("Task has been created:\n") print(task) print(PulseWidth) print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src) print(PulseWidth.ci_ctr_timebase_active_edge) print(PulseWidth.ci_dup_count_prevention) print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term) print(PulseWidth.ci_pulse_width_units) print(PulseWidth.ci_pulse_width_starting_edge) return task def arbitaryTTL_gated_count_task(samps_per_chan): task = nidaqmx.Task("arbitaryTTL_gated_SPD_counting") # PulseWidth.xxx == task.channels.xxx PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0", min_val = 0.0, max_val = 2e7, units = nidaqmx.constants.TimeUnits.TICKS, starting_edge = nidaqmx.constants.Edge.RISING) # SPD Channel PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0" # arbitary TTL Channel (from RFSoC) PulseWidth.ci_pulse_width_term = "/Dev1/PFI1" # Timing task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan = samps_per_chan) print("------------------------------") print("Task has been created:\n") print(task) print(PulseWidth) print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src) print(PulseWidth.ci_ctr_timebase_active_edge) print(PulseWidth.ci_dup_count_prevention) print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term) print(PulseWidth.ci_pulse_width_units) print(PulseWidth.ci_pulse_width_starting_edge) return task # %% correct counting time def correct_cps(DAQ_counts, DAQ_counting_time, SAPD_dead_time): # DAQ_counts <- np.array corrected_time = DAQ_counting_time - SAPD_dead_time * DAQ_counts corrected_counts = DAQ_counts / corrected_time return corrected_counts # %% main if __name__ == '__main__': import numpy as np import matplotlib.pyplot as plt AOM_ON_time = 0.1 # s DAQ_counting_time = 0.1 # s DAQ_counting_rate = 1 / (AOM_ON_time + DAQ_counting_time) # Hz DAQ_samps_per_chan = 50 DAQ_loop_per_chan = 1 try: task_count.close() task_clock.close() except: pass finally: print("------------------------------") print("Tasks have been cleared.\n") raw_counts_all = [] # CW_CD # task_count = ext_samp_clk_trig_count_task(DAQ_counting_rate, DAQ_samps_per_chan) # task_clock = gen_trig_src_task(DAQ_counting_rate) # CW_GatedD task_count = pulse_gated_count_task(DAQ_samps_per_chan) task_clock = gen_trig_src_task(DAQ_counting_rate, pulse_duty_cycle = AOM_ON_time * DAQ_counting_rate) for ii in range(DAQ_loop_per_chan): task_clock.start() task_count.start() raw_counts_accumulated = task_count.read(nidaqmx.constants.READ_ALL_AVAILABLE, nidaqmx.constants.WAIT_INFINITELY) task_count.stop() task_clock.stop() raw_counts_accumulated = np.array(raw_counts_accumulated) raw_counts = raw_counts_accumulated[1:] - raw_counts_accumulated[:-1] raw_counts_all.append(raw_counts) ####################################### # real-time plot plt.figure(1, figsize=[9,6], dpi=100) plt.clf() plt.plot(raw_counts, '.-r', label='loop {:}'.format(ii) ) plt.grid() plt.legend(loc=1) # plt.title(plot_title) # plt.xlabel(plot_xlabel) # plt.ylabel('SAPD counts ({:}, raw)'.format(ODFilter) ) plt.show() plt.pause(0.1) task_count.close() task_clock.close() print("------------------------------") print("ALL END. (NO ERRORS)\n")