add all files

2024-05-02 16:39:56 +02:00
parent 1f894ea6e7
commit fb798fc038
12 changed files with 1579 additions and 1 deletion

98
src/MxdbVdpTools.py Normal file

@@ -0,0 +1,98 @@
import sys
# Use the new HTTPS mxdbserver for SLS2.0 (not used by standard beamline operations)
sys.path.insert(0, "/sls/MX/applications/mxdbclient/all-beamlines/v20230516-1043-24bc74e-master")
import copy
import datetime
# Ensure the path for messages.py script is available
# sys.path.insert(0, "/sls/MX/applications/mxlibs3/all-beamlines/stable")
import messages
import mxdbclient
VDP_COLLECTION = "Vdp"
class MxdbVdpTools(object):
"""Class for VDP to communicate with mxdb"""
def __init__(self, mxdb_host="https://mx-webapps.psi.ch", mxdb_port=8080):
self.message = None
self.msg_id = None
self._vdp_collection = VDP_COLLECTION
self._mxdbclient = mxdbclient.mxdbclient(host=mxdb_host, port=mxdb_port)
def insert(self, message):
"""Inserts message to VDP collection with insert timestamp
=> createdOn: datetime.datetime.now().isoformat()
"""
if isinstance(message, messages.BaseMessage):
message = message.as_dict()
self.msg_id = None # clean msg_id from previous insert
self.message = message.copy()
self.message["createdOn"] = datetime.datetime.now().isoformat()
try:
answer = self._mxdbclient.insert(message=self.message, collection=self._vdp_collection)
self.msg_id = answer["insertID"]
return answer
except Exception as e:
raise Exception("Cannot insert message to mxdb. Reason: {}".format(e))
def query(self, **kwargs):
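"""Query the VDP collection. Keyword arguments are passed through to mxdbclient.query(),
e.g. _id=... or eaccount=... (see the examples in __main__ below)."""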
kwargs["collection"] = self._vdp_collection
try:
answer = self._mxdbclient.query(**kwargs)
return answer
except Exception as e:
raise Exception("Cannot get message from mxdb. Reason: {}".format(e))
if __name__ == "__main__":
# Examples
# Init the class
mxdb = MxdbVdpTools()
# Insert message to database
_id = mxdb.insert(
{
"mergeID": "something_mergeID",
"trackingId": "something_track",
"eaccount": "e19370",
"masterFileName": "Lyso_12p4keV_1kHz_150mm_run000026_master.h5",
"dataFileName": "Lyso_12p4keV_1kHz_150mm_run000026_data_000010.h5",
"filesystemPath": "/das/work/p19/p19607/FromGreta/REDML_indx_data/lyso/processing/",
"detectorDistance_mm": 150.0,
"beamCenterX_pxl": 1103.7,
"beamCenterY_pxl": 1175.1,
"pixelSize_um": 75,
"numberOfImages": 10000,
"imageTime_us": 100,
"enery_kev": 12398.0,
"detectorWidth_pxl": 2067,
"detectorHeight_pxl": 2163,
"underload": -30000,
"overload": 30000,
"unitCell": {"a": 79.5, "b": 79.5, "c": 38.6, "alpha": 90.0, "beta": 90.0, "gamma": 90.0},
"spaceGroupNumber": 96,
"crystfelTreshold": 6.0,
"crystfelMinSNR": 4.0,
"crystfelMinPixCount": 1,
"crystfelMultiCrystal": False,
}
)
print(f"message successfully inserted with _id: {_id}")
# retrieve the inserted document from the database
doc = mxdb.query(_id=_id["insertID"])
print(f"DB message for {_id} is: \n {doc}")
# if you want to search documents that match a key value defined in your inserted document
# e.g. search all documents with given eaccount
eaccount = "e19370"
doc = mxdb.query(eaccount=eaccount)
# print length of returned array (number of documents)
print(f"Number of documents for {eaccount} is {len(doc)}")

581
src/clara.py Normal file

@@ -0,0 +1,581 @@
# Author: Assmann G. (2023)
import contextlib
import datetime
import json
import os
import signal
import subprocess as sub
import sys
import time
from pathlib import Path
from loguru import logger
import receive_msg
# define log file location:
LOG_FILENAME = time.strftime("/sf/cristallina/applications/mx/clara_tools/log/clara_%Y%m.log")
logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
# hardcoded data path for e20233, since VDP is so far only used by e20233. If this needs to change,
# see the commented-out lines in the function mk_cd_output_dir_bl
pa = Path("/sls/MX/Data10/e20233")
class StreamToLogger:
def __init__(self, level="INFO"):
self._level = level
def write(self, buffer):
for line in buffer.rstrip().splitlines():
logger.opt(depth=1).log(self._level, line.rstrip())
def flush(self):
pass
# ========== functions ================
def main():
"""
hello world testing
:return: nothing
"""
print("hello world")
pass
def sigint_handler(signum, frame):
global TERMINATE_SERVER
print("CTRL-C caught --- Terminating VDP now")
TERMINATE_SERVER = True
def to_json(obj):
"""
makes an object serializable for json
:param obj: class object
:return: json serialzable object with indent=4
"""
return json.dumps(obj, default=lambda obj: obj.__dict__, indent=4)
# --------class with functions-----
class CollectedH5:
def __init__(self, mess_in):
# dictionary of the json message
self.message = mess_in
def get_message_dict(self):
"""
returns dictionary of the message (json)
:return: self.message
"""
return self.message
def mk_cd_output_dir_bl(self):
"""
make output dir with pathlib and change into it.
The output dir under MX/Data10/exxx/ ... can only be written to as the e-account
:return: None
"""
# generate output dir
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
pgroup = "p" + str(self.message["eaccount"][1:3])
eaccount = "e" + str(self.message["eaccount"][1:])
merge_id = str(self.message["mergeID"])
# if first char is a slash, get rid of it
if (str(self.message["dataFileName"][0])) == "/":
file_name = Path(str(self.message["dataFileName"][1:-3]) + "_" + str(now))
# if not use the full path
else:
file_name = Path(str(self.message["dataFileName"][:-3]) + "_" + str(now))
# today = str(date.today())
# if you dont want to use the hard coded path anymore, but the eaccount from the message, uncomment:
# p = Path("/sls")
# out_path = p / "MX" / "Data10" / eaccount / "vespa_vdp" / merge_id / file_name
# TODO add random number or second to processing folder
out_path = pa / "vespa_vdp" / merge_id / file_name
logger.info(f"processing folder will be created at : {out_path}")
try:
out_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.info("could not create processing directory {}".format(e))
# change into output dir
try:
os.chdir(out_path)
except Exception as e:
logger.info("Could not cd into processing directory: {}".format(e))
return None
def mk_cd_output_dir_ra(self):
"""
make output dir with pathlib and change into it.
:return: None
"""
# generate output dir
pgroup = "p" + str(self.message["eaccount"][1:3])
paccount = "p" + str(self.message["eaccount"][1:])
merge_id = str(self.message["mergeID"])
file_name = str(self.message["dataFileName"][:-3])
today = str(datetime.date.today())
p = Path("/das")
out_path = p / "work" / pgroup / paccount / "vespa" / today / merge_id / file_name
logger.info(f"processing folder is created at : {out_path}")
try:
out_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.info("could not create processing directory {}".format(e))
# change into output dir
try:
os.chdir(out_path)
except Exception as e:
logger.info("Could not cd into processing directory: {}".format(e))
return None
def convert_spg_num(self, sg: int):
"""
converts a space group number to Hermann-Mauguin notation; 65 space groups included
96 --> P43212
:param sg: space group number
:return: sg_HM space group in H-M notation
"""
space_groups = {
1: "P1",
3: "P2",
4: "P21",
5: "C2",
16: "P222",
17: "P2221",
18: "P2122",
19: "P21212",
20: "C2221",
21: "C222",
22: "F222",
23: "I222",
24: "I212121",
75: "P4",
76: "P41",
77: "P42",
78: "P43",
79: "I4",
80: "I41",
89: "P422",
90: "P4212",
91: "P4122",
92: "P41212",
93: "P4222",
94: "P42212",
95: "P4322",
96: "P43212",
97: "I422",
98: "I4122",
143: "P3",
144: "P31",
145: "P32",
146: "R3",
149: "P312",
150: "P321",
151: "P3112",
152: "P3121",
153: "P3212",
154: "P3221",
155: "R32",
168: "P6",
169: "P61",
170: "P65",
171: "P62",
172: "P64",
173: "P63",
177: "P622",
178: "P6122",
179: "P6522",
180: "P6222",
181: "P6422",
182: "P6322",
195: "P23",
196: "F23",
197: "I23",
198: "P213",
199: "I213",
207: "P432",
208: "P4232",
209: "F432",
210: "F4132",
211: "I432",
212: "P4332",
213: "P4132",
214: "I4132",
}
return space_groups[sg]
def get_spaceg_params(self, sg: str):
"""
function to provide the parameters for the cell file besides the actual unit cell constants
:param sg: space group in H-M notation as a string
:return: lattice, unique axis, centering
REMARK: probably not the optimal way to handle things. Maybe no conversion from the space group
number is needed and the number could be mapped directly to the lattice; can be improved
"""
latt = None
ua = None
cen = sg[0]
if sg[1] == "1":
latt = "L_TRICLINIC"
ua = "*"
elif sg[1:3] == "23":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "213":
latt = "L_CUBIC"
ua = "*"
elif sg[3:5] == "32":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "432":
latt = "L_CUBIC"
ua = "*"
elif sg[1:4] == "222":
latt = "L_ORTHORHOMBIC"
ua = "*"
elif sg[1:4] == "212":
latt = "L_ORTHORHOMBIC"
ua = "*"
elif sg[1] == "2" and len(sg) < 4:
latt = "L_MONOCLINIC"
ua = "b"
elif sg[1] == "4":
latt = "L_TETRAGONAL"
ua = "c"
elif sg[1] == "6":
latt = "L_HEXAGONAL"
ua = "c"
elif sg[1] == "3":
if sg[0] == "P":
latt = "L_HEXAGONAL"
ua = "c"
else:
latt = "L_RHOMBOHEDRAL"
ua = "*"
else:
print("Couldn't understand '{}'\n".format(sg))
latt = "L_TRICLINIC"
return latt, ua, cen
def create_cell_file(self):
"""
Creates the cell file named <mergeID>.cell and writes it into the
processing folder with the corresponding mergeID for processing
:return: -
"""
merge_id = str(self.message["mergeID"])
f = open(merge_id + ".cell", "w")
# start writing the cell file
f.write("CrystFEL unit cell file version 1.0\n\n")
# get lattice params and write to file
space_group = self.convert_spg_num(self.message["spaceGroupNumber"])
lat_type, unique_a, cent = self.get_spaceg_params(space_group)
f.write("lattice_type = " + lat_type[2:].lower() + "\n")
f.write("centering = " + cent + "\n")
if unique_a != "*":
f.write("unique_axis = " + unique_a + "\n\n")
else:
f.write("\n\n")
# write unit cell constants
f.write("a = " + str(self.message["unitCell"]["a"]) + " A\n")
f.write("b = " + str(self.message["unitCell"]["b"]) + " A\n")
f.write("c = " + str(self.message["unitCell"]["c"]) + " A\n")
f.write("al = " + str(self.message["unitCell"]["alpha"]) + " deg\n")
f.write("be = " + str(self.message["unitCell"]["beta"]) + " deg\n")
f.write("ga = " + str(self.message["unitCell"]["gamma"]) + " deg\n")
f.close()
return None
def create_geom_from_master(self):
"""
generates the geom file from the input message for processing with CrystFEL.
:param self:
:return: none
"""
merge_id = str(self.message["mergeID"])
# write to mergeid_jf.geom file in processing folder
f2 = open(merge_id + "_jf.geom", "w")
f2.write("; PSI JF9M \n")
f2.write("\n")
f2.write("\n")
f2.write("; Camera length (in m) and photon energy (eV) \n")
f2.write("clen = " + str(self.message["detectorDistance_mm"] * 0.001) + "\n")
f2.write("photon_energy = " + str(self.message["enery_kev"] * 1000) + "\n")
f2.write("flag_lessthan = " + str(self.message["underload"]) + "\n")
f2.write("\n")
f2.write("adu_per_eV = 0.00008065\n")
# f2.write("adu_per_photon = 1\n")
f2.write("res = 13333.3 ; " + str(self.message["pixelSize_um"]) + " micron pixel size\n")
f2.write("\n")
f2.write("rigid_group_0 = 0 \n")
f2.write("rigid_group_collection_0 = 0 \n")
f2.write("\n")
f2.write("; These lines describe the data layout for the JF native multi-event files \n")
f2.write("dim0 = % \n")
f2.write("dim1 = ss \n")
f2.write("dim2 = fs \n")
f2.write("data = /entry/data/data \n")
f2.write("\n")
f2.write("\n")
if str(self.message["masterFileName"])[0] == "/":
f2.write("mask_file =" + str(pa.resolve()) + self.message["masterFileName"] + "\n")
else:
f2.write("mask_file =" + str(pa.resolve()) + "/" + self.message["masterFileName"] + "\n")
f2.write("mask = /entry/instrument/detector/pixel_mask \n")
f2.write("mask_good = 0x0 \n")
f2.write("mask_bad = 0xFFFFFFFF\n")
f2.write("\n")
f2.write("; corner_{x,y} set the position of the corner of the detector (in pixels) \n")
f2.write("; relative to the beam \n")
f2.write("\n")
f2.write("0/min_fs = 0 \n")
f2.write("0/min_ss = 0 \n")
f2.write("0/max_fs =" + str(self.message["detectorWidth_pxl"] - 1) + "\n")
f2.write("0/max_ss =" + str(self.message["detectorHeight_pxl"] - 1) + "\n")
f2.write("0/corner_x = -" + str(self.message["beamCenterX_pxl"]) + "\n")
f2.write("0/corner_y = -" + str(self.message["beamCenterY_pxl"]) + "\n")
f2.write("0/fs = x \n")
f2.write("0/ss = y \n")
f2.write("\n")
# f2.write("badregionA/min_fs = 774 \n")
# f2.write("badregionA/max_fs = 1032 \n")
# f2.write("badregionA/min_ss = 0 \n")
# f2.write("badregionA/max_ss = 256 \n")
# f2.write("\n")
# f2.write("badregionB/min_fs = 256 \n")
# f2.write("badregionB/max_fs = 774 \n")
# f2.write("badregionB/min_ss = 1906 \n")
# f2.write("badregionB/max_ss = 2162 \n")
# f2.write("\n")
f2.close()
return None
def create_list_file(self):
"""
Function to generate a list file with the path of the input H5 file
:return:None
"""
merge_id = str(self.message["mergeID"])
# write to cell file in output folder
f = open(merge_id + ".list", "w")
print(pa.resolve())
if (str(self.message["dataFileName"][0])) == "/":
f.write(str(pa.resolve()) + str(self.message["dataFileName"]))
else:
f.write(str(pa.resolve()) + "/" + str(self.message["dataFileName"]))
"""
if count == 0:
print("count 0")
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"]))
# if count =1 and at beginning
elif count == 1 and (str(self.message["dataFileName"][0])) == "/":
print("count 1 and first char")
# remove first char
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][1:]))
# else if count >0 and not at beginning
elif count > 0:
print("count more and middle")
# get position of last "/" and remove until then
last_pos = self.message["dataFileName"].rfind("/")
print("last_pos", last_pos)
f.write(str(self.message["filesystemPath"]) + str(self.message["dataFileName"][(last_pos + 1) :]))
"""
f.close()
return None
def create_slurm_script(self):
"""
Creates the input SLURM file with the following info:
SLURM parameters ( CPUS , nodes, etc)
Output Log files
Input parameters for indexing job
Loading of Modules for indexing with Crystfel
Actual indexing job that is executed (indexamajig)
Also executing a python script that gets the results after processing
SLURM Outputs are redirected to the logfile.
TODO: So far only a few parameters for crystFEL are sent with the message. Additional parameters might be useful to insert into the message.
:return: None
"""
# get the data file name without any preceding path
last_pos = str(self.message["dataFileName"]).rfind("/")
data_file_name = str(self.message["dataFileName"][(last_pos + 1) : -3])
# write file
f = open("run_SLURM", "w")
f.write("#!/bin/bash \n")
f.write("#SBATCH --job-name=index \n")
# uncomment if on RA
# f.write("#SBATCH --partition=hour \n")
f.write("#SBATCH --cpus-per-task=32 \n")
# f.write("#SBATCH --output=" + LOG_FILENAME + "\n")
# f.write("#SBATCH --open-mode=append \n")
f.write("#========================================")
f.write("\n\n")
f.write("# Load modules \n")
f.write("module purge \n")
f.write("module use MX unstable \n")
# f.write("module load crystfel/0.10.2 \n")
# TODO ask Leo to install libs on CN for crystfel/0.10.2
f.write(
"module load crystfel/0.10.1-2 xgandalf/2018.01 HDF5_bitshuffle/2018.05 HDF5_LZ4/2018.05 gcc/4.8.5 hdf5_serial/1.10.3 \n"
)
f.write("\n\n")
f.write("# Actual Indexing command for crystFEL \n")
f.write(
" indexamajig --peaks=peakfinder8 --indexing=xgandalf --xgandalf-fast-execution --threshold="
+ str(int(self.message["crystfelTreshold"]))
+ " --int-radius=2,3,5 -p "
+ str(self.message["mergeID"])
+ ".cell --min-snr="
+ str(self.message["crystfelMinSNR"])
+ " --min-peaks=6 --min-pix-count="
+ str(self.message["crystfelMinPixCount"])
+ " -i "
+ str(self.message["mergeID"])
+ ".list -o "
+ data_file_name
+ ".stream -g "
+ str(self.message["mergeID"])
+ "_jf.geom "
+ " -j `nproc` --min-res=75 "
)
if self.message["crystfelMultiCrystal"]:
f.write(" --multi" + ">& " + data_file_name + ".log\n")
else:
f.write(" --no-multi" + ">& " + data_file_name + ".log\n")
# Execute a python script to get the results
# for now this loads a personal conda env; needs to be changed at the beamline
# the argument is the stream file that is created by indexing
f.write("\n\n")
f.write("# Executing results.py to get results and send to Database \n")
f.write(
"module load anaconda \n"
+ "conda activate /sls/MX/applications/conda_envs/vdp \n"
+ "python /sls/MX/applications/git/vdp/src/results.py "
+ data_file_name
+ ".stream "
# + data_file_name
# + ".log "
)
f.close()
return None
def submit_job_to_slurm(self):
"""
submit job to SLURM (on RA or 6S/6D nodes)
needs the slurm input file.
1.) Go to processing folder
2.) execute processing command
:return: None
"""
# some info: sub.run needs either a list with the different args or needs the full command as string,
# but then it also needs the arg shell=True!
# EASY way without grepping slurm job id
# sub.run(["sbatch", "run_SLURM"])
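# sbatch normally prints "Submitted batch job <jobid>", so txt[-1] below picks out the job id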
try:
slurm_out = sub.run(["sbatch", "run_SLURM"], capture_output=True)
txt = slurm_out.stdout.decode().split()
# grep the slurm number
logger.info(f"submitted batch job number: {txt[-1]}")
self.message["SlurmJobID"] = str(txt[-1])
except Exception as e:
logger.info("Could not submit SLURM job: {}".format(e))
return None
def create_msg_file(self):
"""
writes message to message file in folder. Can be retrieved by results.py to send the message to the database
:return:None
"""
# write message as json file to folder
f = open("msg.json", "w")
# tmp = json.dumps(self.message, indent=4) #RA
tmp = to_json(self.message)
f.write(tmp)
f.close()
return None
if __name__ == "__main__":
# main()
logger.info("CLARA starting up")
# redirect stdout to logging file
stream = StreamToLogger()
with contextlib.redirect_stdout(stream):
# potential message receiving:
vdp_server = "sf-broker-01.psi.ch"
vdp_port = 61613
vdp_inqueue = "/queue/test_in"
logger.info("In_queue is: {}", vdp_inqueue)
vdp_outqueue = "not_relevant_atm"
vdp_listener = receive_msg.MyListener(vdp_server, vdp_port, vdp_inqueue, vdp_outqueue)
vdp_listener.connect()
logger.info("connected to in_queue")
TERMINATE_SERVER = False
logger.info("\nWaiting for SIGINT to stop...")
signal.signal(signal.SIGINT, sigint_handler)
while not TERMINATE_SERVER:
if vdp_listener.incoming_messages_queue.empty():
time.sleep(0.1)
else:
# receives the message from the queue via queue.Queue.get() (same stdlib queue module as empty()).
logger.info("received message from in_queue, started processing...")
message = vdp_listener.incoming_messages_queue.get()
# Do something with the message
logger.info(f"message is: {message}")
mess_inp = CollectedH5(message)
#mess_inp.mk_cd_output_dir_bl()
#logger.info("subfolder created")
#mess_inp.create_cell_file()
#logger.info("cell file created")
#mess_inp.create_geom_from_master()
#logger.info("geom file created")
#mess_inp.create_list_file()
#logger.info("list file created")
#mess_inp.create_slurm_script()
#logger.info("slurm script created")
#mess_inp.submit_job_to_slurm()
#logger.info("job submitted to SLURM")
#mess_inp.create_msg_file()
#logger.info("message file created")
vdp_listener.acknowledge(message.headers["ack"])
logger.info("message was acknowledged")
logger.info("waiting for the next message")
vdp_listener.disconnect()

5
src/clara.sh Executable file

@@ -0,0 +1,5 @@
#!/bin/bash
db=/sf/cristallina/applications/mx/clara_tools/mxdbclient/src/
env PYTHONPATH=$db /sf/cristallina/applications/mx/conda/miniconda/envs/39clara/bin/python ./clara.py !*

9
src/mess_lyso.json Executable file

@@ -0,0 +1,9 @@
{
"mergeID": "Lyso_pHjump",
"mergeDate": "20230920",
"run_number": ["105", "106", "107"],
"unitCell": {"a": 78.5, "b": 78.5, "c": 38.6, "alpha": 90.0, "beta": 90.0, "gamma": 90.0},
"spaceGroupNumber": 96,
"num_timepoints": 15,
"pointGroup": "4/mmm"
}

54
src/messages.py Normal file

@@ -0,0 +1,54 @@
import os
import time
import json
import copy
import stomp
from pprint import pprint
import uuid
class BaseMessage(object):
"""The base message.
The base message includes the encoding/decoding methods.
"""
def __init__(self, message=None):
self.trackingId = None
if type(message) is str:
self.decodeJson(message)
elif type(message) is dict:
self.__dict__.update(message)
elif isinstance(message, BaseMessage):
self.__dict__.update(message.as_dict())
def encodeJson(self):
return json.dumps(self, default=lambda self: self.__dict__)
def decodeJson(self, message):
self.__dict__.update(json.loads(message))
def as_dict(self):
return copy.copy(self.__dict__)
def _dump(self, as_json=False):
if as_json:
pprint(json.dumps(self.__dict__, indent=4, sort_keys=True))
else:
pprint(self.__dict__)
def __getitem__(self, item):
return self.__dict__[item]
def __setitem__(self, item, value):
self.__dict__[item] = value
def __repr__(self):
x = type(self)
mro = str(x.mro())
s = "<" + mro
for k, v in list(self.__dict__.items()):
s = s + ("\n%20s : %s" % (k, str(v)))
s = s + "\n>\n"
return s

69
src/receive_msg.py Normal file

@@ -0,0 +1,69 @@
# Adapted from J.W 2023
# class with the functions to connect to the queue and receive the message
import copy
import queue
import time
import stomp
from loguru import logger
# uncomment at the beamline, comment out on RA
# TODO check if also available on RA - so no switching required
import messages
class MyListener(stomp.ConnectionListener):
"""Mylistener class"""
def __init__(self, server, port, inqueue, outqueue):
self.server = server
self.port = port
self.inqueue = inqueue
self.outqueue = outqueue
self.incoming_messages_queue = queue.Queue(maxsize=0)
def connect(self):
"""Connect and subscribe to the inqueue"""
self.conn = stomp.Connection12([(self.server, self.port)])
self.conn.set_listener("", self)
self.conn.connect()
headers = {"activemq.prefetchSize": 1}
self.conn.subscribe(destination=self.inqueue, id=1, ack="client", headers=headers)
def disconnect(self):
"""Close connection"""
self.conn.disconnect()
def on_error(self, message):
pass
def send(self, outqueue, message):
"""Send message to the outqueue
Send takes queue, body, content_type, headers and keyword_headers"""
message = messages.BaseMessage(message)
self.conn.send(destination=outqueue, body=message.encodeJson())
time.sleep(1.0)
# self.conn.unsubscribe(id=1)
def on_message(self, message):
"""Upon receiving message put it into incoming queue"""
logger.info("message is (on_message function) {}".format(message))
try:
m = messages.BaseMessage(message.body)
except BaseException as e:
logger.info("Exception occurred: {}".format(e))
return
if hasattr(m, "trackingId"):
m.headers = copy.copy(message.headers)
self.incoming_messages_queue.put(m)
logger.info("Received and processing message {}".format(m.trackingId))
def acknowledge(self, ack_id):
"""Acknowledge message dequeues it"""
self.conn.ack(ack_id)

389
src/results.py Normal file

@@ -0,0 +1,389 @@
# Author Assman G. (2023)
# execute with
# /das/home/assman_g/p19370/vespa/2023-03-30/something_mergeID/Lyso_12p4keV_1kHz_150mm_run000026_data_000010/Lyso_12p4keV_1kHz_150mm_run000026_data_000010.th6.snr4.0.mpixco1.stream
import contextlib
import json
import sys
import time
# import crystfelparser.crystfelparser as crys # acknowledge P.Gasparotto
import numpy as np
from loguru import logger
import MxdbVdpTools
# import matplotlib.pyplot as plt
LOG_FILENAME = time.strftime("/sls/MX/Data10/e20233/log/vdp_%Y%m.log") # as eaccount at beamline
# LOG_FILENAME = time.strftime("/sls/MX/Data10-staff/e19370/log/vdp_%Y%m.log") # as eaccount at beamline
# LOG_FILENAME = time.strftime("/home/assman_g/Documents/log/vdp_%Y%m.log") # as assman_g at beamline
# LOG_FILENAME = time.strftime("/das/home/assman_g/vdp_%Y%m.log") # on RA
logger.add(LOG_FILENAME, level="INFO", rotation="100MB")
# ========== functions ================
def main():
"""
hello world testing
:return: nothing
"""
print("hello fish ")
pass
def stream_to_dictionary(streamfile):
"""
Parses a CrystFEL stream file and generates a dictionary with all the per-frame parameters.
For each indexed lattice a dictionary is appended to the "crystals" key of the frame entry.
If there is no lattice, the "crystals" key is an empty list; otherwise it has one entry per lattice.
Function from crystfelparser, edited; needs to be merged back with crystfelparser.
Returns:
A dictionary
"""
# series = defaultdict(dict)
series = dict()
def loop_over_next_n_lines(file, n_lines):
for cnt_tmp in range(n_lines):
line = file.readline()
return line
with open(streamfile, "r") as text_file:
# for ln,line in enumerate(text_file):
ln = -1
while True:
ln += 1
line = text_file.readline()
# print(line)
# if any(x in ["Begin","chunk"] for x in line.split()):
if "Begin chunk" in line:
# create a temporary dictionary to store the output for a frame
# tmpframe = defaultdict(int)
tmpframe = dict()
# loop over the next 3 lines to get the index of the image
# line 2 and 3 are where it is stored the image number
line = loop_over_next_n_lines(text_file, 3)
ln += 3
# save the image index and save it as zero-based
im_num = int(line.split()[-1]) - 1
tmpframe["Image serial number"] = im_num
# loop over the next 2 lines to see if the indexer worked
line = loop_over_next_n_lines(text_file, 2)
ln += 2
# save who indexed the image
indexer_tmp = line.split()[-1]
# if indexed, there is an additional line here
npeaks_lines = 6
if indexer_tmp == "none":
npeaks_lines = 5
tmpframe["multiple_lattices"] = 0
else:
tmpframe["multiple_lattices"] = 1
tmpframe["indexed_by"] = indexer_tmp
##### Get the STRONG REFLECTIONS from the spotfinder #####
# loop over the next 5/6 lines to get the number of reflections
line = loop_over_next_n_lines(text_file, npeaks_lines)
ln += npeaks_lines
# get the number of peaks
num_peaks = int(line.split()[-1])
tmpframe["num_peaks"] = num_peaks
# get the resolution
line = text_file.readline()
ln += 1
tmpframe["peak_resolution [A]"] = float(line.split()[-2])
tmpframe["peak_resolution [nm^-1]"] = float(line.split()[2])
if num_peaks > 0:
# skip the first 2 lines
for tmpc in range(2):
text_file.readline()
ln += 1
# get the spots
# fs/px, ss/px, (1/d)/nm^-1, Intensity
# with
# dim1 = ss, dim2 = fs
tmpframe["peaks"] = np.asarray(
[text_file.readline().split()[:4] for tmpc in range(num_peaks)]
).astype(float)
# generate empty list for potential indexed lattices
##### Get the PREDICTIONS after indexing #####
# So far the framework for multiple lattices is generated,
# but not finished: only the "last" lattice is saved
# in terms of reflections etc.;
# only the unit cell constants are fully accounted for!
multiple = True
if tmpframe["indexed_by"] != "none":
tmpframe["crystals"] = []
# set lattice count to 0
lattice_count = 0
# generate a temp_crystal dict for every lattice from this frame
tmp_crystal = {}
# start a loop over this part to account for multiple lattices
while multiple == True:
# skip the first 2 header lines, only if not multiple lattice loop
if lattice_count == 0:
for tmpc in range(2):
text_file.readline()
ln += 1
# Get the unit cell -- as cell lengths and angles
# append unit cell constants if multiple lattices exist
line = text_file.readline().split()
tmp_crystal["Cell parameters"] = np.hstack([line[2:5], line[6:9]]).astype(float)
# Get the reciprocal unit cell as a 3x3 matrix
# multiple lattices not done yet
reciprocal_cell = []
for tmpc in range(3):
reciprocal_cell.append(text_file.readline().split()[2:5])
ln += 1
# print(reciprocal_cell)
tmp_crystal["reciprocal_cell_matrix"] = np.asarray(reciprocal_cell).astype(float)
# Save the lattice type
tmp_crystal["lattice_type"] = text_file.readline().split()[-1]
ln += 1
# loop over the next 5 lines to get the diffraction resolution
line = loop_over_next_n_lines(text_file, 5).split()
ln += 5
# multiple lattices not done yet
if line[0] == "predict_refine/det_shift":
tmp_crystal["det_shift_x"] = line[3]
tmp_crystal["det_shift_y"] = line[6]
line = loop_over_next_n_lines(text_file, 1).split()
ln += 1
tmp_crystal["diffraction_resolution_limit [nm^-1]"] = float(line[2])
tmp_crystal["diffraction_resolution_limit [A]"] = float(line[5])
# get the number of predicted reflections
num_reflections = int(text_file.readline().split()[-1])
tmp_crystal["num_predicted_reflections"] = num_reflections
# skip a few lines
line = loop_over_next_n_lines(text_file, 4)
ln += 4
# get the predicted reflections
if num_reflections > 0:
reflections_pos = []
for tmpc in range(num_reflections):
# read as:
# h k l I sigma(I) peak background fs/px ss/px
line = np.asarray(text_file.readline().split()[:9])
# append only: fs/px ss/px I sigma(I)
reflections_pos.append(line[[7, 8, 3, 4, 0, 1, 2]])
ln += 1
tmp_crystal["predicted_reflections"] = np.asarray(reflections_pos).astype(float)
# continue reading
line = text_file.readline()
line = text_file.readline()
line = text_file.readline()
# print(line)
if "Begin crystal" in line: # multi lattice
lattice_count = lattice_count + 1
tmpframe["multiple_lattices"] = tmpframe["multiple_lattices"] + 1
# append the lattice to the entry "crystals" in tmpframe
tmpframe["crystals"].append(tmp_crystal)
# print("multiple append")
ln += 1
# multiple=False
# lattice_count =0
else:
tmpframe["crystals"].append(tmp_crystal)
# print("else append", lattice_count)
multiple = False
if multiple == False:
break
# Add the frame to the series, using the frame index as key
series[im_num] = tmpframe
# condition to exit the while true reading cycle
if "" == line:
# print("file finished")
break
# return the series
return series
def get_data_from_streamfiles():
"""
get results from the streamfile .
Stream file is parsed as argument to the python script
The following info is extracted:
1.) # of indexed crystals --> int | indexable_frames
2.) # of indexed lattices --> int | indexable_lattices
3.) # of spots per frame --> array of ints | spots_per_frame
4.) # of Lattices per image --> array of ints
5.) Mean beam center shift X in pixels --> float
6.) Mean beam center shift Y in pixels --> float
7.) Mean beam center shift STD X in pixels --> float
8.) Mean beam center shift STD Y in pixels --> float
9.) Mean unit cell indexed images --> float object of a,b,c, alpha,beta, gamma
10.) Mean unit cell indexed STD images --> float object of a,b,c, alpha,beta, gamma
11.) Mean processing time in sec TODO --> float
:return: old_message with additional entries
"""
# load current message in processing folder
tmpfile = open("msg.json", "r")
old_message = json.load(tmpfile)
# print(old_message)
# old message is a dict with all the input message params
# parse stream into dict
parsed_stream = stream_to_dictionary(sys.argv[1])
old_message["numberOfImages"] = len(parsed_stream)
# get number of indexable frames- not accounted for multilattice
# print(parsed_stream[0].keys())
indexable_frames = np.array(sorted([FR for FR in parsed_stream.keys() if len(parsed_stream[FR].keys()) > 7]))
old_message["numberOfImagesIndexed"] = len(indexable_frames)
# spots_per_frame = {} # as dict
# as array:
spots_per_frame = np.zeros((len(parsed_stream)))
indexable_lattices = 0
for i in range(0, len(parsed_stream)):
# get spots per indexable frames:
spots_per_frame[i] = parsed_stream[i]["num_peaks"]
# get total number of indexable lattices
indexable_lattices = indexable_lattices + parsed_stream[i]["multiple_lattices"]
# put into dict for results, convert to list to be json serializable
old_message["numberOfSpotsPerImage"] = (spots_per_frame.astype(int)).tolist()
old_message["numberOfLattices"] = indexable_lattices
# get number of indexed lattices per pattern
lattices_per_indx_frame = {}
for i in indexable_frames:
lattices_per_indx_frame[int(i)] = parsed_stream[i]["multiple_lattices"]
old_message["numberOfLatticesPerImage"] = lattices_per_indx_frame
# mean beam center shift X and Y
list_x = []
list_y = []
# define np.array
uc_array = np.zeros((indexable_lattices, 6))
b = 0
for i in indexable_frames:
for x in range(0, len(parsed_stream[i]["crystals"])):
# det shift in x and y
list_x.append((parsed_stream[i]["crystals"][x]["det_shift_x"]))
list_y.append((parsed_stream[i]["crystals"][x]["det_shift_y"]))
# unit cell constants
uc_array[b] = np.asarray((parsed_stream[i]["crystals"][x]["Cell parameters"]))
b = b + 1
# ------ DET SHIFT MEAN and STD-------
# plot det shift scatter plot
# plt.scatter(np.asarray(list_x).astype(float),np.asarray(list_y).astype(float) )
# plt.show()
mean_x = np.around(np.mean(np.asarray(list_x).astype(float)), 4)
std_x = np.around(np.std(np.asarray(list_x).astype(float)), 4)
mean_y = np.around(np.mean(np.asarray(list_y).astype(float)), 4)
std_y = np.around(np.std(np.asarray(list_y).astype(float)), 4)
# convert to pixel unit
# 0.075 mm = 1 pixel =75 um
# print(mean_x, mean_x * (1/0.075), std_x, std_x * (1/0.075), "x")
# print(mean_y, mean_y * (1/0.075), std_y, std_y * (1/0.075), "y")
old_message["beamShiftMeanX_pxl"] = np.around(mean_x * (1 / 0.075), 4)
old_message["beamShiftMeanY_pxl"] = np.around(mean_y * (1 / 0.075), 4)
old_message["beamShiftStdX_pxl"] = np.around(std_x * (1 / 0.075), 4)
old_message["beamShiftStdY_pxl"] = np.around(std_y * (1 / 0.075), 4)
# -------UC CONSTANTS MEAN and STD----
mean_uc = np.mean(uc_array, 0)
mean_uc[: 6 // 2] *= 10.0
std_uc = np.std(uc_array, 0)
std_uc[: 6 // 2] *= 10.0
#convert to list to be json serializable
old_message["unitCellIndexingMean"] = (np.around(mean_uc, 3)).tolist()
old_message["unitCellIndexingStd"] = (np.around(std_uc, 3)).tolist()
# print(old_message)
return old_message
# ============classes with functions=========
class StreamToLogger:
def __init__(self, level="INFO"):
self._level = level
def write(self, buffer):
for line in buffer.rstrip().splitlines():
logger.opt(depth=1).log(self._level, line.rstrip())
def flush(self):
pass
# =============MAIN====================
# if executed as main, the code runs in the src folder of the git repo
# and needs the msg.json file in this folder for testing.
if __name__ == "__main__":
# main()
# get results from streamfile
stream = StreamToLogger()
with contextlib.redirect_stdout(stream):
results_message = get_data_from_streamfiles()
#logger.info("message can be send to DB :{}", results_message)
logger.info(f"message can be send to DB :{results_message}")
# send message to database:
# init the class
mxdb = MxdbVdpTools.MxdbVdpTools()
# insert message to DB
_id = mxdb.insert(results_message)
#EXAMPLE MESSAGE {
# "mergeID": "something_mergeID",
# "trackingId": "something_track",
# "eaccount": "e19370",
# "masterFileName": "Lyso_12p4keV_1kHz_150mm_run000026_master.h5",
# "dataFileName": "Lyso_12p4keV_1kHz_150mm_run000026_data_000010.h5",
# "filesystemPath": "/das/work/p19/p19607/FromGreta/REDML_indx_data/lyso/processing/",
# "detectorDistance_mm": 150.0,
# "beamCenterX_pxl": 1103.7,
# "beamCenterY_pxl": 1175.1,
# "pixelSize_um": 75,
# "numberOfImages": 10000,
# "imageTime_us": 100,
# "enery_kev": 12398.0,
# "detectorWidth_pxl": 2067,
# "detectorHeight_pxl": 2163,
# "underload": -30000,
# "overload": 30000,
# "unitCell": {"a": 79.5, "b": 79.5, "c": 38.6, "alpha": 90.0, "beta": 90.0, "gamma": 90.0},
# "spaceGroupNumber": 96,
# "crystfelTreshold": 6.0,
# "crystfelMinSNR": 4.0,
# "crystfelMinPixCount": 1,
# "crystfelMultiCrystal": False,
#}
logger.info("message inserted to DB")
# retrieve the inserted doc from the database
#doc = mxdb.query(_id=_id["insertID"])
#logger.info("doc info from DB is: ")
#logger.info(doc)

108
src/send_msg.py Normal file

@@ -0,0 +1,108 @@
import copy
import json
import sys
import time
import messages
import stomp
# ------FUNCTIONS -------------
def main():
"""
hello world testing
:return: nothing
"""
print("hello snake ")
pass
# -------CLASS -------------------
class MySender:
"""
Class to connect to queue and send message
"""
def __init__(self, server, port):
self.server = server
self.port = port
def connect(self):
"""Connect and subscribe to the inqueue"""
self.conn = stomp.Connection12([(self.server, self.port)])
self.conn.connect()
def disconnect(self):
"""Close connection"""
self.conn.disconnect()
def on_error(self, headers, message):
pass
def send(self, queue, message):
"""Send message to the queue"""
message = messages.BaseMessage(message)
# print out message, comment in real scenario:
# print(message.as_dict())
# send message, uncomment in real scenario
self.conn.send(destination=queue, body=message.encodeJson())
time.sleep(0.2)
def unsubscribe(self):
"""Unsubscribe from the queue"""
self.conn.unsubscribe(id=2)
# =============MAIN====================
if __name__ == "__main__":
main()
vdp_server = "sf-broker-01.psi.ch"
vdp_port = 61613
# queue name ? - beamline specific ?
vdp_queue = "/queue/test_in"
#data_number = "%02d" % float(sys.argv[1])
data_number = "pups"
# -------------CHOOSE Test Message ----------
# LYSO
vdp_msg_lyso = {
"mergeID": "something_mergeID_EP",
"trackingId": "something_track_EP",
"eaccount": "p21734",
"masterFileName": "/run9999-20240219/data/second_dataset_master.h5",
"dataFileName": "/run9999-20240219/data/second_dataset_data_000.h5",
"filesystemPath": "/sf/cristallina/data/p21734/raw/",
"detectorDistance_mm": 150.8,
"beamCenterX_pxl": 1135.5,
"beamCenterY_pxl": 1159.1,
"pixelSize_um": 75,
"numberOfImages": 6250,
"imageTime_us": 100,
"enery_kev": 12.4,
"detectorWidth_pxl": 2067,
"detectorHeight_pxl": 2163,
"underload": -30000,
"overload": 30000,
"unitCell": {"a": 45.8, "b": 73.9, "c": 53.5, "alpha": 90.0, "beta": 109.6, "gamma": 90.0},
"spaceGroupNumber": 4,
"crystfelTreshold": 7.0,
"crystfelMinSNR": 3.0,
"crystfelMinPixCount": 1,
"crystfelMultiCrystal": False,
}
# initialize MySender class with server and port and connect:
print("before connecting")
vdp_sender = MySender(vdp_server, vdp_port)
vdp_sender.connect()
print("after connecting")
# send message to queue:
vdp_sender.send(vdp_queue, vdp_msg_lyso)
print("after sending")
vdp_sender.disconnect()

25
src/test_bl.json Normal file

@@ -0,0 +1,25 @@
{
"mergeID": "something_mergeID",
"trackingID": "something_track",
"eaccount": "e19370",
"masterFileName": "run000024_EP2_1kHz_980us_without_laser_master.h5",
"dataFileName": "run000024_EP2_1kHz_980us_without_laser_data_007.h5",
"filesystemPath": "/sls/MX/Data10/e20233/20230228/EP2/",
"detectorDistance_mm": 150.8,
"beamCenterX_pxl": -1135.5,
"beamCenterY_pxl": -1159.1,
"pixelSize_um": 75,
"numberOfImages": 6250,
"imageTime_us": 100,
"enery_kev": 12400.0,
"detectorWidth_pxl": 2067,
"detectorHeight_pxl": 2163,
"underload": -30000,
"overload": 30000,
"unitCell": {"a":45.8,"b": 73.9,"c": 53.5,"alpha": 90.0,"beta": 109.6,"gamma": 90.0},
"spaceGroupNumber": 4,
"crystfelTreshold": 7.0,
"crystfelMinSNR": 3.0,
"crystfelMinPixCount": 1,
"crystfelMultiCrystal": false
}

11
src/test_sender.sh Executable file

@@ -0,0 +1,11 @@
#!/bin/bash
smx=/sls/MX/applications
mxlib=$smx/mxlibs3/all-beamlines/stable
db=$smx/mxdbclient/all-beamlines/stable
for i in {0..0}
do
echo $i
# insert $i after the python file exec
env PYTHONPATH=$mxlib:$db /sls/MX/applications/conda_envs/vdp/bin/python ./send_msg.py $i !*
done