upated log and print statements - now takes reservation - and changed threshold
This commit is contained in:
@@ -69,7 +69,7 @@ def scrub_res( stream ):
|
||||
# example - diffraction_resolution_limit = 4.07 nm^-1 or 2.46 A
|
||||
# scrub res_lst or return np.nan
|
||||
try:
|
||||
pattern = r"diffraction_resolution_limit\s=\s\d\.\d+\snm\^-1\sor\s(\d\.\d+)\sA"
|
||||
pattern = r"diffraction_resolution_limit\s=\s\d+\.\d+\snm\^-1\sor\s(\d+\.\d+)\sA"
|
||||
res_lst = re.findall( pattern, stream )
|
||||
if AttributeError:
|
||||
return res_lst
|
||||
@@ -191,11 +191,23 @@ def make_process_dir( proc_dir ):
|
||||
logger.debug( "making directory error" )
|
||||
raise
|
||||
|
||||
def submit_job( job_file ):
|
||||
def submit_job( job_file, reservation ):
|
||||
|
||||
# submit the job
|
||||
submit_cmd = ["sbatch", "--cpus-per-task=32", "--" ,job_file]
|
||||
job_output = subprocess.check_output(submit_cmd)
|
||||
if reservation:
|
||||
print( "using a ra beamtime reservation = {0}".format( reservation ) )
|
||||
logger.info( "using ra reservation to process data = {0}".format( reservation ) )
|
||||
submit_cmd = [ "sbatch", "-p", "hour", "--reservation={0}".format( reservation ), "--cpus-per-task=32", "--" , job_file ]
|
||||
else:
|
||||
submit_cmd = [ "sbatch", "-p", "hour", "--cpus-per-task=32", "--" , job_file ]
|
||||
logger.info( "using slurm command = {0}".format( submit_cmd ) )
|
||||
|
||||
try:
|
||||
job_output = subprocess.check_output( submit_cmd )
|
||||
logger.info( "submited job = {0}".format( job_output ) )
|
||||
except subprocess.CalledProcessError as e:
|
||||
print( "please give the correct ra reservation or remove the -v from the arguements" )
|
||||
exit()
|
||||
|
||||
# scrub job id from - example Submitted batch job 742403
|
||||
pattern = r"Submitted batch job (\d+)"
|
||||
@@ -205,11 +217,11 @@ def submit_job( job_file ):
|
||||
|
||||
def wait_for_jobs( job_ids, total_jobs ):
|
||||
|
||||
with tqdm(total=total_jobs, desc="Jobs Completed", unit="job") as pbar:
|
||||
with tqdm( total=total_jobs, desc="Jobs Completed", unit="job" ) as pbar:
|
||||
while job_ids:
|
||||
completed_jobs = set()
|
||||
for job_id in job_ids:
|
||||
status_cmd = ["squeue", "-h", "-j", str(job_id)]
|
||||
status_cmd = [ "squeue", "-h", "-j", str( job_id ) ]
|
||||
status = subprocess.check_output(status_cmd)
|
||||
if not status:
|
||||
completed_jobs.add(job_id)
|
||||
@@ -217,7 +229,7 @@ def wait_for_jobs( job_ids, total_jobs ):
|
||||
job_ids.difference_update(completed_jobs)
|
||||
time.sleep(2)
|
||||
|
||||
def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
|
||||
def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation ):
|
||||
|
||||
# set chunk counter
|
||||
chunk = 0
|
||||
@@ -249,8 +261,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, thres
|
||||
stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) )
|
||||
|
||||
# submit jobs
|
||||
job_id = submit_job( cryst_run_file )
|
||||
logger.info( f"Job submitted: { job_id }" )
|
||||
job_id = submit_job( cryst_run_file, reservation )
|
||||
submitted_job_ids.add( job_id )
|
||||
|
||||
# increase chunk counter
|
||||
@@ -261,7 +272,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, thres
|
||||
|
||||
return submitted_job_ids, chunk, stream_lst
|
||||
|
||||
def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
|
||||
def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation ):
|
||||
|
||||
print( "reading SwissFEL lst file" )
|
||||
print( "creating {0} image chunks of lst".format( chunk_size ) )
|
||||
@@ -270,11 +281,11 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
|
||||
|
||||
# run crystfel runs on individual splits
|
||||
print( "submitting jobs to cluster" )
|
||||
submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold )
|
||||
submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold, reservation )
|
||||
|
||||
# monitor progress of jobs
|
||||
time.sleep(30)
|
||||
wait_for_jobs(submitted_job_ids, chunk)
|
||||
time.sleep( 30 )
|
||||
wait_for_jobs( submitted_job_ids, chunk )
|
||||
print( "done" )
|
||||
|
||||
# make composite .stream file
|
||||
@@ -323,6 +334,20 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
|
||||
logger.info( "mean beta = {0} +/- {1} A".format( mean_beta, std_beta ) )
|
||||
logger.info( "mean gamma = {0} +/- {1} A".format( mean_gamma, std_gamma ) )
|
||||
|
||||
print( "printing stats" )
|
||||
print( "image = {0}".format( chunks ) )
|
||||
print( "crystals = {0}".format( xtals ) )
|
||||
print( "indexing rate = {0} %".format( index_rate ) )
|
||||
print( "mean resolution = {0} +/- {1} A".format( mean_res, std_res ) )
|
||||
print( "median resolution = {0} A".format( median_res ) )
|
||||
print( "mean observations = {0} +/- {1}".format( mean_obs, std_obs ) )
|
||||
print( "mean a = {0} +/- {1} A".format( mean_a, std_a ) )
|
||||
print( "mean b = {0} +/- {1} A".format( mean_b, std_b ) )
|
||||
print( "mean c = {0} +/- {1} A".format( mean_c, std_c ) )
|
||||
print( "mean alpha = {0} +/- {1} A".format( mean_alpha, std_alpha ) )
|
||||
print( "mean beta = {0} +/- {1} A".format( mean_beta, std_beta ) )
|
||||
print( "mean gamma = {0} +/- {1} A".format( mean_gamma, std_gamma ) )
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
@@ -360,6 +385,20 @@ if __name__ == "__main__":
|
||||
type=str,
|
||||
default="split"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
"--reservation",
|
||||
help="reservation name for ra cluster. Usually along the lines of P11111_2024-12-10",
|
||||
type=str,
|
||||
default=None
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--photons_or_energy",
|
||||
help="determines the threshold to use for CrystFEL. Photons counts have always been used in Cristallina and are now used on Alvra from 01.11.2024. Please use 'energy' for Alvra before this.",
|
||||
type=str,
|
||||
default="photons"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--debug",
|
||||
@@ -367,21 +406,9 @@ if __name__ == "__main__":
|
||||
type=bool,
|
||||
default=False
|
||||
)
|
||||
parser.add_argument(
|
||||
"-e",
|
||||
"--endstation",
|
||||
help="which endstation did you collect these data from, e.g., alvra or cristallina. Please over-write name depending on endstation.",
|
||||
type=str,
|
||||
default="cristallina"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
# set current working directory
|
||||
cwd = os.getcwd()
|
||||
# set threshold based on endstation
|
||||
if args.endstation == "alvra":
|
||||
threshold = 3000
|
||||
elif args.endstation == "cristallina":
|
||||
threshold = 10
|
||||
# set loguru
|
||||
if not args.debug:
|
||||
logger.remove()
|
||||
@@ -390,5 +417,10 @@ if __name__ == "__main__":
|
||||
# log geometry file
|
||||
geom = open( args.geom_file, "r" ).read()
|
||||
logger.info( geom )
|
||||
main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, threshold )
|
||||
# set threshold based on detector
|
||||
if args.photons_or_energy == "energy":
|
||||
threshold = 3000
|
||||
elif args.photons_or_energy == "photons":
|
||||
threshold = 15
|
||||
main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, threshold, args.reservation )
|
||||
|
||||
|
||||
Reference in New Issue
Block a user