crystfel_tools/reduction_tools/crystfel_split.py

#!/usr/bin/env python3
# author J.Beale
"""
# aim
to process a batch of data very fast by splitting it into a number of chunks and submitting
these jobs separately to the cluster
# usage
python crystfel_split.py -l <path-to-list-file>
-k <chunk-size>
-g <path-to-geom-file>
-c <path-to-cell-file>
-n <name-of-job>
# crystfel parameter may need some editing in the function - write_crystfel_run
# output
a series of stream files from crystfel in the current working directory
"""
# modules
import argparse
import errno
import os
import re
import subprocess
import time

import pandas as pd
from tqdm import tqdm

def h5_split( lst, chunk_size ):
    # read the h5 .lst file - note - splitting on " //" removes // from the image column
    # scrub the file name
    lst_name = os.path.basename( lst )
    cols = [ "h5", "image" ]
    df = pd.read_csv( lst, sep=r"\s//", engine="python", names=cols )
    # re-add // to the image column and drop the other columns
    df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str )
    df = df[ [ "h5_path" ] ]
    # split df into a list of chunk-sized frames
    list_df = [ df[ i:i + chunk_size ] for i in range( 0, len( df ), chunk_size ) ]
    return list_df
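
# A sketch of the input h5_split() expects - a SwissFEL-style .lst file with one
# "<h5-file> //<event>" entry per line (the paths below are made up):
#
#   /sf/data/run0001.h5 //0
#   /sf/data/run0001.h5 //1
#
# It returns a list of single-column DataFrames, each holding up to chunk_size
# of these entries.
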
def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file ):
    """
    write the crystfel run file - the spot-finding and indexing parameters may need some editing
    only change the values inside the quotes ( "" )
    """
    # stream file name
    stream_file = "{0}_{1}.stream".format( name, chunk )
    # crystfel run file name
    cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk )
    # write the run file
    with open( cryst_run_file, "w" ) as run_sh:
        run_sh.write( "#!/bin/sh\n\n" )
        run_sh.write( "module purge\n" )
        run_sh.write( "module load crystfel/0.10.2\n" )
        run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) )
        run_sh.write( " --output={0} \\\n".format( stream_file ) )
        run_sh.write( " --geometry={0} \\\n".format( geom_file ) )
        run_sh.write( " --pdb={0} \\\n".format( cell_file ) )
        run_sh.write( " --indexing=xgandalf-latt-cell \\\n" )
        run_sh.write( " --peaks=peakfinder8 \\\n" )
        run_sh.write( " --integration=rings-grad \\\n" )
        run_sh.write( " --tolerance=10.0,10.0,10.0,2,3,2 \\\n" )
        run_sh.write( " --threshold=10 \\\n" )
        run_sh.write( " --min-snr=5 \\\n" )
        run_sh.write( " --int-radius=3,5,9 \\\n" )
        run_sh.write( " -j 32 \\\n" )
        run_sh.write( " --no-multi \\\n" )
        run_sh.write( " --check-peaks \\\n" )
        run_sh.write( " --no-retry \\\n" )
        run_sh.write( " --max-res=3000 \\\n" )
        run_sh.write( " --min-pix-count=2 \\\n" )
        run_sh.write( " --local-bg-radius=4 \\\n" )
        run_sh.write( " --min-res=85\n" )
    # make the run file executable
    subprocess.call( [ "chmod", "+x", cryst_run_file ] )
    # return the run file and stream file names
    return cryst_run_file, stream_file
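
# For chunk 0 of a job named "split", the generated run script would look
# roughly like this (paths abbreviated):
#
#   #!/bin/sh
#
#   module purge
#   module load crystfel/0.10.2
#   indexamajig -i .../split_0.lst \
#    --output=split_0.stream \
#    --geometry=... \
#    --pdb=... \
#    --indexing=xgandalf-latt-cell \
#    ...
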
def make_process_dir( proc_dir ):
    # make the process directory, ignoring the error if it already exists
    try:
        os.makedirs( proc_dir )
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
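
# note: on Python >= 3.2 the try/except above is equivalent to
#   os.makedirs( proc_dir, exist_ok=True )
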
def submit_job( job_file ):
    # submit the job to slurm
    submit_cmd = [ "sbatch", "--cpus-per-task=32", "--", job_file ]
    job_output = subprocess.check_output( submit_cmd )
    # scrub the job id from the output - example: Submitted batch job 742403
    pattern = r"Submitted batch job (\d+)"
    job_id = re.search( pattern, job_output.decode().strip() ).group(1)
    return int( job_id )

def wait_for_jobs( job_ids, total_jobs ):
    # poll squeue every 30 s until all submitted jobs have left the queue
    with tqdm( total=total_jobs, desc="Jobs Completed", unit="job" ) as pbar:
        while job_ids:
            completed_jobs = set()
            for job_id in job_ids:
                status_cmd = [ "squeue", "-h", "-j", str( job_id ) ]
                try:
                    status = subprocess.check_output( status_cmd )
                except subprocess.CalledProcessError:
                    # squeue exits non-zero once a job has aged out of its records -
                    # treat that the same as an empty listing
                    status = b""
                # an empty listing means the job is no longer queued or running
                if not status:
                    completed_jobs.add( job_id )
                    pbar.update(1)
            job_ids.difference_update( completed_jobs )
            if job_ids:
                time.sleep(30)
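
# usage sketch (the job ids are hypothetical):
#   wait_for_jobs( {742403, 742404}, 2 )
# blocks until squeue no longer lists either job, advancing the progress bar as
# each one disappears from the queue.
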
def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file ):
    print( "reading SwissFEL lst file" )
    print( "creating {0} image chunks of lst".format( chunk_size ) )
    list_df = h5_split( lst, chunk_size )
    print( "DONE" )
    # set the chunk counter
    chunk = 0
    # set of submitted job ids
    submitted_job_ids = set()
    # stream file list
    stream_lst = []
    print( "creating crystfel jobs for individual chunks" )
    for chunk_lst in list_df:
        print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) )
        # define the process directory
        proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk )
        # make the process directory
        make_process_dir( proc_dir )
        # move to the process directory
        os.chdir( proc_dir )
        # write the chunk list to file
        chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk )
        chunk_lst.to_csv( chunk_lst_file, index=False, header=False )
        # write the crystfel run file and record the expected stream file path
        cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file )
        stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) )
        # submit the job
        job_id = submit_job( cryst_run_file )
        print( f"Job submitted: {job_id}" )
        submitted_job_ids.add( job_id )
        # increase the chunk counter
        chunk += 1
    # move back to the top directory
    os.chdir( cwd )
    print( "DONE" )
    # wait for the jobs to finish, with a progress bar
    wait_for_jobs( submitted_job_ids, chunk )
    print( "slurm processing done" )
    # make the composite .stream file
    output_file = "{0}.stream".format( name )
    try:
        # open the output file in append mode
        with open( output_file, "a" ) as output:
            for file_name in stream_lst:
                try:
                    with open( file_name, "r" ) as input_file:
                        # read the contents of the input file and append them to the output file
                        output.write( input_file.read() )
                        print( f"Appended contents from {file_name} to {output_file}" )
                except FileNotFoundError:
                    print( f"File {file_name} not found. Skipping." )
    except IOError as e:
        print( f"An error occurred while appending files: {e}" )
    print( "DONE" )
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-l",
"--lst_file",
help="file from SwissFEL output to be processed quickly",
type=os.path.abspath
)
parser.add_argument(
"-k",
"--chunk_size",
help="how big should each chunk be? - the bigger the chunk, the fewer jobs, the slower it will be",
type=int,
default=2500
)
parser.add_argument(
"-g",
"--geom_file",
help="path to geom file to be used in the refinement",
type=os.path.abspath
)
parser.add_argument(
"-c",
"--cell_file",
help="path to cell file of the crystals used in the refinement",
type=os.path.abspath
)
parser.add_argument(
"-n",
"--job_name",
help="the name of the job to be done",
type=str,
default="split"
)
args = parser.parse_args()
# run geom converter
cwd = os.getcwd()
run_splits( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file )