crystfel_tools/reduction_tools/cat_lst.py
#!/usr/bin/python
# author J.Beale
"""
# aim
script to append lst files from different run locations;
you give the script individual run numbers and/or a range of runs
# usage
python cat_lst.py -r <run_range> e.g. 45,50 - for runs 45-50, optional argument
<a series of individual run numbers> e.g., 45 47 50 - for these specific runs
#### note #### both of these can be used together - but you can't specify two ranges
-e endstation - "alvra" or "cristallina"
-p pgroup
-j flag if the Jungfraujoch/Clara processing pipeline was used
-l label - 'light' or 'dark' (old daq), 'on' or 'off' (JFJ)
-o output file name
# output
a concatenated list file of all the requested runs
"""
import argparse
import pandas as pd
import glob
import os
import numpy as np
from sys import exit

def concatenate_files( input_file_lst, output, label ):
    output_file = "{0}_{1}.lst".format( output, label )
    lines = 0
    # create output file
    with open( output_file, "w" ) as out_file:
        # loop through input list - read and write to output file
        for lst_file_pwd in input_file_lst.lst_pwd:
            # open and write to output file
            with open( lst_file_pwd, "r" ) as lst_file:
                # read once: a second read() after readlines() would return ''
                contents = lst_file.read()
                lines = lines + len( contents.splitlines() )
                out_file.write( contents )
    print( "written {0} images to {1}".format( lines, output_file ) )

def make_pwd( run_no, endstation, pgroup, jfj ):
    # pick the folder for jfj/clara or the old daq
    if jfj:
        lst_pwd = "/sf/{0}/data/{1}/res/run{2}*".format( endstation, pgroup, run_no )
    else:
        # construct lst folder path
        lst_pwd = "/sf/{0}/data/{1}/raw/run{2}*/data".format( endstation, pgroup, run_no )
    return lst_pwd
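
# e.g. run_no '0045' with jfj=True yields the glob pattern
#   /sf/<endstation>/data/<pgroup>/res/run0045*
# and with jfj=False
#   /sf/<endstation>/data/<pgroup>/raw/run0045*/data
# (<endstation> and <pgroup> stand in for the actual beamtime values)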

def find_lst( lst_dir, label ):
    if label == "on" or label == "off":
        tail = "{0}.list".format( label )
    elif label == "light" or label == "dark":
        tail = "{0}.lst".format( label )
    # create df for all lst
    lst_dir_df = pd.DataFrame()
    # search for lst with appropriate labels
    for path, dirs, files in os.walk( lst_dir ):
        for name in files:
            if name.endswith( tail ):
                # get lst pwd
                lst_pwd = os.path.join( path, name )
                # put lst pwd into df
                data = [ { "lst_pwd" : lst_pwd } ]
                lst_dir_df_1 = pd.DataFrame( data )
                lst_dir_df = pd.concat( ( lst_dir_df, lst_dir_df_1 ) )
    # reset df index
    lst_dir_df = lst_dir_df.reset_index( drop=True )
    # return df of lsts from this directory
    return lst_dir_df
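
# find_lst returns a single-column df of matching list-file paths, e.g.
# (paths purely illustrative):
#        lst_pwd
#   0    /sf/.../run0045*/on.list
#   1    /sf/.../run0045*/2/on.list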

def generate_lst_df( run_lst, endstation, label, pgroup, jfj ):
    # make run number df
    cols = [ "run_no" ]
    range_df = pd.DataFrame( run_lst, columns=cols )
    # pad run numbers with zeros on the left, e.g. '45' -> '0045'
    range_df[ "run_no" ] = range_df.run_no.str.zfill(4)
    # make new column of list paths
    range_df[ "lst_app_dir" ] = range_df[ "run_no" ].apply( lambda x: make_pwd( x, endstation, pgroup, jfj ) )
    # make df of lsts to be concatenated
    lst_df = pd.DataFrame()
    for index, row in range_df.iterrows():
        # get approximate dir pwd
        lst_app_dir = row[ "lst_app_dir" ]
        # find matching directory
        try:
            lst_dir = glob.glob( lst_app_dir )
            # find lsts in lst directory depending on label
            lst_dir_df = find_lst( lst_dir[0], label )
        except IndexError:
            # glob found nothing for this run - carry on with an empty df
            lst_dir_df = pd.DataFrame()
        # append lst dir dfs
        lst_df = pd.concat( [ lst_df, lst_dir_df ], ignore_index=True )
    # reset df index
    lst_df = lst_df.reset_index( drop=True )
    return lst_df

def main( run_lst, endstation, label, pgroup, output_file, jfj ):
    # make df of lst files
    lst_df = generate_lst_df( run_lst, endstation, label, pgroup, jfj )
    # check to see if any files have been found
    if lst_df.empty:
        print( "no {0} lists were found in runs {1}".format( label, run_lst ) )
        exit()
    # concatenate all lst files in lst_df
    concatenate_files( lst_df, output_file, label )

def range_of_runs(arg):
    # parse a 'start,end' string into a list of two ints
    return list(map(int, arg.split(',')))
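
# e.g. range_of_runs("45,50") returns [45, 50]; np.arange in the block below
# then expands these limits to the full run list 45..50 inclusive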

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-r",
        "--range",
        help="range of run numbers whose list files will be concatenated, e.g. 45,50",
        type=range_of_runs
    )
    parser.add_argument(
        "runs",
        help="individual run numbers whose list files will be concatenated",
        type=str,
        nargs='*',
        default=[]
    )
    parser.add_argument(
        "-e",
        "--endstation",
        help="which endstation did you collect these data at, e.g., alvra or cristallina",
        type=str,
        default="cristallina"
    )
    parser.add_argument(
        "-p",
        "--pgroup",
        help="pgroup the data were collected in",
        type=str
    )
    parser.add_argument(
        "-j",
        "--jfj",
        # type=bool would treat any non-empty string (even 'False') as True,
        # so use store_true instead
        action="store_true",
        help="set this flag if the Jungfraujoch/Clara data processing pipeline was used to process your data. Default = False"
    )
    parser.add_argument(
        "-l",
        "--label",
        help="the activation label for the data. Without JFJ the labels should be 'light' or 'dark'. With JFJ the labels should be 'on' or 'off'.",
        type=str
    )
    parser.add_argument(
        "-o",
        "--output",
        help="name of output file",
        type=str,
        default=None
    )
    args = parser.parse_args()
    # JFJ on/off, non-JFJ light/dark logic
    if args.label not in ( "on", "off", "light", "dark" ):
        print( "label flag (-l) must be either 'on' or 'off' with JFJ = True, or 'light' or 'dark' with JFJ = False." )
        exit()
    if ( args.label == "off" or args.label == "on" ) and not args.jfj:
        print( "JFJ uses 'on' and 'off' labels. Please check inputs and whether the new JFJ/Clara processing pipeline was used." )
        exit()
    if ( args.label == "light" or args.label == "dark" ) and args.jfj:
        print( "The old daq uses 'light' and 'dark' labels. Please check inputs and whether the new JFJ/Clara processing pipeline was used." )
        exit()
    # make continuous list from input range limits
    # (named run_range rather than range to avoid shadowing the builtin)
    run_range = []
    if args.range is not None:
        limits = args.range
        if len( limits ) != 2:
            print( "range flag (-r) expects two comma-separated run numbers, e.g. 45,50" )
            exit()
        run_range = np.arange( limits[0], limits[1]+1 )
        # convert to list
        run_range = run_range.tolist()
        # convert to strings
        run_range = list( map( str, run_range ) )
    # concat range and run lists
    runs = args.runs
    run_lst = run_range + runs
    print( "appending {0} lst files from runs {1}".format( args.label, run_lst ) )
    # make default name
    if not args.output:
        output_name = "-".join( str(e) for e in run_lst )
        output_name = "run{0}".format( output_name )
    else:
        output_name = args.output
    # run main
    main( run_lst, args.endstation, args.label, args.pgroup, output_name, args.jfj )
    print( "done" )