mirror of
https://gitea.psi.ch/APOG/acsmnode.git
synced 2025-06-24 21:21:08 +02:00
Added workflows folder and utils.py to support workflow generation
This commit is contained in:
37
workflows/my-workflow.yaml
Normal file
37
workflows/my-workflow.yaml
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
# === Welcome to the template Renku Workflow file! ===
|
||||||
|
# You can use this file to encode in what order your data processing steps should be run,
|
||||||
|
# making it easier for you to run your workflow, and for others to understand it!
|
||||||
|
|
||||||
|
# === How to use this template ===
|
||||||
|
# Replace the script and data paths in the template below to match your analysis commands.
|
||||||
|
# Then, run `renku run my-workflow.yaml` in a terminal to execute the workflow!
|
||||||
|
# If you are working in a notebook, run `! renku run my-workflow.yaml` in a notebook cell.
|
||||||
|
|
||||||
|
# === Docs ===
|
||||||
|
# To learn much more about what you can do with the Renku Workflow File, see our docs:
|
||||||
|
# https://renku.readthedocs.io/en/stable/topic-guides/workflows/workflow-file.html
|
||||||
|
|
||||||
|
name: my-workflow
|
||||||
|
steps:
|
||||||
|
step-one:
|
||||||
|
command: python $n $my-script $input-data $output-data
|
||||||
|
inputs:
|
||||||
|
- my-script:
|
||||||
|
path: src/script.py
|
||||||
|
- input-data:
|
||||||
|
path: data/input/sample_data.csv
|
||||||
|
outputs:
|
||||||
|
- output-data:
|
||||||
|
path: data/output/results.csv
|
||||||
|
parameters:
|
||||||
|
- n:
|
||||||
|
prefix: -n
|
||||||
|
value: 10
|
||||||
|
|
||||||
|
# === Adding more steps ===
|
||||||
|
# You can add as many steps as you want to your workflow by copying and pasting the step template above.
|
||||||
|
# TIP: To run just one step from a workflow, simply add the step name to the command, like this:
|
||||||
|
# `renku run my-workflow.yaml make-plot`
|
||||||
|
# make-plot:
|
||||||
|
# command: python $another-script $output-data $my-plot
|
||||||
|
# ...
|
107
workflows/utils.py
Normal file
107
workflows/utils.py
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
import yaml
|
||||||
|
import os
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
def generate_command(base_command='python', inputs=None, outputs=None, parameters=None):
    """Build the command string of a workflow step.

    Each entry of ``inputs``, ``parameters`` and ``outputs`` is a
    ``(name, value)`` pair. An entry contributes a ``$name`` placeholder
    only when its ``value`` is a dict whose ``'implicit'`` key is absent or
    falsy; entries with non-dict values are skipped.

    Placeholders are emitted in the order inputs, parameters, outputs.

    Args:
        base_command: Text placed in front of the placeholders.
        inputs: Sequence of ``(name, value)`` pairs, or ``None``.
        outputs: Sequence of ``(name, value)`` pairs, or ``None``.
        parameters: Sequence of ``(name, value)`` pairs, or ``None``.

    Returns:
        ``base_command`` followed by a single space and the space-separated
        placeholders (the space is kept even when there are no placeholders).
    """
    # Normalise every group, including ``parameters``, so an omitted (None)
    # group or a non-list sequence such as a tuple can be concatenated.
    entries = list(inputs or []) + list(parameters or []) + list(outputs or [])
    placeholders = [
        f"${name}"
        for name, value in entries
        if isinstance(value, dict) and not value.get('implicit', False)
    ]
    return f"{base_command} {' '.join(placeholders)}"
|
||||||
|
|
||||||
|
class RenkuWorkflowBuilder:
    """Build, parse, merge and save a workflow definition as YAML.

    A builder holds the workflow ``name`` and an ordered mapping ``steps``
    from step name to a step definition dict (``'command'`` plus optional
    ``'inputs'``, ``'outputs'`` and ``'parameters'`` lists).
    """

    def __init__(self, name):
        """Create an empty workflow called ``name``."""
        self.name = name
        # Insertion order of the steps is the order they are written out in.
        self.steps = OrderedDict()

    def add_step(self, step_name, base_command, inputs=None, outputs=None, parameters=None):
        """Add (or replace) the step ``step_name``.

        ``inputs``, ``outputs`` and ``parameters`` are sequences of
        ``(name, value)`` pairs. Each pair is stored as a single-key dict,
        and an empty or ``None`` group is left out of the step entirely.
        The step's command is produced by ``generate_command``.
        """
        step = {
            'command': generate_command(base_command, inputs, outputs, parameters)
        }
        for section, entries in (
            ('inputs', inputs),
            ('outputs', outputs),
            ('parameters', parameters),
        ):
            if entries:
                step[section] = [{key: value} for key, value in entries]
        self.steps[step_name] = step

    @staticmethod
    def parse_workflow(yaml_content: str):
        """Create a builder from the text of a workflow YAML document.

        The document must be a mapping with a ``name`` key. A missing or
        empty ``steps`` key yields a builder without steps, and missing or
        empty ``inputs``/``outputs``/``parameters`` become empty lists.
        """
        data = yaml.safe_load(yaml_content)
        builder = RenkuWorkflowBuilder(data['name'])

        # ``steps:`` with no content is loaded as None; treat it as "no steps".
        for step_name, step_def in (data.get('steps') or {}).items():
            builder.steps[step_name] = {
                'command': step_def.get('command'),
                # ``or []`` also covers keys that are present but null, which
                # would otherwise break iteration in ``to_dict``.
                'inputs': step_def.get('inputs') or [],
                'outputs': step_def.get('outputs') or [],
                'parameters': step_def.get('parameters') or [],
            }
        return builder

    @staticmethod
    def from_file(filepath):
        """Load a builder from ``filepath``; return ``None`` if it does not exist."""
        if not os.path.exists(filepath):
            return None
        with open(filepath, 'r', encoding='utf-8') as f:
            return RenkuWorkflowBuilder.parse_workflow(f.read())

    def to_dict(self):
        """Return the workflow as a plain ``{'name': ..., 'steps': {...}}`` dict.

        Before returning, every input/output entry is validated: its value
        must be a dict or a str. For dict values with a ``'path'`` key the
        path is rewritten in place to use ``/`` instead of ``os.sep``.

        Raises:
            ValueError: If an input/output value is neither a dict nor a str.
        """
        for step_name, step_value in self.steps.items():
            for segment in ('inputs', 'outputs'):
                for item in step_value.get(segment) or []:
                    for value in item.values():
                        # Plain string entries are accepted and left as they are.
                        if isinstance(value, str):
                            continue
                        if not isinstance(value, dict):
                            raise ValueError(f"Invalid input. Step {step_name} must have {segment} of either dict or str type.")
                        if 'path' in value:
                            value['path'] = value['path'].replace(os.sep, '/')

        return {
            'name': self.name,
            # Copy into a plain dict so the OrderedDict type is not serialised.
            'steps': dict(self.steps),
        }

    def to_yaml(self):
        """Serialise the workflow to a YAML string, keeping key order."""
        return yaml.dump(self.to_dict(), sort_keys=False)

    def append_from(self, other, force=False):
        """Merge the steps of ``other`` into this workflow.

        Steps not yet present are added. A step that already exists is kept
        unless ``force`` is true, in which case it is overwritten. A message
        is printed for every skipped or overwritten step.

        Raises:
            ValueError: If the two workflows have different names.
        """
        if other.name != self.name:
            raise ValueError(f"Cannot merge workflows with different names: {self.name} != {other.name}")

        for step_name, step_def in other.steps.items():
            if step_name not in self.steps:
                self.steps[step_name] = step_def
            elif force:
                print(f"[Overwriting] Step '{step_name}' was overwritten as 'force=True'.")
                self.steps[step_name] = step_def
            else:
                print(f"[Skipping] Step '{step_name}' already exists. Use 'force=True' to overwrite.")

    def save_to_file(self, directory):
        """Write the workflow to ``<directory>/<name>.yaml``.

        The directory is created if needed. If the target file already
        holds a workflow with the same name, this workflow's steps are
        appended to it (existing steps win) and the merged result is
        written; otherwise the file is created or replaced with this
        workflow.
        """
        os.makedirs(directory, exist_ok=True)
        filepath = os.path.join(directory, f"{self.name}.yaml")

        # from_file returns None when the file does not exist yet.
        existing = RenkuWorkflowBuilder.from_file(filepath)
        if existing is not None and existing.name == self.name:
            existing.append_from(self)
            content = existing.to_yaml()
        else:
            content = self.to_yaml()

        # Serialise before opening: opening in 'w' mode truncates the file,
        # so a validation error must surface while the old content is intact.
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
|
||||||
|
|
184
workflows/workflow_acsm_data_PAY_2024.yaml
Normal file
184
workflows/workflow_acsm_data_PAY_2024.yaml
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
name: workflow_acsm_data_PAY_2024
|
||||||
|
steps:
|
||||||
|
update_datachain_params:
|
||||||
|
command: python $script_py $campaign_data_h5 $instrument_folder
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/update_datachain_params.py
|
||||||
|
- campaign_data_h5:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
|
||||||
|
- in_1:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/calibration_params.yaml
|
||||||
|
implicit: true
|
||||||
|
- in_2:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/limits_of_detection.yaml
|
||||||
|
implicit: true
|
||||||
|
- in_3:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE/2024/params/validity_thresholds.yaml
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- out_1:
|
||||||
|
path: pipelines/params/calibration_params.yaml
|
||||||
|
implicit: true
|
||||||
|
- out_2:
|
||||||
|
path: pipelines/params/limits_of_detection.yaml
|
||||||
|
implicit: true
|
||||||
|
- out_3:
|
||||||
|
path: pipelines/params/validity_thresholds.yaml
|
||||||
|
implicit: true
|
||||||
|
parameters:
|
||||||
|
- instrument_folder:
|
||||||
|
value: ACSM_TOFWARE/2024
|
||||||
|
apply_calibration_factors:
|
||||||
|
command: python $script_py $campaign_data_h5 $calib_yaml
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/apply_calibration_factors.py
|
||||||
|
- campaign_data_h5:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
|
||||||
|
- calib_yaml:
|
||||||
|
path: pipelines/params/calibration_factors.yaml
|
||||||
|
- data_descriptor_yaml:
|
||||||
|
path: campaignDescriptor.yaml
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- out_1:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv
|
||||||
|
implicit: true
|
||||||
|
- out_2:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv
|
||||||
|
implicit: true
|
||||||
|
- out_3:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv
|
||||||
|
implicit: true
|
||||||
|
parameters: []
|
||||||
|
generate_flags_diagnostics:
|
||||||
|
command: python $script_py $data_file $flag_type
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/generate_flags.py
|
||||||
|
- data_file:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
|
||||||
|
- validity_thresholds_yaml:
|
||||||
|
path: pipelines/params/validity_thresholds.yaml
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- flags_csv:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
|
||||||
|
implicit: true
|
||||||
|
parameters:
|
||||||
|
- flag_type:
|
||||||
|
value: diagnostics
|
||||||
|
generate_flags_species:
|
||||||
|
command: python $script_py $data_file $flag_type
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/generate_flags.py
|
||||||
|
- data_file:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
|
||||||
|
- calibration_params_yaml:
|
||||||
|
path: pipelines/params/calibration_params.yaml
|
||||||
|
implicit: true
|
||||||
|
- flag_in_0:
|
||||||
|
description: automated or cpc flag
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- flags_csv:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
|
||||||
|
implicit: true
|
||||||
|
parameters:
|
||||||
|
- flag_type:
|
||||||
|
value: species
|
||||||
|
prepare_ebas_submission:
|
||||||
|
command: python $script_py $in_1 $in_2 $in_3 $in_4 $month_range
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/prepare_ebas_submission.py
|
||||||
|
- in_1:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv
|
||||||
|
- in_2:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv
|
||||||
|
- in_3:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv
|
||||||
|
- in_4:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
|
||||||
|
- lod:
|
||||||
|
path: pipelines/params/limits_of_detection.yaml
|
||||||
|
implicit: true
|
||||||
|
- station:
|
||||||
|
path: pipelines/params/station_params.yaml
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- out_1:
|
||||||
|
path: data/PAY_ACSM-092_2024.txt
|
||||||
|
implicit: true
|
||||||
|
- out_2:
|
||||||
|
path: data/PAY_ACSM-092_FLAGS_2024.txt
|
||||||
|
implicit: true
|
||||||
|
parameters:
|
||||||
|
- month_range:
|
||||||
|
value: 2-3
|
||||||
|
visualize_diagnostic_variables:
|
||||||
|
command: python $script_py $data_file $dataset_name $flags_dataset_name $x_var
|
||||||
|
$y_vars $fig_0_VaporizerTemp_C $fig_1_FlowRate_ccs $fig_2_FilamentEmission_mA
|
||||||
|
$fig_3_ABsamp
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/visualize_datatable_vars.py
|
||||||
|
- data_file:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06.h5
|
||||||
|
- alternative_flags_csv:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta_flags.csv
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- fig_0_VaporizerTemp_C:
|
||||||
|
path: figures/fig_0_VaporizerTemp_C.html
|
||||||
|
- fig_1_FlowRate_ccs:
|
||||||
|
path: figures/fig_1_FlowRate_ccs.html
|
||||||
|
- fig_2_FilamentEmission_mA:
|
||||||
|
path: figures/fig_2_FilamentEmission_mA.html
|
||||||
|
- fig_3_ABsamp:
|
||||||
|
path: figures/fig_3_ABsamp.html
|
||||||
|
parameters:
|
||||||
|
- dataset_name:
|
||||||
|
value: ACSM_TOFWARE/2024/ACSM_PAY_2024_meta.txt/data_table
|
||||||
|
- flags_dataset_name:
|
||||||
|
value: ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_meta.txt/data_table
|
||||||
|
- x_var:
|
||||||
|
value: t_base
|
||||||
|
- y_vars:
|
||||||
|
value:
|
||||||
|
- VaporizerTemp_C
|
||||||
|
- FlowRate_ccs
|
||||||
|
- FilamentEmission_mA
|
||||||
|
- ABsamp
|
||||||
|
workflow_acsm_data_PAY_2024_step:
|
||||||
|
command: python $script_py $in_1 $in_2 $in_3 $in_4 $month_range
|
||||||
|
inputs:
|
||||||
|
- script_py:
|
||||||
|
path: pipelines/steps/prepare_ebas_submission.py
|
||||||
|
- in_1:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated.csv
|
||||||
|
- in_2:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibrated_err.csv
|
||||||
|
- in_3:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_processed/2024/ACSM_PAY_2024_timeseries_calibration_factors.csv
|
||||||
|
- in_4:
|
||||||
|
path: data/collection_PAY_2024_2025-06-06_2025-06-06/ACSM_TOFWARE_flags/2024/ACSM_PAY_2024_timeseries_flags.csv
|
||||||
|
- lod:
|
||||||
|
path: pipelines/params/limits_of_detection.yaml
|
||||||
|
implicit: true
|
||||||
|
- station:
|
||||||
|
path: pipelines/params/station_params.yaml
|
||||||
|
implicit: true
|
||||||
|
outputs:
|
||||||
|
- out_1:
|
||||||
|
path: data/PAY_ACSM-092_2024.txt
|
||||||
|
implicit: true
|
||||||
|
- out_2:
|
||||||
|
path: data/PAY_ACSM-092_FLAGS_2024.txt
|
||||||
|
implicit: true
|
||||||
|
parameters:
|
||||||
|
- month_range:
|
||||||
|
value: 2-3
|
Reference in New Issue
Block a user