mirror of
https://gitea.psi.ch/APOG/acsmnode.git
synced 2025-06-29 12:50:48 +02:00
Implemented class FlaggingAppDataManager() with transfer_flags() and apply_flags() methods
This commit is contained in:
@ -5,6 +5,10 @@ import plotly.graph_objs as go
|
|||||||
import base64
|
import base64
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import dima.utils.g5505_utils as utils
|
||||||
|
|
||||||
UPLOAD_DIRECTORY = 'data_products/'
|
UPLOAD_DIRECTORY = 'data_products/'
|
||||||
|
|
||||||
flags_dict = {
|
flags_dict = {
|
||||||
@ -35,7 +39,10 @@ def save_file(name, content):
|
|||||||
|
|
||||||
def create_loaded_file_figure(file_path, instfolder):
|
def create_loaded_file_figure(file_path, instfolder):
|
||||||
|
|
||||||
DataOpsAPI = h5de.HDF5DataOpsManager(file_path)
|
DataOpsAPI = h5de.HDF5DataOpsManager(file_path)
|
||||||
|
|
||||||
|
if not DataOpsAPI.file_obj:
|
||||||
|
DataOpsAPI.load_file_obj()
|
||||||
|
|
||||||
target_channels = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['names'][0].decode().split(',')
|
target_channels = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['names'][0].decode().split(',')
|
||||||
target_loc = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['location'][0].decode()
|
target_loc = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['location'][0].decode()
|
||||||
@ -74,6 +81,137 @@ def create_loaded_file_figure(file_path, instfolder):
|
|||||||
|
|
||||||
fig.update_layout(height=1200, title_text="Multiple Subplots with Shared Y-Axes")
|
fig.update_layout(height=1200, title_text="Multiple Subplots with Shared Y-Axes")
|
||||||
|
|
||||||
DataOpsAPI.close_file()
|
DataOpsAPI.unload_file_obj()
|
||||||
|
|
||||||
|
return fig
|
||||||
|
|
||||||
|
class FlaggingAppDataManager():
|
||||||
|
|
||||||
|
def __init__(self, file_path, mode = 'r+') -> None:
|
||||||
|
|
||||||
|
self.file_path = file_path
|
||||||
|
self.mode = mode
|
||||||
|
self._data_ops_obj = None
|
||||||
|
self.file_obj = None
|
||||||
|
self.datasets_metadata_df = None
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def load_file_obj(self):
|
||||||
|
self._data_ops_obj = h5de.HDF5DataOpsManager(self.file_path, self.mode)
|
||||||
|
self._data_ops_obj.load_file_obj()
|
||||||
|
self.file_obj = self._data_ops_obj.file_obj
|
||||||
|
|
||||||
|
def unload_file_obj(self):
|
||||||
|
self._data_ops_obj = h5de.HDF5DataOpsManager(self.file_path, self.mode)
|
||||||
|
self._data_ops_obj.unload_file_obj() # sets __data_ops_obj.file_obj to None
|
||||||
|
|
||||||
|
def transfer_flags(self):
|
||||||
|
|
||||||
|
if self.file_obj is None:
|
||||||
|
raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")
|
||||||
|
|
||||||
|
path_to_append_dir, ext = os.path.splitext(self.file_path)
|
||||||
|
self._data_ops_obj.update_file(path_to_append_dir)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_flags(self,instFolder):
|
||||||
|
|
||||||
|
# TODO: apply flags to diagnostic and indivial channels. so far is all channels are cleaned
|
||||||
|
|
||||||
|
if self.file_obj is None:
|
||||||
|
raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")
|
||||||
|
|
||||||
|
DataOpsManager = self._data_ops_obj
|
||||||
|
file_obj = self.file_obj
|
||||||
|
|
||||||
|
#with h5py.File(self.file_path, mode = self.mode, track_order=True) as file_obj:
|
||||||
|
try:
|
||||||
|
|
||||||
|
if not instFolder in file_obj:
|
||||||
|
raise ValueError(f'Invalid instrument name. Instrument folder {instFolder} was not found in file {self.file_path}.')
|
||||||
|
|
||||||
|
if not ('diagnostic_channels' in file_obj[instFolder].attrs and 'target_channels' in file_obj[instFolder].attrs):
|
||||||
|
raise ValueError(
|
||||||
|
f'Required attributes missing. Instrument folder {instFolder} in file {self.file_path} has to be annotated with '
|
||||||
|
'attributes "diagnostic_channels" and "target_channels" that specify channels location and their names.'
|
||||||
|
)
|
||||||
|
|
||||||
|
dataset_name = file_obj[instFolder].attrs['target_channels']['location'][0].decode()
|
||||||
|
channel_names = file_obj[instFolder].attrs['target_channels']['names'][0].decode().split(',')
|
||||||
|
|
||||||
|
dataset_obj = file_obj[dataset_name]
|
||||||
|
# TODO: maybe we can do this directly on dataset = dataset_obj[...], which is a structured numpy array, instead of wrapping that as dataframe
|
||||||
|
dataset_df = DataOpsManager.extract_dataset_as_dataframe(dataset_name)
|
||||||
|
|
||||||
|
# Define datetime variable based on channel names. We assume by design the first entry of the list is the datetime variable name.
|
||||||
|
datetime_var = channel_names[0]
|
||||||
|
remaining_vars = channel_names.copy()
|
||||||
|
remaining_vars.remove(datetime_var)
|
||||||
|
|
||||||
|
ref_datetime_format = dataset_obj.attrs.get(datetime_var,None)['datetime_format'][0].decode()
|
||||||
|
|
||||||
|
#datetime_var_data = pd.Series([item.decode() for item in dataset_obj[datetime_var]])
|
||||||
|
#datetime_var_data = pd.to_datetime(datetime_var_data , format = ref_datetime_format, errors = 'coerce')
|
||||||
|
dataset_df[datetime_var] = dataset_df[datetime_var].apply(lambda x: x.decode() )
|
||||||
|
dataset_df[datetime_var] = pd.to_datetime(dataset_df[datetime_var], format = ref_datetime_format, errors = 'coerce')
|
||||||
|
|
||||||
|
flag_indicator = np.zeros(shape = dataset_df[datetime_var].shape,
|
||||||
|
dtype = bool)
|
||||||
|
|
||||||
|
# TODO: include this information as part of the flag's attributes in the flag recording process
|
||||||
|
flag_datetime_format='%Y-%m-%d %H:%M:%S.%f'
|
||||||
|
for flag in file_obj[f'{instFolder}_flags']:
|
||||||
|
flag_obj = file_obj[f'{instFolder}_flags'][flag]['data_table']
|
||||||
|
|
||||||
|
# Replace values indicated by flag NaN if flag label refers to invalidated data.
|
||||||
|
if flag_obj['flag_label'][0].decode() == 'I':
|
||||||
|
t1 = pd.to_datetime(flag_obj['startdate'][0].decode(), format=flag_datetime_format)
|
||||||
|
t2 = pd.to_datetime(flag_obj['enddate'][0].decode(), format=flag_datetime_format)
|
||||||
|
|
||||||
|
t1_idx = abs(dataset_df[datetime_var]-t1).argmin()
|
||||||
|
t2_idx = abs(dataset_df[datetime_var]-t2).argmin()
|
||||||
|
|
||||||
|
dataset_df.loc[t1_idx:t2_idx,remaining_vars] = np.nan
|
||||||
|
|
||||||
|
|
||||||
|
# Apply the .strftime() method, handling NaT values by filling with an empty string or placeholder
|
||||||
|
dataset_df[datetime_var] = dataset_df[datetime_var].apply(
|
||||||
|
lambda x: x.strftime(ref_datetime_format).encode('utf-8') if not pd.isnull(x) else b'' # Handle NaT/NaN by returning empty string
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
#dt.strftime(ref_datetime_format).apply(lambda x: x.tobytes() if pd.notnull(x) else b'')
|
||||||
|
|
||||||
|
dataset_name_parts = dataset_name.split('/')
|
||||||
|
newInstFolder = '_'.join([dataset_name_parts[0],'cleaned'])
|
||||||
|
dataset_name_parts.remove(dataset_name_parts[0])
|
||||||
|
flagged_dataset_name = '/'.join(dataset_name_parts)
|
||||||
|
|
||||||
|
|
||||||
|
dataset_dict = {'attributes':{},
|
||||||
|
'name':flagged_dataset_name,
|
||||||
|
'data': utils.convert_dataframe_to_np_structured_array(dataset_df)}
|
||||||
|
|
||||||
|
dataset_dict['attributes'].update({'creation_date':utils.created_at().encode('utf-8')})
|
||||||
|
dataset_dict['attributes'].update(dataset_obj.attrs)
|
||||||
|
|
||||||
|
|
||||||
|
DataOpsManager.append_dataset(dataset_dict, newInstFolder)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self._data_ops_obj.unload_file_obj()
|
||||||
|
print(f"An unexpected error occurred: {e}."
|
||||||
|
"The file object has been properly closed.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#flag_indicator[t1_idx:t2_idx] = True
|
||||||
|
#(datetime_var_data-t1).min()
|
||||||
|
|
||||||
|
#if not instrument_name in file_obj and not flag_name in file_obj:
|
||||||
|
# raise ValueError(f'Invalid instrument_name {instrument_name} and flag_name {flag_name}. No object with such names in file {self.file_path}')
|
||||||
|
#if not f'{instrument_name}_flags':
|
||||||
|
# raise ValueError(f'There is no flags to work with. Make sure {instrument_name}_flags is created first before running this method.')
|
||||||
|
|
||||||
return fig
|
|
Reference in New Issue
Block a user