mirror of
https://gitea.psi.ch/APOG/acsmnode.git
synced 2025-06-25 05:31:09 +02:00
Add import statement and adjust dependencies.
This commit is contained in:
0
pipelines/steps/__init__.py
Normal file
0
pipelines/steps/__init__.py
Normal file
@ -25,6 +25,7 @@ import yaml
|
|||||||
sys.path.append(dimaPath)
|
sys.path.append(dimaPath)
|
||||||
|
|
||||||
import dima.src.hdf5_ops as dataOps
|
import dima.src.hdf5_ops as dataOps
|
||||||
|
import pipelines.steps.utils as stepUtils
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
@ -167,35 +168,7 @@ def apply_calibration_factors(data_table, datetime_var_name, calibration_file):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
def record_data_lineage(path_to_output_file, metadata):
|
|
||||||
|
|
||||||
path_to_output_dir, output_file = os.path.split(path_to_output_file)
|
|
||||||
|
|
||||||
path_to_metadata_file = '/'.join([path_to_output_dir,'data_lineage_metadata.json'])
|
|
||||||
# Ensure the file exists
|
|
||||||
if not os.path.exists(path_to_metadata_file):
|
|
||||||
with open(path_to_metadata_file, 'w') as f:
|
|
||||||
json.dump({}, f) # Initialize empty JSON
|
|
||||||
|
|
||||||
# Read the existing JSON
|
|
||||||
with open(path_to_metadata_file, 'r') as metadata_file:
|
|
||||||
try:
|
|
||||||
json_dict = json.load(metadata_file)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
json_dict = {} # Start fresh if file is invalid
|
|
||||||
|
|
||||||
# Compute relative output file path and update the JSON object
|
|
||||||
relpath_to_output_file = os.path.relpath(path_to_output_file, start=projectPath).replace(os.sep, '/')
|
|
||||||
json_dict[relpath_to_output_file] = metadata
|
|
||||||
|
|
||||||
# Write updated JSON back to the file
|
|
||||||
with open(path_to_metadata_file, 'w') as metadata_file:
|
|
||||||
json.dump(json_dict, metadata_file, indent=4)
|
|
||||||
|
|
||||||
|
|
||||||
print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
|
|
||||||
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
||||||
@ -275,8 +248,8 @@ if __name__ == '__main__':
|
|||||||
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
||||||
calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False)
|
calibration_factor_table.to_csv(path_to_calibration_factors_file, index=False)
|
||||||
|
|
||||||
status = record_data_lineage(path_to_calibrated_file, metadata)
|
status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata)
|
||||||
status = record_data_lineage(path_to_calibration_factors_file, metadata)
|
status = stepUtils.record_data_lineage(path_to_calibration_factors_file, projectPath, metadata)
|
||||||
|
|
||||||
|
|
||||||
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
||||||
|
@ -20,7 +20,9 @@ projectPath = os.path.normpath(os.path.join(dimaPath,'..'))
|
|||||||
print(dimaPath)
|
print(dimaPath)
|
||||||
sys.path.append(dimaPath)
|
sys.path.append(dimaPath)
|
||||||
import dima.src.hdf5_ops as dataOps
|
import dima.src.hdf5_ops as dataOps
|
||||||
|
import pipelines.steps.utils as stepUtils
|
||||||
|
import dima.utils.g5505_utils as utils
|
||||||
|
import json
|
||||||
|
|
||||||
def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
|
def compute_diagnostic_variable_flags(data_table, validity_thresholds_dict):
|
||||||
"""
|
"""
|
||||||
@ -135,32 +137,15 @@ if __name__ == '__main__':
|
|||||||
path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])
|
path_to_metadata_file = '/'.join([path_tail, 'data_lineage_metadata.json'])
|
||||||
|
|
||||||
print('Path to output file :', path_to_calibrated_file)
|
print('Path to output file :', path_to_calibrated_file)
|
||||||
import dima.utils.g5505_utils as utils
|
|
||||||
import json
|
|
||||||
print(calibration_factors.keys())
|
print(calibration_factors.keys())
|
||||||
calibrated_table = compute_diagnostic_variable_flags(data_table, calibration_factors)
|
calibrated_table = compute_diagnostic_variable_flags(data_table, calibration_factors)
|
||||||
metadata['processing_date'] = utils.created_at()
|
metadata['processing_date'] = utils.created_at()
|
||||||
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
calibrated_table.to_csv(path_to_calibrated_file, index=False)
|
||||||
|
|
||||||
# Ensure the file exists
|
status = stepUtils.record_data_lineage(path_to_calibrated_file, projectPath, metadata)
|
||||||
if not os.path.exists(path_to_metadata_file):
|
|
||||||
with open(path_to_metadata_file, 'w') as f:
|
|
||||||
json.dump({}, f) # Initialize empty JSON
|
|
||||||
|
|
||||||
# Read the existing JSON
|
|
||||||
with open(path_to_metadata_file, 'r') as metadata_file:
|
|
||||||
try:
|
|
||||||
json_dict = json.load(metadata_file)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
json_dict = {} # Start fresh if file is invalid
|
|
||||||
|
|
||||||
# Update the JSON object
|
|
||||||
outputfileRelPath = os.path.relpath(path_to_calibrated_file, start=projectPath).replace(os.sep, '/')
|
|
||||||
json_dict[outputfileRelPath] = metadata
|
|
||||||
|
|
||||||
# Write updated JSON back to the file
|
|
||||||
with open(path_to_metadata_file, 'w') as metadata_file:
|
|
||||||
json.dump(json_dict, metadata_file, indent=4)
|
|
||||||
|
|
||||||
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
print(f"Calibrated data saved to {path_to_calibrated_file}")
|
||||||
print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
|
print(f"Metadata for calibrated data saved to {path_to_metadata_file}")
|
||||||
|
Reference in New Issue
Block a user