class RenkuWorkflowBuilder:
    """
    Build and manage a Renku workflow definition (YAML-based).

    Steps can be added, merged, serialized to YAML, and reloaded from disk.
    A step definition is a dict with 'command', 'inputs', 'outputs' and
    'parameters' keys; inputs/outputs/parameters are lists of single-key
    dicts mapping a name to either a plain str or a dict with a 'path' entry.
    """

    def __init__(self, name):
        """
        Initialize a workflow builder.

        Args:
            name (str): Workflow name (also used as the YAML file stem).
        """
        self.name = name
        # Workflow files live under <project root>/workflows; forward slashes
        # keep the stored path stable across platforms.
        self.directory_path = os.path.join(projectPath, 'workflows').replace(os.sep, '/')
        self.steps = OrderedDict()

    @staticmethod
    def _hash_content(step_def):
        """
        Compute a stable content hash of a step definition.

        Used to derive collision-free step names when two different step
        bodies share the same name. MD5 is acceptable here: the digest is
        an identifier, not a security boundary.

        Args:
            step_def (dict): Step definition to fingerprint.

        Returns:
            str: 32-character hexadecimal digest.
        """
        import json, hashlib
        # sort_keys makes the digest independent of dict insertion order;
        # default=str keeps non-JSON values (e.g. Path objects) hashable.
        content_str = json.dumps(step_def, sort_keys=True, default=str)
        return hashlib.md5(content_str.encode()).hexdigest()

    @staticmethod
    def _normalize_paths(items):
        """
        Normalize file paths inside a list of (key, value) pairs.

        Args:
            items (list): Sequence of (key, value) pairs. A value that is a
                dict containing 'path' gets its separators normalized to '/'.

        Returns:
            list: Single-key dicts ({key: value}); an empty/None input is
            returned unchanged.
        """
        if not items:
            return items
        normalized = []
        for key, value in items:
            if isinstance(value, dict) and 'path' in value:
                # Copy before rewriting so the caller's dict is not mutated
                # as a side effect of adding a step.
                value = dict(value)
                value['path'] = value['path'].replace(os.sep, '/')
            normalized.append({key: value})
        return normalized

    def create_workflow_file(self):
        """
        Create (or update) the workflow YAML file on disk.

        Saves into self.directory_path and reports the resulting path;
        warns (instead of staying silent) when the file is missing afterwards.
        """
        self.save_to_file(self.directory_path)
        filepath = os.path.join(self.directory_path, f'{self.name}.yaml').replace(os.sep, '/')
        if os.path.exists(filepath):
            print(f'Workflow file created at : {filepath}')
        else:
            print(f'[Warning] Workflow file was not created at : {filepath}')

    def add_step(self, step_name, base_command, inputs=None, outputs=None, parameters=None):
        """
        Add a step to the workflow and persist it to file.

        Normalizes input/output/parameter paths. Re-adding an identical step
        is a no-op; a step with the same name but different content is stored
        under a content-hash-suffixed name instead of overwriting.

        Args:
            step_name (str): Name of the step.
            base_command (str): Executable prefix for the generated command.
            inputs (list | None): (key, value) pairs; defaults to empty.
            outputs (list | None): (key, value) pairs; defaults to empty.
            parameters (list | None): (key, value) pairs; defaults to empty.
        """
        # None sentinels instead of mutable default arguments ([] defaults
        # are shared across calls); behavior for callers is unchanged.
        inputs = inputs or []
        outputs = outputs or []
        parameters = parameters or []

        command = generate_command(base_command, inputs, outputs, parameters)
        step = {'command': command}

        step['inputs'] = self._normalize_paths(inputs)
        step['outputs'] = self._normalize_paths(outputs)
        step['parameters'] = self._normalize_paths(parameters)

        # Deduplicate or version the step: identical content under the same
        # name is dropped silently; different content gets a hashed alias.
        if step_name not in self.steps:
            self.steps[step_name] = step
        elif self.steps[step_name] != step:
            content_hash = self._hash_content(step)
            hashed_name = f"{step_name}_{content_hash[:8]}"
            print(f"[Added] Step '{step_name}' as '{hashed_name}'")
            self.steps[hashed_name] = step

        self.save_to_file(self.directory_path)

    def run_and_add_step(self, step, *args, **kwargs):
        """
        Run a step function, collect its provenance, and register it.

        The step's defining file name (without extension) becomes the step
        name. The step must return a mapping with 'inputs', 'outputs' and
        'parameters' keys.

        Returns:
            dict: The provenance returned by the step.

        Raises:
            KeyError: If the provenance lacks a required key (same exception
                type as the previous implicit lookup, but with a clear message).
        """
        file_path = inspect.getfile(step)
        step_name = os.path.splitext(os.path.basename(file_path))[0]
        provenance = step(*args, **kwargs)
        for required in ('inputs', 'outputs', 'parameters'):
            if required not in provenance:
                raise KeyError(
                    f"Step '{step_name}' provenance is missing required key: '{required}'"
                )
        self.add_step(step_name, "python",
                      provenance["inputs"],
                      provenance["outputs"],
                      provenance["parameters"])
        return provenance

    @staticmethod
    def parse_workflow(yaml_content: str):
        """
        Parse YAML content and return a populated RenkuWorkflowBuilder.

        Args:
            yaml_content (str): YAML document with 'name' and 'steps' keys.

        Returns:
            RenkuWorkflowBuilder: Builder populated with the parsed steps.
        """
        data = yaml.safe_load(yaml_content)
        builder = RenkuWorkflowBuilder(data['name'])

        def _resplit(items):
            # Re-split possibly multi-key dicts into single-key dicts so the
            # in-memory shape matches what add_step produces.
            return [{k: v} for item in (items or []) for k, v in item.items()]

        for step_name, step_def in data['steps'].items():
            builder.steps[step_name] = {
                'command': step_def.get('command'),
                'inputs': _resplit(step_def.get('inputs')),
                'outputs': _resplit(step_def.get('outputs')),
                'parameters': _resplit(step_def.get('parameters')),
            }
        return builder

    @staticmethod
    def from_file(filepath):
        """
        Load a workflow from file, or return a new empty one if absent.

        Args:
            filepath (str): Path to a workflow YAML file; the file stem is
                used as the workflow name when the file does not exist.

        Returns:
            RenkuWorkflowBuilder: Loaded (or freshly created) builder.
        """
        if not os.path.exists(filepath):
            workflow_name = os.path.splitext(os.path.basename(filepath))[0]
            return RenkuWorkflowBuilder(name=workflow_name)
        with open(filepath, 'r') as f:
            return RenkuWorkflowBuilder.parse_workflow(f.read())

    def to_dict(self):
        """
        Return the workflow definition as a dict, normalizing paths.

        Plain string values in inputs/outputs are passed through untouched
        (matching the error message's contract of "dict or str type").

        Returns:
            dict: {'name': ..., 'steps': {...}}.

        Raises:
            ValueError: If an input/output value is neither str nor dict.
        """
        for step_name, step_value in self.steps.items():
            for segment in ['inputs', 'outputs']:
                for item in step_value.get(segment, []):
                    for _, value in item.items():
                        if isinstance(value, str):
                            # str values are valid and carry no path to fix.
                            continue
                        if not isinstance(value, dict):
                            raise ValueError(
                                f"Invalid input. Step {step_name} must have {segment} as dict or str type."
                            )
                        if 'path' in value:
                            value['path'] = value['path'].replace(os.sep, '/')
        return {'name': self.name, 'steps': dict(self.steps)}

    def to_yaml(self):
        """Serialize the workflow definition to YAML (step order preserved)."""
        return yaml.dump(self.to_dict(), sort_keys=False)

    def append_from(self, other, force=False):
        """
        Merge steps from another workflow into this one.

        Identical steps are kept once; a step whose content differs gets a
        content-hash suffix so nothing is silently overwritten.

        Args:
            other (RenkuWorkflowBuilder): Workflow to merge from.
            force (bool): Unused; retained for backward compatibility.

        Raises:
            ValueError: If the two workflows have different names.
        """
        if other.name != self.name:
            raise ValueError(f"Cannot merge workflows with different names: {self.name} != {other.name}")

        # Snapshot before merging so steps added in this loop are not
        # re-compared against themselves.
        curr_steps = self.steps.copy()
        for step_name, step_def in other.steps.items():
            if step_name not in curr_steps:
                self.steps[step_name] = step_def
            elif self.steps[step_name] != step_def:
                content_hash = self._hash_content(step_def)
                hashed_name = f"{step_name}_{content_hash[:8]}"
                if hashed_name not in self.steps:
                    self.steps[hashed_name] = step_def
                    print(f"[Added] Step '{step_name}' → '{hashed_name}'")

    def save_to_file(self, directory):
        """
        Save the workflow to '<directory>/<name>.yaml'.

        Merges with an existing file of the same name instead of
        overwriting it; otherwise writes a fresh file.

        Args:
            directory (str): Target directory (created if missing).
        """
        os.makedirs(directory, exist_ok=True)
        filepath = os.path.join(directory, f"{self.name}.yaml")

        if os.path.exists(filepath):
            # NOTE(review): the merge body here was partially hidden by the
            # patch's hunk context — reconstructed as load-then-append, which
            # matches the visible 'existing.to_yaml()' write; confirm against
            # the full file.
            existing = RenkuWorkflowBuilder.from_file(filepath)
            existing.append_from(self)
            with open(filepath, 'w') as f:
                f.write(existing.to_yaml())
            return

        with open(filepath, 'w') as f:
            f.write(self.to_yaml())