From d5c4592ce7a4ff5e837299f642ab4e632c77ebc2 Mon Sep 17 00:00:00 2001 From: Beale John Henry Date: Wed, 15 Jan 2025 02:51:48 +0100 Subject: [PATCH] upated log and print statements - now takes reservation - and changed threshold --- reduction_tools/crystfel_split.py | 84 +++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/reduction_tools/crystfel_split.py b/reduction_tools/crystfel_split.py index 5e6afef..d0407e6 100644 --- a/reduction_tools/crystfel_split.py +++ b/reduction_tools/crystfel_split.py @@ -69,7 +69,7 @@ def scrub_res( stream ): # example - diffraction_resolution_limit = 4.07 nm^-1 or 2.46 A # scrub res_lst or return np.nan try: - pattern = r"diffraction_resolution_limit\s=\s\d\.\d+\snm\^-1\sor\s(\d\.\d+)\sA" + pattern = r"diffraction_resolution_limit\s=\s\d+\.\d+\snm\^-1\sor\s(\d+\.\d+)\sA" res_lst = re.findall( pattern, stream ) if AttributeError: return res_lst @@ -191,11 +191,23 @@ def make_process_dir( proc_dir ): logger.debug( "making directory error" ) raise -def submit_job( job_file ): +def submit_job( job_file, reservation ): # submit the job - submit_cmd = ["sbatch", "--cpus-per-task=32", "--" ,job_file] - job_output = subprocess.check_output(submit_cmd) + if reservation: + print( "using a ra beamtime reservation = {0}".format( reservation ) ) + logger.info( "using ra reservation to process data = {0}".format( reservation ) ) + submit_cmd = [ "sbatch", "-p", "hour", "--reservation={0}".format( reservation ), "--cpus-per-task=32", "--" , job_file ] + else: + submit_cmd = [ "sbatch", "-p", "hour", "--cpus-per-task=32", "--" , job_file ] + logger.info( "using slurm command = {0}".format( submit_cmd ) ) + + try: + job_output = subprocess.check_output( submit_cmd ) + logger.info( "submited job = {0}".format( job_output ) ) + except subprocess.CalledProcessError as e: + print( "please give the correct ra reservation or remove the -v from the arguements" ) + exit() # scrub job id from - example Submitted batch job 742403 pattern = r"Submitted batch job (\d+)" @@ -205,11 +217,11 @@ def submit_job( job_file ): def wait_for_jobs( job_ids, total_jobs ): - with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar: + with tqdm( total=total_jobs, desc="Jobs Completed", unit="job" ) as pbar: while job_ids: completed_jobs = set() for job_id in job_ids: - status_cmd = ["squeue", "-h", "-j", str(job_id)] + status_cmd = [ "squeue", "-h", "-j", str( job_id ) ] status = subprocess.check_output(status_cmd) if not status: completed_jobs.add(job_id) @@ -217,7 +229,7 @@ def wait_for_jobs( job_ids, total_jobs ): job_ids.difference_update(completed_jobs) time.sleep(2) -def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold ): +def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation ): # set chunk counter chunk = 0 @@ -249,8 +261,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, thres stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) ) # submit jobs - job_id = submit_job( cryst_run_file ) - logger.info( f"Job submitted: { job_id }" ) + job_id = submit_job( cryst_run_file, reservation ) submitted_job_ids.add( job_id ) # increase chunk counter @@ -261,7 +272,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, thres return submitted_job_ids, chunk, stream_lst -def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ): +def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation ): print( "reading SwissFEL lst file" ) print( "creating {0} image chunks of lst".format( chunk_size ) ) @@ -270,11 +281,11 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ): # run crystfel runs on individual splits print( "submitting jobs to cluster" ) - submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold ) + submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation ) # monitor progress of jobs - time.sleep(30) - wait_for_jobs(submitted_job_ids, chunk) + time.sleep( 30 ) + wait_for_jobs( submitted_job_ids, chunk ) print( "done" ) # make composite .stream file @@ -323,6 +334,20 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ): logger.info( "mean beta = {0} +/- {1} A".format( mean_beta, std_beta ) ) logger.info( "mean gamma = {0} +/- {1} A".format( mean_gamma, std_gamma ) ) + print( "printing stats" ) + print( "image = {0}".format( chunks ) ) + print( "crystals = {0}".format( xtals ) ) + print( "indexing rate = {0} %".format( index_rate ) ) + print( "mean resolution = {0} +/- {1} A".format( mean_res, std_res ) ) + print( "median resolution = {0} A".format( median_res ) ) + print( "mean observations = {0} +/- {1}".format( mean_obs, std_obs ) ) + print( "mean a = {0} +/- {1} A".format( mean_a, std_a ) ) + print( "mean b = {0} +/- {1} A".format( mean_b, std_b ) ) + print( "mean c = {0} +/- {1} A".format( mean_c, std_c ) ) + print( "mean alpha = {0} +/- {1} A".format( mean_alpha, std_alpha ) ) + print( "mean beta = {0} +/- {1} A".format( mean_beta, std_beta ) ) + print( "mean gamma = {0} +/- {1} A".format( mean_gamma, std_gamma ) ) + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( @@ -360,6 +385,20 @@ if __name__ == "__main__": type=str, default="split" ) + parser.add_argument( + "-v", + "--reservation", + help="reservation name for ra cluster. Usually along the lines of P11111_2024-12-10", + type=str, + default=None + ) + parser.add_argument( + "-p", + "--photons_or_energy", + help="determines the threshold to use for CrystFEL. Photons counts have always been used in Cristallina and are now used on Alvra from 01.11.2024. Please use 'energy' for Alvra before this.", + type=str, + default="photons" + ) parser.add_argument( "-d", "--debug", @@ -367,21 +406,9 @@ if __name__ == "__main__": type=bool, default=False ) - parser.add_argument( - "-e", - "--endstation", - help="which endstation did you collect these data from, e.g., alvra or cristallina. Please over-write name depending on endstation.", - type=str, - default="cristallina" - ) args = parser.parse_args() # set current working directory cwd = os.getcwd() - # set threshold based on endstation - if args.endstation == "alvra": - threshold = 3000 - elif args.endstation == "cristallina": - threshold = 10 # set loguru if not args.debug: logger.remove() @@ -390,5 +417,10 @@ if __name__ == "__main__": # log geometry file geom = open( args.geom_file, "r" ).read() logger.info( geom ) - main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, threshold ) + # set threshold based on detector + if args.photons_or_energy == "energy": + threshold = 3000 + elif args.photons_or_energy == "photons": + threshold = 15 + main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, threshold, args.reservation )