Compare commits

...

8 Commits

10 changed files with 370 additions and 381 deletions

View File

@@ -2,7 +2,7 @@
 input_file_directory: '//fs101/5505/Data'
 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'
 # Project metadata for data lineage and provenance
 project: 'Photoenhanced uptake of NO2 driven by Fe(III)-carboxylate'

View File

@@ -2,7 +2,7 @@
 input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'
 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'
 # Project metadata for data lineage and provenance
 project: 'Beamtime May 2024, Ice Napp'

View File

@@ -2,7 +2,7 @@
 input_file_directory: '//fs03/Iron_Sulphate'
 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'
 # Project metadata for data lineage and provenance
 project: 'Fe SOA project'
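The three campaign descriptor diffs above only redirect output_file_directory from '../output_files/' to '../data/'. As a reviewer aid, a minimal sketch of loading and sanity-checking such a descriptor with PyYAML, limited to the keys visible in these hunks; the helper name and example path are illustrative, not part of the change.

import yaml  # PyYAML

def load_campaign_descriptor(path):
    """Illustrative loader for the campaign descriptor files changed above."""
    with open(path, 'r') as stream:
        config = yaml.safe_load(stream)
    # Only the keys visible in these hunks are checked here.
    for key in ('input_file_directory', 'output_file_directory', 'project'):
        if key not in config:
            raise KeyError(f"campaign descriptor is missing '{key}'")
    return config

# Example (path is illustrative):
# config = load_campaign_descriptor('../input_files/campaignDescriptor1_LI.yaml')
# config['output_file_directory']  # now '../data/' after this change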

View File

@@ -19,19 +19,94 @@ import yaml
 import h5py
 import argparse
 import logging
-# Import project modules
-#root_dir = os.path.abspath(os.curdir)
-#sys.path.append(root_dir)
-#try:
-#    from dima.utils import g5505_utils as utils
-#except ModuleNotFoundError:
-#    import utils.g5505_utils as utils
-#    import src.hdf5_ops as hdf5_ops
+import warnings
 import utils.g5505_utils as utils
+
+def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False):
+    """
+    Detects the table header line in the file and returns:
+      - header_line_idx (int)
+      - column_names (List[str])
+      - tb_idx used
+      - preamble_lines (List[str])
+    Returns (-1, [], None, []) if not found.
+    """
+    preamble_lines = []
+    header_line_idx = -1
+    column_names = []
+    tb_idx = None
+
+    with open(filepath, 'rb') as f:
+        for line_number, line in enumerate(f):
+            decoded_line = line.decode(encoding_list[0])  # assume consistent encoding initially
+            for idx, tb in enumerate(table_header_list):
+                if tb in decoded_line:
+                    tb_idx = idx
+                    list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t'))
+                    counts = collections.Counter(list_of_substrings)
+                    column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
+                                    for i, name in enumerate(list_of_substrings)]
+                    header_line_idx = line_number
+                    if verbose:
+                        print(f"[Detected header] Line {line_number}: {column_names}")
+                    return header_line_idx, column_names, tb_idx, preamble_lines
+            preamble_lines.append(' '.join(decoded_line.split()))
+
+    warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")
+    return -1, [], None, preamble_lines
+
+def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
+    """
+    Load file reader configuration parameters based on the file and instrument directory.
+
+    Returns:
+      - config_dict: Full configuration dictionary
+      - file_encoding
+      - separator
+      - table_header
+      - timestamp_variables
+      - datetime_format
+      - description_dict
+    """
+    config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))
+    try:
+        with open(config_path, 'r') as stream:
+            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
+    except yaml.YAMLError as exc:
+        print(f"[YAML Load Error] {exc}")
+        return {}, '', '', '', [], [], {}
+
+    # Defaults
+    file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8')
+    separator = config_dict.get('default', {}).get('separator', ',')
+    table_header = config_dict.get('default', {}).get('table_header', 'infer')
+    timestamp_variables = []
+    datetime_format = []
+    description_dict = {}
+
+    for instFolder in config_dict.keys():
+        if instFolder in filename.split(os.sep):
+            file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
+            separator = config_dict[instFolder].get('separator', separator)
+            table_header = config_dict[instFolder].get('table_header', table_header)
+            timestamp_variables = config_dict[instFolder].get('timestamp', [])
+            datetime_format = config_dict[instFolder].get('datetime_format', [])
+
+            link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
+            if link_to_description:
+                path = os.path.join(instruments_dir, link_to_description)
+                try:
+                    with open(path, 'r') as stream:
+                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
+                except (FileNotFoundError, yaml.YAMLError) as exc:
+                    print(f"[Description Load Error] {exc}")
+
+    return (config_dict, file_encoding, separator, table_header,
+            timestamp_variables, datetime_format, description_dict)
+
 def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
     filename = os.path.normpath(filename)
@@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
         module_dir = os.path.dirname(__file__)
         instruments_dir = os.path.join(module_dir, '..')
-    # Normalize the path (resolves any '..' in the path)
-    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
-
-    print(instrument_configs_path)
-
-    with open(instrument_configs_path,'r') as stream:
-        try:
-            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
-        except yaml.YAMLError as exc:
-            print(exc)
-    # Verify if file can be read by available intrument configurations.
-    #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
-    #    return {}
-
-    #TODO: this may be prone to error if assumed folder structure is non compliant
-    file_encoding = config_dict['default']['file_encoding'] #'utf-8'
-    separator = config_dict['default']['separator']
-    table_header = config_dict['default']['table_header']
-    timestamp_variables = []
-    datetime_format = []
-    tb_idx = 0
-    column_names = ''
-    description_dict = {}
-
-    for instFolder in config_dict.keys():
-        if instFolder in filename.split(os.sep):
-            file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
-            separator = config_dict[instFolder].get('separator',separator)
-            table_header = config_dict[instFolder].get('table_header',table_header)
-            timestamp_variables = config_dict[instFolder].get('timestamp',[])
-            datetime_format = config_dict[instFolder].get('datetime_format',[])
-
-            link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
-            if link_to_description:
-                path = os.path.join(instruments_dir, link_to_description)
-                try:
-                    with open(path, 'r') as stream:
-                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
-                except (FileNotFoundError, yaml.YAMLError) as exc:
-                    print(exc)
-
-    #if 'None' in table_header:
-    #    return {}
+    (config_dict,
+     file_encoding,
+     separator,
+     table_header,
+     timestamp_variables,
+     datetime_format,
+     description_dict) = load_file_reader_parameters(filename, instruments_dir)
 
     # Read header as a dictionary and detect where data table starts
-    header_dict = {}
+    header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
     data_start = False
     # Work with copy of the file for safety
     if work_with_copy:
@@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
     table_preamble = []
     line_number = 0
     if 'infer' not in table_header:
-        with open(tmp_filename,'rb') as f:
-            for line_number, line in enumerate(f):
-                decoded_line = line.decode(file_encoding[tb_idx])
-
-                for tb_idx, tb in enumerate(table_header):
-                    print(tb)
-                    if tb in decoded_line:
-                        break
-
-                if tb in decoded_line:
-                    list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
-
-                    # Count occurrences of each substring
-                    substring_counts = collections.Counter(list_of_substrings)
-                    data_start = True
-
-                    # Generate column names with appended index only for repeated substrings
-                    column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
-                    #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
-                    #column_names = []
-                    #for i, name in enumerate(list_of_substrings):
-                    #    column_names.append(str(i)+'_'+name)
-                    #print(line_number, len(column_names ),'\n')
-                    break
-                else:
-                    print('Table header was not detected.')
-                    # Subdivide line into words, and join them by single space.
-                    # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
-                    list_of_substrings = decoded_line.split()
-                    # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
-                    #line = ' '.join(list_of_substrings+['\n'])
-                    #line = ' '.join(list_of_substrings)
-                    table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line
+        header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line(
+            tmp_filename, table_header, file_encoding, separator)
+        if header_line_idx == -1:
+            table_header = ['infer']  # fallback to pandas' inference
 
     # TODO: it does not work with separator as none :(. fix for RGA
     try:
-        print(column_names)
         if not 'infer' in table_header:
-            #print(table_header)
-            #print(file_encoding[tb_idx])
             df = pd.read_csv(tmp_filename,
                             delimiter = separator[tb_idx].replace('\\t','\t'),
-                            header=line_number,
-                            #encoding='latin-1',
+                            header=header_line_idx,
                             encoding = file_encoding[tb_idx],
                             names=column_names,
                             skip_blank_lines=True)
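For review purposes, a small self-contained sketch of the new detect_table_header_line helper on a throwaway file. The import path instruments.readers.g5505_text_reader is an assumption about where the reader module lives, and the marker and separator values stand in for entries of config_text_reader.yaml.

import os
import tempfile

# Module path assumed; adjust to wherever the text reader lives in DIMA.
from instruments.readers.g5505_text_reader import detect_table_header_line

# Throwaway file: one preamble line, one tab-separated header, one data row.
sample = b"# acquisition started\nDate_Time\tNO2_ppb\tT_degC\n2024-05-01 10:00:00\t3.2\t21.5\n"
with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as tmp:
    tmp.write(sample)
    path = tmp.name

idx, columns, tb_idx, preamble = detect_table_header_line(
    path,
    table_header_list=['Date_Time'],   # marker string, as configured per instrument
    encoding_list=['utf-8'],
    separator_list=['\\t'])            # literal "\t" is translated to a tab inside the helper

print(idx)       # 1 -> the second line holds the column names
print(columns)   # ['Date_Time', 'NO2_ppb', 'T_degC']
print(preamble)  # ['# acquisition started']
os.remove(path)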

View File

@@ -32,7 +32,7 @@ def read_structured_file_as_dict(path_to_file):
     _, path_head = os.path.split(path_to_file)
     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {}
+    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
     file_dict['datasets'] = []
     try:
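This hunk adds the same provenance triplet ('actris_level', 'processing_date', 'processing_script') that the other readers in this changeset gain. A hedged sketch of a helper that could centralize that dictionary; the function name is hypothetical, while utils.created_at, thisFilePath, and dimaPath are the names the diffs actually use.

import os

def build_provenance_attrs(this_file_path, dima_path, created_at):
    """Hypothetical helper returning the provenance dictionary added by these hunks."""
    return {
        'actris_level': 0,                                                 # raw / unprocessed level
        'processing_date': created_at(),                                   # timestamp, as utils.created_at() in the diff
        'processing_script': os.path.relpath(this_file_path, dima_path),   # script path relative to the DIMA root
    }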

View File

@@ -1,10 +1,27 @@
+import os
 import sys
-import os
+
+try:
+    thisFilePath = os.path.abspath(__file__)
+except NameError:
+    print("Error: __file__ is not available. Ensure the script is being run from a file.")
+    print("[Notice] Path to DIMA package may not be resolved properly.")
+    thisFilePath = os.getcwd()  # Use current directory or specify a default
+
+dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..'))  # Move up to project root
+
+if dimaPath not in sys.path:  # Avoid duplicate entries
+    sys.path.insert(0,dimaPath)
+
 import h5py
 from igor2.binarywave import load as loadibw
 import logging
 import argparse
+
+import utils.g5505_utils as utils
 
 def read_xps_ibw_file_as_dict(filename):
     """
@@ -49,7 +66,7 @@ def read_xps_ibw_file_as_dict(filename):
     # Group name and attributes
     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {}
+    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
 
     # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
     notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
@@ -85,22 +102,11 @@ def read_xps_ibw_file_as_dict(filename):
 if __name__ == "__main__":
 
-    try:
-        thisFilePath = os.path.abspath(__file__)
-    except NameError:
-        print("Error: __file__ is not available. Ensure the script is being run from a file.")
-        print("[Notice] Path to DIMA package may not be resolved properly.")
-        thisFilePath = os.getcwd()  # Use current directory or specify a default
-
-    dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..'))  # Move up to project root
-
-    if dimaPath not in sys.path:  # Avoid duplicate entries
-        sys.path.insert(0,dimaPath)
-
     from src.hdf5_ops import save_file_dict_to_hdf5
     from utils.g5505_utils import created_at
 
     # Set up argument parsing
     parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
     parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
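The try/except block around __file__ moves from the __main__ guard to import time, so the module can locate the DIMA root before importing utils.g5505_utils. A generic, runnable sketch of that bootstrap pattern, assuming (as the diff does) that the project root sits three directories above the module:

import os
import sys

# Resolve the module location; fall back to the CWD when __file__ is unavailable
# (e.g., interactive sessions), as the reader modules now do at import time.
try:
    this_file_path = os.path.abspath(__file__)
except NameError:
    this_file_path = os.getcwd()

# Assumption carried over from the diff: the package root is three levels up.
project_root = os.path.normpath(os.path.join(this_file_path, '..', '..', '..'))

if project_root not in sys.path:  # avoid duplicate entries
    sys.path.insert(0, project_root)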

View File

@@ -49,15 +49,14 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 2,
+   "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
-    "#output_filename_path = 'output_files/unified_file_smog_chamber_2024-04-07_UTC-OFST_+0200_NG.h5'\n",
-    "yaml_config_file_path = '../input_files/data_integr_config_file_TBR.yaml'\n",
+    "number, initials = 1, 'LI' # Set as either 2, 'TBR' or 3, 'NG'\n",
+    "campaign_descriptor_path = f'../input_files/campaignDescriptor{number}_{initials}.yaml'\n",
     "\n",
-    "#path_to_input_directory = 'output_files/kinetic_flowtube_study_2022-01-31_LuciaI'\n",
-    "#path_to_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_input_directory)\n"
+    "print(campaign_descriptor_path)\n"
    ]
   },
   {
@@ -76,7 +75,7 @@
    "outputs": [],
    "source": [
     "\n",
-    "hdf5_file_path = data_integration.run_pipeline(yaml_config_file_path)"
+    "hdf5_file_path = data_integration.run_pipeline(campaign_descriptor_path)"
    ]
   },
   {
@@ -146,7 +145,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 5,
+   "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -160,7 +159,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "multiphase_chemistry_env",
+   "display_name": "Python 3",
    "language": "python",
    "name": "python3"
   },
@@ -174,7 +173,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.11.9"
+   "version": "3.11.10"
   }
  },
 "nbformat": 4,

View File

@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
     """ Generate the datetime augment dictionary from datetime steps. """
     datetime_augment_dict = {}
     for datetime_step in datetime_steps:
-        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
         datetime_augment_dict[datetime_step] = [
-            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
+            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
         ]
     return datetime_augment_dict
 
+def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
+    """Generate consistent directory or file name fragment based on mode."""
+    if integration_mode == 'collection':
+        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
+    else:
+        return f'{filename_prefix}_{dataset_enddate}'
+
 def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw
 def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
-    """Integrates data sources specified by the input configuration file into HDF5 files.
-
-    Parameters:
-        yaml_config_file_path (str): Path to the YAML configuration file.
-        log_dir (str): Directory to save the log file.
-
-    Returns:
-        list: List of Paths to the created HDF5 file(s).
-    """
     config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)
 
     path_to_input_dir = config_dict['input_file_directory']
@@ -213,22 +209,27 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     dataset_startdate = config_dict['dataset_startdate']
     dataset_enddate = config_dict['dataset_enddate']
 
-    # Determine mode and process accordingly
-    output_filename_path = []
-    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
-    date_str = f'{dataset_startdate}_{dataset_enddate}'
+    integration_mode = config_dict.get('integration_mode', 'single_experiment')
+    filename_prefix = config_dict['filename_prefix']
+    output_filename_path = []
+
+    # Determine top-level campaign folder path
+    top_level_foldername = _generate_output_path_fragment(
+        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
+    )
 
+    # Create path to new raw datafolder and standardize with forward slashes
     path_to_rawdata_folder = os.path.join(
-        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
+        path_to_output_dir, top_level_foldername, ""
+    ).replace(os.sep, '/')
 
     # Process individual datetime steps if available, regardless of mode
     if config_dict.get('datetime_steps_dict', {}):
-        # Single experiment mode
         for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
-            date_str = datetime_step.strftime('%Y-%m-%d')
-            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
-            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
+            single_date_str = datetime_step.strftime('%Y%m%d')
+            subfolder_name = f"{filename_prefix}_{single_date_str}"
+            subfolder_name = f"experimental_step_{single_date_str}"
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")
 
             path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                 path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
@@ -236,11 +237,12 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
             output_filename_path.append(path_to_integrated_stepwise_hdf5_file)
 
-        # Collection mode processing if specified
-        if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
+        # Collection mode post-processing
+        if integration_mode == 'collection':
             path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
-            #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
-            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
+            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
+                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
+            )
             output_filename_path.append(hdf5_path)
     else:
         path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
@@ -250,24 +252,16 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     return output_filename_path
 
 if __name__ == "__main__":
     if len(sys.argv) < 2:
         print("Usage: python data_integration.py <function_name> <function_args>")
         sys.exit(1)
 
-    # Extract the function name from the command line arguments
     function_name = sys.argv[1]
 
-    # Handle function execution based on the provided function name
     if function_name == 'run':
         if len(sys.argv) != 3:
             print("Usage: python data_integration.py run <path_to_config_yamlFile>")
             sys.exit(1)
-        # Extract path to configuration file, specifying the data integration task
         path_to_config_yamlFile = sys.argv[2]
         run_pipeline(path_to_config_yamlFile)
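To make the new naming scheme easy to check, here is _generate_output_path_fragment reproduced from the hunk above with two illustrative calls; the prefix and date values are made up, and note that dataset_startdate is accepted but unused in both branches.

def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
    """Generate consistent directory or file name fragment based on mode."""
    if integration_mode == 'collection':
        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
    else:
        return f'{filename_prefix}_{dataset_enddate}'

# Illustrative values; the real ones come from the campaign descriptor YAML.
print(_generate_output_path_fragment('smog_chamber', 'collection', '2024-04-07', '2024-04-09', index=1))
# collection_1_smog_chamber_2024-04-09
print(_generate_output_path_fragment('smog_chamber', 'single_experiment', '2024-04-07', '2024-04-09'))
# smog_chamber_2024-04-09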

View File

@@ -19,17 +19,10 @@ import pandas as pd
 import numpy as np
 import logging
 import datetime
-import h5py
 import yaml
 import json
 import copy
-
-#try:
-#    from dima.utils import g5505_utils as utils
-#    from dima.src import hdf5_writer as hdf5_lib
-#except ModuleNotFoundError:
 import utils.g5505_utils as utils
 import src.hdf5_writer as hdf5_lib

View File

@@ -217,49 +217,49 @@ def convert_string_to_bytes(input_list: list):
 def convert_attrdict_to_np_structured_array(attr_value: dict):
     """
-    Converts a dictionary of attributes into a numpy structured array for HDF5
-    compound type compatibility.
-
-    Each dictionary key is mapped to a field in the structured array, with the
-    data type (S) determined by the longest string representation of the values.
-    If the dictionary is empty, the function returns 'missing'.
+    Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
+    Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.
 
     Parameters
     ----------
     attr_value : dict
-        Dictionary containing the attributes to be converted. Example:
-        attr_value = {
-            'name': 'Temperature',
-            'unit': 'Celsius',
-            'value': 23.5,
-            'timestamp': '2023-09-26 10:00'
-        }
+        Dictionary with scalar values (int, float, str).
 
     Returns
     -------
     new_attr_value : ndarray
-        Numpy structured array with UTF-8 encoded fields. Returns np.array(['missing'], dtype=[str]) if
-        the input dictionary is empty.
+        1-row structured array with fixed-size byte fields (dtype='S').
     """
     if not isinstance(attr_value, dict):
-        raise ValueError(f'Input paremeter {attr_value} must be a dictionary of scalar values.')
+        raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")
+
+    if not attr_value:
+        return np.array(['missing'], dtype=[('value', 'S16')])  # placeholder
 
     dtype = []
     values_list = []
-    max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
-
-    for key, val in attr_value.items():
-        # Verify if 'rename_as' is still used in metadata revision
-        if key != 'rename_as' and isinstance(val, (int, float, str)):
-            dtype.append((key, f'S{max_length}'))
-            values_list.append(attr_value[key])
-        else:
-            print(f"Skipping unsupported type for key {key}: {type(val)}")
-
-    if values_list:
-        new_attr_value = np.array([tuple(values_list)], dtype=dtype)
-    else:
-        new_attr_value = np.array(['missing'], dtype=[str])
-
-    return new_attr_value
+
+    max_str_len = max(len(str(v)) for v in attr_value.values())
+    byte_len = max_str_len * 4  # UTF-8 worst-case
+
+    for key, val in attr_value.items():
+        if key == 'rename_as':
+            continue
+        if isinstance(val, (int, float, str)):
+            dtype.append((key, f'S{byte_len}'))
+            try:
+                encoded_val = str(val).encode('utf-8')  # explicit UTF-8
+                values_list.append(encoded_val)
+            except UnicodeEncodeError as e:
+                logging.error(f"Failed to encode {key}={val}: {e}")
+                raise
+        else:
+            logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")
+
+    if values_list:
+        return np.array([tuple(values_list)], dtype=dtype)
+    else:
+        return np.array(['missing'], dtype=[('value', 'S16')])
 
 def infer_units(column_name):
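A short round-trip sketch of the rewritten convert_attrdict_to_np_structured_array, assuming the version in the hunk above is importable from utils.g5505_utils; the attribute values are illustrative and chosen to exercise the non-ASCII path.

from utils.g5505_utils import convert_attrdict_to_np_structured_array

attrs = {'name': 'Temperatur-Fühler', 'unit': '°C', 'value': 23.5}
arr = convert_attrdict_to_np_structured_array(attrs)

print(arr.dtype.names)                 # ('name', 'unit', 'value')
print(arr['unit'][0].decode('utf-8'))  # '°C' -- bytes decode back to the original string
print(arr['value'][0])                 # b'23.5' (scalars are stored as fixed-size bytes)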