now has stream_stats output and log file - loguru still needs work

Beale John Henry
2023-09-27 06:48:54 +02:00
parent 7eede81cec
commit 4b33332bf9


@@ -18,6 +18,7 @@ python crystfel_split.py -l <path-to-list-file>
# output
a series of stream files from crystfel in the current working directory
a log file with relevant info on the run
"""
# modules
@@ -28,6 +29,93 @@ import time
import argparse
from tqdm import tqdm
import regex as re
import numpy as np
import pandas as pd
from loguru import logger
def count_chunks( stream ):
# get number of chunks
# example - ----- Begin chunk -----
# count them
try:
pattern = r"-----\sBegin\schunk\s-----"
chunks = re.findall( pattern, stream )
return len( chunks )
except ( TypeError, AttributeError ):
logger.debug( "count_chunks error" )
return np.nan
def scrub_cells( stream ):
# get unit cell values from stream file
# example - Cell parameters 7.71784 7.78870 3.75250 nm, 90.19135 90.77553 90.19243 deg
# scrub cell parameters and crystal count - else nan
try:
pattern = r"Cell\sparameters\s(\d\.\d+)\s(\d\.\d+)\s(\d\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg"
cell_lst = re.findall( pattern, stream )
xtals = len( cell_lst )
return cell_lst, xtals
except ( TypeError, AttributeError ):
logger.debug( "scrub_cells error" )
return np.nan, np.nan
def scrub_res( stream ):
# get diffraction limit
# example - diffraction_resolution_limit = 4.07 nm^-1 or 2.46 A
# scrub res_lst or return np.nan
try:
pattern = r"diffraction_resolution_limit\s=\s\d\.\d+\snm\^-1\sor\s(\d\.\d+)\sA"
res_lst = re.findall( pattern, stream )
return res_lst
except ( TypeError, AttributeError ):
logger.debug( "scrub_res error" )
return np.nan
def scrub_obs( stream ):
# get number of reflections
# example - num_reflections = 308
# scrub reflections or return np.nan
try:
pattern = r"num_reflections\s=\s(\d+)"
obs_lst = re.findall( pattern, stream )
return obs_lst
except ( TypeError, AttributeError ):
logger.debug( "scrub_obs error" )
return np.nan
def calculate_stats( stream_pwd ):
# open stream file
with open( stream_pwd, "r" ) as stream_file:
stream = stream_file.read()
# get total number of chunks
chunks = count_chunks( stream )
# get list of cells and crystal count
cell_lst, xtals = scrub_cells( stream )
# get list of resolution limits
res_lst = scrub_res( stream )
# get list of reflection counts
obs_lst = scrub_obs( stream )
# build stats dataframe
cols = [ "a", "b", "c", "alpha", "beta", "gamma" ]
df = pd.DataFrame( cell_lst, columns=cols )
df[ "resolution" ] = res_lst
df[ "obs" ] = obs_lst
# convert all to floats
df = df.astype( float )
return df, xtals, chunks
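# usage sketch ( illustrative only, not called by the pipeline ): calculate_stats() can also be
# pointed at any existing composite stream to inspect its statistics, e.g.
# df, xtals, chunks = calculate_stats( "previous_run.stream" )
# print( df.describe() )
# the stream file name above is a placeholder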
def h5_split( lst, chunk_size ):
@@ -97,6 +185,7 @@ def make_process_dir( proc_dir ):
os.makedirs( proc_dir )
except OSError as e:
if e.errno != errno.EEXIST:
logger.debug( "making directory error" )
raise
def submit_job( job_file ):
@@ -109,7 +198,7 @@ def submit_job( job_file ):
pattern = r"Submitted batch job (\d+)"
job_id = re.search( pattern, job_output.decode().strip() ).group(1)
return int( job_id )
def wait_for_jobs( job_ids, total_jobs ):
@@ -123,14 +212,9 @@ def wait_for_jobs( job_ids, total_jobs ):
completed_jobs.add(job_id)
pbar.update(1)
job_ids.difference_update(completed_jobs)
time.sleep(5)
def run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file ):
# set chunk counter
chunk = 0
@@ -141,10 +225,9 @@ def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file ):
# stream file list
stream_lst = []
print( "creating crystfel jobs for individual chunks" )
for chunk_lst in list_df:
logger.info( "chunk {0} = {1} images".format( chunk, len( chunk_lst ) ) )
# define process directory
proc_dir = "{0}/{1}/{1}_{2}".format( cwd, name, chunk )
@@ -164,7 +247,7 @@ def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file ):
# submit jobs
job_id = submit_job( cryst_run_file )
logger.info( f"Job submitted: { job_id }" )
submitted_job_ids.add( job_id )
# increase chunk counter
@@ -173,11 +256,20 @@ def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file ):
# move back to top dir
os.chdir( cwd )
return submitted_job_ids, chunk, stream_lst
def main( cwd, name, lst, chunk_size, geom_file, cell_file ):
print( "reading SwissFEL lst file" )
print( "creating {0} image chunks of lst".format( chunk_size ) )
list_df = h5_split( lst, chunk_size )
print( "DONE" )
# run crystfel on the individual splits
submitted_job_ids, chunk, stream_lst = run_splits( list_df, cwd, name, lst, chunk_size, geom_file, cell_file )
# monitor progress of jobs
wait_for_jobs(submitted_job_ids, chunk)
print("slurm processing done")
# make composite .stream file
output_file = "{0}.stream".format( name )
@@ -190,14 +282,38 @@ def run_splits( cwd, name, lst, chunk_size, geom_file, cell_file ):
with open(file_name, "r") as input_file:
# Read the contents of the input file and append to the output file
output.write(input_file.read())
logger.info( f"Appended contents from {file_name} to {output_file}" )
except FileNotFoundError:
logger.debug(f"File {file_name} not found. Skipping.")
except IOError as e:
logger.debug(f"An error occurred while appending files: {e}")
df, xtals, chunks = calculate_stats( output_file )
# stats
index_rate = round( xtals/chunks*100, 2 )
mean_res, std_res = round( df.resolution.mean(), 2 ), round( df.resolution.std(), 2 )
median_res = df.resolution.median()
mean_obs, std_obs = round( df.obs.mean(), 2 ), round( df.obs.std(), 2)
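# cell axes in the stream are given in nm ( see scrub_cells ), so multiply by 10 to report them in A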
mean_a, std_a = round( df.a.mean()*10, 2 ), round( df.a.std()*10, 2 )
mean_b, std_b = round( df.b.mean()*10, 2 ), round( df.b.std()*10, 2 )
mean_c, std_c = round( df.c.mean()*10, 2 ), round( df.c.std()*10, 2 )
mean_alpha, std_alpha = round( df.alpha.mean(), 2 ), round( df.alpha.std(), 2 )
mean_beta, std_beta = round(df.beta.mean(), 2 ), round( df.beta.std(), 2 )
mean_gamma, std_gamma = round( df.gamma.mean(), 2 ), round( df.gamma.std(), 2 )
logger.info( "images = {0}".format( chunks ) )
logger.info( "crystals = {0}".format( xtals ) )
logger.info( "indexing rate = {0} %".format( index_rate ) )
logger.info( "mean resolution = {0} +/- {1} A".format( mean_res, std_res ) )
logger.info( "median resolution = {0} A".format( median_res ) )
logger.info( "mean observations = {0} +/- {1}".format( mean_obs, std_obs ) )
logger.info( "mean a = {0} +/- {1} A".format( mean_a, std_a ) )
logger.info( "mean b = {0} +/- {1} A".format( mean_b, std_b ) )
logger.info( "mean c = {0} +/- {1} A".format( mean_c, std_c ) )
logger.info( "mean alpha = {0} +/- {1} deg".format( mean_alpha, std_alpha ) )
logger.info( "mean beta = {0} +/- {1} deg".format( mean_beta, std_beta ) )
logger.info( "mean gamma = {0} +/- {1} deg".format( mean_gamma, std_gamma ) )
if __name__ == "__main__":
parser = argparse.ArgumentParser()
@@ -212,7 +328,7 @@ if __name__ == "__main__":
"--chunk_size",
help="how big should each chunk be? - the bigger the chunk, the fewer jobs, the slower it will be",
type=int,
default=500
)
parser.add_argument(
"-g",
@@ -234,6 +350,13 @@ if __name__ == "__main__":
default="split"
)
args = parser.parse_args()
# set current working directory
cwd = os.getcwd()
# set up loguru log file
logfile = "{0}.log".format( args.job_name )
logger.add( logfile, level="DEBUG" )
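# note: logger.add() attaches the log file as an extra sink alongside loguru's default stderr output,
# with DEBUG as the minimum level written to the file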
# log geometry file
with open( args.geom_file, "r" ) as f:
geom = f.read()
logger.info( geom )
main( cwd, args.job_name, args.lst_file, args.chunk_size, args.geom_file, args.cell_file )