#!/usr/bin/env python3
# authors T.Mason and J.Beale
"""
# aim
to refine the detector distance using crystfel
- naming covention = #.###/#.###.stream
# usage
python detector-distance-refinement.py -l <path to lst file generated by daq>
-g <path to geom file>
-d central clen to refine around
-c cell_file
-s sample size
-e endstation - alvra or cristallina
# output
plot files of the analysis and a suggest for the clen
"""
# modules
import pandas as pd
import subprocess
import os
import regex as re
import numpy as np
import matplotlib.pyplot as plt
import time
from tqdm import tqdm
import argparse
from scipy.optimize import curve_fit
from scipy.signal import peak_widths, find_peaks
def h5_sample( lst, sample ):
# create sample of images from run
# read h5.lst - note - removes // from image column
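    # an input line pairs an h5 path with an event id, e.g. (illustrative):
    #   /data/run0001.h5 //42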
cols = [ "h5", "image" ]
    sample_df = pd.read_csv( lst, sep=r"\s//", engine="python", names=cols )
# take sample - if sample required
if len( sample_df ) > sample:
# take defined sample
sample_df = sample_df.sample( sample )
# sort list
sample_df = sample_df.sort_index()
    # re-add // to image column
sample_df[ "image" ] = "//" + sample_df.image.astype(str)
# write sample to file
sample_file = "h5_{0}_sample.lst".format( sample )
sample_df.to_csv( sample_file, sep=" ", index=False, header=False )
# return sample file name
return sample_file
def geom_amend( lab6_geom_file, clen ):
    # read lab6 geom
    with open( lab6_geom_file, "r" ) as lab6_geom:
        # use regex to find clen and replace with new
        # clen example => clen = 0.1217
        clen_geom = re.sub( r"clen = 0\.\d+", "clen = {0}".format( clen ), lab6_geom.read() )
    # write new clen_geom to file
    clen_geom_file = "{0}.geom".format( clen )
    with open( clen_geom_file, "w" ) as geom:
        geom.write( clen_geom )
# return clen_geom file name
return clen_geom_file
def write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file, threshold ):
# crystfel file name
cryst_run_file = "{0}_run.sh".format( clen )
# write file
run_sh = open( cryst_run_file, "w" )
run_sh.write( "#!/bin/sh\n\n" )
run_sh.write( "module purge\n" )
run_sh.write( "module load crystfel/0.11.1\n" )
run_sh.write( "indexamajig -i {0} \\\n".format( sample_h5_file ) )
run_sh.write( " --output={0}.stream \\\n".format( clen ) )
    run_sh.write( " --geometry={0} \\\n".format( clen_geom_file ) )
run_sh.write( " --pdb={0} \\\n".format( cell_file ) )
run_sh.write( " --indexing=mosflm --peaks=peakfinder8 \\\n" )
run_sh.write( " --threshold={0} --min-snr=5 --int-radius=3,5,9 \\\n".format( threshold ) )
run_sh.write( " -j 32 --no-multi --no-retry --max-res=3000 --min-pix-count=2 --min-res=85\n\n" )
run_sh.close()
# make file executable
    subprocess.call( [ "chmod", "+x", cryst_run_file ] )
# return crystfel file name
return cryst_run_file
def make_sample( cwd, lst, sample ):
    # make sample list
    print( "making {0} sample of images".format( sample ) )
    sample_h5 = h5_sample( lst, sample )
    sample_h5_file = "{0}/{1}".format( cwd, sample_h5 )
    print( "done" )
    return sample_h5_file
def make_process_dir(proc_dir):
# make process directory
    os.makedirs( proc_dir, exist_ok=True )
def make_step_range(centre_clen, step_size, steps):
# make list of clen steps above and below the central clen
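    # illustrative: centre_clen=0.1200 with the coarse settings (30 steps of 0.0005 m)
    # gives np.arange(0.1125, 0.1275, 0.0005) -> 30 values from 0.1125 to 0.1270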
print( "make clen array around {0}".format( centre_clen ) )
step_range = step_size*steps
bottom_step = centre_clen-step_range/2
top_step = bottom_step+step_range
step_range = np.arange( bottom_step, top_step, step_size )
    step_range = step_range.round( 6 ) # important - otherwise np sometimes gives you 0.99999999 instead of 1
print( "done" )
return step_range
def submit_job( job_file ):
# submit the job
submit_cmd = [ "sbatch", "-p", "day", "--cpus-per-task=32", "--", job_file ]
job_output = subprocess.check_output(submit_cmd)
# scrub job id from - example Submitted batch job 742403
pattern = r"Submitted batch job (\d+)"
job_id = re.search( pattern, job_output.decode().strip() ).group(1)
return int(job_id)
def wait_for_jobs( job_ids, total_jobs ):
with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar:
while job_ids:
completed_jobs = set()
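            # poll squeue for each outstanding job - an empty reply means the job has left the queue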
for job_id in job_ids:
status_cmd = ["squeue", "-h", "-j", str(job_id)]
status = subprocess.check_output(status_cmd)
if not status:
completed_jobs.add(job_id)
pbar.update(1)
job_ids.difference_update(completed_jobs)
time.sleep(2)
def scrub_clen( stream_pwd ):
    # get clen from stream name
    # example - /sf/cristallina/data/p20590/work/process/jhb/detector_refinement/coarse_scan/0.115/0.115.stream
    # scrub clen and return - else nan
    pattern = r"0\.\d+/(0\.\d+)\.stream"
    re_search = re.search( pattern, stream_pwd )
    if re_search:
        return float( re_search.group( 1 ) )
    return np.nan
def find_streams( top_dir ):
# create df for streams
stream_df = pd.DataFrame()
# search for all files that end with .stream
for path, dirs, files in os.walk( top_dir ):
for name in files:
if name.endswith( ".stream" ):
# get stream pwd
stream_pwd = os.path.join( path, name )
# scrub clen from stream
clen = scrub_clen( stream_pwd )
# put clen and stream pwd into df
data = [ { "stream_pwd" : stream_pwd,
"clen" : clen
} ]
stream_df_1 = pd.DataFrame( data )
stream_df = pd.concat( ( stream_df, stream_df_1 ) )
# sort df based on clen
stream_df = stream_df.sort_values( by="clen" )
# reset df index
stream_df = stream_df.reset_index( drop=True )
# return df of streams and clens
return stream_df
def scrub_us( stream ):
    # get unit cell values from stream file
    # example - Cell parameters 7.71784 7.78870 3.75250 nm, 90.19135 90.77553 90.19243 deg
    # returns a list of (a, b, c, alpha, beta, gamma) tuples - empty if none found
    pattern = r"Cell\sparameters\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg"
    return re.findall( pattern, stream )
def scrub_helper( top_dir ):
# find stream files from process directory
print( "finding stream files" )
stream_df = find_streams( top_dir )
print( "done" )
# making results df for unit cell and index no.
stats_df = pd.DataFrame()
# loop through stream files and collect unit_cell information
print( "looping through stream files to collect unit cell, indexed information" )
for index, row in stream_df.iterrows():
stream_pwd, clen = row[ "stream_pwd" ], row[ "clen" ]
        # open stream file and scrub unit cell information
        with open( stream_pwd, "r" ) as stream_file:
            cells = scrub_us( stream_file.read() )
# put cells in df
cols = [ "a", "b", "c", "alpha", "beta", "gamma" ]
cells_df = pd.DataFrame( cells, columns=cols )
cells_df = cells_df.astype( float )
# calc stats
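        # spread of each refined cell parameter across the sample - the correct
        # clen should minimise these standard deviations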
indexed = len( cells_df )
std_a = cells_df.a.std()
std_b = cells_df.b.std()
std_c = cells_df.c.std()
std_alpha = cells_df.alpha.std()
std_beta = cells_df.beta.std()
std_gamma = cells_df.gamma.std()
# put stats in results df
stats = [ { "clen" : clen,
"indexed" : indexed,
"std_a" : std_a,
"std_b" : std_b,
"std_c" : std_c,
"std_alpha" : std_alpha,
"std_beta" : std_beta,
"std_gamma" : std_gamma,
} ]
stats_df_1 = pd.DataFrame( stats )
stats_df = pd.concat( ( stats_df, stats_df_1 ) )
# reset index
stats_df = stats_df.reset_index( drop=True )
print( "done" )
return stats_df
def find_clen_values( stats_df, scan ):
def find_min_clen(col_name):
min_val = stats_df[col_name].min()
min_row = stats_df[stats_df[col_name] == min_val]
min_clen = min_row['clen'].values[0]
return min_val, min_clen
def gauss(x, *p):
A, mu, sigma = p
return A * np.exp(-(x-mu)**2/(2.*sigma**2))
    # initial guess - peak height from the data, centre at the mean clen
    p0 = [ stats_df.indexed.max(), stats_df.clen.mean(), 0.01 ]
    parameters, covariance = curve_fit( gauss, stats_df.clen, stats_df.indexed, p0=p0 )
# Get the fitted curve
stats_df[ "gaus" ] = gauss( stats_df.clen, *parameters)
# find peak centre
peaks = find_peaks( stats_df.gaus.values )
# find full peak width
fwhm = peak_widths( stats_df.gaus.values, peaks[0], rel_height=0.5 )
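    # peak_widths returns (widths, width_heights, left_ips, right_ips); at
    # rel_height=0.5 the left/right interpolated positions bracket the FWHM
    # (for a Gaussian, FWHM = 2*sqrt(2*ln2)*sigma ~ 2.355*sigma)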
fwhm_str = int( round( fwhm[2][0], 0 ) )
fwhm_end = int( round( fwhm[3][0], 0 ) )
# translate width into motor values
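    # column 0 of stats_df is clen, so the rounded FWHM edge indices map back
    # onto detector distances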
indexed_start = stats_df.iloc[ fwhm_str, 0 ]
indexed_end = stats_df.iloc[ fwhm_end, 0 ]
mid_gauss = stats_df.clen.iloc[ peaks[0] ].values[0]
# cut df to only include indexed patterns
stats_df = stats_df[ ( stats_df.clen < indexed_end ) & ( stats_df.clen > indexed_start ) ]
# calculate minimum values
min_alpha_val, min_alpha_clen = find_min_clen('std_alpha')
min_beta_val, min_beta_clen = find_min_clen('std_beta')
min_gamma_val, min_gamma_clen = find_min_clen('std_gamma')
min_c_val, min_c_clen = find_min_clen('std_c')
# find possible clens
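    # average the clens that minimise the angle standard deviations - at the
    # correct distance the refined cell angles scatter least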
suggested_clen = (min_alpha_clen + min_beta_clen + min_gamma_clen )/3
suggested_clen = round(suggested_clen, 4)
print( "middle of indexing gaussion fit of {0} scan = {1}".format( scan, mid_gauss ) )
print( "mean minimum of alpha, beta, gamma of {0} scan = {1}".format( scan, suggested_clen ) )
return suggested_clen
def plot_indexed_std( stats_df, ax1, ax2 ):
# indexed images plot
color = "tab:red"
ax1.set_xlabel("clen")
ax1.set_ylabel("indexed", color=color)
ax1.plot(stats_df.clen, stats_df.indexed, color=color)
ax1.tick_params(axis="y", labelcolor=color)
# label color
color = "tab:blue"
ax2.set_ylabel("a,b,c st.deviation", color=color)
ax2.tick_params(axis='y', labelcolor=color)
# std_a plot
color = "turquoise"
ax2.plot(stats_df.clen, stats_df.std_a, color=color, label="a" )
# std_b plot
color = "deepskyblue"
ax2.plot(stats_df.clen, stats_df.std_b, color=color, label="b" )
# std_c plot
color = "royalblue"
ax2.plot(stats_df.clen, stats_df.std_c, color=color, label="c" )
def plot_indexed_std_alpha_beta_gamma( stats_df, ax1, ax2 ):
# indexed images plot
color = "tab:red"
ax1.set_xlabel("clen")
ax1.set_ylabel("indexed", color=color)
ax1.plot(stats_df.clen, stats_df.indexed, color=color)
ax1.tick_params(axis="y", labelcolor=color)
# label color
color = "tab:green"
ax2.set_ylabel("alpha, beta, gamma st.deviation", color=color)
ax2.tick_params(axis='y', labelcolor=color)
# std_alpha plot
color = "yellow"
ax2.plot(stats_df.clen, stats_df.std_alpha, color=color, label="alpha" )
# std_beta plot
color = "green"
ax2.plot(stats_df.clen, stats_df.std_beta, color=color, label="beta" )
# std_gamma plot
color = "darkolivegreen"
ax2.plot(stats_df.clen, stats_df.std_gamma, color=color, label="gamma" )
def scan( cwd, lst, sample, lab6_geom_file, centre_clen, cell_file, threshold, scan_type ):
    # define coarse or fine scan
    if scan_type == "coarse":
        steps = 30
        step_size = 0.0005 # m
    elif scan_type == "fine":
        steps = 50
        step_size = 0.00005 # m
    scan_name = "scan"
    # make sample list
    sample_h5_file = make_sample( cwd, lst, sample )
# make list of clen steps above and below the central clen
step_range = make_step_range(centre_clen, step_size, steps)
# submitted job set and job_list
submitted_job_ids = set()
job_list = []
    # make directories for results
    print( "begin CrystFEL analysis of different clens" )
# loop to cycle through clen steps
for clen in step_range:
# define process directory
proc_dir = "{0}/{1}/{2}".format( cwd, scan_name, clen )
# make process directory
make_process_dir(proc_dir)
# move to process directory
os.chdir( proc_dir )
# make geom file
clen_geom_file = geom_amend( lab6_geom_file, clen )
# make crystfel run file
cryst_run_file = write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file, threshold )
# run crystfel file
job_list.append( cryst_run_file )
job_id = submit_job( cryst_run_file )
submitted_job_ids.add( job_id )
# move back to cwd
os.chdir( cwd )
    # wait for jobs to complete
wait_for_jobs(submitted_job_ids, len(job_list))
print("slurm processing done")
def scrub_scan( scan_top_dir, scan ):
stats_df = scrub_helper(scan_top_dir)
# calculate suggested clen and make plot
suggested_clen = find_clen_values( stats_df, scan )
# plot results
fig, (ax1, ax3) = plt.subplots(1, 2)
ax2 = ax1.twinx()
ax4 = ax3.twinx()
plot_indexed_std(stats_df, ax1, ax2)
plot_indexed_std_alpha_beta_gamma(stats_df, ax3, ax4)
fig.legend(loc="upper center")
fig.tight_layout()
plt.savefig("{0}.png".format(scan))
return suggested_clen
def main( cwd, lst, sample, geom, centre_clen, cell_file, threshold ):
# run scan
if not os.path.isdir( "{0}/scan".format( cwd ) ):
        # run initial coarse scan
scan( cwd, lst, sample, geom, centre_clen, cell_file, threshold, "coarse" )
# get approx centre from new results
coarse_clen = scrub_scan( cwd, scan="scan" )
        # perform 2nd scan
scan( cwd, lst, sample, geom, coarse_clen, cell_file, threshold, "fine" )
else:
print( "scan already performed" )
# check results
suggested_clen = scrub_scan( cwd, scan="scan" )
return suggested_clen
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-l",
"--lst",
help="path to crystfel list file containing enough patterns for detector distance refinement",
type=os.path.abspath,
required=True
)
parser.add_argument(
"-g",
"--geom",
help="path to geom file to be used in the refinement",
type=os.path.abspath,
required=True
)
parser.add_argument(
"-d",
"--central_distance",
help="intial clen to use for refinement - usually from detector shift refinement",
type=float,
required=True
)
parser.add_argument(
"-c",
"--cell_file",
help="path to cell file of the crystals used in the refinement",
type=os.path.abspath,
required=True
)
parser.add_argument(
"-s",
"--sample",
help="sample size to use in the refinement",
type=int,
default=500
)
parser.add_argument(
"-e",
"--endstation",
help="which endstation did you collect these data from, e.g., alvra or cristallina",
type=str,
default="cristallina"
)
args = parser.parse_args()
# set threshold based on endstation
if args.endstation == "alvra":
threshold = 3000
elif args.endstation == "cristallina":
threshold = 10
else:
print( "you must say which beamline you collected the data on, alvra or cristallina, to set the threshold value correctly for crystfel" )
exit()
# run main
cwd = os.getcwd()
print( "top working directory = {0}".format( cwd ) )
main( cwd, args.lst, args.sample, args.geom, args.central_distance, args.cell_file, threshold )