Files
crystfel_tools/cool_tools/chip_uc_gather.py

215 lines
6.4 KiB
Python

#!/usr/bin/env python3
# author J.Beale
"""
# aim
order crystel hits from a .stream output and calcualte the unit-cell volume
# usage
python chip_uc_gather.py -s <path to stream file>
-a <size of swissfel acq>
-c <chip szie>
-o <output>
# output
csv file of aperture, mean, no-of-hits
input for uc_analysis.py
"""
# modules
import re
import argparse
import pandas as pd
import numpy as np
import os
def extract_chunks( input_file ):
# setup
chunk_df = pd.DataFrame()
event_no = []
chunks = []
hits = []
collect_lines = False
# Open the input file for reading
with open(input_file, 'r') as f:
for line in f:
# Check for the start condition
if line.startswith('----- Begin chunk -----'):
hit = False
collect_lines = True
chunk_lines = []
if collect_lines:
chunk_lines.append(line)
# find image_no
if line.startswith( "Image serial number:" ):
event_search = re.findall( r"Image serial number:\s(\d+)", line )
event = int(event_search[0])
event_no.append( event )
# is there a hit in chunk
if line.startswith( "Cell parameters" ):
hit = True
if line.startswith('----- End chunk -----'):
collect_lines = False # Stop collecting lines
chunks.append( chunk_lines )
hits.append( hit )
chunk_df[ "chunks" ] = chunks
chunk_df[ "event" ] = event_no
chunk_df[ "hit" ] = hits
return chunk_df
def scrub_acq( chunk ):
for line in chunk:
# example
# "Image filename: /sf/cristallina/data/p21981/raw/run0141-hewl_cover_filter_30um/data/acq0001.JF17T16V01j.h5"
if line.startswith( "Image filename" ):
try:
pattern = r"data/acq(\d+)\.JF"
acq = re.findall( pattern, line )
if AttributeError:
return int(acq[0])
except AttributeError:
print( "scrub acquistion error" )
return np.nan
def scrub_cell( chunk ):
for line in chunk:
if line.startswith( "Cell parameters" ):
try:
pattern = r"Cell\sparameters\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg"
cell_lst = re.findall( pattern, line )
if AttributeError:
return cell_lst
except AttributeError:
print( "scrub_cells error" )
return np.nan
def V_unit( a, b, c, alpha, beta, gamma ):
cos_alpha = np.cos(alpha * (np.pi / 180))
cos_beta = np.cos(beta * (np.pi / 180))
cos_gamma = np.cos(gamma * (np.pi / 180))
m_matrix = np.array([[a * a , a * b * cos_gamma, a * c * cos_beta ], \
[a * b * cos_gamma, b * b , b * c * cos_alpha], \
[a * c * cos_beta , b * c * cos_alpha, c * c ]])
m_matrix_det = np.linalg.det(m_matrix)
V_unit = np.sqrt(m_matrix_det)
return V_unit
def main( input_file, acq_size, chip_size, output ):
# extract chunks
print( "finding chucks" )
chunk_df = extract_chunks( input_file )
# display no. of chunks
print( "found {0} chunks".format( len(chunk_df) ) )
print( "found {0} crystals".format( chunk_df.hit.sum() ) )
print( "done" )
# find unit cells
print( "geting unit cell data from from chunks" )
xtal_df = pd.DataFrame()
counter = 0
for index, row in chunk_df.iterrows():
chunk, hit, event = row[ "chunks" ], row[ "hit" ], row[ "event" ]
if hit:
# calc image number
acq = scrub_acq( chunk )
image_no = ( acq - 1 )*acq_size + event
# get unit cells
cell_lst = scrub_cell( chunk )
# concat results
cols = [ "a", "b", "c", "alpha", "beta", "gamma" ]
xtal_df_1 = pd.DataFrame( cell_lst, columns=cols )
# set datatype
xtal_df_1 = xtal_df_1.astype( float )
# calculate
xtal_df_1[ "image_no" ] = image_no
xtal_df_1[ "uc_vol" ] = V_unit( xtal_df_1.a.values[0],
xtal_df_1.b.values[0],
xtal_df_1.c.values[0],
xtal_df_1.alpha.values[0],
xtal_df_1.beta.values[0],
xtal_df_1.gamma.values[0] )
xtal_df = pd.concat( ( xtal_df, xtal_df_1 ) )
# add count and print every 1000s
counter = counter + 1
if counter % 1000 == 0:
print( counter, end='\r' )
print( "done" )
print( "merging multiple crystals" )
# average across multiple hits
uc_df = xtal_df.groupby( "image_no" ).mean()
uc_df[ "hits" ] = xtal_df.groupby( "image_no" ).count().a
print( xtal_df.groupby( "image_no" ).count().a )
# remove abc
uc_df = uc_df[ [ "uc_vol", "hits" ] ]
print( uc_df )
# stretch index to include blanks
chip_index = np.arange( 1, chip_size+1 )
uc_df = uc_df.reindex( chip_index )
print( "outputing to csv" )
# output to file
file_name = "{0}_uc.csv".format( output )
uc_df.to_csv( file_name, sep="," )
print( "done" )
def list_of_floats(arg):
return list(map(int, arg.split(',')))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-s",
"--stream",
help="input stream file",
required=True,
type=os.path.abspath
)
parser.add_argument(
"-a",
"--acq_size",
help="size of acquistions used when collecting data. default is 1000 images per acquisition",
default=1000,
type=int
)
parser.add_argument(
"-c",
"--chip_size",
help="total number of wells in the chip. default = 26244",
default=26244,
type=int
)
parser.add_argument(
"-o",
"--output",
help="output file name.",
required=True,
type=str
)
args = parser.parse_args()
# run main
main( args.stream, args.acq_size, args.chip_size, args.output )