Moved src/metadata_review_lib.py to pipelines/metadata_revision.py

This commit is contained in:
2024-09-17 16:55:22 +02:00
parent 07401c895f
commit 9eeb9d6380

View File

@@ -0,0 +1,380 @@
import sys
import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
import subprocess
import h5py
import yaml
import utils.g5505_utils as utils
import src.hdf5_vis as hdf5_vis
import src.hdf5_lib as hdf5_lib
import src.git_ops as git_ops
import numpy as np
YAML_EXT = ".yaml"
TXT_EXT = ".txt"
def get_review_status(filename_path):
    """Return the latest entry recorded in the review-status file of `filename_path`.

    The status file is looked up at ``review/<name>-review_status.txt``, where
    ``<name>`` is the base name of `filename_path` without its extension.

    The returned string is the last line of that file exactly as read, so a
    trailing newline (if any) is kept. An empty status file raises IndexError.
    """
    base_name = os.path.splitext(os.path.basename(filename_path))[0]
    status_file_path = os.path.join("review/", base_name + "-review_status" + TXT_EXT)
    # TODO:
    with open(status_file_path, 'r') as status_file:
        recorded_steps = status_file.readlines()
    return recorded_steps[-1]
def first_initialize_metadata_review(hdf5_file_path, reviewer_attrs, restart = False):
    """
    First: Initialize the metadata review of an hdf5 file on the reviewer's branch.

    Places a copy of the yaml representation of the hdf5 file under review in the 'review' folder,
    (re)writes a txt file with the state of the review process ('under review'), and commits the
    review files reported by git status.

    Parameters:
        hdf5_file_path: path to the hdf5 file under review. A yaml file with the same base name
            must already exist in the same folder.
        reviewer_attrs: dict with the key 'initials'; the expected branch is 'review_<initials>'.
        restart: when True, the yaml copy in the review folder is made again even if it exists.

    Returns:
        (review_yaml_file_path, review_status_yaml_file_path). When a copy is made, the first
        element is whatever utils.make_file_copy() returns.

    Raises:
        ValueError: if the extension does not contain 'h5', the yaml snapshot is missing, or the
            current branch does not contain the expected branch name.
    """
    expected_branch = '_'.join(['review', reviewer_attrs['initials']])

    source_dir, source_basename = os.path.split(hdf5_file_path)
    filename, extension = os.path.splitext(source_basename)

    # The input must be an h5 file ...
    if 'h5' not in extension:
        raise ValueError("filename_path needs to point to an h5 file.")

    # ... and its yaml snapshot must sit next to it.
    snapshot_path = os.path.join(source_dir, filename + YAML_EXT)
    if not os.path.exists(snapshot_path):
        raise ValueError("metadata review cannot be initialized. The associated .yaml file under review was not found. Run take_yml_snapshot_of_hdf5_file(filename_path) ")

    # The branch is never switched here; the reviewer has to check it out beforehand.
    active_branch = git_ops.show_current_branch()
    if expected_branch not in active_branch.stdout:
        raise ValueError("Branch "+expected_branch+" was not found. \nPlease open a Git Bash Terminal, and follow the below instructions: \n1. Change directory to your project's directory. \n2. Excecute the command: git checkout "+expected_branch)

    review_yaml_file_path = os.path.join("review/", filename + YAML_EXT)
    review_status_yaml_file_path = os.path.splitext(review_yaml_file_path)[0] + "-review_status" + ".txt"

    # Copy the snapshot into the review folder on first use, or again on restart.
    if not os.path.exists(review_yaml_file_path) or restart:
        review_yaml_file_path = utils.make_file_copy(snapshot_path, 'review')
        if restart:
            print('metadata review has been reinitialized. The review files will reflect the current state of the hdf5 files metadata')

    # The status file is overwritten on every call, whether or not a copy was made.
    with open(review_status_yaml_file_path, 'w') as status_file:
        status_file.write('under review')

    # Collect review files (and the untracked snapshot) from git status and commit them.
    status = git_ops.get_status()
    snapshot_marker = 'output_files/' + filename + YAML_EXT
    files_to_stage = []
    for status_line in status.stdout.splitlines():
        entry = status_line.strip()
        is_modified = 'modified' in status_line
        if 'review/' in status_line:
            # Modified entries carry a label in front of the path; other entries are used whole.
            files_to_stage.append(entry.split()[1] if is_modified else entry)
        if snapshot_marker in status_line and not is_modified:
            files_to_stage.append(entry)

    if files_to_stage:
        subprocess.run(git_ops.add_files_to_git(files_to_stage), capture_output=True, check=True)
        commit_output = subprocess.run(git_ops.commit_changes('Initialized metadata review.'), capture_output=True, check=True)
        for output_line in commit_output.stdout.splitlines():
            print(output_line.decode('utf-8'))

    return review_yaml_file_path, review_status_yaml_file_path
def second_save_metadata_review(review_yaml_file_path, reviewer_attrs):
    """
    Second: Once you're done reviewing the yaml representation of the hdf5 file in the review folder,
    run this function to mark the review as submitted and save (add and commit) the modified
    .yaml and .txt files in the project.

    Parameters:
        review_yaml_file_path: path to the reviewed yaml file inside the 'review' folder.
        reviewer_attrs: dict with the key 'initials'; the expected branch is 'review_<initials>'.

    Returns:
        None. Prints the commit output, or 'Nothing to commit.' when git status reports no
        modification of the review yaml file.

    Raises:
        ValueError: if the current branch does not contain the expected branch name.
    """
    initials = reviewer_attrs['initials']
    branch_name = '_'.join(['review', initials])

    # The branch is never switched here; the reviewer has to check it out beforehand.
    curr_branch = git_ops.show_current_branch()
    if branch_name not in curr_branch.stdout:
        raise ValueError('Please checkout ' + branch_name + ' via Git Bash before submitting metadata review files. ')

    # Collect modified review files
    status = git_ops.get_status()
    review_yaml_basename = os.path.basename(review_yaml_file_path)
    modified_files = []
    for line in status.stdout.splitlines():
        # first_initialize_metadata_review reads this same output as text, so decode only
        # when the status was captured as bytes; calling .decode() on str would raise.
        tmp = line.decode("utf-8") if isinstance(line, bytes) else line
        if 'modified' in tmp and 'review/' in tmp and review_yaml_basename in tmp:
            modified_files.append(tmp.split()[1])

    if not modified_files:
        print('Nothing to commit.')
        return

    # Record the new state, then stage and commit the review files to the local repository
    filename, ext = os.path.splitext(review_yaml_basename)
    review_status_file_path = os.path.join("review/", filename + "-review_status" + TXT_EXT)
    with open(review_status_file_path, 'a') as f:
        f.write('\nsubmitted')
    modified_files.append(review_status_file_path)

    subprocess.run(git_ops.add_files_to_git(modified_files), capture_output=True, check=True)
    message = 'Submitted metadata review.'
    commit_output = subprocess.run(git_ops.commit_changes(message), capture_output=True, check=True)
    # capture_output without text=True yields bytes, hence the decode.
    for line in commit_output.stdout.splitlines():
        print(line.decode('utf-8'))
#
def load_yaml(yaml_review_file):
    """Parse `yaml_review_file` and return its content.

    A yaml.YAMLError raised while parsing is printed and None is returned instead.
    """
    parsed_content = None
    with open(yaml_review_file, 'r') as stream:
        try:
            parsed_content = yaml.load(stream, Loader=yaml.FullLoader)
        except yaml.YAMLError as parse_error:
            print(parse_error)
    return parsed_content
def update_hdf5_attributes(input_hdf5_file, yaml_dict):
    """Apply the attribute edits described in `yaml_dict` to `input_hdf5_file` in place.

    Each key of `yaml_dict` is used to index the opened HDF5 file, and the associated value
    must provide an 'attributes' mapping. For every entry of that mapping:

    - attribute already on the object and entry has a truthy 'delete': the attribute is removed;
    - attribute already on the object and 'rename_as' differs from its name: the stored value is
      moved under the 'rename_as' name;
    - attribute not on the object: it is added with the value from utils.parse_attribute().

    An entry that is not a dict counts as {'rename_as': <name>, 'value': <entry>}, so an
    existing attribute is left untouched in that case.
    """

    def update_attributes(hdf5_obj, yaml_obj):
        obj_attrs = hdf5_obj.attrs
        for attr_name, attr_spec in yaml_obj['attributes'].items():
            # Bring plain values to the dict form used below.
            if not isinstance(attr_spec, dict):
                attr_spec = {'rename_as': attr_name, 'value': attr_spec}

            # Unknown attribute: add it and move on.
            if attr_name not in obj_attrs.keys():
                obj_attrs.update({attr_name: utils.parse_attribute(attr_spec)})
                continue

            # Known attribute: delete it, or rename it when a different name is requested.
            if attr_spec.get('delete'):
                del obj_attrs[attr_name]
            elif attr_spec.get('rename_as') != attr_name:
                obj_attrs[attr_spec.get('rename_as')] = obj_attrs[attr_name]
                del obj_attrs[attr_name]

    with h5py.File(input_hdf5_file, 'r+') as h5_file:
        for obj_path, obj_review in yaml_dict.items():
            update_attributes(h5_file[obj_path], obj_review)
def update_hdf5_file_with_review(input_hdf5_file, yaml_review_file):
    """Update the attributes of `input_hdf5_file` from `yaml_review_file` and refresh its yaml snapshot.

    Parameters:
        input_hdf5_file: path to the HDF5 file to update.
        yaml_review_file: path to the reviewed yaml file describing the attribute edits.

    Raises:
        ValueError: if the review file yields no content (load_yaml() returned None, which
            happens on a parse error or an empty file). The HDF5 file is not opened in that case.
    """
    yaml_dict = load_yaml(yaml_review_file)
    # Fail before the HDF5 file is opened for writing instead of crashing on None later on.
    if yaml_dict is None:
        raise ValueError(f'{yaml_review_file} could not be parsed or is empty. {input_hdf5_file} was not updated.')

    update_hdf5_attributes(input_hdf5_file, yaml_dict)
    # Regenerate yaml snapshot of updated HDF5 file
    output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(input_hdf5_file)
    print(f'{output_yml_filename_path} was successfully regenerated from the updated version of {input_hdf5_file}')
def third_update_hdf5_file_with_review(input_hdf5_file, yaml_review_file, reviewer_attrs=None, hdf5_upload=False):
    """
    Third: Update the hdf5 file with the submitted review and run the follow-up git operations.

    Parameters:
        input_hdf5_file: path to the hdf5 file under review; also used to locate its review-status file.
        yaml_review_file: path to the reviewed yaml file.
        reviewer_attrs: currently unused; kept so existing calls keep working.
        hdf5_upload: forwarded as-is to git_ops.perform_git_operations().

    Raises:
        ValueError: if the latest review status does not contain 'submitted'.
    """
    if 'submitted' not in get_review_status(input_hdf5_file):
        raise ValueError('Review yaml file must be submitted before trying to perform an update. Run first second_save_metadata_review().')
    update_hdf5_file_with_review(input_hdf5_file, yaml_review_file)
    git_ops.perform_git_operations(hdf5_upload)
def count(hdf5_obj,yml_dict):
    """Print the object's name and, for shallow groups, how its attributes differ from the review.

    For an h5py.Group whose name splits on '/' into at most 4 parts, prints the number of
    attributes listed in yml_dict[<name>]['attributes'] but missing on the object ('additions')
    and the number present on the object but missing in the review ('deletions').
    Other objects only have their name printed.
    """
    print(hdf5_obj.name)

    is_shallow_group = isinstance(hdf5_obj, h5py.Group) and len(hdf5_obj.name.split('/')) <= 4
    if not is_shallow_group:
        return

    reviewed_attrs = yml_dict[hdf5_obj.name]['attributes']
    stored_attrs = hdf5_obj.attrs
    num_additions = sum(1 for name in reviewed_attrs.keys() if name not in stored_attrs.keys())
    num_deletions = sum(1 for name in stored_attrs.keys() if name not in reviewed_attrs.keys())
    print('additions', num_additions, 'deletions', num_deletions)
def last_submit_metadata_review(reviewer_attrs):
    """Fourth: Push the review branch 'review_<initials>' to the 'origin' remote.

    Nothing is pushed (and None is returned) when the branch is not listed by
    'git branch --list' or is not the branch currently checked out; a hint is printed instead.
    Otherwise the output of 'git push' is printed and its return code is returned.
    A failing git command raises subprocess.CalledProcessError because check=True is used.
    """
    branch_name = '_'.join(['review', reviewer_attrs['initials']])
    repository = 'origin'

    # The review branch has to exist locally ...
    branches = subprocess.run(['git', 'branch', '--list'], capture_output=True, text=True, check=True)
    if branch_name not in branches.stdout:
        print('There is no branch named '+branch_name+'.\n')
        print('Make sure to run data owner review workflow from the beginning without missing any steps.')
        return

    # ... and has to be the one currently checked out.
    curr_branch = git_ops.show_current_branch()
    if branch_name not in curr_branch.stdout:
        print('Complete metadata review could not be completed.\n')
        print('Make sure a data-owner workflow has already been started on branch '+branch_name+'\n')
        print('The step "Complete metadata review" will have no effect.')
        return

    # push
    result = subprocess.run(['git', 'push', repository, branch_name], capture_output=True, text=True, check=True)
    print(result.stdout)

    # Steps not implemented yet:
    # 1. git add output_files/
    # 2. delete review/
    # 3. git rm review/
    # 4. git commit -m "Completed review process. Current state of hdf5 file and yml should be up to date."
    return result.returncode
#import config_file
#import hdf5_vis
class MetadataHarvester:
    """Collect metadata entries grouped by category, together with a list of parent files.

    Entries live in ``self.metadata``, a dict with one sub-dict per category:
    'project', 'sample', 'environment', 'instruments' and 'datasets'.

    Every ``add_*_info`` method accepts either a single key with a value, or a dict of
    key/value pairs. With ``append=False`` (default) an existing key is overwritten.
    With ``append=True`` the new value is combined with the existing one:

    - existing list: a list value is concatenated, any other value is appended;
    - existing str: the new value is joined as ``"<old>,<new>"``;
    - any other existing value: both are stored as ``[old, new]``.

    ``append=True`` is honoured for dict input as well, one key at a time.
    """

    # Single source of truth for the categories used by __init__ and clear_metadata.
    _CATEGORIES = ("project", "sample", "environment", "instruments", "datasets")

    def __init__(self, parent_files=None):
        if parent_files is None:
            parent_files = []
        self.parent_files = parent_files
        self.metadata = self._empty_metadata()

    @classmethod
    def _empty_metadata(cls):
        """Return a fresh dict with one empty sub-dict per category."""
        return {category: {} for category in cls._CATEGORIES}

    def add_project_info(self, key_or_dict, value=None, append=False):
        self._add_info("project", key_or_dict, value, append)

    def add_sample_info(self, key_or_dict, value=None, append=False):
        self._add_info("sample", key_or_dict, value, append)

    def add_environment_info(self, key_or_dict, value=None, append=False):
        self._add_info("environment", key_or_dict, value, append)

    def add_instrument_info(self, key_or_dict, value=None, append=False):
        self._add_info("instruments", key_or_dict, value, append)

    def add_dataset_info(self, key_or_dict, value=None, append=False):
        self._add_info("datasets", key_or_dict, value, append)

    def _add_info(self, category, key_or_dict, value, append):
        """Internal helper method to add information to a category."""
        if not isinstance(key_or_dict, dict):
            self._store_value(category, key_or_dict, value, append)
        elif append:
            # Apply the append rules per key instead of silently overwriting.
            for key, item in key_or_dict.items():
                self._store_value(category, key, item, append)
        else:
            self.metadata[category].update(key_or_dict)

    def _store_value(self, category, key, value, append):
        """Store one key/value pair, combining with the existing value when `append` is set."""
        entries = self.metadata[category]
        if not append or key not in entries:
            entries[key] = value
            return

        current_value = entries[key]
        if isinstance(current_value, list):
            if isinstance(value, list):
                entries[key] = current_value + value
            else:
                # Append the new value to the list
                current_value.append(value)
        elif isinstance(current_value, str):
            # Append the new value as a comma-separated string
            entries[key] = current_value + ',' + str(value)
        else:
            # Handle other types (for completeness, usually not required)
            entries[key] = [current_value, value]

    def get_metadata(self):
        """Return the parent files and the collected metadata (the stored objects, not copies)."""
        return {
            "parent_files": self.parent_files,
            "metadata": self.metadata
        }

    def print_metadata(self):
        """Print the parent files followed by the entries of each category."""
        print("parent_files", self.parent_files)
        for key in self.metadata.keys():
            print(key,'metadata:\n')
            for item in self.metadata[key].items():
                print(item[0],item[1])

    def clear_metadata(self):
        """Reset every category to empty and forget the parent files."""
        self.metadata = self._empty_metadata()
        self.parent_files = []
def main():
    """Outline of the review workflow for one example file.

    Only the example paths are built here; the workflow calls are kept as comments,
    so calling this function has no side effects and returns None.
    """
    output_filename_path = "output_files/unified_file_smog_chamber_2024-03-19_UTC-OFST_+0100_NG.h5"
    # The snapshot uses the '.yaml' extension, the same one the review functions look for.
    output_yml_filename_path = "output_files/unified_file_smog_chamber_2024-03-19_UTC-OFST_+0100_NG.yaml"
    output_yml_filename_path_tail, filename = os.path.split(output_yml_filename_path)

    # Intended sequence:
    #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename_path)
    #first_initialize_metadata_review(output_filename_path, {'initials': 'NG'})
    #second_save_metadata_review(os.path.join("review", filename), {'initials': 'NG'})
    #if os.path.exists(os.path.join(os.path.join(os.path.abspath(os.curdir),"review"),filename)):
    #    third_update_hdf5_file_with_review(output_filename_path, os.path.join(os.path.join(os.path.abspath(os.curdir),"review"),filename))
    #last_submit_metadata_review({'initials': 'NG'})
#if __name__ == '__main__':
# main()