Merge branch 'feature/DB_for_FileReader_Repo' into 'main'
Restructuring of file reader system to process multi-instrument data folders. See merge request 5505-public/dima!3
This commit is contained in:
@ -119,3 +119,20 @@ table_header:
|
|||||||
description: Sample source
|
description: Sample source
|
||||||
units: unspecified
|
units: unspecified
|
||||||
rename_as: sample_source
|
rename_as: sample_source
|
||||||
|
|
||||||
|
# NO2 additional vocabulary terms
|
||||||
|
|
||||||
|
CHOCHO (ppb):
|
||||||
|
description: CHOCHO concentration
|
||||||
|
units: ppb
|
||||||
|
rename_as: chocho_concentration_ppb
|
||||||
|
CHOCHO Uncertainty (ppb):
|
||||||
|
description: Uncertainty in CHOCHO concentration
|
||||||
|
units: ppb
|
||||||
|
rename_as: chocho_uncertainty_ppb
|
||||||
|
|
||||||
|
10_#ICEDOAS iter.:
|
||||||
|
description: Number of ICEDOAS iterations
|
||||||
|
units: unspecified
|
||||||
|
rename_as: icedoas_iterations
|
||||||
|
|
@ -1,113 +0,0 @@
|
|||||||
table_header:
|
|
||||||
Start Date/Time (UTC):
|
|
||||||
description: Start date and time of the measurement in UTC
|
|
||||||
units: YYYY-MM-DD HH:MM:SS
|
|
||||||
rename_as: start_datetime_utc
|
|
||||||
Duration (s):
|
|
||||||
description: Duration of the measurement in seconds
|
|
||||||
units: seconds
|
|
||||||
rename_as: duration_seconds
|
|
||||||
NO2 (ppb):
|
|
||||||
description: NO2 concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: no2_concentration_ppb
|
|
||||||
NO2 Uncertainty (ppb):
|
|
||||||
description: Uncertainty in NO2 concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: no2_uncertainty_ppb
|
|
||||||
H2O (ppb):
|
|
||||||
description: H2O concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: h2o_concentration_ppb
|
|
||||||
H2O Uncertainty (ppb):
|
|
||||||
description: Uncertainty in H2O concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: h2o_uncertainty_ppb
|
|
||||||
CHOCHO (ppb):
|
|
||||||
description: CHOCHO concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: chocho_concentration_ppb
|
|
||||||
CHOCHO Uncertainty (ppb):
|
|
||||||
description: Uncertainty in CHOCHO concentration
|
|
||||||
units: ppb
|
|
||||||
rename_as: chocho_uncertainty_ppb
|
|
||||||
File Number:
|
|
||||||
description: File number
|
|
||||||
units: unspecified
|
|
||||||
rename_as: file_number
|
|
||||||
Light Intensity:
|
|
||||||
description: Light intensity
|
|
||||||
units: unspecified
|
|
||||||
rename_as: light_intensity
|
|
||||||
10_#ICEDOAS iter.:
|
|
||||||
description: Number of ICEDOAS iterations
|
|
||||||
units: unspecified
|
|
||||||
rename_as: icedoas_iterations
|
|
||||||
Cell Pressure:
|
|
||||||
description: Cell pressure
|
|
||||||
units: unspecified
|
|
||||||
rename_as: cell_pressure
|
|
||||||
Ambient Pressure:
|
|
||||||
description: Ambient pressure
|
|
||||||
units: unspecified
|
|
||||||
rename_as: ambient_pressure
|
|
||||||
Cell Temp:
|
|
||||||
description: Cell temperature
|
|
||||||
units: unspecified
|
|
||||||
rename_as: cell_temperature
|
|
||||||
Spec Temp:
|
|
||||||
description: Spectrometer temperature
|
|
||||||
units: unspecified
|
|
||||||
rename_as: spec_temperature
|
|
||||||
Lat:
|
|
||||||
description: Latitude
|
|
||||||
units: unspecified
|
|
||||||
rename_as: latitude
|
|
||||||
Lon:
|
|
||||||
description: Longitude
|
|
||||||
units: unspecified
|
|
||||||
rename_as: longitude
|
|
||||||
Height:
|
|
||||||
description: Height
|
|
||||||
units: unspecified
|
|
||||||
rename_as: height
|
|
||||||
Speed:
|
|
||||||
description: Speed
|
|
||||||
units: unspecified
|
|
||||||
rename_as: speed
|
|
||||||
GPSQuality:
|
|
||||||
description: GPS quality
|
|
||||||
units: unspecified
|
|
||||||
rename_as: gps_quality
|
|
||||||
0-Air Ref. Time:
|
|
||||||
description: 0-air reference time
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_time
|
|
||||||
0-Air Ref. Duration:
|
|
||||||
description: 0-air reference duration
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_duration
|
|
||||||
0-Air Ref. File Number:
|
|
||||||
description: 0-air reference file number
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_file_number
|
|
||||||
0-Air Ref. Intensity:
|
|
||||||
description: 0-air reference intensity
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_intensity
|
|
||||||
0-Air Ref. Rel Intensity:
|
|
||||||
description: 0-air reference relative intensity
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_relative_intensity
|
|
||||||
0-Air Ref. Intensity valid:
|
|
||||||
description: 0-air reference intensity validity
|
|
||||||
units: unspecified
|
|
||||||
rename_as: zero_air_ref_intensity_valid
|
|
||||||
MeasMode:
|
|
||||||
description: Measurement mode
|
|
||||||
units: unspecified
|
|
||||||
rename_as: measurement_mode
|
|
||||||
SampleSource:
|
|
||||||
description: Sample source
|
|
||||||
units: unspecified
|
|
||||||
rename_as: sample_source
|
|
@ -1,11 +1,20 @@
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import subprocess
|
||||||
|
import yaml
|
||||||
#root_dir = os.path.abspath(os.curdir)
|
#root_dir = os.path.abspath(os.curdir)
|
||||||
#sys.path.append(root_dir)
|
#sys.path.append(root_dir)
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# from dima.instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
||||||
|
# from dima.instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||||
|
|
||||||
|
#except ModuleNotFoundError as e:
|
||||||
|
# print(e)
|
||||||
from instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
from instruments.readers.xps_ibw_reader import read_xps_ibw_file_as_dict
|
||||||
from instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
from instruments.readers.g5505_text_reader import read_txt_files_as_dict
|
||||||
|
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
||||||
|
from instruments.readers.acsm_flag_reader import read_jsonflag_as_dict
|
||||||
|
|
||||||
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml']
|
file_extensions = ['.ibw','.txt','.dat','.h5','.TXT','.csv','.pkl','.json','.yaml']
|
||||||
|
|
||||||
@ -15,24 +24,37 @@ default_instruments_dir = None # or provide an absolute path
|
|||||||
file_readers = {
|
file_readers = {
|
||||||
'ibw': lambda a1: read_xps_ibw_file_as_dict(a1),
|
'ibw': lambda a1: read_xps_ibw_file_as_dict(a1),
|
||||||
'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
'txt': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||||
'TXT': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
|
||||||
'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
'dat': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||||
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False)
|
'csv': lambda a1: read_txt_files_as_dict(a1, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||||
}
|
'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||||
|
'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False),
|
||||||
|
'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x)}
|
||||||
|
|
||||||
# Add new "instrument reader (Data flagging app data)"
|
REGISTRY_FILE = "registry.yaml" #os.path.join(os.path.dirname(__file__), "registry.yaml")
|
||||||
|
|
||||||
from instruments.readers.acsm_tofware_reader import read_acsm_files_as_dict
|
def load_registry():
|
||||||
file_extensions.append('.txt')
|
|
||||||
file_readers.update({'ACSM_TOFWARE_txt' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False)})
|
|
||||||
|
|
||||||
file_extensions.append('.csv')
|
module_dir = os.path.dirname(__file__)
|
||||||
file_readers.update({'ACSM_TOFWARE_csv' : lambda x: read_acsm_files_as_dict(x, instruments_dir=default_instruments_dir, work_with_copy=False)})
|
instruments_dir = os.path.join(module_dir, '..')
|
||||||
|
|
||||||
from instruments.readers.flag_reader import read_jsonflag_as_dict
|
# Normalize the path (resolves any '..' in the path)
|
||||||
file_extensions.append('.json')
|
registry_path = os.path.abspath(os.path.join(module_dir,REGISTRY_FILE))
|
||||||
file_readers.update({'ACSM_TOFWARE_flags_json' : lambda x: read_jsonflag_as_dict(x)})
|
|
||||||
|
|
||||||
|
with open(registry_path, "r") as file:
|
||||||
|
return yaml.safe_load(file)["instruments"]
|
||||||
|
|
||||||
|
def find_reader(instrument_folder, file_extension):
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
registry = load_registry()
|
||||||
|
|
||||||
|
for entry in registry:
|
||||||
|
if entry["instrumentFolderName"] == instrument_folder and entry["fileExtension"] == file_extension:
|
||||||
|
return entry["fileReaderPath"], entry["InstrumentDictionaryPath"]
|
||||||
|
|
||||||
|
return None, None # Not found
|
||||||
|
|
||||||
def compute_filereader_key_from_path(hdf5_file_path):
|
def compute_filereader_key_from_path(hdf5_file_path):
|
||||||
"""Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as
|
"""Constructs the key 'instrumentname_ext' based on hdf5_file_path, structured as
|
||||||
/instrumentname/to/filename.ext, which access the file reader that should be used to read such a file.
|
/instrumentname/to/filename.ext, which access the file reader that should be used to read such a file.
|
||||||
@ -52,6 +74,7 @@ def compute_filereader_key_from_path(hdf5_file_path):
|
|||||||
|
|
||||||
# Extract the filename and its extension
|
# Extract the filename and its extension
|
||||||
filename, file_extension = os.path.splitext(parts[-1])
|
filename, file_extension = os.path.splitext(parts[-1])
|
||||||
|
file_extension = file_extension.lower()
|
||||||
|
|
||||||
# Extract the first directory directly under the root directory '/' in the hdf5 file
|
# Extract the first directory directly under the root directory '/' in the hdf5 file
|
||||||
subfolder_name = parts[0] if len(parts) > 1 else ""
|
subfolder_name = parts[0] if len(parts) > 1 else ""
|
||||||
@ -76,4 +99,45 @@ def select_file_reader(path):
|
|||||||
return file_readers[extension]
|
return file_readers[extension]
|
||||||
|
|
||||||
# Default case if no reader is found
|
# Default case if no reader is found
|
||||||
return lambda x : None
|
return lambda x : None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def run_reader(hdf5_file_path, src_file_path, dst_group_name):
|
||||||
|
try:
|
||||||
|
thisFilePath = os.path.abspath(__file__)
|
||||||
|
except NameError:
|
||||||
|
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||||
|
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||||
|
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||||
|
|
||||||
|
projectPath = os.path.normpath(os.path.join(thisFilePath, "..",'..')) # Move up to project root
|
||||||
|
|
||||||
|
#
|
||||||
|
full_string, file_extension = compute_filereader_key_from_path(dst_group_name)
|
||||||
|
full_string_parts = full_string.split("_")
|
||||||
|
full_string_parts.remove(file_extension)
|
||||||
|
instrument_folder = '_'.join(full_string_parts)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
reader_path, dict_path = find_reader(instrument_folder, file_extension)
|
||||||
|
|
||||||
|
|
||||||
|
if reader_path:
|
||||||
|
reader_path = os.path.normpath(os.path.join(projectPath, reader_path))
|
||||||
|
if not os.path.exists(reader_path):
|
||||||
|
raise FileNotFoundError(f"File reader {reader_path} not found for key {full_string}. Verify the reader is properly referenced in registry.yaml.")
|
||||||
|
else:
|
||||||
|
print(f'Attempting to run {reader_path}')
|
||||||
|
|
||||||
|
|
||||||
|
command = ["python", reader_path, hdf5_file_path, src_file_path, instrument_folder]
|
||||||
|
#if dict_path:
|
||||||
|
# args.append(dict_path)
|
||||||
|
print(f"Running: {command}")
|
||||||
|
output = subprocess.run(command, capture_output=True)#, check=True)
|
||||||
|
print('Subprocess output',output.stdout)
|
||||||
|
else:
|
||||||
|
print(f'There is no file reader available to process files in {instrument_folder}.')
|
||||||
|
#logging.info(instFoldermsdEnd )
|
||||||
|
BIN
instruments/instrument_registry.xlsx
Normal file
BIN
instruments/instrument_registry.xlsx
Normal file
Binary file not shown.
101
instruments/readers/acsm_flag_reader.py
Normal file
101
instruments/readers/acsm_flag_reader.py
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
thisFilePath = os.path.abspath(__file__)
|
||||||
|
except NameError:
|
||||||
|
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||||
|
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||||
|
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||||
|
|
||||||
|
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||||
|
|
||||||
|
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||||
|
sys.path.insert(0,dimaPath)
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import collections
|
||||||
|
import json
|
||||||
|
import h5py
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import utils.g5505_utils as utils
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def read_jsonflag_as_dict(path_to_file):
|
||||||
|
|
||||||
|
|
||||||
|
file_dict = {}
|
||||||
|
path_tail, path_head = os.path.split(path_to_file)
|
||||||
|
|
||||||
|
file_dict['name'] = path_head
|
||||||
|
# TODO: review this header dictionary, it may not be the best way to represent header data
|
||||||
|
file_dict['attributes_dict'] = {}
|
||||||
|
file_dict['datasets'] = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(path_to_file, 'r') as stream:
|
||||||
|
flag = json.load(stream)#, Loader=json.FullLoader)
|
||||||
|
except (FileNotFoundError, json.JSONDecodeError) as exc:
|
||||||
|
print(exc)
|
||||||
|
|
||||||
|
dataset = {}
|
||||||
|
dataset['name'] = 'data_table'#_numerical_variables'
|
||||||
|
dataset['data'] = utils.convert_attrdict_to_np_structured_array(flag) #df_numerical_attrs.to_numpy()
|
||||||
|
dataset['shape'] = dataset['data'].shape
|
||||||
|
dataset['dtype'] = type(dataset['data'])
|
||||||
|
|
||||||
|
file_dict['datasets'].append(dataset)
|
||||||
|
|
||||||
|
return file_dict
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||||
|
from utils.g5505_utils import created_at
|
||||||
|
|
||||||
|
# Set up argument parsing
|
||||||
|
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||||
|
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||||
|
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||||
|
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
hdf5_file_path = args.dst_file_path
|
||||||
|
src_file_path = args.src_file_path
|
||||||
|
dst_group_name = args.dst_group_name
|
||||||
|
default_mode = 'r+'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read source file and return an internal dictionary representation
|
||||||
|
idr_dict = read_jsonflag_as_dict(src_file_path)
|
||||||
|
|
||||||
|
if not os.path.exists(hdf5_file_path):
|
||||||
|
default_mode = 'w'
|
||||||
|
|
||||||
|
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||||
|
|
||||||
|
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||||
|
try:
|
||||||
|
# Create group if it does not exist
|
||||||
|
if dst_group_name not in hdf5_file_obj:
|
||||||
|
hdf5_file_obj.create_group(dst_group_name)
|
||||||
|
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||||
|
print(f'Created new group: {dst_group_name}')
|
||||||
|
else:
|
||||||
|
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||||
|
|
||||||
|
except Exception as inst:
|
||||||
|
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||||
|
|
||||||
|
# Save dictionary to HDF5
|
||||||
|
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||||
|
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||||
|
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||||
|
|
@ -1,11 +1,25 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
thisFilePath = os.path.abspath(__file__)
|
||||||
|
except NameError:
|
||||||
|
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||||
|
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||||
|
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||||
|
|
||||||
|
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||||
|
|
||||||
|
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||||
|
sys.path.insert(0,dimaPath)
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import collections
|
import collections
|
||||||
import yaml
|
import yaml
|
||||||
|
import h5py
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
|
||||||
#root_dir = os.path.abspath(os.curdir)
|
|
||||||
#sys.path.append(root_dir)
|
|
||||||
import utils.g5505_utils as utils
|
import utils.g5505_utils as utils
|
||||||
|
|
||||||
|
|
||||||
@ -220,4 +234,55 @@ def read_acsm_files_as_dict(filename: str, instruments_dir: str = None, work_wit
|
|||||||
except:
|
except:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
return file_dict
|
return file_dict
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||||
|
from utils.g5505_utils import created_at
|
||||||
|
|
||||||
|
# Set up argument parsing
|
||||||
|
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||||
|
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||||
|
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||||
|
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
hdf5_file_path = args.dst_file_path
|
||||||
|
src_file_path = args.src_file_path
|
||||||
|
dst_group_name = args.dst_group_name
|
||||||
|
default_mode = 'r+'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read source file and return an internal dictionary representation
|
||||||
|
idr_dict = read_acsm_files_as_dict(src_file_path)
|
||||||
|
|
||||||
|
if not os.path.exists(hdf5_file_path):
|
||||||
|
default_mode = 'w'
|
||||||
|
|
||||||
|
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||||
|
|
||||||
|
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||||
|
try:
|
||||||
|
# Create group if it does not exist
|
||||||
|
if dst_group_name not in hdf5_file_obj:
|
||||||
|
hdf5_file_obj.create_group(dst_group_name)
|
||||||
|
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||||
|
print(f'Created new group: {dst_group_name}')
|
||||||
|
else:
|
||||||
|
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||||
|
|
||||||
|
except Exception as inst:
|
||||||
|
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||||
|
|
||||||
|
# Save dictionary to HDF5
|
||||||
|
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||||
|
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||||
|
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ HONO: #ICAD/HONO:
|
|||||||
file_encoding : 'latin-1'
|
file_encoding : 'latin-1'
|
||||||
timestamp: ['Start Date/Time (UTC)']
|
timestamp: ['Start Date/Time (UTC)']
|
||||||
datetime_format: '%Y-%m-%d %H:%M:%S.%f'
|
datetime_format: '%Y-%m-%d %H:%M:%S.%f'
|
||||||
link_to_description: 'dictionaries/ICAD_HONO.yaml'
|
link_to_description: 'dictionaries/ICAD.yaml'
|
||||||
|
|
||||||
NO2: #ICAD/NO2:
|
NO2: #ICAD/NO2:
|
||||||
table_header : 'Start Date/Time (UTC) Duration (s) NO2 (ppb) NO2 Uncertainty (ppb) H2O (ppb) H2O Uncertainty (ppb) CHOCHO (ppb) CHOCHO Uncertainty (ppb) File Number Light Intensity #ICEDOAS iter. Cell Pressure Ambient Pressure Cell Temp Spec Temp Lat Lon Height Speed GPSQuality 0-Air Ref. Time 0-Air Ref. Duration 0-Air Ref. File Number 0-Air Ref. Intensity 0-Air Ref. Rel Intensity 0-Air Ref. Intensity valid MeasMode SampleSource'
|
table_header : 'Start Date/Time (UTC) Duration (s) NO2 (ppb) NO2 Uncertainty (ppb) H2O (ppb) H2O Uncertainty (ppb) CHOCHO (ppb) CHOCHO Uncertainty (ppb) File Number Light Intensity #ICEDOAS iter. Cell Pressure Ambient Pressure Cell Temp Spec Temp Lat Lon Height Speed GPSQuality 0-Air Ref. Time 0-Air Ref. Duration 0-Air Ref. File Number 0-Air Ref. Intensity 0-Air Ref. Rel Intensity 0-Air Ref. Intensity valid MeasMode SampleSource'
|
||||||
@ -38,7 +38,7 @@ NO2: #ICAD/NO2:
|
|||||||
file_encoding : 'latin-1'
|
file_encoding : 'latin-1'
|
||||||
timestamp: ['Start Date/Time (UTC)']
|
timestamp: ['Start Date/Time (UTC)']
|
||||||
datetime_format: '%Y-%m-%d %H:%M:%S.%f'
|
datetime_format: '%Y-%m-%d %H:%M:%S.%f'
|
||||||
link_to_description: 'dictionaries/ICAD_NO2.yaml'
|
link_to_description: 'dictionaries/ICAD.yaml'
|
||||||
|
|
||||||
Lopap:
|
Lopap:
|
||||||
#table_header : 'Date;Time;Ch1;490.1;500.2;510.0;520.0;530.1;540.0;550.7;603.2;700.3;800.0;Ch2;500.5;510.3;520.5;530.7;540.8;550.5;550.8;560.9;570.9;581.2;586.2;591.2;596.1;601.1;606.4;611.3;'
|
#table_header : 'Date;Time;Ch1;490.1;500.2;510.0;520.0;530.1;540.0;550.7;603.2;700.3;800.0;Ch2;500.5;510.3;520.5;530.7;540.8;550.5;550.8;560.9;570.9;581.2;586.2;591.2;596.1;601.1;606.4;611.3;'
|
||||||
|
@ -1,39 +0,0 @@
|
|||||||
import os
|
|
||||||
import json
|
|
||||||
|
|
||||||
#root_dir = os.path.abspath(os.curdir)
|
|
||||||
#sys.path.append(root_dir)
|
|
||||||
#print(__file__)
|
|
||||||
|
|
||||||
#from instruments.readers import set_dima_path as configpath
|
|
||||||
#configpath.set_dima_path()
|
|
||||||
|
|
||||||
from utils import g5505_utils
|
|
||||||
|
|
||||||
|
|
||||||
def read_jsonflag_as_dict(path_to_file):
|
|
||||||
|
|
||||||
|
|
||||||
file_dict = {}
|
|
||||||
path_tail, path_head = os.path.split(path_to_file)
|
|
||||||
|
|
||||||
file_dict['name'] = path_head
|
|
||||||
# TODO: review this header dictionary, it may not be the best way to represent header data
|
|
||||||
file_dict['attributes_dict'] = {}
|
|
||||||
file_dict['datasets'] = []
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(path_to_file, 'r') as stream:
|
|
||||||
flag = json.load(stream)#, Loader=json.FullLoader)
|
|
||||||
except (FileNotFoundError, json.JSONDecodeError) as exc:
|
|
||||||
print(exc)
|
|
||||||
|
|
||||||
dataset = {}
|
|
||||||
dataset['name'] = 'data_table'#_numerical_variables'
|
|
||||||
dataset['data'] = g5505_utils.convert_attrdict_to_np_structured_array(flag) #df_numerical_attrs.to_numpy()
|
|
||||||
dataset['shape'] = dataset['data'].shape
|
|
||||||
dataset['dtype'] = type(dataset['data'])
|
|
||||||
|
|
||||||
file_dict['datasets'].append(dataset)
|
|
||||||
|
|
||||||
return file_dict
|
|
@ -1,19 +1,40 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
thisFilePath = os.path.abspath(__file__)
|
||||||
|
except NameError:
|
||||||
|
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||||
|
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||||
|
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||||
|
|
||||||
|
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||||
|
|
||||||
|
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||||
|
sys.path.insert(0,dimaPath)
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import collections
|
import collections
|
||||||
import yaml
|
import yaml
|
||||||
|
import h5py
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
# Import project modules
|
# Import project modules
|
||||||
root_dir = os.path.abspath(os.curdir)
|
#root_dir = os.path.abspath(os.curdir)
|
||||||
sys.path.append(root_dir)
|
#sys.path.append(root_dir)
|
||||||
|
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# from dima.utils import g5505_utils as utils
|
||||||
|
#except ModuleNotFoundError:
|
||||||
|
# import utils.g5505_utils as utils
|
||||||
|
# import src.hdf5_ops as hdf5_ops
|
||||||
import utils.g5505_utils as utils
|
import utils.g5505_utils as utils
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
|
def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with_copy: bool = True):
|
||||||
|
|
||||||
|
filename = os.path.normpath(filename)
|
||||||
# If instruments_dir is not provided, use the default path relative to the module directory
|
# If instruments_dir is not provided, use the default path relative to the module directory
|
||||||
if not instruments_dir:
|
if not instruments_dir:
|
||||||
# Assuming the instruments folder is one level up from the source module directory
|
# Assuming the instruments folder is one level up from the source module directory
|
||||||
@ -23,6 +44,8 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
# Normalize the path (resolves any '..' in the path)
|
# Normalize the path (resolves any '..' in the path)
|
||||||
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
|
instrument_configs_path = os.path.abspath(os.path.join(instruments_dir,'readers','config_text_reader.yaml'))
|
||||||
|
|
||||||
|
print(instrument_configs_path)
|
||||||
|
|
||||||
with open(instrument_configs_path,'r') as stream:
|
with open(instrument_configs_path,'r') as stream:
|
||||||
try:
|
try:
|
||||||
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
|
config_dict = yaml.load(stream, Loader=yaml.FullLoader)
|
||||||
@ -44,7 +67,9 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
description_dict = {}
|
description_dict = {}
|
||||||
|
|
||||||
for instFolder in config_dict.keys():
|
for instFolder in config_dict.keys():
|
||||||
|
|
||||||
if instFolder in filename.split(os.sep):
|
if instFolder in filename.split(os.sep):
|
||||||
|
|
||||||
file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
|
file_encoding = config_dict[instFolder].get('file_encoding',file_encoding)
|
||||||
separator = config_dict[instFolder].get('separator',separator)
|
separator = config_dict[instFolder].get('separator',separator)
|
||||||
table_header = config_dict[instFolder].get('table_header',table_header)
|
table_header = config_dict[instFolder].get('table_header',table_header)
|
||||||
@ -76,6 +101,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
#with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f:
|
#with open(tmp_filename,'rb',encoding=file_encoding,errors='ignore') as f:
|
||||||
|
|
||||||
if not isinstance(table_header, list):
|
if not isinstance(table_header, list):
|
||||||
|
|
||||||
table_header = [table_header]
|
table_header = [table_header]
|
||||||
file_encoding = [file_encoding]
|
file_encoding = [file_encoding]
|
||||||
separator = [separator]
|
separator = [separator]
|
||||||
@ -87,14 +113,17 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
with open(tmp_filename,'rb') as f:
|
with open(tmp_filename,'rb') as f:
|
||||||
|
|
||||||
for line_number, line in enumerate(f):
|
for line_number, line in enumerate(f):
|
||||||
|
decoded_line = line.decode(file_encoding[tb_idx])
|
||||||
|
|
||||||
for tb_idx, tb in enumerate(table_header):
|
|
||||||
if tb in line.decode(file_encoding[tb_idx]):
|
|
||||||
break
|
|
||||||
|
|
||||||
if tb in line.decode(file_encoding[tb_idx]):
|
for tb_idx, tb in enumerate(table_header):
|
||||||
list_of_substrings = line.decode(file_encoding[tb_idx]).split(separator[tb_idx].replace('\\t','\t'))
|
print(tb)
|
||||||
|
if tb in decoded_line:
|
||||||
|
break
|
||||||
|
|
||||||
|
if tb in decoded_line:
|
||||||
|
|
||||||
|
list_of_substrings = decoded_line.split(separator[tb_idx].replace('\\t','\t'))
|
||||||
|
|
||||||
# Count occurrences of each substring
|
# Count occurrences of each substring
|
||||||
substring_counts = collections.Counter(list_of_substrings)
|
substring_counts = collections.Counter(list_of_substrings)
|
||||||
@ -109,9 +138,11 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
|
|
||||||
#print(line_number, len(column_names ),'\n')
|
#print(line_number, len(column_names ),'\n')
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
print('Table header was not detected.')
|
||||||
# Subdivide line into words, and join them by single space.
|
# Subdivide line into words, and join them by single space.
|
||||||
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
|
# I asumme this can produce a cleaner line that contains no weird separator characters \t \r or extra spaces and so on.
|
||||||
list_of_substrings = line.decode(file_encoding[tb_idx]).split()
|
list_of_substrings = decoded_line.split()
|
||||||
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
|
# TODO: ideally we should use a multilinear string but the yalm parser is not recognizing \n as special character
|
||||||
#line = ' '.join(list_of_substrings+['\n'])
|
#line = ' '.join(list_of_substrings+['\n'])
|
||||||
#line = ' '.join(list_of_substrings)
|
#line = ' '.join(list_of_substrings)
|
||||||
@ -119,8 +150,13 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
|
|
||||||
|
|
||||||
# TODO: it does not work with separator as none :(. fix for RGA
|
# TODO: it does not work with separator as none :(. fix for RGA
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
print(column_names)
|
||||||
if not 'infer' in table_header:
|
if not 'infer' in table_header:
|
||||||
|
#print(table_header)
|
||||||
|
#print(file_encoding[tb_idx])
|
||||||
|
|
||||||
df = pd.read_csv(tmp_filename,
|
df = pd.read_csv(tmp_filename,
|
||||||
delimiter = separator[tb_idx].replace('\\t','\t'),
|
delimiter = separator[tb_idx].replace('\\t','\t'),
|
||||||
header=line_number,
|
header=line_number,
|
||||||
@ -138,7 +174,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
df_numerical_attrs = df.select_dtypes(include ='number')
|
df_numerical_attrs = df.select_dtypes(include ='number')
|
||||||
df_categorical_attrs = df.select_dtypes(exclude='number')
|
df_categorical_attrs = df.select_dtypes(exclude='number')
|
||||||
numerical_variables = [item for item in df_numerical_attrs.columns]
|
numerical_variables = [item for item in df_numerical_attrs.columns]
|
||||||
|
|
||||||
# Consolidate into single timestamp column the separate columns 'date' 'time' specified in text_data_source.yaml
|
# Consolidate into single timestamp column the separate columns 'date' 'time' specified in text_data_source.yaml
|
||||||
if timestamp_variables:
|
if timestamp_variables:
|
||||||
#df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index]
|
#df_categorical_attrs['timestamps'] = [' '.join(df_categorical_attrs.loc[i,timestamp_variables].to_numpy()) for i in df.index]
|
||||||
@ -148,7 +184,7 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
#df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
|
#df_categorical_attrs['timestamps'] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
|
||||||
timestamps_name = ' '.join(timestamp_variables)
|
timestamps_name = ' '.join(timestamp_variables)
|
||||||
df_categorical_attrs[ timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
|
df_categorical_attrs[ timestamps_name] = df_categorical_attrs[timestamp_variables].astype(str).agg(' '.join, axis=1)
|
||||||
|
|
||||||
valid_indices = []
|
valid_indices = []
|
||||||
if datetime_format:
|
if datetime_format:
|
||||||
df_categorical_attrs[ timestamps_name] = pd.to_datetime(df_categorical_attrs[ timestamps_name],format=datetime_format,errors='coerce')
|
df_categorical_attrs[ timestamps_name] = pd.to_datetime(df_categorical_attrs[ timestamps_name],format=datetime_format,errors='coerce')
|
||||||
@ -249,7 +285,59 @@ def read_txt_files_as_dict(filename: str, instruments_dir: str = None, work_with
|
|||||||
# if timestamps_name in categorical_variables:
|
# if timestamps_name in categorical_variables:
|
||||||
# dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
|
# dataset['attributes'] = {timestamps_name: utils.parse_attribute({'unit':'YYYY-MM-DD HH:MM:SS.ffffff'})}
|
||||||
# file_dict['datasets'].append(dataset)
|
# file_dict['datasets'].append(dataset)
|
||||||
except:
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
return file_dict
|
return file_dict
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||||
|
from utils.g5505_utils import created_at
|
||||||
|
|
||||||
|
# Set up argument parsing
|
||||||
|
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||||
|
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||||
|
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||||
|
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
hdf5_file_path = args.dst_file_path
|
||||||
|
src_file_path = args.src_file_path
|
||||||
|
dst_group_name = args.dst_group_name
|
||||||
|
default_mode = 'r+'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read source file and return an internal dictionary representation
|
||||||
|
idr_dict = read_txt_files_as_dict(src_file_path)
|
||||||
|
|
||||||
|
if not os.path.exists(hdf5_file_path):
|
||||||
|
default_mode = 'w'
|
||||||
|
|
||||||
|
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||||
|
|
||||||
|
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||||
|
try:
|
||||||
|
# Create group if it does not exist
|
||||||
|
if dst_group_name not in hdf5_file_obj:
|
||||||
|
hdf5_file_obj.create_group(dst_group_name)
|
||||||
|
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||||
|
print(f'Created new group: {dst_group_name}')
|
||||||
|
else:
|
||||||
|
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||||
|
|
||||||
|
except Exception as inst:
|
||||||
|
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||||
|
|
||||||
|
# Save dictionary to HDF5
|
||||||
|
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||||
|
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||||
|
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||||
|
|
||||||
|
@ -1,5 +1,10 @@
|
|||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
import h5py
|
||||||
|
|
||||||
from igor2.binarywave import load as loadibw
|
from igor2.binarywave import load as loadibw
|
||||||
|
import logging
|
||||||
|
import argparse
|
||||||
|
|
||||||
def read_xps_ibw_file_as_dict(filename):
|
def read_xps_ibw_file_as_dict(filename):
|
||||||
"""
|
"""
|
||||||
@ -76,4 +81,66 @@ def read_xps_ibw_file_as_dict(filename):
|
|||||||
file_dict['datasets'].append(dataset)
|
file_dict['datasets'].append(dataset)
|
||||||
|
|
||||||
|
|
||||||
return file_dict
|
return file_dict
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
thisFilePath = os.path.abspath(__file__)
|
||||||
|
except NameError:
|
||||||
|
print("Error: __file__ is not available. Ensure the script is being run from a file.")
|
||||||
|
print("[Notice] Path to DIMA package may not be resolved properly.")
|
||||||
|
thisFilePath = os.getcwd() # Use current directory or specify a default
|
||||||
|
|
||||||
|
dimaPath = os.path.normpath(os.path.join(thisFilePath, "..",'..','..')) # Move up to project root
|
||||||
|
|
||||||
|
if dimaPath not in sys.path: # Avoid duplicate entries
|
||||||
|
sys.path.insert(0,dimaPath)
|
||||||
|
|
||||||
|
from src.hdf5_ops import save_file_dict_to_hdf5
|
||||||
|
from utils.g5505_utils import created_at
|
||||||
|
|
||||||
|
# Set up argument parsing
|
||||||
|
parser = argparse.ArgumentParser(description="Data ingestion process to HDF5 files.")
|
||||||
|
parser.add_argument('dst_file_path', type=str, help="Path to the target HDF5 file.")
|
||||||
|
parser.add_argument('src_file_path', type=str, help="Relative path to source file to be saved to target HDF5 file.")
|
||||||
|
parser.add_argument('dst_group_name', type=str, help="Group name '/instFolder/[category]/fileName' in the target HDF5 file.")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
hdf5_file_path = args.dst_file_path
|
||||||
|
src_file_path = args.src_file_path
|
||||||
|
dst_group_name = args.dst_group_name
|
||||||
|
default_mode = 'r+'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read source file and return an internal dictionary representation
|
||||||
|
idr_dict = read_xps_ibw_file_as_dict(src_file_path)
|
||||||
|
|
||||||
|
if not os.path.exists(hdf5_file_path):
|
||||||
|
default_mode = 'w'
|
||||||
|
|
||||||
|
print(f'Opening HDF5 file: {hdf5_file_path} in mode {default_mode}')
|
||||||
|
|
||||||
|
with h5py.File(hdf5_file_path, mode=default_mode, track_order=True) as hdf5_file_obj:
|
||||||
|
try:
|
||||||
|
# Create group if it does not exist
|
||||||
|
if dst_group_name not in hdf5_file_obj:
|
||||||
|
hdf5_file_obj.create_group(dst_group_name)
|
||||||
|
hdf5_file_obj[dst_group_name].attrs['creation_date'] = created_at().encode('utf-8')
|
||||||
|
print(f'Created new group: {dst_group_name}')
|
||||||
|
else:
|
||||||
|
print(f'Group {dst_group_name} already exists. Proceeding with data transfer...')
|
||||||
|
|
||||||
|
except Exception as inst:
|
||||||
|
logging.error('Failed to create group %s in HDF5: %s', dst_group_name, inst)
|
||||||
|
|
||||||
|
# Save dictionary to HDF5
|
||||||
|
save_file_dict_to_hdf5(hdf5_file_obj, dst_group_name, idr_dict)
|
||||||
|
print(f'Completed saving file dict with keys: {idr_dict.keys()}')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('File reader failed to process %s: %s', src_file_path, e)
|
||||||
|
print(f'File reader failed to process {src_file_path}. See logs for details.')
|
||||||
|
|
||||||
|
75
instruments/registry.yaml
Normal file
75
instruments/registry.yaml
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
instruments:
|
||||||
|
- instrumentFolderName: default
|
||||||
|
fileExtension: csv
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: null
|
||||||
|
|
||||||
|
- instrumentFolderName: NEXAFS
|
||||||
|
fileExtension: h5
|
||||||
|
fileReaderPath: null
|
||||||
|
InstrumentDictionaryPath: null
|
||||||
|
|
||||||
|
- instrumentFolderName: SES
|
||||||
|
fileExtension: ibw
|
||||||
|
fileReaderPath: instruments/readers/xps_ibw_reader.py
|
||||||
|
InstrumentDictionaryPath: null
|
||||||
|
|
||||||
|
- instrumentFolderName: RGA
|
||||||
|
fileExtension: txt
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/RGA.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: Pressure
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/Pressure.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: Humidity_Sensors
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/Humidity_Sensors.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: ICAD
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/ICAD.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: Lopap
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/Lopap.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: T200_NOx
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/T200_NOx.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: T360U_CO2
|
||||||
|
fileExtension: dat
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/T360U_CO2.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: htof
|
||||||
|
fileExtension: h5
|
||||||
|
fileReaderPath: null
|
||||||
|
InstrumentDictionaryPath: null
|
||||||
|
|
||||||
|
- instrumentFolderName: smps
|
||||||
|
fileExtension: txt
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/smps.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: gas
|
||||||
|
fileExtension: txt
|
||||||
|
fileReaderPath: instruments/readers/g5505_text_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/gas.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: ACSM_TOFWARE
|
||||||
|
fileExtension: txt
|
||||||
|
fileReaderPath: instruments/readers/acsm_tofware_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/ACSM_TOFWARE.yaml
|
||||||
|
|
||||||
|
- instrumentFolderName: ACSM_TOFWARE
|
||||||
|
fileExtension: csv
|
||||||
|
fileReaderPath: instruments/readers/acsm_tofware_reader.py
|
||||||
|
InstrumentDictionaryPath: instruments/dictionaries/ACSM_TOFWARE.yaml
|
@ -22,9 +22,15 @@ from datetime import datetime
|
|||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
# Import DIMA modules
|
# Import DIMA modules
|
||||||
import src.hdf5_writer as hdf5_lib
|
try:
|
||||||
import utils.g5505_utils as utils
|
from dima.src import hdf5_writer as hdf5_lib
|
||||||
from instruments import filereader_registry
|
from dima.utils import g5505_utils as utils
|
||||||
|
from dima.instruments.readers import filereader_registry
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
print(':)')
|
||||||
|
import src.hdf5_writer as hdf5_lib
|
||||||
|
import utils.g5505_utils as utils
|
||||||
|
from instruments import filereader_registry
|
||||||
|
|
||||||
allowed_file_extensions = filereader_registry.file_extensions
|
allowed_file_extensions = filereader_registry.file_extensions
|
||||||
|
|
||||||
|
@ -15,7 +15,12 @@ if dimaPath not in sys.path: # Avoid duplicate entries
|
|||||||
|
|
||||||
import h5py
|
import h5py
|
||||||
import yaml
|
import yaml
|
||||||
import src.hdf5_ops as hdf5_ops
|
|
||||||
|
try:
|
||||||
|
from dima.src import hdf5_ops as hdf5_ops
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
import src.hdf5_ops as hdf5_ops
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def load_yaml(review_yaml_file):
|
def load_yaml(review_yaml_file):
|
||||||
|
@ -1,7 +1,12 @@
|
|||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
import utils.g5505_utils as utils
|
|
||||||
from pipelines.metadata_revision import update_hdf5_file_with_review
|
try:
|
||||||
|
from dima.utils import g5505_utils as utils
|
||||||
|
from dima.pipelines.metadata_revision import update_hdf5_file_with_review
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
import utils.g5505_utils as utils
|
||||||
|
from pipelines.metadata_revision import update_hdf5_file_with_review
|
||||||
|
|
||||||
def perform_git_operations(hdf5_upload):
|
def perform_git_operations(hdf5_upload):
|
||||||
status_command = ['git', 'status']
|
status_command = ['git', 'status']
|
||||||
|
@ -17,9 +17,6 @@ if dimaPath not in sys.path: # Avoid duplicate entries
|
|||||||
import h5py
|
import h5py
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
import utils.g5505_utils as utils
|
|
||||||
import src.hdf5_writer as hdf5_lib
|
|
||||||
import logging
|
import logging
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
@ -29,6 +26,13 @@ import yaml
|
|||||||
import json
|
import json
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# from dima.utils import g5505_utils as utils
|
||||||
|
# from dima.src import hdf5_writer as hdf5_lib
|
||||||
|
#except ModuleNotFoundError:
|
||||||
|
import utils.g5505_utils as utils
|
||||||
|
import src.hdf5_writer as hdf5_lib
|
||||||
|
|
||||||
class HDF5DataOpsManager():
|
class HDF5DataOpsManager():
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -702,3 +706,67 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
#run(sys.argv[2])
|
#run(sys.argv[2])
|
||||||
|
|
||||||
|
|
||||||
|
def save_file_dict_to_hdf5(h5file, group_name, file_dict):
|
||||||
|
"""
|
||||||
|
Transfers data from a file_dict to an HDF5 file.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
h5file : h5py.File
|
||||||
|
HDF5 file object where the data will be written.
|
||||||
|
group_name : str
|
||||||
|
Name of the HDF5 group where data will be stored.
|
||||||
|
file_dict : dict
|
||||||
|
Dictionary containing file data to be transferred. Required structure:
|
||||||
|
{
|
||||||
|
'name': str,
|
||||||
|
'attributes_dict': dict,
|
||||||
|
'datasets': [
|
||||||
|
{
|
||||||
|
'name': str,
|
||||||
|
'data': array-like,
|
||||||
|
'shape': tuple,
|
||||||
|
'attributes': dict (optional)
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not file_dict:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Create group and add their attributes
|
||||||
|
filename = file_dict['name']
|
||||||
|
group = h5file[group_name].create_group(name=filename)
|
||||||
|
# Add group attributes
|
||||||
|
group.attrs.update(file_dict['attributes_dict'])
|
||||||
|
|
||||||
|
# Add datasets to the just created group
|
||||||
|
for dataset in file_dict['datasets']:
|
||||||
|
dataset_obj = group.create_dataset(
|
||||||
|
name=dataset['name'],
|
||||||
|
data=dataset['data'],
|
||||||
|
shape=dataset['shape']
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add dataset's attributes
|
||||||
|
attributes = dataset.get('attributes', {})
|
||||||
|
dataset_obj.attrs.update(attributes)
|
||||||
|
group.attrs['last_update_date'] = utils.created_at().encode('utf-8')
|
||||||
|
|
||||||
|
stdout = f'Completed transfer for /{group_name}/{filename}'
|
||||||
|
print(stdout)
|
||||||
|
|
||||||
|
except Exception as inst:
|
||||||
|
logging.error('Failed to transfer data into HDF5: %s', inst)
|
||||||
|
return -1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
root_dir = os.path.abspath(os.curdir)
|
#root_dir = os.path.abspath(os.curdir)
|
||||||
sys.path.append(root_dir)
|
#sys.path.append(root_dir)
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@ -9,89 +9,38 @@ import h5py
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# from dima.utils import g5505_utils as utils
|
||||||
|
# from dima.src import hdf5_ops
|
||||||
|
# from dima.instruments import filereader_registry as filereader_registry
|
||||||
|
#except ModuleNotFoundError:
|
||||||
import utils.g5505_utils as utils
|
import utils.g5505_utils as utils
|
||||||
|
import src.hdf5_ops as hdf5_ops
|
||||||
import instruments.filereader_registry as filereader_registry
|
import instruments.filereader_registry as filereader_registry
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def __transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
|
|
||||||
"""
|
|
||||||
Transfers data from a file_dict to an HDF5 file.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
h5file : h5py.File
|
|
||||||
HDF5 file object where the data will be written.
|
|
||||||
group_name : str
|
|
||||||
Name of the HDF5 group where data will be stored.
|
|
||||||
file_dict : dict
|
|
||||||
Dictionary containing file data to be transferred. Required structure:
|
|
||||||
{
|
|
||||||
'name': str,
|
|
||||||
'attributes_dict': dict,
|
|
||||||
'datasets': [
|
|
||||||
{
|
|
||||||
'name': str,
|
|
||||||
'data': array-like,
|
|
||||||
'shape': tuple,
|
|
||||||
'attributes': dict (optional)
|
|
||||||
},
|
|
||||||
...
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
Returns
|
def __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, work_with_copy : bool = True):
|
||||||
-------
|
|
||||||
None
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not file_dict:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create group and add their attributes
|
|
||||||
filename = file_dict['name']
|
|
||||||
group = h5file[group_name].create_group(name=filename)
|
|
||||||
# Add group attributes
|
|
||||||
group.attrs.update(file_dict['attributes_dict'])
|
|
||||||
|
|
||||||
# Add datasets to the just created group
|
|
||||||
for dataset in file_dict['datasets']:
|
|
||||||
dataset_obj = group.create_dataset(
|
|
||||||
name=dataset['name'],
|
|
||||||
data=dataset['data'],
|
|
||||||
shape=dataset['shape']
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add dataset's attributes
|
|
||||||
attributes = dataset.get('attributes', {})
|
|
||||||
dataset_obj.attrs.update(attributes)
|
|
||||||
group.attrs['last_update_date'] = utils.created_at().encode('utf-8')
|
|
||||||
|
|
||||||
stdout = f'Completed transfer for /{group_name}/{filename}'
|
|
||||||
|
|
||||||
except Exception as inst:
|
|
||||||
stdout = inst
|
|
||||||
logging.error('Failed to transfer data into HDF5: %s', inst)
|
|
||||||
|
|
||||||
return stdout
|
|
||||||
|
|
||||||
def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
|
|
||||||
# Create copy of original file to avoid possible file corruption and work with it.
|
# Create copy of original file to avoid possible file corruption and work with it.
|
||||||
|
with h5py.File(path_to_output_file, mode='r+', track_order=True) as dest_file_obj:
|
||||||
|
|
||||||
if work_with_copy:
|
if work_with_copy:
|
||||||
tmp_file_path = utils.make_file_copy(source_file_path)
|
tmp_file_path = utils.make_file_copy(source_file_path)
|
||||||
else:
|
else:
|
||||||
tmp_file_path = source_file_path
|
tmp_file_path = source_file_path
|
||||||
|
|
||||||
# Open backup h5 file and copy complet filesystem directory onto a group in h5file
|
# Open backup h5 file and copy complet filesystem directory onto a group in h5file
|
||||||
with h5py.File(tmp_file_path,'r') as src_file:
|
with h5py.File(tmp_file_path,'r') as src_file:
|
||||||
dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
|
dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
|
||||||
|
|
||||||
if 'tmp_files' in tmp_file_path:
|
if 'tmp_files' in tmp_file_path:
|
||||||
os.remove(tmp_file_path)
|
os.remove(tmp_file_path)
|
||||||
|
|
||||||
stdout = f'Completed transfer for /{dest_group_name}'
|
stdout = f'Completed transfer for /{dest_group_name}'
|
||||||
|
|
||||||
return stdout
|
return stdout
|
||||||
|
|
||||||
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
||||||
@ -228,20 +177,16 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
|||||||
|
|
||||||
# hdf5 path to filename group
|
# hdf5 path to filename group
|
||||||
dest_group_name = f'{group_name}/{filename}'
|
dest_group_name = f'{group_name}/{filename}'
|
||||||
|
source_file_path = os.path.join(dirpath,filename)
|
||||||
|
|
||||||
if not 'h5' in filename:
|
if not 'h5' in filename:
|
||||||
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
||||||
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
||||||
file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
|
file_dict = filereader_registry.select_file_reader(dest_group_name)(source_file_path)
|
||||||
# Check whether there is an available file reader
|
|
||||||
if file_dict is not None and isinstance(file_dict, dict):
|
|
||||||
if 'attributes_dict' in file_dict:
|
|
||||||
file_dict['attributes_dict'].update(data_lineage_dict.get(filename,{}))
|
|
||||||
|
|
||||||
stdout = __transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
|
stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
source_file_path = os.path.join(dirpath,filename)
|
|
||||||
dest_file_obj = h5file
|
dest_file_obj = h5file
|
||||||
#group_name +'/'+filename
|
#group_name +'/'+filename
|
||||||
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
||||||
@ -270,6 +215,186 @@ def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
|
|||||||
|
|
||||||
return path_to_output_file #, output_yml_filename_path
|
return path_to_output_file #, output_yml_filename_path
|
||||||
|
|
||||||
|
def create_hdf5_file_from_filesystem_path_new(path_to_input_directory: str,
|
||||||
|
path_to_filenames_dict: dict = None,
|
||||||
|
select_dir_keywords : list = [],
|
||||||
|
root_metadata_dict : dict = {}, mode = 'w'):
|
||||||
|
|
||||||
|
"""
|
||||||
|
Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure)
|
||||||
|
of a given filesystem path.
|
||||||
|
|
||||||
|
The data integration capabilities are limited by our file reader, which can only access data from a list of
|
||||||
|
admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file.
|
||||||
|
Files are formatted as composite objects consisting of a group, file, and attributes.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
output_filename : str
|
||||||
|
Name of the output HDF5 file.
|
||||||
|
path_to_input_directory : str
|
||||||
|
Path to root directory, specified with forward slashes, e.g., path/to/root.
|
||||||
|
|
||||||
|
path_to_filenames_dict : dict, optional
|
||||||
|
A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files.
|
||||||
|
If provided, 'input_file_system_path' is ignored.
|
||||||
|
|
||||||
|
select_dir_keywords : list
|
||||||
|
List of string elements to consider or select only directory paths that contain
|
||||||
|
a word in 'select_dir_keywords'. When empty, all directory paths are considered
|
||||||
|
to be included in the HDF5 file group hierarchy.
|
||||||
|
root_metadata_dict : dict
|
||||||
|
Metadata to include at the root level of the HDF5 file.
|
||||||
|
|
||||||
|
mode : str
|
||||||
|
'w' create File, truncate if it exists, or 'r+' read/write, File must exists. By default, mode = "w".
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
output_filename : str
|
||||||
|
Path to the created HDF5 file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
if not mode in ['w','r+']:
|
||||||
|
raise ValueError(f'Parameter mode must take values in ["w","r+"]')
|
||||||
|
|
||||||
|
if not '/' in path_to_input_directory:
|
||||||
|
raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".' )
|
||||||
|
|
||||||
|
#path_to_output_directory = os.path.join(path_to_input_directory,'..')
|
||||||
|
path_to_input_directory = os.path.normpath(path_to_input_directory).rstrip(os.sep)
|
||||||
|
|
||||||
|
|
||||||
|
for i, keyword in enumerate(select_dir_keywords):
|
||||||
|
select_dir_keywords[i] = keyword.replace('/',os.sep)
|
||||||
|
|
||||||
|
if not path_to_filenames_dict:
|
||||||
|
# On dry_run=True, returns path to files dictionary of the output directory without making a actual copy of the input directory.
|
||||||
|
# Therefore, there wont be a copying conflict by setting up input and output directories the same
|
||||||
|
path_to_filenames_dict = utils.copy_directory_with_contraints(input_dir_path=path_to_input_directory,
|
||||||
|
output_dir_path=path_to_input_directory,
|
||||||
|
dry_run=True)
|
||||||
|
# Set input_directory as copied input directory
|
||||||
|
root_dir = path_to_input_directory
|
||||||
|
path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'
|
||||||
|
|
||||||
|
start_message = f'\n[Start] Data integration :\nSource: {path_to_input_directory}\nDestination: {path_to_output_file}\n'
|
||||||
|
|
||||||
|
print(start_message)
|
||||||
|
logging.info(start_message)
|
||||||
|
|
||||||
|
# Check if the .h5 file already exists
|
||||||
|
if os.path.exists(path_to_output_file) and mode in ['w']:
|
||||||
|
message = (
|
||||||
|
f"[Notice] The file '{path_to_output_file}' already exists and will not be overwritten.\n"
|
||||||
|
"If you wish to replace it, please delete the existing file first and rerun the program."
|
||||||
|
)
|
||||||
|
print(message)
|
||||||
|
logging.error(message)
|
||||||
|
else:
|
||||||
|
with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
|
||||||
|
print('Created file')
|
||||||
|
|
||||||
|
number_of_dirs = len(path_to_filenames_dict.keys())
|
||||||
|
dir_number = 1
|
||||||
|
for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
|
||||||
|
|
||||||
|
# Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
|
||||||
|
if not filtered_filenames_list:
|
||||||
|
continue
|
||||||
|
|
||||||
|
group_name = dirpath.replace(os.sep,'/')
|
||||||
|
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
|
||||||
|
|
||||||
|
# Flatten group name to two level
|
||||||
|
if select_dir_keywords:
|
||||||
|
offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
|
||||||
|
else:
|
||||||
|
offset = 2
|
||||||
|
tmp_list = group_name.split('/')
|
||||||
|
if len(tmp_list) > offset+1:
|
||||||
|
group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
|
||||||
|
|
||||||
|
# try:
|
||||||
|
# # Create group called "group_name". Hierarchy of nested groups can be implicitly defined by the forward slashes
|
||||||
|
# if not group_name in h5file.keys():
|
||||||
|
# h5file.create_group(group_name)
|
||||||
|
# h5file[group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
|
||||||
|
# #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
|
||||||
|
# #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
|
||||||
|
# #else:
|
||||||
|
# #print(group_name,' was already created.')
|
||||||
|
# instFoldermsgStart = f'Starting data transfer from instFolder: {group_name}'
|
||||||
|
# print(instFoldermsgStart)
|
||||||
|
|
||||||
|
# except Exception as inst:
|
||||||
|
# stdout = inst
|
||||||
|
# logging.error('Failed to create group %s into HDF5: %s', group_name, inst)
|
||||||
|
|
||||||
|
for filenumber, filename in enumerate(filtered_filenames_list):
|
||||||
|
|
||||||
|
#file_ext = os.path.splitext(filename)[1]
|
||||||
|
#try:
|
||||||
|
|
||||||
|
# hdf5 path to filename group
|
||||||
|
dest_group_name = f'{group_name}/{filename}'
|
||||||
|
source_file_path = os.path.join(dirpath,filename)
|
||||||
|
|
||||||
|
if not 'h5' in filename:
|
||||||
|
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
|
||||||
|
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
|
||||||
|
|
||||||
|
# TODO: Run save_file_dict_to_hdf5 from reader.py using command line interface
|
||||||
|
#file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
|
||||||
|
|
||||||
|
#stdout = hdf5_ops.save_file_dict_to_hdf5(h5file, group_name, file_dict)
|
||||||
|
|
||||||
|
filereader_registry.run_reader(path_to_output_file, source_file_path, dest_group_name)
|
||||||
|
|
||||||
|
else:
|
||||||
|
|
||||||
|
#try:
|
||||||
|
# # Create group if it does not exist
|
||||||
|
# if dest_group_name not in dest_file_obj:
|
||||||
|
# dest_file_obj.create_group(dest_group_name)
|
||||||
|
# dest_file_obj[dest_group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
|
||||||
|
# print(f'Created new group: {dest_group_name}')
|
||||||
|
# else:
|
||||||
|
# print(f'Group {dest_group_name} already exists. Proceeding with data transfer...')
|
||||||
|
|
||||||
|
#except Exception as inst:
|
||||||
|
# logging.error('Failed to create group %s in HDF5: %s', dest_group_name, inst)
|
||||||
|
|
||||||
|
|
||||||
|
#group_name +'/'+filename
|
||||||
|
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
|
||||||
|
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
|
||||||
|
stdout = __copy_file_in_group(path_to_output_file, source_file_path, dest_group_name, False)
|
||||||
|
|
||||||
|
# Update the progress bar and log the end message
|
||||||
|
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
|
||||||
|
# Print and log the start message
|
||||||
|
utils.progressBar(dir_number, number_of_dirs, instFoldermsdEnd)
|
||||||
|
logging.info(instFoldermsdEnd )
|
||||||
|
dir_number = dir_number + 1
|
||||||
|
|
||||||
|
print('[End] Data integration')
|
||||||
|
logging.info('[End] Data integration')
|
||||||
|
|
||||||
|
if len(root_metadata_dict.keys())>0:
|
||||||
|
with h5py.File(path_to_output_file, mode='r+', track_order=True) as h5file:
|
||||||
|
for key, value in root_metadata_dict.items():
|
||||||
|
#if key in h5file.attrs:
|
||||||
|
# del h5file.attrs[key]
|
||||||
|
h5file.attrs.create(key, value)
|
||||||
|
#annotate_root_dir(output_filename,root_metadata_dict)
|
||||||
|
|
||||||
|
|
||||||
|
#output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)
|
||||||
|
|
||||||
|
return path_to_output_file #, output_yml_filename_path
|
||||||
|
|
||||||
def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
|
def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
|
||||||
"""
|
"""
|
||||||
Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.
|
Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.
|
||||||
@ -412,6 +537,6 @@ def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5
|
|||||||
with h5py.File(output_filename, mode) as h5file:
|
with h5py.File(output_filename, mode) as h5file:
|
||||||
# Add project level attributes at the root/top level
|
# Add project level attributes at the root/top level
|
||||||
h5file.attrs.update(project_level_attributes)
|
h5file.attrs.update(project_level_attributes)
|
||||||
__transfer_file_dict_to_hdf5(h5file, '/', file_dict)
|
hdf5_ops.save_file_dict_to_hdf5(h5file, '/', file_dict)
|
||||||
|
|
||||||
#if __name__ == '__main__':
|
#if __name__ == '__main__':
|
||||||
|
@ -13,8 +13,11 @@ from plotly.subplots import make_subplots
|
|||||||
import plotly.graph_objects as go
|
import plotly.graph_objects as go
|
||||||
import plotly.express as px
|
import plotly.express as px
|
||||||
#import plotly.io as pio
|
#import plotly.io as pio
|
||||||
from src.hdf5_ops import get_parent_child_relationships
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from dima.src.hdf5_ops import get_parent_child_relationships
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
from src.hdf5_ops import get_parent_child_relationships
|
||||||
|
|
||||||
|
|
||||||
def display_group_hierarchy_on_a_treemap(filename: str):
|
def display_group_hierarchy_on_a_treemap(filename: str):
|
||||||
|
Reference in New Issue
Block a user