326 lines
10 KiB
Python
326 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# author J.Beale
|
|
|
|
"""
|
|
# aim
|
|
selects crystals based on their unit cell values
|
|
|
|
# usage
|
|
python stream_random.py -s <path to stream>
|
|
-o output file names
|
|
-c tolerance of cell axes to accept
|
|
-a tolerance of angles to accept
|
|
-i ideal central lengths to accept
|
|
|
|
# output
|
|
.stream file containing images that fall within requested range of value
|
|
"""
|
|
|
|
# modules
|
|
import re
|
|
import argparse
|
|
import pandas as pd
|
|
import numpy as np
|
|
import os
|
|
|
|
def extract_chunks( input_file ):
|
|
|
|
# setup
|
|
chunk_df = pd.DataFrame()
|
|
image_no = []
|
|
chunks = []
|
|
hits = []
|
|
collect_lines = False
|
|
# Open the input file for reading
|
|
with open(input_file, 'r') as f:
|
|
for line in f:
|
|
|
|
# Check for the start condition
|
|
if line.startswith('----- Begin chunk -----'):
|
|
hit = False
|
|
collect_lines = True
|
|
chunk_lines = []
|
|
if collect_lines:
|
|
chunk_lines.append(line)
|
|
|
|
# find image_no
|
|
if line.startswith( "Event:" ):
|
|
image_search = re.findall( r"Event: //(\d+)", line )
|
|
image = int(image_search[0])
|
|
image_no.append( image )
|
|
|
|
# is there a hit in chunk
|
|
if line.startswith( "Cell parameters" ):
|
|
hit = True
|
|
|
|
if line.startswith('----- End chunk -----'):
|
|
collect_lines = False # Stop collecting lines
|
|
chunks.append( chunk_lines )
|
|
hits.append( hit )
|
|
|
|
chunk_df[ "chunks" ] = chunks
|
|
chunk_df[ "image_no" ] = image_no
|
|
chunk_df[ "hit" ] = hits
|
|
|
|
return chunk_df
|
|
|
|
def scrub_cells( line ):
|
|
|
|
# get uc values from stream file
|
|
# example - Cell parameters 7.71784 7.78870 3.75250 nm, 90.19135 90.77553 90.19243 deg
|
|
pattern = r"Cell\sparameters\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg"
|
|
a = re.search( pattern, line ).group(1)
|
|
b = re.search( pattern, line ).group(2)
|
|
c = re.search( pattern, line ).group(3)
|
|
alpha = re.search( pattern, line ).group(4)
|
|
beta = re.search( pattern, line ).group(5)
|
|
gamma = re.search( pattern, line ).group(6)
|
|
|
|
data = [ { "a" : float( a )*10,
|
|
"b" : float( b )*10,
|
|
"c" : float( c )*10,
|
|
"alpha" : float( alpha ),
|
|
"beta" : float( beta ),
|
|
"gamma" : float( gamma )
|
|
} ]
|
|
cell_df = pd.DataFrame( data )
|
|
|
|
return cell_df
|
|
|
|
def extract_xtals( chunk ):
|
|
|
|
# setup
|
|
xtals = []
|
|
cells_df = pd.DataFrame()
|
|
collect_crystal_lines = False
|
|
# Open the input file for reading
|
|
for line in chunk:
|
|
|
|
# Check for the xtals start condition
|
|
if line.startswith('--- Begin crystal'):
|
|
collect_crystal_lines = True
|
|
xtal_lines = []
|
|
if collect_crystal_lines:
|
|
xtal_lines.append(line)
|
|
if line.startswith('--- End crystal\n'):
|
|
collect_crystal_lines = False # Stop collecting lines
|
|
xtals.append( xtal_lines )
|
|
if line.startswith( "Cell" ):
|
|
cell_df = scrub_cells( line )
|
|
cells_df = pd.concat( ( cells_df, cell_df ) )
|
|
|
|
# reset index of cells_df
|
|
cells_df = cells_df.reset_index( drop=True )
|
|
|
|
return xtals, cells_df
|
|
|
|
def extract_header( chunk ):
|
|
|
|
# setup
|
|
header = []
|
|
collect_header_lines = False
|
|
# Open the input file for reading
|
|
for line in chunk:
|
|
|
|
# Check for the xtals start condition
|
|
if line.startswith('----- Begin chunk -----'):
|
|
collect_header_lines = True
|
|
header_lines = []
|
|
if collect_header_lines:
|
|
header_lines.append(line)
|
|
if line.startswith('End of peak list'):
|
|
collect_header_lines = False # Stop collecting lines
|
|
header.append( header_lines )
|
|
|
|
return header
|
|
|
|
def get_header( header, input_file ):
|
|
|
|
if header == "geom":
|
|
start_keyword = "----- Begin geometry file -----"
|
|
end_keyword = "----- End geometry file -----"
|
|
if header == "cell":
|
|
start_keyword = "----- Begin unit cell -----"
|
|
end_keyword = "----- End unit cell -----"
|
|
|
|
# setup
|
|
collect_lines = False
|
|
headers = []
|
|
|
|
# Open the input file for reading
|
|
with open(input_file, 'r') as f:
|
|
for line in f:
|
|
# Check for the start condition
|
|
if line.strip() == start_keyword:
|
|
collect_lines = True
|
|
headers_lines = []
|
|
# Collect lines between start and end conditions
|
|
if collect_lines:
|
|
headers_lines.append(line)
|
|
# Check for the end condition
|
|
if line.strip() == end_keyword:
|
|
collect_lines = False # Stop collecting lines
|
|
headers.append(headers_lines)
|
|
|
|
return headers[0]
|
|
|
|
def write_to_file( geom, cell, chunk_header, crystals, output_file ):
|
|
|
|
# Write sections with matching cell parameters to the output file
|
|
with open(output_file, 'w') as out_file:
|
|
out_file.write('CrystFEL stream format 2.3\n')
|
|
out_file.write('Generated by CrystFEL 0.10.2\n')
|
|
out_file.writelines(geom)
|
|
out_file.writelines(cell)
|
|
for crystal, header in zip( crystals, chunk_header ):
|
|
out_file.writelines( header )
|
|
out_file.writelines( crystal )
|
|
out_file.writelines( "----- End chunk -----\n" )
|
|
|
|
def is_within_range( a, b, c, alpha, beta, gamma, cell_tolerance, angle_tolerance, ideal_centre ):
|
|
|
|
cell = [ a, b, c, alpha, beta, gamma ]
|
|
returner = False
|
|
within_range = []
|
|
|
|
# Function to check if cell parameters are within the specified range
|
|
for i in range(3):
|
|
if cell[i] < ideal_centre[i]+cell_tolerance and cell[i] > ideal_centre[i]-cell_tolerance:
|
|
within_range.append(True)
|
|
else:
|
|
within_range.append(False)
|
|
|
|
for i in range(3, 6):
|
|
if cell[i] < ideal_centre[i]+angle_tolerance and cell[i] > ideal_centre[i]-angle_tolerance:
|
|
within_range.append(True)
|
|
else:
|
|
within_range.append(False)
|
|
|
|
if all(within_range):
|
|
returner = True
|
|
|
|
return returner
|
|
|
|
def sort_xtals( chunk_df ):
|
|
|
|
# extract xtals
|
|
xtal_df = pd.DataFrame()
|
|
counter = 0
|
|
for index, row in chunk_df.iterrows():
|
|
|
|
chunk, hit, image_no = row[ "chunks" ], row[ "hit" ], row[ "image_no" ]
|
|
|
|
if hit:
|
|
|
|
# find xtals and header
|
|
header = extract_header( chunk )
|
|
xtals, cells_df = extract_xtals( chunk )
|
|
|
|
# make header same length as xtals
|
|
header = header*len(xtals)
|
|
|
|
# concat results
|
|
xtal_df_1 = pd.DataFrame()
|
|
xtal_df_1[ "header" ] = header
|
|
xtal_df_1[ "xtals" ] = xtals
|
|
xtal_df_1[ "image_no" ] = image_no
|
|
xtal_df_1 = pd.concat( ( xtal_df_1, cells_df ), axis=1 )
|
|
xtal_df = pd.concat( ( xtal_df, xtal_df_1 ) )
|
|
|
|
# add count and print every 1000s
|
|
counter = counter + len(xtals)
|
|
if counter % 1000 == 0:
|
|
print( counter, end='\r' )
|
|
print( "done" )
|
|
|
|
# sort by image no and reindex
|
|
xtal_df = xtal_df.sort_values( by=[ "image_no" ] )
|
|
xtal_df = xtal_df.reset_index( drop=True )
|
|
|
|
return xtal_df
|
|
|
|
def main( input_file, output, cell_tolerance, angle_tolerance, ideal_centre ):
|
|
|
|
# get geom and cell file headers
|
|
print( "getting header info from .stream file" )
|
|
geom = get_header( "geom", input_file )
|
|
cell = get_header( "cell", input_file )
|
|
print( "done" )
|
|
|
|
# extract chunks
|
|
print( "finding chucks" )
|
|
chunk_df = extract_chunks( input_file )
|
|
# display no. of chunks
|
|
print( "found {0} chunks".format( len(chunk_df) ) )
|
|
# remove rows without xtals
|
|
chunk_df = chunk_df.loc[chunk_df.hit, :]
|
|
print( "found {0} hits (not including multiples)".format( len(chunk_df) ) )
|
|
print( "done" )
|
|
|
|
print( "sorting xtals from chunks" )
|
|
xtal_df = sort_xtals( chunk_df )
|
|
print( "done" )
|
|
|
|
print( "finding cells that are within tolerance" )
|
|
# select cells that are within tolerance
|
|
xtal_df[ "select" ] = xtal_df.apply(lambda x: is_within_range( x[ "a" ], x[ "b" ], x[ "c" ],
|
|
x[ "alpha" ], x[ "beta" ], x[ "gamma" ],
|
|
cell_tolerance,
|
|
angle_tolerance,
|
|
ideal_centre ), axis=1
|
|
)
|
|
select_df = xtal_df[ xtal_df[ "select" ] == True ]
|
|
print( "done" )
|
|
|
|
print( "writing {0} to output file".format( len( select_df ) ) )
|
|
crystals = select_df.xtals.to_list()
|
|
chunk_header = select_df.header.to_list()
|
|
output_file = "{0}.stream".format( output )
|
|
write_to_file( geom, cell, chunk_header, crystals, output_file )
|
|
print( "done" )
|
|
|
|
def list_of_floats(arg):
|
|
return list(map(float, arg.split(',')))
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"-s",
|
|
"--stream",
|
|
help="input stream file",
|
|
required=True,
|
|
type=os.path.abspath
|
|
)
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output",
|
|
help="output stream file name. '.stream will be added'",
|
|
type=str,
|
|
default="sample"
|
|
)
|
|
parser.add_argument(
|
|
"-c",
|
|
"--cell_tolerance",
|
|
help="tolerance for cell lengths. 0.2 is default.",
|
|
type=float,
|
|
default=0.2
|
|
)
|
|
parser.add_argument(
|
|
"-a",
|
|
"--angle_tolerance",
|
|
help="tolerance for cell angles. 0.25 is default.",
|
|
type=float,
|
|
default=0.25
|
|
)
|
|
parser.add_argument(
|
|
"-i",
|
|
"--ideal_centre",
|
|
help="ideal lengths to be selected around. List of floats - e.g. 78.5, 78.5, 38.45, 90, 90, 90",
|
|
type=list_of_floats,
|
|
required=True
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
# run main
|
|
main( args.stream, args.output, args.cell_tolerance, args.angle_tolerance, args.ideal_centre ) |