# Source listing metadata (from the file viewer): Python, 140 lines, 5.5 KiB.
import logging
from io import BytesIO
from typing import Any, Dict, List, Optional

import openpyxl
from pydantic import ValidationError, parse_obj_as

from app.sample_models import SpreadsheetModel


# Marker string for a puck without an assigned address; the importer appends
# it to its list of available puck positions on every import.
UNASSIGNED_PUCKADDRESS = "---"

# NOTE: this configures the root logger as a side effect of importing the
# module. DEBUG is already the most verbose standard level; raise it (for
# example to logging.INFO) to see fewer logs.
logging.basicConfig(level=logging.DEBUG)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class SpreadsheetImportError(Exception):
    """Raised when a sample spreadsheet cannot be read, parsed or validated.

    The message passed to the constructor is the human-readable reason
    (for example an empty upload, a missing worksheet or a validation
    failure).
    """


class SampleSpreadsheetImporter:
    """Import sample rows from the 'Samples' worksheet of an .xlsx upload.

    The usual entry point is :meth:`import_spreadsheet`, which loads the
    workbook, turns every data row into a dict and returns the rows after
    validating them against ``SpreadsheetModel``.

    Attributes:
        filename: Name of the most recently imported upload, or ``None``
            before the first import.
        model: Unvalidated row dicts collected by the last import, or
            ``None`` before the first import.
        available_puck_positions: Labels "A1" .. "F5" followed by
            ``UNASSIGNED_PUCKADDRESS``; rebuilt on every import.
    """

    # Column order of the worksheet: the value at position i of a row is
    # stored under the i-th key below.
    _COLUMNS = (
        "dewarname",
        "puckname",
        "pucklocationindewar",
        "positioninpuck",
        "crystalname",
        "priority",
        "comments",
        "pinbarcode",
        "directory",
    )
    # A row must reach at least the 'crystalname' column (index 4); the
    # columns after it are optional and default to None when absent.
    _MIN_COLUMNS = 5
    # Rows in which any of these fields is empty are skipped, not rejected.
    _ESSENTIAL_FIELDS = ("dewarname", "puckname", "crystalname")

    def __init__(self) -> None:
        """Create an importer with no spreadsheet loaded yet."""
        self.filename: Optional[str] = None
        self.model: Optional[List[Dict[str, Any]]] = None
        self.available_puck_positions: List[str] = []

    def _clean_value(self, value: Any) -> Any:
        """Normalise a single cell value.

        Strings are stripped of surrounding whitespace. Numbers are turned
        into strings so that downstream validation (e.g. of the priority
        field) always sees text; note that ``bool`` is a subclass of ``int``
        and is therefore stringified as well. Any other value, including
        ``None``, is returned unchanged.
        """
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, (float, int)):
            return str(value)
        return value

    def import_spreadsheet(self, file: Any) -> List[Dict[str, Any]]:
        """Read an uploaded .xlsx file and return its validated samples.

        Args:
            file: Upload-like object exposing ``filename`` and a binary
                file object ``file`` supporting ``read()`` and ``seek()``
                (presumably a FastAPI/Starlette ``UploadFile`` - TODO
                confirm against the caller).

        Returns:
            The list of validated sample dicts produced by
            :meth:`process_spreadsheet`.

        Raises:
            SpreadsheetImportError: If the upload is empty, cannot be opened
                as a workbook, lacks a 'Samples' worksheet, or fails any of
                the checks in :meth:`process_spreadsheet`.
        """
        # Reset per-import state.
        self.available_puck_positions = [
            f"{letter}{number}" for letter in "ABCDEF" for number in range(1, 6)
        ]
        self.available_puck_positions.append(UNASSIGNED_PUCKADDRESS)
        self.model = []

        self.filename = file.filename
        logger.info("Importing spreadsheet from .xlsx file: %s", self.filename)

        contents = file.file.read()
        file.file.seek(0)  # Rewind so the upload can be read again later.

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        # Only the workbook load is guarded: keeping the worksheet check
        # outside the try block prevents its own SpreadsheetImportError from
        # being caught, logged twice and re-wrapped as a read failure.
        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
        except Exception as exc:
            logger.error("Failed to read the file: %s", exc)
            raise SpreadsheetImportError(f"Failed to read the file: {exc}") from exc
        logger.debug("Workbook loaded successfully")

        if "Samples" not in workbook.sheetnames:
            logger.error("The file is missing 'Samples' worksheet.")
            raise SpreadsheetImportError("The file is missing 'Samples' worksheet.")

        return self.process_spreadsheet(workbook["Samples"])

    def _row_to_sample(self, row: Any, index: int) -> Dict[str, Any]:
        """Map one worksheet row onto a dict keyed by ``_COLUMNS``.

        Raises SpreadsheetImportError when the row is too short to contain
        the 'crystalname' column.
        """
        if len(row) < self._MIN_COLUMNS:
            message = f"Index error processing row at index {index}: Row has missing values."
            logger.error(message)
            raise SpreadsheetImportError(message)

        return {
            name: self._clean_value(row[position]) if position < len(row) else None
            for position, name in enumerate(self._COLUMNS)
        }

    def process_spreadsheet(self, sheet: Any) -> List[Dict[str, Any]]:
        """Convert the rows of a worksheet into validated sample dicts.

        Args:
            sheet: Worksheet object providing
                ``iter_rows(min_row=..., values_only=True)``.

        Returns:
            The validated samples, as returned by :meth:`validate`.

        Raises:
            SpreadsheetImportError: If there are no rows from row 4 onwards,
                a non-empty row is too short, no row has all essential
                fields, or validation fails.
        """
        # Rows 1-3 are skipped (presumably header rows - TODO confirm);
        # sample data starts on row 4.
        rows = list(sheet.iter_rows(min_row=4, values_only=True))
        logger.debug("Starting to process %d rows from the sheet", len(rows))

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        model = []
        for index, row in enumerate(rows):
            if not row or all(value is None for value in row):
                logger.debug(
                    "Skipping empty row or row with all None values at index %d.", index
                )
                continue

            sample = self._row_to_sample(row, index)

            # Rows lacking an essential field are ignored rather than rejected.
            if any(not sample[field] for field in self._ESSENTIAL_FIELDS):
                logger.debug("Skipping row due to missing essential fields: %s", row)
                continue

            model.append(sample)
            logger.info("Sample processed: %s", sample)

        if not model:
            logger.error("No valid samples found in the spreadsheet.")
            raise SpreadsheetImportError("No valid samples found in the spreadsheet.")

        logger.info("...finished import, got %d samples", len(model))
        logger.debug("Model data: %s", model)
        self.model = model

        try:
            return self.validate()
        except SpreadsheetImportError as exc:
            logger.error("Failed to validate spreadsheet: %s", exc)
            raise

    def validate(self) -> List[Dict[str, Any]]:
        """Validate ``self.model`` against ``SpreadsheetModel``.

        Returns:
            One dict per validated sample. If nothing has been imported yet
            (``self.model`` is ``None``) the model is treated as empty
            instead of failing on ``len(None)``.

        Raises:
            SpreadsheetImportError: If any sample fails validation.
        """
        model = self.model or []
        logger.info("...validating %d samples", len(model))

        for sample in model:
            logger.info("Validating sample: %s", sample)

        validated_model = self.data_model_validation(SpreadsheetModel, model)

        for sample in validated_model:
            logger.info("Validated sample: %s", sample)

        logger.debug("Validated model data: %s", validated_model)
        return validated_model

    @staticmethod
    def data_model_validation(data_model: Any, model: Any) -> List[Dict[str, Any]]:
        """Parse ``model`` as a list of ``data_model`` and return plain dicts.

        Args:
            data_model: The pydantic model class each entry must satisfy.
            model: The raw entries to validate.

        Returns:
            ``dict(entry)`` for every validated entry, in input order.

        Raises:
            SpreadsheetImportError: On validation failure; the message holds
                the location and text of the first reported error, and the
                original ``ValidationError`` is attached as the cause.
        """
        try:
            validated = parse_obj_as(List[data_model], model)
        except ValidationError as exc:
            errors = exc.errors()
            logger.error("Validation error: %s", errors)
            first = errors[0]
            raise SpreadsheetImportError(f"{first['loc']} => {first['msg']}") from exc

        return [dict(entry) for entry in validated]