Compare commits
8 Commits: d5fa2b6c71 ... 885370865f

| Author | SHA1 | Date |
|---|---|---|
| | 885370865f | |
| | 9bb1d4204d | |
| | a1b9fc1cc9 | |
| | 80d95841e1 | |
| | 9aa7c0ece8 | |
| | ce403da6b0 | |
| | c02549f013 | |
| | a10cdf2fc5 | |
@@ -2,7 +2,7 @@
input_file_directory: '//fs101/5505/Data'

# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

# Project metadata for data lineage and provenance
project: 'Photoenhanced uptake of NO2 driven by Fe(III)-carboxylate'

@@ -2,7 +2,7 @@
input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'

# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

# Project metadata for data lineage and provenance
project: 'Beamtime May 2024, Ice Napp'

@@ -2,7 +2,7 @@
input_file_directory: '//fs03/Iron_Sulphate'

# Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

# Project metadata for data lineage and provenance
project: 'Fe SOA project'
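All three campaign descriptor files change only `output_file_directory`, from `'../output_files/'` to `'../data/'`. As a rough sketch of how such a descriptor might be consumed downstream (the path follows the pattern assembled in the notebook cell further below; the `yaml.safe_load` call is an assumption, not taken from this diff):

```python
import yaml

# Hypothetical descriptor path, matching the notebook's pattern:
# f'../input_files/campaignDescriptor{number}_{initials}.yaml'
descriptor_path = '../input_files/campaignDescriptor1_LI.yaml'

with open(descriptor_path, 'r') as stream:
    descriptor = yaml.safe_load(stream)

print(descriptor['input_file_directory'])   # e.g. '//fs101/5505/Data'
print(descriptor['output_file_directory'])  # '../data/' after this change
print(descriptor['project'])
```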
@@ -19,19 +19,94 @@ import yaml
import h5py
import argparse
import logging
# Import project modules
#root_dir = os.path.abspath(os.curdir)
#sys.path.append(root_dir)

#try:
# from dima.utils import g5505_utils as utils
#except ModuleNotFoundError:
# import utils.g5505_utils as utils
# import src.hdf5_ops as hdf5_ops
import warnings
import utils.g5505_utils as utils

def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False):
"""
Detects the table header line in the file and returns:
- header_line_idx (int)
- column_names (List[str])
- tb_idx used
- preamble_lines (List[str])
Returns (-1, [], None, []) if not found.
"""
preamble_lines = []
header_line_idx = -1
column_names = []
tb_idx = None

with open(filepath, 'rb') as f:
for line_number, line in enumerate(f):
decoded_line = line.decode(encoding_list[0]) # assume consistent encoding initially
for idx, tb in enumerate(table_header_list):
if tb in decoded_line:
tb_idx = idx
list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t'))
counts = collections.Counter(list_of_substrings)
column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
for i, name in enumerate(list_of_substrings)]
header_line_idx = line_number
if verbose:
print(f"[Detected header] Line {line_number}: {column_names}")
return header_line_idx, column_names, tb_idx, preamble_lines
preamble_lines.append(' '.join(decoded_line.split()))

warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")
return -1, [], None, preamble_lines

def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
"""
Load file reader configuration parameters based on the file and instrument directory.

Returns:
- config_dict: Full configuration dictionary
- file_encoding
- separator
- table_header
- timestamp_variables
- datetime_format
- description_dict
"""
config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))

try:
with open(config_path, 'r') as stream:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(f"[YAML Load Error] {exc}")
return {}, '', '', '', [], [], {}

# Defaults
file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8')
separator = config_dict.get('default', {}).get('separator', ',')
table_header = config_dict.get('default', {}).get('table_header', 'infer')
timestamp_variables = []
datetime_format = []
description_dict = {}

for instFolder in config_dict.keys():
if instFolder in filename.split(os.sep):
file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
separator = config_dict[instFolder].get('separator', separator)
table_header = config_dict[instFolder].get('table_header', table_header)
timestamp_variables = config_dict[instFolder].get('timestamp', [])
datetime_format = config_dict[instFolder].get('datetime_format', [])

link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(f"[Description Load Error] {exc}")

return (config_dict, file_encoding, separator, table_header,
timestamp_variables, datetime_format, description_dict)
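This refactor pulls the configuration lookup and header detection out of `read_txt_files_as_dict` into the two helpers above. A minimal usage sketch, assuming the return order given in the docstrings and list-valued `table_header`, `file_encoding`, and `separator` settings from the instrument config (the example path is made up):

```python
import os

# Hypothetical inputs
instruments_dir = os.path.join(os.path.dirname(__file__), '..')
filename = os.path.normpath('//fs101/5505/Data/some_instrument/2024-05-01_run.txt')

(config_dict, file_encoding, separator, table_header,
 timestamp_variables, datetime_format, description_dict) = load_file_reader_parameters(
    filename, instruments_dir)

header_line_idx, column_names, tb_idx, preamble_lines = detect_table_header_line(
    filename, table_header, file_encoding, separator, verbose=True)

if header_line_idx == -1:
    # No known header pattern matched; the caller falls back to table_header = ['infer']
    print('Header not detected, relying on pandas inference.')
```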
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):

filename = os.path.normpath(filename)
@@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
module_dir = os.path.dirname(__file__)
instruments_dir = os.path.join(module_dir, '..')

# Normalize the path (resolves any '..' in the path)
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))

print(instrument_configs_path)

with open(instrument_configs_path,'r') as stream:
try:
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
except yaml.YAMLError as exc:
print(exc)
# Verify if file can be read by available intrument configurations.
#if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
# return {}

#TODO: this may be prone to error if assumed folder structure is non compliant
file_encoding = config_dict['default']['file_encoding'] #'utf-8'
separator = config_dict['default']['separator']
table_header = config_dict['default']['table_header']
timestamp_variables = []
datetime_format = []
tb_idx = 0
column_names = ''
description_dict = {}

for instFolder in config_dict.keys():

if instFolder in filename.split(os.sep):

file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
separator = config_dict[instFolder].get('separator',separator)
table_header = config_dict[instFolder].get('table_header',table_header)
timestamp_variables = config_dict[instFolder].get('timestamp',[])
datetime_format = config_dict[instFolder].get('datetime_format',[])

link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)

if link_to_description:
path = os.path.join(instruments_dir, link_to_description)
try:
with open(path, 'r') as stream:
description_dict = yaml.load(stream, Loader=yaml.FullLoader)
except (FileNotFoundError, yaml.YAMLError) as exc:
print(exc)
#if 'None' in table_header:
# return {}
(config_dict,
file_encoding,
separator,
table_header,
timestamp_variables,
datetime_format,
description_dict) = load_file_reader_parameters(filename, instruments_dir)

# Read header as a dictionary and detect where data table starts
header_dict = {}
header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
data_start = False
# Work with copy of the file for safety
if work_with_copy:
@@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
table_preamble = []
line_number = 0
if 'infer' not in table_header:
header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line(
tmp_filename, table_header, file_encoding, separator)

with open(tmp_filename,'rb') as f:

for line_number, line in enumerate(f):
decoded_line = line.decode(file_encoding[tb_idx])

for tb_idx, tb in enumerate(table_header):
print(tb)
if tb in decoded_line:
break

if tb in decoded_line:

list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))

# Count occurrences of each substring
substring_counts = collections.Counter(list_of_substrings)
data_start = True
# Generate column names with appended index only for repeated substrings
column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]

#column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
#column_names = []
#for i, name in enumerate(list_of_substrings):
# column_names.append(str(i)+'_'+name)

#print(line_number, len(column_names ),'\n')
break
else:
print('Table header was not detected.')
# Subdivide line into words, and join them by single space.
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
list_of_substrings = decoded_line.split()
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
#line = ' '.join(list_of_substrings+['\n'])
#line = ' '.join(list_of_substrings)
table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line
if header_line_idx == -1:
table_header = ['infer'] # fallback to pandas' inference

# TODO: it does not work with separator as none :(. fix for RGA

try:
print(column_names)
if not 'infer' in table_header:
#print(table_header)
#print(file_encoding[tb_idx])

df = pd.read_csv(tmp_filename,
delimiter = separator[tb_idx].replace('\\t','\t'),
header=line_number,
#encoding='latin-1',
header=header_line_idx,
encoding = file_encoding[tb_idx],
names=column_names,
skip_blank_lines=True)
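The TODO above flags that files whose config sets the separator to none (the RGA case) still do not go through this path. One way the `'infer'` branch could look, sketched here with pandas' own delimiter sniffing rather than the code this commit actually ships (`tmp_filename`, `table_preamble`, and `file_encoding` are assumed to be the variables defined above):

```python
import pandas as pd

# Sketch only: let pandas sniff the delimiter when no known table header matched.
df = pd.read_csv(
    tmp_filename,
    sep=None,                      # csv.Sniffer-based delimiter detection
    engine='python',               # required when sep=None
    skiprows=len(table_preamble),  # skip the preamble collected above
    encoding=file_encoding[0] if isinstance(file_encoding, list) else file_encoding,
    skip_blank_lines=True,
)
```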
@@ -32,7 +32,7 @@ def read_structured_file_as_dict(path_to_file):
_, path_head = os.path.split(path_to_file)

file_dict['name'] = path_head
-file_dict['attributes_dict'] = {}
+file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
file_dict['datasets'] = []

try:
@@ -1,10 +1,27 @@
import os
import sys
import os

try:
thisFilePath = os.path.abspath(__file__)
except NameError:
print("Error: __file__ is not available. Ensure the script is being run from a file.")
print("[Notice] Path to DIMA package may not be resolved properly.")
thisFilePath = os.getcwd() # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root

if dimaPath not in sys.path: # Avoid duplicate entries
sys.path.insert(0,dimaPath)

import h5py

from igor2.binarywave import load as loadibw
import logging
import argparse
import utils.g5505_utils as utils

def read_xps_ibw_file_as_dict(filename):
"""
@@ -49,7 +66,7 @@ def read_xps_ibw_file_as_dict(filename):

# Group name and attributes
file_dict['name'] = path_head
-file_dict['attributes_dict'] = {}
+file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}

# Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
@@ -85,22 +102,11 @@ def read_xps_ibw_file_as_dict(filename):

if __name__ == "__main__":

try:
thisFilePath = os.path.abspath(__file__)
except NameError:
print("Error: __file__ is not available. Ensure the script is being run from a file.")
print("[Notice] Path to DIMA package may not be resolved properly.")
thisFilePath = os.getcwd() # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root

if dimaPath not in sys.path: # Avoid duplicate entries
sys.path.insert(0,dimaPath)

from src.hdf5_ops import save_file_dict_to_hdf5
from utils.g5505_utils import created_at

# Set up argument parsing
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
@@ -49,15 +49,14 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#output_filename_path = 'output_files/unified_file_smog_chamber_2024-04-07_UTC-OFST_+0200_NG.h5'\n",
"yaml_config_file_path = '../input_files/data_integr_config_file_TBR.yaml'\n",
"number, initials = 1, 'LI' # Set as either 2, 'TBR' or 3, 'NG'\n",
"campaign_descriptor_path = f'../input_files/campaignDescriptor{number}_{initials}.yaml'\n",
"\n",
"#path_to_input_directory = 'output_files/kinetic_flowtube_study_2022-01-31_LuciaI'\n",
"#path_to_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_input_directory)\n"
"print(campaign_descriptor_path)\n"
]
},
{
@@ -76,7 +75,7 @@
"outputs": [],
"source": [
"\n",
"hdf5_file_path = data_integration.run_pipeline(yaml_config_file_path)"
"hdf5_file_path = data_integration.run_pipeline(campaign_descriptor_path)"
]
},
{
@@ -146,7 +145,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -160,7 +159,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "multiphase_chemistry_env",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
@@ -174,7 +173,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
"version": "3.11.10"
}
},
"nbformat": 4,
@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
""" Generate the datetime augment dictionary from datetime steps. """
datetime_augment_dict = {}
for datetime_step in datetime_steps:
#tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
datetime_augment_dict[datetime_step] = [
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
]
return datetime_augment_dict

def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
"""Generate consistent directory or file name fragment based on mode."""
if integration_mode == 'collection':
return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
else:
return f'{filename_prefix}_{dataset_enddate}'
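For reference, a small worked example of what the two helpers above return (the dates and filename prefix are invented for illustration; note that `dataset_startdate` is accepted but not used when building the fragment):

```python
from datetime import datetime

steps = [datetime(2024, 5, 2)]
print(_generate_datetime_dict(steps))
# {datetime.datetime(2024, 5, 2, 0, 0): ['2024-05-02', '2024_05_02', '2024.05.02', '20240502']}

print(_generate_output_path_fragment('kinetic_flowtube_study', 'collection',
                                     '2024-05-01', '2024-05-07', index=1))
# 'collection_1_kinetic_flowtube_study_2024-05-07'

print(_generate_output_path_fragment('kinetic_flowtube_study', 'single_experiment',
                                     '2024-05-01', '2024-05-07'))
# 'kinetic_flowtube_study_2024-05-07'
```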
def load_config_and_setup_logging(yaml_config_file_path, log_dir):
"""Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw

def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):

"""Integrates data sources specified by the input configuration file into HDF5 files.

Parameters:
yaml_config_file_path (str): Path to the YAML configuration file.
log_dir (str): Directory to save the log file.

Returns:
list: List of Paths to the created HDF5 file(s).
"""

config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)

path_to_input_dir = config_dict['input_file_directory']
@@ -213,61 +209,59 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
dataset_startdate = config_dict['dataset_startdate']
dataset_enddate = config_dict['dataset_enddate']

# Determine mode and process accordingly
output_filename_path = []
campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
date_str = f'{dataset_startdate}_{dataset_enddate}'
integration_mode = config_dict.get('integration_mode', 'single_experiment')
filename_prefix = config_dict['filename_prefix']

output_filename_path = []

# Determine top-level campaign folder path
top_level_foldername = _generate_output_path_fragment(
filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
)

# Create path to new raw datafolder and standardize with forward slashes
path_to_rawdata_folder = os.path.join(
path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
path_to_output_dir, top_level_foldername, ""
).replace(os.sep, '/')

# Process individual datetime steps if available, regardless of mode
if config_dict.get('datetime_steps_dict', {}):
# Single experiment mode
for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
date_str = datetime_step.strftime('%Y-%m-%d')
single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
single_date_str = datetime_step.strftime('%Y%m%d')
subfolder_name = f"{filename_prefix}_{single_date_str}"
subfolder_name = f"experimental_step_{single_date_str}"
path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")

path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
file_keywords, allowed_file_extensions, root_metadata_dict)
path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
file_keywords, allowed_file_extensions, root_metadata_dict)

output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

# Collection mode processing if specified
if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
# Collection mode post-processing
if integration_mode == 'collection':
path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
#hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
)
output_filename_path.append(hdf5_path)
else:
path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
allowed_file_extensions, root_metadata_dict)
path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
allowed_file_extensions, root_metadata_dict)
output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

return output_filename_path

if __name__ == "__main__":

if len(sys.argv) < 2:
print("Usage: python data_integration.py <function_name> <function_args>")
sys.exit(1)

# Extract the function name from the command line arguments
function_name = sys.argv[1]

# Handle function execution based on the provided function name
if function_name == 'run':

if len(sys.argv) != 3:
print("Usage: python data_integration.py run <path_to_config_yamlFile>")
sys.exit(1)
# Extract path to configuration file, specifying the data integration task
path_to_config_yamlFile = sys.argv[2]
run_pipeline(path_to_config_yamlFile)
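In short, `run_pipeline` now derives the top-level output folder from `_generate_output_path_fragment` (e.g. `collection_1_<filename_prefix>_<dataset_enddate>` in collection mode) and names each per-date subfolder `experimental_step_<YYYYMMDD>`. A hedged sketch of invoking it, mirroring the notebook cell above and the usage string in `__main__` (the descriptor path is illustrative):

```python
# Programmatic call, as the notebook does:
hdf5_paths = run_pipeline('../input_files/campaignDescriptor1_LI.yaml')
for p in hdf5_paths:
    print(p)

# Equivalent command-line form, per the usage string in __main__:
#   python data_integration.py run ../input_files/campaignDescriptor1_LI.yaml
```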
@@ -19,17 +19,10 @@ import pandas as pd
import numpy as np
import logging
import datetime

import h5py

import yaml
import json
import copy

#try:
# from dima.utils import g5505_utils as utils
# from dima.src import hdf5_writer as hdf5_lib
#except ModuleNotFoundError:
import utils.g5505_utils as utils
import src.hdf5_writer as hdf5_lib
@@ -217,49 +217,49 @@ def convert_string_to_bytes(input_list: list):

def convert_attrdict_to_np_structured_array(attr_value: dict):
"""
Converts a dictionary of attributes into a numpy structured array for HDF5
compound type compatibility.

Each dictionary key is mapped to a field in the structured array, with the
data type (S) determined by the longest string representation of the values.
If the dictionary is empty, the function returns 'missing'.
Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.

Parameters
----------
attr_value : dict
Dictionary containing the attributes to be converted. Example:
attr_value = {
'name': 'Temperature',
'unit': 'Celsius',
'value': 23.5,
'timestamp': '2023-09-26 10:00'
}
Dictionary with scalar values (int, float, str).

Returns
-------
new_attr_value : ndarray
Numpy structured array with UTF-8 encoded fields. Returns np.array(['missing'], dtype=[str]) if
the input dictionary is empty.
1-row structured array with fixed-size byte fields (dtype='S').
"""
if not isinstance(attr_value,dict):
raise ValueError(f'Input paremeter {attr_value} must be a dictionary of scalar values.')
if not isinstance(attr_value, dict):
raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")

if not attr_value:
return np.array(['missing'], dtype=[('value', 'S16')]) # placeholder

dtype = []
values_list = []
max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
for key, val in attr_value.items():
# Verify if 'rename_as' is still used in metadata revision
if key != 'rename_as' and isinstance(val, (int, float, str)):
dtype.append((key, f'S{max_length}'))
values_list.append(attr_value[key])
else:
print(f"Skipping unsupported type for key {key}: {type(val)}")
if values_list:
new_attr_value = np.array([tuple(values_list)], dtype=dtype)
else:
new_attr_value = np.array(['missing'], dtype=[str])

return new_attr_value
max_str_len = max(len(str(v)) for v in attr_value.values())
byte_len = max_str_len * 4 # UTF-8 worst-case

for key, val in attr_value.items():
if key == 'rename_as':
continue
if isinstance(val, (int, float, str)):
dtype.append((key, f'S{byte_len}'))
try:
encoded_val = str(val).encode('utf-8') # explicit UTF-8
values_list.append(encoded_val)
except UnicodeEncodeError as e:
logging.error(f"Failed to encode {key}={val}: {e}")
raise
else:
logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")

if values_list:
return np.array([tuple(values_list)], dtype=dtype)
else:
return np.array(['missing'], dtype=[('value', 'S16')])
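The rewrite above sizes every field from the longest string value, multiplied by four for the UTF-8 worst case, and encodes each value explicitly before building the array. A minimal usage sketch, assuming the function and `h5py` are importable in the current scope (the file name and attribute values are illustrative):

```python
import h5py

attrs = {'name': 'Temperature', 'unit': '°C', 'value': 23.5, 'timestamp': '2023-09-26 10:00'}
arr = convert_attrdict_to_np_structured_array(attrs)
print(arr.dtype)  # one fixed-size byte field ('S...') per key

# The one-row structured array maps to an HDF5 compound type, so it can be
# attached directly as an attribute; non-ASCII values survive because each
# field was explicitly UTF-8 encoded.
with h5py.File('demo.h5', 'w') as f:
    f.attrs['measurement_metadata'] = arr
```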
def infer_units(column_name):