# Mirror of https://gitea.psi.ch/APOG/acsmnode.git
# Synced 2025-06-25 05:31:09 +02:00
# 108 lines, 4.2 KiB, Python
import yaml
|
|
import os
|
|
from collections import OrderedDict
|
|
|
|
def generate_command(base_command='python', inputs=None, outputs=None, parameters=None):
    """Build a command string made of ``$name`` placeholders.

    Args:
        base_command: Text placed at the start of the command.
        inputs: Optional sequence of ``(name, value)`` pairs.
        outputs: Optional sequence of ``(name, value)`` pairs.
        parameters: Optional sequence of ``(name, value)`` pairs.

    Returns:
        ``base_command``, a single space, and the space-separated
        placeholders in the order inputs, parameters, outputs. An entry gets
        a placeholder only when its value is a dict whose ``'implicit'`` key
        is missing or falsy.
    """
    # Every group may be omitted; normalise all three (not only inputs and
    # outputs) so the concatenation below never sees None.
    inputs = list(inputs or [])
    outputs = list(outputs or [])
    parameters = list(parameters or [])

    placeholders = [
        f"${name}"
        for name, value in inputs + parameters + outputs
        if isinstance(value, dict) and not value.get('implicit', False)
    ]
    return f"{base_command} {' '.join(placeholders)}"
|
|
|
|
class RenkuWorkflowBuilder:
    """Collect named workflow steps and read/write them as a YAML document.

    Attributes are ``name`` (the workflow name, also used as the file stem by
    ``save_to_file``) and ``steps`` (an ``OrderedDict`` mapping step name to a
    step definition dict).
    """

    def __init__(self, name):
        """Create an empty workflow called *name*."""
        self.name = name
        self.steps = OrderedDict()

    def add_step(self, step_name, base_command, inputs=None, outputs=None, parameters=None):
        """Register a step, replacing any existing step with the same name.

        Args:
            step_name: Key under which the step is stored.
            base_command: Start of the command passed to the module-level
                ``generate_command``.
            inputs: Optional sequence of ``(name, value)`` pairs.
            outputs: Optional sequence of ``(name, value)`` pairs.
            parameters: Optional sequence of ``(name, value)`` pairs.
        """
        # Pass empty lists instead of None so that omitting any group is safe
        # regardless of how generate_command treats None.
        command = generate_command(
            base_command, inputs or [], outputs or [], parameters or []
        )
        step = {'command': command}
        # Empty groups are left out of the step definition entirely.
        if inputs:
            step['inputs'] = [{key: value} for key, value in inputs]
        if outputs:
            step['outputs'] = [{key: value} for key, value in outputs]
        if parameters:
            step['parameters'] = [{key: value} for key, value in parameters]
        self.steps[step_name] = step

    @staticmethod
    def parse_workflow(yaml_content: str):
        """Build a ``RenkuWorkflowBuilder`` from YAML text.

        The document must be a mapping with a ``name`` key. A missing or empty
        ``steps`` key yields a workflow without steps. Each parsed step always
        carries ``command``, ``inputs``, ``outputs`` and ``parameters`` keys.
        """
        data = yaml.safe_load(yaml_content)
        builder = RenkuWorkflowBuilder(data['name'])

        # An empty `steps:` key loads as None; treat it like "no steps".
        for step_name, step_def in (data.get('steps') or {}).items():
            builder.steps[step_name] = {
                'command': step_def.get('command'),
                'inputs': step_def.get('inputs', []),
                'outputs': step_def.get('outputs', []),
                'parameters': step_def.get('parameters', []),
            }
        return builder

    @staticmethod
    def from_file(filepath):
        """Load a workflow from *filepath*; return ``None`` if it does not exist."""
        if not os.path.exists(filepath):
            return None
        with open(filepath, 'r', encoding='utf-8') as f:
            return RenkuWorkflowBuilder.parse_workflow(f.read())

    def to_dict(self):
        """Return ``{'name': ..., 'steps': {...}}`` ready for serialisation.

        Input and output entries are validated on the way: a plain string
        value is accepted as is, a dict value has its ``'path'`` (when
        present) rewritten in place to use ``/`` separators, and any other
        value type raises.

        Raises:
            ValueError: If an input/output value is neither a dict nor a str.
        """
        for step_name, step_value in self.steps.items():
            for segment in ('inputs', 'outputs'):
                # `or []` also covers an explicit None (empty YAML key).
                for item in step_value.get(segment) or []:
                    for value in item.values():
                        # Strings are valid and need no normalisation; this
                        # check must come before the dict check below.
                        if isinstance(value, str):
                            continue
                        if not isinstance(value, dict):
                            raise ValueError(f"Invalid input. Step {step_name} must have {segment} of either dict or str type.")
                        if 'path' in value:
                            value['path'] = value['path'].replace(os.sep, '/')

        return {
            'name': self.name,
            # Plain dict (not OrderedDict) so the YAML dump stays untagged.
            'steps': dict(self.steps),
        }

    def to_yaml(self):
        """Serialise the workflow to YAML text, keeping step insertion order."""
        return yaml.dump(self.to_dict(), sort_keys=False)

    def append_from(self, other, force=False):
        """Merge the steps of *other* into this workflow.

        Steps unknown to this workflow are added. A step that already exists
        is kept unless ``force`` is true, in which case it is replaced. Either
        outcome for an existing step is reported via ``print``.

        Raises:
            ValueError: If the two workflows have different names.
        """
        if other.name != self.name:
            raise ValueError(f"Cannot merge workflows with different names: {self.name} != {other.name}")

        for step_name, step_def in other.steps.items():
            if step_name not in self.steps:
                self.steps[step_name] = step_def
            elif force:
                print(f"[Overwriting] Step '{step_name}' was overwritten as 'force=True'.")
                self.steps[step_name] = step_def
            else:
                print(f"[Skipping] Step '{step_name}' already exists. Use 'force=True' to overwrite.")

    def save_to_file(self, directory, force=False):
        """Write the workflow to ``<directory>/<name>.yaml``.

        If that file already holds a workflow with the same name, this
        workflow's steps are merged into it (see ``append_from``) and the
        merged result is written. Otherwise the file is (over)written with
        this workflow alone.

        Args:
            directory: Target directory; created if missing.
            force: Forwarded to ``append_from``; when true, steps already in
                the file are replaced by the ones from this workflow.
        """
        os.makedirs(directory, exist_ok=True)
        filepath = os.path.join(directory, f"{self.name}.yaml")

        target = self
        existing = RenkuWorkflowBuilder.from_file(filepath)
        if existing is not None and existing.name == self.name:
            existing.append_from(self, force=force)
            target = existing

        # Serialise before opening for writing: if to_yaml() raises, the file
        # on disk is left untouched instead of being truncated.
        content = target.to_yaml()
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
|
|
|