results and clara changes

Appleby Martin Vears
2024-11-04 13:04:16 +01:00
parent a2f05b768d
commit 4b8bab43ef
2 changed files with 182 additions and 33 deletions

View File

@@ -5,6 +5,8 @@ import datetime
 import json
 import os
 import signal
+import h5py
+import numpy as np
 import subprocess as sub
 import sys
 import time
@@ -16,7 +18,7 @@ from loguru import logger
 import receive_msg

 #define log file place:
-LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log")
+LOG_FILENAME = time.strftime("/sf/cristallina/data/p21981/res/clara_%Y%m.log")
 logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
@@ -332,8 +334,8 @@ class CollectedH5:
         f2.write("\n")
         f2.write("0/min_fs = 0 \n")
         f2.write("0/min_ss = 0 \n")
-        f2.write("0/max_fs =" + str(self.message["detector_height_pxl"]) + "\n")
-        f2.write("0/max_ss =" + str(self.message["detector_width_pxl"]) + "\n")
+        f2.write("0/max_fs =" + str(self.message["detector_width_pxl"] - 1) + "\n")
+        f2.write("0/max_ss =" + str(self.message["detector_height_pxl"] - 1) + "\n")
         f2.write("0/corner_x = -" + str(self.message["beam_x_pxl"]) + "\n")
         f2.write("0/corner_y = -" + str(self.message["beam_y_pxl"]) + "\n")
         f2.write("0/fs = x \n")
@@ -355,25 +357,151 @@ class CollectedH5:
         Function to generate a list file with the path of the input H5 file
         :return: None
         """
+        print('writing files')
         #Assemble path for raw data
         pgroup = str(self.message["experiment_group"])
         raw = "raw"
+        data = "data"
         filen = str(self.message["filename"])
         file_path = pa / pgroup / raw / filen
         # write to cell file in output folder
         name = str(self.message["run_number"])
-        f = open(name + ".list", "w")
-        f.write(str(file_path))
+        #f = open(name + ".list", "w")
+        #f.write(str(file_path))
+        #f.close()
+        bsdata_name = filen.split('.')[0] + '.BSDATA.h5'
+        bsdata_path = pa / pgroup / raw / bsdata_name
+        print(bsdata_path)
+        # wait until the BSDATA file has been written to disk
+        while not bsdata_path.exists():
+            time.sleep(1)
+        print('survived the while loop')
+        try:
+            # e.g. /sf/cristallina/data/p21630/raw/run0065-lov_movestop_normal_1/data/acq0001.BSDATA.h5
+            bsdata = h5py.File(bsdata_path, "r")
+        except Exception as e:
+            print(f"didn't open bsdata due to error {e}")
+        pulseids_BS = bsdata["/SAR-CVME-TIFALL6:EvtSet/pulse_id"][:]
+        evt_set = bsdata["/SAR-CVME-TIFALL6:EvtSet/data"][:]
+        jf_path = file_path
+        try:
+            # e.g. /sf/cristallina/data/p21630/raw/run0065-lov_movestop_normal_1/data/acq0001.JF17T16V01.h5
+            x = h5py.File(jf_path, "r")
+        except Exception as e:
+            print(f"didn't open JF17T16V01.h5 due to error {e}")
+            return
+        pulseids_JF = x["/entry/xfel/pulseID"][:]
+        # pulse ids live under a different path in the JFJoch layout:
+        #pulseids_JF = x[f"/data/{detector}/pulse_id"][:]
-        f.close()
-        #Insert new key into message for triggered/non-triggered
-        self.message["trigger"] = "on"
+        # align the BS pulse ids with the JUNGFRAU ones (offset by one)
+        for i, pulse_id in enumerate(pulseids_BS):
+            pulseids_BS[i] = pulse_id - 1
+        n_pulse_id = len(pulseids_JF)  # maybe not needed?
+        #if f"/data/{detector}/is_good_frame" in bsdata:
+        #    is_good_frame = bsdata[f"/data/{detector}/is_good_frame"][:]
+        #else:
+        #    is_good_frame = [1] * n_pulse_id
+        #daq_recs = x[f"/data/{detector}/daq_rec"][:]
+        index_dark = []
+        index_light = []
+        blanks = []
+        for i in range(n_pulse_id):
+            p = pulseids_JF[i]
+            q = pulseids_BS[i]
+            if p != q:
+                event_i = np.where(pulseids_JF == q)[0]
+                event_i = i
+            else:
+                event_i = i
+            events = evt_set[event_i]
+            if events[216] and events[200]:
+                index_light.append(i)
+            elif events[200]:
+                index_dark.append(i)
+            else:
+                # only reached when event code 200 is not set
+                blanks.append(i)
+        bsdata.close()
+        x.close()
+        acq_dark = []
+        acq_light = []
+        acq_blank = []
+        delim = "//"
+        if index_dark:
+            for frame_number in index_dark:
+                acq_dark.append(f"{jf_path} {delim}{frame_number}")
+            file_off = name + '_off.list'
+            logger.info(f"List of dark frames: {file_off}, {len(index_dark)} frames")
+            with open(file_off, "w") as f_list:
+                for frame in acq_dark:
+                    print(f"{frame}", file=f_list)
+        if index_light:
+            for frame_number in index_light:
+                acq_light.append(f"{jf_path} {delim}{frame_number}")
+            file_on = name + '_on.list'
+            logger.info(f"List of light frames: {file_on}, {len(index_light)} frames")
+            with open(file_on, "w") as f_list:
+                for frame in acq_light:
+                    print(f"{frame}", file=f_list)
+        if blanks:
+            for frame_number in blanks:
+                acq_blank.append(f"{jf_path} {delim}{frame_number}")
+            file_blank = name + '_blank.list'
+            with open(file_blank, "w") as f_list:
+                for frame in acq_blank:
+                    print(f"{frame}", file=f_list)
         return None

-    def create_slurm_script(self):
+    def create_slurm_script(self, trigger):
         """
         Creates the input SLURM file with the following info:
         SLURM parameters (CPUs, nodes, etc.)
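Summary of the new create_list_file above: it waits for the BSDATA file, shifts the BS pulse IDs by one to line them up with the JUNGFRAU pulse IDs, then sorts frames into light (event codes 200 and 216 both set), dark (200 only) and blank lists named <run_number>_on.list, <run_number>_off.list and <run_number>_blank.list. Note that when the pulse IDs disagree, the committed loop looks up the matching index with np.where and then immediately overwrites event_i with i, discarding the lookup; the sketch below keeps the lookup, on the assumption that this was the intent. classify_frames is a hypothetical refactor, not part of the commit, and the physical meaning of codes 200/216 is inferred from how the commit uses them:

    import numpy as np

    def classify_frames(pulseids_jf, pulseids_bs, evt_set):
        """Sort frame indices into light / dark / blank lists.

        pulseids_jf, pulseids_bs: aligned 1-D arrays of pulse ids;
        evt_set: per-pulse event-code arrays (200 ~ X-ray shot, 216 ~ laser on).
        """
        index_light, index_dark, blanks = [], [], []
        for i, (p, q) in enumerate(zip(pulseids_jf, pulseids_bs)):
            # if the ids disagree, find the JUNGFRAU frame matching the BS id
            matches = np.where(pulseids_jf == q)[0]
            event_i = i if p == q or matches.size == 0 else int(matches[0])
            events = evt_set[event_i]
            if events[216] and events[200]:
                index_light.append(i)
            elif events[200]:
                index_dark.append(i)
            else:
                blanks.append(i)
        return index_light, index_dark, blanks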
@@ -389,10 +517,10 @@ class CollectedH5:
         # get dat file name without any preceding paths
         #last_pos = str(self.message["dataFileName"]).rfind("/")
         #data_file_name = str(self.message["filename"][:-3])
-        data_file_name = str(self.message["run_number"])
+        data_file_name = str(self.message["run_number"]) + '_' + trigger
         # write file
-        f = open("run_SLURM", "w")
+        f = open("run_SLURM_" + trigger, "w")
         f.write("#!/bin/bash \n")
         f.write("#SBATCH --job-name=index \n")
         f.write("#SBATCH --partition=prod-aramis \n")
@@ -414,17 +542,28 @@ class CollectedH5:
         f.write("\n\n")
         f.write("# Actual indexing command for CrystFEL \n")
         f.write(
-            " indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold="
-            + str(int(self.message["user_data"]["crystfelTreshold"]))
-            #+ " --int-radius=2,3,5 "
+            " indexamajig --peaks="
+            + str(self.message["user_data"]["peaks"])
+            + " --indexing="
+            + str(self.message["user_data"]["indexing"])
+            + " --xgandalf-fast-execution --threshold="
+            + str(self.message["user_data"]["threshold"])
+            + " --tolerance="
+            + str(self.message["user_data"]["tolerance"])
+            + " --int-radius="
+            + str(self.message["user_data"]["int-radius"])
+            + " --integration="
+            + str(self.message["user_data"]["integration"])
             + " -p "
             + str(self.message["run_number"])
             + ".cell --min-snr="
-            + str(self.message["user_data"]["crystfelMinSNR"])
-            + " --min-peaks=6 --min-pix-count="
-            + str(self.message["user_data"]["crystfelMinPixCount"])
+            + str(self.message["user_data"]["min-snr"])
+            + " --min-peaks="
+            + str(self.message["user_data"]["min-peaks"])
+            + " --min-pix-count="
+            + str(self.message["user_data"]["min-pix-count"])
             + " -i "
-            + str(self.message["run_number"])
+            + str(self.message["run_number"]) + "_" + trigger
             + ".list -o "
             + data_file_name
             + ".stream -g "
@@ -432,7 +571,11 @@ class CollectedH5:
             + "_jf.geom"
             + " -j `nproc` --min-res=75 "
         )
-        if self.message["user_data"]["crystfelMultiCrystal"]:
+        if self.message["user_data"]["retry"]:
+            f.write(" --retry")
+        if self.message["user_data"]["check-peaks"]:
+            f.write(" --check-peaks")
+        if self.message["user_data"]["multi"]:
             f.write(" --multi" + ">& " + data_file_name + ".log\n")
         else:
             f.write(" --no-multi" + ">& " + data_file_name + ".log\n")
@@ -456,7 +599,7 @@ class CollectedH5:
         return None

-    def submit_job_to_slurm(self):
+    def submit_job_to_slurm(self, trigger):
         """
         submit job to SLURM (on RA or 6S/6D nodes)
         needs the slurm input file.
@@ -470,11 +613,11 @@ class CollectedH5:
         # sub.run(["sbatch", "run_SLURM"])
         try:
-            slurm_out = sub.run(["sbatch", "run_SLURM"], capture_output=True)
+            slurm_out = sub.run(["sbatch", "run_SLURM_" + trigger], capture_output=True)
             txt = slurm_out.stdout.decode().split()
             # grep the slurm number
             logger.info(f"submitted batch job number: {txt[-1]}")
-            self.message["SlurmJobID"] = str(txt[-1])
+            self.message["SlurmJobID_" + trigger] = str(txt[-1])
         except Exception as e:
             logger.info("Could not submit SLURM job: {}".format(e))
@@ -557,18 +700,19 @@ if __name__ == "__main__":
     context = zmq.Context()
     subscriber = context.socket(zmq.SUB)
     subscriber.connect("tcp://sf-broker-01.psi.ch:5555")
+    #subscriber.connect("tcp://sf-daqtest-01.psi.ch:5401")
     subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

     #--------------------------------------------------
     # uncomment if a handshake between publisher and subscriber is necessary:
-    ready_socket = context.socket(zmq.REQ)
-    ready_socket.connect("tcp://sf-broker-01.psi.ch:5556")
-    # notify publisher that subscriber is ready
-    ready_socket.send(b"READY")
-    start_message = ready_socket.recv()
-    print(f"Received start message: {start_message}")
+    #ready_socket = context.socket(zmq.REQ)
+    #ready_socket.connect("tcp://sf-broker-01.psi.ch:5556")
     # notify publisher that subscriber is ready
+    #ready_socket.send(b"READY")
+    #start_message = ready_socket.recv()
+    #print(f"Received start message: {start_message}")
     #----------------------------------------------------------------------

     time.sleep(3)
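With the REQ/REP handshake commented out, the subscriber relies on the trailing time.sleep(3) to ride out ZeroMQ's slow-joiner behaviour: messages published before the subscription has propagated are silently dropped. A minimal sketch of the surviving setup (addresses taken from the diff):

    import time
    import zmq

    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://sf-broker-01.psi.ch:5555")
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to all topics
    time.sleep(3)  # give the subscription time to propagate before messages arrive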
@@ -596,10 +740,13 @@ if __name__ == "__main__":
         mess_inp.create_geom_from_master()
         logger.info("geom file created")
         mess_inp.create_list_file()
-        logger.info("list file created")
-        mess_inp.create_slurm_script()
-        logger.info("slurm submission script created")
-        mess_inp.submit_job_to_slurm()
+        logger.info("list files created")
+        # if we have two list files, loop with the two trigger arguments "on"/"off":
+        for i in ["on", "off"]:
+            mess_inp.create_slurm_script(i)
+            mess_inp.submit_job_to_slurm(i)
         mess_inp.create_msg_file()
         logger.info("message dumped in processing folder")
         # code BLOCK: what happens to the message

View File

@@ -13,7 +13,8 @@ import MxdbVdpTools
 # import matplotlib.pyplot as plt

-LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log")
+#LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log")
+LOG_FILENAME = time.strftime("/sf/cristallina/data/p21981/res/clara_%Y%m.log")
 logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
@@ -240,6 +241,7 @@ def get_data_from_streamfiles():
     # old message is a dict with all the input message params
     # parse stream into dict
+    #TODO get "on" or "off" from the streamfile name and put it in the output message
     parsed_stream = stream_to_dictionary(sys.argv[1])
     old_message["numberOfImages"] = len(parsed_stream)