From 77f2560aa5a0f6662421135cc03b5746031654c6 Mon Sep 17 00:00:00 2001 From: Beale John Henry Date: Fri, 8 Sep 2023 12:01:15 +0200 Subject: [PATCH] updated check job status --- clen_tools/detector-distance-refinement.py | 85 ++++++++++++++-------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/clen_tools/detector-distance-refinement.py b/clen_tools/detector-distance-refinement.py index b9911ea..19b74fc 100644 --- a/clen_tools/detector-distance-refinement.py +++ b/clen_tools/detector-distance-refinement.py @@ -14,6 +14,9 @@ python detector-distance-refinement.py -l -c cell_file -s sample size +# other variables +-f = fine only = only perform fine scan + # output plot files of the analysis and a suggest for the clen """ @@ -26,6 +29,7 @@ import regex as re import numpy as np import matplotlib.pyplot as plt import time +from tqdm import tqdm import argparse def h5_sample( lst, sample ): @@ -75,7 +79,7 @@ def geom_amend( lab6_geom_file, clen ): def write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file ): # crystfel file name - cryst_run_file = "{0}_cryst_run.sh".format( clen ) + cryst_run_file = "{0}_run.sh".format( clen ) # write file run_sh = open( cryst_run_file, "w" ) @@ -126,24 +130,32 @@ def make_step_range(centre_clen, step_size, steps): print( "done" ) return step_range - -def check_job_status(username): - # wait for jobs to complete - jobs_completed = False - while not jobs_completed: - # Get the status of the jobs using "squeue" - result = subprocess.run(['squeue', '--user', '{0}'.format(username)], stdout=subprocess.PIPE) - output = result.stdout.decode('utf-8') - # Check if there are no jobs running for the user - if '{0}'.format(username) not in output: - jobs_completed = True - else: - # Sleep for some time and check again - print("waiting for jobs to finish") - time.sleep(30) # sleep for 30 seconds +def submit_job( job_file ): - print("All jobs completed.") + # submit the job + submit_cmd = [ "sbatch", "-p", "day", "--cpus-per-task=32", "--", job_file ] + job_output = subprocess.check_output(submit_cmd) + + # scrub job id from - example Submitted batch job 742403 + pattern = r"Submitted batch job (\d+)" + job_id = re.search( pattern, job_output.decode().strip() ).group(1) + + return int(job_id) + +def wait_for_jobs( job_ids, total_jobs ): + + with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar: + while job_ids: + completed_jobs = set() + for job_id in job_ids: + status_cmd = ["squeue", "-h", "-j", str(job_id)] + status = subprocess.check_output(status_cmd) + if not status: + completed_jobs.add(job_id) + pbar.update(1) + job_ids.difference_update(completed_jobs) + time.sleep(30) def scrub_clen( stream_pwd ): @@ -192,7 +204,6 @@ def find_streams( top_dir ): # return df of streams and clens return stream_df - def scrub_us( stream ): # get uc values from stream file @@ -207,6 +218,7 @@ def scrub_us( stream ): return np.nan def scrub_helper( top_dir ): + # find stream files from process directory print( "finding stream files" ) stream_df = find_streams( top_dir ) @@ -282,7 +294,6 @@ def find_clen_values(stats_df): return min_alpha_clen, min_beta_clen, min_gamma_clen, min_c_clen, min_alpha_val, min_beta_val, min_gamma_val, min_c_val - def plot_indexed_std( stats_df, ax1, ax2 ): # indexed images plot @@ -309,7 +320,6 @@ def plot_indexed_std( stats_df, ax1, ax2 ): color = "royalblue" ax2.plot(stats_df.clen, stats_df.std_c, color=color) - def plot_indexed_std_alpha_beta_gamma( stats_df, ax1, ax2 ): # indexed images plot @@ -354,6 +364,10 @@ def scan( cwd, lst, sample, lab6_geom_file, centre_clen, cell_file, step_size ): # make list of clen steps above and below the central clen step_range = make_step_range(centre_clen, step_size, steps) + # submitted job set and job_list + submitted_job_ids = set() + job_list = [] + # make directorys for results print( "begin CrystFEL anaylsis of different clens" ) @@ -377,14 +391,17 @@ def scan( cwd, lst, sample, lab6_geom_file, centre_clen, cell_file, step_size ): cryst_run_file = write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file ) # run crystfel file - subprocess.call( [ "sbatch", "-p", "day", "--cpus-per-task=32", "--", "./{0}".format( cryst_run_file ) ] ) + job_list.append( cryst_run_file ) + job_id = submit_job( cryst_run_file ) + submitted_job_ids.add( job_id ) print( "done" ) # move back to cwd os.chdir( cwd ) #wait for jobs to complete - check_job_status(username) + wait_for_jobs(submitted_job_ids, len(job_list)) + print("slurm processing done") def scrub_scan( scan_top_dir, scan ): @@ -411,13 +428,17 @@ def scrub_scan( scan_top_dir, scan ): return suggested_clen -def main( cwd, lst, sample, geom, centre_clen, cell_file ): +def main( cwd, lst, sample, geom, centre_clen, cell_file, fine_only ): - top_dir_coarse = "{0}/coarse".format( cwd ) + # if statement to do coarse if fine_only==False + if fine_only == False: + top_dir_coarse = "{0}/coarse".format( cwd ) - scan( cwd, lst, sample, geom, centre_clen, cell_file, step_size="coarse" ) + scan( cwd, lst, sample, geom, centre_clen, cell_file, step_size="coarse" ) - suggested_clen = scrub_scan( top_dir_coarse, scan="coarse" ) + suggested_clen = scrub_scan( top_dir_coarse, scan="coarse" ) + else: + suggested_clen = centre_clen top_dir_fine = "{0}/fine".format( cwd ) @@ -459,10 +480,16 @@ if __name__ == "__main__": type=int, default=500 ) + parser.add_argument( + "-f", + "--fine_only", + help="True = only do fine scan", + type=bool, + default=False + ) args = parser.parse_args() # run main - username = os.getlogin() cwd = os.getcwd() - print( "current username = {0}".format( username ) ) print( "top working directory = {0}".format( cwd ) ) - main( cwd, args.lst, args.sample, args.geom, args.central_distance, args.cell_file ) + main( cwd, args.lst, args.sample, args.geom, args.central_distance, args.cell_file, args.fine_only ) +