same as split script but can change crystfel parameters from command line

reduction_tools/crystfel_split_var.py (new file, 303 lines)
@@ -0,0 +1,303 @@
#!/usr/bin/env python3

# author J.Beale

"""
# aim
to process a batch of data very fast by splitting it into a number of chunks
and submitting these jobs separately to the cluster

# usage
python crystfel_split_var.py -l <path-to-list-file>
                             -k <chunk-size>                          -default 1000
                             -g <path-to-geom-file>
                             -c <path-to-cell-file>
                             -n <job-name>                            -default split
                             -p progress bar (True/False)             -default True
                             -t crystfel threshold                    -default 10
                             -s crystfel min-snr                      -default 5
                             -i crystfel int-radius                   -default 3,5,9
                             -m crystfel multi/no-multi (True/False)  -default False (no-multi)
                             -r crystfel retry/no-retry (True/False)  -default False (no-retry)
                             -x crystfel min-pix-count                -default 2

# output
a series of stream files from crystfel in the current working directory
"""

# modules
import pandas as pd
import subprocess
import os, errno
import time
import argparse
from tqdm import tqdm
import re   # the standard-library re module covers the single re.search() used below

def h5_split( lst, chunk_size ):

    # read h5.lst - note - the separator regex strips // from the image column
    # scrub file name
    lst_name = os.path.basename( lst )

    cols = [ "h5", "image" ]
    df = pd.read_csv( lst, sep=r"\s//", engine="python", names=cols )

    # re-add // to the image column and drop the other columns
    df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str )
    df = df[ [ "h5_path" ] ]

    # split df into a list of DataFrames of at most chunk_size rows
    list_df = [ df[ i:i + chunk_size ] for i in range( 0, len(df), chunk_size ) ]

    return list_df
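
# for reference, each line of the input lst is expected to pair an h5 path
# with a //event id (the paths here are hypothetical):
#   /data/run42/acq0001.h5 //0
#   /data/run42/acq0001.h5 //1
# h5_split then returns a list of single-column DataFrames, each holding at
# most chunk_size of these "path //event" strings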

def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file,
                        geom_file, cell_file, threshold, min_snr,
                        int_rad, multi, retry, min_pix ):

    # stream file name
    stream_file = "{0}_{1}.stream".format( name, chunk )

    # crystfel run-script name
    cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk )

    # write the batch script
    with open( cryst_run_file, "w" ) as run_sh:
        run_sh.write( "#!/bin/sh\n\n" )
        run_sh.write( "module purge\n" )
        run_sh.write( "module load crystfel/0.10.2\n" )
        run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) )
        run_sh.write( "    --output={0} \\\n".format( stream_file ) )
        run_sh.write( "    --geometry={0} \\\n".format( geom_file ) )
        run_sh.write( "    --pdb={0} \\\n".format( cell_file ) )
        run_sh.write( "    --indexing=xgandalf-latt-cell --peaks=peakfinder8 \\\n" )
        run_sh.write( "    --threshold={0} \\\n".format( threshold ) )
        run_sh.write( "    --min-snr={0} \\\n".format( min_snr ) )
        run_sh.write( "    --int-radius={0},{1},{2} \\\n".format( int_rad[0], int_rad[1], int_rad[2] ) )
        run_sh.write( "    -j 32 --max-res=3000 \\\n" )
        run_sh.write( "    --{0} \\\n".format( multi ) )
        run_sh.write( "    --{0} \\\n".format( retry ) )
        run_sh.write( "    --min-pix-count={0} \\\n".format( min_pix ) )
        run_sh.write( "    --min-res=85\n" )   # last option: no trailing line continuation

    # make file executable
    subprocess.call( [ "chmod", "+x", cryst_run_file ] )

    # return crystfel run-script and stream file names
    return cryst_run_file, stream_file
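
# for illustration, with the default parameters a generated script for chunk 0
# of a job named "split" would look roughly like this (paths abbreviated):
#
#   #!/bin/sh
#
#   module purge
#   module load crystfel/0.10.2
#   indexamajig -i .../split_0.lst \
#       --output=split_0.stream \
#       --geometry=<geom> \
#       --pdb=<cell> \
#       --indexing=xgandalf-latt-cell --peaks=peakfinder8 \
#       --threshold=10 --min-snr=5 --int-radius=3,5,9 \
#       -j 32 --max-res=3000 --no-multi --no-retry \
#       --min-pix-count=2 --min-res=85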

def make_process_dir( proc_dir ):
    # make the process directory, ignoring the error if it already exists
    try:
        os.makedirs( proc_dir )
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def submit_job( job_file ):

    # submit the job
    submit_cmd = [ "sbatch", "--cpus-per-task=32", "--", job_file ]
    job_output = subprocess.check_output( submit_cmd )

    # scrub job id from the sbatch output - example: Submitted batch job 742403
    pattern = r"Submitted batch job (\d+)"
    job_id = re.search( pattern, job_output.decode().strip() ).group(1)

    return int(job_id)
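
# note: if the regex scrape ever proves brittle, sbatch's --parsable flag
# prints the job id on its own (optionally followed by ";cluster"); a sketch:
#   out = subprocess.check_output( [ "sbatch", "--parsable", job_file ] )
#   job_id = int( out.decode().split(";")[0] )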

def wait_for_jobs( job_ids, total_jobs ):

    # poll squeue every 10 s; a job counts as done once squeue no longer lists it
    with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar:
        while job_ids:
            completed_jobs = set()
            for job_id in job_ids:
                status_cmd = ["squeue", "-h", "-j", str(job_id)]
                status = subprocess.check_output(status_cmd)
                if not status:
                    completed_jobs.add(job_id)
                    pbar.update(1)
            job_ids.difference_update(completed_jobs)
            time.sleep(10)
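
# caution: on some slurm versions squeue exits non-zero once a finished job has
# aged out of its records, which would make check_output raise
# CalledProcessError; a defensive sketch (not in the original) that treats that
# case as "job done":
#   try:
#       status = subprocess.check_output(status_cmd)
#   except subprocess.CalledProcessError:
#       status = b""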

def run_splits( cwd, name, lst, chunk_size, geom_file,
                cell_file, progress, threshold, min_snr,
                int_rad, multi, retry, min_pix ):

    print( "reading SwissFEL lst file" )
    print( "splitting lst into chunks of {0} images".format( chunk_size ) )
    list_df = h5_split( lst, chunk_size )
    print( "DONE" )

    # set chunk counter
    chunk = 0

    # submitted job set
    submitted_job_ids = set()

    # stream file list
    stream_lst = []

    print( "creating crystfel jobs for individual chunks" )
    for chunk_lst in list_df:

        print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) )
        # define process directory
        proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk )

        # make process directory
        make_process_dir( proc_dir )

        # move to process directory
        os.chdir( proc_dir )

        # write list to file
        chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk )
        chunk_lst.to_csv( chunk_lst_file, index=False, header=False )

        # write crystfel run script and record the stream file it will produce
        cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file,
                                                          geom_file, cell_file, threshold, min_snr,
                                                          int_rad, multi, retry, min_pix )
        stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) )

        # submit job
        job_id = submit_job( cryst_run_file )
        print( f"Job submitted: {job_id}" )
        submitted_job_ids.add( job_id )

        # increase chunk counter
        chunk += 1

    # move back to top dir
    os.chdir( cwd )

    print( "DONE" )

    # show a progress bar and wait for the jobs if required
    if progress:
        wait_for_jobs( submitted_job_ids, chunk )
        print( "slurm processing done" )

    # make composite .stream file
    output_file = "{0}.stream".format( name )

    print( "creating composite stream file" )
    try:
        # open the output file in append mode
        with open( output_file, "a" ) as output:
            for file_name in stream_lst:
                try:
                    with open( file_name, "r" ) as input_file:
                        # append the contents of the chunk stream to the composite
                        output.write( input_file.read() )
                    print( f"Appended contents from {file_name} to {output_file}" )
                except FileNotFoundError:
                    print( f"File {file_name} not found. Skipping." )
    except IOError as e:
        print( f"An error occurred while appending files: {e}" )

    print( "DONE" )

def list_of_ints(arg):
    # parse a comma-separated string such as "3,5,9" into [3, 5, 9]
    return list(map(int, arg.split(',')))
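
# argparse's type=bool converts any non-empty string (including "False") to
# True, so -p/-m/-r could never be switched off from the command line; the
# helper below (name chosen here, not in the original) parses the strings
# explicitly and is wired into those three arguments
def str2bool(arg):
    # accept the usual spellings of "true"; everything else is False
    return str(arg).lower() in ( "true", "1", "yes" )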

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-l",
        "--lst_file",
        help="lst file from SwissFEL output to be processed quickly",
        type=os.path.abspath
    )
    parser.add_argument(
        "-k",
        "--chunk_size",
        help="how big should each chunk be? - the bigger the chunk, the fewer the jobs and the slower the processing",
        type=int,
        default=1000
    )
    parser.add_argument(
        "-g",
        "--geom_file",
        help="path to geom file to be used in the refinement",
        type=os.path.abspath
    )
    parser.add_argument(
        "-c",
        "--cell_file",
        help="path to cell file of the crystals used in the refinement",
        type=os.path.abspath
    )
    parser.add_argument(
        "-n",
        "--job_name",
        help="the name of the job to be done",
        type=str,
        default="split"
    )
    parser.add_argument(
        "-p",
        "--progress",
        help="gives you the option of also having a progress bar",
        type=str2bool,
        default=True
    )
    parser.add_argument(
        "-t",
        "--threshold",
        help="threshold for crystfel run - peaks must be above this to be found",
        type=int,
        default=10
    )
    parser.add_argument(
        "-s",
        "--min_snr",
        help="min-snr for crystfel run - peaks must be above this to be counted",
        type=int,
        default=5
    )
    parser.add_argument(
        "-i",
        "--int_radius",
        help="int-radius for crystfel run - inner,middle,outer integration radii, e.g. 3,5,9",
        type=list_of_ints,
        default=[3, 5, 9]
    )
    parser.add_argument(
        "-m",
        "--multi",
        help="multi crystfel flag, do you want to look for multiple lattices",
        type=str2bool,
        default=False
    )
    parser.add_argument(
        "-r",
        "--retry",
        help="retry crystfel flag, do you want to retry failed indexing patterns",
        type=str2bool,
        default=False
    )
    parser.add_argument(
        "-x",
        "--min_pix",
        help="min-pix-count for crystfel runs, minimum number of pixels a spot should contain in peak finding",
        type=int,
        default=2
    )
    args = parser.parse_args()

    # translate the boolean flags into the crystfel option names
    cwd = os.getcwd()
    if args.multi:
        multi = "multi"
    else:
        multi = "no-multi"
    if args.retry:
        retry = "retry"
    else:
        retry = "no-retry"
    run_splits( cwd, args.job_name, args.lst_file, args.chunk_size,
                args.geom_file, args.cell_file, args.progress,
                args.threshold, args.min_snr, args.int_radius,
                multi, retry, args.min_pix )