endstation added as variable + wait bug fix

Beale John Henry
2024-01-31 12:19:54 +01:00
parent 0464e77288
commit 4724939a6c


@@ -13,11 +13,14 @@ python crystfel_split.py -l <path-to-list-file>
 -g <path-to-geom-file>
 -c <path-to-cell-file>
 -n <name-of-job>
+-e <name-of-endstation>
 # crystfel parameter may need some editing in the function - write_crystfel_run
 # output
 a series of stream files from crystfel in the current working directory
+a concatenated .stream file in cwd
+a log file with the .geom and an evaluation of indexing, cell, etc.
 """

 # modules
@@ -134,7 +137,7 @@ def h5_split( lst, chunk_size ):
     return list_df

-def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file ):
+def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file, threshold ):
     """
     crystfel run file - spot-finding and indexing parameters may need some editing
@@ -158,9 +161,9 @@ def write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_f
     run_sh.write( " --pdb={0} \\\n".format( cell_file ) )
     run_sh.write( " --indexing=xgandalf-latt-cell \\\n" )
     run_sh.write( " --peaks=peakfinder8 \\\n" )
-    run_sh.write( " --integration=rings-nograd \\\n" )
+    run_sh.write( " --integration=rings-nocen-nograd \\\n" )
     run_sh.write( " --tolerance=10.0,10.0,10.0,2,3,2 \\\n" )
-    run_sh.write( " --threshold=10 \\\n" )
+    run_sh.write( " --threshold={0} \\\n".format( threshold ) )
     run_sh.write( " --min-snr=5 \\\n" )
     run_sh.write( " --int-radius=5,7,9 \\\n" )
     run_sh.write( " -j 32 \\\n" )
@@ -212,9 +215,9 @@ def wait_for_jobs( job_ids, total_jobs ):
                completed_jobs.add(job_id)
                pbar.update(1)
        job_ids.difference_update(completed_jobs)
-       time.sleep(5)
+       time.sleep(2)

-def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file ):
+def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
    # set chunk counter
    chunk = 0
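
The change above shortens the polling interval from 5 s to 2 s. As a hedged sketch of the surrounding loop, assuming SLURM and that completion is detected when squeue no longer lists the job (the actual check sits outside this hunk, so the query here is an assumption):

import subprocess
import time
from tqdm import tqdm

def wait_for_jobs_sketch( job_ids, total_jobs ):
    completed_jobs = set()
    with tqdm( total=total_jobs ) as pbar:
        while job_ids:
            for job_id in list( job_ids ):
                # squeue -h prints nothing once the job has left the queue
                result = subprocess.run( [ "squeue", "-h", "-j", str( job_id ) ],
                                         capture_output=True, text=True )
                if not result.stdout.strip():
                    completed_jobs.add( job_id )
                    pbar.update( 1 )
            job_ids.difference_update( completed_jobs )
            time.sleep( 2 )  # poll every 2 s (was 5 s before this commit)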
@@ -242,7 +245,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file ):
        chunk_lst.to_csv( chunk_lst_file, index=False, header=False )
        # write crystfel file and append path to list
-       cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file )
+       cryst_run_file, stream_file = write_crystfel_run( proc_dir, name, chunk, chunk_lst_file, geom_file, cell_file, threshold )
        stream_lst.append( "{0}/{1}".format( proc_dir, stream_file ) )

        # submit jobs
@@ -258,7 +261,7 @@ def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file ):
    return submitted_job_ids, chunk, stream_lst

-def main( cwd, name, lst, chunk_size, geom_file, cell_file ):
+def main( cwd, name, lst, chunk_size, geom_file, cell_file, threshold ):
    print( "reading SwissFEL lst file" )
    print( "creating {0} image chunks of lst".format( chunk_size ) )
@@ -266,14 +269,18 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file ):
    print( "DONE" )

    # run crystfel runs on individual splits
-   submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file )
+   print( "submitting jobs to cluster" )
+   submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file, threshold )

    # monitor progress of jobs
+   time.sleep(30)
    wait_for_jobs(submitted_job_ids, chunk)
+   print( "done" )

    # make composite .stream file
    output_file = "{0}.stream".format( name )
+   print( "concatenating .streams from separate runs." )
    try:
        # Open the output file in 'append' mode
        with open(output_file, "a") as output:
@@ -282,12 +289,13 @@ def main( cwd, name, lst, chunk_size, geom_file, cell_file ):
                with open(file_name, "r") as input_file:
                    # Read the contents of the input file and append to the output file
                    output.write(input_file.read())
+                   logger.info( f"Appended contents from {file_name} to {output_file}")
            except FileNotFoundError:
                logger.debug(f"File {file_name} not found. Skipping.")
            except IOError as e:
                logger.debug(f"An error occurred while appending files: {e}")
+   print( "done" )

    df, xtals, chunks = calculate_stats( output_file )
    # stats
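
The concatenation above pulls each whole .stream into memory via input_file.read() before appending. A hedged alternative sketch uses shutil.copyfileobj to copy in fixed-size chunks; stream_lst and output_file follow the script's names, but the helper itself is illustrative:

import shutil
from loguru import logger

def concatenate_streams_sketch( stream_lst, output_file ):
    with open( output_file, "ab" ) as output:
        for file_name in stream_lst:
            try:
                with open( file_name, "rb" ) as input_file:
                    shutil.copyfileobj( input_file, output )  # buffered copy, constant memory
                logger.info( f"Appended contents from {file_name} to {output_file}" )
            except FileNotFoundError:
                logger.debug( f"File {file_name} not found. Skipping." )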
@@ -320,45 +328,60 @@ if __name__ == "__main__":
    parser.add_argument(
        "-l",
        "--lst_file",
-       help="file from SwissFEL output to be processed quickly",
-       type=os.path.abspath
+       help="file from SwissFEL output to be processed quickly. Required.",
+       type=os.path.abspath,
+       required=True
    )
    parser.add_argument(
        "-k",
        "--chunk_size",
-       help="how big should each chunk be? - the bigger the chunk, the fewer jobs, the slower it will be",
+       help="how big should each chunk be? The bigger the chunk, the fewer the jobs and the slower the processing. Default = 500.",
        type=int,
        default=500
    )
    parser.add_argument(
        "-g",
        "--geom_file",
-       help="path to geom file to be used in the refinement",
-       type=os.path.abspath
+       help="path to geom file to be used in the refinement. Required.",
+       type=os.path.abspath,
+       required=True
    )
    parser.add_argument(
        "-c",
        "--cell_file",
-       help="path to cell file of the crystals used in the refinement",
-       type=os.path.abspath
+       help="path to cell file of the crystals used in the refinement. Required.",
+       type=os.path.abspath,
+       required=True
    )
    parser.add_argument(
        "-n",
        "--job_name",
-       help="the name of the job to be done",
+       help="the name of the job to be done. Default = 'split_###'.",
        type=str,
        default="split"
    )
    parser.add_argument(
        "-d",
        "--debug",
-       help="output debug to terminal",
+       help="output debug to terminal.",
        type=bool,
        default=False
    )
+   parser.add_argument(
+       "-e",
+       "--endstation",
+       help="which endstation did you collect these data at, e.g. alvra or cristallina. Default = 'cristallina'; override to match your endstation.",
+       type=str,
+       default="cristallina"
+   )
    args = parser.parse_args()
    # set current working directory
    cwd = os.getcwd()
+   # set threshold based on endstation
+   if args.endstation == "alvra":
+       threshold = 3000
+   elif args.endstation == "cristallina":
+       threshold = 10
    # set loguru
    if not args.debug:
        logger.remove()
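
Note that the new if/elif above assigns threshold only for "alvra" and "cristallina"; any other --endstation value leaves it unassigned, and the main(...) call at the end of the file then raises NameError. A hedged sketch of a stricter variant using a lookup table plus argparse choices, where the threshold values are the commit's own:

import argparse

THRESHOLDS = { "alvra": 3000, "cristallina": 10 }  # values from this commit

parser = argparse.ArgumentParser()
parser.add_argument(
    "-e",
    "--endstation",
    help="endstation the data were collected at",
    choices=sorted( THRESHOLDS ),  # argparse rejects unknown endstations up front
    default="cristallina"
)
args = parser.parse_args()
threshold = THRESHOLDS[ args.endstation ]  # always defined; no NameError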
@@ -367,5 +390,5 @@ if __name__ == "__main__":
    # log geometry file
    geom = open( args.geom_file, "r" ).read()
    logger.info( geom )
-   main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file )
+   main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file, threshold )