updated check job status
This commit is contained in:
@@ -14,6 +14,9 @@ python detector-distance-refinement.py -l <path to lst file generated by daq>
|
||||
-c cell_file
|
||||
-s sample size
|
||||
|
||||
# other variables
|
||||
-f = fine only = only perform fine scan
|
||||
|
||||
# output
|
||||
plot files of the analysis and a suggest for the clen
|
||||
"""
|
||||
@@ -26,6 +29,7 @@ import regex as re
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import time
|
||||
from tqdm import tqdm
|
||||
import argparse
|
||||
|
||||
def h5_sample( lst, sample ):
|
||||
@@ -75,7 +79,7 @@ def geom_amend( lab6_geom_file, clen ):
|
||||
def write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file ):
|
||||
|
||||
# crystfel file name
|
||||
cryst_run_file = "{0}_cryst_run.sh".format( clen )
|
||||
cryst_run_file = "{0}_run.sh".format( clen )
|
||||
|
||||
# write file
|
||||
run_sh = open( cryst_run_file, "w" )
|
||||
@@ -126,24 +130,32 @@ def make_step_range(centre_clen, step_size, steps):
|
||||
print( "done" )
|
||||
|
||||
return step_range
|
||||
|
||||
def check_job_status(username):
|
||||
# wait for jobs to complete
|
||||
jobs_completed = False
|
||||
while not jobs_completed:
|
||||
# Get the status of the jobs using "squeue"
|
||||
result = subprocess.run(['squeue', '--user', '{0}'.format(username)], stdout=subprocess.PIPE)
|
||||
output = result.stdout.decode('utf-8')
|
||||
|
||||
# Check if there are no jobs running for the user
|
||||
if '{0}'.format(username) not in output:
|
||||
jobs_completed = True
|
||||
else:
|
||||
# Sleep for some time and check again
|
||||
print("waiting for jobs to finish")
|
||||
time.sleep(30) # sleep for 30 seconds
|
||||
def submit_job( job_file ):
|
||||
|
||||
print("All jobs completed.")
|
||||
# submit the job
|
||||
submit_cmd = [ "sbatch", "-p", "day", "--cpus-per-task=32", "--", job_file ]
|
||||
job_output = subprocess.check_output(submit_cmd)
|
||||
|
||||
# scrub job id from - example Submitted batch job 742403
|
||||
pattern = r"Submitted batch job (\d+)"
|
||||
job_id = re.search( pattern, job_output.decode().strip() ).group(1)
|
||||
|
||||
return int(job_id)
|
||||
|
||||
def wait_for_jobs( job_ids, total_jobs ):
|
||||
|
||||
with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar:
|
||||
while job_ids:
|
||||
completed_jobs = set()
|
||||
for job_id in job_ids:
|
||||
status_cmd = ["squeue", "-h", "-j", str(job_id)]
|
||||
status = subprocess.check_output(status_cmd)
|
||||
if not status:
|
||||
completed_jobs.add(job_id)
|
||||
pbar.update(1)
|
||||
job_ids.difference_update(completed_jobs)
|
||||
time.sleep(30)
|
||||
|
||||
def scrub_clen( stream_pwd ):
|
||||
|
||||
@@ -192,7 +204,6 @@ def find_streams( top_dir ):
|
||||
# return df of streams and clens
|
||||
return stream_df
|
||||
|
||||
|
||||
def scrub_us( stream ):
|
||||
|
||||
# get uc values from stream file
|
||||
@@ -207,6 +218,7 @@ def scrub_us( stream ):
|
||||
return np.nan
|
||||
|
||||
def scrub_helper( top_dir ):
|
||||
|
||||
# find stream files from process directory
|
||||
print( "finding stream files" )
|
||||
stream_df = find_streams( top_dir )
|
||||
@@ -282,7 +294,6 @@ def find_clen_values(stats_df):
|
||||
|
||||
return min_alpha_clen, min_beta_clen, min_gamma_clen, min_c_clen, min_alpha_val, min_beta_val, min_gamma_val, min_c_val
|
||||
|
||||
|
||||
def plot_indexed_std( stats_df, ax1, ax2 ):
|
||||
|
||||
# indexed images plot
|
||||
@@ -309,7 +320,6 @@ def plot_indexed_std( stats_df, ax1, ax2 ):
|
||||
color = "royalblue"
|
||||
ax2.plot(stats_df.clen, stats_df.std_c, color=color)
|
||||
|
||||
|
||||
def plot_indexed_std_alpha_beta_gamma( stats_df, ax1, ax2 ):
|
||||
|
||||
# indexed images plot
|
||||
@@ -354,6 +364,10 @@ def scan( cwd, lst, sample, lab6_geom_file, centre_clen, cell_file, step_size ):
|
||||
# make list of clen steps above and below the central clen
|
||||
step_range = make_step_range(centre_clen, step_size, steps)
|
||||
|
||||
# submitted job set and job_list
|
||||
submitted_job_ids = set()
|
||||
job_list = []
|
||||
|
||||
# make directorys for results
|
||||
print( "begin CrystFEL anaylsis of different clens" )
|
||||
|
||||
@@ -377,14 +391,17 @@ def scan( cwd, lst, sample, lab6_geom_file, centre_clen, cell_file, step_size ):
|
||||
cryst_run_file = write_crystfel_run( clen, sample_h5_file, clen_geom_file, cell_file )
|
||||
|
||||
# run crystfel file
|
||||
subprocess.call( [ "sbatch", "-p", "day", "--cpus-per-task=32", "--", "./{0}".format( cryst_run_file ) ] )
|
||||
job_list.append( cryst_run_file )
|
||||
job_id = submit_job( cryst_run_file )
|
||||
submitted_job_ids.add( job_id )
|
||||
print( "done" )
|
||||
|
||||
# move back to cwd
|
||||
os.chdir( cwd )
|
||||
|
||||
#wait for jobs to complete
|
||||
check_job_status(username)
|
||||
wait_for_jobs(submitted_job_ids, len(job_list))
|
||||
print("slurm processing done")
|
||||
|
||||
def scrub_scan( scan_top_dir, scan ):
|
||||
|
||||
@@ -411,13 +428,17 @@ def scrub_scan( scan_top_dir, scan ):
|
||||
|
||||
return suggested_clen
|
||||
|
||||
def main( cwd, lst, sample, geom, centre_clen, cell_file ):
|
||||
def main( cwd, lst, sample, geom, centre_clen, cell_file, fine_only ):
|
||||
|
||||
top_dir_coarse = "{0}/coarse".format( cwd )
|
||||
# if statement to do coarse if fine_only==False
|
||||
if fine_only == False:
|
||||
top_dir_coarse = "{0}/coarse".format( cwd )
|
||||
|
||||
scan( cwd, lst, sample, geom, centre_clen, cell_file, step_size="coarse" )
|
||||
scan( cwd, lst, sample, geom, centre_clen, cell_file, step_size="coarse" )
|
||||
|
||||
suggested_clen = scrub_scan( top_dir_coarse, scan="coarse" )
|
||||
suggested_clen = scrub_scan( top_dir_coarse, scan="coarse" )
|
||||
else:
|
||||
suggested_clen = centre_clen
|
||||
|
||||
top_dir_fine = "{0}/fine".format( cwd )
|
||||
|
||||
@@ -459,10 +480,16 @@ if __name__ == "__main__":
|
||||
type=int,
|
||||
default=500
|
||||
)
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--fine_only",
|
||||
help="True = only do fine scan",
|
||||
type=bool,
|
||||
default=False
|
||||
)
|
||||
args = parser.parse_args()
|
||||
# run main
|
||||
username = os.getlogin()
|
||||
cwd = os.getcwd()
|
||||
print( "current username = {0}".format( username ) )
|
||||
print( "top working directory = {0}".format( cwd ) )
|
||||
main( cwd, args.lst, args.sample, args.geom, args.central_distance, args.cell_file )
|
||||
main( cwd, args.lst, args.sample, args.geom, args.central_distance, args.cell_file, args.fine_only )
|
||||
|
||||
|
||||
Reference in New Issue
Block a user