#!/usr/bin/python # author J.Beale """ # aim to process a batch of data very fast by splitting it into a number of chunks and submitting these jobs separately to the cluster - but now all with the ability to change crystfel parameters from the command line # usage python crystfel_split.py -l -k -default 1000 -g -c -n -default split -t crystfel threshold -default 10 -s crystfel min-snr -default 5 -i crystfel int-radius -default 3,5,9 -m crystfel multi or no-multi (True/False) -default False (no-multi) -r crystfel retry or no-retry (True/False) -default False (no-retry) -x crystfel min-pix-count -default 2 # output a series of stream files from crystfel in the current working directory """ # modules import pandas as pd import subprocess import os, errno import time import argparse from tqdm import tqdm import regex as re def h5_split( lst, chunk_size ): # read h5.lst - note - removes // from image column # scrub file name lst_name = os.path.basename( lst ) cols = [ "h5", "image" ] df = pd.read_csv( lst, sep="\s//", engine="python", names=cols ) # re-add // to image columm and drop other columns df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str ) df = df[ [ "h5_path" ] ] # split df into a lst list_df = [df[i:i + chunk_size] for i in range( 0, len(df), chunk_size)] return list_df def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file, threshold, min_snr, int_rad, multi, retry, min_pix, bg_rad, min_res ): # stream file name stream_file = "{0}_{1}.stream".format( name, chunk ) # crystfel file name cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk ) # write file run_sh = open( cryst_run_file, "w" ) run_sh.write( "#!/bin/sh\n\n" ) run_sh.write( "module purge\n" ) run_sh.write( "module use MX unstable\n" ) run_sh.write( "module load crystfel/0.10.2-rhel8\n" ) run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) ) run_sh.write( " --output={0} \\\n".format( stream_file ) ) run_sh.write( " --geometry={0} \\\n".format( geom_file ) ) run_sh.write( " --pdb={0} \\\n".format( cell_file ) ) run_sh.write( " --indexing=xgandalf-latt-cell \\\n" ) run_sh.write( " --peaks=peakfinder8 \\\n" ) run_sh.write( " --integration=rings-grad \\\n" ) run_sh.write( " --tolerance=10.0,10.0,10.0,2,3,2 \\\n" ) run_sh.write( " --threshold={0} \\\n".format( threshold ) ) run_sh.write( " --min-snr={0} \\\n".format( min_snr ) ) run_sh.write( " --int-radius={0},{1},{2} \\\n".format( int_rad[0], int_rad[1], int_rad[2] ) ) run_sh.write( " -j 32 \\\n" ) run_sh.write( " --{0} \\\n".format( multi ) ) run_sh.write( " --check-peaks \\\n" ) run_sh.write( " --{0} \\\n".format( retry ) ) run_sh.write( " --max-res=3000 \\\n" ) run_sh.write( " --min-pix-count={0} \\\n".format( min_pix ) ) run_sh.write( " --local-bg-radius={0} \\\n".format( bg_rad ) ) run_sh.write( " --min-res={0}".format( min_res ) ) run_sh.close() # make file executable subprocess.call( [ "chmod", "+x", "{0}".format( cryst_run_file ) ] ) # return crystfel file name return cryst_run_file, stream_file def make_process_dir( proc_dir ): # make process directory try: os.makedirs( proc_dir ) except OSError as e: if e.errno != errno.EEXIST: raise def submit_job( job_file ): # submit the job submit_cmd = ["sbatch", "--cpus-per-task=32", "--" ,job_file] job_output = subprocess.check_output(submit_cmd) # scrub job id from - example Submitted batch job 742403 pattern = r"Submitted batch job (\d+)" job_id = re.search( pattern, job_output.decode().strip() ).group(1) return int(job_id) def wait_for_jobs( job_ids, total_jobs ): with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar: while job_ids: completed_jobs = set() for job_id in job_ids: status_cmd = ["squeue", "-h", "-j", str(job_id)] status = subprocess.check_output(status_cmd) if not status: completed_jobs.add(job_id) pbar.update(1) job_ids.difference_update(completed_jobs) time.sleep(10) def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file, progress, threshold, min_snr, int_rad, multi, retry, min_pix ): print( "reading SwissFEL lst file" ) print( "creating {0} image chunks of lst".format( chunk_size ) ) list_df = h5_split( lst, chunk_size ) print( "DONE" ) # set chunk counter chunk = 0 # submitted job set submitted_job_ids = set() # stream file list stream_lst = [] print( "creating crystfel jobs for individual chunks" ) for chunk_lst in list_df: print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) ) # define process directory proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk ) # make process directory make_process_dir(proc_dir) # move to process directory os.chdir( proc_dir ) # write list to file chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk ) chunk_lst.to_csv( chunk_lst_file, index=False, header=False ) # write crystfel file and append path to list cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file, threshold, min_snr, int_rad, multi, retry, min_pix ) stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) ) # submit jobs job_id = submit_job( cryst_run_file ) print(f"Job submitted: { job_id }") submitted_job_ids.add( job_id ) # increase chunk counter chunk = chunk +1 # move back to top dir os.chdir( cwd ) print( "DONE" ) wait_for_jobs(submitted_job_ids, chunk) print("slurm processing done") # make composite .stream file output_file = "{0}.stream".format( name ) print( "comp" ) try: # Open the output file in 'append' mode with open(output_file, "a") as output: for file_name in stream_lst: try: with open(file_name, "r") as input_file: # Read the contents of the input file and append to the output file output.write(input_file.read()) print(f"Appended contents from {file_name} to {output_file}") except FileNotFoundError: print(f"File {file_name} not found. Skipping.") except IOError as e: print(f"An error occurred while appending files: {e}") print( "DONE" ) def list_of_ints(arg): return list(map(int, arg.split(','))) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-l", "--lst_file", help="file from SwissFEL output to be processed quickly", type=os.path.abspath ) parser.add_argument( "-k", "--chunk_size", help="how big should each chunk be? - the bigger the chunk, the fewer jobs, the slower it will be", type=int, default=1000 ) parser.add_argument( "-g", "--geom_file", help="path to geom file to be used in the refinement", type=os.path.abspath ) parser.add_argument( "-c", "--cell_file", help="path to cell file of the crystals used in the refinement", type=os.path.abspath ) parser.add_argument( "-n", "--job_name", help="the name of the job to be done", type=str, default="split" ) parser.add_argument( "-t", "--threshold", help="threshold for crystfel run - peaks must be above this to be found", type=int, default=10 ) parser.add_argument( "-s", "--min_snr", help="min-snr for crystfel run - peaks must to above this to be counted", type=int, default=5 ) parser.add_argument( "-i", "--int_radius", help="int_rad for crystfel run - peaks must to above this to be counted", type=list_of_ints, default=[3,5,9] ) parser.add_argument( "-m", "--multi", help="multi crystfel flag, do you wnat to look for multiple lattices", type=bool, default=False ) parser.add_argument( "-r", "--retry", help="retry crystfel flag, do you want to retry failed indexing patterns", type=bool, default=False ) parser.add_argument( "-x", "--min_pix", help="min-pix-count for crystfel runs, minimum number of pixels a spot should contain in peak finding", type=int, default=2 ) parser.add_argument( "-b", "--bg_rad", help="crystfel background radius flag, radius (in pixels) used for the estimation of the local background", type=int, default=2 ) parser.add_argument( "-q", "--min-res", help="m", type=int, default=2 ) args = parser.parse_args() # run geom converter cwd = os.getcwd() if args.multi == True: multi = "multi" else: multi = "no-multi" if args.retry == True: retry = "retry" else: retry = "no-retry" run_splits( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, args.threshold, args.min_snr, args.int_radius, multi, retry, args.min_pix )