diff --git a/README.md b/README.md index 6b2290c..a9506d8 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,8 @@ For Windows users, the following are required: Open **Git Bash** and run: ```bash -cd GitLab -git clone --recurse-submodules https://gitlab.psi.ch/apog/acsmnode.git +cd Gitea +git clone --recurse-submodules https://gitea.psi.ch/apog/acsmnode.git cd acsmnode ``` diff --git a/pipelines/steps/apply_calibration_factors.py b/pipelines/steps/apply_calibration_factors.py index eca6368..b787a80 100644 --- a/pipelines/steps/apply_calibration_factors.py +++ b/pipelines/steps/apply_calibration_factors.py @@ -199,9 +199,22 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_factors return calibration_factor_table, new_data_table - -def main(data_file, calibration_file): +from workflows.utils import RenkuWorkflowBuilder +def main(data_file, calibration_file, capture_renku_metadata = False, workflow_name = 'apply_calibration_workflow'): """Main function for processing the data with calibration.""" + #-----------Gather Renku Workflow File Information ------------------------- + inputs = [] + outputs = [] + parameters = [] + # Collect input and parameters for renku workflow file + #inputs.append(('script.py',{'path' : os.path.relpath(__file__, start=os.getcwd())})) + inputs.append(('script_py',{'path' : os.path.relpath(__file__, start=projectPath)})) + inputs.append(('campaign_data_h5',{'path' : os.path.relpath(data_file, start=projectPath)})) + inputs.append(('calib_yaml',{'path' : os.path.relpath(calibration_file, start=projectPath)})) + inputs.append(('data_descriptor_yaml',{'path' : os.path.relpath(os.path.join(projectPath,'campaignDescriptor.yaml'), start=projectPath), + 'implicit' : True})) + # --------------------------------------------------------------------------- + # Load input data and calibration factors try: print(f"Opening data file: {data_file} using src.hdf5_ops.HDF5DataOpsManager().") @@ -262,7 +275,7 @@ def main(data_file, calibration_file): # Apply calibration factors to input data_table and generate data lineage metadata calibration_factor_table, calibrated_table = apply_calibration_factors(data_table, datetime_var, calibration_file) calibrated_table_err = generate_error_dataframe(calibrated_table, datetime_var) - + # Define suffix to output table pairs. 
suffix_to_dataframe_dict = { 'calibrated.csv': calibrated_table, 'calibrated_err.csv': calibrated_table_err, @@ -280,23 +293,38 @@ def main(data_file, calibration_file): filename, _ = os.path.splitext(parent_file) if not _: filename += '.csv' - + cnt = 1 for suffix, data_table in suffix_to_dataframe_dict.items(): path_to_output_file = os.path.join(path_to_output_folder, f'{filename}_{suffix}') try: data_table.to_csv(path_to_output_file, index=False) print(f"Saved {filename}_{suffix} to {path_to_output_folder}") + outputs.append((f'out_{cnt}', {'path' : os.path.relpath(path_to_output_file, start=projectPath),'implicit' : True})) + cnt += 1 except Exception as e: print(f"Failed to save {path_to_output_file} due to: {e}") - continue + #continue + return # Record data lineage metadata['suffix'] = suffix stepUtils.record_data_lineage(path_to_output_file, os.getcwd(), metadata) + # ---------------- Start Renku Workflow file generation ------------------------------------------------------------------------ + + if capture_renku_metadata: + workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name) + workflowfile_builder.add_step(step_name='apply_calibration_factors', + base_command="python", + inputs=inputs, + outputs=outputs, + parameters=parameters) + workflowfile_builder.save_to_file(os.path.join(projectPath,'workflows')) # Will merge or create workflows/data-pipeline.yaml + + return 0 except Exception as e: print(f"Error during calibration: {e}") - exit(1) + return if __name__ == '__main__': # Set up argument parsing diff --git a/pipelines/steps/generate_flags.py b/pipelines/steps/generate_flags.py index 3eb571c..124a658 100644 --- a/pipelines/steps/generate_flags.py +++ b/pipelines/steps/generate_flags.py @@ -335,9 +335,15 @@ def generate_species_flags(data_table : pd.DataFrame, calib_param_dict : dict, f # all_dat[FlowRate_ccs >= flow_lower_lim & FlowRate_ccs <= flow_upper_lim ,flag_flow_auto:="V"] # all_dat[FilamentEmission_mA >= filament_lower_lim & FilamentEmission_mA <= filament_upper_lim ,flag_filament_auto:="V"] -def main(data_file, flag_type): - # Open data file and load dataset associated with flag_type : either diagnostics or species +def main(data_file, flag_type, capture_renku_metadata=False, workflow_name='generate_flags_workflow'): + + inputs = [] + outputs = [] + parameters = [] + + try: + # Load data and locate relevant dataset dataManager = dataOps.HDF5DataOpsManager(data_file) dataManager.load_file_obj() @@ -347,28 +353,24 @@ def main(data_file, flag_type): print(f'Invalid data file: {data_file}. 
Missing instrument folder ACSM_TOFWARE.') raise ImportError(f'Instrument folder "/ACSM_TOFWARE" not found in data_file : {data_file}') - - dataManager.extract_and_load_dataset_metadata() dataset_metadata_df = dataManager.dataset_metadata_df.copy() STATION_ABBR = load_project_yaml_files(projectPath,'campaignDescriptor.yaml')['station_abbr'] - # Find dataset associated with diagnostic channels + + # Find dataset associated with flag_type if flag_type == 'diagnostics': - keywords = [f'ACSM_{STATION_ABBR}_','_meta.txt/data_table'] - find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] + keywords = [f'ACSM_{STATION_ABBR}_','_meta.txt/data_table'] + elif flag_type == 'species': + keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table'] + elif flag_type == 'cpc': + keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas'] + else: + raise ValueError(f"Unsupported flag_type: {flag_type}") - if flag_type == 'species': - keywords = [f'ACSM_{STATION_ABBR}_','_timeseries.txt/data_table'] - find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] - - if flag_type == 'cpc': - keywords = ['cpc.particle_number_concentration.aerosol.', f'CH02L_TSI_3772_{STATION_ABBR}.CH02L_CPC.lev1.nas'] - find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] - - # Specify source dataset to be extracted from input hdf5 data file + find_keyword = [all(keyword in item for keyword in keywords) for item in dataset_metadata_df['dataset_name']] columns = ['dataset_name','parent_file','parent_instrument'] - dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns) - print(':)') + dataset_name, parent_file, parent_instrument = tuple(dataset_metadata_df.loc[find_keyword,col] for col in columns) + if not (dataset_name.size == 1): raise ValueError(f'{flag_type} file is not uniquely identifiable: {parent_file}') else: @@ -376,45 +378,35 @@ def main(data_file, flag_type): parent_file = parent_file.values[0] parent_instrument = parent_instrument.values[0] + # Extract data and timestamp data_table = dataManager.extract_dataset_as_dataframe(dataset_name) datetime_var, datetime_var_format = dataManager.infer_datetime_variable(dataset_name) - + dataManager.unload_file_obj() - # Count the number of NaT (null) values + # Report missing timestamps num_nats = data_table[datetime_var].isna().sum() - # Get the total number of rows total_rows = len(data_table) - # Calculate the percentage of NaT values percentage_nats = (num_nats / total_rows) * 100 print(f"Total rows: {total_rows}") print(f"NaT (missing) values: {num_nats}") print(f"Percentage of data loss: {percentage_nats:.4f}%") - - dataManager.unload_file_obj() - except Exception as e: + except Exception as e: print(f"Error loading input files: {e}") - exit(1) - finally: - dataManager.unload_file_obj() - - + return 1 print('Starting flag generation.') try: path_to_output_dir, ext = os.path.splitext(data_file) - print('Path to output directory :', path_to_output_dir) - # Define output directory of apply_calibration_factors() step suffix = 'flags' - if len(parent_instrument.split('/')) >= 2: - instFolder = parent_instrument.split('/')[0] - category = parent_instrument.split('/')[1] - else: - instFolder = parent_instrument.split('/')[0] - category = '' - path_to_output_folder, ext = 
os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category])) - processingScriptRelPath = os.path.relpath(thisFilePath,start=projectPath) + # Parse folder/category from instrument + parts = parent_instrument.split('/') + instFolder = parts[0] + category = parts[1] if len(parts) >= 2 else '' + + path_to_output_folder = os.path.splitext('/'.join([path_to_output_dir,f'{instFolder}_{suffix}',category]))[0] + processingScriptRelPath = os.path.relpath(thisFilePath, start=projectPath) if not os.path.exists(path_to_output_folder): os.makedirs(path_to_output_folder) @@ -422,47 +414,115 @@ def main(data_file, flag_type): print('Processing script:', processingScriptRelPath) print('Output directory:', path_to_output_folder) - # Compute diagnostic flags based on validity thresholds defined in configuration_file_dict - + # Flagging logic if flag_type == 'diagnostics': - #validity_thresholds_dict = load_parameters(flag_type) validity_thresholds_dict = load_project_yaml_files(projectPath, "validity_thresholds.yaml") - flags_table = generate_diagnostic_flags(data_table, validity_thresholds_dict) - - if flag_type == 'species': - #calib_param_dict = load_parameters(flag_type) - calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml") - flags_table = generate_species_flags(data_table,calib_param_dict,path_to_output_folder,datetime_var) - if flag_type == 'cpc': - print(':D') + flags_table = generate_diagnostic_flags(data_table, validity_thresholds_dict) + elif flag_type == 'species': + calib_param_dict = load_project_yaml_files(projectPath, "calibration_params.yaml") + flags_table = generate_species_flags(data_table, calib_param_dict, path_to_output_folder, datetime_var) + elif flag_type == 'cpc': flags_table = generate_cpc_flags(data_table, datetime_var) - - metadata = {'actris_level' : 1, + + # Metadata for lineage + metadata = { + 'actris_level' : 1, 'processing_script': processingScriptRelPath.replace(os.sep,'/'), 'processing_date' : utils.created_at(), 'flag_type' : flag_type, 'datetime_var': datetime_var - } - - # Save output tables to csv file and save/or update data lineage record + } + filename, ext = os.path.splitext(parent_file) path_to_flags_file = '/'.join([path_to_output_folder, f'{filename}_flags.csv']) - #path_to_calibration_factors_file = '/'.join([path_to_output_folder, f'{filename}_calibration_factors.csv']) - - flags_table.to_csv(path_to_flags_file, index=False) + + # Save output and record lineage + flags_table.to_csv(path_to_flags_file, index=False) status = stepUtils.record_data_lineage(path_to_flags_file, projectPath, metadata) print(f"Flags saved to {path_to_flags_file}") print(f"Data lineage saved to {path_to_output_folder}") - - #flags_table.to_csv(path_to_flags_file, index=False) - - - # Read json and assign numeric flag to column except Exception as e: - print(f"Error during calibration: {e}") - exit(1) + print(f"Error during flag generation: {e}") + return 1 + + # --------------------- Renku Metadata Collection ---------------------------- + if capture_renku_metadata: + from workflows.utils import RenkuWorkflowBuilder + + inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)})) + inputs.append(("data_file", {'path': os.path.relpath(data_file, start=projectPath)})) + + # Parameter + parameters.append(("flag_type", {'value': flag_type})) + + # Add implicit YAML config + if flag_type == 'diagnostics': + inputs.append(("validity_thresholds_yaml", { + 'path': os.path.relpath(os.path.join(projectPath, 
"pipelines/params/validity_thresholds.yaml"), start=projectPath), + 'implicit': True + })) + + elif flag_type == 'species': + inputs.append(("calibration_params_yaml", { + 'path': os.path.relpath(os.path.join(projectPath, "pipelines/params/calibration_params.yaml"), start=projectPath), + 'implicit': True + })) + + # Add CSV and JSON flags from flags folder as implicit inputs + + flag_index = 0 + for fname in os.listdir(path_to_output_folder): + full_path = os.path.join(path_to_output_folder, fname) + + # Skip the output file to avoid circular dependency + if os.path.abspath(full_path) == os.path.abspath(path_to_flags_file): + continue + + rel_flag_path = os.path.relpath(full_path, start=projectPath) + + if fname.endswith('.csv') or (fname.endswith('.json') and 'metadata' not in fname): + inputs.append((f"flag_in_{flag_index}", { + 'description': 'manual flag by domain expert' if fname.endswith('.json') else 'automated or cpc flag', + 'path': rel_flag_path, + 'implicit': True + + })) + flag_index += 1 + + #elif flag_type == 'cpc': + # CPC may require logic like species if any dependencies are found + # for fname in os.listdir(path_to_output_folder): + # rel_flag_path = os.path.relpath(os.path.join(path_to_output_folder, fname), start=projectPath) + # if fname.endswith('.nas') and ('cpc' in fname): + # inputs.append((f"flag_{fname}", { + # 'path': rel_flag_path, + # 'implicit': True + # })) + + # Output + outputs.append(("flags_csv", { + 'path': os.path.relpath(path_to_flags_file, start=projectPath), + 'implicit': True + })) + + # Define workflow step + workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name) + workflowfile_builder.add_step( + step_name=f"generate_flags_{flag_type}", + base_command="python", + inputs=inputs, + outputs=outputs, + parameters=parameters + ) + workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows')) + + + return 0 + + + def get_flags_from_folder(flagsFolderPath): diff --git a/pipelines/steps/prepare_ebas_submission.py b/pipelines/steps/prepare_ebas_submission.py index d440cbd..bfb5ab4 100644 --- a/pipelines/steps/prepare_ebas_submission.py +++ b/pipelines/steps/prepare_ebas_submission.py @@ -102,7 +102,13 @@ def parse_months(month_str: str) -> list: return sorted(months) -def main(paths_to_processed_files : list, path_to_flags : str, month : int = None): +def main(paths_to_processed_files : list, path_to_flags : str, month : str = None, capture_renku_metadata: bool = False, workflow_name: str = "ebas_submission_worflow"): + + + inputs = [] + outputs = [] + parameters = [] + # Set up argument parsing acum_df = join_tables(paths_to_processed_files) @@ -213,6 +219,36 @@ def main(paths_to_processed_files : list, path_to_flags : str, month : int = Non outdir = output_dir app.process(infile, acq_err_log, outdir=outdir) + # ------------------- Renku Metadata Collection ------------------------ + if capture_renku_metadata: + + from workflows.utils import RenkuWorkflowBuilder + + inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)})) + for idx, path in enumerate(paths_to_processed_files + [path_to_flags]): + inputs.append((f"in_{idx+1}", {'path': os.path.relpath(path, start=projectPath)})) + + inputs.append(('lod', {'path': os.path.relpath(os.path.join(projectPath,'pipelines/params/"limits_of_detection.yaml'), start=projectPath),'implicit': True})) + inputs.append(('station', {'path': os.path.relpath(os.path.join(projectPath,'pipelines/params/"station_params.yaml'), start=projectPath),'implicit': True})) + + 
outputs.append(("out_1", {'path': os.path.relpath(output_file1, start=projectPath), 'implicit': True})) + outputs.append(("out_2", {'path': os.path.relpath(output_file2, start=projectPath), 'implicit': True})) + + if month is not None: + parameters.append(("month_range", {'value': month})) + + workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name) + workflowfile_builder.add_step( + step_name=f"{workflow_name}_step", + base_command="python", + inputs=inputs, + outputs=outputs, + parameters=parameters + ) + workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows')) + + return 0 + if __name__ == '__main__': parser = argparse.ArgumentParser(description="Process and calibrate ACSM data for JFJ station.") diff --git a/pipelines/steps/update_datachain_params.py b/pipelines/steps/update_datachain_params.py index b0f13a8..f0c311c 100644 --- a/pipelines/steps/update_datachain_params.py +++ b/pipelines/steps/update_datachain_params.py @@ -89,10 +89,26 @@ def sync_yaml_files(src_filepath, dest_filepath): with open(dest_filepath, 'w') as dest_file: yaml.safe_dump(dest_yaml, dest_file, default_flow_style=False) print(f"Synchronized: {os.path.basename(src_filepath)}") + return 0 else: print(f"Structures do not match for {os.path.basename(src_filepath)}. Skipping synchronization.") + return -def main(path_to_data_file, instrument_folder): +from workflows.utils import RenkuWorkflowBuilder + +def main(path_to_data_file, instrument_folder, capture_renku_metadata = False, workflow_name='parameter_update_workflow'): + + + + inputs = [] + outputs = [] + parameters = [] + + # Collect input and parameters for renku workflow file + #inputs.append(('script.py',{'path' : os.path.relpath(__file__, start=os.getcwd())})) + inputs.append(('script_py',{'path' : os.path.relpath(__file__, start=projectPath)})) + inputs.append(('campaign_data_h5',{'path' : os.path.relpath(path_to_data_file, start=projectPath)})) + parameters.append(('instrument_folder', {'value':instrument_folder})) src_folder = os.path.normpath(os.path.join(os.path.splitext(path_to_data_file)[0],instrument_folder)) @@ -115,16 +131,36 @@ def main(path_to_data_file, instrument_folder): # Get list of files in source folder. # We assume we only need to process .yaml files. src_folder = os.path.normpath(os.path.join(src_folder,'params')) + cnt = 1 for filename in os.listdir(src_folder): if filename.endswith(".yaml"): - src_filepath = os.path.join(src_folder, filename) - dest_filepath = os.path.join(dest_folder, filename) + src_filepath = os.path.normpath(os.path.join(src_folder, filename)) + dest_filepath = os.path.normpath(os.path.join(dest_folder, filename)) + + # Proceed only if the destination file exists. 
             if os.path.exists(dest_filepath):
-                sync_yaml_files(src_filepath, dest_filepath)
+                status = sync_yaml_files(src_filepath, dest_filepath)
             else:
                 print(f"Destination YAML file not found for: {filename}")
+                status = None
+
+            # If the YAML file synchronization succeeded, record the input/output pair
+            if status == 0:
+                inputs.append((f'in_{cnt}',{'path':os.path.relpath(src_filepath, start=projectPath),'implicit': True}))
+                outputs.append((f'out_{cnt}',{'path':os.path.relpath(dest_filepath, start=projectPath),'implicit': True}))
+                cnt += 1
+
+    # ---------------- Start Renku Workflow file generation ------------------------------------------------------------------------
+    if capture_renku_metadata:
+        workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name)
+        workflowfile_builder.add_step(step_name='update_datachain_params',
+                                      base_command="python",
+                                      inputs=inputs,
+                                      outputs=outputs,
+                                      parameters=parameters)
+        workflowfile_builder.save_to_file(os.path.join(projectPath,'workflows'))  # Will merge or create workflows/data-pipeline.yaml
+
+    return 0
 
 
 if __name__ == "__main__":
@@ -144,5 +180,3 @@ if __name__ == "__main__":
     instrument_folder = args.instrument_folder
 
     main(path_to_data_file, instrument_folder)
-
-
diff --git a/pipelines/steps/visualize_datatable_vars.py b/pipelines/steps/visualize_datatable_vars.py
index 9981098..e4ce9a0 100644
--- a/pipelines/steps/visualize_datatable_vars.py
+++ b/pipelines/steps/visualize_datatable_vars.py
@@ -1,16 +1,34 @@
+import os
+import sys
+import yaml
+import argparse
+
+try:
+    thisFilePath = os.path.abspath(__file__)
+    print(thisFilePath)
+except NameError:
+    print("[Notice] The __file__ attribute is unavailable in this environment (e.g., Jupyter or IDLE).")
+    thisFilePath = os.getcwd()
+
+projectPath = os.path.normpath(os.path.join(thisFilePath, "..", "..", '..'))
+
+if projectPath not in sys.path:
+    sys.path.insert(0, projectPath)
 
 import dima.src.hdf5_ops as dataOps
-import os
 import pandas as pd
 import numpy as np
 import matplotlib.pyplot as plt
 import plotly.graph_objects as go
 
-def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, x_var, y_vars, yaxis_range_dict = {'FlowRate_ccs' : [0,100]}):
+def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, x_var, y_vars,
+                              yaxis_range_dict={'FlowRate_ccs': [0, 100]},
+                              capture_renku_metadata=False,
+                              workflow_name="visualize_table_variables"):
 
-
-    if not os.path.exists(data_file_path):
+    if not os.path.exists(data_file_path):
         raise ValueError(f"Path to input file {data_file_path} does not exist. The parameter 'data_file_path' must be a valid path to a suitable HDF5 file. ")
+
     APPEND_DIR = os.path.splitext(data_file_path)[0]
     if not os.path.exists(APPEND_DIR):
         APPEND_DIR = None
@@ -19,81 +37,55 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name,
 
     dataManager = dataOps.HDF5DataOpsManager(data_file_path)
 
     try:
-        # Load the dataset
         dataManager.load_file_obj()
         dataset_df = dataManager.extract_dataset_as_dataframe(dataset_name)
     except Exception as e:
         print(f"Exception occurred while loading dataset: {e}")
     finally:
-        # Unload file object to free resources
         dataManager.unload_file_obj()
 
-    # Flags dataset loading and processing
     try:
-        # Re-load the file for flags dataset
         dataManager.load_file_obj()
         flags_df = dataManager.extract_dataset_as_dataframe(flags_dataset_name)
 
-        # Ensure the time variable exists in both datasets
         if x_var not in dataset_df.columns and x_var not in flags_df.columns:
             raise ValueError(f"Invalid x_var: {x_var}. 
x_var must exist in both {dataset_name} and {flags_dataset_name}.") - # Convert the x_var column to datetime in flags_df flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x: x.decode(encoding="utf-8"))) except Exception as e: dataManager.unload_file_obj() - # If loading from the file fails, attempt alternative path + if APPEND_DIR: - # Remove 'data_table' part from the path for alternate location if 'data_table' in flags_dataset_name: flags_dataset_name_parts = flags_dataset_name.split(sep='/') flags_dataset_name_parts.remove('data_table') - # Remove existing extension and append .csv base_path = os.path.join(APPEND_DIR, '/'.join(flags_dataset_name_parts)) alternative_path = os.path.splitext(base_path)[0] + '_flags.csv' - - # Attempt to read CSV + if not os.path.exists(alternative_path): raise FileNotFoundError( f"File not found at {alternative_path}. Ensure there are flags associated with {data_file_path}." ) flags_df = pd.read_csv(alternative_path) - # Ensure the time variable exists in both datasets if x_var not in dataset_df.columns and x_var not in flags_df.columns: raise ValueError(f"Invalid x_var: {x_var}. x_var must exist in both {dataset_name} and {flags_dataset_name}.") - # Apply datetime conversion on the x_var column in flags_df flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x: x)) finally: - # Ensure file object is unloaded after use dataManager.unload_file_obj() - - #if x_var not in dataset_df.columns and x_var not in flags_df.columns: - # raise ValueError(f'Invalid x_var : {x_var}. x_var must refer to a time variable name that is both in {dataset_name} and {flags_dataset_name}') - - #flags_df[x_var] = pd.to_datetime(flags_df[x_var].apply(lambda x : x.decode(encoding="utf-8"))) - - #dataManager.unload_file_obj() - if not all(var in dataset_df.columns for var in y_vars): raise ValueError(f'Invalid y_vars : {y_vars}. y_vars must be a subset of {dataset_df.columns}.') - #fig, ax = plt.subplots(len(y_vars), 1, figsize=(12, 5)) + figs = [] + output_paths = [] + figures_dir = os.path.join(projectPath, "figures") + os.makedirs(figures_dir, exist_ok=True) - figs = [] # store each figure for var_idx, var in enumerate(y_vars): - #y = dataset_df[var].to_numpy() - - # Plot Flow Rate - #fig = plt.figure(var_idx,figsize=(12, 2.5)) - #ax = plt.gca() - #ax.plot(dataset_df[x_var], dataset_df[var], label=var, alpha=0.8, color='tab:blue') - fig = go.Figure() - # Main line plot fig.add_trace(go.Scatter( x=dataset_df[x_var], y=dataset_df[var], @@ -102,40 +94,24 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, line=dict(color='blue'), opacity=0.8 )) - - # Specify flag name associated with var name in y_vars. By construction, it is assumed the name satisfy the following sufix convention. 
- var_flag_name = f"flag_{var}" - if var_flag_name in flags_df.columns: - - # Identify valid and invalid indices + var_flag_name = f"flag_{var}" + if var_flag_name in flags_df.columns: ind_invalid = flags_df[var_flag_name].to_numpy() - # ind_valid = np.logical_not(ind_valid) - # Detect start and end indices of invalid regions - # Find transition points in invalid regions invalid_starts = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][::2] invalid_ends = np.diff(np.concatenate(([False], ind_invalid, [False]))).nonzero()[0][1::2] + t_base = dataset_df[x_var] - # Fill invalid regions - t_base = dataset_df[x_var] #.to_numpy() - y_min, y_max = dataset_df[var].min(), dataset_df[var].max() - max_idx = len(t_base) - 1 # maximum valid index + max_idx = len(t_base) - 1 for start, end in zip(invalid_starts, invalid_ends): - if start >= end: print(f"Warning: Skipping invalid interval — start ({start}) >= end ({end})") - continue # Clip start and end to valid index range + continue start = max(0, start) end = min(end, max_idx) - - #ax.fill_betweenx([dataset_df[var].min(), dataset_df[var].max()], t_base[start], t_base[end], - # color='red', alpha=0.3, label="Invalid Data" if start == invalid_starts[0] else "") - # start = max(0, start) - - fig.add_shape( type="rect", x0=t_base[start], x1=t_base[end], @@ -145,7 +121,7 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, line_width=0, layer="below" ) - # Add a dummy invisible trace just for the legend + fig.add_trace(go.Scatter( x=[None], y=[None], mode='markers', @@ -153,41 +129,85 @@ def visualize_table_variables(data_file_path, dataset_name, flags_dataset_name, name='Invalid Region' )) - # Labels and Legends - #ax.set_xlabel(x_var) - #ax.set_ylabel(var) - #ax.legend() - #ax.grid(True) - - #plt.tight_layout() - #plt.show() - - #return fig, ax if var in yaxis_range_dict: y_axis_range = yaxis_range_dict[var] else: y_axis_range = [dataset_df[var].min(), dataset_df[var].max()] - - print('y axis range:',y_axis_range) - - # Add layout - fig.update_layout( - title=f"{var} over {x_var}", - xaxis_title=x_var, - yaxis_title=var, - xaxis_range = [t_base.min(), t_base.max()], - yaxis_range = y_axis_range, - showlegend=True, - height=300, - margin=dict(l=40, r=20, t=40, b=40), - legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1) - ) - fig.show() + fig.update_layout( + title=f"{var} over {x_var}", + xaxis_title=x_var, + yaxis_title=var, + xaxis_range=[t_base.min(), t_base.max()], + yaxis_range=y_axis_range, + showlegend=True, + height=300, + margin=dict(l=40, r=20, t=40, b=40), + legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1) + ) + + fig_path = os.path.join(figures_dir, f"fig_{var_idx}_{var}.html") + fig.write_html(fig_path) + output_paths.append(fig_path) figs.append(fig) - # Optionally return figs if needed - return figs - + # Display figure in notebook + fig.show() - \ No newline at end of file + inputs = [] + outputs = [] + parameters = [] + + if capture_renku_metadata: + from workflows.utils import RenkuWorkflowBuilder + + inputs.append(("script_py", {'path': os.path.relpath(thisFilePath, start=projectPath)})) + inputs.append(("data_file", {'path': os.path.relpath(data_file_path, start=projectPath)})) + # Track alternative path if used + if 'alternative_path' in locals(): + inputs.append(("alternative_flags_csv", { + 'path': os.path.relpath(alternative_path, start=projectPath), + 'implicit' : True + })) + + for fig_path in output_paths: + 
outputs.append((os.path.splitext(os.path.basename(fig_path))[0], + {'path': os.path.relpath(fig_path, start=projectPath)})) + + parameters.append(("dataset_name", {'value': dataset_name})) + parameters.append(("flags_dataset_name", {'value': flags_dataset_name})) + parameters.append(("x_var", {'value': x_var})) + parameters.append(("y_vars", {'value': y_vars})) + + workflowfile_builder = RenkuWorkflowBuilder(name=workflow_name) + workflowfile_builder.add_step( + step_name=workflow_name, + base_command="python", + inputs=inputs, + outputs=outputs, + parameters=parameters + ) + workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows')) + + return 0 + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="Visualize table variables and associated flags.") + + parser.add_argument("data_file_path", type=str, help="Path to HDF5 file") + parser.add_argument("dataset_name", type=str, help="Dataset name in HDF5 file") + parser.add_argument("flags_dataset_name", type=str, help="Flags dataset name") + parser.add_argument("x_var", type=str, help="Time variable (x-axis)") + parser.add_argument("y_vars", nargs='+', help="List of y-axis variable names") + parser.add_argument("--capture_renku_metadata", action="store_true", help="Flag to capture Renku workflow metadata") + + args = parser.parse_args() + + visualize_table_variables( + data_file_path=args.data_file_path, + dataset_name=args.dataset_name, + flags_dataset_name=args.flags_dataset_name, + x_var=args.x_var, + y_vars=args.y_vars, + capture_renku_metadata=args.capture_renku_metadata + )
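
Taken together, the steps patched above all route their Renku metadata through the same `workflows.utils.RenkuWorkflowBuilder` convention: collect `(name, {'path': ..., 'implicit': ...})` tuples for inputs and outputs and `(name, {'value': ...})` tuples for parameters, register them with `add_step(...)`, and merge them into the workflow file with `save_to_file(...)`. A minimal sketch of that calling convention follows; it assumes the builder API exactly as imported in these scripts and a working directory at the project root, and the data/output file names are placeholders rather than real project files.

```python
import os
from workflows.utils import RenkuWorkflowBuilder  # builder shipped in this repo's workflows/utils.py

projectPath = os.getcwd()  # assumption: invoked from the acsmnode project root

# Inputs/outputs are (name, attributes) tuples; 'implicit' marks files the step touches
# without receiving them on its command line. Parameters carry plain values.
inputs = [
    ('script_py', {'path': 'pipelines/steps/apply_calibration_factors.py'}),
    ('campaign_data_h5', {'path': 'data/collection_sample.h5'}),            # placeholder file name
    ('calib_yaml', {'path': 'pipelines/params/calibration_factors.yaml'}),  # placeholder file name
]
outputs = [
    ('out_1', {'path': 'data/collection_sample/ACSM_TOFWARE_processed/sample_calibrated.csv',  # placeholder
               'implicit': True}),
]
parameters = [
    ('flag_type', {'value': 'diagnostics'}),
]

# One step per script; repeated calls merge into the same workflow file under workflows/.
workflowfile_builder = RenkuWorkflowBuilder(name='apply_calibration_workflow')
workflowfile_builder.add_step(step_name='apply_calibration_factors',
                              base_command='python',
                              inputs=inputs,
                              outputs=outputs,
                              parameters=parameters)
workflowfile_builder.save_to_file(os.path.join(projectPath, 'workflows'))
```

Keeping the builder call at the end of each `main(...)`, guarded by `capture_renku_metadata`, means the recorded metadata reflects the paths that were actually read and written during the run; that is why the steps above append output entries only after each `to_csv`/`write_html` call succeeds.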