# sample_spreadsheet_importer.py
import logging
from io import BytesIO
from typing import Union, List, Tuple

import openpyxl
from pydantic import ValidationError

from app.sample_models import SpreadsheetModel

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SpreadsheetImportError(Exception):
    """Raised when an uploaded spreadsheet cannot be read or holds no data."""


class SampleSpreadsheetImporter:
    """Read the 'Samples' worksheet of an .xlsx upload into SpreadsheetModel records.

    After an import, ``filename`` holds the uploaded file's name and ``model``
    holds the list of rows that passed validation.
    """

    # Name of the worksheet that carries the sample rows.
    _SHEET_NAME = "Samples"

    # The first three worksheet rows are skipped; data starts on row 4.
    _FIRST_DATA_ROW = 4

    # Column layout of the worksheet: position in this tuple is the 0-based
    # column index, each entry is (model field name, type the cell is coerced to).
    # Single source of truth for both record building and error reporting.
    _COLUMNS = (
        ('dewarname', str),
        ('puckname', str),
        ('pucktype', str),
        ('crystalname', str),
        ('positioninpuck', int),
        ('priority', int),
        ('comments', str),
        ('directory', str),
        ('proteinname', str),
        ('oscillation', float),
        ('aperture', str),
        ('exposure', float),
        ('totalrange', float),
        ('transmission', int),
        ('dose', float),
        ('targetresolution', float),
        ('datacollectiontype', str),
        ('processingpipeline', str),
        ('spacegroupnumber', int),
        ('cellparameters', str),
        ('rescutkey', str),
        ('rescutvalue', str),
        ('userresolution', str),
        ('pdbid', str),
        ('autoprocfull', str),
        ('procfull', str),
        ('adpenabled', str),
        ('noano', str),
        ('ffcscampaign', str),
        ('trustedhigh', str),
        ('autoprocextraparams', str),
        ('chiphiangles', str),
    )

    def __init__(self):
        self.filename = None
        self.model = None

    def _clean_value(self, value, expected_type=None):
        """Coerce a raw cell value to ``expected_type``.

        Args:
            value: Raw cell value as returned by openpyxl (may be ``None``).
            expected_type: ``str``, ``int``, ``float`` or ``None``.

        Returns:
            ``None`` for an empty cell; a stripped string for ``str``; the
            converted number for ``int``/``float``, or ``None`` when the value
            cannot be converted. With no expected type, string values are
            parsed as a number when possible and stripped otherwise; any other
            value is returned unchanged.
        """
        if value is None:
            return None

        if expected_type is str:
            return str(value).strip()

        if expected_type in (float, int):
            try:
                return expected_type(value)
            except (ValueError, TypeError, OverflowError):
                # TypeError: non-numeric cell objects (e.g. dates);
                # OverflowError: int() of an infinite float. Either way the
                # cell is unusable, so report it as missing instead of
                # aborting the whole import.
                return None

        if isinstance(value, str):
            try:
                return float(value) if '.' in value else int(value)
            except ValueError:
                return value.strip()

        return value

    def import_spreadsheet(self, file):
        """Import ``file``; alias of :meth:`import_spreadsheet_with_errors`."""
        return self.import_spreadsheet_with_errors(file)

    def import_spreadsheet_with_errors(self, file) -> Tuple[List[SpreadsheetModel], List[dict], List[dict]]:
        """Load an uploaded .xlsx file and process its 'Samples' worksheet.

        Args:
            file: Upload-like object exposing ``filename`` and a readable,
                seekable binary stream ``file`` (presumably a FastAPI
                ``UploadFile`` -- confirm against the caller).

        Returns:
            The ``(records, errors, raw_data)`` tuple produced by
            :meth:`process_spreadsheet`.

        Raises:
            SpreadsheetImportError: If the upload is empty, cannot be parsed
                as a workbook, or has no 'Samples' worksheet.
        """
        self.model = []
        self.filename = file.filename
        logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

        contents = file.file.read()
        file.file.seek(0)  # Reset file pointer to the beginning

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        # Only the parse itself is guarded: the worksheet check below raises
        # its own SpreadsheetImportError, which must not be caught and
        # re-wrapped as a "Failed to read the file" error.
        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
        except Exception as e:
            logger.error(f"Failed to read the file: {str(e)}")
            raise SpreadsheetImportError(f"Failed to read the file: {str(e)}") from e
        logger.debug("Workbook loaded successfully")

        if self._SHEET_NAME not in workbook.sheetnames:
            logger.error("The file is missing 'Samples' worksheet.")
            raise SpreadsheetImportError("The file is missing 'Samples' worksheet.")

        return self.process_spreadsheet(workbook[self._SHEET_NAME])

    def process_spreadsheet(self, sheet) -> Tuple[List[SpreadsheetModel], List[dict], List[dict]]:
        """Validate every data row of ``sheet`` against ``SpreadsheetModel``.

        Args:
            sheet: Worksheet object supporting
                ``iter_rows(min_row=..., values_only=True)``.

        Returns:
            A tuple ``(records, errors, raw_data)``:

            * ``records`` -- rows that validated successfully;
            * ``errors`` -- one dict per validation error with keys ``row``
              (1-based worksheet row), ``cell`` (0-based column index, or
              ``None`` when the error is not tied to a known column),
              ``value`` (the raw cell value, or ``None``) and ``message``;
            * ``raw_data`` -- one dict per non-empty row with keys ``row_num``
              and ``data`` (the row values as read, before padding).

        Raises:
            SpreadsheetImportError: If the worksheet has no rows from row 4 on.
        """
        model = []
        errors = []
        raw_data = []

        # Skip the first 3 rows
        rows = list(sheet.iter_rows(min_row=self._FIRST_DATA_ROW, values_only=True))
        logger.debug(f"Starting to process {len(rows)} rows from the sheet")

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        expected_columns = len(self._COLUMNS)
        # Built once per call; maps a model field name to its column index.
        field_to_col = {name: col for col, (name, _) in enumerate(self._COLUMNS)}

        for index, row in enumerate(rows):
            row_num = index + self._FIRST_DATA_ROW

            if not any(row):
                logger.debug(f"Skipping empty row at index {index}")
                continue

            # Record raw data for later use
            raw_data.append({"row_num": row_num, "data": row})

            # Pad the row to ensure it has the expected number of columns
            if len(row) < expected_columns:
                row = list(row) + [None] * (expected_columns - len(row))

            record = {
                name: self._clean_value(row[col], cell_type)
                for col, (name, cell_type) in enumerate(self._COLUMNS)
            }

            try:
                validated_record = SpreadsheetModel(**record)
            except ValidationError as e:
                logger.error(f"Validation error in row {row_num}: {e}")
                for error in e.errors():
                    # 'loc' can be empty (model-level validators) or name a
                    # field that has no worksheet column; report those without
                    # a cell instead of crashing and losing every error.
                    loc = error.get('loc') or ()
                    column_index = field_to_col.get(loc[0]) if loc else None
                    errors.append({
                        'row': row_num,
                        'cell': column_index,
                        # Value that caused the error
                        'value': row[column_index] if column_index is not None else None,
                        'message': error['msg'],
                    })
            else:
                model.append(validated_record)
                logger.debug(f"Row {row_num} processed and validated successfully")

        self.model = model
        logger.info(f"Finished processing {len(model)} records with {len(errors)} errors")
        return self.model, errors, raw_data