diff --git a/data_flagging_utils.py b/data_flagging_utils.py
index b4b7059..7c9ae2e 100644
--- a/data_flagging_utils.py
+++ b/data_flagging_utils.py
@@ -5,6 +5,10 @@
 import plotly.graph_objs as go
 import base64
 import os
+import numpy as np
+import pandas as pd
+import dima.utils.g5505_utils as utils
+
 UPLOAD_DIRECTORY = 'data_products/'
 
 flags_dict = {
@@ -35,7 +39,10 @@ def save_file(name, content):
 
 def create_loaded_file_figure(file_path, instfolder):
 
-    DataOpsAPI = h5de.HDF5DataOpsManager(file_path)
+    DataOpsAPI = h5de.HDF5DataOpsManager(file_path)
+
+    if not DataOpsAPI.file_obj:
+        DataOpsAPI.load_file_obj()
 
     target_channels = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['names'][0].decode().split(',')
     target_loc = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['location'][0].decode()
@@ -74,6 +81,137 @@ def create_loaded_file_figure(file_path, instfolder):
 
     fig.update_layout(height=1200, title_text="Multiple Subplots with Shared Y-Axes")
 
-    DataOpsAPI.close_file()
+    DataOpsAPI.unload_file_obj()
+
+    return fig
+
+class FlaggingAppDataManager:
+
+    def __init__(self, file_path, mode='r+') -> None:
+
+        self.file_path = file_path
+        self.mode = mode
+        self._data_ops_obj = None
+        self.file_obj = None
+        self.datasets_metadata_df = None
+
+        return None
+
+    def load_file_obj(self):
+        self._data_ops_obj = h5de.HDF5DataOpsManager(self.file_path, self.mode)
+        self._data_ops_obj.load_file_obj()
+        self.file_obj = self._data_ops_obj.file_obj
+
+    def unload_file_obj(self):
+        self._data_ops_obj.unload_file_obj()  # closes the file and sets _data_ops_obj.file_obj to None
+        self.file_obj = None
+
+    def transfer_flags(self):
+
+        if self.file_obj is None:
+            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")
+
+        path_to_append_dir, ext = os.path.splitext(self.file_path)
+        self._data_ops_obj.update_file(path_to_append_dir)
+
+
+    def apply_flags(self, instFolder):
+
+        # TODO: apply flags to diagnostic and individual channels; so far, all channels are cleaned.
+
+        if self.file_obj is None:
+            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")
+
+        DataOpsManager = self._data_ops_obj
+        file_obj = self.file_obj
+
+        #with h5py.File(self.file_path, mode = self.mode, track_order=True) as file_obj:
+        try:
+
+            if instFolder not in file_obj:
+                raise ValueError(f'Invalid instrument name. Instrument folder {instFolder} was not found in file {self.file_path}.')
+
+            if not ('diagnostic_channels' in file_obj[instFolder].attrs and 'target_channels' in file_obj[instFolder].attrs):
+                raise ValueError(
+                    f'Required attributes missing. Instrument folder {instFolder} in file {self.file_path} has to be annotated with '
+                    'attributes "diagnostic_channels" and "target_channels" that specify the location and names of the channels.'
+                )
+
+            dataset_name = file_obj[instFolder].attrs['target_channels']['location'][0].decode()
+            channel_names = file_obj[instFolder].attrs['target_channels']['names'][0].decode().split(',')
+
+            dataset_obj = file_obj[dataset_name]
+            # TODO: maybe operate directly on dataset = dataset_obj[...], a structured numpy array, instead of wrapping it in a dataframe.
+            dataset_df = DataOpsManager.extract_dataset_as_dataframe(dataset_name)
+
+            # Define the datetime variable from the channel names. By design, the first entry of the list is the datetime variable name.
+            datetime_var = channel_names[0]
+            remaining_vars = channel_names.copy()
+            remaining_vars.remove(datetime_var)
+
+            ref_datetime_format = dataset_obj.attrs[datetime_var]['datetime_format'][0].decode()
+
+            #datetime_var_data = pd.Series([item.decode() for item in dataset_obj[datetime_var]])
+            #datetime_var_data = pd.to_datetime(datetime_var_data, format=ref_datetime_format, errors='coerce')
+            dataset_df[datetime_var] = dataset_df[datetime_var].apply(lambda x: x.decode())
+            dataset_df[datetime_var] = pd.to_datetime(dataset_df[datetime_var], format=ref_datetime_format, errors='coerce')
+
+            flag_indicator = np.zeros(shape=dataset_df[datetime_var].shape,
+                                      dtype=bool)
+
+            # TODO: include this format as part of the flag's attributes in the flag recording process.
+            flag_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
+            for flag in file_obj[f'{instFolder}_flags']:
+                flag_obj = file_obj[f'{instFolder}_flags'][flag]['data_table']
+
+                # Replace values in the flagged time window with NaN if the flag label marks invalidated data ('I').
+                if flag_obj['flag_label'][0].decode() == 'I':
+                    t1 = pd.to_datetime(flag_obj['startdate'][0].decode(), format=flag_datetime_format)
+                    t2 = pd.to_datetime(flag_obj['enddate'][0].decode(), format=flag_datetime_format)
+
+                    t1_idx = abs(dataset_df[datetime_var] - t1).argmin()
+                    t2_idx = abs(dataset_df[datetime_var] - t2).argmin()
+
+                    dataset_df.loc[t1_idx:t2_idx, remaining_vars] = np.nan
+
+
+            # Apply .strftime(), handling NaT values by filling with an empty byte string placeholder.
+            dataset_df[datetime_var] = dataset_df[datetime_var].apply(
+                lambda x: x.strftime(ref_datetime_format).encode('utf-8') if not pd.isnull(x) else b''  # handle NaT/NaN by returning an empty byte string
+            )
+
+
+            #dt.strftime(ref_datetime_format).apply(lambda x: x.tobytes() if pd.notnull(x) else b'')
+
+            dataset_name_parts = dataset_name.split('/')
+            newInstFolder = '_'.join([dataset_name_parts[0], 'cleaned'])
+            dataset_name_parts.pop(0)
+            flagged_dataset_name = '/'.join(dataset_name_parts)
+
+
+            dataset_dict = {'attributes': {},
+                            'name': flagged_dataset_name,
+                            'data': utils.convert_dataframe_to_np_structured_array(dataset_df)}
+
+            dataset_dict['attributes'].update({'creation_date': utils.created_at().encode('utf-8')})
+            dataset_dict['attributes'].update(dataset_obj.attrs)
+
+
+            DataOpsManager.append_dataset(dataset_dict, newInstFolder)
+
+        except Exception as e:
+            self.unload_file_obj()
+            print(f"An unexpected error occurred: {e}. "
+                  "The file object has been closed.")
+
+
+
+
+        #flag_indicator[t1_idx:t2_idx] = True
+        #(datetime_var_data-t1).min()
+
+        #if not instrument_name in file_obj and not flag_name in file_obj:
+        #    raise ValueError(f'Invalid instrument_name {instrument_name} and flag_name {flag_name}. No object with such names in file {self.file_path}')
+        #if not f'{instrument_name}_flags':
+        #    raise ValueError(f'There are no flags to work with. Make sure {instrument_name}_flags is created first before running this method.')
 
-    return fig
\ No newline at end of file
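
Usage note: a minimal sketch of how the new FlaggingAppDataManager is meant to be driven, based only on the methods added in this diff. The collection file path and the instrument folder name 'ACSM_TOFWARE' are hypothetical placeholders, not names taken from the patch:

    from data_flagging_utils import FlaggingAppDataManager

    # Hypothetical HDF5 collection file; 'r+' (the default) is required so flags can be written back.
    manager = FlaggingAppDataManager('data_products/collection.h5', mode='r+')
    manager.load_file_obj()              # open the file via HDF5DataOpsManager
    manager.transfer_flags()             # append flag records from the sibling directory derived from the file path
    manager.apply_flags('ACSM_TOFWARE')  # NaN-out target channels inside intervals whose flag label is 'I'
    manager.unload_file_obj()            # release the underlying HDF5 handle

Note that apply_flags already unloads the file object itself when an unexpected error occurs, so the trailing unload_file_obj call covers the success path; the cleaned dataset is appended under a new '<top-level group>_cleaned' folder via HDF5DataOpsManager.append_dataset.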