From 11d734e67803b4494db09f06df09fabddb30db6b Mon Sep 17 00:00:00 2001 From: shen_t2 Date: Wed, 3 Jun 2026 12:14:02 +0200 Subject: [PATCH] Upload files to "PLE" --- PLE/DAQCountingFunctions.py | 251 ++++++++++++ PLE/EMagnetSetFields.py | 134 +++++++ PLE/Global_Experiment_Parameters.py | 54 +++ PLE/LaserWideScanSettings.py | 106 +++++ PLE/PowerSupplyCaylarLib.py | 583 ++++++++++++++++++++++++++++ 5 files changed, 1128 insertions(+) create mode 100644 PLE/DAQCountingFunctions.py create mode 100644 PLE/EMagnetSetFields.py create mode 100644 PLE/Global_Experiment_Parameters.py create mode 100644 PLE/LaserWideScanSettings.py create mode 100644 PLE/PowerSupplyCaylarLib.py diff --git a/PLE/DAQCountingFunctions.py b/PLE/DAQCountingFunctions.py new file mode 100644 index 0000000..e9c9ef3 --- /dev/null +++ b/PLE/DAQCountingFunctions.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +""" +Created on Fri Nov 8 21:45:30 2024 + +@author: shen_t2 +""" + + +import nidaqmx + + +# %% CW-PLE-CD + +def ext_samp_clk_trig_count_task(counting_rate, samps_per_chan): + + task = nidaqmx.Task("SPD_CountEdges") + + # CountEdges.xxx == task.channels.xxx + CountEdges = task.ci_channels.add_ci_count_edges_chan(counter = "Dev1/ctr0", + edge = nidaqmx.constants.Edge.RISING, + initial_count = 0, + count_direction = nidaqmx.constants.CountDirection.COUNT_UP) + + # SPD Channel + CountEdges.ci_count_edges_term = "/Dev1/PFI0" + + # Timing Channel + task.timing.cfg_samp_clk_timing(rate = counting_rate, # Hz per channel + source = "/Dev1/PFI4", + sample_mode = nidaqmx.constants.AcquisitionType.FINITE, + samps_per_chan = samps_per_chan) + + # Trigger Channel: NOT available for this task! + + + print("------------------------------") + print("Task has been created:\n") + print(task) + print(CountEdges) + print("\nSPD Channel: " + CountEdges.ci_count_edges_term) + print(CountEdges.ci_count_edges_active_edge) + print(CountEdges.ci_count_edges_dir) + print(CountEdges.ci_count_edges_initial_cnt) + + return task + + + +def gen_trig_src_task(counting_rate, pulse_duty_cycle = 0.5): + + task = nidaqmx.Task("Sample_clock_source") + + # GenPulses.xxx == task.channels.xxx + GenPulses = task.co_channels.add_co_pulse_chan_freq(counter = "Dev1/ctr1", + freq = counting_rate, # Hz + duty_cycle = pulse_duty_cycle) + + # Output Channel + GenPulses.co_pulse_term = "/Dev1/PFI7" + + # Timing Setting + task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.CONTINUOUS) + + # Trigger Channel + task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/PFI3", + trigger_edge = nidaqmx.constants.Edge.RISING) + + + print("------------------------------") + print("Task has been created:\n") + print(task) + print(GenPulses) + print("\nOutput Channel: " + GenPulses.co_pulse_term) + + return task + + + +# %% Pulsed / TTL gated detection + + + +def pulse_gated_count_task(samps_per_chan): + + task = nidaqmx.Task("pulse_gated_SPD_counting") + + # PulseWidth.xxx == task.channels.xxx + PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0", + min_val = 0.0, + max_val = 2e7, + units = nidaqmx.constants.TimeUnits.TICKS, + starting_edge = nidaqmx.constants.Edge.FALLING) + + # SPD Channel + PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0" + + # Pulsed TTL Channel (from PFI7 to PFI4) + PulseWidth.ci_pulse_width_term = "/Dev1/PFI4" + + # Timing + task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE, + samps_per_chan = samps_per_chan) + + + print("------------------------------") + print("Task has been created:\n") + print(task) + print(PulseWidth) + print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src) + print(PulseWidth.ci_ctr_timebase_active_edge) + print(PulseWidth.ci_dup_count_prevention) + print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term) + print(PulseWidth.ci_pulse_width_units) + print(PulseWidth.ci_pulse_width_starting_edge) + + return task + + + + +def arbitaryTTL_gated_count_task(samps_per_chan): + + task = nidaqmx.Task("arbitaryTTL_gated_SPD_counting") + + # PulseWidth.xxx == task.channels.xxx + PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0", + min_val = 0.0, + max_val = 2e7, + units = nidaqmx.constants.TimeUnits.TICKS, + starting_edge = nidaqmx.constants.Edge.RISING) + + # SPD Channel + PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0" + + # arbitary TTL Channel (from RFSoC) + PulseWidth.ci_pulse_width_term = "/Dev1/PFI1" + + # Timing + task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE, + samps_per_chan = samps_per_chan) + + + print("------------------------------") + print("Task has been created:\n") + print(task) + print(PulseWidth) + print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src) + print(PulseWidth.ci_ctr_timebase_active_edge) + print(PulseWidth.ci_dup_count_prevention) + print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term) + print(PulseWidth.ci_pulse_width_units) + print(PulseWidth.ci_pulse_width_starting_edge) + + return task + + + +# %% correct counting time + +def correct_cps(DAQ_counts, DAQ_counting_time, SAPD_dead_time): + + # DAQ_counts <- np.array + + corrected_time = DAQ_counting_time - SAPD_dead_time * DAQ_counts + corrected_counts = DAQ_counts / corrected_time + + return corrected_counts + + + +# %% main + + +if __name__ == '__main__': + + + import numpy as np + import matplotlib.pyplot as plt + + + AOM_ON_time = 0.1 # s + + DAQ_counting_time = 0.1 # s + DAQ_counting_rate = 1 / (AOM_ON_time + DAQ_counting_time) # Hz + DAQ_samps_per_chan = 50 + DAQ_loop_per_chan = 1 + + + try: + task_count.close() + task_clock.close() + except: + pass + finally: + print("------------------------------") + print("Tasks have been cleared.\n") + + + raw_counts_all = [] + + + # CW_CD + # task_count = ext_samp_clk_trig_count_task(DAQ_counting_rate, DAQ_samps_per_chan) + # task_clock = gen_trig_src_task(DAQ_counting_rate) + + + # CW_GatedD + task_count = pulse_gated_count_task(DAQ_samps_per_chan) + task_clock = gen_trig_src_task(DAQ_counting_rate, + pulse_duty_cycle = AOM_ON_time * DAQ_counting_rate) + + + + for ii in range(DAQ_loop_per_chan): + + task_clock.start() + task_count.start() + + raw_counts_accumulated = task_count.read(nidaqmx.constants.READ_ALL_AVAILABLE, nidaqmx.constants.WAIT_INFINITELY) + + task_count.stop() + task_clock.stop() + + + raw_counts_accumulated = np.array(raw_counts_accumulated) + raw_counts = raw_counts_accumulated[1:] - raw_counts_accumulated[:-1] + raw_counts_all.append(raw_counts) + + + ####################################### + # real-time plot + plt.figure(1, figsize=[9,6], dpi=100) + plt.clf() + plt.plot(raw_counts, '.-r', label='loop {:}'.format(ii) ) + plt.grid() + plt.legend(loc=1) + # plt.title(plot_title) + # plt.xlabel(plot_xlabel) + # plt.ylabel('SAPD counts ({:}, raw)'.format(ODFilter) ) + plt.show() + plt.pause(0.1) + + + task_count.close() + task_clock.close() + + + print("------------------------------") + print("ALL END. (NO ERRORS)\n") + + diff --git a/PLE/EMagnetSetFields.py b/PLE/EMagnetSetFields.py new file mode 100644 index 0000000..a0d00b0 --- /dev/null +++ b/PLE/EMagnetSetFields.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +""" +Created on Sat Nov 9 19:55:10 2024 + +@author: shen_t2 +""" + + +import PowerSupplyCaylarLib + +import traceback +import time + + +def EMagnet_connect_test(powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT): + + print("==================================================") + print("Test Caylar power supply library start !\n") + # powerSupply = PowerSupplyCaylarLib.CaylarPowerSupply(2) + + try: + print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...") + powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT) + print("Successfully connected to the power supply!\n") + print("Power supply info:") + print("---------------------------") + print(" - IDN: " + powerSupply.getIDN()) + print("\n - Températures infos:") + print(" - Rack temp = " + powerSupply.getRackTemp().split(" ")[1]) + print(" - Box temp = " + powerSupply.getBoxTemp().split(" ")[1]) + print(" - ADC/DAC temp = " + powerSupply.getAdcDacTemp().split(" ")[1]) + print("\n - Power infos:") + power_state = int(powerSupply.getPowerState().split(" ")[1]) + selector_pos = int(powerSupply.getSelectorPos().split(" ")[1]) + print(" - Power is " + ("off", "on")[power_state]) + print(" - Output tension = " + powerSupply.getVoltage().split(" ")[1]) + print(" - Output current = " + powerSupply.getCurrent().split(" ")[1]) + print(" - Current remote from " + ("front potentiometer", "internal DAC", "external voltage source")[selector_pos]) + print("\n - Securities infos:") + default_state = int(powerSupply.getDefaultState().split(" ")[1]) + maintenance_state = int(powerSupply.getMaintenanceState().split(" ")[1]) + print(" - Power supply power securitie is " + ("off", "on")[default_state]) + print(" - Last default name is " + powerSupply.getDefaultName().split(" ")[1]) + print(" - Maintenance is " + ("off", "on")[maintenance_state]) + + except: + print("soft error ...") + traceback.print_exc() + + finally: + powerSupply.disconnect() + + + +def set_Bfield_Current(B_current, powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT): + + try: + # print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...") + powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT) + # print("Successfully connected to the power supply!\n") + print("------------------------------") + + powerSupply.setCurrent_wait(B_current) + print("Successfully set the current!\n") + + time.sleep(5) + print(" - Output current = " + powerSupply.getCurrent().split(" ")[1]) + # print(" - Hall probe field = " + powerSupply.getField().split(" ")[1]) + + + except: + print("soft error ...") + traceback.print_exc() + + finally: + powerSupply.disconnect() + + + + +def set_Bfield_Gauss(Bfield_Gs, poles_gap, powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT): + + try: + # print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...") + powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT) + # print("Successfully connected to the power supply!\n") + print("------------------------------") + + calib_name = "Caylar_{:}_center".format(poles_gap) + powerSupply.import_calibration(calib_name) + print("Successfully calibrated with [{:}]!\n".format(calib_name)) + + powerSupply.setField(Bfield_Gs) + print("Successfully set the current!\n") + + time.sleep(5) + print(" - Output current = " + powerSupply.getCurrent().split(" ")[1]) + print(" - Set field = " + str(Bfield_Gs) + " Gauss") + + + except: + print("soft error ...") + traceback.print_exc() + + finally: + powerSupply.disconnect() + + + + +# %% main + + +if __name__ == '__main__': + + EMagnet_PSUPPLY_IP = "QPS-CAYLAR-1" + EMagnet_PSUPPLY_PORT = 1234 + EMagnet_poles_gap = "large" # "small" or "large" + + scanB = 1000 # Gauss + + + powerSupply = PowerSupplyCaylarLib.CaylarPowerSupply(2) + + EMagnet_connect_test(powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT) + + set_Bfield_Current(-100, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT) + + set_Bfield_Gauss(scanB, EMagnet_poles_gap, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT) + + # set_Bfield_Current(0, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT) + + + \ No newline at end of file diff --git a/PLE/Global_Experiment_Parameters.py b/PLE/Global_Experiment_Parameters.py new file mode 100644 index 0000000..665d235 --- /dev/null +++ b/PLE/Global_Experiment_Parameters.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Mar 3 14:38:24 2025 + +@author: shen_t2 + +experiment configuration info + +""" + +EMagnet_PSUPPLY_IP = "QPS-CAYLAR-1" +# EMagnet_PSUPPLY_IP = "129.129.180.xxx" +EMagnet_PSUPPLY_PORT = 1234 + +Laser_DLCPRO_IP = "129.129.180.85" + +# %% + +# laser_current = 110 # [mA] CTL output in power stabilization = 2 mW +# laser_current = 120 +laser_current = 150 # 10 mW +# laser_current = 200 # 20.66 mW +# laser_current = 270 # 35 mW +# laser_current = 290 # 39.76 mW +# laser_current = 300 # 40.50 mW + + +Bext_field_scan = [+0] # [Gauss] +# Bext_field_scan = [+1000] + + +ODFilter = 'OD0p0' +# ODFilter = 'OD1p0' +# ODFilter = 'OD3p0' + + +laser_pol = 'sigma' +# laser_pol = 'pi' + + +# Temp_info = 'roomT' +Temp_info = 'lowT 4.4 K' +# Temp_info = 'lowT 4.2 K' # w/o heat shield + + +# sample = '#1, 0.1%, 166Er' +sample = '#2, 0.01%, 166Er' +# sample = '#22, 0.01%, 167Er' +# sample = '#24, 0.1%, 167Er' + + +SAPD_dead_time = 15 * 1e-6 # [s] + + diff --git a/PLE/LaserWideScanSettings.py b/PLE/LaserWideScanSettings.py new file mode 100644 index 0000000..4163eda --- /dev/null +++ b/PLE/LaserWideScanSettings.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +""" +Created on Sat Nov 9 16:21:22 2024 + +@author: shen_t2 +""" + + +from toptica.lasersdk.dlcpro.v2_6_0 import DLCpro, NetworkConnection, DeviceNotFoundError +from toptica.lasersdk.dlcpro.v2_6_0 import Subscription, Timestamp, SubscriptionValue + +import time + + +def set_widescan_para(DLCPRO_IP, scan_channel, start_value, end_value, scan_speed, start_trig_threshold): + + try: + with DLCpro(NetworkConnection(DLCPRO_IP)) as dlcpro: + + dlcpro.laser1.wide_scan.output_channel.set(scan_channel) + + if scan_channel == 79: + dlcpro.laser1.ctl.wavelength_set.set(start_value) + if scan_channel == 50: + dlcpro.laser1.dl.pc.voltage_set.set(start_value) + + dlcpro.laser1.wide_scan.scan_begin.set(start_value) + dlcpro.laser1.wide_scan.scan_end.set(end_value) + + dlcpro.laser1.wide_scan.shape.set(0) + dlcpro.laser1.wide_scan.speed.set(scan_speed) + + dlcpro.laser1.wide_scan.trigger.output_enabled.set(False) + dlcpro.laser1.wide_scan.trigger.output_channel.set(3) + dlcpro.io.digital_out3.mode.set(2) + dlcpro.laser1.wide_scan.trigger.output_threshold.set(start_trig_threshold) + dlcpro.laser1.wide_scan.trigger.output_enabled.set(True) + + time.sleep(5) + + print("------------------------------") + print("Laser wide scan has been set.") + print("Expected scan time: {:.2f} s".format(dlcpro.laser1.wide_scan.duration.get())) + + + except DeviceNotFoundError: + print('DLCpro device not found.') + + return dlcpro + + + +def callback(subscription: Subscription, timestamp: Timestamp, value: SubscriptionValue): + + print("{}: Parameter '{}' changed its value to {}".format(timestamp.time(), subscription.name, value.get())) + + +# %% callback test + +# if __name__ == '__main__': + +# with DLCpro(NetworkConnection(Laser_DLCPRO_IP)) as dlc: +# with dlc.laser1.wide_scan.state_txt.subscribe(callback): +# dlc.run() +# time.sleep(5) +# dlc.stop() + + +# %% main + +if __name__ == '__main__': + + Laser_DLCPRO_IP = "129.129.131.110" + + + wide_scan_output_channel = 79 # 50 = piezo V // 79 = CTL wl // + # wide_scan_start_wl = 1530.00 # nm + wide_scan_trigger_threshold = 1530.00 # nm + wide_scan_end_wl = 1530.40 # nm + wide_scan_speed = 0.0010 # nm/s + wide_scan_start_wl = wide_scan_trigger_threshold - 15 * wide_scan_speed + + + dlcpro = set_widescan_para(Laser_DLCPRO_IP, + wide_scan_output_channel, + wide_scan_start_wl, + wide_scan_end_wl, + wide_scan_speed, + wide_scan_trigger_threshold) + + + + dlcpro.open() + + print(dlcpro.laser1.wide_scan.state.get(), dlcpro.laser1.wide_scan.state_txt.get()) + + dlcpro.laser1.wide_scan.start() + + # add DAQ tasks here + + time.sleep(20) + dlcpro.laser1.wide_scan.stop() + + dlcpro.close() + + diff --git a/PLE/PowerSupplyCaylarLib.py b/PLE/PowerSupplyCaylarLib.py new file mode 100644 index 0000000..594a658 --- /dev/null +++ b/PLE/PowerSupplyCaylarLib.py @@ -0,0 +1,583 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" +This library is provided with Caylar power supplies to allow remote control +via Ethernet connection. + +This library contains an example code that can be run directly from this file +(see the code at the end of the file). + +This library does not cover all available Ethernet commands. +Please refer to the Ethernet interface notice for a complete list of available commands. + +For any questions or custom development please contact our team at www.caylar.net +""" + +__version__ = "1.0" +__author__ = "Louis Bernot" +__email__ = "caylar@caylar.net" +__status__ = "Beta" +__license__ = "MIT" + +from ast import Try +import socket +from threading import Lock +from threading import Thread +import time +import numpy as np +import pandas as pd +from scipy import interpolate + +class CaylarPowerSupply(Thread): + + def __init__(self, time_out=5, gap="large"): + super().__init__() + """ + create and init the power supply object with the specified time_out. + + :param time_out: time out is the maximum response time from the socket in second. + :type ip: int + """ + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.time_out = time_out + self.sock_mutex = Lock() #to make library thread safe, i.e. to be sure that + #not two thread modifiy the resource at the same time + self.calibration = None + + + def import_calibration(self, name, caylar_path=""): + + if name=="Caylar_large_holder": + data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", sheet_name="field_vs_current_gap80D10mm_V1_", usecols=range(0, 7)) + B= np.array(data["Column4"][3:], dtype=float) + I = np.array(data["Column2"][3:], dtype=float) + elif name=="Caylar_large_center": + data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", sheet_name="field_vs_current_gap80D10mm_V1_", usecols=range(0, 7)) + B = np.array(data["Column6"][3:], dtype=float) + I = np.array(data["Column2"][3:], dtype=float) + elif name=="Caylar_small_holder": + data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", usecols=range(0, 7)) + I = np.array(data["Column2"][3:], dtype=float) + B = np.array(data["Column4"][3:], dtype=float) + elif name=="Caylar_small_center": + data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", usecols=range(0, 7)) + B = np.array(data["Column6"][3:], dtype=float) + I = np.array(data["Column2"][3:], dtype=float) + else: + A = np.loadtxt(name) + I = A[:,0] + B = A[:,1] + + self.calibrationB = interpolate.interp1d(B, I) + self.calibrationI = interpolate.interp1d(I, B) + + def run_calibration(self, calibration_name): + power_state = int(self.getPowerState().split(" ")[1]) + if (power_state == 0): + self.setPowerOn() + + folder = "../../Code/caylar_class/" + + tmp_rs = self.getRampSpeed_float() + self.setRampSpeed(10) + + self.setCurrent_wait(-100) + + sI = np.linspace(-100, 100, 41) + B = np.zeros_like(sI) + + for i, I in enumerate(sI): + self.setCurrent_wait(I) + time.sleep(0.6) + B[i] =self.getField_float() + + A = np.zeros((len(sI),2)) + A[:, 0] = sI + A[:, 1] = B + np.savetxt(folder+calibration_name+"low.txt", A) + sI = np.linspace(100, -100, 41) + B = np.zeros_like(sI) + + for i, I in enumerate(sI): + self.setCurrent_wait(I) + time.sleep(0.6) + B[i] =self.getField_float() + + A = np.zeros((len(sI),2)) + A[:, 0] = sI + A[:, 1] = B + np.savetxt(folder+calibration_name+"high.txt", A) + + self.setCurrent_wait(0) + self.setRampSpeed(tmp_rs) + return sI, B + + def connect(self, ip, port): + """ + Connect function try to create and open a socket to connect to the power supply + from the given IP on port 1234 wich is the default port of caylar power supply. + :param ip: the power supply ip + :type ip: string + """ + with self.sock_mutex: + # try: + # self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # self.sock.settimeout(self.time_out) + # self.sock.connect((ip, port)) + # except: + # return 1 + # else: + # return 0 + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(self.time_out) + self.sock.connect((ip, port)) + + return 0 + + + def disconnect(self): + """ + Close and destroy the socket connected (or not) to the power supply. + """ + with self.sock_mutex: + self.sock.close() + + def getIDN(self): + """ + Returns the power supply factory id. + + :return: Power supply idn. + Return format is 'CAYLAR_MPUXXXX_XXX' where XXXX_XXX is the id. + :rtype: string. + """ + return self.__sendCmd("*IDN?") + + def clearDefault(self): + """ + Reset the power supply defaults. + If the present default has not been fixed, the command has no effect and the + default status remains active. + + :return: Power supply clearDefault acknowledgment. + Return format is 'CLEAR_DEFAULT_OK'. + :rtype: string. + """ + return self.__sendCmd("CLEAR_DEFAULT") + + def getRackTemp(self): + """ + Returns the last measured temperature of the rack containing the power supply + control electronics. + + :return: Rack temperature in degree Celsius. + Return format is 'RACK_TEMP= +XX.XX Deg'. + :rtype: string. + """ + return self.__sendCmd("GET_RACK_TEMP") + + def getBoxTemp(self): + """ + Returns the last measured temperature of boxe containing the power supply precision + electronics wich can be thermostated according to the power supply options. + + :return: Boxe temperature in degree Celsius. + Return format is 'BOX_TEMP= +XX.XX Deg'. + :rtype: string. + """ + return self.__sendCmd("GET_BOX_TEMP") + + def getAdcDacTemp(self): + """ + Returns the last measured temperature of the pcb containing the DAC and ADS. + + :return: PCB (DAC and ADC) temperature in degree Celsius. + Return format is 'ADC_DAC_TEMP= +XX.XX Deg'. + :rtype: string. + """ + return self.__sendCmd("GET_ADC_DAC_TEMP") + + def getWaterTemp(self): + """ + Returns the last measured temperature of power supply water cooling + at the outlet of the circuit. + WARNING: water temperature measurement is an option. + + :return: Water temperature in degree Celsius. + Return format is 'WATER_TEMP= +XX.XX Deg'. + :rtype: string. + """ + return self.__sendCmd("GET_WATER_TEMP") + + def getWaterFlow(self): + """ + Returns the last measured coolant flow value of power supply at the + outlet of the circuit. + WARNING: water flow measurement is an option. + + :return: Coolant flow in L/Min. + Return format is 'WATER_FLOW= +XX.X L/Min'. + :rtype: string. + """ + return self.__sendCmd("GET_WATER_FLOW") + + def getField(self): + """ + Returns the last measurement of the field made by the power supply hall probe. The field is measured approximately every 0.6 + second depending on options and settings. The field is returned in Gauss. + """ + + return self.__sendCmd("GET_FIELD") + + def getField_float(self): + return float(self.getField().split(" ")[1]) + + def getHallTemp(self): + + return self.__sendCmd("GET_HALL_PROBE_TEMP") + + def getHallTemp_float(self): + return float(self.getHallTemp().split(" ")[1]) + + def getCurrent(self): + """ + Returns the last current measurement of the power supply. + + :return: Power supply current in ampere. + Return format is 'CURRENT= +XXX.XXXXXX A'. + :rtype: string. + """ + return self.__sendCmd("GET_CURRENT") + def getRampSpeed(self): + + return self.__sendCmd("GET_DIGITAL_CURRENT_RAMP_SPEED") + + def getRampSpeed_float(self): + return float(self.getRampSpeed().split(" ")[2]) + + def getVoltage(self): + """ + Returns the last voltage measurement of the power supply. + + :return: Power supply out voltage in volts. + Return format is 'VOLTAGE= +XX.XXX V'. + :rtype: string. + """ + return self.__sendCmd("GET_VOLTAGE") + + def getDefaultState(self): + """ + Returns the power security status of the power supply. + + :return: Power supply security status. + Return format is 'DEFAULT_STATE= X'. + X is the security status: 1=error, 0=no_error. + :rtype: string. + """ + return self.__sendCmd("GET_DEFAULT_STATE") + + def getDefaultName(self): + """ + Returns the last default name of the power supply. + + :return: Last default name. + Return format is 'DEFAULT_NAME= XXXXXXXXXXX'. + XXXXXXXXXXX can take the following values : + "ALIMS AUX", "MAINS", "QUENCH", "INTERLOCK 3", "LIMIT POWER", "WATCH DOG", + "LIMIT I", "PC DEFAULT", "NEG BANK", "DCCT", "POS BANK", "BANK TEMP", + "CONDENSATION", "INTERLOCK 2", "BRIDGE TEMP", "INTERLOCK 1" + :rtype: string. + """ + return self.__sendCmd("GET_DEFAULT_NAME") + + def getSelectorPos(self): + """ + Returns the position of the control switch on the front pannel of the power supply. + The switch selects the source of the current setpoint between the front potentiometer, + the DAC or the external voltage entry. + + :return: Selector position code. + Return format is 'CMD_SELEC= X'. + X is the selec pos code : 0=potentiometer, 1=dac, 2=external. + :rtype: string + """ + return self.__sendCmd("GET_CMD_SELEC") + + def getPowerState(self): + """ + Returns the power status of the power supply. + + :return: Power status. + Return format is 'POWER_STATE= X'. + X is the power status of the power supply: 1=power_on, 0=power_off. + :rtype: string. + """ + return self.__sendCmd("GET_POWER_STATE") + + def getMaintenanceState(self): + """ + Returns the maintenance state of the power supply. + + :return: Maintenance state. + Return format is 'MAINTENANCE_STATE= X'. + X is the maintenance state of the power supply: 1=maintenance_on, 0=maintenance_off. + :rtype: string. + """ + return self.__sendCmd("GET_MAINTENANCE_STATE") + + def setCurrent(self, cmd): + """ + Changes the power supply current setpoint in DAC mode. + + :param cmd: cmd is the setpoint in ampere. + :type cmd: double, int or float - signed or not. + :return: power supply out current in Ampere. + Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'. + XXX.XXXXXX is the feedback setpoint which corresponds to cmd. + Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high. + Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and + 'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY' + Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_CURRENT " + str(cmd)) + + + def setCurrent_wait(self, cmd): + cc = float(self.getCurrent().split(" ")[1]) + rs = self.getRampSpeed_float() + ret = self.setCurrent(cmd) + #print(rs, ret, cmd, np.abs(cmd-cc)/rs*1.3) + time.sleep(np.abs(cmd-cc)/rs*1.3) + + return ret + + def setRampSpeed(self, cmd): + return self.__sendCmd("SET_DIGITAL_FIELD_RAMP_SPEED " + str(cmd)) + + def setField(self,cmd): + """ + Set the digital field setpoint in field regulation (from hall probe). + + :param cmd: cmd is the setpoint in ampere. + :type cmd: double, int or float - signed or not. + :return: Response in case of unrecognized argument : SET_FIELD_ERROR BAD_ARG + Response in case of overrange in argument : SET_FIELD_ERROR OVERRANGE + Response in case of power off state with « CMD UPDATE : PON ONLY» (1): SET_FIELD_ERROR POWER_OFF + Response in case of current regulation mode : SET_FIELD_ERROR CURRENT_REG + Response if maintenance is on : SET_FIELD_ERROR MAINTENANCE_ON + :rtype: string. + """ + + + return self.setCurrent_wait(self.calibrationB(cmd)) + + def getModCurrent(self): + return self.__sendCmd("GET_MODUL_CURRENT_AMPLI") + def setModCurrent(self, cmd): + """ + Changes the Modulation current + + :param cmd: cmd is the setpoint in ampere. + :type cmd: double, int or float - signed or not. + :return: modulation current in Ampere. + TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'. + XXX.XXXXXX is the feedback setpoint which corresponds to cmd. + Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high. + Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and + 'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY' + Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_MODUL_CURRENT_AMPLI " + str(cmd)) + + def getModFreq(self): + return self.__sendCmd("GET_MODUL_FREQ") + + def setModFreq(self, cmd): + """ + Changes the Modulation frequency + + :param cmd: cmd is the setpoint in Hz. + :type cmd: double, int or float - signed or not. + :return: modulation frequency. + TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'. + XXX.XXXXXX is the feedback setpoint which corresponds to cmd. + Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high. + Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and + 'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY' + Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_MODUL_FREQ " + str(cmd)) + + def setModOn(self): + """ + Turn on the modulatoin coil + + TODO: + :return: Modulation driver setPowerOn acknowledgment + TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'. + XXX.XXXXXX is the feedback setpoint which corresponds to cmd. + Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high. + Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and + 'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY' + Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_MODUL_ON") + + def setsaveModOn(self): + + return self.setModOn() + + + def setModOff(self): + """ + Turn off the modulatoin coil + + TODO: + :return: Modulation driver setPowerOn acknowledgment + TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'. + XXX.XXXXXX is the feedback setpoint which corresponds to cmd. + Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high. + Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and + 'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY' + Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_MODUL_OFF") + + + def setMaintenanceOn(self): + """ + Switch power supply to maintenance mode to prevent SET Ethernet commands from being + able to drive power when an operator is physically working on the power supply. + See getMaintenanceState() to get the maintenance mode of the power supply. + + WARNING: Maintenance mode can only be removed physically from the front face + of the power supply. The maintenance selector is available in the "SECURITIES" + menu by pressing the F3 key on the keyboard. + + :return: Power supply setMaintenanceOn acknowledgment. + Return format is 'SET_MAINTENANCE_ON_OK'. + Return 'SET_MAINTENANCE_ON_ERROR MAINTENANCE_ON' if maintenance is already active. + :rtype: string. + """ + return self.__sendCmd("SET_MAINTENANCE_ON") + + def setPowerOn(self): + """ + Turn the power on. + + :return: Power supply setPowerOn acknowledgment. + Return format is 'SET_POWER_ON_OK'. + Return 'SET_POWER_ON_ERROR DEFAULT_ON' if default is active. + Return 'SET_POWER_ON_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string - if there is no error the + """ + return self.__sendCmd("SET_POWER_ON") + + def setPowerOff(self): + """ + Turn the power off. + + WARNING: Turning off the power while current is circulating should be avoided + as much as possible. Check that the current is at zero before switching off the power. + + :return: Power supply setPowerOff acknowledgment. + Return format is 'SET_POWER_OFF_OK'. + Return 'SET_POWER_OFF_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_POWER_OFF") + + def setUserPowerErrorOn(self): + """ + Create a user power default and prevent the power supply from restarting until the default + is removed (see setUserPowerErrorOff()). + + WARNING: This function cuts the power to the power supply instantly. + Therefore, it must be used only in case of emergency. + + :return: Power supply setUserPowerErrorOn acknowledgment. + Return format is 'SET_DEFAULT_ON_OK'. + Return 'SET_DEFAULT_ON_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_DEFAULT_ON") + + def setUserPowerErrorOff(self): + """ + Clear the user power default. + + WARNING: This function don't clear the global power default state of the power supply. + You can clear the global default by press the reset button in front pannel of power supply + or by the clearDefault() function. + + :return: Power supply setUserPowerErrorOn acknowledgment. + Return format is 'SET_DEFAULT_OFF_OK'. + Return 'SET_DEFAULT_OFF_ERROR MAINTENANCE_ON' if maintenance is active. + :rtype: string. + """ + return self.__sendCmd("SET_DEFAULT_OFF") + + def __sendCmd(self, cmd): + with self.sock_mutex: + self.sock.send(bytes(cmd + '\n', 'ascii')) + time.sleep(0.01) + rx_buff = self.sock.recv(50).decode('ascii').replace("\n", "").replace("\r", "") + return rx_buff + + +""" +Little demo of the librairie. +--- +This program does not take control of the power. +It simply get some of power supply reading values. + +""" + +if __name__ == "__main__": + + import traceback + + + USER_PSUPPLY_IP = "129.129.180.91" #CHANGE USER_PSUPPLY_IP WITH YOUR OWN POWER SUPPLY IP HERE + USER_PSUPPLY_PORT = 1234 #CHANGE USER_PSUPPLY_IP WITH YOUR OWN POWER SUPPLY IP HERE + print("==================================================") + print("Test Caylar power supply library start !\n") + powerSupply = CaylarPowerSupply(2) + + try: + print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...") + powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT) + print("Successfully connected to the power supply!\n") + print("Power supply info:") + print("---------------------------") + print(" - IDN: " + powerSupply.getIDN()) + print("\n - Températures infos:") + print(" - Rack temp = " + powerSupply.getRackTemp().split(" ")[1]) + print(" - Box temp = " + powerSupply.getBoxTemp().split(" ")[1]) + print(" - ADC/DAC temp = " + powerSupply.getAdcDacTemp().split(" ")[1]) + print("\n - Power infos:") + power_state = int(powerSupply.getPowerState().split(" ")[1]) + selector_pos = int(powerSupply.getSelectorPos().split(" ")[1]) + print(" - Power is " + ("off", "on")[power_state]) + print(" - Output tension = " + powerSupply.getVoltage().split(" ")[1]) + print(" - Output current = " + powerSupply.getCurrent().split(" ")[1]) + print(" - Current remote from " + ("front potentiometer", "internal DAC", "external voltage source")[selector_pos]) + print("\n - Securities infos:") + default_state = int(powerSupply.getDefaultState().split(" ")[1]) + maintenance_state = int(powerSupply.getMaintenanceState().split(" ")[1]) + print(" - Power supply power securitie is " + ("off", "on")[default_state]) + print(" - Last default name is " + powerSupply.getDefaultName().split(" ")[1]) + print(" - Maintenance is " + ("off", "on")[maintenance_state]) + + except: + print("soft error ...") + traceback.print_exc() + + finally: + powerSupply.disconnect()