Move app scripts to app folder. This is to improve modularity of the app.
This commit is contained in:

808	app/data_flagging_app.py	Normal file

@@ -0,0 +1,808 @@
import sys, os

try:
    thisFilePath = os.path.abspath(__file__)
    print(thisFilePath)
except NameError:
    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
    print("When using a terminal, make sure the working directory is set to the script's location to prevent path issues (for the DIMA submodule).")
    #print("Otherwise, path to submodule DIMA may not be resolved properly.")
    thisFilePath = os.getcwd()  # Use current directory or specify a default

dimaPath = os.path.normpath(os.path.join(thisFilePath, "..", ".."))  # Move up to project root

print(dimaPath)
import pandas as pd
import numpy as np

import base64
import dash
import io

# Set up project root directory
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
sys.path.append(os.path.join(root_dir, 'dima'))

import data_flagging_utils

from dash import Dash, html, dcc, callback, Output, Input, State, dash_table
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import dash_bootstrap_components as dbc
import json

import dima.src.hdf5_ops as hdf5_ops
#import dima.instruments.readers.filereader_registry as filereader_registry
#import instruments_.readers.flag_reader as flag_reader

#filereader_registry.file_extensions.append('.json')
#filereader_registry.file_readers.update({'ACSM_TOFWARE_flags_json' : lambda x: flag_reader.read_jsonflag_as_dict(x)})

import threading
import webbrowser
from time import sleep
EnableVisCheckbox = dbc.Col(
    dbc.Row([
        dbc.Col(dcc.Checklist(
            id='enable-flag-checkbox',
            options=[{'label': html.Span('Enable Flag Visualization', style={'font-size': 15, 'padding-left': 10}), 'value': True}],
            value=[],
            inline=True), width=6),
        dbc.Col(dbc.Button("Load Flags", id='load-flags-button', color='primary'), width=4)],
        justify="center", align="center"),
    width=12)

FlagVisTable = html.Div(
    dash_table.DataTable(
        data=[],
        columns=[{"name": i, "id": i} for i in ['id', 'startdate', 'enddate', 'flag_description', 'parent_ch_pos', 'parent_channel']],
        id='tbl',
        style_header={'textAlign': 'center'},
        fixed_rows={'headers': True},  # Fixed table headers
        style_table={'height': '1000px'},  # Make table scrollable
        style_cell={'textAlign': 'left', 'padding': '10px'},  # Cell styling
    ),
    style={
        'background-color': '#f0f0f0',  # Background color for the table
        #'height': '1000px',  # Match the table's height
        'padding': '5px',  # Optional padding around the table
        'border': '1px solid #ccc',  # Optional border around the background
    })

ReviewOpsPanel = dbc.Col([
    # Row 1
    dbc.Row([html.H2("Flagging workflow panel", style={'font-size': 20})]),

    # Row 2
    dbc.Row([
        #dbc.Col(html.Div("Review Status"), width=6),
        dcc.Checklist(
            id='flag-review-status-checklist',
            options=[
                {'label': [html.Span("Verify Flags", style={'font-size': 15, 'padding-left': 2})], 'value': 'will review'},
                {'label': [html.Span("Ready to Record Flags", style={'font-size': 15, 'padding-left': 2})], 'value': 'will transfer'},
                {'label': [html.Span("Finalize Flagging", style={'font-size': 15, 'padding-left': 2})], 'value': 'will apply'}
            ],
            value=[],
            #inline=True,
            style={
                "display": "flex",          # Flexbox for left alignment
                "flexDirection": "column",  # Arrange the items vertically
                "alignItems": "flex-start"  # Align the items to the left
            }
        ),
    ]),

    # Row 3
    dbc.Row([
        #dbc.Col(dbc.Button("Load Flags", id='button-1', color='primary'), width=4),
        dbc.Col(dbc.Button("Delete Flag", id='delete-flag-button', color='primary'), width=4),
        dbc.Col(dbc.Button("Record Flags", id='button-2', color='primary'), width=4),
        dbc.Col(dbc.Button("Apply Flags", id='button-3', color='primary'), width=4)],
        justify="center", align="center"),

    # Row 4
    #dbc.Row([
    #    dbc.Col(html.Div("Apply Flags"), width=6),
    #    dbc.Col(dbc.Button("Button 2", id='button-2', color='secondary'), width=6),
    #]),
], width=12)
# Initialize Dash app with Bootstrap theme
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

#df = pd.DataFrame.empty()

app.layout = dbc.Container([
    html.Div(children=[
        html.Div(children=[
            html.H1('QC/QA Data Flagging App'),
            html.H6('All measurements are assumed valid unless checked otherwise.')
        ])
    ], style={'textAlign': 'center'}),
    dbc.Row([
        dbc.Col([
            dcc.Upload(
                id='upload-image',
                children=html.Div(['Drag and Drop or ', html.A('Select Files')]),
                style={
                    'fontSize': "16px",
                    'width': '100%',
                    'height': '60px',
                    'lineHeight': '60px',
                    'borderWidth': '1px',
                    'borderStyle': 'dashed',
                    'borderRadius': '5px',
                    'textAlign': 'center',
                    'margin': '10px'
                }),
            dcc.Dropdown(
                id='flag-options',
                options=data_flagging_utils.filter_flags_by_label(data_flagging_utils.flags_dict, 'I'),  # displays only flags to invalidate
            )],
            width=12
        ),
        #], justify="center", align="center"),

        #dbc.Row([
        dbc.Col([dbc.Button('Create Flag', id='flag-button', color="primary", className="mt-2")], width=2),
        dbc.Col([dbc.Button('Reset Flag', id='reset-flag-button', color="secondary", className="mt-2")], width=2),
        dbc.Col([dbc.Button('Commit Flag', id='commit-flag-button', color="secondary", className="mt-2")], width=2)
    ], justify="center", align="center", style={'background-color': '#f8f9fa', 'padding': '20px', 'text-align': 'center'}),

    dbc.Row([
        html.H3("Instrument Dashboard"),

        # First Dropdown (Instrument Folders)
        dcc.Dropdown(
            id="instrument-dropdown",
            options=[{"label": i, "value": i} for i in []],
            placeholder="Select an Instrument Folder",
        ),

        # Spinner wrapping the second and third dropdowns
        dcc.Loading(
            type="circle",  # Spinner style
            children=[
                # Second Dropdown (Files)
                dcc.Dropdown(
                    id="file-dropdown",
                    placeholder="Select a File",
                    disabled=True  # Initially disabled
                ),

                # Third Dropdown (Sub-selection)
                dcc.Dropdown(
                    id="sub-dropdown",
                    placeholder="Select Variables",
                    multi=True,
                    disabled=True
                )
            ]
        )
    ], justify="center", align="center", style={'background-color': '#f8f9fa', 'padding': '20px', 'text-align': 'center'}),

    dbc.Row([
        dbc.Col([
            html.Div([
                html.Div(id='flag-mode-title', style={'whiteSpace': 'pre-line'}),
                dcc.Loading(
                    type="circle",  # Spinner style
                    children=[
                        dcc.Graph(id='timeseries-plot',
                                  style={'height': '1200px', 'width': '100%'})])
            ], style={'height': '1000px', 'overflowY': 'auto'})
        ],
        width=8,
        style={'background-color': '#e9ecef', 'padding': '20px', 'text-align': 'center', 'height': '1000px'}),
        #dbc.Col([html.Div(id='flag-record', style={'whiteSpace': 'pre-line'})], width=4),
        #config={'modeBarButtons': True,
        #        'modeBarButtonsToAdd': ['select2d', 'lasso2d'],
        #        'modeBarButtonsToRemove': ['zoom', 'pan']}),], width=12)
        dbc.Col([
            html.Div([
                EnableVisCheckbox,
                FlagVisTable,
                ReviewOpsPanel,
            ], style={'height': '1000px', 'overflowY': 'auto'}),  # Set a fixed height for the div
        ],
        width=4,
        style={'background-color': '#dee2e6', 'padding': '20px', 'text-align': 'center', 'height': '1000px'}),
    ], justify="center", align="center"),

    dbc.Row([  # row 3
        dbc.Col([
            dcc.Store(id='memory-output'),
            html.Div(id='textarea-example-output', style={'whiteSpace': 'pre-line'})
        ], width=12)
    ], justify="center", align="center"),
])
#@app.callback()

@app.callback(
    Output('memory-output', 'data', allow_duplicate=True),
    Output("instrument-dropdown", "options"),
    Output("instrument-dropdown", "disabled"),
    [Input('upload-image', 'filename'),
     Input('upload-image', 'contents')],
    prevent_initial_call=True
)
def load_data(filename, contents):
    data = {'data_loaded_flag': False}
    if filename and contents and filename.endswith('.h5'):

        DataOps = None
        try:
            path_to_file = data_flagging_utils.save_file(filename, contents)

            DataOps = hdf5_ops.HDF5DataOpsManager(path_to_file)
            DataOps.load_file_obj()

            #content_type, content_string = contents.split(',')
            #decoded = base64.b64decode(content_string)
            #file_path = io.BytesIO(decoded)
            DataOps.extract_and_load_dataset_metadata()
            df = DataOps.dataset_metadata_df.copy()
            DataOps.unload_file_obj()

            # TODO: allow selection of instrument folder

            instrument_list = [{"label": instFolder, "value": instFolder} for instFolder in df['parent_instrument'].unique()]

            # Create list of file names in dict format for the first instFolder
            instFolderName = df['parent_instrument'].unique()[0]
            instFolderFileList = list(df.loc[df['parent_instrument'] == instFolderName, 'parent_file'].to_numpy())

            #file_list = [{"label": fileName, "value": fileName} for fileName in child_files]

            #fig, channel_names = data_flagging_utils.create_loaded_file_figure(path_to_file, instfolder)

            data['data_loaded_flag'] = True
            data['path_to_uploaded_file'] = path_to_file
            data['dataset_metadata_table'] = {}  # df.to_dict()
            data[instFolderName] = instFolderFileList

            data['instFolder'] = instFolderName
            #data['channel_names'] = channel_names

            return data, instrument_list, False

        except Exception as e:
            # Guard against DataOps being undefined if save_file() itself failed
            if DataOps is not None:
                DataOps.unload_file_obj()
            print(f"Error processing file: {e}")
            return data, [], False

    return data, [], False
@app.callback(
    Output("file-dropdown", "options"),
    Output("file-dropdown", "disabled"),
    Input("instrument-dropdown", "value"),
    State('memory-output', 'data'),
    prevent_initial_call=True
)
def update_file_dropdown(instFolderName, data):

    # Verify that dataset_metadata from the uploaded HDF5 file was loaded correctly
    if not all([instFolderName, data]):
        return [], False

    if 'dataset_metadata_table' not in data.keys():
        return [], False

    file_list = []
    # Use the precomputed file list for the instFolder when available;
    # otherwise, compute it from the dataset metadata in the HDF5 file
    instFolderFileList = data.get(instFolderName, [])

    if instFolderFileList:
        file_list = [{"label": fileName, "value": fileName} for fileName in instFolderFileList]
    else:
        path_to_file = data['path_to_uploaded_file']
        DataOps = hdf5_ops.HDF5DataOpsManager(path_to_file)
        DataOps.load_file_obj()

        #content_type, content_string = contents.split(',')
        #decoded = base64.b64decode(content_string)
        #file_path = io.BytesIO(decoded)
        DataOps.extract_and_load_dataset_metadata()
        tmp = DataOps.dataset_metadata_df.copy()
        DataOps.unload_file_obj()

        instFolderFileList = tmp.loc[tmp['parent_instrument'] == instFolderName, 'parent_file'].to_numpy()
        file_list = [{"label": fileName, "value": fileName} for fileName in instFolderFileList]

    return file_list, False
@app.callback(
    Output("sub-dropdown", "options"),
    Output("sub-dropdown", "disabled"),
    Output("sub-dropdown", "value"),
    Input("instrument-dropdown", "value"),
    Input("file-dropdown", "value"),
    State('memory-output', 'data'),
    prevent_initial_call=True,
)
def update_variable_dropdown(instFolderName, fileName, data):

    # Verify that dataset_metadata from the uploaded HDF5 file was loaded correctly
    if not all([instFolderName, fileName, data]):
        return [], False, []

    DataOps = None
    try:
        path_to_file = data['path_to_uploaded_file']
        DataOps = hdf5_ops.HDF5DataOpsManager(path_to_file)
        DataOps.load_file_obj()

        dataset_name = '/'.join([instFolderName, fileName, 'data_table'])
        # Get attributes for data table
        datetime_var, datetime_var_format = DataOps.infer_datetime_variable(dataset_name)
        metadata_dict = DataOps.get_metadata(dataset_name)

        # Offer every variable except the datetime column
        variableList = []
        for var_name in metadata_dict.keys():
            if var_name != datetime_var:
                variableList.append(var_name)

        DataOps.unload_file_obj()
    except Exception as e:
        if DataOps is not None:
            DataOps.unload_file_obj()
        print(f"Error processing dataset_name: {e}")
        return [], False, []

    return [{"label": var_name, "value": var_name} for var_name in variableList], False, variableList
@app.callback(
    Output('timeseries-plot', 'figure'),
    Output('memory-output', 'data'),
    Input('instrument-dropdown', 'value'),
    Input('file-dropdown', 'value'),
    Input('sub-dropdown', 'value'),
    Input('memory-output', 'data'),
    prevent_initial_call=True
)
def update_figure(instFolderName, fileName, variableList, data):
    # Check if any input is None or empty
    if not all([instFolderName, fileName, variableList, data]):
        return go.Figure(), dash.no_update  # Return an empty figure to prevent crashes

    path_to_file = data.get('path_to_uploaded_file')
    if not path_to_file:
        return go.Figure(), dash.no_update

    DataOps = hdf5_ops.HDF5DataOpsManager(path_to_file)
    DataOps.load_file_obj()
    dataset_name = '/'.join([instFolderName, fileName, 'data_table'])

    # Get attributes for data table
    datetime_var, datetime_var_format = DataOps.infer_datetime_variable(dataset_name)
    DataOps.unload_file_obj()

    fig, channel_names = data_flagging_utils.create_loaded_file_figure(
        path_to_file, instFolderName, dataset_name, datetime_var, datetime_var_format, variableList
    )
    data['channel_names'] = channel_names
    return fig, data
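
# Note: 'memory-output' serves as both an Input and an Output across these callbacks;
# the other writers (e.g. load_data above) declare allow_duplicate=True so Dash
# accepts the shared target without raising a duplicate-output error.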
"""@app.callback(
    Output('memory-output', 'data'),
    Output('timeseries-plot', 'figure'),
    Output("instrument-dropdown", "options"),
    Output("instrument-dropdown", "disabled"),
    [Input('upload-image', 'filename')],
    [Input('upload-image', 'contents')]
)
def load_data(filename, contents):
    data = {'data_loaded_flag': False}
    if filename and contents and filename.endswith('.h5'):

        try:
            path_to_file = data_flagging_utils.save_file(filename, contents)

            DataOps = hdf5_ops.HDF5DataOpsManager(path_to_file)
            DataOps.load_file_obj()

            #content_type, content_string = contents.split(',')
            #decoded = base64.b64decode(content_string)
            #file_path = io.BytesIO(decoded)
            DataOps.extract_and_load_dataset_metadata()
            df = DataOps.dataset_metadata_df.copy()
            # TODO: allow selection of instrument folder
            instfolder = df['parent_instrument'].unique()[0]
            instrument_list = [{"label": instFolder, "value": instFolder} for instFolder in df['parent_instrument'].unique()]

            #fig, channel_names = data_flagging_utils.create_loaded_file_figure(path_to_file, instfolder)

            data['data_loaded_flag'] = True
            data['path_to_uploaded_file'] = path_to_file
            data['instfolder'] = instfolder
            #data['channel_names'] = channel_names

            DataOps.unload_file_obj()

            return data, dash.no_update, instrument_list, False

        except Exception as e:
            DataOps.unload_file_obj()
            print(f"Error processing file: {e}")
            return data, dash.no_update, instrument_list, False

    return data, dash.no_update, [], False"""
@app.callback(
    Output('timeseries-plot', 'figure', allow_duplicate=True),
    Output('flag-mode-title', 'children'),
    Input('flag-button', 'n_clicks'),
    State('timeseries-plot', 'figure'),
    State('memory-output', 'data'),
    prevent_initial_call=True,
)
def create_flag(n_clicks, fig, data):
    #if not data or not data.get('data_loaded_flag', False):

    if not all([n_clicks, fig, data]):
        return dash.no_update, dash.no_update

    fig['layout'].update({'dragmode': 'select',
                          'activeselection': dict(fillcolor='yellow'),
                          'doubleClick': 'reset'})

    #fig['layout'].update({'title': "Flagging Mode Enabled: Select ROI to Define Flagging Interval."})

    title = "Flagging Mode Enabled: Select ROI to Define Flagging Interval."
    return fig, title
#@app.callback(
#    Output('timeseries-plot', 'figure', allow_duplicate=True),
#    Output('timeseries-plot', 'selectedData', allow_duplicate=True),
#    #Output('textarea-example-output', 'children'),
#    Input('reset-flag-button', 'n_clicks'),
#    State('timeseries-plot', 'figure'),
#    #State('memory-output', 'data'),
#    prevent_initial_call=True
#)
#def clear_flag(n_clicks, fig):
#    #if not data or not data.get('data_loaded_flag', False):
#    #    return dash.no_update, dash.no_update
#
#    fig['layout'].update({'dragmode': 'zoom', 'activeselection': None})
#    #fig.update_layout()
#    #update_layout(dragmode='select', activeselection=dict(fillcolor='yellow'))
#
#    #shapes = []
#    #if relayoutData and 'xaxis.range[0]' in relayoutData:
#    #    start = relayoutData['xaxis.range[0]']
#    #    end = relayoutData['xaxis.range[1]']
#    #else:
#    #    start, end = None, None
#
#    #if start and end:
#    #    shapes.append({
#    #        'type': 'rect',
#    #        'xref': 'x',
#    #        'yref': 'paper',
#    #        'x0': start,
#    #        'y0': 0,
#    #        'x1': end,
#    #        'y1': 1,
#    #        'fillcolor': 'rgba(128, 0, 128, 0.3)',
#    #        'line': {'width': 0}
#    #    })
#    #    fig['layout'].update(shapes=shapes)
#
#    return fig, None  #, f'You have entered: \n{value}'
@app.callback(
    [Output('timeseries-plot', 'selectedData'),
     Output('timeseries-plot', 'figure', allow_duplicate=True),
     Output('flag-mode-title', 'children', allow_duplicate=True)],
    [Input('reset-flag-button', 'n_clicks'),
     State('timeseries-plot', 'figure'),
     State('memory-output', 'data')],
    prevent_initial_call=True)
def clear_flag(n_clicks, fig, data):

    # Guard against data being None before the first upload
    if n_clicks and data and data.get('data_loaded_flag', False):
        # Clear selection
        selected_data = None
        fig['layout'].update({'dragmode': 'zoom', 'activeselection': None,
                              'selections': {'line': None}})
        instFolder = data['instFolder']
        fig['layout'].update({'title': f'{instFolder}: Target and Diagnostic Channels'})
        flagging_mode_message = ''
        return selected_data, fig, flagging_mode_message
    else:
        return dash.no_update, dash.no_update, dash.no_update
@app.callback(
    [Output('timeseries-plot', 'figure', allow_duplicate=True),
     Output('timeseries-plot', 'selectedData', allow_duplicate=True),
     Output('flag-mode-title', 'children', allow_duplicate=True)],
    [Input('timeseries-plot', 'relayoutData'),
     State('timeseries-plot', 'figure'),
     State('memory-output', 'data')],
    prevent_initial_call=True)
def clear_flag_mode_title(relayoutData, fig, data):
    if not all([relayoutData, fig, data]):
        return dash.no_update, dash.no_update, dash.no_update

    if data.get('data_loaded_flag', False) and fig['layout'].get('dragmode', None) != 'select':
        # Clear selection
        selected_data = None
        fig['layout'].update({'dragmode': 'zoom', 'activeselection': None,
                              'selections': {'line': None}})
        #instFolder = data['instfolder']
        #fig['layout'].update({'title': f'{instFolder}: Target and Diagnostic Channels'})
        flagging_mode_message = ''
        return fig, selected_data, flagging_mode_message
    else:
        return dash.no_update, dash.no_update, dash.no_update
def extract_number(s):
    return int(s[1:]) - 1 if s[1:].isdigit() else 0
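
# Illustrative mapping (assumed from Plotly's axis-key convention 'x', 'x2', 'x3', ...):
# extract_number('x') -> 0, extract_number('x2') -> 1, extract_number('x3') -> 2,
# i.e. the zero-based subplot/channel position used to index channel_names below.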
@callback(Output('tbl', 'data'),
          Input('commit-flag-button', 'n_clicks'),
          State('flag-options', 'value'),
          State('timeseries-plot', 'selectedData'),
          State('memory-output', 'data'),
          prevent_initial_call=True)
def commit_flag(n_clicks, flag_value, selected_Data, data):

    if (selected_Data is None) or (not isinstance(selected_Data, dict)):
        return []
    elif not selected_Data.get('range', []):  # verify there is a flag time interval to commit
        return []
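
    # A Plotly box selection arrives as, e.g. (values are illustrative):
    # {'range': {'x3': ['2024-03-01 10:00', '2024-03-01 12:00'], 'y3': [0.1, 0.9]}}
    # where the 'x<n>'/'y<n>' suffix identifies the subplot the region was drawn on.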
    # TODO: modify the name path/to/name to reflect the directory provenance
    instFolder = data['instFolder']
    filePath = data['path_to_uploaded_file']

    flagfolderpath = os.path.join(os.path.splitext(data['path_to_uploaded_file'])[0], f'{instFolder}_flags')

    if not os.path.isdir(flagfolderpath):
        os.makedirs(flagfolderpath)

    #dirlist = os.listdir(flagfolderpath)
    # Get all files in the directory with their full paths
    files = [os.path.join(flagfolderpath, f) for f in os.listdir(flagfolderpath)]

    # Sort files by creation time
    dirlist_sorted_by_creation = sorted(files, key=os.path.getctime)

    #dirlist = dirlist.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))

    display_flag_registry = False
    if not display_flag_registry:
        tableData = []
    else:
        tableData = data_flagging_utils.load_flags(filePath, instFolder)

    #tableData = []
    #for pathtofile in dirlist_sorted_by_creation:
    #    if '.json' in pathtofile:
    #        with open(pathtofile, 'r') as f:
    #            tableData.append(json.load(f))

    number_of_existing_flags = len(dirlist_sorted_by_creation)
    flagid = number_of_existing_flags + 1
    flag_filename = os.path.join(flagfolderpath, f'flag_{flagid}.json')

    #if not os.path.exists(flag_filename):
    #    with open(flag_filename, 'r') as open_flagsfile:
    #        json_flagsobject = json.load(open_flagsfile)
    #        data = [json_flagsobject[key] for key in json_flagsobject.keys()]

    channel_names = data.get('channel_names', [])
    for key, value in selected_Data['range'].items():
        if 'x' in key:
            new_row = {'id': flagid, 'startdate': value[0], 'enddate': value[1], 'flag_code': flag_value}
            new_row.update(data_flagging_utils.flags_dict.get(flag_value, {}))
            if channel_names:
                channel_pos = extract_number(key)
                parent_channel, parent_dataset = tuple(channel_names[channel_pos].split(','))
                new_row.update({'parent_ch_pos': str(channel_pos), 'parent_channel': parent_channel, 'parent_dataset': parent_dataset})

            tableData.append(new_row)
            #data = [{'startdate': value[0], 'enddate': value[1], 'value': 90}]

    if not os.path.exists(flag_filename):
        with open(flag_filename, 'w') as flagsfile:
            #json_flagsobject = json.dump({'row'+str(len(data)): new_row}, flagsfile)
            json.dump(new_row, flagsfile)
    #else:
    #    with open(flag_filename, 'a') as flagsfile:
    #        json.dump(new_row, flagsfile)
    #        #json.dump({'row'+str(len(data)): new_row}, flagsfile)
    #        #data = [json_flagsobject[key] for key in json_flagsobject.keys()]

    return tableData
#@callback(Output('memory-output', 'data', allow_duplicate=True),
#          [Input('enable-flag-checkbox', 'value'), State('memory-output', 'data')],
#          prevent_initial_call=True)
#def enable_flag_visualization(value, memory):
#    if isinstance(memory, dict):
#        memory.update({'vis_enabled': value})
#        return memory
#
#    return dash.no_update
@callback(Output('timeseries-plot', 'figure', allow_duplicate=True),
          [Input('enable-flag-checkbox', 'value'), State('timeseries-plot', 'figure')],
          prevent_initial_call=True)
def clear_flags_from_figure(value, figure):

    vis_enabled = value[0] if value and isinstance(value, list) else False

    if not vis_enabled and figure:
        shapes = figure.get('layout', {}).get('shapes', [])

        if shapes:  # If there are shapes in the figure, clear them
            new_figure = figure.copy()  # Create a copy to avoid mutation
            new_figure['layout']['shapes'] = []
            return new_figure

    return dash.no_update
@callback(Output('timeseries-plot', 'figure', allow_duplicate=True),
          [Input('tbl', 'active_cell'),
           State('enable-flag-checkbox', 'value'), State('timeseries-plot', 'figure'), State('tbl', 'data')],
          prevent_initial_call=True)
def visualize_flag_on_figure(active_cell, value, figure, data):

    vis_enabled = value[0] if value else False

    if active_cell and vis_enabled:
        row = active_cell['row']
        startdate = data[row]['startdate']
        enddate = data[row]['enddate']
        parent_ch_pos = data[row].get('parent_ch_pos', None)

        if parent_ch_pos is not None:
            # Ensure that startdate and enddate are parsed correctly
            #startdate = pd.to_datetime(startdate)
            #enddate = pd.to_datetime(enddate)

            # Determine y-axis range directly from layout
            yaxis_key = f"yaxis{int(parent_ch_pos) + 1}" if int(parent_ch_pos) > 0 else "yaxis"
            xaxis_key = f"xaxis{int(parent_ch_pos) + 1}" if int(parent_ch_pos) > 0 else "xaxis"
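            # e.g. parent_ch_pos '0' -> 'xaxis'/'yaxis' (first subplot),
            # parent_ch_pos '2' -> 'xaxis3'/'yaxis3', matching Plotly's subplot numbering.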
            #y_min = figure['layout'].get(yaxis_key, {}).get('range', [0, 1])[0]
            #y_max = figure['layout'].get(yaxis_key, {}).get('range', [0, 1])[1]

            # Add a vertical region to the specified subplot
            figure['layout']['shapes'] = figure['layout'].get('shapes', []) + [
                dict(
                    type="rect",
                    xref=xaxis_key.replace('axis', ''),
                    yref=yaxis_key.replace('axis', ''),
                    x0=startdate,
                    x1=enddate,
                    y0=figure['layout'][yaxis_key]['range'][0],
                    y1=figure['layout'][yaxis_key]['range'][1],
                    line=dict(color="rgba(50, 171, 96, 1)", width=2),
                    fillcolor="rgba(50, 171, 96, 0.3)",
                )
            ]
            return figure

    return dash.no_update
@callback(Output('tbl', 'data', allow_duplicate=True),
          [Input('load-flags-button', 'n_clicks'), State('enable-flag-checkbox', 'value'), State('memory-output', 'data')],
          prevent_initial_call=True)
def visualize_flags_on_table(n_clicks, value, memoryData):

    if not memoryData:
        return dash.no_update

    instFolder = memoryData.get('instFolder', '')
    filePath = memoryData.get('path_to_uploaded_file', '')

    if not filePath:
        return dash.no_update

    vis_enabled = value[0] if value and isinstance(value, list) else False

    if n_clicks and vis_enabled:
        # Flags are read from the <file>/<instFolder>_flags folder via the utils helper
        tableData = data_flagging_utils.load_flags(filePath, instFolder)

        if not tableData:
            return dash.no_update
        else:
            return tableData

    return dash.no_update
def open_browser():
    """Wait for the server to start, then open the browser."""
    sleep(1)  # Wait briefly to ensure the server is starting
    webbrowser.open_new("http://127.0.0.1:8050/")

if __name__ == '__main__':
    # Start the browser-opening function in a separate thread
    threading.Thread(target=open_browser).start()

    # Run the Dash app server
    app.run_server(debug=True, use_reloader=False)
316	app/data_flagging_utils.py	Normal file

@@ -0,0 +1,316 @@
import dima.src.hdf5_ops as h5de
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import base64
import os

import numpy as np
import pandas as pd
import dima.utils.g5505_utils as utils

UPLOAD_DIRECTORY = 'data_products/'

flags_dict = {
    "000": {"flag_label": 'V', "flag_description": "Valid measurement"},
    "100": {"flag_label": 'V', "flag_description": "Checked by data originator. Valid measurement, overrides any invalid flags"},
    "110": {"flag_label": 'V', "flag_description": "Episode data checked and accepted by data originator. Valid measurement"},
    "111": {"flag_label": 'V', "flag_description": "Irregular data checked and accepted by data originator. Valid measurement"},
    "456": {"flag_label": 'I', "flag_description": "Invalidated by data originator"},
    "460": {"flag_label": 'I', "flag_description": "Contamination suspected"},
    "559": {"flag_label": 'V', "flag_description": "Unspecified contamination or local influence, but considered valid"},
    "599": {"flag_label": 'I', "flag_description": "Unspecified contamination or local influence"},
    "652": {"flag_label": 'V', "flag_description": "Construction/activity nearby"},
    "659": {"flag_label": 'I', "flag_description": "Unspecified instrument/sampling anomaly"},
    "660": {"flag_label": 'V', "flag_description": "Unspecified instrument/sampling anomaly"},
    "999": {"flag_label": 'I', "flag_description": "Missing measurement, unspecified reason"}
}
def save_file(name, content):
    # Decode the base64 payload and save the file under UPLOAD_DIRECTORY
    content_type, content_string = content.split(',')
    decoded = base64.b64decode(content_string)
    os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)  # ensure the upload folder exists
    file_path = os.path.join(UPLOAD_DIRECTORY, name)
    if not os.path.exists(file_path):
        with open(file_path, "wb") as f:
            f.write(decoded)
        print(f"File saved successfully at {file_path}")
    else:
        print(f'File already exists at {file_path}.\nTo maintain the integrity of the existing file, it will not be overwritten.')
    return file_path
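
# For reference: dcc.Upload delivers `content` as a base64 data URL, e.g.
# "data:application/octet-stream;base64,aGVsbG8=" (illustrative payload),
# which is why save_file() splits on the comma before decoding.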
def filter_flags_by_label(flags_dict, label):
    """
    Filters the flags dictionary by the specified label.

    Parameters:
    -----------
    flags_dict (dict): The dictionary containing flags.
    label (str): The label to filter by ('I' or 'V').

    Returns:
    --------
    list: A list of dictionaries with 'label' and 'value' for the specified label.
    """
    return [{'label': value['flag_description'], 'value': code}
            for code, value in flags_dict.items() if value['flag_label'] == label]
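
# Quick sanity check against flags_dict above: filter_flags_by_label(flags_dict, 'I')
# yields entries such as {'label': 'Invalidated by data originator', 'value': '456'},
# the exact shape dcc.Dropdown expects for its `options` argument.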
def create_loaded_file_figure(file_path, instfolder):
    # TODO: data_flagging_app.update_figure calls this helper with six arguments
    # (additionally dataset_name, datetime_var, datetime_var_format, variableList);
    # reconcile this two-argument signature with that call.

    DataOpsAPI = h5de.HDF5DataOpsManager(file_path)

    if not DataOpsAPI.file_obj:
        DataOpsAPI.load_file_obj()

    target_channels = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['names'][0].decode().split(',')
    target_loc = DataOpsAPI.file_obj[instfolder].attrs['target_channels']['location'][0].decode()
    diagnostic_channels = DataOpsAPI.file_obj[instfolder].attrs['diagnostic_channels']['names'][0].decode().split(',')
    diagnostic_loc = DataOpsAPI.file_obj[instfolder].attrs['diagnostic_channels']['location'][0].decode()

    # One subplot per channel; the first entry of each channel list is the datetime column
    #fig = make_subplots(rows=(len(target_channels + diagnostic_channels) - 2), cols=1, shared_xaxes=True,
    #                    row_heights=[1 for i in range(len(target_channels + diagnostic_channels) - 2)])
    fig = make_subplots(rows=(len(target_channels + diagnostic_channels) - 2), cols=1,
                        row_heights=[1 for i in range(len(target_channels + diagnostic_channels) - 2)])
    trace_idx = 1
    dataset = DataOpsAPI.file_obj[target_loc]
    time_column = DataOpsAPI.reformat_datetime_column(target_loc, target_channels[0], '%d.%m.%Y %H:%M:%S.%f')

    for i in range(1, len(target_channels)):
        fig.add_trace(go.Scatter(x=time_column,
                                 y=dataset[target_channels[i]][:],
                                 mode='lines',
                                 name=target_channels[i]), row=trace_idx, col=1)
        fig.update_yaxes(title_text=target_channels[i], row=trace_idx, col=1)
        trace_idx = trace_idx + 1

    dataset = DataOpsAPI.file_obj[diagnostic_loc]
    time_column = DataOpsAPI.reformat_datetime_column(diagnostic_loc, diagnostic_channels[0], '%d.%m.%Y %H:%M:%S')
    for i in range(1, len(diagnostic_channels)):
        fig.add_trace(go.Scatter(x=time_column,
                                 y=dataset[diagnostic_channels[i]][:],
                                 mode='lines',
                                 name=diagnostic_channels[i]), row=trace_idx, col=1)
        fig.update_yaxes(title_text=diagnostic_channels[i], row=trace_idx, col=1, type="log")
        trace_idx = trace_idx + 1

    fig.update_layout(height=1200, title_text=f"{instfolder} : Target and Diagnostic Channels", showlegend=False)

    DataOpsAPI.unload_file_obj()
    target_channels.remove(target_channels[0])
    diagnostic_channels.remove(diagnostic_channels[0])
    return fig, [','.join([item, target_loc]) for item in target_channels] + [','.join([item, diagnostic_loc]) for item in diagnostic_channels]
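
# The returned channel names are encoded as '<channel>,<dataset location>' strings;
# commit_flag() in data_flagging_app.py splits them back apart, e.g. (illustrative names)
# 'NO3,ACSM_TOFWARE/data_table'.split(',') -> ['NO3', 'ACSM_TOFWARE/data_table'].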
#import os
import json
import h5py
def load_flags(filePath, instFolder, dry_run: bool = False):
    """
    Returns a list of flags (dictionaries) based on the provided filePath and instFolder.

    Parameters:
    -----------
    filePath (str): The path to the uploaded file, expected to have an .h5 extension.
    instFolder (str): The name of the instrument folder, which must exist as a group in the HDF5 file.
    dry_run (bool): If True, performs all operations except loading file contents.

    Returns:
    --------
    list: A list of dictionaries containing flag data (or file paths in dry_run mode),
          or None if conditions are not met.
    """
    # Ensure the input file is an .h5 file
    if not filePath.endswith('.h5'):
        print(f"Invalid file extension: {filePath}. Expected a .h5 file.")
        return None

    # Ensure the instFolder exists as a group in the HDF5 file
    try:
        with h5py.File(filePath, 'r') as h5file:
            if instFolder not in h5file:
                print(f"Instrument folder '{instFolder}' not found in HDF5 file.")
                return None
    except (OSError, IOError) as e:
        print(f"Error reading HDF5 file: {e}")
        return None

    # Construct the flags folder path
    flagFolderPath = os.path.join(os.path.splitext(filePath)[0], f'{instFolder}_flags')

    # Return None if the flags folder does not exist
    if not os.path.exists(flagFolderPath):
        return None

    # List files in the flags folder
    files = [os.path.join(flagFolderPath, f) for f in os.listdir(flagFolderPath)]

    # If no files found, return None
    if not files:
        return None

    # Sort files by creation time
    sortedFiles = sorted(files, key=os.path.getctime)

    if dry_run:
        print(f"Dry run: Found {len(sortedFiles)} files in the flags folder:")
        for flagFile in sortedFiles:
            print(f" - {flagFile}")
        return sortedFiles  # Return file paths in dry run mode

    # Process and load JSON files
    flagDataList = []
    for flagFile in sortedFiles:
        if flagFile.endswith('.json'):
            try:
                with open(flagFile, 'r') as file:
                    flagDataList.append(json.load(file))
            except (json.JSONDecodeError, FileNotFoundError) as e:
                print(f"Error loading file {flagFile}: {e}")
                continue  # Skip invalid or missing files

    return flagDataList
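
# Minimal usage sketch (file name and instrument folder are hypothetical):
# flags = load_flags('data_products/experiment.h5', 'ACSM_TOFWARE')
# if flags:
#     print(f"Loaded {len(flags)} flag records")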
class FlaggingAppDataManager():

    def __init__(self, file_path, mode='r+') -> None:
        self.file_path = file_path
        self.mode = mode
        self._data_ops_obj = None
        self.file_obj = None
        self.datasets_metadata_df = None

    def load_file_obj(self):
        self._data_ops_obj = h5de.HDF5DataOpsManager(self.file_path, self.mode)
        self._data_ops_obj.load_file_obj()
        self.file_obj = self._data_ops_obj.file_obj

    def unload_file_obj(self):
        # Unload the manager created by load_file_obj() rather than a fresh instance
        if self._data_ops_obj is not None:
            self._data_ops_obj.unload_file_obj()  # sets _data_ops_obj.file_obj to None
        self.file_obj = None

    def transfer_flags(self):

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        path_to_append_dir, ext = os.path.splitext(self.file_path)
        self._data_ops_obj.update_file(path_to_append_dir)
    def apply_flags(self, instFolder):

        # TODO: apply flags to diagnostic and individual channels; so far all channels are cleaned

        if self.file_obj is None:
            raise RuntimeError("File object is not loaded. Please load the HDF5 file using the 'load_file_obj' method before attempting to modify it.")

        DataOpsManager = self._data_ops_obj
        file_obj = self.file_obj

        #with h5py.File(self.file_path, mode=self.mode, track_order=True) as file_obj:
        try:

            if instFolder not in file_obj:
                raise ValueError(f'Invalid instrument name. Instrument folder {instFolder} was not found in file {self.file_path}.')

            if '_'.join([instFolder, 'flags']) not in file_obj:
                raise RuntimeWarning('There are no flags to apply.')

            if not ('diagnostic_channels' in file_obj[instFolder].attrs and 'target_channels' in file_obj[instFolder].attrs):
                raise ValueError(
                    f'Required attributes missing. Instrument folder {instFolder} in file {self.file_path} has to be annotated with '
                    'attributes "diagnostic_channels" and "target_channels" that specify channels location and their names.'
                )

            dataset_name = file_obj[instFolder].attrs['target_channels']['location'][0].decode()
            channel_names = file_obj[instFolder].attrs['target_channels']['names'][0].decode().split(',')

            dataset_obj = file_obj[dataset_name]
            # TODO: maybe we can do this directly on dataset = dataset_obj[...], which is a structured numpy array, instead of wrapping it as a dataframe
            dataset_df = DataOpsManager.extract_dataset_as_dataframe(dataset_name)

            # Define the datetime variable based on channel names. By design, the first entry of the list is the datetime variable name.
            datetime_var = channel_names[0]
            remaining_vars = channel_names.copy()
            remaining_vars.remove(datetime_var)

            ref_datetime_format = dataset_obj.attrs[datetime_var]['datetime_format'][0].decode()

            #datetime_var_data = pd.Series([item.decode() for item in dataset_obj[datetime_var]])
            #datetime_var_data = pd.to_datetime(datetime_var_data, format=ref_datetime_format, errors='coerce')
            dataset_df[datetime_var] = dataset_df[datetime_var].apply(lambda x: x.decode())
            dataset_df[datetime_var] = pd.to_datetime(dataset_df[datetime_var], format=ref_datetime_format, errors='coerce')

            flag_indicator = np.zeros(shape=dataset_df[datetime_var].shape, dtype=bool)

            # TODO: include this information as part of the flag's attributes in the flag recording process
            flag_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
            for flag in file_obj[f'{instFolder}_flags']:
                flag_obj = file_obj[f'{instFolder}_flags'][flag]['data_table']

                # Replace flagged values with NaN if the flag label refers to invalidated data.
                if flag_obj['flag_code'][0].decode() == 'None':
                    flag_label = ''
                else:
                    flag_label = flag_obj['flag_label'][0].decode()

                if flag_label == 'I':
                    t1 = pd.to_datetime(flag_obj['startdate'][0].decode(), format=flag_datetime_format)
                    t2 = pd.to_datetime(flag_obj['enddate'][0].decode(), format=flag_datetime_format)

                    t1_idx = abs(dataset_df[datetime_var] - t1).argmin()
                    t2_idx = abs(dataset_df[datetime_var] - t2).argmin()

                    dataset_df.loc[t1_idx:t2_idx, remaining_vars] = np.nan

            # Apply the .strftime() method, handling NaT values by filling with an empty string or placeholder
            dataset_df[datetime_var] = dataset_df[datetime_var].apply(
                lambda x: x.strftime(ref_datetime_format).encode('utf-8') if not pd.isnull(x) else b''  # Handle NaT/NaN by returning an empty string
            )

            # Split full dataset name instFolder/fileName/data_table --> [instFolder, fileName, data_table]
            dataset_name_parts = dataset_name.split('/')
            # Create a new instFolder name to store the dataset after applying flags
            newInstFolder = '_'.join([dataset_name_parts[0], 'cleaned'])
            dataset_name_parts.remove(dataset_name_parts[0])
            # Put together the relative dataset name. Note that instFolder is now missing.
            flagged_dataset_name = '/'.join(dataset_name_parts)

            dataset_dict = {'attributes': {},
                            'name': flagged_dataset_name,
                            'data': utils.convert_dataframe_to_np_structured_array(dataset_df)}

            dataset_dict['attributes'].update({'creation_date': utils.created_at().encode('utf-8')})
            dataset_dict['attributes'].update(dataset_obj.attrs)

            DataOpsManager.append_dataset(dataset_dict, newInstFolder)

        except Exception as e:
            self._data_ops_obj.unload_file_obj()
            print(f"An unexpected error occurred: {e} "
                  "The file object has been properly closed.")

        #flag_indicator[t1_idx:t2_idx] = True
        #(datetime_var_data - t1).min()

        #if not instrument_name in file_obj and not flag_name in file_obj:
        #    raise ValueError(f'Invalid instrument_name {instrument_name} and flag_name {flag_name}. No object with such names in file {self.file_path}')
        #if not f'{instrument_name}_flags':
        #    raise ValueError(f'There are no flags to work with. Make sure {instrument_name}_flags is created first before running this method.')
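
# Minimal usage sketch (path and instrument folder are hypothetical):
# manager = FlaggingAppDataManager('data_products/experiment.h5', mode='r+')
# manager.load_file_obj()
# manager.transfer_flags()             # append recorded JSON flags to the HDF5 file
# manager.apply_flags('ACSM_TOFWARE')  # NaN-out intervals whose flag label is 'I'
# manager.unload_file_obj()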