adapt for cristallina

This commit is contained in:
2024-06-04 14:20:07 +02:00
parent 7d33a710ce
commit 87082d7de7
3 changed files with 179 additions and 161 deletions

View File

@@ -1,4 +1,4 @@
# Author: Assmann G. (2023)
# Author: Assmann G. (2024)
import contextlib
import datetime
@@ -8,6 +8,7 @@ import signal
import subprocess as sub
import sys
import time
import zmq
from pathlib import Path
from loguru import logger
@@ -22,7 +23,7 @@ logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
# hardcoded data path for e20233, as VDP is only used by e20233 so far for now. If this needs to be changed, change
# in function mk_cd_output_dir_bl some commented lines
pa = Path("/sls/MX/Data10/e20233")
#pa = Path("/sf/cristallina/data")
class StreamToLogger:
@@ -51,8 +52,9 @@ def main():
def sigint_handler(signum, frame):
global TERMINATE_SERVER
print("CTRL-C caught --- Terminating VDP now")
print("CTRL-C caught --- Terminating now")
TERMINATE_SERVER = True
print(TERMINATE_SERVER)
def to_json(obj):
@@ -82,26 +84,19 @@ class CollectedH5:
def mk_cd_output_dir_bl(self):
"""
Make output dir with pathlib and change into this dir.
Output dir to MX/Data10/exxx/ ... can only be written as e account
Output dir to /sf/cristallina/data.. will be written as gac-account
:return: None
"""
# generate output dir
# Assemble output dir path
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
pgroup = "p" + str(self.message["eaccount"][1:3])
eaccount = "e" + str(self.message["eaccount"][1:])
merge_id = str(self.message["mergeID"])
# if first char is a slash, get rid of it
if (str(self.message["dataFileName"][0])) == "/":
file_name = Path(str(self.message["dataFileName"][1:-3]) + "_" + str(now))
# if not use the full path
else:
file_name = Path(str(self.message["dataFileName"][:-3]) + "_" + str(now))
# today = str(date.today())
# if you dont want to use the hard coded path anymore, but the eaccount from the message, uncomment:
# p = Path("/sls")
# out_path = p / "MX" / "Data10" / eaccount / "vespa_vdp" / merge_id / file_name
# TODO add random number or second to processing folder
out_path = pa / "vespa_vdp" / merge_id / file_name
pgroup = str(self.message["user_data"]["paccount"])
merge_id = str(self.message["user_data"]["mergeID"])
res = "res"
data_path = "data"
file_name = Path(str(self.message["filename"][:-3]) + "_" + str(now))
out_path = pa / pgroup / res / merge_id/ data_path /file_name
logger.info(f"processing folder will be created at : {out_path}")
try:
out_path.mkdir(parents=True, exist_ok=True)
@@ -116,33 +111,6 @@ class CollectedH5:
return None
def mk_cd_output_dir_ra(self):
    """
    Make the output (processing) directory with pathlib and change into it.

    Builds /das/work/<pgroup>/<paccount>/vespa/<today>/<mergeID>/<file_name>
    from fields of ``self.message`` and chdirs into it so subsequent relative
    file writes land in the processing folder. Failures to create or enter
    the directory are logged and swallowed (best-effort, no exception raised).

    :return: None
    """
    # Derive account identifiers from the eaccount field,
    # e.g. eaccount "e12345" -> pgroup "p12", paccount "p12345".
    pgroup = "p" + str(self.message["eaccount"][1:3])
    paccount = "p" + str(self.message["eaccount"][1:])
    merge_id = str(self.message["mergeID"])
    # Strip the last 3 chars from the data file name
    # (assumes a ".h5" extension -- TODO confirm against senders).
    file_name = str(self.message["dataFileName"][:-3])
    today = str(date.today())
    p = Path("/das")
    out_path = p / "work" / pgroup / paccount / "vespa" / today / merge_id / file_name
    logger.info(f"processing folder is created at : {out_path}")
    try:
        # parents=True creates intermediate dirs; exist_ok avoids racing an existing dir
        out_path.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logger.info("could not create processing directory {}".format(e))
    # change into output dir
    try:
        os.chdir(out_path)
    except Exception as e:
        logger.info("Could not cd into processing directory: {}".format(e))
    return None
def convert_spg_num(self, sg: int):
"""
@@ -232,7 +200,7 @@ class CollectedH5:
latt = None
ua = None
cen = sg[0]
print(len(sg))
#print(len(sg))
if sg[1] == "1":
latt = "L_TRICLINIC"
ua = "*"
@@ -282,12 +250,12 @@ class CollectedH5:
processing folder with the corresponding mergeID for processing
:return: -
"""
merge_id = str(self.message["mergeID"])
merge_id = str(self.message["user_data"]["mergeID"])
f = open(merge_id + ".cell", "w")
# start writing the cell file
f.write("CrystFEL unit cell file version 1.0\n\n")
# get lattice params and write to file
space_group = self.convert_spg_num(self.message["spaceGroupNumber"])
space_group = self.convert_spg_num(self.message["user_data"]["spaceGroupNumber"])
lat_type, unique_a, cent = self.get_spaceg_params(space_group)
f.write("lattice_type = " + lat_type[2:].lower() + "\n")
f.write("centering = " + cent + "\n")
@@ -297,37 +265,37 @@ class CollectedH5:
f.write("\n\n")
# print unit cell constants
f.write("a = " + str(self.message["unitCell"]["a"]) + " A\n")
f.write("b = " + str(self.message["unitCell"]["b"]) + " A\n")
f.write("c = " + str(self.message["unitCell"]["c"]) + " A\n")
f.write("al = " + str(self.message["unitCell"]["alpha"]) + " deg\n")
f.write("be = " + str(self.message["unitCell"]["beta"]) + " deg\n")
f.write("ga = " + str(self.message["unitCell"]["gamma"]) + " deg\n")
f.write("a = " + str(self.message["user_data"]["unitCell"]["a"]) + " A\n")
f.write("b = " + str(self.message["user_data"]["unitCell"]["b"]) + " A\n")
f.write("c = " + str(self.message["user_data"]["unitCell"]["c"]) + " A\n")
f.write("al = " + str(self.message["user_data"]["unitCell"]["alpha"]) + " deg\n")
f.write("be = " + str(self.message["user_data"]["unitCell"]["beta"]) + " deg\n")
f.write("ga = " + str(self.message["user_data"]["unitCell"]["gamma"]) + " deg\n")
f.close()
return None
def create_geom_from_master(self):
"""
generates the geom file from the input message for processing with CrystFEL.
write to mergeid_jf.geom file in processing folder
:param self:
:return: none
"""
merge_id = str(self.message["mergeID"])
# write to mergeid_jf.geom file in processing folder
merge_id = str(self.message["user_data"]["mergeID"])
f2 = open(merge_id + "_jf.geom", "w")
f2.write("; PSI JF9M \n")
f2.write("; PSI JF8M \n")
f2.write("\n")
f2.write("\n")
f2.write("; Camera length (in m) and photon energy (eV) \n")
f2.write("clen = " + str(self.message["detectorDistance_mm"] * 0.001) + "\n")
f2.write("photon_energy = " + str(self.message["enery_kev"] * 1000) + "\n")
f2.write("flag_lessthan = " + str(self.message["underload"]) + "\n")
f2.write("clen = " + str(self.message["user_data"]["detectorDistance_mm"] * 0.001) + "\n")
f2.write("photon_energy = " + str(self.message["user_data"]["enery_kev"] * 1000) + "\n")
f2.write("flag_lessthan = " + str(self.message["user_data"]["underload"]) + "\n")
f2.write("\n")
f2.write("adu_per_eV = 0.00008065\n")
#f2.write("adu_per_eV = 0.00008065\n")
f2.write("adu_per_eV = 0.00008285\n")
# f2.write("adu_per_photon = 1\n")
f2.write("res = 13333.3 ; " + str(self.message["pixelSize_um"]) + " micron pixel size\n")
f2.write("res = 13333.3 ; " + str(self.message["user_data"]["pixelSize_um"]) + " micron pixel size\n")
f2.write("\n")
f2.write("rigid_group_0 = 0 \n")
f2.write("rigid_group_collection_0 = 0 \n")
@@ -339,10 +307,16 @@ class CollectedH5:
f2.write("data = /entry/data/data \n")
f2.write("\n")
f2.write("\n")
if str(self.message["masterFileName"])[0] == "/":
f2.write("mask_file =" + str(pa.resolve()) + self.message["masterFileName"] + "\n")
else:
f2.write("mask_file =" + str(pa.resolve()) + "/" + self.message["masterFileName"] + "\n")
# Assembling path for master file
pgroup = str(self.message["user_data"]["paccount"])
raw = "raw"
data_path = "data"
merge_id = str(self.message["user_data"]["mergeID"])
master_file = str(self.message["filename"][:-11]) + str("master.h5")
master_file_path = pa / pgroup / raw / merge_id / data_path / master_file
f2.write("mask_file ="+ str(master_file_path) +"\n")
f2.write("mask = /entry/instrument/detector/pixel_mask \n")
f2.write("mask_good = 0x0 \n")
f2.write("mask_bad = 0xFFFFFFFF\n")
@@ -352,17 +326,15 @@ class CollectedH5:
f2.write("\n")
f2.write("0/min_fs = 0 \n")
f2.write("0/min_ss = 0 \n")
f2.write("0/max_fs =" + str(self.message["detectorWidth_pxl"] - 1) + "\n")
f2.write("0/max_ss =" + str(self.message["detectorHeight_pxl"] - 1) + "\n")
f2.write("0/corner_x = -" + str(self.message["beamCenterX_pxl"]) + "\n")
f2.write("0/corner_y = -" + str(self.message["beamCenterY_pxl"]) + "\n")
f2.write("0/max_fs =" + str(self.message["user_data"]["detectorWidth_pxl"] - 1) + "\n")
f2.write("0/max_ss =" + str(self.message["user_data"]["detectorHeight_pxl"] - 1) + "\n")
f2.write("0/corner_x = -" + str(self.message["user_data"]["beamCenterX_pxl"]) + "\n")
f2.write("0/corner_y = -" + str(self.message["user_data"]["beamCenterY_pxl"]) + "\n")
f2.write("0/fs = x \n")
f2.write("0/ss = y \n")
f2.write("\n")
# f2.write("badregionA/min_fs = 774 \n")
# f2.write("badregionA/max_fs = 1032 \n")
# f2.write("badregionA/min_ss = 0 \n")
# f2.write("badregionA/max_ss = 256 \n")
# BAD REGION EXAMPLE
# f2.write("\n")
# f2.write("badregionB/min_fs = 256 \n")
# f2.write("badregionB/max_fs = 774 \n")
@@ -377,33 +349,19 @@ class CollectedH5:
Function to generate a list file with the path of the input H5 file
:return:None
"""
merge_id = str(self.message["mergeID"])
#Assemble path for raw data
pgroup = str(self.message["user_data"]["paccount"])
raw = "raw"
data_path = "data"
merge_id = str(self.message["user_data"]["mergeID"])
filen = str(self.message["filename"])
file_path = pa / pgroup / raw / merge_id / data_path / filen
# write to cell file in output folder
f = open(merge_id + ".list", "w")
print(pa.resolve())
if (str(self.message["dataFileName"][0])) == "/":
f.write(str(pa.resolve()) + str(self.message["dataFileName"]))
else:
f.write(str(pa.resolve()) + "/" + str(self.message["dataFileName"]))
f.write(str(file_path))
"""
if count == 0:
print("count 0")
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"]))
# if count =1 and at beginning
elif count == 1 and (str(self.message["dataFileName"][0])) == "/":
print("count 1 and first char")
# remove first char
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][1:]))
# else if count >0 and not at beginning
elif count > 0:
print("count more and middle")
# get position of last "/" and remove until then
last_pos = self.message["dataFileName"].rfind("/")
print("last_pos", last_pos)
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][(last_pos + 1) :]))
"""
f.close()
return None
@@ -422,8 +380,8 @@ class CollectedH5:
:return: None
"""
# get dat file name without any preceding paths..
last_pos = str(self.message["dataFileName"]).rfind("/")
data_file_name = str(self.message["dataFileName"][(last_pos + 1) : -3])
#last_pos = str(self.message["dataFileName"]).rfind("/")
data_file_name = str(self.message["filename"][: -3])
# write file
f = open("run_SLURM", "w")
@@ -439,32 +397,31 @@ class CollectedH5:
f.write("# Load modules \n")
f.write("module purge \n")
f.write("module use MX unstable \n")
# f.write("module load crystfel/0.10.2 \n")
# TODO ask Leo to install libs on CN for crystfel/0.10.2
f.write(
"module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n"
)
f.write("module load crystfel/0.10.2 \n")
# f.write(
# "module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n"
# )
f.write("\n\n")
f.write("# Actual Indexing command for crystFEL \n")
f.write(
" indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold="
+ str(int(self.message["crystfelTreshold"]))
+ str(int(self.message["user_data"]["crystfelTreshold"]))
+ " --int-radius=2,3,5 -p "
+ str(self.message["mergeID"])
+ str(self.message["user_data"]["mergeID"])
+ ".cell --min-snr="
+ str(self.message["crystfelMinSNR"])
+ str(self.message["user_data"]["crystfelMinSNR"])
+ " --min-peaks=6 --min-pix-count="
+ str(self.message["crystfelMinPixCount"])
+ str(self.message["user_data"]["crystfelMinPixCount"])
+ " -i "
+ str(self.message["mergeID"])
+ str(self.message["user_data"]["mergeID"])
+ ".list -o "
+ data_file_name
+ data_file_name
+ ".stream -g "
+ str(self.message["mergeID"])
+ str(self.message["user_data"]["mergeID"])
+ "_jf.geom "
+ " -j `nproc` --min-res=75 "
)
if self.message["crystfelMultiCrystal"]:
if self.message["user_data"]["crystfelMultiCrystal"]:
f.write(" --multi" + ">& " + data_file_name + ".log\n")
else:
f.write(" --no-multi" + ">& " + data_file_name + ".log\n")
@@ -474,9 +431,10 @@ class CollectedH5:
f.write("\n\n")
f.write("# Executing results.py to get results and send to Database \n")
f.write(
"module load anaconda \n"
+ "conda activate /sls/MX/applications/conda_envs/vdp \n"
+ "python /sls/MX/applications/git/vdp/src/results.py "
"/sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python /sf/cristallina/applications/mx/clara/src/results.py "
#"module load anaconda \n"
#+ "conda activate /sls/MX/applications/conda_envs/vdp \n"
#+ "python /sls/MX/applications/git/vdp/src/results.py "
+ data_file_name
+ ".stream "
# + data_file_name
@@ -491,8 +449,7 @@ class CollectedH5:
"""
submit job to SLURM (on RA or 6S/6D nodes)
needs the slurm input file.
1.) Go to processing folder
2.) execute processing command
-> execute processing command
:return: None
"""
# some info: sub.run needs either a list with the different args or needs the full command as string,
@@ -534,48 +491,96 @@ if __name__ == "__main__":
# redirect stdout to logging file
stream = StreamToLogger()
with contextlib.redirect_stdout(stream):
# potential message recieving:
vdp_server = "sf-broker-01.psi.ch"
vdp_port = 61613
vdp_inqueue = "/queue/test_in"
logger.info("In_queue is: {}", vdp_inqueue)
vdp_outqueue = "not_relevant_atm"
vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue)
vdp_listener.connect()
logger.info("connected to in_queue")
if sys.argv[1] == "a":
# potential message recieving:
logger.info("connecting to ActiveMQ broker")
vdp_server = "sf-broker-01.psi.ch"
vdp_port = 61613
vdp_inqueue = "/queue/test_in"
logger.info("In_queue is: {}", vdp_inqueue)
vdp_outqueue = "not_relevant_atm"
vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue)
vdp_listener.connect()
logger.info("connected to in_queue")
TERMINATE_SERVER = False
TERMINATE_SERVER = False
logger.info("\nWaiting for SIGINT to stop...")
signal.signal(signal.SIGINT, sigint_handler)
logger.info("\nWaiting for SIGINT to stop...")
signal.signal(signal.SIGINT, sigint_handler)
while not TERMINATE_SERVER:
if vdp_listener.incoming_messages_queue.empty():
time.sleep(0.1)
else:
# receives message from queue; function from the Python package queue, same as empty().
logger.info("received message from in_queue, started processing...")
message = vdp_listener.incoming_messages_queue.get()
# Do something with the message
logger.info(f"message is: {message}")
mess_inp = CollectedH5(message)
#mess_inp.mk_cd_output_dir_bl()
#logger.info("subfolder created")
#mess_inp.create_cell_file()
#logger.info("cell file created")
#mess_inp.create_geom_from_master()
#logger.info("geom file created")
#mess_inp.create_list_file()
#logger.info("list file created")
#mess_inp.create_slurm_script()
#logger.info("slurm script created")
#mess_inp.submit_job_to_slurm()
#logger.info("job submitted to SLURM")
#mess_inp.create_msg_file()
#logger.info("message file created")
while not TERMINATE_SERVER:
if vdp_listener.incoming_messages_queue.empty():
time.sleep(0.1)
else:
# receives message from queue; function from the Python package queue, same as empty().
logger.info("received message from in_queue, started processing...")
message = vdp_listener.incoming_messages_queue.get()
# Do something with the message
logger.info(f"message is: {message}")
mess_inp = CollectedH5(message)
#mess_inp.mk_cd_output_dir_bl()
#logger.info("subfolder created")
#mess_inp.create_cell_file()
#logger.info("cell file created")
#mess_inp.create_geom_from_master()
#logger.info("geom file created")
#mess_inp.create_list_file()
#logger.info("list file created")
#mess_inp.create_slurm_script()
#logger.info("slurm script created")
#mess_inp.submit_job_to_slurm()
#logger.info("job submitted to SLURM")
#mess_inp.create_msg_file()
#logger.info("message file created")
vdp_listener.acknowledge(message.headers["ack"])
logger.info("message was acknowledged")
logger.info("waiting for the next message")
vdp_listener.acknowledge(message.headers["ack"])
logger.info("message was acknowledged")
logger.info("waiting for the next message")
vdp_listener.disconnect()
vdp_listener.disconnect()
elif sys.argv[1] == "z":
pa = Path("/sf/cristallina/data/")
# potential message recieving:
logger.info("SUBscribing to ZeroMQ PUBlisher")
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://sf-broker-01.psi.ch:5555")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
logger.info("connected to ZeroMQ PUBlisher")
TERMINATE_SERVER = False
signal.signal(signal.SIGINT, sigint_handler)
#INFO: Blocking Call: By default, recv() is a blocking call. This means that the function call will block (pause)
# the execution of the program until a message is available to be received on the socket.
# If there are no messages available, the function will wait until a message arrives.
# In order to be able to terminate with SIGINT, the zmq.NOBLOCK flag and a zmq.Again except block must be implemented!
while not TERMINATE_SERVER:
try:
message = subscriber.recv(flags=zmq.NOBLOCK)
mess_dec = json.loads(message.decode("utf-8"))
logger.info(f"Received message is: {mess_dec}")
mess_inp = CollectedH5(mess_dec)
mess_inp.mk_cd_output_dir_bl()
logger.info("output folder created")
mess_inp.create_cell_file()
logger.info("cell file created")
mess_inp.create_geom_from_master()
logger.info("geom file created")
mess_inp.create_list_file()
logger.info("list file created")
mess_inp.create_slurm_script()
logger.info("slurm submission script created")
#mess_inp.submit_job_to_slurm()
mess_inp.create_msg_file()
logger.info("message dumped in processing folder")
#code BLOCK what is happening to message
except zmq.Again:
# if no SIGINT kill signal was sent from the terminal
pass
subscriber.close()
logger.info("termindated zeroMQ connection")

View File

@@ -2,4 +2,4 @@
db=/sf/cristallina/applications/mx/clara_tools/mxdbclient/src/
env PYTHONPATH=$db /sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python ./clara.py !*
env PYTHONPATH=$db /sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python ./clara.py z !*

13
src/clara_0mq.py Normal file
View File

@@ -0,0 +1,13 @@
# Minimal ZeroMQ SUB client: subscribes to a PUB socket somewhere on the same network.
# The broker endpoint (sf-broker-01.psi.ch) is hard-coded and must be adjusted per deployment.
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://sf-broker-01.psi.ch:5555")
# Empty topic filter -> receive every published message.
sock.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
    # recv() blocks until a message arrives; loop forever echoing payloads.
    raw = sock.recv()
    print("Received message:", raw.decode("utf-8"))