diff --git a/workflows/my-workflow.yaml b/workflows/my-workflow.yaml new file mode 100644 index 0000000..8eac388 --- /dev/null +++ b/workflows/my-workflow.yaml @@ -0,0 +1,37 @@ +# === Welcome to the template Renku Workflow file! === +# You can use this file to encode in what order your data processing steps should be run, +# making it easier for you to run your workflow, and for others to understand it! + +# === How to use this template === +# Replace the script and data paths in the template below to match your analysis commands. +# Then, run `renku run my-workflow.yaml` in a terminal to execute the workflow! +# If you are working in a notebook, run `! renku run my-workflow.yaml` in a notebook cell. + +# === Docs === +# To learn much more about what you can do with the Renku Workflow File, see our docs: +# https://renku.readthedocs.io/en/stable/topic-guides/workflows/workflow-file.html + +name: my-workflow +steps: + step-one: + command: python $n $my-script $input-data $output-data + inputs: + - my-script: + path: src/script.py + - input-data: + path: data/input/sample_data.csv + outputs: + - output-data: + path: data/output/results.csv + parameters: + - n: + prefix: -n + value: 10 + + # === Adding more steps === + # You can add as many steps as you want to your workflow by copy and pasting the step template above + # TIP: To run just one step from a workflow, simply add the step name to the command, like this: + # `renku run my-workflow.yaml make-plot` + # make-plot: + # command: python $another-script $output-data $my-plot + # ... 
diff --git a/workflows/utils.py b/workflows/utils.py
new file mode 100644
index 0000000..9d4ff5d
--- /dev/null
+++ b/workflows/utils.py
@@ -0,0 +1,114 @@
+import yaml
+import os
+from collections import OrderedDict
+
+
+def generate_command(base_command='python', inputs=None, outputs=None, parameters=None):
+    """Build a step command string: the base command followed by $name placeholders.
+
+    Entries whose value dict carries ``implicit: true`` are tracked as
+    dependencies but not placed on the command line.
+    """
+    inputs = inputs or []
+    outputs = outputs or []
+    # Fix: parameters was not defaulted, so parameters=None (its own default)
+    # raised TypeError on the list concatenation below.
+    parameters = parameters or []
+    placeholders = [
+        f"${name}"
+        for name, value in inputs + parameters + outputs
+        if isinstance(value, dict) and not value.get('implicit', False)
+    ]
+    return f"{base_command} {' '.join(placeholders)}"
+
+
+class RenkuWorkflowBuilder:
+    """Builds, parses, merges and serializes Renku workflow YAML files."""
+
+    def __init__(self, name):
+        self.name = name
+        self.steps = OrderedDict()  # step name -> step definition dict
+
+    def add_step(self, step_name, base_command, inputs=None, outputs=None, parameters=None):
+        """Register a step; its command line is derived from the declared I/O."""
+        command = generate_command(base_command, inputs, outputs, parameters)
+        step = {'command': command}
+        if inputs:
+            step['inputs'] = [{key: value} for key, value in inputs]
+        if outputs:
+            step['outputs'] = [{key: value} for key, value in outputs]
+        if parameters:
+            step['parameters'] = [{key: value} for key, value in parameters]
+        self.steps[step_name] = step
+
+    @staticmethod
+    def parse_workflow(yaml_content: str):
+        """Create a builder from the text of a workflow YAML file."""
+        data = yaml.safe_load(yaml_content)
+        builder = RenkuWorkflowBuilder(data['name'])
+        for step_name, step_def in data['steps'].items():
+            builder.steps[step_name] = {
+                'command': step_def.get('command'),
+                'inputs': step_def.get('inputs', []),
+                'outputs': step_def.get('outputs', []),
+                'parameters': step_def.get('parameters', []),
+            }
+        return builder
+
+    @staticmethod
+    def from_file(filepath):
+        """Load a workflow file, or return None if it does not exist."""
+        if not os.path.exists(filepath):
+            return None
+        with open(filepath, 'r') as f:
+            return RenkuWorkflowBuilder.parse_workflow(f.read())
+
+    def to_dict(self):
+        """Return the workflow as a plain dict, normalizing paths to '/'."""
+        for step_name, step_value in self.steps.items():
+            for segment in ['inputs', 'outputs']:
+                # Go over either inputs or outputs
+                for item in step_value.get(segment, []):
+                    for name, value in item.items():
+                        # str entries are bare paths; dict entries may carry a 'path' key.
+                        if isinstance(value, str):
+                            continue
+                        if isinstance(value, dict):
+                            if 'path' in value:
+                                value['path'] = value['path'].replace(os.sep, '/')
+                        else:
+                            # Fix: the original raised before the str check above,
+                            # rejecting the very str entries the message allows.
+                            raise ValueError(f"Invalid input. Step {step_name} must have {segment} of either dict or str type.")
+        return {
+            'name': self.name,
+            'steps': {key: value for key, value in self.steps.items()}
+        }
+
+    def to_yaml(self):
+        return yaml.dump(self.to_dict(), sort_keys=False)
+
+    def append_from(self, other, force=False):
+        """Merge steps from another builder with the same workflow name."""
+        if other.name != self.name:
+            raise ValueError(f"Cannot merge workflows with different names: {self.name} != {other.name}")
+        for step_name, step_def in other.steps.items():
+            if step_name in self.steps:
+                if force:
+                    print(f"[Overwriting] Step '{step_name}' was overwritten as 'force=True'.")
+                    self.steps[step_name] = step_def
+                else:
+                    print(f"[Skipping] Step '{step_name}' already exists. Use 'force=True' to overwrite.")
+            else:
+                self.steps[step_name] = step_def
+
+    def save_to_file(self, directory):
+        """Write '<name>.yaml' under directory, merging with a same-named existing file."""
+        os.makedirs(directory, exist_ok=True)
+        filepath = os.path.join(directory, f"{self.name}.yaml")
+        if os.path.exists(filepath):
+            existing = RenkuWorkflowBuilder.from_file(filepath)
+            if existing and existing.name == self.name:
+                existing.append_from(self)
+                with open(filepath, 'w') as f:
+                    f.write(existing.to_yaml())
+                return
+        # Save as new
+        with open(filepath, 'w') as f:
+            f.write(self.to_yaml())
diff --git a/workflows/workflow_acsm_data_PAY_2024.yaml b/workflows/workflow_acsm_data_PAY_2024.yaml
new file mode 100644
index 0000000..2c55408
--- /dev/null
+++ b/workflows/workflow_acsm_data_PAY_2024.yaml
@@ -0,0 +1,184 @@
+name: workflow_acsm_data_PAY_2024
+steps:
+  update_datachain_params:
+    command: python $script_py $campaign_data_h5 $instrument_folder
+    inputs:
+      - script_py:
+          path: pipelines/steps/update_datachain_params.py
+      - campaign_data_h5:
+          path: 
data/collection_PAY_2024_2025-06-06_2025-06-06.h5 + - in_1: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/calibration_params.yaml + implicit: true + - in_2: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/limits_of_detection.yaml + implicit: true + - in_3: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/validity_thresholds.yaml + implicit: true + outputs: + - out_1: + path: pipelines/params/calibration_params.yaml + implicit: true + - out_2: + path: pipelines/params/limits_of_detection.yaml + implicit: true + - out_3: + path: pipelines/params/validity_thresholds.yaml + implicit: true + parameters: + - instrument_folder: + value: ACSM_TOFWARE/2024 + apply_calibration_factors: + command: python $script_py $campaign_data_h5 $calib_yaml + inputs: + - script_py: + path: pipelines/steps/apply_calibration_factors.py + - campaign_data_h5: + path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5 + - calib_yaml: + path: pipelines/params/calibration_factors.yaml + - data_descriptor_yaml: + path: campaignDescriptor.yaml + implicit: true + outputs: + - out_1: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv + implicit: true + - out_2: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv + implicit: true + - out_3: + path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv + implicit: true + parameters: [] + generate_flags_diagnostics: + command: python $script_py $data_file $flag_type + inputs: + - script_py: + path: pipelines/steps/generate_flags.py + - data_file: + path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5 + - validity_thresholds_yaml: + path: pipelines/params/validity_thresholds.yaml + implicit: true + outputs: + - flags_csv: + path: 
data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
+          implicit: true
+    parameters:
+      - flag_type:
+          value: diagnostics
+  generate_flags_species:
+    command: python $script_py $data_file $flag_type
+    inputs:
+      - script_py:
+          path: pipelines/steps/generate_flags.py
+      - data_file:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
+      - calibration_params_yaml:
+          path: pipelines/params/calibration_params.yaml
+          implicit: true
+      - flag_in_0:
+          description: automated or cpc flag
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
+          implicit: true
+    outputs:
+      - flags_csv:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
+          implicit: true
+    parameters:
+      - flag_type:
+          value: species
+  prepare_ebas_submission:
+    command: python $script_py $in_1 $in_2 $in_3 $in_4 $month_range
+    inputs:
+      - script_py:
+          path: pipelines/steps/prepare_ebas_submission.py
+      - in_1:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv
+      - in_2:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv
+      - in_3:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv
+      - in_4:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
+      - lod:
+          path: pipelines/params/limits_of_detection.yaml
+          implicit: true
+      - station:
+          path: pipelines/params/station_params.yaml
+          implicit: true
+    outputs:
+      - out_1:
+          path: data/PAY_ACSM-092_2024.txt
+          implicit: true
+      - out_2:
+          path: data/PAY_ACSM-092_FLAGS_2024.txt
+          implicit: true
+    parameters:
+      - month_range:
+          value: 2-3
+  visualize_diagnostic_variables:
+    command: python $script_py $data_file $dataset_name 
$flags_dataset_name $x_var
+      $y_vars $fig_0_VaporizerTemp_C $fig_1_FlowRate_ccs $fig_2_FilamentEmission_mA
+      $fig_3_ABsamp
+    inputs:
+      - script_py:
+          path: pipelines/steps/visualize_datatable_vars.py
+      - data_file:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
+      - alternative_flags_csv:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
+          implicit: true
+    outputs:
+      - fig_0_VaporizerTemp_C:
+          path: figures/fig_0_VaporizerTemp_C.html
+      - fig_1_FlowRate_ccs:
+          path: figures/fig_1_FlowRate_ccs.html
+      - fig_2_FilamentEmission_mA:
+          path: figures/fig_2_FilamentEmission_mA.html
+      - fig_3_ABsamp:
+          path: figures/fig_3_ABsamp.html
+    parameters:
+      - dataset_name:
+          value: ACSM_TOFWARE/2024/ACSM_PAY_2024_meta.txt/data_table
+      - flags_dataset_name:
+          value: ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta.txt/data_table
+      - x_var:
+          value: t_base
+      - y_vars:
+          value:
+            - VaporizerTemp_C
+            - FlowRate_ccs
+            - FilamentEmission_mA
+            - ABsamp
+  workflow_acsm_data_PAY_2024_step:
+    command: python $script_py $in_1 $in_2 $in_3 $in_4 $month_range
+    inputs:
+      - script_py:
+          path: pipelines/steps/prepare_ebas_submission.py
+      - in_1:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv
+      - in_2:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv
+      - in_3:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv
+      - in_4:
+          path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
+      - lod:
+          path: pipelines/params/limits_of_detection.yaml
+          implicit: true
+      - station:
+          path: pipelines/params/station_params.yaml
+          implicit: true
+    outputs:
+      - out_1:
+          path: data/PAY_ACSM-092_2024.txt
+          implicit: true
+      - out_2:
+          path: data/PAY_ACSM-092_FLAGS_2024.txt
+
implicit: true + parameters: + - month_range: + value: 2-3