From 4b8bab43effb07f31dfc6a2654aeb3beae493ce6 Mon Sep 17 00:00:00 2001 From: Appleby Martin Vears Date: Mon, 4 Nov 2024 13:04:16 +0100 Subject: [PATCH] results and clara changes --- src/clara.py | 211 +++++++++++++++++++++++++++++++++++++++++-------- src/results.py | 4 +- 2 files changed, 182 insertions(+), 33 deletions(-) diff --git a/src/clara.py b/src/clara.py index b342217..453a20d 100644 --- a/src/clara.py +++ b/src/clara.py @@ -5,6 +5,8 @@ import datetime import json import os import signal +import h5py +import numpy as np import subprocess as sub import sys import time @@ -16,7 +18,7 @@ from loguru import logger import receive_msg #define log file place: -LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log") +LOG_FILENAME = time.strftime("/sf/cristallina/data/p21981/res/clara_%Y%m.log") logger.add(LOG_FILENAME, level="INFO", rotation="100MB") @@ -332,8 +334,8 @@ class CollectedH5: f2.write("\n") f2.write("0/min_fs = 0 \n") f2.write("0/min_ss = 0 \n") - f2.write("0/max_fs =" + str(self.message["detector_height_pxl"] ) + "\n") - f2.write("0/max_ss =" + str(self.message["detector_width_pxl"] ) + "\n") + f2.write("0/max_fs =" + str(self.message["detector_width_pxl"] -1 ) + "\n") + f2.write("0/max_ss =" + str(self.message["detector_height_pxl"] -1) + "\n") f2.write("0/corner_x = -" + str(self.message["beam_x_pxl"]) + "\n") f2.write("0/corner_y = -" + str(self.message["beam_y_pxl"]) + "\n") f2.write("0/fs = x \n") @@ -355,25 +357,151 @@ class CollectedH5: Function to generate a list file with the path of the input H5 file :return:None """ + print('writing files') #Assemble path for raw data pgroup = str(self.message["experiment_group"]) raw = "raw" + data = "data" filen = str(self.message["filename"]) file_path = pa / pgroup / raw / filen # write to cell file in output folder name = str(self.message["run_number"]) - f = open(name + ".list", "w") - f.write(str(file_path)) + #f = open(name + ".list", "w") + 
#f.write(str(file_path)) + #f.close() + bsdata_name = filen.split('.')[0]+'.BSDATA.h5' + + bsdata_path= pa/ pgroup /raw / bsdata_name + + print(bsdata_path) + while not bsdata_path.exists(): + time.sleep(1) + print('survived the while loop') + try: + bsdata = h5py.File(bsdata_path, "r") #r"/sf/cristallina/data/p21630/raw/run0065-lov_movestop_normal_1/data/acq0001.BSDATA.h5" + except Exception as e: + print(f"didn't open bsdata due to error {e}") #_logger.error(f"Cannot open {data_file} (due to {e})") + + #print(pulseids_JF[0]) + + pulseids_BS = bsdata[f"/SAR-CVME-TIFALL6:EvtSet/pulse_id"][:] + + #print(pulseids_BS[0]) + + evt_set=bsdata[f"/SAR-CVME-TIFALL6:EvtSet/data"][:] + + jf_path= file_path + #print(jf_path) + try: + #r"/sf/cristallina/data/p21630/raw/run0065-lov_movestop_normal_1/data/acq0001.JF17T16V01.h5" + x = h5py.File(jf_path, "r") + except Exception as e: + print(f"didn't open JF17T16V01.h5 due to error {e}") #_logger.error(f"Cannot open {data_file} (due to {e})") + return + pulseids_JF = x[f"/entry/xfel/pulseID"][:] + #pulseids in JFJ joch path + #pulseids_JF = x[f"/data/{detector}/pulse_id"][:] + + + + for i, pulse_id in enumerate(pulseids_BS): + pulseids_BS[i]=pulse_id-1 + + n_pulse_id = len(pulseids_JF) #- maybe not needed ? 
+ + #if f"/data/{detector}/is_good_frame" in bsdata: + # is_good_frame = evt_set=bsdata[f"/SAR-CVME-TIFALL6:EvtSet/data"][:][f"/data/{detector}/is_good_frame"][:] + #else: + # is_good_frame = [1] * n_pulse_id + + #daq_recs = x[f"/data/{detector}/daq_rec"][:] + + #nGoodFrames = 0 + #nProcessedFrames = 0 + + index_dark = [] + index_light = [] + blanks = [] + + for i in range(n_pulse_id): - f.close() + #if not is_good_frame[i]: + # continue - #Insert new key into message for triggered/non tirggered - self.message["trigger"] = "on" + #nGoodFrames += 1 + #nProcessedFrames += 1 + + p = pulseids_JF[i] + q = pulseids_BS[i] + #print(p) + #print(q) + if p != q: + #print(f'acquisition = {acquisition}') + #print(f'Jungfrau pulse id {p} != BS data id {q} for image {i}') + event_i = np.where(pulseids_JF == q)[0] + event_i = i + #print(f'new i = {event_i}, not {i}, now calling BS data for pulse id {pulseids_BS[event_i]}') + else: + event_i=i + + events=evt_set[event_i] + #print(evt_set) + #print(e) + #print(event_i) + #print(i) + if events[216] and events[200]: + index_light.append(i) + + elif events[200]: + index_dark.append(i) + + else: + #print('Should only be here because 200 is false. 
200 is {0}'.format(e[200])) + blanks.append(i) + + bsdata.close() + x.close() + + acq_dark = [] + acq_light = [] + acq_blank = [] + delim = "//" + + if index_dark: + for frame_number in index_dark: + acq_dark.append(f"{jf_path} {delim}{frame_number}") + + file_off = name+'_off.list' + + logger.info(f"List of dark frames : {file_off} , {len(index_dark)} frames") + + with open(file_off, "w") as f_list: + for frame in acq_dark: + print(f"{frame}", file = f_list) + + if index_light: + for frame_number in index_light: + acq_light.append(f"{jf_path} {delim}{frame_number}") + file_on = name+'_on.list' + logger.info(f"List of light frames : {file_on} , {len(index_light)} frames") + with open(file_on, "w") as f_list: + for frame in acq_light: + print(f"{frame}", file = f_list) + + if blanks: + for frame_number in blanks: + acq_blank.append(f"{jf_path} {delim}{frame_number}") + file_blank = name+'_blank.list' + with open(file_blank, "w") as f_list: + for frame in acq_blank: + print(f"{frame}", file = f_list) + + return None - def create_slurm_script(self): + def create_slurm_script(self,trigger): """ Creates the input SLURM file with the following info: SLURM parameters ( CPUS , nodes, etc) @@ -389,10 +517,10 @@ class CollectedH5: # get dat file name without any preceding paths.. 
#last_pos = str(self.message["dataFileName"]).rfind("/") #data_file_name = str(self.message["filename"][: -3]) - data_file_name = str(self.message["run_number"]) + data_file_name = str(self.message["run_number"])+'_'+trigger # write file - f = open("run_SLURM", "w") + f = open("run_SLURM_" + trigger, "w") f.write("#!/bin/bash \n") f.write("#SBATCH --job-name=index \n") f.write("#SBATCH --partition=prod-aramis \n") @@ -414,25 +542,40 @@ class CollectedH5: f.write("\n\n") f.write("# Actual Indexing command for crystFEL \n") f.write( - " indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold=" - + str(int(self.message["user_data"]["crystfelTreshold"])) - #+ " --int-radius=2,3,5 " + " indexamajig --peaks=" + + str(self.message["user_data"]["peaks"]) + + " --indexing=" + + str(self.message["user_data"]["indexing"]) + + " --xgandalf-fast-execution --threshold=" + + str(self.message["user_data"]["threshold"]) + + " --tolerance=" + + str(self.message["user_data"]["tolerance"]) + + " --int-radius=" + + str(self.message["user_data"]["int-radius"]) + + " --integration=" + + str(self.message["user_data"]["integration"]) + " -p " + str(self.message["run_number"]) + ".cell --min-snr=" - + str(self.message["user_data"]["crystfelMinSNR"]) - + " --min-peaks=6 --min-pix-count=" - + str(self.message["user_data"]["crystfelMinPixCount"]) + + str(self.message["user_data"]["min-snr"]) + + " --min-peaks=" + + str(self.message["user_data"]["min-peaks"]) + + " --min-pix-count=" + + str(self.message["user_data"]["min-pix-count"]) + " -i " - + str(self.message["run_number"]) + + str(self.message["run_number"]) + "_" + trigger + ".list -o " + data_file_name + ".stream -g " + str(self.message["run_number"]) - + "_jf.geom " + + "_jf.geom" + " -j `nproc` --min-res=75 " ) - if self.message["user_data"]["crystfelMultiCrystal"]: + if self.message["user_data"]["retry"]: + f.write(" --retry") + if self.message["user_data"]["check-peaks"]: + f.write(" --check-peaks") + 
if self.message["user_data"]["multi"]: f.write(" --multi" + ">& " + data_file_name + ".log\n") else: f.write(" --no-multi" + ">& " + data_file_name + ".log\n") @@ -456,7 +599,7 @@ class CollectedH5: return None - def submit_job_to_slurm(self): + def submit_job_to_slurm(self, trigger): """ submit job to SLURM (on RA or 6S/6D nodes) needs the slurm input file. @@ -470,11 +613,11 @@ class CollectedH5: # sub.run(["sbatch", "run_SLURM"]) try: - slurm_out = sub.run(["sbatch", "run_SLURM"], capture_output=True) + slurm_out = sub.run(["sbatch", "run_SLURM_" + trigger ], capture_output=True) txt = slurm_out.stdout.decode().split() # grep the slurm number logger.info(f"submitted batch job number: {txt[-1]}") - self.message["SlurmJobID"] = str(txt[-1]) + self.message["SlurmJobID_" + trigger] = str(txt[-1]) except Exception as e: logger.info("Could not submit SLURM job: {}".format(e)) @@ -557,18 +700,19 @@ if __name__ == "__main__": context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://sf-broker-01.psi.ch:5555") + #subscriber.connect("tcp://sf-daqtest-01.psi.ch:5401") subscriber.setsockopt_string(zmq.SUBSCRIBE, "") #-------------------------------------------------- #UNcomment if handshake between publisher and subscriber is not neccesary - ready_socket = context.socket(zmq.REQ) - ready_socket.connect("tcp://sf-broker-01.psi.ch:5556") + #ready_socket = context.socket(zmq.REQ) + #ready_socket.connect("tcp://sf-broker-01.psi.ch:5556") # Notify publisher that subscriber is ready - ready_socket.send(b"READY") - start_message= ready_socket.recv() - print(f"Received start message: {start_message}") + #ready_socket.send(b"READY") + #start_message= ready_socket.recv() + #print(f"Received start message: {start_message}") #---------------------------------------------------------------------- time.sleep(3) @@ -596,10 +740,13 @@ if __name__ == "__main__": mess_inp.create_geom_from_master() logger.info("geom file created") mess_inp.create_list_file() - 
logger.info("list file created") - mess_inp.create_slurm_script() - logger.info("slurm submission script created") - mess_inp.submit_job_to_slurm() + logger.info("list files created") + # if we have two list files, loop with two different arguments "on" /"off": + for i in ["on","off"]: + mess_inp.create_slurm_script(i) + mess_inp.submit_job_to_slurm(i) + + mess_inp.create_msg_file() logger.info("message dumped in processing folder") #code BLOCK what is happening to message diff --git a/src/results.py b/src/results.py index 87aacb2..17a22ce 100644 --- a/src/results.py +++ b/src/results.py @@ -13,7 +13,8 @@ import MxdbVdpTools # import matplotlib.pyplot as plt -LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log") +#LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log") +LOG_FILENAME = time.strftime("/sf/cristallina/data/p21981/res/clara_%Y%m.log") logger.add(LOG_FILENAME, level="INFO", rotation="100MB") @@ -240,6 +241,7 @@ def get_data_from_streamfiles(): # old message is a dict with all the input message params # parse stream into dict + #TODO get on or off from streamfile naming and put it in output message parsed_stream = stream_to_dictionary(sys.argv[1]) old_message["numberOfImages"] = len(parsed_stream)