"""Import sample definitions from the 'Samples' worksheet of an .xlsx upload."""

import logging
from io import BytesIO
from typing import Union

import openpyxl
from pydantic import ValidationError

from app.sample_models import SpreadsheetModel

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SpreadsheetImportError(Exception):
    """Raised when a spreadsheet cannot be read, is empty, or fails validation."""


class SampleSpreadsheetImporter:
    """Read the 'Samples' worksheet of a workbook into validated records.

    After a successful import, ``self.model`` holds the list of
    ``SpreadsheetModel`` instances and ``self.filename`` the name of the
    uploaded file.
    """

    # Name of the worksheet that must be present in the workbook.
    _SHEET_NAME = "Samples"

    # First worksheet row (1-based) that contains data; rows 1-3 are skipped.
    _FIRST_DATA_ROW = 4

    # Ordered (field name, target type) pairs: entry i describes worksheet
    # column i. The number of expected columns is derived from this table.
    _COLUMNS = (
        ("dewarname", str),
        ("puckname", str),
        ("pucktype", str),
        ("crystalname", str),
        ("positioninpuck", int),
        ("priority", int),
        ("comments", str),
        ("directory", str),
        ("proteinname", str),
        ("oscillation", float),
        ("aperture", str),
        ("exposure", float),
        ("totalrange", float),
        ("transmission", int),
        ("dose", float),
        ("targetresolution", float),
        ("datacollectiontype", str),
        ("processingpipeline", str),
        ("spacegroupnumber", int),
        ("cellparameters", str),
        ("rescutkey", str),
        ("rescutvalue", str),
        ("userresolution", str),
        ("pdbid", str),
        ("autoprocfull", str),
        ("procfull", str),
        ("adpenabled", str),
        ("noano", str),
        ("ffcscampaign", str),
        ("trustedhigh", str),
        ("autoprocextraparams", str),
        ("chiphiangles", str),
    )

    def __init__(self):
        self.filename = None
        self.model = None

    def _clean_value(self, value, expected_type=None):
        """Convert a raw cell value to ``expected_type``.

        Args:
            value: Raw cell value as returned by openpyxl.
            expected_type: ``str``, ``int``, ``float`` or ``None``.

        Returns:
            ``None`` when ``value`` is ``None`` or cannot be converted to the
            requested numeric type; the stripped string for ``str``; the
            converted number for ``int``/``float``. With no expected type, a
            string is parsed as a number when possible (float if it contains
            a '.', int otherwise) and stripped otherwise; any other value is
            returned unchanged.
        """
        if value is None:
            return None

        if expected_type == str:
            return str(value).strip()

        if expected_type in (float, int):
            try:
                return expected_type(value)
            except (ValueError, TypeError):
                # TypeError covers non-numeric cell objects (e.g. a datetime),
                # which would otherwise abort the whole import.
                return None

        if isinstance(value, str):
            try:
                return float(value) if "." in value else int(value)
            except ValueError:
                return value.strip()

        return value

    @staticmethod
    def _is_blank_cell(cell):
        """Return True for cells that carry no data.

        Falsy values (``None``, ``""``, ``0``) count as blank, and so do
        whitespace-only strings.
        """
        return not cell or (isinstance(cell, str) and not cell.strip())

    def import_spreadsheet(self, file):
        """Load the uploaded workbook and process its 'Samples' worksheet.

        Args:
            file: Upload object exposing ``.filename`` and a binary ``.file``
                stream (looks like a FastAPI/Starlette ``UploadFile`` --
                confirm against the caller).

        Returns:
            The list of validated records (see ``process_spreadsheet``).

        Raises:
            SpreadsheetImportError: If the file is empty, cannot be opened as
                a workbook, lacks the 'Samples' worksheet, or a row fails
                validation.
        """
        self.model = []
        self.filename = file.filename
        logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

        contents = file.file.read()
        file.file.seek(0)  # Reset file pointer so the upload can be re-read.

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        # Only the workbook load is guarded: wrapping the worksheet check as
        # well would swallow its specific error and re-wrap it as a generic
        # "Failed to read the file" message.
        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
        except Exception as e:
            logger.error(f"Failed to read the file: {str(e)}")
            raise SpreadsheetImportError(
                f"Failed to read the file: {str(e)}"
            ) from e
        logger.debug("Workbook loaded successfully")

        if self._SHEET_NAME not in workbook.sheetnames:
            logger.error("The file is missing 'Samples' worksheet.")
            raise SpreadsheetImportError(
                "The file is missing 'Samples' worksheet."
            )

        return self.process_spreadsheet(workbook[self._SHEET_NAME])

    def process_spreadsheet(self, sheet):
        """Convert the data rows of ``sheet`` into validated records.

        Rows before ``_FIRST_DATA_ROW`` are ignored and blank rows are
        skipped. Short rows are padded with ``None``; cells beyond the
        expected columns are ignored.

        Args:
            sheet: openpyxl worksheet to read.

        Returns:
            The list of ``SpreadsheetModel`` instances, also stored in
            ``self.model``.

        Raises:
            SpreadsheetImportError: If the sheet has no data rows or a row
                fails model validation.
        """
        model = []

        rows = list(
            sheet.iter_rows(min_row=self._FIRST_DATA_ROW, values_only=True)
        )
        logger.debug(f"Starting to process {len(rows)} rows from the sheet")

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        expected_columns = len(self._COLUMNS)

        for index, row in enumerate(rows):
            if all(self._is_blank_cell(cell) for cell in row):
                logger.debug(f"Skipping empty row at index {index}")
                continue

            # Pad short rows so every expected column has a value; a negative
            # multiplier yields an empty list, so long rows are left as-is.
            cells = list(row) + [None] * (expected_columns - len(row))

            # zip stops at the shorter input, so surplus cells are ignored.
            record = {
                field: self._clean_value(cell, expected_type)
                for (field, expected_type), cell in zip(self._COLUMNS, cells)
            }

            row_number = index + self._FIRST_DATA_ROW
            try:
                validated_record = SpreadsheetModel(**record)
            except ValidationError as e:
                error_message = f"Validation error in row {row_number}: {e}"
                logger.error(error_message)
                raise SpreadsheetImportError(error_message) from e

            model.append(validated_record)
            logger.debug(
                f"Row {row_number} processed and validated successfully"
            )

        self.model = model
        logger.info(f"Finished processing {len(model)} records")
        return self.model