Compare commits

6 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 5f6d0e4f2b | |
| | e96ecfa951 | |
| | 8daa57c396 | |
| | 11b9e35526 | |
| | d43ead5f6c | |
| | d6bb20ae7d | |
```diff
@@ -13,7 +13,7 @@ group_id: '5505'
 experiment: 'kinetic_flowtube_study' # 'beamtime', 'smog_chamber_study'
 dataset_startdate:
 dataset_enddate:
-actris_level: '0'
+data_level: 0
 
 # Instrument folders containing raw data from the campaign
 instrument_datafolder:
```
```diff
@@ -13,7 +13,7 @@ group_id: '5505'
 experiment: 'beamtime' # beamtime, smog_chamber, lab_experiment
 dataset_startdate: '2023-09-22'
 dataset_enddate: '2023-09-25'
-actris_level: '0'
+data_level: 0
 
 institution : "PSI"
 filename_format : "institution,experiment,contact"
```
```diff
@@ -13,7 +13,7 @@ group_id: '5505'
 experiment: 'smog_chamber_study' # beamtime, smog_chamber, lab_experiment
 dataset_startdate:
 dataset_enddate:
-actris_level: '0'
+data_level: 0
 
 # Instrument folders containing raw data from the campaign
 instrument_datafolder:
```
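Across all three campaign descriptors, the quoted string `actris_level: '0'` becomes the plain integer `data_level: 0`. A minimal sketch of the downstream effect when a descriptor is loaded, assuming a hypothetical descriptor filename (the real config paths are not shown in this compare view):

```python
import yaml

# Hypothetical descriptor filename; the actual config paths are not shown in this diff.
with open('campaignDescriptor.yaml') as f:
    config_dict = yaml.safe_load(f)

# YAML parses the unquoted 0 as an int, whereas the old 'actris_level' value was the string '0'.
assert isinstance(config_dict['data_level'], int)
```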
```diff
@@ -23,7 +23,9 @@ import logging
 import utils.g5505_utils as utils
 
 
+from src.meta_ops import record_data_lineage
 
+@record_data_lineage(data_level=0)
 def read_jsonflag_as_dict(path_to_file):
 
 
```
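The function body is untouched; the decorator alone now attaches lineage metadata to the returned dict. A hedged sketch of the expected result (the input path is illustrative; see `src/meta_ops.py` at the end of this compare for the implementation):

```python
# Illustrative input path.
result = read_jsonflag_as_dict('data/flags.json')

# record_data_lineage(data_level=0) creates result['attributes_dict'] if missing
# and merges in the lineage keys:
attrs = result['attributes_dict']
print(attrs['data_level'])         # 0
print(attrs['processing_script'])  # reader module path, relative to the project root
print(attrs['processing_date'])    # timestamp produced by utils.created_at()
```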
```diff
@@ -21,10 +21,9 @@ import argparse
 import logging
 
 import utils.g5505_utils as utils
+from src.meta_ops import record_data_lineage
 
-
-
-
+@record_data_lineage(data_level=0)
 def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
     # If instruments_dir is not provided, use the default path relative to the module directory
     if not instruments_dir:
```
```diff
@@ -21,8 +21,9 @@ import argparse
 import logging
 import warnings
 import utils.g5505_utils as utils
+from src.meta_ops import record_data_lineage
 
-
+@record_data_lineage(data_level=0)
 def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
 
     filename = os.path.normpath(filename)
@@ -44,7 +45,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
 
 
     # Read header as a dictionary and detect where data table starts
-    header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
+
     data_start = False
     # Work with copy of the file for safety
     if work_with_copy:
@@ -54,7 +55,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
 
     # Run header detection
     header_line_number, column_names, fmt_dict, table_preamble = detect_table_header_line(tmp_filename, format_variants)
-
+    header_dict = {}
     # Unpack validated format info
     table_header = fmt_dict['table_header']
     separator = fmt_dict['separator']
```
```diff
@@ -22,11 +22,12 @@ import logging
 import utils.g5505_utils as utils
 import src.hdf5_ops as hdf5_ops
 import instruments.filereader_registry as filereader_registry
+from src.meta_ops import record_data_lineage
 
-
+import inspect
 
-
-def hdf5_file_reader(dest_file_obj_or_path, src_file_path=None, dest_group_name=None, work_with_copy: bool = True):
+@record_data_lineage(data_level=0)
+def hdf5_file_reader(dest_file_obj_or_path, src_file_path : str = None, dest_group_name : str = None, work_with_copy: bool = True):
     """
     Reads an HDF5 file and copies its contents to a destination group.
     If an HDF5 file object is provided, it skips reading from a file path.
```
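Unlike the dict-returning readers, `hdf5_file_reader` writes into a destination group, so the decorator's second branch applies: assuming the function does not itself return a dict (otherwise the dict branch wins), the wrapper reopens the `.h5` file in `r+` mode after the call and stamps the lineage keys onto the group's attributes. A usage sketch with hypothetical file and group names:

```python
import h5py

# File and group names are illustrative.
hdf5_file_reader('collection.h5', src_file_path='raw_scan.h5', dest_group_name='raw_scan')

with h5py.File('collection.h5', 'r') as f:
    # Expected to now include 'data_level', 'processing_script', and 'processing_date'.
    print(dict(f['raw_scan'].attrs))
```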
```diff
@@ -22,7 +22,7 @@ import argparse
 
 
 import utils.g5505_utils as utils
-
+from src.meta_ops import record_data_lineage
 
 def split_header(header_lines):
     header_lines_copy = []
@@ -79,6 +79,8 @@ def extract_var_descriptions(part2)
 
 
 
+
+@record_data_lineage(data_level=0)
 def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True):
 
     # If instruments_dir is not provided, use the default path relative to the module directory
```
```diff
@@ -20,7 +20,9 @@ import argparse
 import logging
 
 import utils.g5505_utils as utils
+from src.meta_ops import record_data_lineage
 
+@record_data_lineage(data_level=0)
 def read_structured_file_as_dict(path_to_file):
     """
     Reads a JSON or YAML file, flattens nested structures using pandas.json_normalize,
@@ -32,7 +34,7 @@ def read_structured_file_as_dict(path_to_file):
     _, path_head = os.path.split(path_to_file)
 
     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
+    file_dict['attributes_dict'] = {} #'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
     file_dict['datasets'] = []
 
     try:
```
```diff
@@ -21,8 +21,9 @@ from igor2.binarywave import load as loadibw
 import logging
 import argparse
 import utils.g5505_utils as utils
+from src.meta_ops import record_data_lineage
 
-
+@record_data_lineage(data_level=0)
 def read_xps_ibw_file_as_dict(filename):
     """
     Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
@@ -66,7 +67,7 @@ def read_xps_ibw_file_as_dict(filename):
 
     # Group name and attributes
     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
+    file_dict['attributes_dict'] = {} #'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
 
     # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
     notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
```
```diff
@@ -18,6 +18,7 @@ if dimaPath not in sys.path: # Avoid duplicate entries
 import yaml
 import logging
 from datetime import datetime
+import shutil
 # Importing chain class from itertools
 from itertools import chain
 import shutil
@@ -57,7 +58,7 @@ def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     # Define required keys
     required_keys = [
         'experiment', 'contact', 'input_file_directory', 'output_file_directory',
-        'instrument_datafolder', 'project', 'actris_level'
+        'instrument_datafolder', 'project', 'data_level'
     ]
 
     # Supported integration modes
@@ -258,7 +259,7 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     select_dir_keywords = config_dict['instrument_datafolder']
 
     # Define root folder metadata dictionary
-    root_metadata_dict = {key : config_dict[key] for key in ['project', 'experiment', 'contact', 'actris_level']}
+    root_metadata_dict = {key : config_dict[key] for key in ['project', 'experiment', 'contact', 'data_level']}
 
     # Get dataset start and end dates
     dataset_startdate = config_dict['dataset_startdate']
```
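The body of `load_config_and_setup_logging` is not part of this diff, so the check below is only a sketch of how a `required_keys` list of this shape is typically enforced, not the repository's actual code:

```python
# Sketch: fail fast if the YAML config lacks any required key (e.g. the renamed 'data_level').
missing_keys = [key for key in required_keys if key not in config_dict]
if missing_keys:
    raise KeyError(f"Configuration file is missing required keys: {missing_keys}")
```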
src/meta_ops.py (new file, 84 lines)

```python
import sys
import os

try:
    thisFilePath = os.path.abspath(__file__)
except NameError:
    print("Error: __file__ is not available. Ensure the script is being run from a file.")
    print("[Notice] Path to DIMA package may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..'))  # Move up to project root

if dimaPath not in sys.path:  # Avoid duplicate entries
    sys.path.append(dimaPath)


import h5py
import pandas as pd
import numpy as np
import logging
import datetime
import yaml
import json
import copy

import utils.g5505_utils as utils
#import src.hdf5_writer as hdf5_lib
import inspect
from functools import wraps


def record_data_lineage(data_level: int = 0):
    """Parameterized decorator to record data lineage information.
    `data_level` is a user-defined integer.
    Adds lineage metadata to dict returns or HDF5 group attributes."""

    def decorator(function: callable):
        # Get relative path to the script where the function is defined
        tmpFunctionAbsPath = inspect.getfile(function)
        functionFileRelativePath = os.path.relpath(tmpFunctionAbsPath, dimaPath)
        func_signature = inspect.signature(function)

        @wraps(function)
        def wrapper_func(*args, **kwargs):
            # Bind args/kwargs to the function signature
            bound_args = func_signature.bind(*args, **kwargs)
            bound_args.apply_defaults()

            dest_file_path = bound_args.arguments.get('dest_file_obj_or_path')
            dest_group_name = bound_args.arguments.get('dest_group_name')

            # If the file is already an h5py.File object, use its filename
            if isinstance(dest_file_path, h5py.File):
                dest_file_path = dest_file_path.filename

            # Call the original function
            result = function(*args, **kwargs)

            # Prepare lineage metadata
            data_lineage_metadata = {
                'data_level': data_level,
                'processing_script': functionFileRelativePath,
                'processing_date': utils.created_at(),
            }

            # Case 1: dict result → inject metadata
            if isinstance(result, dict):
                if 'attributes_dict' not in result:
                    result['attributes_dict'] = {}
                result['attributes_dict'].update(data_lineage_metadata)

            # Case 2: HDF5 group → inject metadata safely
            elif dest_file_path and dest_group_name:
                if os.path.exists(dest_file_path) and dest_file_path.endswith('.h5'):
                    with h5py.File(dest_file_path, mode='r+', track_order=True) as fobj:
                        if dest_group_name in fobj:
                            for key, value in data_lineage_metadata.items():
                                fobj[dest_group_name].attrs[key] = value

            return result

        return wrapper_func

    return decorator
```
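Taken together with the reader changes above, adopting the decorator in a new module reduces lineage handling to one line. The sketch below uses a hypothetical reader, `read_example_as_dict`, which is not part of the repository:

```python
from src.meta_ops import record_data_lineage

@record_data_lineage(data_level=0)
def read_example_as_dict(filename):
    # Returning the usual dict shape is enough; the wrapper creates
    # 'attributes_dict' if absent and merges the lineage keys into it.
    return {'name': filename, 'datasets': []}

result = read_example_as_dict('example.dat')
print(result['attributes_dict'])
# -> {'data_level': 0, 'processing_script': '<module path>', 'processing_date': '<timestamp>'}
```

Note that the two branches are mutually exclusive per call: a dict return always wins, and the group-attribute write only triggers when the wrapped signature exposes `dest_file_obj_or_path` and `dest_group_name`, as `hdf5_file_reader` does.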