Decorate readers to capture data lineage using record_data_lineage from src.meta_ops

This commit is contained in:
2025-09-20 11:02:09 +02:00
parent e96ecfa951
commit 5f6d0e4f2b
7 changed files with 22 additions and 14 deletions

View File

@@ -23,7 +23,9 @@ import logging
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
@record_data_lineage(data_level=0)
def read_jsonflag_as_dict(path_to_file): def read_jsonflag_as_dict(path_to_file):

View File

@@ -21,10 +21,9 @@ import argparse
import logging import logging
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
@record_data_lineage(data_level=0)
def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
# If instruments_dir is not provided, use the default path relative to the module directory # If instruments_dir is not provided, use the default path relative to the module directory
if not instruments_dir: if not instruments_dir:

View File

@@ -21,8 +21,9 @@ import argparse
import logging import logging
import warnings import warnings
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
@record_data_lineage(data_level=0)
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True): def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
filename = os.path.normpath(filename) filename = os.path.normpath(filename)
@@ -44,7 +45,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
# Read header as a dictionary and detect where data table starts # Read header as a dictionary and detect where data table starts
header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
data_start = False data_start = False
# Work with copy of the file for safety # Work with copy of the file for safety
if work_with_copy: if work_with_copy:
@@ -54,7 +55,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
# Run header detection # Run header detection
header_line_number, column_names, fmt_dict, table_preamble = detect_table_header_line(tmp_filename, format_variants) header_line_number, column_names, fmt_dict, table_preamble = detect_table_header_line(tmp_filename, format_variants)
header_dict = {}
# Unpack validated format info # Unpack validated format info
table_header = fmt_dict['table_header'] table_header = fmt_dict['table_header']
separator = fmt_dict['separator'] separator = fmt_dict['separator']

View File

@@ -22,11 +22,12 @@ import logging
import utils.g5505_utils as utils import utils.g5505_utils as utils
import src.hdf5_ops as hdf5_ops import src.hdf5_ops as hdf5_ops
import instruments.filereader_registry as filereader_registry import instruments.filereader_registry as filereader_registry
from src.meta_ops import record_data_lineage
import inspect
def hdf5_file_reader(dest_file_obj_or_path, src_file_path=None, dest_group_name=None, work_with_copy: bool = True): @record_data_lineage(data_level=0)
def hdf5_file_reader(dest_file_obj_or_path, src_file_path : str = None, dest_group_name : str = None, work_with_copy: bool = True):
""" """
Reads an HDF5 file and copies its contents to a destination group. Reads an HDF5 file and copies its contents to a destination group.
If an HDF5 file object is provided, it skips reading from a file path. If an HDF5 file object is provided, it skips reading from a file path.

View File

@@ -22,7 +22,7 @@ import argparse
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
def split_header(header_lines): def split_header(header_lines):
header_lines_copy = [] header_lines_copy = []
@@ -79,6 +79,8 @@ def extract_var_descriptions(part2):
@record_data_lineage(data_level=0)
def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True): def read_nasa_ames_as_dict(filename, instruments_dir: str = None, work_with_copy: bool = True):
# If instruments_dir is not provided, use the default path relative to the module directory # If instruments_dir is not provided, use the default path relative to the module directory

View File

@@ -20,7 +20,9 @@ import argparse
import logging import logging
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
@record_data_lineage(data_level=0)
def read_structured_file_as_dict(path_to_file): def read_structured_file_as_dict(path_to_file):
""" """
Reads a JSON or YAML file, flattens nested structures using pandas.json_normalize, Reads a JSON or YAML file, flattens nested structures using pandas.json_normalize,
@@ -32,7 +34,7 @@ def read_structured_file_as_dict(path_to_file):
_, path_head = os.path.split(path_to_file) _, path_head = os.path.split(path_to_file)
file_dict['name'] = path_head file_dict['name'] = path_head
file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)} file_dict['attributes_dict'] = {} #'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
file_dict['datasets'] = [] file_dict['datasets'] = []
try: try:

View File

@@ -21,8 +21,9 @@ from igor2.binarywave import load as loadibw
import logging import logging
import argparse import argparse
import utils.g5505_utils as utils import utils.g5505_utils as utils
from src.meta_ops import record_data_lineage
@record_data_lineage(data_level=0)
def read_xps_ibw_file_as_dict(filename): def read_xps_ibw_file_as_dict(filename):
""" """
Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings, Reads IBW files from the Multiphase Chemistry Group, which contain XPS spectra and acquisition settings,
@@ -66,7 +67,7 @@ def read_xps_ibw_file_as_dict(filename):
# Group name and attributes # Group name and attributes
file_dict['name'] = path_head file_dict['name'] = path_head
file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)} file_dict['attributes_dict'] = {} #'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
# Convert notes of bytes class to string class and split string into a list of elements separated by '\r'. # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
notes_list = file_obj['wave']['note'].decode("utf-8").split('\r') notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')