Compare commits: d5fa2b6c71 ... 885370865f

8 Commits:
- 885370865f
- 9bb1d4204d
- a1b9fc1cc9
- 80d95841e1
- 9aa7c0ece8
- ce403da6b0
- c02549f013
- a10cdf2fc5
@@ -2,7 +2,7 @@
 input_file_directory: '//fs101/5505/Data'

 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

 # Project metadata for data lineage and provenance
 project: 'Photoenhanced uptake of NO2 driven by Fe(III)-carboxylate'
@@ -2,7 +2,7 @@
 input_file_directory: '//fs101/5505/People/Juan/TypicalBeamTime'

 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

 # Project metadata for data lineage and provenance
 project: 'Beamtime May 2024, Ice Napp'
@@ -2,7 +2,7 @@
 input_file_directory: '//fs03/Iron_Sulphate'

 # Path to directory where raw data is copied and converted to HDF5 format for local analysis.
-output_file_directory: '../output_files/'
+output_file_directory: '../data/'

 # Project metadata for data lineage and provenance
 project: 'Fe SOA project'
@@ -19,19 +19,94 @@ import yaml
 import h5py
 import argparse
 import logging
-# Import project modules
-#root_dir = os.path.abspath(os.curdir)
-#sys.path.append(root_dir)
+import warnings


-#try:
-# from dima.utils import g5505_utils as utils
-#except ModuleNotFoundError:
-# import utils.g5505_utils as utils
-# import src.hdf5_ops as hdf5_ops
 import utils.g5505_utils as utils


+def detect_table_header_line(filepath, table_header_list, encoding_list, separator_list, verbose=False):
+    """
+    Detects the table header line in the file and returns:
+    - header_line_idx (int)
+    - column_names (List[str])
+    - tb_idx used
+    - preamble_lines (List[str])
+    Returns (-1, [], None, []) if not found.
+    """
+    preamble_lines = []
+    header_line_idx = -1
+    column_names = []
+    tb_idx = None
+
+    with open(filepath, 'rb') as f:
+        for line_number, line in enumerate(f):
+            decoded_line = line.decode(encoding_list[0])  # assume consistent encoding initially
+            for idx, tb in enumerate(table_header_list):
+                if tb in decoded_line:
+                    tb_idx = idx
+                    list_of_substrings = decoded_line.split(separator_list[idx].replace('\\t', '\t'))
+                    counts = collections.Counter(list_of_substrings)
+                    column_names = [f"{i}_{name.strip()}" if counts[name] > 1 else name.strip()
+                                    for i, name in enumerate(list_of_substrings)]
+                    header_line_idx = line_number
+                    if verbose:
+                        print(f"[Detected header] Line {line_number}: {column_names}")
+                    return header_line_idx, column_names, tb_idx, preamble_lines
+            preamble_lines.append(' '.join(decoded_line.split()))
+
+    warnings.warn("Table header was not detected using known patterns. Will attempt inference mode.")
+    return -1, [], None, preamble_lines
+
+
+def load_file_reader_parameters(filename: str, instruments_dir: str) -> tuple:
+    """
+    Load file reader configuration parameters based on the file and instrument directory.
+
+    Returns:
+    - config_dict: Full configuration dictionary
+    - file_encoding
+    - separator
+    - table_header
+    - timestamp_variables
+    - datetime_format
+    - description_dict
+    """
+    config_path = os.path.abspath(os.path.join(instruments_dir, 'readers', 'config_text_reader.yaml'))
+
+    try:
+        with open(config_path, 'r') as stream:
+            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
+    except yaml.YAMLError as exc:
+        print(f"[YAML Load Error] {exc}")
+        return {}, '', '', '', [], [], {}
+
+    # Defaults
+    file_encoding = config_dict.get('default', {}).get('file_encoding', 'utf-8')
+    separator = config_dict.get('default', {}).get('separator', ',')
+    table_header = config_dict.get('default', {}).get('table_header', 'infer')
+    timestamp_variables = []
+    datetime_format = []
+    description_dict = {}
+
+    for instFolder in config_dict.keys():
+        if instFolder in filename.split(os.sep):
+            file_encoding = config_dict[instFolder].get('file_encoding', file_encoding)
+            separator = config_dict[instFolder].get('separator', separator)
+            table_header = config_dict[instFolder].get('table_header', table_header)
+            timestamp_variables = config_dict[instFolder].get('timestamp', [])
+            datetime_format = config_dict[instFolder].get('datetime_format', [])

+            link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
+            if link_to_description:
+                path = os.path.join(instruments_dir, link_to_description)
+                try:
+                    with open(path, 'r') as stream:
+                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
+                except (FileNotFoundError, yaml.YAMLError) as exc:
+                    print(f"[Description Load Error] {exc}")
+
+    return (config_dict, file_encoding, separator, table_header,
+            timestamp_variables, datetime_format, description_dict)
+
+
 def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):

     filename = os.path.normpath(filename)
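The two helpers added above can be exercised on their own. A minimal sketch, assuming they are importable from the modified text reader module; the module name, file path, and instrument folder below are placeholders, not part of this changeset:

```python
# Sketch only: module name and paths are illustrative assumptions.
from text_file_reader import load_file_reader_parameters, detect_table_header_line  # hypothetical import

filename = '//fs101/5505/Data/SomeInstrument/example.txt'   # placeholder raw data file
instruments_dir = 'instruments'                             # folder containing readers/config_text_reader.yaml

(config_dict, file_encoding, separator, table_header,
 timestamp_variables, datetime_format, description_dict) = load_file_reader_parameters(
    filename, instruments_dir)

# Try the configured header patterns first; a return of (-1, [], None, preamble)
# signals that the caller should fall back to pandas' header inference.
header_line_idx, column_names, tb_idx, preamble = detect_table_header_line(
    filename, table_header, file_encoding, separator)
```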
@@ -41,56 +116,16 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
         module_dir = os.path.dirname(__file__)
         instruments_dir = os.path.join(module_dir, '..')

-    # Normalize the path (resolves any '..' in the path)
-    instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
-
-    print(instrument_configs_path)
-
-    with open(instrument_configs_path,'r') as stream:
-        try:
-            config_dict = yaml.load(stream, Loader=yaml.FullLoader)
-        except yaml.YAMLError as exc:
-            print(exc)
-    # Verify if file can be read by available intrument configurations.
-    #if not any(key in filename.replace(os.sep,'/') for key in config_dict.keys()):
-    #    return {}
-
-    #TODO: this may be prone to error if assumed folder structure is non compliant
-    file_encoding = config_dict['default']['file_encoding'] #'utf-8'
-    separator = config_dict['default']['separator']
-    table_header = config_dict['default']['table_header']
-    timestamp_variables = []
-    datetime_format = []
-    tb_idx = 0
-    column_names = ''
-    description_dict = {}
-
-    for instFolder in config_dict.keys():
-        if instFolder in filename.split(os.sep):
-            file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
-            separator = config_dict[instFolder].get('separator',separator)
-            table_header = config_dict[instFolder].get('table_header',table_header)
-            timestamp_variables = config_dict[instFolder].get('timestamp',[])
-            datetime_format = config_dict[instFolder].get('datetime_format',[])
-
-            link_to_description = config_dict[instFolder].get('link_to_description', '').replace('/', os.sep)
-
-            if link_to_description:
-                path = os.path.join(instruments_dir, link_to_description)
-                try:
-                    with open(path, 'r') as stream:
-                        description_dict = yaml.load(stream, Loader=yaml.FullLoader)
-                except (FileNotFoundError, yaml.YAMLError) as exc:
-                    print(exc)
-    #if 'None' in table_header:
-    #    return {}
+    (config_dict,
+     file_encoding,
+     separator,
+     table_header,
+     timestamp_variables,
+     datetime_format,
+     description_dict) = load_file_reader_parameters(filename, instruments_dir)

     # Read header as a dictionary and detect where data table starts
-    header_dict = {}
+    header_dict = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
     data_start = False
     # Work with copy of the file for safety
     if work_with_copy:
@@ -109,58 +144,20 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
     table_preamble = []
     line_number = 0
     if 'infer' not in table_header:
-        with open(tmp_filename,'rb') as f:
-
-            for line_number, line in enumerate(f):
-                decoded_line = line.decode(file_encoding[tb_idx])
-
-                for tb_idx, tb in enumerate(table_header):
-                    print(tb)
-                    if tb in decoded_line:
-                        break
-
-                if tb in decoded_line:
-
-                    list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
-
-                    # Count occurrences of each substring
-                    substring_counts = collections.Counter(list_of_substrings)
-                    data_start = True
-                    # Generate column names with appended index only for repeated substrings
-                    column_names = [f"{i}_{name.strip()}" if substring_counts[name] > 1 else name.strip() for i, name in enumerate(list_of_substrings)]
-
-                    #column_names = [str(i)+'_'+name.strip() for i, name in enumerate(list_of_substrings)]
-                    #column_names = []
-                    #for i, name in enumerate(list_of_substrings):
-                    #    column_names.append(str(i)+'_'+name)
-
-                    #print(line_number, len(column_names ),'\n')
-                    break
-                else:
-                    print('Table header was not detected.')
-                    # Subdivide line into words, and join them by single space.
-                    # I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
-                    list_of_substrings = decoded_line.split()
-                    # TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
-                    #line = ' '.join(list_of_substrings+['\n'])
-                    #line = ' '.join(list_of_substrings)
-                    table_preamble.append(' '.join([item for item in list_of_substrings]))# += new_line
+        header_line_idx, column_names, tb_idx, table_preamble = detect_table_header_line(
+            tmp_filename, table_header, file_encoding, separator)
+
+        if header_line_idx == -1:
+            table_header = ['infer'] # fallback to pandas' inference

     # TODO: it does not work with separator as none :(. fix for RGA

     try:
-        print(column_names)
         if not 'infer' in table_header:
-            #print(table_header)
-            #print(file_encoding[tb_idx])
-
             df = pd.read_csv(tmp_filename,
                              delimiter = separator[tb_idx].replace('\\t','\t'),
-                             header=line_number,
-                             #encoding='latin-1',
+                             header=header_line_idx,
                              encoding = file_encoding[tb_idx],
                              names=column_names,
                              skip_blank_lines=True)
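With the helpers in place, the reader entry point keeps its original signature. A minimal sketch of a call, assuming it still returns the reader's usual file_dict structure; the path is a placeholder:

```python
# Placeholder path; read_txt_files_as_dict is the function refactored above.
file_dict = read_txt_files_as_dict(
    '//fs101/5505/Data/SomeInstrument/example.txt',
    instruments_dir=None,    # resolved relative to the reader module when omitted
    work_with_copy=True,     # operate on a temporary copy of the raw file
)
```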
@@ -32,7 +32,7 @@ def read_structured_file_as_dict(path_to_file):
     _, path_head = os.path.split(path_to_file)

     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {}
+    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date': utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}
     file_dict['datasets'] = []

     try:
@@ -1,10 +1,27 @@
-import os
 import sys
+import os
+
+try:
+    thisFilePath = os.path.abspath(__file__)
+except NameError:
+    print("Error: __file__ is not available. Ensure the script is being run from a file.")
+    print("[Notice] Path to DIMA package may not be resolved properly.")
+    thisFilePath = os.getcwd()  # Use current directory or specify a default
+
+dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..'))  # Move up to project root
+
+if dimaPath not in sys.path:  # Avoid duplicate entries
+    sys.path.insert(0,dimaPath)
+
+
 import h5py

 from igor2.binarywave import load as loadibw
 import logging
 import argparse
+import utils.g5505_utils as utils


 def read_xps_ibw_file_as_dict(filename):
     """
@@ -49,7 +66,7 @@ def read_xps_ibw_file_as_dict(filename):

     # Group name and attributes
     file_dict['name'] = path_head
-    file_dict['attributes_dict'] = {}
+    file_dict['attributes_dict'] = {'actris_level': 0, 'processing_date':utils.created_at(), 'processing_script' : os.path.relpath(thisFilePath,dimaPath)}

     # Convert notes of bytes class to string class and split string into a list of elements separated by '\r'.
     notes_list = file_obj['wave']['note'].decode("utf-8").split('\r')
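This is the same provenance change applied to the structured-file reader above: each reader now stamps three attributes on the file group. A sketch of the resulting dictionary with placeholder values, since the exact output of utils.created_at() is not shown in this changeset:

```python
import os

# Placeholder values; utils.created_at() and the script path are resolved at run time.
attributes_dict = {
    'actris_level': 0,                         # raw-data level
    'processing_date': '2024-05-01T12:00:00',  # stand-in for utils.created_at()
    'processing_script': os.path.join('instruments', 'readers', 'some_reader.py'),  # stand-in for os.path.relpath(thisFilePath, dimaPath)
}
```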
@@ -85,22 +102,11 @@ def read_xps_ibw_file_as_dict(filename):

 if __name__ == "__main__":

-    try:
-        thisFilePath = os.path.abspath(__file__)
-    except NameError:
-        print("Error: __file__ is not available. Ensure the script is being run from a file.")
-        print("[Notice] Path to DIMA package may not be resolved properly.")
-        thisFilePath = os.getcwd()  # Use current directory or specify a default
-
-    dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..'))  # Move up to project root
-
-    if dimaPath not in sys.path:  # Avoid duplicate entries
-        sys.path.insert(0,dimaPath)
-
     from src.hdf5_ops import save_file_dict_to_hdf5
     from utils.g5505_utils import created_at


     # Set up argument parsing
     parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
     parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
@@ -49,15 +49,14 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 2,
+    "execution_count": null,
     "metadata": {},
     "outputs": [],
     "source": [
-     "#output_filename_path = 'output_files/unified_file_smog_chamber_2024-04-07_UTC-OFST_+0200_NG.h5'\n",
-     "yaml_config_file_path = '../input_files/data_integr_config_file_TBR.yaml'\n",
+     "number, initials = 1, 'LI' # Set as either 2, 'TBR' or 3, 'NG'\n",
+     "campaign_descriptor_path = f'../input_files/campaignDescriptor{number}_{initials}.yaml'\n",
      "\n",
-     "#path_to_input_directory = 'output_files/kinetic_flowtube_study_2022-01-31_LuciaI'\n",
-     "#path_to_hdf5_file = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_input_directory)\n"
+     "print(campaign_descriptor_path)\n"
     ]
    },
    {
@@ -76,7 +75,7 @@
     "outputs": [],
     "source": [
      "\n",
-     "hdf5_file_path = data_integration.run_pipeline(yaml_config_file_path)"
+     "hdf5_file_path = data_integration.run_pipeline(campaign_descriptor_path)"
     ]
    },
    {
@@ -146,7 +145,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 5,
+    "execution_count": null,
     "metadata": {},
     "outputs": [],
     "source": [
@@ -160,7 +159,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "multiphase_chemistry_env",
+   "display_name": "Python 3",
    "language": "python",
    "name": "python3"
   },
@@ -174,7 +173,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.11.9"
+  "version": "3.11.10"
  }
 },
 "nbformat": 4,
@@ -38,12 +38,19 @@ def _generate_datetime_dict(datetime_steps):
     """ Generate the datetime augment dictionary from datetime steps. """
     datetime_augment_dict = {}
     for datetime_step in datetime_steps:
-        #tmp = datetime.strptime(datetime_step, '%Y-%m-%d %H-%M-%S')
         datetime_augment_dict[datetime_step] = [
-            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'), datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
+            datetime_step.strftime('%Y-%m-%d'), datetime_step.strftime('%Y_%m_%d'),
+            datetime_step.strftime('%Y.%m.%d'), datetime_step.strftime('%Y%m%d')
         ]
     return datetime_augment_dict


+def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
+    """Generate consistent directory or file name fragment based on mode."""
+    if integration_mode == 'collection':
+        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
+    else:
+        return f'{filename_prefix}_{dataset_enddate}'
+
+
 def load_config_and_setup_logging(yaml_config_file_path, log_dir):
     """Load YAML configuration file, set up logging, and validate required keys and datetime_steps."""
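A quick illustration of the two naming modes the new helper produces; the prefix and dates are placeholders, and the function body is copied from the hunk above so the snippet runs standalone:

```python
def _generate_output_path_fragment(filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=None):
    """Generate consistent directory or file name fragment based on mode."""
    if integration_mode == 'collection':
        return f'collection_{index}_{filename_prefix}_{dataset_enddate}'
    else:
        return f'{filename_prefix}_{dataset_enddate}'

print(_generate_output_path_fragment('kinetic_flowtube_study', 'collection', '2024-05-01', '2024-05-31', index=1))
# collection_1_kinetic_flowtube_study_2024-05-31
print(_generate_output_path_fragment('kinetic_flowtube_study', 'single_experiment', '2024-05-01', '2024-05-31'))
# kinetic_flowtube_study_2024-05-31
```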
@@ -189,17 +196,6 @@ def copy_subtree_and_create_hdf5(src, dst, select_dir_keywords, select_file_keyw


 def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):

-    """Integrates data sources specified by the input configuration file into HDF5 files.
-
-    Parameters:
-        yaml_config_file_path (str): Path to the YAML configuration file.
-        log_dir (str): Directory to save the log file.
-
-    Returns:
-        list: List of Paths to the created HDF5 file(s).
-    """
-
     config_dict = load_config_and_setup_logging(path_to_config_yamlFile, log_dir)

     path_to_input_dir = config_dict['input_file_directory']
@@ -213,61 +209,59 @@ def run_pipeline(path_to_config_yamlFile, log_dir='logs/'):
     dataset_startdate = config_dict['dataset_startdate']
     dataset_enddate = config_dict['dataset_enddate']

-    # Determine mode and process accordingly
-    output_filename_path = []
-    campaign_name_template = lambda filename_prefix, suffix: '_'.join([filename_prefix, suffix])
-    date_str = f'{dataset_startdate}_{dataset_enddate}'
+    integration_mode = config_dict.get('integration_mode', 'single_experiment')
+    filename_prefix = config_dict['filename_prefix']
+    output_filename_path = []

-    # Create path to new raw datafolder and standardize with forward slashes
+    # Determine top-level campaign folder path
+    top_level_foldername = _generate_output_path_fragment(
+        filename_prefix, integration_mode, dataset_startdate, dataset_enddate, index=1
+    )
+
     path_to_rawdata_folder = os.path.join(
-        path_to_output_dir, 'collection_' + campaign_name_template(config_dict['filename_prefix'], date_str), "").replace(os.sep, '/')
+        path_to_output_dir, top_level_foldername, ""
+    ).replace(os.sep, '/')

     # Process individual datetime steps if available, regardless of mode
     if config_dict.get('datetime_steps_dict', {}):
-        # Single experiment mode
         for datetime_step, file_keywords in config_dict['datetime_steps_dict'].items():
-            date_str = datetime_step.strftime('%Y-%m-%d')
-            single_campaign_name = campaign_name_template(config_dict['filename_prefix'], date_str)
-            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, single_campaign_name, "")
+            single_date_str = datetime_step.strftime('%Y%m%d')
+            subfolder_name = f"{filename_prefix}_{single_date_str}"
+            subfolder_name = f"experimental_step_{single_date_str}"
+            path_to_rawdata_subfolder = os.path.join(path_to_rawdata_folder, subfolder_name, "")

             path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
                 path_to_input_dir, path_to_rawdata_subfolder, select_dir_keywords,
                 file_keywords, allowed_file_extensions, root_metadata_dict)

             output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

-        # Collection mode processing if specified
-        if 'collection' in config_dict.get('integration_mode', 'single_experiment'):
+        # Collection mode post-processing
+        if integration_mode == 'collection':
             path_to_filenames_dict = {path_to_rawdata_folder: [os.path.basename(path) for path in output_filename_path]} if output_filename_path else {}
-            #hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path_new(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
-            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict)
+            hdf5_path = hdf5_lib.create_hdf5_file_from_filesystem_path(
+                path_to_rawdata_folder, path_to_filenames_dict, [], root_metadata_dict
+            )
             output_filename_path.append(hdf5_path)
     else:
         path_to_integrated_stepwise_hdf5_file = copy_subtree_and_create_hdf5(
             path_to_input_dir, path_to_rawdata_folder, select_dir_keywords, [],
             allowed_file_extensions, root_metadata_dict)
         output_filename_path.append(path_to_integrated_stepwise_hdf5_file)

     return output_filename_path


 if __name__ == "__main__":

     if len(sys.argv) < 2:
         print("Usage: python data_integration.py <function_name> <function_args>")
         sys.exit(1)

-    # Extract the function name from the command line arguments
     function_name = sys.argv[1]

-    # Handle function execution based on the provided function name
     if function_name == 'run':

         if len(sys.argv) != 3:
             print("Usage: python data_integration.py run <path_to_config_yamlFile>")
             sys.exit(1)
-        # Extract path to configuration file, specifying the data integration task
         path_to_config_yamlFile = sys.argv[2]
         run_pipeline(path_to_config_yamlFile)
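Besides the CLI entry point shown above, the refactored pipeline can be driven from Python, as the updated notebook does. A sketch, where the import path is an assumption and the descriptor name follows the pattern introduced in the notebook diff:

```python
# The module path is an assumption; the notebook imports it simply as `data_integration`.
import data_integration

hdf5_files = data_integration.run_pipeline('../input_files/campaignDescriptor1_LI.yaml')
print(hdf5_files)  # list of paths to the created HDF5 file(s)
```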
@@ -19,17 +19,10 @@ import pandas as pd
 import numpy as np
 import logging
 import datetime

-import h5py
-
 import yaml
 import json
 import copy

-#try:
-# from dima.utils import g5505_utils as utils
-# from dima.src import hdf5_writer as hdf5_lib
-#except ModuleNotFoundError:
 import utils.g5505_utils as utils
 import src.hdf5_writer as hdf5_lib

@@ -217,49 +217,49 @@ def convert_string_to_bytes(input_list: list):

 def convert_attrdict_to_np_structured_array(attr_value: dict):
     """
-    Converts a dictionary of attributes into a numpy structured array for HDF5
-    compound type compatibility.
-
-    Each dictionary key is mapped to a field in the structured array, with the
-    data type (S) determined by the longest string representation of the values.
-    If the dictionary is empty, the function returns 'missing'.
+    Converts a dictionary of attributes into a NumPy structured array with byte-encoded fields.
+    Handles UTF-8 encoding to avoid UnicodeEncodeError with non-ASCII characters.

     Parameters
     ----------
     attr_value : dict
-        Dictionary containing the attributes to be converted. Example:
-        attr_value = {
-            'name': 'Temperature',
-            'unit': 'Celsius',
-            'value': 23.5,
-            'timestamp': '2023-09-26 10:00'
-        }
+        Dictionary with scalar values (int, float, str).

     Returns
     -------
     new_attr_value : ndarray
-        Numpy structured array with UTF-8 encoded fields. Returns np.array(['missing'], dtype=[str]) if
-        the input dictionary is empty.
+        1-row structured array with fixed-size byte fields (dtype='S').
     """
-    if not isinstance(attr_value,dict):
-        raise ValueError(f'Input paremeter {attr_value} must be a dictionary of scalar values.')
+    if not isinstance(attr_value, dict):
+        raise ValueError(f"Input must be a dictionary, got {type(attr_value)}")
+
+    if not attr_value:
+        return np.array(['missing'], dtype=[('value', 'S16')])  # placeholder

     dtype = []
     values_list = []
-    max_length = max(len(str(attr_value[key])) for key in attr_value.keys())
-    for key, val in attr_value.items():
-        # Verify if 'rename_as' is still used in metadata revision
-        if key != 'rename_as' and isinstance(val, (int, float, str)):
-            dtype.append((key, f'S{max_length}'))
-            values_list.append(attr_value[key])
-        else:
-            print(f"Skipping unsupported type for key {key}: {type(val)}")
-    if values_list:
-        new_attr_value = np.array([tuple(values_list)], dtype=dtype)
-    else:
-        new_attr_value = np.array(['missing'], dtype=[str])
-
-    return new_attr_value
+    max_str_len = max(len(str(v)) for v in attr_value.values())
+    byte_len = max_str_len * 4  # UTF-8 worst-case
+
+    for key, val in attr_value.items():
+        if key == 'rename_as':
+            continue
+        if isinstance(val, (int, float, str)):
+            dtype.append((key, f'S{byte_len}'))
+            try:
+                encoded_val = str(val).encode('utf-8')  # explicit UTF-8
+                values_list.append(encoded_val)
+            except UnicodeEncodeError as e:
+                logging.error(f"Failed to encode {key}={val}: {e}")
+                raise
+        else:
+            logging.warning(f"Skipping unsupported type for key {key}: {type(val)}")
+
+    if values_list:
+        return np.array([tuple(values_list)], dtype=dtype)
+    else:
+        return np.array(['missing'], dtype=[('value', 'S16')])


 def infer_units(column_name):
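A minimal round trip for the rewritten converter, assuming it is importable from utils.g5505_utils as the imports in this changeset suggest; the attribute values are placeholders chosen to exercise the UTF-8 path:

```python
from utils.g5505_utils import convert_attrdict_to_np_structured_array

attrs = {'name': 'Temperatur', 'unit': '°C', 'value': 23.5}  # non-ASCII unit exercises UTF-8 encoding
arr = convert_attrdict_to_np_structured_array(attrs)

print(arr.dtype.names)  # ('name', 'unit', 'value')
print(arr[0]['unit'])   # b'\xc2\xb0C' -> decode with .decode('utf-8') to recover '°C'
```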