Compare commits

...

8 Commits

10 changed files with 370 additions and 381 deletions

View File

@@ -2,7 +2,7 @@
input_file_directory: '//fs101/5505/Data'
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
output_file_directory: '../output_files/'
output_file_directory: '../data/'
# Project metadata for data lineage and provenance
project: 'Photoenhanced uptake of NO2 driven by Fe(III)-carboxylate'

View File

@@ -2,7 +2,7 @@
input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
output_file_directory: '../output_files/'
output_file_directory: '../data/'
# Project metadata for data lineage and provenance
project: 'Beamtime May 2024, Ice Napp'

View File

@@ -2,7 +2,7 @@
input_file_directory: '//fs03/Iron_Sulphate'
# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
output_file_directory: '../output_files/'
output_file_directory: '../data/'
# Project metadata for data lineage and provenance
project: 'Fe SOA project'
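The three campaign descriptors above share the same layout; a minimal sketch of loading one in Python, assuming only the keys visible in these hunks (PyYAML is already a project dependency):

import yaml

# Illustrative descriptor content mirroring the hunks above; any keys beyond
# these three are assumptions.
descriptor_text = """
input_file_directory: '//fs03/Iron_Sulphate'
output_file_directory: '../data/'
project: 'Fe SOA project'
"""

config = yaml.safe_load(descriptor_text)
print(config['project'])                 # Fe SOA project
print(config['output_file_directory'])   # ../data/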

View File

@@ -19,19 +19,94 @@ import yaml
import h5py
import argparse
import logging
# Import project modules
#root_dir = os.path.abspath(os.curdir)
#sys.path.append(root_dir)
#try:
# from dima.utils import g5505_utils as utils
#except ModuleNotFoundError:
# import utils.g5505_utils as utils
# import src.hdf5_ops as hdf5_ops
import warnings
import utils.g5505_utils as utils
def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False):
"""
Detects the table header line in the file and returns:
- header_line_idx (int)
- column_names (List[str])
- tb_idx used
- preamble_lines (List[str])
Returns (-1, [], None, preamble_lines) if no header is found.
"""
preamble_lines = []
header_line_idx = -1
column_names = []
tb_idx = None
with open(filepath, 'rb') as f:
for line_number, line in enumerate(f):
decoded_line = line.decode(encoding_list[0]) # assume consistent encoding initially
for idx, tb in enumerate(table_header_list):
if tb in decoded_line:
tb_idx = idx
list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t'))
counts = collections.Counter(list_of_substrings)
column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
for i, name in enumerate(list_of_substrings)]
header_line_idx = line_number
if verbose:
print(f"[Detected header] Line {line_number}: {column_names}")
return header_line_idx, column_names, tb_idx, preamble_lines
preamble_lines.append(' '.join(decoded_line.split()))
warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")
return -1, [], None, preamble_lines
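A usage sketch for the new helper; the file path, header pattern, and separator are illustrative assumptions rather than repository values:

# Look for a line containing 'Time (s)' in a tab-separated text file.
header_idx, columns, tb_idx, preamble = detect_table_header_line(
    'example_instrument_file.txt',    # illustrative path
    table_header_list=['Time (s)'],   # substrings that mark the header line
    encoding_list=['utf-8'],          # the first entry is used for decoding
    separator_list=['\\t'],           # '\\t' is translated into a real tab
    verbose=True,
)
if header_idx == -1:
    # No known pattern matched; the caller falls back to pandas' header inference.
    print('\n'.join(preamble))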
def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
"""
Load file reader configuration parameters based on the file and instrument directory.
Returns:
- config_dict: Full configuration dictionary
- file_encoding
- separator
- table_header
- timestamp_variables
- datetime_format
- description_dict
"""
config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))
try:
with open(config_path, 'r') as stream:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(f"[YAML Load Error] {exc}")
return {}, '', '', '', [], [], {}
# Defaults
file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8')
separator = config_dict.get('default', {}).get('separator', ',')
table_header = config_dict.get('default', {}).get('table_header', 'infer')
timestamp_variables = []
datetime_format = []
description_dict = {}
for instFolder in config_dict.keys():
if instFolder in filename.split(os.sep):
file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
separator = config_dict[instFolder].get('separator', separator)
table_header = config_dict[instFolder].get('table_header', table_header)
timestamp_variables = config_dict[instFolder].get('timestamp', [])
datetime_format = config_dict[instFolder].get('datetime_format', [])
link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(f"[Description Load Error] {exc}")
return (config_dict, file_encoding, separator, table_header,
timestamp_variables, datetime_format, description_dict)
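A sketch of how the caller unpacks the returned tuple (the file path is illustrative; instruments_dir mirrors the layout used further down, where it must contain readers/config_text_reader.yaml):

import os

(config_dict, file_encoding, separator, table_header,
 timestamp_variables, datetime_format, description_dict) = load_file_reader_parameters(
    '//fs03/Iron_Sulphate/example_subfolder/example.txt',   # illustrative path
    os.path.join(os.path.dirname(__file__), '..'),          # instruments directory
)
# Values from the matching instrument folder override the 'default' section
# whenever that folder name appears in the file path.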
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
filename = os.path.normpath(filename)
@@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
module_dir = os.path.dirname(__file__)
instruments_dir = os.path.join(module_dir, '..')
# Normalize the path (resolves any '..' in the path)
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
print(instrument_configs_path)
with open(instrument_configs_path,'r') as stream:
try:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
# Verify if the file can be read by the available instrument configurations.
#if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
# return {}
#TODO: this may be error-prone if the assumed folder structure is non-compliant
file_encoding = config_dict['default']['file_encoding'] #'utf-8'
separator = config_dict['default']['separator']
table_header = config_dict['default']['table_header']
timestamp_variables = []
datetime_format = []
tb_idx = 0
column_names = ''
description_dict = {}
for instFolder in config_dict.keys():
if instFolder in filename.split(os.sep):
file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
separator = config_dict[instFolder].get('separator',separator)
table_header = config_dict[instFolder].get('table_header',table_header)
timestamp_variables = config_dict[instFolder].get('timestamp',[])
datetime_format = config_dict[instFolder].get('datetime_format',[])
link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(exc)
#if 'None' in table_header:
# return {}
(config_dict,
file_encoding,
separator,
table_header,
timestamp_variables,
datetime_format,
description_dict) = load_file_reader_parameters(filename, instruments_dir)
# Read header as a dictionary and detect where data table starts
header_dict = {}
header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
data_start = False
# Work with copy of the file for safety
if work_with_copy:
@@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
table_preamble = []
line_number = 0
if 'infer' not in table_header:
header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line(
tmp_filename, table_header, file_encoding, separator)
with open(tmp_filename,'rb') as f:
for line_number, line in enumerate(f):
decoded_line = line.decode(file_encoding[tb_idx])
for tb_idx, tb in enumerate(table_header):
print(tb)
if tb in decoded_line:
break
if tb in decoded_line:
list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
# Count occurrences of each substring
substring_counts = collections.Counter(list_of_substrings)
data_start = True
# Generate column names with appended index only for repeated substrings
column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
#column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
#column_names = []
#for i, name in enumerate(list_of_substrings):
# column_names.append(str(i)+'_'+name)
#print(line_number, len(column_names ),'\n')
break
else:
print('Table header was not detected.')
# Split the line into words and join them with a single space.
# I assume this produces a cleaner line with no stray separator characters (\t, \r) or extra spaces.
list_of_substrings = decoded_line.split()
# TODO: ideally we should use a multiline string, but the yaml parser does not recognize \n as a special character
#line = ' '.join(list_of_substrings+['\n'])
#line = ' '.join(list_of_substrings)
table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line
if header_line_idx == -1:
table_header = ['infer'] # fallback to pandas' inference
# TODO: it does not work with separator as none :(. fix for RGA
try:
print(column_names)
if not 'infer' in table_header:
#print(table_header)
#print(file_encoding[tb_idx])
df = pd.read_csv(tmp_filename,
delimiter = separator[tb_idx].replace('\\t','\t'),
header=line_number,
#encoding='latin-1',
header=header_line_idx,
encoding = file_encoding[tb_idx],
names=column_names,
skip_blank_lines=True)
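The two branches above reduce to a pair of pandas calls; a compact sketch of the intent, assuming header_line_idx and column_names come from detect_table_header_line and that the unchanged 'infer' branch below this hunk delegates to pandas:

import pandas as pd

if header_line_idx != -1:
    # A known header pattern was found: read from that line with the detected names.
    df = pd.read_csv('example_instrument_file.txt',   # illustrative path
                     delimiter='\t',
                     header=header_line_idx,
                     encoding='utf-8',
                     names=column_names,
                     skip_blank_lines=True)
else:
    # Assumed fallback: let pandas infer the header on its own.
    df = pd.read_csv('example_instrument_file.txt',
                     delimiter='\t',
                     header='infer',
                     skip_blank_lines=True)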

View File

@@ -32,7 +32,7 @@ def read_structured_file_as_dict(path_to_file):
_, path_head = os.path.split(path_to_file)
file_dict['name'] = path_head
file_dict['attributes_dict'] = {}
file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
file_dict['datasets'] = []
try:

View File

@@ -1,10 +1,27 @@
import os
import sys
import os
try:
thisFilePath = os.path.abspath(__file__)
except NameError:
print("Error: __file__ is not available. Ensure the script is being run from a file.")
print("[Notice] Path to DIMA package may not be resolved properly.")
thisFilePath = os.getcwd() # Use current directory or specify a default
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
if dimaPath not in sys.path: # Avoid duplicate entries
sys.path.insert(0,dimaPath)
import h5py
from igor2.binarywave import load as loadibw
import logging
import argparse
import utils.g5505_utils as utils
def read_xps_ibw_file_as_dict(filename):
"""
@@ -49,7 +66,7 @@ def read_xps_ibw_file_as_dict(filename):
# Group name and attributes
file_dict['name'] = path_head
file_dict['attributes_dict'] = {}
file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
# Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
@@ -85,22 +102,11 @@ def read_xps_ibw_file_as_dict(filename):
if __name__ == "__main__":
try:
thisFilePath = os.path.abspath(__file__)
except NameError:
print("Error: __file__ is not available. Ensure the script is being run from a file.")
print("[Notice] Path to DIMA package may not be resolved properly.")
thisFilePath = os.getcwd() # Use current directory or specify a default
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
if dimaPath not in sys.path: # Avoid duplicate entries
sys.path.insert(0,dimaPath)
from src.hdf5_ops import save_file_dict_to_hdf5
from utils.g5505_utils import created_at
# Set up argument parsing
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
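The sys.path bootstrap above is what lets utils.g5505_utils resolve when the reader runs standalone, and the same change attaches provenance attributes to the reader output; a sketch with illustrative values:

import os

thisFilePath = '/home/user/dima/instruments/readers/xps_ibw_reader.py'     # assumed location
dimaPath = os.path.normpath(os.path.join(thisFilePath, '..', '..', '..'))  # project root
attributes_dict = {
    'actris_level': 0,
    'processing_date': '2024-10-01 12:00:00',   # placeholder for utils.created_at()
    'processing_script': os.path.relpath(thisFilePath, dimaPath),
}
print(attributes_dict['processing_script'])   # instruments/readers/xps_ibw_reader.py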

View File

@@ -49,15 +49,14 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#output_filename_path = 'output_files/unified_file_smog_chamber_2024-04-07_UTC-OFST_+0200_NG.h5'\n",
"yaml_config_file_path = '../input_files/data_integr_config_file_TBR.yaml'\n",
"number, initials = 1, 'LI' # Set as either 2, 'TBR' or 3, 'NG'\n",
"campaign_descriptor_path = f'../input_files/campaignDescriptor{number}_{initials}.yaml'\n",
"\n",
"#path_to_input_directory = 'output_files/kinetic_flowtube_study_2022-01-31_LuciaI'\n",
"#path_to_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_input_directory)\n"
"print(campaign_descriptor_path)\n"
]
},
{
@@ -76,7 +75,7 @@
"outputs": [],
"source": [
"\n",
"hdf5_file_path = data_integration.run_pipeline(yaml_config_file_path)"
"hdf5_file_path = data_integration.run_pipeline(campaign_descriptor_path)"
]
},
{
@@ -146,7 +145,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -160,7 +159,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "multiphase_chemistry_env",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
@@ -174,7 +173,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
"version": "3.11.10"
}
},
"nbformat": 4,

View File

@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
""" Generate the datetime augment dictionary from datetime steps. """
datetime_augment_dict = {}
for datetime_step in datetime_steps:
#tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
datetime_augment_dict[datetime_step] = [
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
]
return datetime_augment_dict
def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
"""Generate consistent directory or file name fragment based on mode."""
if integration_mode == 'collection':
return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
else:
return f'{filename_prefix}_{dataset_enddate}'
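A short sketch of what the two helpers above produce, with illustrative dates:

from datetime import datetime

step = datetime(2022, 1, 31)
print(_generate_datetime_dict([step])[step])
# ['2022-01-31', '2022_01_31', '2022.01.31', '20220131']

print(_generate_output_path_fragment('kinetic_flowtube_study', 'collection',
                                     '2022-01-01', '2022-01-31', index=1))
# collection_1_kinetic_flowtube_study_2022-01-31
print(_generate_output_path_fragment('kinetic_flowtube_study', 'single_experiment',
                                     '2022-01-01', '2022-01-31'))
# kinetic_flowtube_study_2022-01-31   (dataset_startdate is accepted but not used)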
def load_config_and_setup_logging(yaml_config_file_path, log_dir):
"""Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
"""Integrates data sources specified by the input configuration file into HDF5 files.
Parameters:
yaml_config_file_path (str): Path to the YAML configuration file.
log_dir (str): Directory to save the log file.
Returns:
list: List of Paths to the created HDF5 file(s).
"""
config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
path_to_input_dir = config_dict['input_file_directory']
@@ -213,22 +209,27 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
dataset_startdate = config_dict['dataset_startdate']
dataset_enddate = config_dict['dataset_enddate']
# Determine mode and process accordingly
output_filename_path = []
campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
date_str = f'{dataset_startdate}_{dataset_enddate}'
integration_mode = config_dict.get('integration_mode', 'single_experiment')
filename_prefix = config_dict['filename_prefix']
output_filename_path = []
# Determine top-level campaign folder path
top_level_foldername = _generate_output_path_fragment(
filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
)
# Create path to new raw datafolder and standardize with forward slashes
path_to_rawdata_folder = os.path.join(
path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
path_to_output_dir, top_level_foldername, ""
).replace(os.sep, '/')
# Process individual datetime steps if available, regardless of mode
if config_dict.get('datetime_steps_dict', {}):
# Single experiment mode
for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
date_str = datetime_step.strftime('%Y-%m-%d')
single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
single_date_str = datetime_step.strftime('%Y%m%d')
subfolder_name = f"{filename_prefix}_{single_date_str}"
subfolder_name = f"experimental_step_{single_date_str}"
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")
path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
@@ -236,11 +237,12 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
# Collection mode processing if specified
if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
# Collection mode post-processing
if integration_mode == 'collection':
path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
#hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
)
output_filename_path.append(hdf5_path)
else:
path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
@@ -250,24 +252,16 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
return output_filename_path
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python data_integration.py <function_name> <function_args>")
sys.exit(1)
# Extract the function name from the command line arguments
function_name = sys.argv[1]
# Handle function execution based on the provided function name
if function_name == 'run':
if len(sys.argv) != 3:
print("Usage: python data_integration.py run <path_to_config_yamlFile>")
sys.exit(1)
# Extract path to configuration file, specifying the data integration task
path_to_config_yamlFile = sys.argv[2]
run_pipeline(path_to_config_yamlFile)
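Putting the entry point together, a usage sketch; the descriptor path matches the one built in the notebook above, and the src.data_integration module path is assumed from the repository layout:

# From the command line:
#   python data_integration.py run ../input_files/campaignDescriptor1_LI.yaml
#
# Or programmatically, as the notebook now does:
import src.data_integration as data_integration   # module path assumed
hdf5_file_paths = data_integration.run_pipeline('../input_files/campaignDescriptor1_LI.yaml')
for path in hdf5_file_paths:
    print(path)   # run_pipeline returns a list of created HDF5 file paths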

View File

@@ -19,17 +19,10 @@ import pandas as pd
import numpy as np
import logging
import datetime
import h5py
import yaml
import json
import copy
#try:
# from dima.utils import g5505_utils as utils
# from dima.src import hdf5_writer as hdf5_lib
#except ModuleNotFoundError:
import utils.g5505_utils as utils
import src.hdf5_writer as hdf5_lib

View File

@@ -217,49 +217,49 @@ def convert_string_to_bytes(input_list: list):
def convert_attrdict_to_np_structured_array(attr_value: dict):
"""
Converts a dictionary of attributes into a numpy structured array for HDF5
compound type compatibility.
Each dictionary key is mapped to a field in the structured array, with the
data type (S) determined by the longest string representation of the values.
If the dictionary is empty, the function returns 'missing'.
Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.
Parameters
----------
attr_value : dict
Dictionary containing the attributes to be converted. Example:
attr_value = {
'name': 'Temperature',
'unit': 'Celsius',
'value': 23.5,
'timestamp': '2023-09-26 10:00'
}
Dictionary with scalar values (int, float, str).
Returns
-------
new_attr_value : ndarray
Numpy structured array with UTF-8 encoded fields. Returns np.array(['missing'], dtype=[str]) if
the input dictionary is empty.
1-row structured array with fixed-size byte fields (dtype='S').
"""
if not isinstance(attr_value,dict):
raise ValueError(f'Input paremeter {attr_value} must be a dictionary of scalar values.')
if not isinstance(attr_value, dict):
raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")
if not attr_value:
return np.array(['missing'], dtype=[('value', 'S16')]) # placeholder
dtype = []
values_list = []
max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
for key, val in attr_value.items():
# Verify if 'rename_as' is still used in metadata revision
if key != 'rename_as' and isinstance(val, (int, float, str)):
dtype.append((key, f'S{max_length}'))
values_list.append(attr_value[key])
else:
print(f"Skipping unsupported type for key {key}: {type(val)}")
if values_list:
new_attr_value = np.array([tuple(values_list)], dtype=dtype)
else:
new_attr_value = np.array(['missing'], dtype=[str])
return new_attr_value
max_str_len = max(len(str(v)) for v in attr_value.values())
byte_len = max_str_len * 4 # UTF-8 worst-case
for key, val in attr_value.items():
if key == 'rename_as':
continue
if isinstance(val, (int, float, str)):
dtype.append((key, f'S{byte_len}'))
try:
encoded_val = str(val).encode('utf-8') # explicit UTF-8
values_list.append(encoded_val)
except UnicodeEncodeError as e:
logging.error(f"Failed to encode {key}={val}: {e}")
raise
else:
logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")
if values_list:
return np.array([tuple(values_list)], dtype=dtype)
else:
return np.array(['missing'], dtype=[('value', 'S16')])
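A quick sketch of the rewritten helper's behaviour, with illustrative attribute values:

arr = convert_attrdict_to_np_structured_array({
    'name': 'Temperature',
    'unit': '°C',          # non-ASCII is safe thanks to the explicit UTF-8 encoding
    'value': 23.5,
})
print(arr.dtype.names)   # ('name', 'unit', 'value')
print(arr['unit'][0])    # b'\xc2\xb0C'
print(convert_attrdict_to_np_structured_array({}))   # placeholder array for empty input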
def infer_units(column_name):