Files
sf/ambientdata.py

212 lines
6.7 KiB
Python

def collect_ambient_data(self):
    '''
    Collect ambient data such as bunch association and return it
    as a dictionary.

    Reads every PV configured under ``self.settings.data["PVAmbient"]``,
    associates a bunch with each entry of ``target_list`` (a name defined
    outside this function) via the "dest" readings, selects the active
    target from the "rep" and laser readings, and finally reads the
    charge PV of the selected bunch.

    Returns:
        dict with the keys 'Time in seconds', 'Time stamp',
        'Bunch number', 'Beam line', 'Gun laser', 'Repetition rate',
        'Bunch label', 'Destination label' and 'Bunch charge'.
        An empty dict is returned when the CA context cannot be
        attached.
    '''
    # Epoch time in seconds (a float from time.time()) plus a
    # human-readable stamp derived from the same instant
    time_in_seconds = time.time()
    time_stamp = datetime.fromtimestamp(
        time_in_seconds).strftime('%a %d-%m-%Y %H:%M:%S')

    # Bunch number and destination
    handles = self.cafe.getHandles()[0]
    status = self.cafe.attachContext(handles[0])
    if status == self.cyca.ECAFE_NULLCONTEXT:
        options = {}
        options['statusCode'] = (str(status) + " " +
                                 self.cafe.getStatusCodeAsString(status))
        options['statusInfo'] = self.cafe.getStatusInfo(status)
        self.parent.trigger_log_message.emit(
            MsgSeverity.ERROR.name, _pymodule, _line(),
            ("Cannot attach CA context in thread " +
             "Scan will not be initiated!"), options)
        if self.abort:
            self.aborting(_line())
            return {}
        self.parent.trigger_progressbar.emit(PROGRESS_THREAD_ERROR)
        return {}

    # Mirror the two-level layout of the PVAmbient settings: the values
    # start as 0 placeholders, and the PV names are flattened into
    # pv_list in the same iteration order
    pv_value_dict = OrderedDict()
    pv_list = []
    for key, inner_key in self.settings.data["PVAmbient"].items():
        pv_value_dict[key] = OrderedDict()
        for ikey, value in inner_key.items():
            pv_value_dict[key][ikey] = 0
            pv_list.append(value)

    self.cafe.openPrepare()
    handle_list = self.cafe.open(pv_list)
    self.cafe.openNowAndWait(1.0)
    self.cafe.setGetActionWhenMonitorPolicyAllHandles(
        self.cyca.GET_FROM_CACHE)
    value_list, stat, status_list = self.cafe.getScalarList(handle_list)

    if self.debug:
        # Use a dedicated loop name so the overall status `stat` is not
        # overwritten by the last per-channel status
        for pv, val, pv_stat in zip(pv_list, value_list, status_list):
            print(pv, "//", val, "//", pv_stat)

    if stat != self.cyca.ICAFE_NORMAL:
        self.check_status_list(pv_list, status_list, _line())

    # Put values in dictionary for inspection. The slots are visited in
    # the same order in which pv_list was built, so values line up
    # positionally; a slot without a value keeps its 0 placeholder.
    slots = [(dict_key, inner_key)
             for dict_key, inner in pv_value_dict.items()
             for inner_key in inner]
    for (dict_key, inner_key), value in zip(slots, value_list):
        pv_value_dict[dict_key][inner_key] = value

    bunch_key_dict = {ARAMIS: None, ATHOS: None, PORTHOS: None}
    bunch_no_dict = {ARAMIS: 0, ATHOS: 0, PORTHOS: 0}

    def get_target_bunch():
        '''Fill bunch_key_dict / bunch_no_dict for each target.'''
        for target in target_list:
            target_bunch = None
            target_bunch_no = 0
            # First entry whose "dest" reading equals the target wins
            for key, entry in pv_value_dict.items():
                if "dest" in entry and entry["dest"] == target:
                    target_bunch = key
                    break
            if target_bunch is None:
                print("Beamline {0} not ready".format(target))
            else:
                # The bunch number is the first run of digits in the
                # key and is kept as a string; keys without digits
                # leave the default 0 instead of raising IndexError
                digits = re.findall(r'\d+', target_bunch)
                if digits:
                    target_bunch_no = digits[0]
            bunch_key_dict[target] = target_bunch
            bunch_no_dict[target] = target_bunch_no

    get_target_bunch()
    print(bunch_key_dict, flush=True)
    print(bunch_no_dict, flush=True)

    active_target_dict = {ARAMIS: False, ATHOS: False, PORTHOS: False}

    def is_target_active():
        '''Return the active target key, or None if undetermined.'''
        bar = bunch_key_dict[ARAMIS]
        bat = bunch_key_dict[ATHOS]
        if bar is None or bat is None:
            return None

        if pv_value_dict[bar]["rep"] > 0.0:
            if pv_value_dict[bat]["rep"] <= 0.0:
                active_target_dict[ARAMIS] = True
            else:
                # Both bunches report a positive "rep": decide with the
                # "Las" reading of the laser assigned to each bunch
                laser_ar = pv_value_dict[bar]["laser"]
                lar = pv_value_dict["Las"][laser_ar]
                laser_at = pv_value_dict[bat]["laser"]
                lat = pv_value_dict["Las"][laser_at]
                if lar > 0.0:
                    # Both lasers positive -> no unique target
                    if lat <= 0.0:
                        active_target_dict[ARAMIS] = True
                elif lat > 0.0:
                    active_target_dict[ATHOS] = True
        elif pv_value_dict[bat]["rep"] > 0.0:
            active_target_dict[ATHOS] = True

        for key, value in active_target_dict.items():
            if value:
                return key
        return None

    active_target = is_target_active()
    if self.debug:
        print("active_target_dict:", active_target_dict)
        print("active_target: ", active_target)

    if active_target is not None:
        active_bunch = bunch_key_dict[active_target]
        beamline = active_target
        bunch_no = bunch_no_dict[active_target]
        gun_laser = pv_value_dict[active_bunch]["laser"]
        rep_rate = pv_value_dict[active_bunch]["rep"]
        bunch_label = "B{0}".format(bunch_no)
        dest_label = "to {0} \n({1}, {2} Hz)".format(
            beamline, gun_laser, rep_rate)
    else:
        # No unique target: report both bunches side by side and fall
        # back to "Bunch1" for the charge readout below
        active_bunch = "Bunch1"
        beamline = ARAMIS + "/" + ATHOS
        bunch_no = 0
        gun1 = pv_value_dict["Bunch1"]["laser"]
        gun2 = pv_value_dict["Bunch2"]["laser"]
        # str() keeps the concatenation safe for non-string readings
        gun_bunch1 = str(gun1) if gun1 else "unknown"
        gun_bunch2 = str(gun2) if gun2 else "unknown"
        gun_laser = (gun_bunch1 + "/" + gun_bunch2)
        rep1 = pv_value_dict["Bunch1"]["rep"]
        rep2 = pv_value_dict["Bunch2"]["rep"]
        rep_bunch1 = str(rep1) if rep1 is not None else "unknown"
        rep_bunch2 = str(rep2) if rep2 is not None else "unknown"
        rep_rate = (rep_bunch1 + "/" + rep_bunch2)
        bunch_label = "B1/B2"
        dest_label = " "

    mess = "Bunch: {0} {1}".format(bunch_label, dest_label)
    self.parent.trigger_log_message.emit(
        MsgSeverity.INFO.name, _pymodule, _line(), mess, {})

    if self.debug:
        print(beamline, bunch_no, gun_laser, rep_rate)
    print("Active bunch", active_bunch, flush=True)

    # Bunch charge and TD voltage
    pv_charge = self.settings.data["PVAmbient"][active_bunch]["charge"]
    pv_charge_voltage = [pv_charge]
    self.cafe.openPrepare()
    self.cafe.open(pv_charge_voltage)
    self.cafe.openNowAndWait(0.2)
    value_list, stat, status_list = self.cafe.getScalarList(
        pv_charge_voltage)
    if stat != self.cyca.ICAFE_NORMAL:
        self.check_status_list(pv_charge_voltage, status_list, _line())

    # Scaled by 1e-12; the debug print labels the result in [C]
    # NOTE(review): presumably the PV reports pC -- confirm
    bunch_charge = value_list[0] * (10**-12)
    if self.debug:
        print("\nCharge : ", bunch_charge, " [C]")

    ambient_data = {
        'Time in seconds': time_in_seconds,
        'Time stamp': time_stamp,
        'Bunch number': bunch_no,
        'Beam line': beamline,
        'Gun laser': gun_laser,
        'Repetition rate': rep_rate,
        'Bunch label': bunch_label,
        'Destination label': dest_label,
        'Bunch charge': bunch_charge
    }
    if self.debug:
        print(ambient_data)
    return ambient_data