diff --git a/src/clara.py b/src/clara.py index 783afb8..643890c 100644 --- a/src/clara.py +++ b/src/clara.py @@ -1,4 +1,4 @@ -# Author: Assmann G. (2023) +# Author: Assmann G. (2024) import contextlib import datetime @@ -8,6 +8,7 @@ import signal import subprocess as sub import sys import time +import zmq from pathlib import Path from loguru import logger @@ -22,7 +23,7 @@ logger.add(LOG_FILENAME, level="INFO", rotation="100MB") # hardcoded data path for e20233, as VDP is only used by e20233 so far for now. If this needs to be changed , change # in function mk_cd_output_dir_bl some commented lines -pa = Path("/sls/MX/Data10/e20233") +#pa = Path("/sf/cristallina/data") class StreamToLogger: @@ -51,8 +52,9 @@ def main(): def sigint_handler(signum, frame): global TERMINATE_SERVER - print("CTRL-C caught --- Terminating VDP now") + print("CTRL-C caught --- Terminating now") TERMINATE_SERVER = True + print(TERMINATE_SERVER) def to_json(obj): @@ -82,26 +84,19 @@ class CollectedH5: def mk_cd_output_dir_bl(self): """ mk putput dir with pathlib and change into this dir. - Output dir to MX/Data10/exxx/ ... can only be written as e account + Output dir to /sf/cristallina/data.. will be written as gac-account :return: None """ - # generate output dir + # Assemble output dir path now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - pgroup = "p" + str(self.message["eaccount"][1:3]) - eaccount = "e" + str(self.message["eaccount"][1:]) - merge_id = str(self.message["mergeID"]) - # if first char is a slash, get rid of it - if (str(self.message["dataFileName"][0])) == "/": - file_name = Path(str(self.message["dataFileName"][1:-3]) + "_" + str(now)) - # if not use the full path - else: - file_name = Path(str(self.message["dataFileName"][:-3]) + "_" + str(now)) - # today = str(date.today()) - # if you dont want to use the hard coded path anymore, but the eaccount from the message, uncomment: - # p = Path("/sls") - # out_path = p / "MX" / "Data10" / eaccount / "vespa_vdp" / merge_id / file_name - # TODO add random number or second to processing folder - out_path = pa / "vespa_vdp" / merge_id / file_name + pgroup = str(self.message["user_data"]["paccount"]) + merge_id = str(self.message["user_data"]["mergeID"]) + res = "res" + data_path = "data" + + + file_name = Path(str(self.message["filename"][:-3]) + "_" + str(now)) + out_path = pa / pgroup / res / merge_id/ data_path /file_name logger.info(f"processing folder will be created at : {out_path}") try: out_path.mkdir(parents=True, exist_ok=True) @@ -116,33 +111,6 @@ class CollectedH5: return None - def mk_cd_output_dir_ra(self): - """ - mk putput dir with pathlib and change into this dir. - :return: None - - """ - # generate output dir - pgroup = "p" + str(self.message["eaccount"][1:3]) - paccount = "p" + str(self.message["eaccount"][1:]) - merge_id = str(self.message["mergeID"]) - file_name = str(self.message["dataFileName"][:-3]) - today = str(date.today()) - p = Path("/das") - out_path = p / "work" / pgroup / paccount / "vespa" / today / merge_id / file_name - logger.info(f"processing folder is created at : {out_path}") - try: - out_path.mkdir(parents=True, exist_ok=True) - except Exception as e: - logger.info("could not create processing directory {}".format(e)) - - # change into output dir - try: - os.chdir(out_path) - except Exception as e: - logger.info("Could not cd into processing directory: {}".format(e)) - - return None def convert_spg_num(self, sg: int): """ @@ -232,7 +200,7 @@ class CollectedH5: latt = None ua = None cen = sg[0] - print(len(sg)) + #print(len(sg)) if sg[1] == "1": latt = "L_TRICLINIC" ua = "*" @@ -282,12 +250,12 @@ class CollectedH5: processing folder with the corresponding mergeID for processing :return: - """ - merge_id = str(self.message["mergeID"]) + merge_id = str(self.message["user_data"]["mergeID"]) f = open(merge_id + ".cell", "w") # start writing the cell file f.write("CrystFEL unit cell file version 1.0\n\n") # get lattice params and write to file - space_group = self.convert_spg_num(self.message["spaceGroupNumber"]) + space_group = self.convert_spg_num(self.message["user_data"]["spaceGroupNumber"]) lat_type, unique_a, cent = self.get_spaceg_params(space_group) f.write("lattice_type = " + lat_type[2:].lower() + "\n") f.write("centering = " + cent + "\n") @@ -297,37 +265,37 @@ class CollectedH5: f.write("\n\n") # print unit cell constants - f.write("a = " + str(self.message["unitCell"]["a"]) + " A\n") - f.write("b = " + str(self.message["unitCell"]["b"]) + " A\n") - f.write("c = " + str(self.message["unitCell"]["c"]) + " A\n") - f.write("al = " + str(self.message["unitCell"]["alpha"]) + " deg\n") - f.write("be = " + str(self.message["unitCell"]["beta"]) + " deg\n") - f.write("ga = " + str(self.message["unitCell"]["gamma"]) + " deg\n") + f.write("a = " + str(self.message["user_data"]["unitCell"]["a"]) + " A\n") + f.write("b = " + str(self.message["user_data"]["unitCell"]["b"]) + " A\n") + f.write("c = " + str(self.message["user_data"]["unitCell"]["c"]) + " A\n") + f.write("al = " + str(self.message["user_data"]["unitCell"]["alpha"]) + " deg\n") + f.write("be = " + str(self.message["user_data"]["unitCell"]["beta"]) + " deg\n") + f.write("ga = " + str(self.message["user_data"]["unitCell"]["gamma"]) + " deg\n") f.close() return None def create_geom_from_master(self): """ generates the geom file from the input message for processing wih Crystfel . + write to mergeid_jf.geom file in processing folder :param self: :return: none """ - merge_id = str(self.message["mergeID"]) - - # write to mergeid_jf.geom file in processing folder + merge_id = str(self.message["user_data"]["mergeID"]) f2 = open(merge_id + "_jf.geom", "w") - f2.write("; PSI JF9M \n") + f2.write("; PSI JF8M \n") f2.write("\n") f2.write("\n") f2.write("; Camera length (in m) and photon energy (eV) \n") - f2.write("clen = " + str(self.message["detectorDistance_mm"] * 0.001) + "\n") - f2.write("photon_energy = " + str(self.message["enery_kev"] * 1000) + "\n") - f2.write("flag_lessthan = " + str(self.message["underload"]) + "\n") + f2.write("clen = " + str(self.message["user_data"]["detectorDistance_mm"] * 0.001) + "\n") + f2.write("photon_energy = " + str(self.message["user_data"]["enery_kev"] * 1000) + "\n") + f2.write("flag_lessthan = " + str(self.message["user_data"]["underload"]) + "\n") f2.write("\n") - f2.write("adu_per_eV = 0.00008065\n") + #f2.write("adu_per_eV = 0.00008065\n") + f2.write("adu_per_eV = 0.00008285\n") # f2.write("adu_per_photon = 1\n") - f2.write("res = 13333.3 ; " + str(self.message["pixelSize_um"]) + " micron pixel size\n") + f2.write("res = 13333.3 ; " + str(self.message["user_data"]["pixelSize_um"]) + " micron pixel size\n") f2.write("\n") f2.write("rigid_group_0 = 0 \n") f2.write("rigid_group_collection_0 = 0 \n") @@ -339,10 +307,16 @@ class CollectedH5: f2.write("data = /entry/data/data \n") f2.write("\n") f2.write("\n") - if str(self.message["masterFileName"])[0] == "/": - f2.write("mask_file =" + str(pa.resolve()) + self.message["masterFileName"] + "\n") - else: - f2.write("mask_file =" + str(pa.resolve()) + "/" + self.message["masterFileName"] + "\n") + + # Assembling path for master file + pgroup = str(self.message["user_data"]["paccount"]) + raw = "raw" + data_path = "data" + merge_id = str(self.message["user_data"]["mergeID"]) + master_file = str(self.message["filename"][:-11]) + str("master.h5") + master_file_path = pa / pgroup / raw / merge_id / data_path / master_file + + f2.write("mask_file ="+ str(master_file_path) +"\n") f2.write("mask = /entry/instrument/detector/pixel_mask \n") f2.write("mask_good = 0x0 \n") f2.write("mask_bad = 0xFFFFFFFF\n") @@ -352,17 +326,15 @@ class CollectedH5: f2.write("\n") f2.write("0/min_fs = 0 \n") f2.write("0/min_ss = 0 \n") - f2.write("0/max_fs =" + str(self.message["detectorWidth_pxl"] - 1) + "\n") - f2.write("0/max_ss =" + str(self.message["detectorHeight_pxl"] - 1) + "\n") - f2.write("0/corner_x = -" + str(self.message["beamCenterX_pxl"]) + "\n") - f2.write("0/corner_y = -" + str(self.message["beamCenterY_pxl"]) + "\n") + f2.write("0/max_fs =" + str(self.message["user_data"]["detectorWidth_pxl"] - 1) + "\n") + f2.write("0/max_ss =" + str(self.message["user_data"]["detectorHeight_pxl"] - 1) + "\n") + f2.write("0/corner_x = -" + str(self.message["user_data"]["beamCenterX_pxl"]) + "\n") + f2.write("0/corner_y = -" + str(self.message["user_data"]["beamCenterY_pxl"]) + "\n") f2.write("0/fs = x \n") f2.write("0/ss = y \n") f2.write("\n") - # f2.write("badregionA/min_fs = 774 \n") - # f2.write("badregionA/max_fs = 1032 \n") - # f2.write("badregionA/min_ss = 0 \n") - # f2.write("badregionA/max_ss = 256 \n") + + # BAD REGION EXAMPLE # f2.write("\n") # f2.write("badregionB/min_fs = 256 \n") # f2.write("badregionB/max_fs = 774 \n") @@ -377,33 +349,19 @@ class CollectedH5: Function to generate a list file with the path of the input H5 file :return:None """ - merge_id = str(self.message["mergeID"]) + #Assemble path for raw data + pgroup = str(self.message["user_data"]["paccount"]) + raw = "raw" + data_path = "data" + merge_id = str(self.message["user_data"]["mergeID"]) + filen = str(self.message["filename"]) + file_path = pa / pgroup / raw / merge_id / data_path / filen # write to cell file in output folder f = open(merge_id + ".list", "w") - print(pa.resolve()) - if (str(self.message["dataFileName"][0])) == "/": - f.write(str(pa.resolve()) + str(self.message["dataFileName"])) - else: - f.write(str(pa.resolve()) + "/" + str(self.message["dataFileName"])) + f.write(str(file_path)) + - """ - if count == 0: - print("count 0") - f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"])) - # if count =1 and at beginning - elif count == 1 and (str(self.message["dataFileName"][0])) == "/": - print("count 1 and first char") - # remove first char - f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][1:])) - # else if count >0 and not at beginning - elif count > 0: - print("count more and middle") - # get position of last "/" and remove until then - last_pos = self.message["dataFileName"].rfind("/") - print("last_pos", last_pos) - f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][(last_pos + 1) :])) - """ f.close() return None @@ -422,8 +380,8 @@ class CollectedH5: :return: None """ # get dat file name without any preceding paths.. - last_pos = str(self.message["dataFileName"]).rfind("/") - data_file_name = str(self.message["dataFileName"][(last_pos + 1) : -3]) + #last_pos = str(self.message["dataFileName"]).rfind("/") + data_file_name = str(self.message["filename"][: -3]) # write file f = open("run_SLURM", "w") @@ -439,32 +397,31 @@ class CollectedH5: f.write("# Load modules \n") f.write("module purge \n") f.write("module use MX unstable \n") - # f.write("module load crystfel/0.10.2 \n") - # TODO ask Leo to install libs on CN for crystfel/0.10.2 - f.write( - "module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n" - ) + f.write("module load crystfel/0.10.2 \n") + # f.write( + # "module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n" + # ) f.write("\n\n") f.write("# Actual Indexing command for crystFEL \n") f.write( " indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold=" - + str(int(self.message["crystfelTreshold"])) + + str(int(self.message["user_data"]["crystfelTreshold"])) + " --int-radius=2,3,5 -p " - + str(self.message["mergeID"]) + + str(self.message["user_data"]["mergeID"]) + ".cell --min-snr=" - + str(self.message["crystfelMinSNR"]) + + str(self.message["user_data"]["crystfelMinSNR"]) + " --min-peaks=6 --min-pix-count=" - + str(self.message["crystfelMinPixCount"]) + + str(self.message["user_data"]["crystfelMinPixCount"]) + " -i " - + str(self.message["mergeID"]) + + str(self.message["user_data"]["mergeID"]) + ".list -o " - + data_file_name + + data_file_name + ".stream -g " - + str(self.message["mergeID"]) + + str(self.message["user_data"]["mergeID"]) + "_jf.geom " + " -j `nproc` --min-res=75 " ) - if self.message["crystfelMultiCrystal"]: + if self.message["user_data"]["crystfelMultiCrystal"]: f.write(" --multi" + ">& " + data_file_name + ".log\n") else: f.write(" --no-multi" + ">& " + data_file_name + ".log\n") @@ -474,9 +431,10 @@ class CollectedH5: f.write("\n\n") f.write("# Executing results.py to get results and send to Database \n") f.write( - "module load anaconda \n" - + "conda activate /sls/MX/applications/conda_envs/vdp \n" - + "python /sls/MX/applications/git/vdp/src/results.py " + "/sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python /sf/cristallina/applications/mx/clara/src/results.py " + #"module load anaconda \n" + #+ "conda activate /sls/MX/applications/conda_envs/vdp \n" + #+ "python /sls/MX/applications/git/vdp/src/results.py " + data_file_name + ".stream " # + data_file_name @@ -491,8 +449,7 @@ class CollectedH5: """ submit job to SLURM (on RA or 6S/6D nodes) needs the slurm input file. - 1.) Go to processing folder - 2.) execute processing command + -> execute processing command :return: None """ # some info: sub.run needs either a list with the different args or needs the full command as string, @@ -534,48 +491,96 @@ if __name__ == "__main__": # redirect stdout to logging file stream = StreamToLogger() with contextlib.redirect_stdout(stream): - # potential message recieving: - vdp_server = "sf-broker-01.psi.ch" - vdp_port = 61613 - vdp_inqueue = "/queue/test_in" - logger.info("In_queue is: {}", vdp_inqueue) - vdp_outqueue = "not_relevant_atm" - vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue) - vdp_listener.connect() - logger.info("connected to in_queue") + if sys.argv[1] == "a": + # potential message recieving: + logger.info("connecting to ActiveMQ broker") + vdp_server = "sf-broker-01.psi.ch" + vdp_port = 61613 + vdp_inqueue = "/queue/test_in" + logger.info("In_queue is: {}", vdp_inqueue) + vdp_outqueue = "not_relevant_atm" + vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue) + vdp_listener.connect() + logger.info("connected to in_queue") - TERMINATE_SERVER = False + TERMINATE_SERVER = False - logger.info("\nWaiting for SIGINT to stop...") - signal.signal(signal.SIGINT, sigint_handler) + logger.info("\nWaiting for SIGINT to stop...") + signal.signal(signal.SIGINT, sigint_handler) - while not TERMINATE_SERVER: - if vdp_listener.incoming_messages_queue.empty(): - time.sleep(0.1) - else: - # recieves message from queue. function from python package queue. same as empty. - logger.info("received message from in_queue, started processing...") - message = vdp_listener.incoming_messages_queue.get() - # Do something with the message - logger.info(f"message is: {message}") - mess_inp = CollectedH5(message) - #mess_inp.mk_cd_output_dir_bl() - #logger.info("subfolder created") - #mess_inp.create_cell_file() - #logger.info("cell file created") - #mess_inp.create_geom_from_master() - #logger.info("geom file created") - #mess_inp.create_list_file() - #logger.info("list file created") - #mess_inp.create_slurm_script() - #logger.info("slurm script created") - #mess_inp.submit_job_to_slurm() - #logger.info("job submitted to SLURM") - #mess_inp.create_msg_file() - #logger.info("message file created") + while not TERMINATE_SERVER: + if vdp_listener.incoming_messages_queue.empty(): + time.sleep(0.1) + else: + # recieves message from queue. function from python package queue. same as empty. + logger.info("received message from in_queue, started processing...") + message = vdp_listener.incoming_messages_queue.get() + # Do something with the message + logger.info(f"message is: {message}") + mess_inp = CollectedH5(message) + #mess_inp.mk_cd_output_dir_bl() + #logger.info("subfolder created") + #mess_inp.create_cell_file() + #logger.info("cell file created") + #mess_inp.create_geom_from_master() + #logger.info("geom file created") + #mess_inp.create_list_file() + #logger.info("list file created") + #mess_inp.create_slurm_script() + #logger.info("slurm script created") + #mess_inp.submit_job_to_slurm() + #logger.info("job submitted to SLURM") + #mess_inp.create_msg_file() + #logger.info("message file created") - vdp_listener.acknowledge(message.headers["ack"]) - logger.info("message was acknowledged") - logger.info("waiting for the next message") + vdp_listener.acknowledge(message.headers["ack"]) + logger.info("message was acknowledged") + logger.info("waiting for the next message") - vdp_listener.disconnect() + vdp_listener.disconnect() + + elif sys.argv[1] == "z": + pa = Path("/sf/cristallina/data/") + # potential message recieving: + logger.info("SUBscribing to ZeroMQ PUBlisher") + context = zmq.Context() + subscriber = context.socket(zmq.SUB) + subscriber.connect("tcp://sf-broker-01.psi.ch:5555") + subscriber.setsockopt_string(zmq.SUBSCRIBE, "") + logger.info("connected to ZeroMQ PUBlisher") + + TERMINATE_SERVER = False + signal.signal(signal.SIGINT, sigint_handler) + + #INFO: Blocking Call: By default, recv() is a blocking call. This means that the function call will block (pause) + # the execution of the program until a message is available to be received on the socket. + # If there are no messages available, the function will wait until a message arrives. + # In order to be able to terminate with SIGINT , the zmq.NOBLOCK and zmq.Active block must be implemented! + + while not TERMINATE_SERVER: + try: + message = subscriber.recv(flags=zmq.NOBLOCK) + mess_dec = json.loads(message.decode("utf-8")) + logger.info(f"Received message is: {mess_dec}") + mess_inp = CollectedH5(mess_dec) + mess_inp.mk_cd_output_dir_bl() + logger.info("output folder created") + mess_inp.create_cell_file() + logger.info("cell file created") + mess_inp.create_geom_from_master() + logger.info("geom file created") + mess_inp.create_list_file() + logger.info("list file created") + mess_inp.create_slurm_script() + logger.info("slurm submission script created") + #mess_inp.submit_job_to_slurm() + mess_inp.create_msg_file() + logger.info("message dumped in processing folder") + #code BLOCK what is happening to message + + except zmq.Again: + #if no SIGING kill signal was sent from terminal + pass + + subscriber.close() + logger.info("termindated zeroMQ connection") diff --git a/src/clara.sh b/src/clara.sh index e1e42c8..2355220 100755 --- a/src/clara.sh +++ b/src/clara.sh @@ -2,4 +2,4 @@ db=/sf/cristallina/applications/mx/clara_tools/mxdbclient/src/ -env PYTHONPATH=$db /sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python ./clara.py !* +env PYTHONPATH=$db /sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python ./clara.py z !* diff --git a/src/clara_0mq.py b/src/clara_0mq.py new file mode 100644 index 0000000..2b88450 --- /dev/null +++ b/src/clara_0mq.py @@ -0,0 +1,13 @@ +# This piece subscribes to a zeroMQ publisher that is somewhere on the same network . +# In this case it is sf-broker-01.psi.ch, needs to be adjusted accordingly + +import zmq + +context = zmq.Context() +subscriber = context.socket(zmq.SUB) +subscriber.connect("tcp://sf-broker-01.psi.ch:5555") +subscriber.setsockopt_string(zmq.SUBSCRIBE, "") + +while True: + message = subscriber.recv() + print("Received message:", message.decode("utf-8"))