import logging
from io import BytesIO
from typing import List, Tuple

import openpyxl
from pydantic import ValidationError

from app.sample_models import SpreadsheetModel

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SpreadsheetImportError(Exception):
    pass


class SampleSpreadsheetImporter:
    def __init__(self):
        self.filename = None
        self.model = None

    def get_expected_type(self, column_name: str) -> type:
        """Return the expected data type for a given column name."""
        # Mapping of column names to expected types
        column_type_mapping = {
            "dewarname": str,
            "puckname": str,
            "pucktype": str,
            "crystalname": str,
            "positioninpuck": int,
            "priority": int,
            "comments": str,
            "proteinname": str,
            "directory": str,
            "oscillation": float,
            "exposure": float,
            "totalrange": int,
            "transmission": int,
            "targetresolution": float,
            "aperture": str,
            "datacollectiontype": str,
            "processingpipeline": str,
            "spacegroupnumber": int,
            "cellparameters": str,
            "rescutkey": str,
            "rescutvalue": float,
            "userresolution": float,
            "pdbid": str,
            "autoprocfull": bool,
            "procfull": bool,
            "adpenabled": bool,
            "noano": bool,
            "ffcscampaign": bool,
            "trustedhigh": float,
            "autoprocextraparams": str,
            "chiphiangles": float,
            "dose": float,
        }
        # Return the mapped type if the column is known, else default to str
        return column_type_mapping.get(column_name, str)

    def _clean_value(self, value, expected_type=None):
        """Coerce a raw cell value to the expected type, raising ValueError on failure."""
        if value is None:
            return None
        if expected_type == str:
            return str(value).strip()
        if expected_type in [float, int]:
            try:
                return expected_type(value)
            except (ValueError, TypeError) as e:
                logger.error(
                    f"Failed to cast value '{value}' to {expected_type}. Error: {e}"
                )
                raise ValueError(
                    f"Invalid value: '{value}'. Expected type: {expected_type}."
                )
        # Fallback for unhandled types
        logger.warning(f"Unhandled type for value: '{value}'. Returning as-is.")
        return value

    def import_spreadsheet(self, file):
        return self.import_spreadsheet_with_errors(file)

    def import_spreadsheet_with_errors(
        self, file
    ) -> Tuple[List[SpreadsheetModel], List[dict], List[dict], List[str]]:
        self.model = []
        self.filename = file.filename
        logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

        contents = file.file.read()
        file.file.seek(0)  # Reset file pointer to the beginning

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
            logger.debug("Workbook loaded successfully")
            if "Samples" not in workbook.sheetnames:
                logger.error("The file is missing the 'Samples' worksheet.")
                raise SpreadsheetImportError(
                    "The file is missing the 'Samples' worksheet."
                )
            sheet = workbook["Samples"]
        except Exception as e:
            logger.error(f"Failed to read the file: {str(e)}")
            raise SpreadsheetImportError(f"Failed to read the file: {str(e)}")

        # process_spreadsheet returns four values: model, errors, raw_data, headers
        model, errors, raw_data, headers = self.process_spreadsheet(sheet)
        return model, errors, raw_data, headers

    def process_spreadsheet(
        self, sheet
    ) -> Tuple[List[SpreadsheetModel], List[dict], List[dict], List[str]]:
        model = []
        errors = []
        raw_data = []
        headers = []

        # Skip the first 3 rows (title and header rows)
        rows = list(sheet.iter_rows(min_row=4, values_only=True))
        logger.debug(f"Starting to process {len(rows)} rows from the sheet")

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        expected_columns = 32  # Number of columns expected based on the model

        # Column headers, in spreadsheet order
        headers = [
            "dewarname",
            "puckname",
            "pucktype",
            "crystalname",
            "positioninpuck",
            "priority",
            "comments",
            "directory",
            "proteinname",
            "oscillation",
            "aperture",
            "exposure",
            "totalrange",
            "transmission",
            "dose",
            "targetresolution",
            "datacollectiontype",
            "processingpipeline",
            "spacegroupnumber",
            "cellparameters",
            "rescutkey",
            "rescutvalue",
            "userresolution",
            "pdbid",
            "autoprocfull",
            "procfull",
            "adpenabled",
            "noano",
            "ffcscampaign",
            "trustedhigh",
            "autoprocextraparams",
            "chiphiangles",
        ]

        for index, row in enumerate(rows):
            if not any(row):
                logger.debug(f"Skipping empty row at index {index}")
                continue

            # Record raw data for later use
            raw_data.append({"row_num": index + 4, "data": list(row)})

            # Pad the row so it has the expected number of columns
            if len(row) < expected_columns:
                row = list(row) + [None] * (expected_columns - len(row))

            # Prepare the record with cleaned values
            record = {
                "dewarname": self._clean_value(row[0], str),
                "puckname": self._clean_value(row[1], str),
                "pucktype": self._clean_value(row[2], str),
                "crystalname": self._clean_value(row[3], str),
                "positioninpuck": self._clean_value(row[4], int),
                "priority": self._clean_value(row[5], int),
                "comments": self._clean_value(row[6], str),
                "proteinname": self._clean_value(row[8], str),
            }
            record["data_collection_parameters"] = {
                "directory": self._clean_value(row[7], str),
                "oscillation": self._clean_value(row[9], float),
                "aperture": self._clean_value(row[10], str),
                "exposure": self._clean_value(row[11], float),
                "totalrange": self._clean_value(row[12], float),
                "transmission": self._clean_value(row[13], int),
                "dose": self._clean_value(row[14], float),
                "targetresolution": self._clean_value(row[15], float),
                "datacollectiontype": self._clean_value(row[16], str),
                "processingpipeline": self._clean_value(row[17], str),
                "spacegroupnumber": self._clean_value(row[18], int),
                "cellparameters": self._clean_value(row[19], str),
                "rescutkey": self._clean_value(row[20], str),
                "rescutvalue": self._clean_value(row[21], str),
                "userresolution": self._clean_value(row[22], str),
                "pdbid": self._clean_value(row[23], str),
                "autoprocfull": self._clean_value(row[24], str),
                "procfull": self._clean_value(row[25], str),
                "adpenabled": self._clean_value(row[26], str),
                "noano": self._clean_value(row[27], str),
                "ffcscampaign": self._clean_value(row[28], str),
                "trustedhigh": self._clean_value(row[29], str),
                "autoprocextraparams": self._clean_value(row[30], str),
                "chiphiangles": self._clean_value(row[31], str),
            }

            try:
                # Validate the record
                validated_record = SpreadsheetModel(**record)

                # Get the corrected `directory`
                corrected_directory = (
                    validated_record.data_collection_parameters.directory
                )

                # Update `raw_data` to reflect the corrected value
                raw_data[-1]["data"][7] = corrected_directory  # Replace directory in raw data
                raw_data[-1]["directory"] = corrected_directory  # Add a top-level "directory" key
                raw_data[-1]["default_set"] = (
                    corrected_directory == "{sgPuck}/{sgPosition}"
                )

                # Add the validated record to the model
                model.append(validated_record)
            except ValidationError as e:
                logger.error(f"Validation error in row {index + 4}: {e}")
                for error in e.errors():
                    field_path = error["loc"]
                    msg = error["msg"]

                    # Map the failing field back to its spreadsheet column
                    if field_path[0] == "data_collection_parameters":
                        subfield = field_path[1]
                        column_index = headers.index(subfield)
                    else:
                        field = field_path[0]
                        column_index = headers.index(field)

                    error_info = {
                        "row": index + 4,
                        "cell": column_index,
                        "value": row[column_index],
                        "message": msg,
                    }
                    errors.append(error_info)

        self.model = model
        logger.info(
            f"Finished processing {len(model)} records with {len(errors)} errors"
        )
        return self.model, errors, raw_data, headers  # Include headers in the response
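

# Usage sketch (illustrative only, not part of the importer): `upload` is assumed
# to be an object exposing `.filename` and a binary `.file` stream, e.g. a FastAPI
# UploadFile handed in by an endpoint; the names below are hypothetical.
#
#     importer = SampleSpreadsheetImporter()
#     samples, errors, raw_data, headers = importer.import_spreadsheet_with_errors(upload)
#     if errors:
#         logger.warning(
#             f"Imported {len(samples)} samples with {len(errors)} validation errors"
#         )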