Validator functional
This commit is contained in:
@ -1,39 +1,46 @@
|
||||
import logging
from io import BytesIO
from typing import List, Union

import openpyxl
from pydantic import ValidationError, parse_obj_as

from app.sample_models import SpreadsheetModel

# Placeholder address for samples whose puck position is not assigned yet.
UNASSIGNED_PUCKADDRESS = "---"

# DEBUG level so the per-row import/validation logs below are visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SpreadsheetImportError(Exception):
    """Raised when a sample spreadsheet cannot be read or fails validation."""
|
||||
|
||||
|
||||
class SampleSpreadsheetImporter:
|
||||
def __init__(self):
    """Create an importer with no spreadsheet loaded yet."""
    # Name of the file currently being imported (set by import_spreadsheet).
    self.filename = None
    # Validated sample records; populated by import_spreadsheet().
    self.model = None
    # Puck addresses still free ("A1".."F5" plus the unassigned marker);
    # rebuilt at the start of every import.
    self.available_puck_positions = []
|
||||
|
||||
def _clean_value(self, value, expected_type=None):
    """Clean a raw spreadsheet cell value.

    Strings are stripped of surrounding whitespace; when ``expected_type``
    is given the value is coerced to it. Unconvertible numeric cells become
    ``None`` so the pydantic model reports a field error instead of the
    importer crashing mid-row.

    Args:
        value: raw cell value as returned by openpyxl (may be ``None``).
        expected_type: optional target type (``str``, ``int`` or ``float``).

    Returns:
        The cleaned value, or ``None`` for empty/unconvertible cells.
    """
    if value is None:
        return None
    if expected_type is str:
        return str(value).strip()
    if expected_type in (int, float):
        try:
            return expected_type(value)
        except (TypeError, ValueError):
            # e.g. int("abc") or float(<non-numeric object>): treat as empty.
            # TypeError is caught too — the original caught only ValueError.
            return None
    # No expected type given: normalise strings, pass everything else through.
    if isinstance(value, str):
        return value.strip()
    return value
|
||||
|
||||
def import_spreadsheet(self, file):
    """Import and validate sample data from an uploaded .xlsx file.

    Args:
        file: uploaded file object exposing ``filename`` and a binary
            ``file`` stream (e.g. a FastAPI ``UploadFile``).

    Returns:
        list[dict]: the validated sample records (see ``validate``).

    Raises:
        SpreadsheetImportError: if the workbook cannot be read, the
            'Samples' worksheet is missing or empty, no valid samples are
            found, or any row fails model validation.
    """
    # Reinitialize importer state so repeated imports start clean.
    self.available_puck_positions = [
        f"{slot}{pos}" for slot in "ABCDEF" for pos in range(1, 6)
    ]
    self.available_puck_positions.append(UNASSIGNED_PUCKADDRESS)
    self.model = []

    self.filename = file.filename
    logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

    # NOTE(review): the workbook-loading section was elided from the diff
    # this file was recovered from; it is reconstructed here from the
    # surrounding context (a 'Samples' worksheet with data starting at
    # row 4, matching the `index + 4` row numbers in the messages below).
    # Confirm against the original source.
    try:
        workbook = openpyxl.load_workbook(BytesIO(file.file.read()))
    except Exception as e:
        logger.error(f"Failed to read .xlsx file: {e}")
        raise SpreadsheetImportError(f"Failed to read .xlsx file: {e}")
    if "Samples" not in workbook.sheetnames:
        logger.error("The 'Samples' worksheet is missing.")
        raise SpreadsheetImportError("The 'Samples' worksheet is missing.")
    sheet = workbook["Samples"]
    rows = list(sheet.iter_rows(min_row=4, values_only=True))
    if not rows:
        logger.error("The 'Samples' worksheet is empty.")
        raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

    expected_columns = 32  # Number of columns expected based on the model
    model = []  # local accumulator; assigned to self.model once complete

    for index, row in enumerate(rows):
        # Skip rows that are entirely empty.
        if not row or all(value is None for value in row):
            logger.debug(f"Skipping empty row at index {index}")
            continue

        # Pad short rows so the positional access below never raises
        # IndexError.
        if len(row) < expected_columns:
            row = list(row) + [None] * (expected_columns - len(row))

        record = {
            'dewarname': self._clean_value(row[0], str),
            'puckname': self._clean_value(row[1], str),
            'pucktype': self._clean_value(row[2], str),
            'crystalname': self._clean_value(row[3], str),
            'positioninpuck': self._clean_value(row[4], int),
            'priority': self._clean_value(row[5], int),
            'comments': self._clean_value(row[6], str),
            'directory': self._clean_value(row[7], str),
            'proteinname': self._clean_value(row[8], str),
            'oscillation': self._clean_value(row[9], float),
            'aperture': self._clean_value(row[10], str),
            'exposure': self._clean_value(row[11], float),
            'totalrange': self._clean_value(row[12], float),
            'transmission': self._clean_value(row[13], int),
            'dose': self._clean_value(row[14], float),
            'targetresolution': self._clean_value(row[15], float),
            'datacollectiontype': self._clean_value(row[16], str),
            'processingpipeline': self._clean_value(row[17], str),
            'spacegroupnumber': self._clean_value(row[18], int),
            'cellparameters': self._clean_value(row[19], str),
            'rescutkey': self._clean_value(row[20], str),
            'rescutvalue': self._clean_value(row[21], str),
            'userresolution': self._clean_value(row[22], str),
            'pdbid': self._clean_value(row[23], str),
            'autoprocfull': self._clean_value(row[24], str),
            'procfull': self._clean_value(row[25], str),
            'adpenabled': self._clean_value(row[26], str),
            'noano': self._clean_value(row[27], str),
            'ffcscampaign': self._clean_value(row[28], str),
            'trustedhigh': self._clean_value(row[29], str),
            'autoprocextraparams': self._clean_value(row[30], str),
            'chiphiangles': self._clean_value(row[31], str)
        }

        # Validate each row immediately so the error message can point at
        # the exact spreadsheet row (data starts at row 4).
        try:
            validated_record = SpreadsheetModel(**record)
        except ValidationError as e:
            error_message = f"Validation error in row {index + 4}: {e}"
            logger.error(error_message)
            raise SpreadsheetImportError(error_message)
        model.append(validated_record)
        logger.debug(f"Row {index + 4} processed and validated successfully")

    if not model:
        logger.error("No valid samples found in the spreadsheet.")
        raise SpreadsheetImportError("No valid samples found in the spreadsheet.")

    logger.info(f"...finished import, got {len(model)} samples")
    logger.debug(f"Model data: {model}")
    self.model = model

    try:
        validated_model = self.validate()
    except SpreadsheetImportError as e:
        logger.error(f"Failed to validate spreadsheet: {str(e)}")
        raise

    return validated_model
|
||||
|
||||
def validate(self):
    """Run model validation over the imported samples.

    Returns:
        list[dict]: the samples converted to plain dicts after passing
        ``SpreadsheetModel`` validation.

    Raises:
        SpreadsheetImportError: propagated from ``data_model_validation``.
    """
    samples = self.model
    logger.info(f"...validating {len(samples)} samples")

    for sample in samples:
        logger.info(f"Validating sample: {sample}")

    validated_model = self.data_model_validation(SpreadsheetModel, samples)

    for sample in validated_model:
        logger.info(f"Validated sample: {sample}")

    logger.debug(f"Validated model data: {validated_model}")
    return validated_model
|
||||
|
||||
@staticmethod
def data_model_validation(data_model, model):
    """Validate a list of records against a pydantic model.

    Args:
        data_model: pydantic model class each entry must conform to.
        model: list of records (dicts or model instances) to validate.

    Returns:
        list[dict]: the validated records as plain dicts.

    Raises:
        SpreadsheetImportError: describing the first validation failure
            as "<location> => <message>".
    """
    try:
        validated = parse_obj_as(List[data_model], model)
    except ValidationError as e:
        logger.error(f"Validation error: {e.errors()}")
        raise SpreadsheetImportError(
            f"{e.errors()[0]['loc']} => {e.errors()[0]['msg']}"
        )
    # The original had unreachable statements after this return — including
    # a `return self.model` inside this @staticmethod, where `self` does
    # not exist. They have been removed.
    return [dict(value) for value in validated]
|
||||
|
Reference in New Issue
Block a user