changed script name

This commit is contained in:
Beale John Henry
2025-05-08 10:50:16 +02:00
parent 0c9a90f77d
commit 16138c9212

View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
# author J.Beale
"""
# aim
randomly select a series of crystals from a stream file and
then compile them into the correctly formated .stream
# usage
python stream_random.py -s <path to stream>
-o output file names
-n sample size
-r how many repeat random samples do you want?
# output
.stream file with random sample of xtals
"""
# modules
import re
import argparse
import pandas as pd
import numpy as np
import os
def extract_chunks( input_file ):
# setup
chunk_df = pd.DataFrame()
image_no = []
chunks = []
hits = []
collect_lines = False
# Open the input file for reading
with open(input_file, 'r') as f:
for line in f:
# Check for the start condition
if line.startswith('----- Begin chunk -----'):
hit = False
collect_lines = True
chunk_lines = []
if collect_lines:
chunk_lines.append(line)
# find image_no
if line.startswith( "Event:" ):
image_search = re.findall( r"Event: //(\d+)", line )
image = int(image_search[0])
image_no.append( image )
# is there a hit in chunk
if line.startswith( "Cell parameters" ):
hit = True
if line.startswith('----- End chunk -----'):
collect_lines = False # Stop collecting lines
chunks.append( chunk_lines )
hits.append( hit )
chunk_df[ "chunks" ] = chunks
chunk_df[ "image_no" ] = image_no
chunk_df[ "hit" ] = hits
return chunk_df
def scrub_cells( line ):
# get uc values from stream file
# example - Cell parameters 7.71784 7.78870 3.75250 nm, 90.19135 90.77553 90.19243 deg
pattern = r"Cell\sparameters\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\snm,\s(\d+\.\d+)\s(\d+\.\d+)\s(\d+\.\d+)\sdeg"
a = re.search( pattern, line ).group(1)
b = re.search( pattern, line ).group(2)
c = re.search( pattern, line ).group(3)
alpha = re.search( pattern, line ).group(4)
beta = re.search( pattern, line ).group(5)
gamma = re.search( pattern, line ).group(6)
data = [ { "a" : float( a )*10,
"b" : float( b )*10,
"c" : float( c )*10,
"alpha" : float( alpha ),
"beta" : float( beta ),
"gamma" : float( gamma )
} ]
cell_df = pd.DataFrame( data )
return cell_df
def extract_xtals( chunk ):
# setup
xtals = []
cells_df = pd.DataFrame()
collect_crystal_lines = False
# Open the input file for reading
for line in chunk:
# Check for the xtals start condition
if line.startswith('--- Begin crystal'):
collect_crystal_lines = True
xtal_lines = []
if collect_crystal_lines:
xtal_lines.append(line)
if line.startswith('--- End crystal\n'):
collect_crystal_lines = False # Stop collecting lines
xtals.append( xtal_lines )
if line.startswith( "Cell" ):
cell_df = scrub_cells( line )
cells_df = pd.concat( ( cells_df, cell_df ) )
# reset index of cells_df
cells_df = cells_df.reset_index( drop=True )
return xtals, cells_df
def extract_header( chunk ):
# setup
header = []
collect_header_lines = False
# Open the input file for reading
for line in chunk:
# Check for the xtals start condition
if line.startswith('----- Begin chunk -----'):
collect_header_lines = True
header_lines = []
if collect_header_lines:
header_lines.append(line)
if line.startswith('End of peak list'):
collect_header_lines = False # Stop collecting lines
header.append( header_lines )
return header
def get_header( header, input_file ):
if header == "geom":
start_keyword = "----- Begin geometry file -----"
end_keyword = "----- End geometry file -----"
if header == "cell":
start_keyword = "----- Begin unit cell -----"
end_keyword = "----- End unit cell -----"
# setup
collect_lines = False
headers = []
# Open the input file for reading
with open(input_file, 'r') as f:
for line in f:
# Check for the start condition
if line.strip() == start_keyword:
collect_lines = True
headers_lines = []
# Collect lines between start and end conditions
if collect_lines:
headers_lines.append(line)
# Check for the end condition
if line.strip() == end_keyword:
collect_lines = False # Stop collecting lines
headers.append(headers_lines)
return headers[0]
def write_to_file( geom, cell, chunk_header, crystals, output_file ):
# Write sections with matching cell parameters to the output file
with open(output_file, 'w') as out_file:
out_file.write('CrystFEL stream format 2.3\n')
out_file.write('Generated by CrystFEL 0.10.2\n')
out_file.writelines(geom)
out_file.writelines(cell)
for crystal, header in zip( crystals, chunk_header ):
out_file.writelines( header )
out_file.writelines( crystal )
out_file.writelines( "----- End chunk -----\n" )
def is_within_range( a, b, c, alpha, beta, gamma, cell_tolerance, angle_tolerance, ideal_centre ):
cell = [ a, b, c, alpha, beta, gamma ]
returner = False
within_range = []
# Function to check if cell parameters are within the specified range
for i in range(3):
if cell[i] < ideal_centre[i]+cell_tolerance and cell[i] > ideal_centre[i]-cell_tolerance:
within_range.append(True)
else:
within_range.append(False)
for i in range(3, 6):
if cell[i] < ideal_centre[i]+angle_tolerance and cell[i] > ideal_centre[i]-angle_tolerance:
within_range.append(True)
else:
within_range.append(False)
if all(within_range):
returner = True
return returner
def sort_xtals( chunk_df ):
# extract xtals
xtal_df = pd.DataFrame()
counter = 0
for index, row in chunk_df.iterrows():
chunk, hit, image_no = row[ "chunks" ], row[ "hit" ], row[ "image_no" ]
if hit:
# find xtals and header
header = extract_header( chunk )
xtals, cells_df = extract_xtals( chunk )
# make header same length as xtals
header = header*len(xtals)
# concat results
xtal_df_1 = pd.DataFrame()
xtal_df_1[ "header" ] = header
xtal_df_1[ "xtals" ] = xtals
xtal_df_1[ "image_no" ] = image_no
xtal_df_1 = pd.concat( ( xtal_df_1, cells_df ), axis=1 )
xtal_df = pd.concat( ( xtal_df, xtal_df_1 ) )
# add count and print every 1000s
counter = counter + len(xtals)
if counter % 1000 == 0:
print( counter, end='\r' )
print( "done" )
# sort by image no and reindex
xtal_df = xtal_df.sort_values( by=[ "image_no" ] )
xtal_df = xtal_df.reset_index( drop=True )
return xtal_df
def main( input_file, output, cell_tolerance, angle_tolerance, ideal_centre ):
# get geom and cell file headers
print( "getting header info from .stream file" )
geom = get_header( "geom", input_file )
cell = get_header( "cell", input_file )
print( "done" )
# extract chunks
print( "finding chucks" )
chunk_df = extract_chunks( input_file )
# display no. of chunks
print( "found {0} chunks".format( len(chunk_df) ) )
# remove rows without xtals
chunk_df = chunk_df.loc[chunk_df.hit, :]
print( "found {0} hits (not including multiples)".format( len(chunk_df) ) )
print( "done" )
print( "sorting xtals from chunks" )
xtal_df = sort_xtals( chunk_df )
print( "done" )
print( "finding cells that are within tolerance" )
# select cells that are within tolerance
xtal_df[ "select" ] = xtal_df.apply(lambda x: is_within_range( x[ "a" ], x[ "b" ], x[ "c" ],
x[ "alpha" ], x[ "beta" ], x[ "gamma" ],
cell_tolerance,
angle_tolerance,
ideal_centre ), axis=1
)
select_df = xtal_df[ xtal_df[ "select" ] == True ]
print( "done" )
print( "writing {0} to output file".format( len( select_df ) ) )
crystals = select_df.xtals.to_list()
chunk_header = select_df.header.to_list()
output_file = "{0}.stream".format( output )
write_to_file( geom, cell, chunk_header, crystals, output_file )
print( "done" )
def list_of_floats(arg):
return list(map(float, arg.split(',')))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-s",
"--stream",
help="input stream file",
required=True,
type=os.path.abspath
)
parser.add_argument(
"-o",
"--output",
help="output stream file name. '.stream will be added'",
type=str,
default="sample"
)
parser.add_argument(
"-c",
"--cell_tolerance",
help="tolerance for cell lengths. 0.2 is default.",
type=float,
default=0.2
)
parser.add_argument(
"-a",
"--angle_tolerance",
help="tolerance for cell angles. 0.25 is default.",
type=float,
default=0.25
)
parser.add_argument(
"-i",
"--ideal_centre",
help="ideal lengths to be selected around. List of floats - e.g. 78.5, 78.5, 38.45, 90, 90, 90",
type=list_of_floats,
required=True
)
args = parser.parse_args()
# run main
main( args.stream, args.output, args.cell_tolerance, args.angle_tolerance, args.ideal_centre )