Files
clara/src/clara.py
2024-06-20 16:48:20 +02:00

613 lines
22 KiB
Python

# Author: Assmann G. (2024)
import contextlib
import datetime
import json
import os
import signal
import subprocess as sub
import sys
import time
import zmq
from pathlib import Path
from loguru import logger
import receive_msg
# Log file location: one file per month; loguru rotates at 100 MB.
LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log")
logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
# Base data path, read as a module-level global by the CollectedH5 methods.
# Hardcoded for e20233, as VDP is only used by e20233 so far for now. If this
# needs to be changed, change it here (the "z" branch below reassigns the same
# value, so defining it here is backward compatible and also covers the "a"
# branch, which previously would hit a NameError once its pipeline is enabled).
pa = Path("/sf/cristallina/data")
class StreamToLogger:
    """File-like shim that forwards everything written to it to loguru.

    Used with ``contextlib.redirect_stdout`` so that stray ``print()``
    output ends up in the log file instead of being lost.
    """

    def __init__(self, level="INFO"):
        # log level used for every forwarded line
        self._level = level

    def write(self, buffer):
        """Split *buffer* into lines and emit each one via loguru."""
        entries = buffer.rstrip().splitlines()
        for entry in entries:
            # depth=1 so loguru reports the real caller, not this shim
            logger.opt(depth=1).log(self._level, entry.rstrip())

    def flush(self):
        """No-op; present only to satisfy the file-like protocol."""
        return None
# ========== functions ================
def main():
    """
    hello world testing
    :return: nothing
    """
    greeting = "hello world"
    print(greeting)
def sigint_handler(signum, frame):
    """SIGINT handler: flag the main receive loop to shut down.

    :param signum: signal number delivered by the OS (unused)
    :param frame: current stack frame at interrupt time (unused)
    """
    global TERMINATE_SERVER
    print("CTRL-C caught --- Terminating now")
    # the main loops poll this module-level flag and exit when it is True
    TERMINATE_SERVER = True
    print(TERMINATE_SERVER)
def to_json(obj):
    """Serialize *obj* to a pretty-printed JSON string.

    Values that json cannot serialize natively fall back to their
    ``__dict__``, which covers plain class instances.

    :param obj: object to serialize
    :return: JSON string with indent=4
    """
    return json.dumps(obj, indent=4, default=lambda item: item.__dict__)
# --------class with functions-----
class CollectedH5:
def __init__(self, mess_in):
# dictionary of the json message
self.message = mess_in
def get_message_dict(self):
"""
returns dictionary of the message (json)
:return: self.message
"""
return self.message
def mk_cd_output_dir_bl(self):
"""
mk putput dir with pathlib and change into this dir.
Output dir to /sf/cristallina/data/.. will be written as gac-account
pa = "/sf/cristallina/data/"
:return: None
"""
# Assemble output dir path
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
pgroup = str(self.message["experiment_group"])
res = "res"
#data_path = "data"
file_name = Path(str(self.message["filename"][:-3]) + "_" + str(now))
out_path = pa / pgroup / res / file_name
logger.info(f"processing folder will be created at : {out_path}")
try:
out_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.info("could not create processing directory {}".format(e))
# change into output dir
try:
os.chdir(out_path)
except Exception as e:
logger.info("Could not cd into processing directory: {}".format(e))
return None
def convert_spg_num(self, sg: int):
"""
converts space group number to Hermann-Mauguin notation , 65 space groups included
96 --> P43212
:param sg: space group number
:return: sg_HM space group in H-M notation
"""
space_groups = {
1: "P1",
3: "P2",
4: "P21",
5: "C2",
16: "P222",
17: "P2221",
18: "P2122",
19: "P21212",
20: "C2221",
21: "C222",
22: "F222",
23: "I222",
24: "I212121",
75: "P4",
76: "P41",
77: "P42",
78: "P43",
79: "I4",
80: "I41",
89: "P422",
90: "P4212",
91: "P4122",
92: "P41212",
93: "P4222",
94: "P42212",
95: "P4322",
96: "P43212",
97: "I422",
98: "I4122",
143: "P3",
144: "P31",
145: "P32",
146: "R3",
149: "P312",
150: "P321",
151: "P3112",
152: "P3121",
153: "P3212",
154: "P3221",
155: "R32",
168: "P6",
169: "P61",
170: "P65",
171: "P62",
172: "P64",
173: "P63",
177: "P622",
178: "P6122",
179: "P6522",
180: "P6222",
181: "P6422",
182: "P6322",
195: "P23",
196: "F23",
197: "I23",
198: "P213",
199: "I213",
207: "P432",
208: "P4232",
209: "F432",
210: "F4132",
211: "I432",
212: "P4332",
213: "P4132",
214: "I4132",
}
return space_groups[sg]
def get_spaceg_params(self, sg: int):
"""
function to provide the pararmeters for the cell file besides the actual unit cell constants
:param sg: space group in HM notation as a string
:return: lattice,unique axis, centering
REMARK: probably not the optimal way to handle things. Maybe no conversion from the space group number needed,
rather direct conversion from the number to the lattice. can be improved
"""
latt = None
ua = None
cen = sg[0]
#print(len(sg))
if sg[1] == "1":
latt = "L_TRICLINIC"
ua = "*"
elif sg[1:3] == "23":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "213":
latt = "L_CUBIC"
ua = "*"
elif sg[3:5] == "32":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "432":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "222":
latt = "L_ORTHORHOMBIC"
ua = "*"
elif sg[1:4] == "212":
latt = "L_ORTHORHOMBIC"
ua = "*"
elif sg[1] == "2" and len(sg) < 4:
latt = "L_MONOCLINIC"
ua = "b"
elif sg[1] == "4":
latt = "L_TETRAGONAL"
ua = "c"
elif sg[1] == "6":
latt = "L_HEXAGONAL"
ua = "c"
elif sg[1] == "3":
if sg[0] == "P":
latt = "L_HEXAGONAL"
ua = "c"
else:
latt = "L_RHOMBOHEDRAL"
ua = "*"
else:
print("Couldn't understand '{}'\n".format(sg))
latt = "L_TRICLINIC"
return latt, ua, cen
def create_cell_file(self):
"""
Creates cell file with the name mergingID.cell and writes it into the
processing folder with the corresponding mergeID for processing
:return: -
"""
#check if cell parameters were sent with the message or not
if "space_group_number" in self.message:
name = str(self.message["run_number"])
f = open(name + ".cell", "w")
# start writing the cell file
f.write("CrystFEL unit cell file version 1.0\n\n")
# get lattice params and write to file
space_group = self.convert_spg_num(self.message["space_group_number"])
lat_type, unique_a, cent = self.get_spaceg_params(space_group)
f.write("lattice_type = " + lat_type[2:].lower() + "\n")
f.write("centering = " + cent + "\n")
if unique_a != "*":
f.write("unique_axis = " + unique_a + "\n\n")
else:
f.write("\n\n")
# print unit cell constants
f.write("a = " + str(self.message["unit_cell"]["a"]) + " A\n")
f.write("b = " + str(self.message["unit_cell"]["b"]) + " A\n")
f.write("c = " + str(self.message["unit_cell"]["c"]) + " A\n")
f.write("al = " + str(self.message["unit_cell"]["alpha"]) + " deg\n")
f.write("be = " + str(self.message["unit_cell"]["beta"]) + " deg\n")
f.write("ga = " + str(self.message["unit_cell"]["gamma"]) + " deg\n")
f.close()
else:
logger.info("no space group sent, no cell file written")
return None
return None
def create_geom_from_master(self):
"""
generates the geom file from the input message for processing wih Crystfel .
write to mergeid_jf.geom file in processing folder
:param self:
:return: none
"""
name = str(self.message["run_number"])
f2 = open(name + "_jf.geom", "w")
f2.write("; PSI JF8M \n")
f2.write("\n")
f2.write("\n")
f2.write("; Camera length (in m) and photon energy (eV) \n")
f2.write("clen = " + str(self.message["detector_distance_m"]) + "\n")
f2.write("photon_energy = " + str(self.message["photon_energy_eV"]) + "\n")
f2.write("flag_lessthan = " + str(self.message["underload"]) + "\n")
f2.write("\n")
#f2.write("adu_per_eV = 0.00008065\n")
f2.write("adu_per_eV = 0.00008285\n")
# f2.write("adu_per_photon = 1\n")
f2.write("res = 13333.3 ; " + str(self.message["pixel_size_m"] *1000000) + " micron pixel size\n")
f2.write("\n")
f2.write("rigid_group_0 = 0 \n")
f2.write("rigid_group_collection_0 = 0 \n")
f2.write("\n")
f2.write("; These lines describe the data layout for the JF native multi-event files \n")
f2.write("dim0 = % \n")
f2.write("dim1 = ss \n")
f2.write("dim2 = fs \n")
f2.write("data = /entry/data/data \n")
f2.write("\n")
f2.write("\n")
# Assembling path for master file
pgroup = str(self.message["experiment_group"])
raw = "raw"
master_file = str(self.message["filename"][:-11]) + str("master.h5")
master_file_path = pa / pgroup / raw / master_file
f2.write(";mask_file ="+ str(master_file_path) +"\n")
f2.write(";mask = /entry/instrument/detector/pixel_mask \n")
f2.write(";mask_good = 0x0 \n")
f2.write(";mask_bad = 0xFFFFFFFF\n")
f2.write("\n")
f2.write("; corner_{x,y} set the position of the corner of the detector (in pixels) \n")
f2.write("; relative to the beam \n")
f2.write("\n")
f2.write("0/min_fs = 0 \n")
f2.write("0/min_ss = 0 \n")
f2.write("0/max_fs =" + str(self.message["detector_height_pxl"] ) + "\n")
f2.write("0/max_ss =" + str(self.message["detector_width_pxl"] ) + "\n")
f2.write("0/corner_x = -" + str(self.message["beam_x_pxl"]) + "\n")
f2.write("0/corner_y = -" + str(self.message["beam_y_pxl"]) + "\n")
f2.write("0/fs = x \n")
f2.write("0/ss = y \n")
f2.write("\n")
# BAD REGION EXAMPLE
# f2.write("\n")
# f2.write("badregionB/min_fs = 256 \n")
# f2.write("badregionB/max_fs = 774 \n")
# f2.write("badregionB/min_ss = 1906 \n")
# f2.write("badregionB/max_ss = 2162 \n")
# f2.write("\n")
f2.close()
return None
def create_list_file(self):
"""
Function to generate a list file with the path of the input H5 file
:return:None
"""
#Assemble path for raw data
pgroup = str(self.message["experiment_group"])
raw = "raw"
filen = str(self.message["filename"])
file_path = pa / pgroup / raw / filen
# write to cell file in output folder
name = str(self.message["run_number"])
f = open(name + ".list", "w")
f.write(str(file_path))
f.close()
#Insert new key into message for triggered/non tirggered
self.message["trigger"] = "on"
return None
def create_slurm_script(self):
"""
Creates the input SLURM file with the following info:
SLURM parameters ( CPUS , nodes, etc)
Output Log files
Input parameters for indexing job
Loading of Modules for indexing with Crystfel
Actual indexing job that is executed (indexamajig)
Also executing a python script that gets the results after processing
SLURM Outputs are redirected to the logfile.
TODO: So far only a few parameters for crystFEL are sent with the message. Additional parameters might be useful to insert into the message.
:return: None
"""
# get dat file name without any preceding paths..
#last_pos = str(self.message["dataFileName"]).rfind("/")
#data_file_name = str(self.message["filename"][: -3])
data_file_name = str(self.message["run_number"])
# write file
f = open("run_SLURM", "w")
f.write("#!/bin/bash \n")
f.write("#SBATCH --job-name=index \n")
f.write("#SBATCH --partition=prod-aramis \n")
#f.write("#SBATCH --nodelist=sf-cn-21 \n")
# uncomment if on RA
# f.write("#SBATCH --partition=hour \n")
f.write("#SBATCH --cpus-per-task=32 \n")
# f.write("#SBATCH --output=" + LOG_FILENAME + "\n")
# f.write("#SBATCH --open-mode=append \n")
f.write("#========================================")
f.write("\n\n")
f.write("# Load modules \n")
f.write("module purge \n")
f.write("module use MX unstable \n")
f.write("module load crystfel/0.10.2-rhel8 \n")
# f.write(
# "module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n"
# )
f.write("\n\n")
f.write("# Actual Indexing command for crystFEL \n")
f.write(
" indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold="
+ str(int(self.message["user_data"]["crystfelTreshold"]))
#+ " --int-radius=2,3,5 "
+ " -p "
+ str(self.message["run_number"])
+ ".cell --min-snr="
+ str(self.message["user_data"]["crystfelMinSNR"])
+ " --min-peaks=6 --min-pix-count="
+ str(self.message["user_data"]["crystfelMinPixCount"])
+ " -i "
+ str(self.message["run_number"])
+ ".list -o "
+ data_file_name
+ ".stream -g "
+ str(self.message["run_number"])
+ "_jf.geom "
+ " -j `nproc` --min-res=75 "
)
if self.message["user_data"]["crystfelMultiCrystal"]:
f.write(" --multi" + ">& " + data_file_name + ".log\n")
else:
f.write(" --no-multi" + ">& " + data_file_name + ".log\n")
# Execute the a python script to get the results
# for now loads my conda env. needs to be changed at the beamline
# argument is the streamfile that is created by indexing
f.write("\n\n")
f.write("# Executing results.py to get results and send to Database \n")
f.write(
"/sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python /sf/cristallina/applications/mx/clara/src/results.py "
#"module load anaconda \n"
#+ "conda activate /sls/MX/applications/conda_envs/vdp \n"
#+ "python /sls/MX/applications/git/vdp/src/results.py "
+ data_file_name
+ ".stream "
# + data_file_name
# + ".log "
)
f.close()
return None
def submit_job_to_slurm(self):
"""
submit job to SLURM (on RA or 6S/6D nodes)
needs the slurm input file.
-> execute processing command
:return: None
"""
# some info: sub.run needs either a list with the different args or needs the full command as string,
# but then it also needs the arg shell=True!
# EASY way without grepping slurm job id
# sub.run(["sbatch", "run_SLURM"])
try:
slurm_out = sub.run(["sbatch", "run_SLURM"], capture_output=True)
txt = slurm_out.stdout.decode().split()
# grep the slurm number
logger.info(f"submitted batch job number: {txt[-1]}")
self.message["SlurmJobID"] = str(txt[-1])
except Exception as e:
logger.info("Could not submit SLURM job: {}".format(e))
return None
def create_msg_file(self):
"""
writes message to message file in folder. Can be retrieved by results.py to send the message to the database
:return:None
"""
# write message as json file to folder
f = open("msg.json", "w")
# tmp = json.dumps(self.message, indent=4) #RA
tmp = to_json(self.message)
f.write(tmp)
f.close()
return None
if __name__ == "__main__":
    # main()
    logger.info("CLARA starting up")
    # redirect stdout to the logging file so stray print()s are captured
    stream = StreamToLogger()
    with contextlib.redirect_stdout(stream):
        if sys.argv[1] == "a":
            # --- ActiveMQ/STOMP mode: consume messages from a broker queue ---
            logger.info("connecting to ActiveMQ broker")
            vdp_server = "sf-broker-01.psi.ch"
            vdp_port = 61613
            vdp_inqueue = "/queue/test_in"
            logger.info("In_queue is: {}", vdp_inqueue)
            vdp_outqueue = "not_relevant_atm"
            vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue)
            vdp_listener.connect()
            logger.info("connected to in_queue")
            # flag flipped to True by sigint_handler on CTRL-C
            TERMINATE_SERVER = False
            logger.info("\nWaiting for SIGINT to stop...")
            signal.signal(signal.SIGINT, sigint_handler)
            while not TERMINATE_SERVER:
                if vdp_listener.incoming_messages_queue.empty():
                    # poll every 100 ms so SIGINT is handled promptly
                    time.sleep(0.1)
                else:
                    # receives message from queue; get() is from the stdlib
                    # queue API, same as empty().
                    logger.info("received message from in_queue, started processing...")
                    message = vdp_listener.incoming_messages_queue.get()
                    # Do something with the message
                    logger.info(f"message is: {message}")
                    mess_inp = CollectedH5(message)
                    # the processing pipeline is currently disabled in this
                    # mode (only active in the "z" branch below):
                    #mess_inp.mk_cd_output_dir_bl()
                    #logger.info("subfolder created")
                    #mess_inp.create_cell_file()
                    #logger.info("cell file created")
                    #mess_inp.create_geom_from_master()
                    #logger.info("geom file created")
                    #mess_inp.create_list_file()
                    #logger.info("list file created")
                    #mess_inp.create_slurm_script()
                    #logger.info("slurm script created")
                    #mess_inp.submit_job_to_slurm()
                    #logger.info("job submitted to SLURM")
                    #mess_inp.create_msg_file()
                    #logger.info("message file created")
                    # NOTE(review): assumes the queued object is a STOMP frame
                    # carrying an "ack" header — confirm against receive_msg.
                    vdp_listener.acknowledge(message.headers["ack"])
                    logger.info("message was acknowledged")
                    logger.info("waiting for the next message")
            vdp_listener.disconnect()
        elif sys.argv[1] == "z":
            # --- ZeroMQ mode: SUBscribe to a PUBlisher and run the full pipeline ---
            # base data path, read as a global by the CollectedH5 methods
            pa = Path("/sf/cristallina/data/")
            logger.info("SUBscribing to ZeroMQ PUBlisher.. connecting ...")
            context = zmq.Context()
            subscriber = context.socket(zmq.SUB)
            subscriber.connect("tcp://sf-broker-01.psi.ch:5555")
            # empty topic filter: receive every published message
            subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
            #--------------------------------------------------
            # Handshake with the publisher; remove this REQ/REP exchange if a
            # handshake between publisher and subscriber is not necessary.
            ready_socket = context.socket(zmq.REQ)
            ready_socket.connect("tcp://sf-broker-01.psi.ch:5556")
            # Notify publisher that subscriber is ready
            ready_socket.send(b"READY")
            start_message = ready_socket.recv()
            print(f"Received start message: {start_message}")
            #----------------------------------------------------------------------
            # give the SUB connection time to settle before messages arrive
            time.sleep(3)
            logger.info("connected to ZeroMQ PUBlisher")
            TERMINATE_SERVER = False
            signal.signal(signal.SIGINT, sigint_handler)
            # INFO: recv() is a blocking call by default: it would pause the
            # program until a message arrives. To stay responsive to SIGINT,
            # zmq.NOBLOCK together with the zmq.Again handler below is used.
            # NOTE(review): this loop busy-waits at full speed when idle
            # (no sleep in the zmq.Again path) — consider a short sleep.
            while not TERMINATE_SERVER:
                try:
                    message = subscriber.recv(flags=zmq.NOBLOCK)
                    mess_dec = json.loads(message.decode("utf-8"))
                    logger.info(f"Received message is: {mess_dec}")
                    # run the full processing pipeline for this message
                    mess_inp = CollectedH5(mess_dec)
                    mess_inp.mk_cd_output_dir_bl()
                    logger.info("output folder created")
                    mess_inp.create_cell_file()
                    logger.info("cell file created")
                    mess_inp.create_geom_from_master()
                    logger.info("geom file created")
                    mess_inp.create_list_file()
                    logger.info("list file created")
                    mess_inp.create_slurm_script()
                    logger.info("slurm submission script created")
                    mess_inp.submit_job_to_slurm()
                    mess_inp.create_msg_file()
                    logger.info("message dumped in processing folder")
                    # code BLOCK what is happening to message
                except zmq.Again:
                    # no message pending; loop again (unless SIGINT was sent)
                    pass
            subscriber.close()
            logger.info("termindated zeroMQ connection")