From 044fda0ca6b787abf631f823c2630fef7f1dcde9 Mon Sep 17 00:00:00 2001 From: beale_j Date: Tue, 19 Sep 2023 10:41:21 +0200 Subject: [PATCH] Delete crystfel_chunk.py --- clen_tools/crystfel_chunk.py | 205 ----------------------------------- 1 file changed, 205 deletions(-) delete mode 100644 clen_tools/crystfel_chunk.py diff --git a/clen_tools/crystfel_chunk.py b/clen_tools/crystfel_chunk.py deleted file mode 100644 index 58552f9..0000000 --- a/clen_tools/crystfel_chunk.py +++ /dev/null @@ -1,205 +0,0 @@ -#!/usr/bin/python - -# author J.Beale - -""" -# aim -to process a batch of data very fast by splitting it into a number of chunks and submitting -these jobs separately to the cluster - -# usage -python crystfel_split.py -l - -k - -g - -c -# note -p True/False will make a progress bar - -# output -a series of stream files from crystfel in the current working directory -""" - -# modules -import pandas as pd -import subprocess -import os, errno -import time -import argparse -from tqdm import tqdm -import regex as re - -def h5_split( lst, chunk_size ): - - # read h5.lst - note - removes // from image column - # scrub file name - lst_name = os.path.basename( lst ) - - cols = [ "h5", "image" ] - df = pd.read_csv( lst, sep="\s//", engine="python", names=cols ) - - # re-add // to image columm and drop other columns - df[ "h5_path" ] = df.h5 + " //" + df.image.astype( str ) - df = df[ [ "h5_path" ] ] - - # split df into a lst - list_df = [df[i:i + chunk_size] for i in range( 0, len(df), chunk_size)] - - return list_df - -def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file ): - - # crystfel file name - cryst_run_file = "{0}/{1}_{2}.sh".format( proc_dir, name, chunk ) - # write file - run_sh = open( cryst_run_file, "w" ) - run_sh.write( "#!/bin/sh\n\n" ) - run_sh.write( "module purge\n" ) - run_sh.write( "module load crystfel/0.10.2\n" ) - run_sh.write( "indexamajig -i {0} \\\n".format( chunk_lst_file ) ) - run_sh.write( " --output={0}_{1}.stream \\\n".format( name, chunk ) ) - run_sh.write( " --geometry={0}\\\n".format( geom_file ) ) - run_sh.write( " --pdb={0} \\\n".format( cell_file ) ) - run_sh.write( " --indexing=xgandalf-latt-cell --peaks=peakfinder8 \\\n" ) - run_sh.write( " --threshold=15 --min-snr=10 --int-radius=3,5,9 \\\n" ) - run_sh.write( " -j 32 --no-multi --no-retry --max-res=3000 --min-pix-count=2 --min-res=85\\\n" ) - run_sh.close() - - # make file executable - subprocess.call( [ "chmod", "+x", "{0}".format( cryst_run_file ) ] ) - - # return crystfel file name - return cryst_run_file - -def make_process_dir( proc_dir ): - # make process directory - try: - os.makedirs( proc_dir ) - except OSError as e: - if e.errno != errno.EEXIST: - raise - -def submit_job( job_file ): - - # submit the job - submit_cmd = ["sbatch", "--cpus-per-task=32", "--" ,job_file] - job_output = subprocess.check_output(submit_cmd) - - # scrub job id from - example Submitted batch job 742403 - pattern = r"Submitted batch job (\d+)" - job_id = re.search( pattern, job_output.decode().strip() ).group(1) - - return int(job_id) - -def wait_for_jobs( job_ids, total_jobs ): - - with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar: - while job_ids: - completed_jobs = set() - for job_id in job_ids: - status_cmd = ["squeue", "-h", "-j", str(job_id)] - status = subprocess.check_output(status_cmd) - if not status: - completed_jobs.add(job_id) - pbar.update(1) - job_ids.difference_update(completed_jobs) - time.sleep(30) - -def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file, progress ): - - print( "reading SwissFEL lst file" ) - print( "creating {0} image chunks of lst".format( chunk_size ) ) - list_df = h5_split( lst, chunk_size ) - print( "DONE" ) - - # set chunk counter - chunk = 0 - - # create job list - job_list = [] - - print( "creating crystfel jobs for individual chunks" ) - for chunk_lst in list_df: - - print( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) ) - # define process directory - proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk ) - - # make process directory - make_process_dir(proc_dir) - - # write list to file - chunk_lst_file = "{0}/{1}_{2}.lst".format( proc_dir, name, chunk ) - chunk_lst.to_csv( chunk_lst_file, index=False, header=False ) - - # write crystfel file - cryst_run_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file ) - - # add crystfel run file to list - job_list.append( cryst_run_file ) - - # increase chunk counter - chunk = chunk +1 - print( "DONE" ) - - # submitted job set - submitted_job_ids = set() - # submit jobs - for job_file in job_list: - job_id = submit_job( job_file ) - submitted_job_ids.add( job_id ) - print(f"Job submitted: { job_id }") - - # include progress bar if required - if progress==True: - wait_for_jobs(submitted_job_ids, len(job_list)) - print("slurm processing done") - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "-l", - "--lst_file", - help="file from SwissFEL output to be processed quickly", - type=os.path.abspath - ) - parser.add_argument( - "-k", - "--chunk_size", - help="how big should each chunk be? - the bigger the chunk, the fewer jobs, the slower it will be", - type=int, - default=1000 - ) - parser.add_argument( - "-g", - "--geom_file", - help="path to geom file to be used in the refinement", - type=os.path.abspath - ) - parser.add_argument( - "-c", - "--cell_file", - help="path to cell file of the crystals used in the refinement", - type=os.path.abspath - ) - parser.add_argument( - "-n", - "--job_name", - help="the name of the job to be done", - type=str, - default="split" - ) - parser.add_argument( - "-p", - "--progress", - help="gives you the option of also having a progress bar", - type=bool, - default=True - ) - args = parser.parse_args() - # run geom converter - cwd = os.getcwd() - run_splits( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, args.progress ) - - - - -