diff --git a/reduction_tools/crystfel_split_var.py b/reduction_tools/crystfel_split_var.py
new file mode 100644
index 0000000..6e309a7
--- /dev/null
+++ b/reduction_tools/crystfel_split_var.py
@@ -0,0 +1,311 @@
+#!/usr/bin/env python3
+
+# author J.Beale
+
+"""
+# aim
+to process a batch of data very fast by splitting it into a number of chunks
+and submitting these jobs separately to the cluster
+
+# usage
+python crystfel_split_var.py -l <lst file>
+                             -k <chunk size>                - default 1000
+                             -g <geometry file>
+                             -c <cell file>
+                             -n <job name>                  - default split
+                             -p <progress bar True/False>   - default True
+                             -t <crystfel threshold>        - default 10
+                             -s <crystfel min-snr>          - default 5
+                             -i <crystfel int-radius>       - default 3,5,9
+                             -m <crystfel multi True/False> - default False (no-multi)
+                             -r <crystfel retry True/False> - default False (no-retry)
+                             -x <crystfel min-pix-count>    - default 2
+
+# output
+a series of stream files from crystfel in the current working directory
+"""
+
+# modules
+import pandas as pd
+import subprocess
+import os, errno
+import time
+import argparse
+from tqdm import tqdm
+import re
+
+def h5_split( lst, chunk_size ):
+
+    # read h5 lst file - note - removes // from image column
+    cols = [ "h5", "image" ]
+    df = pd.read_csv( lst, sep=r"\s//", engine="python", names=cols )
+
+    # re-add // to image column and drop other columns
+    df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str )
+    df = df[ [ "h5_path" ] ]
+
+    # split df into a list of chunk-sized frames
+    list_df = [ df[ i:i + chunk_size ] for i in range( 0, len( df ), chunk_size ) ]
+
+    return list_df
+
+def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file,
+                        geom_file, cell_file, threshold, min_snr,
+                        int_rad, multi, retry, min_pix ):
+
+    # stream file name
+    stream_file = "{0}_{1}.stream".format( name, chunk )
+
+    # crystfel batch script name
+    cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk )
+    # write file
+    run_sh = open( cryst_run_file, "w" )
+    run_sh.write( "#!/bin/sh\n\n" )
+    run_sh.write( "module purge\n" )
+    run_sh.write( "module load crystfel/0.10.2\n" )
+    run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) )
+    run_sh.write( "    --output={0} \\\n".format( stream_file ) )
+    run_sh.write( "    --geometry={0} \\\n".format( geom_file ) )
+    run_sh.write( "    --pdb={0} \\\n".format( cell_file ) )
+    run_sh.write( "    --indexing=xgandalf-latt-cell --peaks=peakfinder8 \\\n" )
+    run_sh.write( "    --threshold={0} \\\n".format( threshold ) )
+    run_sh.write( "    --min-snr={0} \\\n".format( min_snr ) )
+    run_sh.write( "    --int-radius={0},{1},{2} \\\n".format( int_rad[0], int_rad[1], int_rad[2] ) )
+    run_sh.write( "    -j 32 --max-res=3000 \\\n" )
+    run_sh.write( "    --{0} \\\n".format( multi ) )
+    run_sh.write( "    --{0} \\\n".format( retry ) )
+    run_sh.write( "    --min-pix-count={0} \\\n".format( min_pix ) )
+    run_sh.write( "    --min-res=85\n" )
+    run_sh.close()
+
+    # make file executable
+    subprocess.call( [ "chmod", "+x", cryst_run_file ] )
+
+    # return crystfel script and stream file names
+    return cryst_run_file, stream_file
+
+def make_process_dir( proc_dir ):
+    # make process directory, tolerating one that already exists
+    try:
+        os.makedirs( proc_dir )
+    except OSError as e:
+        if e.errno != errno.EEXIST:
+            raise
+
+def submit_job( job_file ):
+
+    # submit the job
+    submit_cmd = [ "sbatch", "--cpus-per-task=32", "--", job_file ]
+    job_output = subprocess.check_output( submit_cmd )
+
+    # scrub job id from output - example: Submitted batch job 742403
+    pattern = r"Submitted batch job (\d+)"
+    job_id = re.search( pattern, job_output.decode().strip() ).group(1)
+
+    return int( job_id )
+
+def wait_for_jobs( job_ids, total_jobs ):
+
+    # poll squeue until every submitted job has left the queue
+    with tqdm( total=total_jobs, desc="Jobs Completed", unit="job" ) as pbar:
+        while job_ids:
+            completed_jobs = set()
+            for job_id in job_ids:
+                status_cmd = [ "squeue", "-h", "-j", str( job_id ) ]
+                status = subprocess.check_output( status_cmd )
+                # empty squeue output means the job has finished
+                if not status:
+                    completed_jobs.add( job_id )
+                    pbar.update(1)
+            job_ids.difference_update( completed_jobs )
+            time.sleep(10)
+
+def run_splits( cwd, name, lst, chunk_size, geom_file,
+                cell_file, progress, threshold, min_snr,
+                int_rad, multi, retry, min_pix ):
+
+    print( "reading SwissFEL lst file" )
+    print( "splitting lst into chunks of {0} images".format( chunk_size ) )
+    list_df = h5_split( lst, chunk_size )
+    print( "DONE" )
+
+    # set chunk counter
+    chunk = 0
+
+    # submitted job set
+    submitted_job_ids = set()
+
+    # stream file list
+    stream_lst = []
+
+    print( "creating crystfel jobs for individual chunks" )
+    for chunk_lst in list_df:
+
+        print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) )
+        # define process directory
+        proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk )
+
+        # make process directory
+        make_process_dir( proc_dir )
+
+        # move to process directory
+        os.chdir( proc_dir )
+
+        # write list to file
+        chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk )
+        chunk_lst.to_csv( chunk_lst_file, index=False, header=False )
+
+        # write crystfel file and append path to list
+        cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file,
+                                                          geom_file, cell_file, threshold, min_snr,
+                                                          int_rad, multi, retry, min_pix )
+        stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) )
+
+        # submit the job
+        job_id = submit_job( cryst_run_file )
+        print( f"Job submitted: {job_id}" )
+        submitted_job_ids.add( job_id )
+
+        # increase chunk counter
+        chunk += 1
+
+    # move back to top dir
+    os.chdir( cwd )
+
+    print( "DONE" )
+
+    # show a progress bar and wait for the jobs if required
+    if progress:
+        wait_for_jobs( submitted_job_ids, chunk )
+        print( "slurm processing done" )
+
+    # make composite .stream file
+    output_file = "{0}.stream".format( name )
+
+    print( "concatenating chunk stream files into {0}".format( output_file ) )
+    try:
+        # open the output file in append mode
+        with open( output_file, "a" ) as output:
+            for file_name in stream_lst:
+                try:
+                    with open( file_name, "r" ) as input_file:
+                        # read the contents of the input file and append to the output file
+                        output.write( input_file.read() )
+                    print( f"Appended contents from {file_name} to {output_file}" )
+                except FileNotFoundError:
+                    print( f"File {file_name} not found. Skipping." )
+    except IOError as e:
+        print( f"An error occurred while appending files: {e}" )
+
+    print( "DONE" )
+
+def list_of_ints( arg ):
+    # parse a comma-separated string such as "3,5,9" into [3, 5, 9]
+    return list( map( int, arg.split( ',' ) ) )
+
+def str_to_bool( arg ):
+    # argparse's type=bool treats any non-empty string as True,
+    # so parse True/False strings explicitly instead
+    return str( arg ).lower() in ( "true", "1", "yes" )
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-l",
+        "--lst_file",
+        help="lst file from SwissFEL output to be processed quickly",
+        type=os.path.abspath
+    )
+    parser.add_argument(
+        "-k",
+        "--chunk_size",
+        help="how big should each chunk be? the bigger the chunk, the fewer the jobs and the slower the run",
+        type=int,
+        default=1000
+    )
+    parser.add_argument(
+        "-g",
+        "--geom_file",
+        help="path to the geom file to be used in the refinement",
+        type=os.path.abspath
+    )
+    parser.add_argument(
+        "-c",
+        "--cell_file",
+        help="path to the cell file of the crystals used in the refinement",
+        type=os.path.abspath
+    )
+    parser.add_argument(
+        "-n",
+        "--job_name",
+        help="the name of the job to be run",
+        type=str,
+        default="split"
+    )
+    parser.add_argument(
+        "-p",
+        "--progress",
+        help="gives you the option of also having a progress bar",
+        type=str_to_bool,
+        default=True
+    )
+    parser.add_argument(
+        "-t",
+        "--threshold",
+        help="threshold for the crystfel run - peaks must be above this to be found",
+        type=int,
+        default=10
+    )
+    parser.add_argument(
+        "-s",
+        "--min_snr",
+        help="min-snr for the crystfel run - peaks must be above this to be counted",
+        type=int,
+        default=5
+    )
+    parser.add_argument(
+        "-i",
+        "--int_radius",
+        help="int-radius for the crystfel run - inner,middle,outer integration radii",
+        type=list_of_ints,
+        default=[3,5,9]
+    )
+    parser.add_argument(
+        "-m",
+        "--multi",
+        help="multi crystfel flag, do you want to look for multiple lattices",
+        type=str_to_bool,
+        default=False
+    )
+    parser.add_argument(
+        "-r",
+        "--retry",
+        help="retry crystfel flag, do you want to retry failed indexing patterns",
+        type=str_to_bool,
+        default=False
+    )
+    parser.add_argument(
+        "-x",
+        "--min_pix",
+        help="min-pix-count for crystfel runs, minimum number of pixels a spot should contain in peak finding",
+        type=int,
+        default=2
+    )
+    args = parser.parse_args()
+
+    # translate boolean flags into the crystfel option names
+    cwd = os.getcwd()
+    if args.multi:
+        multi = "multi"
+    else:
+        multi = "no-multi"
+    if args.retry:
+        retry = "retry"
+    else:
+        retry = "no-retry"
+
+    # run the split pipeline
+    run_splits( cwd, args.job_name, args.lst_file, args.chunk_size,
+                args.geom_file, args.cell_file, args.progress,
+                args.threshold, args.min_snr, args.int_radius,
+                multi, retry, args.min_pix )
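
Example invocation, following the usage block in the docstring - a minimal sketch only; run0001.lst, jungfrau.geom and lyso.cell are hypothetical placeholder paths, not files from this repository:

    python reduction_tools/crystfel_split_var.py -l run0001.lst -g jungfrau.geom -c lyso.cell \
        -k 500 -n run0001 -m True -r True

This would split run0001.lst into 500-image chunks, submit one slurm job per chunk with multiple-lattice search and indexing retries enabled, and concatenate the per-chunk outputs into run0001.stream.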