Refactor steps to collect information for Renku workflow file generation

2025-06-06 17:02:13 +02:00
parent a4847f0071
commit 160791b738
6 changed files with 347 additions and 169 deletions

View File

@@ -19,8 +19,8 @@ For Windows users, the following are required:

 Open **Git Bash** and run:

 ```bash
-cd GitLab
-git clone --recurse-submodules https://gitlab.psi.ch/apog/acsmnode.git
+cd Gitea
+git clone --recurse-submodules https://gitea.psi.ch/apog/acsmnode.git
 cd acsmnode
 ```

View File

@@ -199,9 +199,22 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_factors
     return calibration_factor_table, new_data_table

+from workflows.utils import RenkuWorkflowBuilder
+
-def main(data_file, calibration_file):
+def main(data_file, calibration_file, capture_renku_metadata=False, workflow_name='apply_calibration_workflow'):
     """Main function for processing the data with calibration."""
+
+    #-----------Gather Renku Workflow File Information -------------------------
+    inputs = []
+    outputs = []
+    parameters = []
+
+    # Collect input and parameters for renku workflow file
+    #inputs.append(('script.py',{'path' : os.path.relpath(__file__, start=os.getcwd())}))
+    inputs.append(('script_py',{'path' : os.path.relpath(__file__, start=projectPath)}))
+    inputs.append(('campaign_data_h5',{'path' : os.path.relpath(data_file, start=projectPath)}))
+    inputs.append(('calib_yaml',{'path' : os.path.relpath(calibration_file, start=projectPath)}))
+    inputs.append(('data_descriptor_yaml',{'path' : os.path.relpath(os.path.join(projectPath,'campaignDescriptor.yaml'), start=projectPath),
+                                           'implicit' : True}))
+    # ---------------------------------------------------------------------------

     # Load input data and calibration factors
     try:
         print(f"Opening data file: {data_file} using src.hdf5_ops.HDF5DataOpsManager().")
@@ -262,7 +275,7 @@ def main(data_file, calibration_file):
         # Apply calibration factors to input data_table and generate data lineage metadata
         calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, calibration_file)
         calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var)
-
+        # Define suffix to output table pairs.
         suffix_to_dataframe_dict = {
             'calibrated.csv': calibrated_table,
             'calibrated_err.csv': calibrated_table_err,
@@ -280,23 +293,38 @@ def main(data_file, calibration_file):
         filename, _ = os.path.splitext(parent_file)
         if not _:
             filename += '.csv'

+        cnt = 1
         for suffix, data_table in suffix_to_dataframe_dict.items():
             path_to_output_file = os.path.join(path_to_output_folder, f'{filename}_{suffix}')
             try:
                 data_table.to_csv(path_to_output_file, index=False)
                 print(f"Saved {filename}_{suffix} to {path_to_output_folder}")
+                outputs.append((f'out_{cnt}', {'path' : os.path.relpath(path_to_output_file, start=projectPath),'implicit' : True}))
+                cnt += 1
             except Exception as e:
                 print(f"Failed to save {path_to_output_file} due to: {e}")
-                continue
+                #continue
+                return

             # Record data lineage
             metadata['suffix'] = suffix
             stepUtils.record_data_lineage(path_to_output_file, os.getcwd(), metadata)

+        # ---------------- Start Renku Workflow file generation ------------------------------------------------------------------------
+        if capture_renku_metadata:
+            workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+            workflowfile_builder.add_step(step_name='apply_calibration_factors',
+                                          base_command="python",
+                                          inputs=inputs,
+                                          outputs=outputs,
+                                          parameters=parameters)
+            workflowfile_builder.save_to_file(os.path.join(projectPath,'workflows'))  # Will merge or create workflows/data-pipeline.yaml
+
+        return 0
     except Exception as e:
         print(f"Error during calibration: {e}")
-        exit(1)
+        return

 if __name__ == '__main__':
     # Set up argument parsing
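
The step above relies on `RenkuWorkflowBuilder` from `workflows/utils.py`, which is not part of this diff. For orientation only, here is a minimal sketch of a builder with the call signature used in these steps (`add_step(...)`, `save_to_file(...)`); the attribute names and the on-disk YAML layout are illustrative assumptions, not the project's actual implementation:

```python
# Hypothetical sketch of the RenkuWorkflowBuilder interface used by the steps in this commit.
# The real workflows/utils.py is not shown here; field names and the YAML layout written by
# save_to_file() are assumptions for illustration only.
import os
import yaml


class RenkuWorkflowBuilder:
    def __init__(self, name):
        self.name = name   # workflow (file) name, e.g. 'apply_calibration_workflow'
        self.steps = {}    # step_name -> step specification

    def add_step(self, step_name, base_command, inputs=None, outputs=None, parameters=None):
        # inputs/outputs/parameters are lists of (name, spec) tuples, e.g.
        # ('campaign_data_h5', {'path': 'data/collection.h5', 'implicit': True})
        self.steps[step_name] = {
            'command': base_command,
            'inputs': [{name: spec} for name, spec in (inputs or [])],
            'outputs': [{name: spec} for name, spec in (outputs or [])],
            'parameters': [{name: spec} for name, spec in (parameters or [])],
        }

    def save_to_file(self, folder):
        # Merge this workflow's steps into <folder>/<name>.yaml, creating the file if needed.
        os.makedirs(folder, exist_ok=True)
        path = os.path.join(folder, f"{self.name}.yaml")
        doc = {'name': self.name, 'steps': {}}
        if os.path.exists(path):
            with open(path) as f:
                doc = yaml.safe_load(f) or doc
        doc.setdefault('steps', {}).update(self.steps)
        with open(path, 'w') as f:
            yaml.safe_dump(doc, f, default_flow_style=False)
```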

View File

@@ -335,9 +335,15 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, f
     # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"]
     # all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"]

-def main(data_file, flag_type):
-    # Open data file and load dataset associated with flag_type : either diagnostics or species
+def main(data_file, flag_type, capture_renku_metadata=False, workflow_name='generate_flags_workflow'):
+
+    inputs = []
+    outputs = []
+    parameters = []
+
     try:
+        # Load data and locate relevant dataset
         dataManager = dataOps.HDF5DataOpsManager(data_file)
         dataManager.load_file_obj()
@@ -347,28 +353,24 @@ def main(data_file, flag_type):
             print(f'Invalid data file: {data_file}. Missing instrument folder ACSM_TOFWARE.')
             raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file : {data_file}')

         dataManager.extract_and_load_dataset_metadata()
         dataset_metadata_df = dataManager.dataset_metadata_df.copy()
         STATION_ABBR = load_project_yaml_files(projectPath,'campaignDescriptor.yaml')['station_abbr']

-        # Find dataset associated with diagnostic channels
+        # Find dataset associated with flag_type
         if flag_type == 'diagnostics':
             keywords = [f'ACSM_{STATION_ABBR}_','_meta.txt/data_table']
-            find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
-        if flag_type == 'species':
-            keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table']
-            find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
-        if flag_type == 'cpc':
-            keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas']
-            find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]
+        elif flag_type == 'species':
+            keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table']
+        elif flag_type == 'cpc':
+            keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas']
+        else:
+            raise ValueError(f"Unsupported flag_type: {flag_type}")
+
+        find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']]

+        # Specify source dataset to be extracted from input hdf5 data file
         columns = ['dataset_name','parent_file','parent_instrument']
         dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns)
-        print(':)')
         if not (dataset_name.size == 1):
             raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}')
         else:
@@ -376,45 +378,35 @@ def main(data_file, flag_type):
             parent_file = parent_file.values[0]
             parent_instrument = parent_instrument.values[0]

+        # Extract data and timestamp
         data_table = dataManager.extract_dataset_as_dataframe(dataset_name)
         datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name)
+        dataManager.unload_file_obj()

-        # Count the number of NaT (null) values
+        # Report missing timestamps
         num_nats = data_table[datetime_var].isna().sum()
-        # Get the total number of rows
         total_rows = len(data_table)
-        # Calculate the percentage of NaT values
         percentage_nats = (num_nats / total_rows) * 100

         print(f"Total rows: {total_rows}")
         print(f"NaT (missing) values: {num_nats}")
         print(f"Percentage of data loss: {percentage_nats:.4f}%")

-        dataManager.unload_file_obj()
     except Exception as e:
         print(f"Error loading input files: {e}")
-        exit(1)
+        return 1
+    finally:
+        dataManager.unload_file_obj()

     print('Starting flag generation.')
     try:
         path_to_output_dir, ext = os.path.splitext(data_file)
-        print('Path to output directory :', path_to_output_dir)
-        # Define output directory of apply_calibration_factors() step
         suffix = 'flags'
-        if len(parent_instrument.split('/')) >= 2:
-            instFolder = parent_instrument.split('/')[0]
-            category = parent_instrument.split('/')[1]
-        else:
-            instFolder = parent_instrument.split('/')[0]
-            category = ''
-        path_to_output_folder, ext = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))
-        processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath)
+        # Parse folder/category from instrument
+        parts = parent_instrument.split('/')
+        instFolder = parts[0]
+        category = parts[1] if len(parts) >= 2 else ''
+        path_to_output_folder = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))[0]
+        processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath)

         if not os.path.exists(path_to_output_folder):
             os.makedirs(path_to_output_folder)
@@ -422,47 +414,115 @@ def main(data_file, flag_type):
         print('Processing script:', processingScriptRelPath)
         print('Output directory:', path_to_output_folder)

-        # Compute diagnostic flags based on validity thresholds defined in configuration_file_dict
+        # Flagging logic
         if flag_type == 'diagnostics':
-            #validity_thresholds_dict = load_parameters(flag_type)
             validity_thresholds_dict = load_project_yaml_files(projectPath, "validity_thresholds.yaml")
             flags_table = generate_diagnostic_flags(data_table, validity_thresholds_dict)
-        if flag_type == 'species':
-            #calib_param_dict = load_parameters(flag_type)
-            calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
-            flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var)
-        if flag_type == 'cpc':
-            print(':D')
+        elif flag_type == 'species':
+            calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml")
+            flags_table = generate_species_flags(data_table, calib_param_dict, path_to_output_folder, datetime_var)
+        elif flag_type == 'cpc':
             flags_table = generate_cpc_flags(data_table, datetime_var)

-        metadata = {'actris_level' : 1,
+        # Metadata for lineage
+        metadata = {
+            'actris_level' : 1,
             'processing_script': processingScriptRelPath.replace(os.sep,'/'),
             'processing_date' : utils.created_at(),
             'flag_type' : flag_type,
             'datetime_var': datetime_var
         }

-        # Save output tables to csv file and save/or update data lineage record
         filename, ext = os.path.splitext(parent_file)
         path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv'])
-        #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv'])

+        # Save output and record lineage
         flags_table.to_csv(path_to_flags_file, index=False)
         status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata)

         print(f"Flags saved to {path_to_flags_file}")
         print(f"Data lineage saved to {path_to_output_folder}")
-        #flags_table.to_csv(path_to_flags_file, index=False)
-        # Read json and assign numeric flag to column
     except Exception as e:
-        print(f"Error during calibration: {e}")
-        exit(1)
+        print(f"Error during flag generation: {e}")
+        return 1

+    # --------------------- Renku Metadata Collection ----------------------------
+    if capture_renku_metadata:
+        from workflows.utils import RenkuWorkflowBuilder
+
+        inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)}))
+        inputs.append(("data_file", {'path': os.path.relpath(data_file, start=projectPath)}))
+
+        # Parameter
+        parameters.append(("flag_type", {'value': flag_type}))
+
+        # Add implicit YAML config
+        if flag_type == 'diagnostics':
+            inputs.append(("validity_thresholds_yaml", {
+                'path': os.path.relpath(os.path.join(projectPath, "pipelines/params/validity_thresholds.yaml"), start=projectPath),
+                'implicit': True
+            }))
+        elif flag_type == 'species':
+            inputs.append(("calibration_params_yaml", {
+                'path': os.path.relpath(os.path.join(projectPath, "pipelines/params/calibration_params.yaml"), start=projectPath),
+                'implicit': True
+            }))
+
+            # Add CSV and JSON flags from flags folder as implicit inputs
+            flag_index = 0
+            for fname in os.listdir(path_to_output_folder):
+                full_path = os.path.join(path_to_output_folder, fname)
+
+                # Skip the output file to avoid circular dependency
+                if os.path.abspath(full_path) == os.path.abspath(path_to_flags_file):
+                    continue
+
+                rel_flag_path = os.path.relpath(full_path, start=projectPath)
+
+                if fname.endswith('.csv') or (fname.endswith('.json') and 'metadata' not in fname):
+                    inputs.append((f"flag_in_{flag_index}", {
+                        'description': 'manual flag by domain expert' if fname.endswith('.json') else 'automated or cpc flag',
+                        'path': rel_flag_path,
+                        'implicit': True
+                    }))
+                    flag_index += 1
+
+        #elif flag_type == 'cpc':
+            # CPC may require logic like species if any dependencies are found
+            # for fname in os.listdir(path_to_output_folder):
+            #     rel_flag_path = os.path.relpath(os.path.join(path_to_output_folder, fname), start=projectPath)
+            #     if fname.endswith('.nas') and ('cpc' in fname):
+            #         inputs.append((f"flag_{fname}", {
+            #             'path': rel_flag_path,
+            #             'implicit': True
+            #         }))

+        # Output
+        outputs.append(("flags_csv", {
+            'path': os.path.relpath(path_to_flags_file, start=projectPath),
+            'implicit': True
+        }))
+
+        # Define workflow step
+        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+        workflowfile_builder.add_step(
+            step_name=f"generate_flags_{flag_type}",
+            base_command="python",
+            inputs=inputs,
+            outputs=outputs,
+            parameters=parameters
+        )
+        workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows'))
+
+    return 0

 def get_flags_from_folder(flagsFolderPath):
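
All refactored steps hand the same data shape to `add_step`: lists of `(name, spec)` tuples, where `spec` carries a project-relative `path` (inputs and outputs) or a `value` (parameters), plus an optional `implicit` flag. As a sketch, the collected lists after a `diagnostics` run might look roughly like the following; the file paths are hypothetical placeholders, not actual repository contents:

```python
# Illustrative contents of the metadata lists built above for flag_type='diagnostics'.
# The tuple names follow the diff; the paths are placeholders only.
inputs = [
    ('script_py', {'path': 'pipelines/steps/generate_flags.py'}),        # assumed script location
    ('data_file', {'path': 'data/collection.h5'}),                       # placeholder HDF5 file
    ('validity_thresholds_yaml', {'path': 'pipelines/params/validity_thresholds.yaml',
                                  'implicit': True}),
]
outputs = [
    ('flags_csv', {'path': 'data/collection/ACSM_TOFWARE_flags/ACSM_meta_flags.csv',  # placeholder
                   'implicit': True}),
]
parameters = [
    ('flag_type', {'value': 'diagnostics'}),
]
```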

View File

@@ -102,7 +102,13 @@ def parse_months(month_str: str) -> list:
     return sorted(months)

-def main(paths_to_processed_files : list, path_to_flags : str, month : int = None):
+def main(paths_to_processed_files : list, path_to_flags : str, month : str = None, capture_renku_metadata: bool = False, workflow_name: str = "ebas_submission_workflow"):
+
+    inputs = []
+    outputs = []
+    parameters = []
+
     # Set up argument parsing
     acum_df = join_tables(paths_to_processed_files)
@@ -213,6 +219,36 @@ def main(paths_to_processed_files : list, path_to_flags : str, month : int = None):
     outdir = output_dir
     app.process(infile, acq_err_log, outdir=outdir)

+    # ------------------- Renku Metadata Collection ------------------------
+    if capture_renku_metadata:
+        from workflows.utils import RenkuWorkflowBuilder
+
+        inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)}))
+        for idx, path in enumerate(paths_to_processed_files + [path_to_flags]):
+            inputs.append((f"in_{idx+1}", {'path': os.path.relpath(path, start=projectPath)}))
+
+        inputs.append(('lod', {'path': os.path.relpath(os.path.join(projectPath, 'pipelines/params/limits_of_detection.yaml'), start=projectPath), 'implicit': True}))
+        inputs.append(('station', {'path': os.path.relpath(os.path.join(projectPath, 'pipelines/params/station_params.yaml'), start=projectPath), 'implicit': True}))
+
+        outputs.append(("out_1", {'path': os.path.relpath(output_file1, start=projectPath), 'implicit': True}))
+        outputs.append(("out_2", {'path': os.path.relpath(output_file2, start=projectPath), 'implicit': True}))
+
+        if month is not None:
+            parameters.append(("month_range", {'value': month}))
+
+        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+        workflowfile_builder.add_step(
+            step_name=f"{workflow_name}_step",
+            base_command="python",
+            inputs=inputs,
+            outputs=outputs,
+            parameters=parameters
+        )
+        workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows'))
+
+    return 0

 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for JFJ station.")

View File

@@ -89,10 +89,26 @@ def sync_yaml_files(src_filepath, dest_filepath):
         with open(dest_filepath, 'w') as dest_file:
             yaml.safe_dump(dest_yaml, dest_file, default_flow_style=False)
         print(f"Synchronized: {os.path.basename(src_filepath)}")
+        return 0
     else:
         print(f"Structures do not match for {os.path.basename(src_filepath)}. Skipping synchronization.")
+        return

+from workflows.utils import RenkuWorkflowBuilder
+
-def main(path_to_data_file, instrument_folder):
+def main(path_to_data_file, instrument_folder, capture_renku_metadata=False, workflow_name='parameter_update_workflow'):
+
+    inputs = []
+    outputs = []
+    parameters = []
+
+    # Collect input and parameters for renku workflow file
+    #inputs.append(('script.py',{'path' : os.path.relpath(__file__, start=os.getcwd())}))
+    inputs.append(('script_py',{'path' : os.path.relpath(__file__, start=projectPath)}))
+    inputs.append(('campaign_data_h5',{'path' : os.path.relpath(path_to_data_file, start=projectPath)}))
+    parameters.append(('instrument_folder', {'value':instrument_folder}))

     src_folder = os.path.normpath(os.path.join(os.path.splitext(path_to_data_file)[0],instrument_folder))
@@ -115,16 +131,36 @@ def main(path_to_data_file, instrument_folder):
     # Get list of files in source folder.
     # We assume we only need to process .yaml files.
     src_folder = os.path.normpath(os.path.join(src_folder,'params'))

+    cnt = 1
     for filename in os.listdir(src_folder):
         if filename.endswith(".yaml"):
-            src_filepath = os.path.join(src_folder, filename)
-            dest_filepath = os.path.join(dest_folder, filename)
+            src_filepath = os.path.normpath(os.path.join(src_folder, filename))
+            dest_filepath = os.path.normpath(os.path.join(dest_folder, filename))

             # Proceed only if the destination file exists.
             if os.path.exists(dest_filepath):
-                sync_yaml_files(src_filepath, dest_filepath)
+                status = sync_yaml_files(src_filepath, dest_filepath)
             else:
                 print(f"Destination YAML file not found for: {filename}")

+            # If yaml file synchronization successful add input output pair
+            if status == 0:
+                inputs.append((f'in_{cnt}',{'path':os.path.relpath(src_filepath, start=projectPath),'implicit': True}))
+                outputs.append((f'out_{cnt}',{'path':os.path.relpath(dest_filepath, start=projectPath),'implicit': True}))
+                cnt += 1
+
+    # ---------------- Start Renku Workflow file generation ------------------------------------------------------------------------
+    if capture_renku_metadata:
+        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+        workflowfile_builder.add_step(step_name='update_datachain_params',
+                                      base_command="python",
+                                      inputs=inputs,
+                                      outputs=outputs,
+                                      parameters=parameters)
+        workflowfile_builder.save_to_file(os.path.join(projectPath,'workflows'))  # Will merge or create workflows/data-pipeline.yaml
+
+    return 0

 if __name__ == "__main__":

@@ -144,5 +180,3 @@ if __name__ == "__main__":
     instrument_folder = args.instrument_folder

     main(path_to_data_file, instrument_folder)

View File

@@ -1,16 +1,34 @@
+import os
+import sys
+import yaml
+import argparse
+
+try:
+    thisFilePath = os.path.abspath(__file__)
+    print(thisFilePath)
+except NameError:
+    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
+    thisFilePath = os.getcwd()
+
+projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))
+
+if projectPath not in sys.path:
+    sys.path.insert(0, projectPath)
+
 import dima.src.hdf5_ops as dataOps
-import os
 import pandas as pd
 import numpy as np
 import matplotlib.pyplot as plt
 import plotly.graph_objects as go

-def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, x_var, y_vars, yaxis_range_dict = {'FlowRate_ccs' : [0,100]}):
+def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, x_var, y_vars,
+                              yaxis_range_dict={'FlowRate_ccs': [0, 100]},
+                              capture_renku_metadata=False,
+                              workflow_name="visualize_table_variables"):

     if not os.path.exists(data_file_path):
         raise ValueError(f"Path to input file {data_file_path} does not exists. The parameter 'data_file_path' must be a valid path to a suitable HDF5 file. ")

     APPEND_DIR = os.path.splitext(data_file_path)[0]
     if not os.path.exists(APPEND_DIR):
         APPEND_DIR = None
@@ -19,81 +37,55 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
     dataManager = dataOps.HDF5DataOpsManager(data_file_path)

     try:
-        # Load the dataset
         dataManager.load_file_obj()
         dataset_df = dataManager.extract_dataset_as_dataframe(dataset_name)
     except Exception as e:
         print(f"Exception occurred while loading dataset: {e}")
     finally:
-        # Unload file object to free resources
         dataManager.unload_file_obj()

-    # Flags dataset loading and processing
     try:
-        # Re-load the file for flags dataset
         dataManager.load_file_obj()
         flags_df = dataManager.extract_dataset_as_dataframe(flags_dataset_name)

-        # Ensure the time variable exists in both datasets
         if x_var not in dataset_df.columns and x_var not in flags_df.columns:
             raise ValueError(f"Invalid x_var: {x_var}. x_var must exist in both {dataset_name} and {flags_dataset_name}.")

-        # Convert the x_var column to datetime in flags_df
         flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x: x.decode(encoding="utf-8")))
     except Exception as e:
         dataManager.unload_file_obj()
-        # If loading from the file fails, attempt alternative path
         if APPEND_DIR:
-            # Remove 'data_table' part from the path for alternate location
             if 'data_table' in flags_dataset_name:
                 flags_dataset_name_parts = flags_dataset_name.split(sep='/')
                 flags_dataset_name_parts.remove('data_table')

-            # Remove existing extension and append .csv
             base_path = os.path.join(APPEND_DIR, '/'.join(flags_dataset_name_parts))
             alternative_path = os.path.splitext(base_path)[0] + '_flags.csv'

-            # Attempt to read CSV
             if not os.path.exists(alternative_path):
                 raise FileNotFoundError(
                     f"File not found at {alternative_path}. Ensure there are flags associated with {data_file_path}."
                 )

             flags_df = pd.read_csv(alternative_path)

-            # Ensure the time variable exists in both datasets
             if x_var not in dataset_df.columns and x_var not in flags_df.columns:
                 raise ValueError(f"Invalid x_var: {x_var}. x_var must exist in both {dataset_name} and {flags_dataset_name}.")

-            # Apply datetime conversion on the x_var column in flags_df
             flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x: x))
     finally:
-        # Ensure file object is unloaded after use
         dataManager.unload_file_obj()

-    #if x_var not in dataset_df.columns and x_var not in flags_df.columns:
-    #    raise ValueError(f'Invalid x_var : {x_var}. x_var must refer to a time variable name that is both in {dataset_name} and {flags_dataset_name}')
-    #flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x : x.decode(encoding="utf-8")))
-    #dataManager.unload_file_obj()

     if not all(var in dataset_df.columns for var in y_vars):
         raise ValueError(f'Invalid y_vars : {y_vars}. y_vars must be a subset of {dataset_df.columns}.')

-    #fig, ax = plt.subplots(len(y_vars), 1, figsize=(12, 5))
-    figs = [] # store each figure
+    figs = []
+    output_paths = []
+    figures_dir = os.path.join(projectPath, "figures")
+    os.makedirs(figures_dir, exist_ok=True)

     for var_idx, var in enumerate(y_vars):
-        #y = dataset_df[var].to_numpy()
-        # Plot Flow Rate
-        #fig = plt.figure(var_idx,figsize=(12, 2.5))
-        #ax = plt.gca()
-        #ax.plot(dataset_df[x_var], dataset_df[var], label=var, alpha=0.8, color='tab:blue')
         fig = go.Figure()
-        # Main line plot
         fig.add_trace(go.Scatter(
             x=dataset_df[x_var],
             y=dataset_df[var],
@@ -102,40 +94,24 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
             line=dict(color='blue'),
             opacity=0.8
         ))

-        # Specify flag name associated with var name in y_vars. By construction, it is assumed the name satisfy the following sufix convention.
-        var_flag_name = f"flag_{var}"
-        if var_flag_name in flags_df.columns:
-            # Identify valid and invalid indices
+        var_flag_name = f"flag_{var}"
+        if var_flag_name in flags_df.columns:
             ind_invalid = flags_df[var_flag_name].to_numpy()
-            # ind_valid = np.logical_not(ind_valid)

-            # Detect start and end indices of invalid regions
-            # Find transition points in invalid regions
             invalid_starts = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][::2]
             invalid_ends = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][1::2]

-            # Fill invalid regions
-            t_base = dataset_df[x_var] #.to_numpy()
+            t_base = dataset_df[x_var]
             y_min, y_max = dataset_df[var].min(), dataset_df[var].max()
-            max_idx = len(t_base) - 1 # maximum valid index
+            max_idx = len(t_base) - 1

             for start, end in zip(invalid_starts, invalid_ends):
                 if start >= end:
                     print(f"Warning: Skipping invalid interval — start ({start}) >= end ({end})")
-                    continue # Clip start and end to valid index range
+                    continue

                 start = max(0, start)
                 end = min(end, max_idx)
-                #ax.fill_betweenx([dataset_df[var].min(), dataset_df[var].max()], t_base[start], t_base[end],
-                #                 color='red', alpha=0.3, label="Invalid Data" if start == invalid_starts[0] else "")
-                # start = max(0, start)

                 fig.add_shape(
                     type="rect",
                     x0=t_base[start], x1=t_base[end],
@@ -145,7 +121,7 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
                     line_width=0,
                     layer="below"
                 )
-            # Add a dummy invisible trace just for the legend
+
             fig.add_trace(go.Scatter(
                 x=[None], y=[None],
                 mode='markers',
@@ -153,41 +129,85 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
                 name='Invalid Region'
             ))

-        # Labels and Legends
-        #ax.set_xlabel(x_var)
-        #ax.set_ylabel(var)
-        #ax.legend()
-        #ax.grid(True)
-        #plt.tight_layout()
-        #plt.show()
-        #return fig, ax
         if var in yaxis_range_dict:
             y_axis_range = yaxis_range_dict[var]
         else:
             y_axis_range = [dataset_df[var].min(), dataset_df[var].max()]
-        print('y axis range:',y_axis_range)

-        # Add layout
         fig.update_layout(
             title=f"{var} over {x_var}",
             xaxis_title=x_var,
             yaxis_title=var,
-            xaxis_range = [t_base.min(), t_base.max()],
-            yaxis_range = y_axis_range,
+            xaxis_range=[t_base.min(), t_base.max()],
+            yaxis_range=y_axis_range,
             showlegend=True,
             height=300,
             margin=dict(l=40, r=20, t=40, b=40),
             legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
         )
-        fig.show()
+
+        fig_path = os.path.join(figures_dir, f"fig_{var_idx}_{var}.html")
+        fig.write_html(fig_path)
+        output_paths.append(fig_path)
         figs.append(fig)
-    # Optionally return figs if needed
-    return figs
+
+        # Display figure in notebook
+        fig.show()
+
+    inputs = []
+    outputs = []
+    parameters = []
+
+    if capture_renku_metadata:
+        from workflows.utils import RenkuWorkflowBuilder
+
+        inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)}))
+        inputs.append(("data_file", {'path': os.path.relpath(data_file_path, start=projectPath)}))
+
+        # Track alternative path if used
+        if 'alternative_path' in locals():
+            inputs.append(("alternative_flags_csv", {
+                'path': os.path.relpath(alternative_path, start=projectPath),
+                'implicit' : True
+            }))
+
+        for fig_path in output_paths:
+            outputs.append((os.path.splitext(os.path.basename(fig_path))[0],
+                            {'path': os.path.relpath(fig_path, start=projectPath)}))
+
+        parameters.append(("dataset_name", {'value': dataset_name}))
+        parameters.append(("flags_dataset_name", {'value': flags_dataset_name}))
+        parameters.append(("x_var", {'value': x_var}))
+        parameters.append(("y_vars", {'value': y_vars}))
+
+        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+        workflowfile_builder.add_step(
+            step_name=workflow_name,
+            base_command="python",
+            inputs=inputs,
+            outputs=outputs,
+            parameters=parameters
+        )
+        workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows'))
+
+    return 0
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="Visualize table variables and associated flags.")
+    parser.add_argument("data_file_path", type=str, help="Path to HDF5 file")
+    parser.add_argument("dataset_name", type=str, help="Dataset name in HDF5 file")
+    parser.add_argument("flags_dataset_name", type=str, help="Flags dataset name")
+    parser.add_argument("x_var", type=str, help="Time variable (x-axis)")
+    parser.add_argument("y_vars", nargs='+', help="List of y-axis variable names")
+    parser.add_argument("--capture_renku_metadata", action="store_true", help="Flag to capture Renku workflow metadata")
+
+    args = parser.parse_args()
+
+    visualize_table_variables(
+        data_file_path=args.data_file_path,
+        dataset_name=args.dataset_name,
+        flags_dataset_name=args.flags_dataset_name,
+        x_var=args.x_var,
+        y_vars=args.y_vars,
+        capture_renku_metadata=args.capture_renku_metadata
+    )
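
For completeness, the refactored function can now be driven either through the CLI entry point above or directly from a notebook. A minimal call might look like the sketch below; the module path, HDF5 file, dataset names, and variable names are placeholders rather than project data:

```python
# Hypothetical notebook usage of the refactored visualization step.
# The import path, HDF5 file, dataset names, and variables below are placeholders.
from pipelines.steps.visualize_datatable_vars import visualize_table_variables  # assumed module path

visualize_table_variables(
    data_file_path='data/collection.h5',                                   # placeholder HDF5 file
    dataset_name='ACSM_TOFWARE/ACSM_meta.txt/data_table',                  # placeholder dataset
    flags_dataset_name='ACSM_TOFWARE_flags/ACSM_meta_flags.csv/data_table',
    x_var='timestamp',                                                     # placeholder time column
    y_vars=['FlowRate_ccs'],
    capture_renku_metadata=True,   # also writes a workflow file under workflows/ (assumed location)
)
```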