From fad0f3b518169f0dac8415c2a3f8f732bd4aeddf Mon Sep 17 00:00:00 2001
From: Beale John Henry
Date: Wed, 19 Jul 2023 13:58:43 +0200
Subject: [PATCH] Add a Python script to split the processing of large run files

---
 clen_tools/crystfel_chunk.py | 207 +++++++++++++++++++++++++++++++++++
 1 file changed, 207 insertions(+)
 create mode 100644 clen_tools/crystfel_chunk.py

diff --git a/clen_tools/crystfel_chunk.py b/clen_tools/crystfel_chunk.py
new file mode 100644
index 0000000..cfcac11
--- /dev/null
+++ b/clen_tools/crystfel_chunk.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python
+
+# author J.Beale
+
+"""
+# aim
+process a batch of data quickly by splitting it into a number of chunks and
+submitting each chunk to the cluster as a separate job
+
+# usage
+python crystfel_chunk.py -l <lst file from SwissFEL>
+                         -k <images per chunk>
+                         -g <geometry file>
+                         -c <cell file>
+                         -n <job name>
+# note: -p True/False toggles a progress bar that waits for the jobs to finish
+
+# output
+a series of stream files from crystfel in the current working directory
+"""
+
+# modules
+import pandas as pd
+import subprocess
+import os, errno
+import time
+import argparse
+from tqdm import tqdm
+import re
+
+def h5_split( lst, chunk_size ):
+
+    # read the h5 .lst file - the regex separator strips the " //" between
+    # the h5 path and the image number
+    cols = [ "h5", "image" ]
+    df = pd.read_csv( lst, sep=r"\s//", engine="python", names=cols )
+
+    # re-add // to the image column and drop the other columns
+    df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str )
+    df = df[ [ "h5_path" ] ]
+
+    # split df into a list of chunk_size-row frames
+    list_df = [ df[ i:i + chunk_size ] for i in range( 0, len( df ), chunk_size ) ]
+
+    return list_df
+
+def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file ):
+
+    # crystfel job script name
+    cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk )
+
+    # write the job script
+    with open( cryst_run_file, "w" ) as run_sh:
+        run_sh.write( "#!/bin/sh\n\n" )
+        run_sh.write( "module purge\n" )
+        run_sh.write( "module load crystfel/0.10.2\n" )
+        run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) )
+        run_sh.write( "    --output={0}_{1}.stream \\\n".format( name, chunk ) )
+        run_sh.write( "    --geometry={0} \\\n".format( geom_file ) )
+        run_sh.write( "    --pdb={0} \\\n".format( cell_file ) )
+        run_sh.write( "    --indexing=xgandalf-latt-cell --peaks=peakfinder8 \\\n" )
+        run_sh.write( "    --threshold=15 --min-snr=10 --int-radius=3,5,9 \\\n" )
+        run_sh.write( "    -j 36 --no-multi --no-retry --max-res=3000 --min-pix-count=2 --min-res=85 \\\n" )
+        run_sh.write( "    --temp-dir={0}\n".format( proc_dir ) )
+
+    # make the job script executable
+    os.chmod( cryst_run_file, 0o755 )
+
+    # return the job script name
+    return cryst_run_file
+
+def make_process_dir( proc_dir ):
+    # make the process directory, tolerating one that already exists
+    try:
+        os.makedirs( proc_dir )
+    except OSError as e:
+        if e.errno != errno.EEXIST:
+            raise
+
+def submit_job( job_file, slurm_output_dir ):
+
+    # submit the job, sending the slurm log to the job output directory
+    submit_cmd = [ "sbatch", "--output={0}/%j.out".format( slurm_output_dir ), job_file ]
+    job_output = subprocess.check_output( submit_cmd )
+
+    # scrub the job id from e.g. "Submitted batch job 742403"
+    pattern = r"Submitted batch job (\d+)"
+    match = re.search( pattern, job_output.decode().strip() )
+    if match is None:
+        raise RuntimeError( "could not parse a job id from: " + job_output.decode() )
+
+    return int( match.group( 1 ) )
+
+def wait_for_jobs( job_ids, total_jobs ):
+
+    # poll squeue until every submitted job has left the queue
+    with tqdm( total=total_jobs, desc="Jobs Completed", unit="job" ) as pbar:
+        while job_ids:
+            completed_jobs = set()
+            for job_id in job_ids:
+                status_cmd = [ "squeue", "-h", "-j", str( job_id ) ]
+                try:
+                    status = subprocess.check_output( status_cmd )
+                except subprocess.CalledProcessError:
+                    # squeue errors once a finished job has aged out of the queue
+                    status = b""
+                if not status:
+                    completed_jobs.add( job_id )
+                    pbar.update( 1 )
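+            # drop the finished jobs from the tracked set before the next poll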
+            job_ids.difference_update( completed_jobs )
+            time.sleep( 30 )
+
+def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file, progress ):
+
+    print( "reading SwissFEL lst file" )
+    print( "splitting the lst into chunks of {0} images".format( chunk_size ) )
+    list_df = h5_split( lst, chunk_size )
+    print( "DONE" )
+
+    # create job list
+    job_list = []
+
+    print( "creating crystfel jobs for the individual chunks" )
+    for chunk, chunk_lst in enumerate( list_df ):
+
+        print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) )
+
+        # define the process directory
+        proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk )
+
+        # make the process directory
+        make_process_dir( proc_dir )
+
+        # write the chunk list to file
+        chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk )
+        chunk_lst.to_csv( chunk_lst_file, index=False, header=False )
+
+        # write the crystfel job script
+        cryst_run_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file )
+
+        # add the job script to the job list
+        job_list.append( cryst_run_file )
+    print( "DONE" )
+
+    # submit the jobs, collecting the slurm job ids
+    submitted_job_ids = set()
+    slurm_output_dir = "{0}/{1}".format( cwd, name )
+    for job_file in job_list:
+        job_id = submit_job( job_file, slurm_output_dir )
+        submitted_job_ids.add( job_id )
+        print( f"Job submitted: {job_id}" )
+
+    # wait for the jobs with a progress bar if required
+    if progress:
+        wait_for_jobs( submitted_job_ids, len( job_list ) )
+        print( "slurm processing done" )
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-l",
+        "--lst_file",
+        help="lst file from the SwissFEL output to be processed",
+        type=os.path.abspath,
+        required=True
+    )
+    parser.add_argument(
+        "-k",
+        "--chunk_size",
+        help="how many images per chunk? - the bigger the chunk, the fewer the jobs and the slower the processing",
+        type=int,
+        default=1000
+    )
+    parser.add_argument(
+        "-g",
+        "--geom_file",
+        help="path to the geom file to be used in the refinement",
+        type=os.path.abspath,
+        required=True
+    )
+    parser.add_argument(
+        "-c",
+        "--cell_file",
+        help="path to the cell file of the crystals used in the refinement",
+        type=os.path.abspath,
+        required=True
+    )
+    parser.add_argument(
+        "-n",
+        "--job_name",
+        help="the name of the job to be done",
+        type=str,
+        default="split"
+    )
+    parser.add_argument(
+        "-p",
+        "--progress",
+        # type=bool would treat any non-empty string as True, so parse True/False explicitly
+        help="True/False - wait for the submitted jobs and show a progress bar",
+        type=lambda v: str( v ).lower() in ( "true", "1", "yes" ),
+        default=True
+    )
+    args = parser.parse_args()
+
+    # run the chunked processing
+    cwd = os.getcwd()
+    run_splits( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, args.progress )
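+
+# example invocation (hypothetical paths and names, shown only for illustration):
+#   python crystfel_chunk.py -l /sf/data/run0001/run0001.lst -k 1000 \
+#       -g geom/jungfrau16m.geom -c cells/lysozyme.cell -n run0001 -p True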