#!/usr/bin/env python3 # author J.Beale """ # aim order crystel hits from a .stream output and calcualte the unit-cell volume # usage python chip_uc_gather.py -s -a -c -o # output csv file of aperture, mean, no-of-hits input for uc_analysis.py """ # modules import re import argparse import pandas as pd import numpy as np import os def extract_chunks( input_file ): # setup chunk_df = pd.DataFrame() event_no = [] chunks = [] hits = [] collect_lines = False # Open the input file for reading with open(input_file, 'r') as f: for line in f: # Check for the start condition if line.startswith('----- Begin chunk -----'): hit = False collect_lines = True chunk_lines = [] if collect_lines: chunk_lines.append(line) # find image_no if line.startswith( "Image serial number:" ): event_search = re.findall( r"Image serial number:\s(\d+)", line ) event = int(event_search[0]) event_no.append( event ) # is there a hit in chunk if line.startswith( "Cell parameters" ): hit = True if line.startswith('----- End chunk -----'): collect_lines = False # Stop collecting lines chunks.append( chunk_lines ) hits.append( hit ) chunk_df[ "chunks" ] = chunks chunk_df[ "event" ] = event_no chunk_df[ "hit" ] = hits return chunk_df def scrub_acq( chunk ): for line in chunk: # example # "Image filename: /sf/cristallina/data/p21981/raw/run0141-hewl_cover_filter_30um/data/acq0001.JF17T16V01j.h5" if line.startswith( "Image filename" ): try: pattern = r"data/acq(\d+)\.JF" acq = re.findall( pattern, line ) if AttributeError: return int(acq[0]) except AttributeError: print( "scrub acquistion error" ) return np.nan def scrub_cell( chunk ): for line in chunk: if line.startswith( "Cell parameters" ): try: pattern = r"Cell\sparameters\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg" cell_lst = re.findall( pattern, line ) if AttributeError: return cell_lst except AttributeError: print( "scrub_cells error" ) return np.nan def V_unit( a, b, c, alpha, beta, gamma ): cos_alpha = np.cos(alpha * (np.pi / 180)) cos_beta = np.cos(beta * (np.pi / 180)) cos_gamma = np.cos(gamma * (np.pi / 180)) m_matrix = np.array([[a * a , a * b * cos_gamma, a * c * cos_beta ], \ [a * b * cos_gamma, b * b , b * c * cos_alpha], \ [a * c * cos_beta , b * c * cos_alpha, c * c ]]) m_matrix_det = np.linalg.det(m_matrix) V_unit = np.sqrt(m_matrix_det) return V_unit def main( input_file, acq_size, chip_size, output ): # extract chunks print( "finding chucks" ) chunk_df = extract_chunks( input_file ) # display no. of chunks print( "found {0} chunks".format( len(chunk_df) ) ) print( "found {0} crystals".format( chunk_df.hit.sum() ) ) print( "done" ) # find unit cells print( "geting unit cell data from from chunks" ) xtal_df = pd.DataFrame() counter = 0 for index, row in chunk_df.iterrows(): chunk, hit, event = row[ "chunks" ], row[ "hit" ], row[ "event" ] if hit: # calc image number acq = scrub_acq( chunk ) image_no = ( acq - 1 )*acq_size + event # get unit cells cell_lst = scrub_cell( chunk ) # concat results cols = [ "a", "b", "c", "alpha", "beta", "gamma" ] xtal_df_1 = pd.DataFrame( cell_lst, columns=cols ) # set datatype xtal_df_1 = xtal_df_1.astype( float ) # calculate xtal_df_1[ "image_no" ] = image_no xtal_df_1[ "uc_vol" ] = V_unit( xtal_df_1.a.values[0], xtal_df_1.b.values[0], xtal_df_1.c.values[0], xtal_df_1.alpha.values[0], xtal_df_1.beta.values[0], xtal_df_1.gamma.values[0] ) xtal_df = pd.concat( ( xtal_df, xtal_df_1 ) ) # add count and print every 1000s counter = counter + 1 if counter % 1000 == 0: print( counter, end='\r' ) print( "done" ) print( "merging multiple crystals" ) # average across multiple hits uc_df = xtal_df.groupby( "image_no" ).mean() uc_df[ "hits" ] = xtal_df.groupby( "image_no" ).count().a print( xtal_df.groupby( "image_no" ).count().a ) # remove abc uc_df = uc_df[ [ "uc_vol", "hits" ] ] print( uc_df ) # stretch index to include blanks chip_index = np.arange( 1, chip_size+1 ) uc_df = uc_df.reindex( chip_index ) print( "outputing to csv" ) # output to file file_name = "{0}_uc.csv".format( output ) uc_df.to_csv( file_name, sep="," ) print( "done" ) def list_of_floats(arg): return list(map(int, arg.split(','))) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-s", "--stream", help="input stream file", required=True, type=os.path.abspath ) parser.add_argument( "-a", "--acq_size", help="size of acquistions used when collecting data. default is 1000 images per acquisition", default=1000, type=int ) parser.add_argument( "-c", "--chip_size", help="total number of wells in the chip. default = 26244", default=26244, type=int ) parser.add_argument( "-o", "--output", help="output file name.", required=True, type=str ) args = parser.parse_args() # run main main( args.stream, args.acq_size, args.chip_size, args.output )