Upload files to "PLE"

This commit is contained in:
2026-06-03 12:14:02 +02:00
parent ddbd0981b2
commit 11d734e678
5 changed files with 1128 additions and 0 deletions
+251
View File
@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 8 21:45:30 2024
@author: shen_t2
"""
import nidaqmx
# %% CW-PLE-CD
def ext_samp_clk_trig_count_task(counting_rate, samps_per_chan):
task = nidaqmx.Task("SPD_CountEdges")
# CountEdges.xxx == task.channels.xxx
CountEdges = task.ci_channels.add_ci_count_edges_chan(counter = "Dev1/ctr0",
edge = nidaqmx.constants.Edge.RISING,
initial_count = 0,
count_direction = nidaqmx.constants.CountDirection.COUNT_UP)
# SPD Channel
CountEdges.ci_count_edges_term = "/Dev1/PFI0"
# Timing Channel
task.timing.cfg_samp_clk_timing(rate = counting_rate, # Hz per channel
source = "/Dev1/PFI4",
sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
# Trigger Channel: NOT available for this task!
print("------------------------------")
print("Task has been created:\n")
print(task)
print(CountEdges)
print("\nSPD Channel: " + CountEdges.ci_count_edges_term)
print(CountEdges.ci_count_edges_active_edge)
print(CountEdges.ci_count_edges_dir)
print(CountEdges.ci_count_edges_initial_cnt)
return task
def gen_trig_src_task(counting_rate, pulse_duty_cycle = 0.5):
task = nidaqmx.Task("Sample_clock_source")
# GenPulses.xxx == task.channels.xxx
GenPulses = task.co_channels.add_co_pulse_chan_freq(counter = "Dev1/ctr1",
freq = counting_rate, # Hz
duty_cycle = pulse_duty_cycle)
# Output Channel
GenPulses.co_pulse_term = "/Dev1/PFI7"
# Timing Setting
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.CONTINUOUS)
# Trigger Channel
task.triggers.start_trigger.cfg_dig_edge_start_trig("/Dev1/PFI3",
trigger_edge = nidaqmx.constants.Edge.RISING)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(GenPulses)
print("\nOutput Channel: " + GenPulses.co_pulse_term)
return task
# %% Pulsed / TTL gated detection
def pulse_gated_count_task(samps_per_chan):
task = nidaqmx.Task("pulse_gated_SPD_counting")
# PulseWidth.xxx == task.channels.xxx
PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0",
min_val = 0.0,
max_val = 2e7,
units = nidaqmx.constants.TimeUnits.TICKS,
starting_edge = nidaqmx.constants.Edge.FALLING)
# SPD Channel
PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0"
# Pulsed TTL Channel (from PFI7 to PFI4)
PulseWidth.ci_pulse_width_term = "/Dev1/PFI4"
# Timing
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(PulseWidth)
print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src)
print(PulseWidth.ci_ctr_timebase_active_edge)
print(PulseWidth.ci_dup_count_prevention)
print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term)
print(PulseWidth.ci_pulse_width_units)
print(PulseWidth.ci_pulse_width_starting_edge)
return task
def arbitaryTTL_gated_count_task(samps_per_chan):
task = nidaqmx.Task("arbitaryTTL_gated_SPD_counting")
# PulseWidth.xxx == task.channels.xxx
PulseWidth = task.ci_channels.add_ci_pulse_width_chan(counter = "Dev1/ctr0",
min_val = 0.0,
max_val = 2e7,
units = nidaqmx.constants.TimeUnits.TICKS,
starting_edge = nidaqmx.constants.Edge.RISING)
# SPD Channel
PulseWidth.ci_ctr_timebase_src = "/Dev1/PFI0"
# arbitary TTL Channel (from RFSoC)
PulseWidth.ci_pulse_width_term = "/Dev1/PFI1"
# Timing
task.timing.cfg_implicit_timing(sample_mode = nidaqmx.constants.AcquisitionType.FINITE,
samps_per_chan = samps_per_chan)
print("------------------------------")
print("Task has been created:\n")
print(task)
print(PulseWidth)
print("\nSPD Channel: " + PulseWidth.ci_ctr_timebase_src)
print(PulseWidth.ci_ctr_timebase_active_edge)
print(PulseWidth.ci_dup_count_prevention)
print("\nTTL Channel: " + PulseWidth.ci_pulse_width_term)
print(PulseWidth.ci_pulse_width_units)
print(PulseWidth.ci_pulse_width_starting_edge)
return task
# %% correct counting time
def correct_cps(DAQ_counts, DAQ_counting_time, SAPD_dead_time):
# DAQ_counts <- np.array
corrected_time = DAQ_counting_time - SAPD_dead_time * DAQ_counts
corrected_counts = DAQ_counts / corrected_time
return corrected_counts
# %% main
if __name__ == '__main__':
import numpy as np
import matplotlib.pyplot as plt
AOM_ON_time = 0.1 # s
DAQ_counting_time = 0.1 # s
DAQ_counting_rate = 1 / (AOM_ON_time + DAQ_counting_time) # Hz
DAQ_samps_per_chan = 50
DAQ_loop_per_chan = 1
try:
task_count.close()
task_clock.close()
except:
pass
finally:
print("------------------------------")
print("Tasks have been cleared.\n")
raw_counts_all = []
# CW_CD
# task_count = ext_samp_clk_trig_count_task(DAQ_counting_rate, DAQ_samps_per_chan)
# task_clock = gen_trig_src_task(DAQ_counting_rate)
# CW_GatedD
task_count = pulse_gated_count_task(DAQ_samps_per_chan)
task_clock = gen_trig_src_task(DAQ_counting_rate,
pulse_duty_cycle = AOM_ON_time * DAQ_counting_rate)
for ii in range(DAQ_loop_per_chan):
task_clock.start()
task_count.start()
raw_counts_accumulated = task_count.read(nidaqmx.constants.READ_ALL_AVAILABLE, nidaqmx.constants.WAIT_INFINITELY)
task_count.stop()
task_clock.stop()
raw_counts_accumulated = np.array(raw_counts_accumulated)
raw_counts = raw_counts_accumulated[1:] - raw_counts_accumulated[:-1]
raw_counts_all.append(raw_counts)
#######################################
# real-time plot
plt.figure(1, figsize=[9,6], dpi=100)
plt.clf()
plt.plot(raw_counts, '.-r', label='loop {:}'.format(ii) )
plt.grid()
plt.legend(loc=1)
# plt.title(plot_title)
# plt.xlabel(plot_xlabel)
# plt.ylabel('SAPD counts ({:}, raw)'.format(ODFilter) )
plt.show()
plt.pause(0.1)
task_count.close()
task_clock.close()
print("------------------------------")
print("ALL END. (NO ERRORS)\n")
+134
View File
@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 9 19:55:10 2024
@author: shen_t2
"""
import PowerSupplyCaylarLib
import traceback
import time
def EMagnet_connect_test(powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT):
print("==================================================")
print("Test Caylar power supply library start !\n")
# powerSupply = PowerSupplyCaylarLib.CaylarPowerSupply(2)
try:
print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...")
powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT)
print("Successfully connected to the power supply!\n")
print("Power supply info:")
print("---------------------------")
print(" - IDN: " + powerSupply.getIDN())
print("\n - Températures infos:")
print(" - Rack temp = " + powerSupply.getRackTemp().split(" ")[1])
print(" - Box temp = " + powerSupply.getBoxTemp().split(" ")[1])
print(" - ADC/DAC temp = " + powerSupply.getAdcDacTemp().split(" ")[1])
print("\n - Power infos:")
power_state = int(powerSupply.getPowerState().split(" ")[1])
selector_pos = int(powerSupply.getSelectorPos().split(" ")[1])
print(" - Power is " + ("off", "on")[power_state])
print(" - Output tension = " + powerSupply.getVoltage().split(" ")[1])
print(" - Output current = " + powerSupply.getCurrent().split(" ")[1])
print(" - Current remote from " + ("front potentiometer", "internal DAC", "external voltage source")[selector_pos])
print("\n - Securities infos:")
default_state = int(powerSupply.getDefaultState().split(" ")[1])
maintenance_state = int(powerSupply.getMaintenanceState().split(" ")[1])
print(" - Power supply power securitie is " + ("off", "on")[default_state])
print(" - Last default name is " + powerSupply.getDefaultName().split(" ")[1])
print(" - Maintenance is " + ("off", "on")[maintenance_state])
except:
print("soft error ...")
traceback.print_exc()
finally:
powerSupply.disconnect()
def set_Bfield_Current(B_current, powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT):
try:
# print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...")
powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT)
# print("Successfully connected to the power supply!\n")
print("------------------------------")
powerSupply.setCurrent_wait(B_current)
print("Successfully set the current!\n")
time.sleep(5)
print(" - Output current = " + powerSupply.getCurrent().split(" ")[1])
# print(" - Hall probe field = " + powerSupply.getField().split(" ")[1])
except:
print("soft error ...")
traceback.print_exc()
finally:
powerSupply.disconnect()
def set_Bfield_Gauss(Bfield_Gs, poles_gap, powerSupply, USER_PSUPPLY_IP, USER_PSUPPLY_PORT):
try:
# print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...")
powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT)
# print("Successfully connected to the power supply!\n")
print("------------------------------")
calib_name = "Caylar_{:}_center".format(poles_gap)
powerSupply.import_calibration(calib_name)
print("Successfully calibrated with [{:}]!\n".format(calib_name))
powerSupply.setField(Bfield_Gs)
print("Successfully set the current!\n")
time.sleep(5)
print(" - Output current = " + powerSupply.getCurrent().split(" ")[1])
print(" - Set field = " + str(Bfield_Gs) + " Gauss")
except:
print("soft error ...")
traceback.print_exc()
finally:
powerSupply.disconnect()
# %% main
if __name__ == '__main__':
EMagnet_PSUPPLY_IP = "QPS-CAYLAR-1"
EMagnet_PSUPPLY_PORT = 1234
EMagnet_poles_gap = "large" # "small" or "large"
scanB = 1000 # Gauss
powerSupply = PowerSupplyCaylarLib.CaylarPowerSupply(2)
EMagnet_connect_test(powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT)
set_Bfield_Current(-100, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT)
set_Bfield_Gauss(scanB, EMagnet_poles_gap, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT)
# set_Bfield_Current(0, powerSupply, EMagnet_PSUPPLY_IP, EMagnet_PSUPPLY_PORT)
+54
View File
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 3 14:38:24 2025
@author: shen_t2
experiment configuration info
"""
EMagnet_PSUPPLY_IP = "QPS-CAYLAR-1"
# EMagnet_PSUPPLY_IP = "129.129.180.xxx"
EMagnet_PSUPPLY_PORT = 1234
Laser_DLCPRO_IP = "129.129.180.85"
# %%
# laser_current = 110 # [mA] CTL output in power stabilization = 2 mW
# laser_current = 120
laser_current = 150 # 10 mW
# laser_current = 200 # 20.66 mW
# laser_current = 270 # 35 mW
# laser_current = 290 # 39.76 mW
# laser_current = 300 # 40.50 mW
Bext_field_scan = [+0] # [Gauss]
# Bext_field_scan = [+1000]
ODFilter = 'OD0p0'
# ODFilter = 'OD1p0'
# ODFilter = 'OD3p0'
laser_pol = 'sigma'
# laser_pol = 'pi'
# Temp_info = 'roomT'
Temp_info = 'lowT 4.4 K'
# Temp_info = 'lowT 4.2 K' # w/o heat shield
# sample = '#1, 0.1%, 166Er'
sample = '#2, 0.01%, 166Er'
# sample = '#22, 0.01%, 167Er'
# sample = '#24, 0.1%, 167Er'
SAPD_dead_time = 15 * 1e-6 # [s]
+106
View File
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 9 16:21:22 2024
@author: shen_t2
"""
from toptica.lasersdk.dlcpro.v2_6_0 import DLCpro, NetworkConnection, DeviceNotFoundError
from toptica.lasersdk.dlcpro.v2_6_0 import Subscription, Timestamp, SubscriptionValue
import time
def set_widescan_para(DLCPRO_IP, scan_channel, start_value, end_value, scan_speed, start_trig_threshold):
try:
with DLCpro(NetworkConnection(DLCPRO_IP)) as dlcpro:
dlcpro.laser1.wide_scan.output_channel.set(scan_channel)
if scan_channel == 79:
dlcpro.laser1.ctl.wavelength_set.set(start_value)
if scan_channel == 50:
dlcpro.laser1.dl.pc.voltage_set.set(start_value)
dlcpro.laser1.wide_scan.scan_begin.set(start_value)
dlcpro.laser1.wide_scan.scan_end.set(end_value)
dlcpro.laser1.wide_scan.shape.set(0)
dlcpro.laser1.wide_scan.speed.set(scan_speed)
dlcpro.laser1.wide_scan.trigger.output_enabled.set(False)
dlcpro.laser1.wide_scan.trigger.output_channel.set(3)
dlcpro.io.digital_out3.mode.set(2)
dlcpro.laser1.wide_scan.trigger.output_threshold.set(start_trig_threshold)
dlcpro.laser1.wide_scan.trigger.output_enabled.set(True)
time.sleep(5)
print("------------------------------")
print("Laser wide scan has been set.")
print("Expected scan time: {:.2f} s".format(dlcpro.laser1.wide_scan.duration.get()))
except DeviceNotFoundError:
print('DLCpro device not found.')
return dlcpro
def callback(subscription: Subscription, timestamp: Timestamp, value: SubscriptionValue):
print("{}: Parameter '{}' changed its value to {}".format(timestamp.time(), subscription.name, value.get()))
# %% callback test
# if __name__ == '__main__':
# with DLCpro(NetworkConnection(Laser_DLCPRO_IP)) as dlc:
# with dlc.laser1.wide_scan.state_txt.subscribe(callback):
# dlc.run()
# time.sleep(5)
# dlc.stop()
# %% main
if __name__ == '__main__':
Laser_DLCPRO_IP = "129.129.131.110"
wide_scan_output_channel = 79 # 50 = piezo V // 79 = CTL wl //
# wide_scan_start_wl = 1530.00 # nm
wide_scan_trigger_threshold = 1530.00 # nm
wide_scan_end_wl = 1530.40 # nm
wide_scan_speed = 0.0010 # nm/s
wide_scan_start_wl = wide_scan_trigger_threshold - 15 * wide_scan_speed
dlcpro = set_widescan_para(Laser_DLCPRO_IP,
wide_scan_output_channel,
wide_scan_start_wl,
wide_scan_end_wl,
wide_scan_speed,
wide_scan_trigger_threshold)
dlcpro.open()
print(dlcpro.laser1.wide_scan.state.get(), dlcpro.laser1.wide_scan.state_txt.get())
dlcpro.laser1.wide_scan.start()
# add DAQ tasks here
time.sleep(20)
dlcpro.laser1.wide_scan.stop()
dlcpro.close()
+583
View File
@@ -0,0 +1,583 @@
#!/usr/bin/env python
# coding: utf-8
"""
This library is provided with Caylar power supplies to allow remote control
via Ethernet connection.
This library contains an example code that can be run directly from this file
(see the code at the end of the file).
This library does not cover all available Ethernet commands.
Please refer to the Ethernet interface notice for a complete list of available commands.
For any questions or custom development please contact our team at www.caylar.net
"""
__version__ = "1.0"
__author__ = "Louis Bernot"
__email__ = "caylar@caylar.net"
__status__ = "Beta"
__license__ = "MIT"
from ast import Try
import socket
from threading import Lock
from threading import Thread
import time
import numpy as np
import pandas as pd
from scipy import interpolate
class CaylarPowerSupply(Thread):
def __init__(self, time_out=5, gap="large"):
super().__init__()
"""
create and init the power supply object with the specified time_out.
:param time_out: time out is the maximum response time from the socket in second.
:type ip: int
"""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.time_out = time_out
self.sock_mutex = Lock() #to make library thread safe, i.e. to be sure that
#not two thread modifiy the resource at the same time
self.calibration = None
def import_calibration(self, name, caylar_path=""):
if name=="Caylar_large_holder":
data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", sheet_name="field_vs_current_gap80D10mm_V1_", usecols=range(0, 7))
B= np.array(data["Column4"][3:], dtype=float)
I = np.array(data["Column2"][3:], dtype=float)
elif name=="Caylar_large_center":
data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", sheet_name="field_vs_current_gap80D10mm_V1_", usecols=range(0, 7))
B = np.array(data["Column6"][3:], dtype=float)
I = np.array(data["Column2"][3:], dtype=float)
elif name=="Caylar_small_holder":
data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", usecols=range(0, 7))
I = np.array(data["Column2"][3:], dtype=float)
B = np.array(data["Column4"][3:], dtype=float)
elif name=="Caylar_small_center":
data = pd.read_excel(caylar_path+"Caylar_calibration.xlsx", usecols=range(0, 7))
B = np.array(data["Column6"][3:], dtype=float)
I = np.array(data["Column2"][3:], dtype=float)
else:
A = np.loadtxt(name)
I = A[:,0]
B = A[:,1]
self.calibrationB = interpolate.interp1d(B, I)
self.calibrationI = interpolate.interp1d(I, B)
def run_calibration(self, calibration_name):
power_state = int(self.getPowerState().split(" ")[1])
if (power_state == 0):
self.setPowerOn()
folder = "../../Code/caylar_class/"
tmp_rs = self.getRampSpeed_float()
self.setRampSpeed(10)
self.setCurrent_wait(-100)
sI = np.linspace(-100, 100, 41)
B = np.zeros_like(sI)
for i, I in enumerate(sI):
self.setCurrent_wait(I)
time.sleep(0.6)
B[i] =self.getField_float()
A = np.zeros((len(sI),2))
A[:, 0] = sI
A[:, 1] = B
np.savetxt(folder+calibration_name+"low.txt", A)
sI = np.linspace(100, -100, 41)
B = np.zeros_like(sI)
for i, I in enumerate(sI):
self.setCurrent_wait(I)
time.sleep(0.6)
B[i] =self.getField_float()
A = np.zeros((len(sI),2))
A[:, 0] = sI
A[:, 1] = B
np.savetxt(folder+calibration_name+"high.txt", A)
self.setCurrent_wait(0)
self.setRampSpeed(tmp_rs)
return sI, B
def connect(self, ip, port):
"""
Connect function try to create and open a socket to connect to the power supply
from the given IP on port 1234 wich is the default port of caylar power supply.
:param ip: the power supply ip
:type ip: string
"""
with self.sock_mutex:
# try:
# self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.sock.settimeout(self.time_out)
# self.sock.connect((ip, port))
# except:
# return 1
# else:
# return 0
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.time_out)
self.sock.connect((ip, port))
return 0
def disconnect(self):
"""
Close and destroy the socket connected (or not) to the power supply.
"""
with self.sock_mutex:
self.sock.close()
def getIDN(self):
"""
Returns the power supply factory id.
:return: Power supply idn.
Return format is 'CAYLAR_MPUXXXX_XXX' where XXXX_XXX is the id.
:rtype: string.
"""
return self.__sendCmd("*IDN?")
def clearDefault(self):
"""
Reset the power supply defaults.
If the present default has not been fixed, the command has no effect and the
default status remains active.
:return: Power supply clearDefault acknowledgment.
Return format is 'CLEAR_DEFAULT_OK'.
:rtype: string.
"""
return self.__sendCmd("CLEAR_DEFAULT")
def getRackTemp(self):
"""
Returns the last measured temperature of the rack containing the power supply
control electronics.
:return: Rack temperature in degree Celsius.
Return format is 'RACK_TEMP= +XX.XX Deg'.
:rtype: string.
"""
return self.__sendCmd("GET_RACK_TEMP")
def getBoxTemp(self):
"""
Returns the last measured temperature of boxe containing the power supply precision
electronics wich can be thermostated according to the power supply options.
:return: Boxe temperature in degree Celsius.
Return format is 'BOX_TEMP= +XX.XX Deg'.
:rtype: string.
"""
return self.__sendCmd("GET_BOX_TEMP")
def getAdcDacTemp(self):
"""
Returns the last measured temperature of the pcb containing the DAC and ADS.
:return: PCB (DAC and ADC) temperature in degree Celsius.
Return format is 'ADC_DAC_TEMP= +XX.XX Deg'.
:rtype: string.
"""
return self.__sendCmd("GET_ADC_DAC_TEMP")
def getWaterTemp(self):
"""
Returns the last measured temperature of power supply water cooling
at the outlet of the circuit.
WARNING: water temperature measurement is an option.
:return: Water temperature in degree Celsius.
Return format is 'WATER_TEMP= +XX.XX Deg'.
:rtype: string.
"""
return self.__sendCmd("GET_WATER_TEMP")
def getWaterFlow(self):
"""
Returns the last measured coolant flow value of power supply at the
outlet of the circuit.
WARNING: water flow measurement is an option.
:return: Coolant flow in L/Min.
Return format is 'WATER_FLOW= +XX.X L/Min'.
:rtype: string.
"""
return self.__sendCmd("GET_WATER_FLOW")
def getField(self):
"""
Returns the last measurement of the field made by the power supply hall probe. The field is measured approximately every 0.6
second depending on options and settings. The field is returned in Gauss.
"""
return self.__sendCmd("GET_FIELD")
def getField_float(self):
return float(self.getField().split(" ")[1])
def getHallTemp(self):
return self.__sendCmd("GET_HALL_PROBE_TEMP")
def getHallTemp_float(self):
return float(self.getHallTemp().split(" ")[1])
def getCurrent(self):
"""
Returns the last current measurement of the power supply.
:return: Power supply current in ampere.
Return format is 'CURRENT= +XXX.XXXXXX A'.
:rtype: string.
"""
return self.__sendCmd("GET_CURRENT")
def getRampSpeed(self):
return self.__sendCmd("GET_DIGITAL_CURRENT_RAMP_SPEED")
def getRampSpeed_float(self):
return float(self.getRampSpeed().split(" ")[2])
def getVoltage(self):
"""
Returns the last voltage measurement of the power supply.
:return: Power supply out voltage in volts.
Return format is 'VOLTAGE= +XX.XXX V'.
:rtype: string.
"""
return self.__sendCmd("GET_VOLTAGE")
def getDefaultState(self):
"""
Returns the power security status of the power supply.
:return: Power supply security status.
Return format is 'DEFAULT_STATE= X'.
X is the security status: 1=error, 0=no_error.
:rtype: string.
"""
return self.__sendCmd("GET_DEFAULT_STATE")
def getDefaultName(self):
"""
Returns the last default name of the power supply.
:return: Last default name.
Return format is 'DEFAULT_NAME= XXXXXXXXXXX'.
XXXXXXXXXXX can take the following values :
"ALIMS AUX", "MAINS", "QUENCH", "INTERLOCK 3", "LIMIT POWER", "WATCH DOG",
"LIMIT I", "PC DEFAULT", "NEG BANK", "DCCT", "POS BANK", "BANK TEMP",
"CONDENSATION", "INTERLOCK 2", "BRIDGE TEMP", "INTERLOCK 1"
:rtype: string.
"""
return self.__sendCmd("GET_DEFAULT_NAME")
def getSelectorPos(self):
"""
Returns the position of the control switch on the front pannel of the power supply.
The switch selects the source of the current setpoint between the front potentiometer,
the DAC or the external voltage entry.
:return: Selector position code.
Return format is 'CMD_SELEC= X'.
X is the selec pos code : 0=potentiometer, 1=dac, 2=external.
:rtype: string
"""
return self.__sendCmd("GET_CMD_SELEC")
def getPowerState(self):
"""
Returns the power status of the power supply.
:return: Power status.
Return format is 'POWER_STATE= X'.
X is the power status of the power supply: 1=power_on, 0=power_off.
:rtype: string.
"""
return self.__sendCmd("GET_POWER_STATE")
def getMaintenanceState(self):
"""
Returns the maintenance state of the power supply.
:return: Maintenance state.
Return format is 'MAINTENANCE_STATE= X'.
X is the maintenance state of the power supply: 1=maintenance_on, 0=maintenance_off.
:rtype: string.
"""
return self.__sendCmd("GET_MAINTENANCE_STATE")
def setCurrent(self, cmd):
"""
Changes the power supply current setpoint in DAC mode.
:param cmd: cmd is the setpoint in ampere.
:type cmd: double, int or float - signed or not.
:return: power supply out current in Ampere.
Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'.
XXX.XXXXXX is the feedback setpoint which corresponds to cmd.
Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high.
Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and
'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY'
Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_CURRENT " + str(cmd))
def setCurrent_wait(self, cmd):
cc = float(self.getCurrent().split(" ")[1])
rs = self.getRampSpeed_float()
ret = self.setCurrent(cmd)
#print(rs, ret, cmd, np.abs(cmd-cc)/rs*1.3)
time.sleep(np.abs(cmd-cc)/rs*1.3)
return ret
def setRampSpeed(self, cmd):
return self.__sendCmd("SET_DIGITAL_FIELD_RAMP_SPEED " + str(cmd))
def setField(self,cmd):
"""
Set the digital field setpoint in field regulation (from hall probe).
:param cmd: cmd is the setpoint in ampere.
:type cmd: double, int or float - signed or not.
:return: Response in case of unrecognized argument <setpoint> : SET_FIELD_ERROR BAD_ARG
Response in case of overrange in <setpoint> argument : SET_FIELD_ERROR OVERRANGE
Response in case of power off state with « CMD UPDATE : PON ONLY» (1): SET_FIELD_ERROR POWER_OFF
Response in case of current regulation mode : SET_FIELD_ERROR CURRENT_REG
Response if maintenance is on : SET_FIELD_ERROR MAINTENANCE_ON
:rtype: string.
"""
return self.setCurrent_wait(self.calibrationB(cmd))
def getModCurrent(self):
return self.__sendCmd("GET_MODUL_CURRENT_AMPLI")
def setModCurrent(self, cmd):
"""
Changes the Modulation current
:param cmd: cmd is the setpoint in ampere.
:type cmd: double, int or float - signed or not.
:return: modulation current in Ampere.
TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'.
XXX.XXXXXX is the feedback setpoint which corresponds to cmd.
Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high.
Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and
'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY'
Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_MODUL_CURRENT_AMPLI " + str(cmd))
def getModFreq(self):
return self.__sendCmd("GET_MODUL_FREQ")
def setModFreq(self, cmd):
"""
Changes the Modulation frequency
:param cmd: cmd is the setpoint in Hz.
:type cmd: double, int or float - signed or not.
:return: modulation frequency.
TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'.
XXX.XXXXXX is the feedback setpoint which corresponds to cmd.
Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high.
Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and
'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY'
Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_MODUL_FREQ " + str(cmd))
def setModOn(self):
"""
Turn on the modulatoin coil
TODO:
:return: Modulation driver setPowerOn acknowledgment
TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'.
XXX.XXXXXX is the feedback setpoint which corresponds to cmd.
Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high.
Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and
'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY'
Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_MODUL_ON")
def setsaveModOn(self):
return self.setModOn()
def setModOff(self):
"""
Turn off the modulatoin coil
TODO:
:return: Modulation driver setPowerOn acknowledgment
TODO: Return format is 'SET_CURRENT_OK +XXX.XXXXXX A'.
XXX.XXXXXX is the feedback setpoint which corresponds to cmd.
Return 'SET_CURRENT_ERROR OVERRANGE' if cmd is to high.
Return 'SET_CURRENT_ERROR POWER_OFF' if power is of and
'CMD UPDATE' parameter in 'divers menu' of power supply = 'PON ONLY'
Return 'SET_CURRENT_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_MODUL_OFF")
def setMaintenanceOn(self):
"""
Switch power supply to maintenance mode to prevent SET Ethernet commands from being
able to drive power when an operator is physically working on the power supply.
See getMaintenanceState() to get the maintenance mode of the power supply.
WARNING: Maintenance mode can only be removed physically from the front face
of the power supply. The maintenance selector is available in the "SECURITIES"
menu by pressing the F3 key on the keyboard.
:return: Power supply setMaintenanceOn acknowledgment.
Return format is 'SET_MAINTENANCE_ON_OK'.
Return 'SET_MAINTENANCE_ON_ERROR MAINTENANCE_ON' if maintenance is already active.
:rtype: string.
"""
return self.__sendCmd("SET_MAINTENANCE_ON")
def setPowerOn(self):
"""
Turn the power on.
:return: Power supply setPowerOn acknowledgment.
Return format is 'SET_POWER_ON_OK'.
Return 'SET_POWER_ON_ERROR DEFAULT_ON' if default is active.
Return 'SET_POWER_ON_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string - if there is no error the
"""
return self.__sendCmd("SET_POWER_ON")
def setPowerOff(self):
"""
Turn the power off.
WARNING: Turning off the power while current is circulating should be avoided
as much as possible. Check that the current is at zero before switching off the power.
:return: Power supply setPowerOff acknowledgment.
Return format is 'SET_POWER_OFF_OK'.
Return 'SET_POWER_OFF_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_POWER_OFF")
def setUserPowerErrorOn(self):
"""
Create a user power default and prevent the power supply from restarting until the default
is removed (see setUserPowerErrorOff()).
WARNING: This function cuts the power to the power supply instantly.
Therefore, it must be used only in case of emergency.
:return: Power supply setUserPowerErrorOn acknowledgment.
Return format is 'SET_DEFAULT_ON_OK'.
Return 'SET_DEFAULT_ON_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_DEFAULT_ON")
def setUserPowerErrorOff(self):
"""
Clear the user power default.
WARNING: This function don't clear the global power default state of the power supply.
You can clear the global default by press the reset button in front pannel of power supply
or by the clearDefault() function.
:return: Power supply setUserPowerErrorOn acknowledgment.
Return format is 'SET_DEFAULT_OFF_OK'.
Return 'SET_DEFAULT_OFF_ERROR MAINTENANCE_ON' if maintenance is active.
:rtype: string.
"""
return self.__sendCmd("SET_DEFAULT_OFF")
def __sendCmd(self, cmd):
with self.sock_mutex:
self.sock.send(bytes(cmd + '\n', 'ascii'))
time.sleep(0.01)
rx_buff = self.sock.recv(50).decode('ascii').replace("\n", "").replace("\r", "")
return rx_buff
"""
Little demo of the librairie.
---
This program does not take control of the power.
It simply get some of power supply reading values.
"""
if __name__ == "__main__":
import traceback
USER_PSUPPLY_IP = "129.129.180.91" #CHANGE USER_PSUPPLY_IP WITH YOUR OWN POWER SUPPLY IP HERE
USER_PSUPPLY_PORT = 1234 #CHANGE USER_PSUPPLY_IP WITH YOUR OWN POWER SUPPLY IP HERE
print("==================================================")
print("Test Caylar power supply library start !\n")
powerSupply = CaylarPowerSupply(2)
try:
print("Try to connect to " + USER_PSUPPLY_IP + " (port 1234) ...")
powerSupply.connect(USER_PSUPPLY_IP,USER_PSUPPLY_PORT)
print("Successfully connected to the power supply!\n")
print("Power supply info:")
print("---------------------------")
print(" - IDN: " + powerSupply.getIDN())
print("\n - Températures infos:")
print(" - Rack temp = " + powerSupply.getRackTemp().split(" ")[1])
print(" - Box temp = " + powerSupply.getBoxTemp().split(" ")[1])
print(" - ADC/DAC temp = " + powerSupply.getAdcDacTemp().split(" ")[1])
print("\n - Power infos:")
power_state = int(powerSupply.getPowerState().split(" ")[1])
selector_pos = int(powerSupply.getSelectorPos().split(" ")[1])
print(" - Power is " + ("off", "on")[power_state])
print(" - Output tension = " + powerSupply.getVoltage().split(" ")[1])
print(" - Output current = " + powerSupply.getCurrent().split(" ")[1])
print(" - Current remote from " + ("front potentiometer", "internal DAC", "external voltage source")[selector_pos])
print("\n - Securities infos:")
default_state = int(powerSupply.getDefaultState().split(" ")[1])
maintenance_state = int(powerSupply.getMaintenanceState().split(" ")[1])
print(" - Power supply power securitie is " + ("off", "on")[default_state])
print(" - Last default name is " + powerSupply.getDefaultName().split(" ")[1])
print(" - Maintenance is " + ("off", "on")[maintenance_state])
except:
print("soft error ...")
traceback.print_exc()
finally:
powerSupply.disconnect()