From a2f05b768d5237a4e890a08758ff7dbbd0c2ed0e Mon Sep 17 00:00:00 2001 From: Assmann Greta Marie Date: Thu, 20 Jun 2024 16:48:20 +0200 Subject: [PATCH] JF adaptations --- src/clara.py | 140 +++++++++++++++++++++++++++++-------------------- src/results.py | 30 +---------- 2 files changed, 84 insertions(+), 86 deletions(-) diff --git a/src/clara.py b/src/clara.py index 643890c..b342217 100644 --- a/src/clara.py +++ b/src/clara.py @@ -84,19 +84,20 @@ class CollectedH5: def mk_cd_output_dir_bl(self): """ mk putput dir with pathlib and change into this dir. - Output dir to /sf/cristallina/data.. will be written as gac-account + Output dir to /sf/cristallina/data/.. will be written as gac-account + pa = "/sf/cristallina/data/" :return: None + """ # Assemble output dir path now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - pgroup = str(self.message["user_data"]["paccount"]) - merge_id = str(self.message["user_data"]["mergeID"]) + pgroup = str(self.message["experiment_group"]) res = "res" - data_path = "data" + #data_path = "data" file_name = Path(str(self.message["filename"][:-3]) + "_" + str(now)) - out_path = pa / pgroup / res / merge_id/ data_path /file_name + out_path = pa / pgroup / res / file_name logger.info(f"processing folder will be created at : {out_path}") try: out_path.mkdir(parents=True, exist_ok=True) @@ -250,28 +251,35 @@ class CollectedH5: processing folder with the corresponding mergeID for processing :return: - """ - merge_id = str(self.message["user_data"]["mergeID"]) - f = open(merge_id + ".cell", "w") - # start writing the cell file - f.write("CrystFEL unit cell file version 1.0\n\n") - # get lattice params and write to file - space_group = self.convert_spg_num(self.message["user_data"]["spaceGroupNumber"]) - lat_type, unique_a, cent = self.get_spaceg_params(space_group) - f.write("lattice_type = " + lat_type[2:].lower() + "\n") - f.write("centering = " + cent + "\n") - if unique_a != "*": - f.write("unique_axis = " + unique_a + "\n\n") - else: - f.write("\n\n") - # print unit cell constants - f.write("a = " + str(self.message["user_data"]["unitCell"]["a"]) + " A\n") - f.write("b = " + str(self.message["user_data"]["unitCell"]["b"]) + " A\n") - f.write("c = " + str(self.message["user_data"]["unitCell"]["c"]) + " A\n") - f.write("al = " + str(self.message["user_data"]["unitCell"]["alpha"]) + " deg\n") - f.write("be = " + str(self.message["user_data"]["unitCell"]["beta"]) + " deg\n") - f.write("ga = " + str(self.message["user_data"]["unitCell"]["gamma"]) + " deg\n") - f.close() + + #check if cell parameters were sent with the message or not + if "space_group_number" in self.message: + name = str(self.message["run_number"]) + f = open(name + ".cell", "w") + # start writing the cell file + f.write("CrystFEL unit cell file version 1.0\n\n") + # get lattice params and write to file + space_group = self.convert_spg_num(self.message["space_group_number"]) + lat_type, unique_a, cent = self.get_spaceg_params(space_group) + f.write("lattice_type = " + lat_type[2:].lower() + "\n") + f.write("centering = " + cent + "\n") + if unique_a != "*": + f.write("unique_axis = " + unique_a + "\n\n") + else: + f.write("\n\n") + + # print unit cell constants + f.write("a = " + str(self.message["unit_cell"]["a"]) + " A\n") + f.write("b = " + str(self.message["unit_cell"]["b"]) + " A\n") + f.write("c = " + str(self.message["unit_cell"]["c"]) + " A\n") + f.write("al = " + str(self.message["unit_cell"]["alpha"]) + " deg\n") + f.write("be = " + str(self.message["unit_cell"]["beta"]) + " deg\n") + f.write("ga = " + str(self.message["unit_cell"]["gamma"]) + " deg\n") + f.close() + else: + logger.info("no space group sent, no cell file written") + return None return None def create_geom_from_master(self): @@ -282,20 +290,20 @@ class CollectedH5: :return: none """ - merge_id = str(self.message["user_data"]["mergeID"]) - f2 = open(merge_id + "_jf.geom", "w") + name = str(self.message["run_number"]) + f2 = open(name + "_jf.geom", "w") f2.write("; PSI JF8M \n") f2.write("\n") f2.write("\n") f2.write("; Camera length (in m) and photon energy (eV) \n") - f2.write("clen = " + str(self.message["user_data"]["detectorDistance_mm"] * 0.001) + "\n") - f2.write("photon_energy = " + str(self.message["user_data"]["enery_kev"] * 1000) + "\n") - f2.write("flag_lessthan = " + str(self.message["user_data"]["underload"]) + "\n") + f2.write("clen = " + str(self.message["detector_distance_m"]) + "\n") + f2.write("photon_energy = " + str(self.message["photon_energy_eV"]) + "\n") + f2.write("flag_lessthan = " + str(self.message["underload"]) + "\n") f2.write("\n") #f2.write("adu_per_eV = 0.00008065\n") f2.write("adu_per_eV = 0.00008285\n") # f2.write("adu_per_photon = 1\n") - f2.write("res = 13333.3 ; " + str(self.message["user_data"]["pixelSize_um"]) + " micron pixel size\n") + f2.write("res = 13333.3 ; " + str(self.message["pixel_size_m"] *1000000) + " micron pixel size\n") f2.write("\n") f2.write("rigid_group_0 = 0 \n") f2.write("rigid_group_collection_0 = 0 \n") @@ -309,27 +317,25 @@ class CollectedH5: f2.write("\n") # Assembling path for master file - pgroup = str(self.message["user_data"]["paccount"]) + pgroup = str(self.message["experiment_group"]) raw = "raw" - data_path = "data" - merge_id = str(self.message["user_data"]["mergeID"]) master_file = str(self.message["filename"][:-11]) + str("master.h5") - master_file_path = pa / pgroup / raw / merge_id / data_path / master_file + master_file_path = pa / pgroup / raw / master_file - f2.write("mask_file ="+ str(master_file_path) +"\n") - f2.write("mask = /entry/instrument/detector/pixel_mask \n") - f2.write("mask_good = 0x0 \n") - f2.write("mask_bad = 0xFFFFFFFF\n") + f2.write(";mask_file ="+ str(master_file_path) +"\n") + f2.write(";mask = /entry/instrument/detector/pixel_mask \n") + f2.write(";mask_good = 0x0 \n") + f2.write(";mask_bad = 0xFFFFFFFF\n") f2.write("\n") f2.write("; corner_{x,y} set the position of the corner of the detector (in pixels) \n") f2.write("; relative to the beam \n") f2.write("\n") f2.write("0/min_fs = 0 \n") f2.write("0/min_ss = 0 \n") - f2.write("0/max_fs =" + str(self.message["user_data"]["detectorWidth_pxl"] - 1) + "\n") - f2.write("0/max_ss =" + str(self.message["user_data"]["detectorHeight_pxl"] - 1) + "\n") - f2.write("0/corner_x = -" + str(self.message["user_data"]["beamCenterX_pxl"]) + "\n") - f2.write("0/corner_y = -" + str(self.message["user_data"]["beamCenterY_pxl"]) + "\n") + f2.write("0/max_fs =" + str(self.message["detector_height_pxl"] ) + "\n") + f2.write("0/max_ss =" + str(self.message["detector_width_pxl"] ) + "\n") + f2.write("0/corner_x = -" + str(self.message["beam_x_pxl"]) + "\n") + f2.write("0/corner_y = -" + str(self.message["beam_y_pxl"]) + "\n") f2.write("0/fs = x \n") f2.write("0/ss = y \n") f2.write("\n") @@ -350,20 +356,21 @@ class CollectedH5: :return:None """ #Assemble path for raw data - pgroup = str(self.message["user_data"]["paccount"]) + pgroup = str(self.message["experiment_group"]) raw = "raw" - data_path = "data" - merge_id = str(self.message["user_data"]["mergeID"]) filen = str(self.message["filename"]) - file_path = pa / pgroup / raw / merge_id / data_path / filen + file_path = pa / pgroup / raw / filen # write to cell file in output folder - f = open(merge_id + ".list", "w") + name = str(self.message["run_number"]) + f = open(name + ".list", "w") f.write(str(file_path)) f.close() + #Insert new key into message for triggered/non tirggered + self.message["trigger"] = "on" return None def create_slurm_script(self): @@ -381,12 +388,15 @@ class CollectedH5: """ # get dat file name without any preceding paths.. #last_pos = str(self.message["dataFileName"]).rfind("/") - data_file_name = str(self.message["filename"][: -3]) + #data_file_name = str(self.message["filename"][: -3]) + data_file_name = str(self.message["run_number"]) # write file f = open("run_SLURM", "w") f.write("#!/bin/bash \n") f.write("#SBATCH --job-name=index \n") + f.write("#SBATCH --partition=prod-aramis \n") + #f.write("#SBATCH --nodelist=sf-cn-21 \n") # uncomment if on RA # f.write("#SBATCH --partition=hour \n") f.write("#SBATCH --cpus-per-task=32 \n") @@ -397,7 +407,7 @@ class CollectedH5: f.write("# Load modules \n") f.write("module purge \n") f.write("module use MX unstable \n") - f.write("module load crystfel/0.10.2 \n") + f.write("module load crystfel/0.10.2-rhel8 \n") # f.write( # "module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n" # ) @@ -406,18 +416,19 @@ class CollectedH5: f.write( " indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold=" + str(int(self.message["user_data"]["crystfelTreshold"])) - + " --int-radius=2,3,5 -p " - + str(self.message["user_data"]["mergeID"]) + #+ " --int-radius=2,3,5 " + + " -p " + + str(self.message["run_number"]) + ".cell --min-snr=" + str(self.message["user_data"]["crystfelMinSNR"]) + " --min-peaks=6 --min-pix-count=" + str(self.message["user_data"]["crystfelMinPixCount"]) + " -i " - + str(self.message["user_data"]["mergeID"]) + + str(self.message["run_number"]) + ".list -o " + data_file_name + ".stream -g " - + str(self.message["user_data"]["mergeID"]) + + str(self.message["run_number"]) + "_jf.geom " + " -j `nproc` --min-res=75 " ) @@ -542,13 +553,28 @@ if __name__ == "__main__": elif sys.argv[1] == "z": pa = Path("/sf/cristallina/data/") # potential message recieving: - logger.info("SUBscribing to ZeroMQ PUBlisher") + logger.info("SUBscribing to ZeroMQ PUBlisher.. connecting ...") context = zmq.Context() subscriber = context.socket(zmq.SUB) subscriber.connect("tcp://sf-broker-01.psi.ch:5555") subscriber.setsockopt_string(zmq.SUBSCRIBE, "") + + + #-------------------------------------------------- + #UNcomment if handshake between publisher and subscriber is not neccesary + ready_socket = context.socket(zmq.REQ) + ready_socket.connect("tcp://sf-broker-01.psi.ch:5556") + + # Notify publisher that subscriber is ready + ready_socket.send(b"READY") + start_message= ready_socket.recv() + print(f"Received start message: {start_message}") + #---------------------------------------------------------------------- + + time.sleep(3) logger.info("connected to ZeroMQ PUBlisher") + TERMINATE_SERVER = False signal.signal(signal.SIGINT, sigint_handler) @@ -573,7 +599,7 @@ if __name__ == "__main__": logger.info("list file created") mess_inp.create_slurm_script() logger.info("slurm submission script created") - #mess_inp.submit_job_to_slurm() + mess_inp.submit_job_to_slurm() mess_inp.create_msg_file() logger.info("message dumped in processing folder") #code BLOCK what is happening to message diff --git a/src/results.py b/src/results.py index 76a34c0..87aacb2 100644 --- a/src/results.py +++ b/src/results.py @@ -13,10 +13,7 @@ import MxdbVdpTools # import matplotlib.pyplot as plt -LOG_FILENAME = time.strftime("/sls/MX/Data10/e20233/log/vdp_%Y%m.log") # as eaccount at beamline -# LOG_FILENAME = time.strftime("/sls/MX/Data10-staff/e19370/log/vdp_%Y%m.log") # as eaccount at beamline -# LOG_FILENAME = time.strftime("/home/assman_g/Documents/log/vdp_%Y%m.log") # as assman_g at beamline -# LOG_FILENAME = time.strftime("/das/home/assman_g/vdp_%Y%m.log") # on RA +LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log") logger.add(LOG_FILENAME, level="INFO", rotation="100MB") @@ -353,31 +350,6 @@ if __name__ == "__main__": # insert message to DB _id = mxdb.insert(results_message) - #EXAMPLE MESSAGE { - # "mergeID": "something_mergeID", - # "trackingId": "something_track", - # "eaccount": "e19370", - # "masterFileName": "Lyso_12p4keV_1kHz_150mm_run000026_master.h5", - # "dataFileName": "Lyso_12p4keV_1kHz_150mm_run000026_data_000010.h5", - # "filesystemPath": "/das/work/p19/p19607/FromGreta/REDML_indx_data/lyso/processing/", - # "detectorDistance_mm": 150.0, - # "beamCenterX_pxl": 1103.7, - # "beamCenterY_pxl": 1175.1, - # "pixelSize_um": 75, - # "numberOfImages": 10000, - # "imageTime_us": 100, - # "enery_kev": 12398.0, - # "detectorWidth_pxl": 2067, - # "detectorHeight_pxl": 2163, - # "underload": -30000, - # "overload": 30000, - # "unitCell": {"a": 79.5, "b": 79.5, "c": 38.6, "alpha": 90.0, "beta": 90.0, "gamma": 90.0}, - # "spaceGroupNumber": 96, - # "crystfelTreshold": 6.0, - # "crystfelMinSNR": 4.0, - # "crystfelMinPixCount": 1, - # "crystfelMultiCrystal": False, - #} logger.info("message inserted to DB")