import logging
import re
from io import BytesIO
from typing import List, Tuple

import openpyxl
from pydantic import ValidationError

from app.sample_models import SpreadsheetModel

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SpreadsheetImportError(Exception):
    pass


class SampleSpreadsheetImporter:
    def __init__(self):
        self.filename = None
        self.model = None
        self.default_set = False  # True when a default value was substituted

    def get_expected_type(self, column_name: str) -> type:
        """Return the expected data type for a given column name."""
        # Mapping of column names to expected types
        column_type_mapping = {
            "dewarname": str,
            "puckname": str,
            "pucktype": str,
            "crystalname": str,
            "positioninpuck": int,
            "priority": int,
            "comments": str,
            "proteinname": str,
            "directory": str,
            "oscillation": float,
            "exposure": float,
            "totalrange": int,
            "transmission": int,
            "targetresolution": float,
            "aperture": str,
            "datacollectiontype": str,
            "processingpipeline": str,
            "spacegroupnumber": int,
            "cellparameters": str,
            "rescutkey": str,
            "rescutvalue": float,
            "userresolution": float,
            "pdbid": str,
            "autoprocfull": bool,
            "procfull": bool,
            "adpenabled": bool,
            "noano": bool,
            "ffcscampaign": bool,
            "trustedhigh": float,
            "autoprocextraparams": str,
            "chiphiangles": float,
            "dose": float,
        }
        # Return the mapped type if the column is known, else default to str
        return column_type_mapping.get(column_name, str)

    def _clean_value(self, value, expected_type=None, column_name=None):
        """
        Clean and validate the given value based on its expected type.

        Some columns get special handling: an empty "directory" receives a
        default value, and "comments" keeps its spaces.
        """
        if value is None or (isinstance(value, str) and value.strip() == ""):
            # Handle empty or None values
            if column_name == "directory":
                logger.warning("Directory value is empty. Assigning default value.")
                self.default_set = True  # Flag to indicate a default value was set
                return "{sgPuck}/{sgPosition}"  # Default directory
            self.default_set = False
            return None

        # Convert to string and strip surrounding whitespace
        cleaned_value = str(value).strip()

        # Handle specific column behaviors
        if expected_type == str:
            if column_name is None:
                logger.warning(f"Missing column_name for value: {value}")
            elif column_name == "comments":
                # Normalize runs of whitespace to single spaces
                return " ".join(cleaned_value.split())
            else:
                # Replace spaces with underscores for general string columns
                return cleaned_value.replace(" ", "_")
        elif expected_type in (int, float):
            try:
                # Strip invalid characters, then cast to the expected type
                cleaned_value = re.sub(r"[^\d.]", "", cleaned_value)
                return expected_type(cleaned_value)
            except (ValueError, TypeError) as e:
                logger.error(
                    f"Failed to cast value '{value}' to {expected_type}. Error: {e}"
                )
                raise ValueError(
                    f"Invalid value: '{value}'. Expected type: {expected_type}."
                )

        # Return the cleaned value for all other types
        return cleaned_value

    def import_spreadsheet(self, file):
        return self.import_spreadsheet_with_errors(file)

    def import_spreadsheet_with_errors(
        self, file
    ) -> Tuple[List[SpreadsheetModel], List[dict], List[dict], List[str]]:
        self.model = []
        self.filename = file.filename
        logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

        contents = file.file.read()
        file.file.seek(0)  # Reset file pointer to the beginning

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
            logger.debug("Workbook loaded successfully")
            if "Samples" not in workbook.sheetnames:
                logger.error("The file is missing 'Samples' worksheet.")
                raise SpreadsheetImportError(
                    "The file is missing 'Samples' worksheet."
                )
            sheet = workbook["Samples"]
        except SpreadsheetImportError:
            raise  # Already logged above; avoid re-wrapping the message
        except Exception as e:
            logger.error(f"Failed to read the file: {str(e)}")
            raise SpreadsheetImportError(f"Failed to read the file: {str(e)}")

        # process_spreadsheet returns (model, errors, raw_data, headers)
        return self.process_spreadsheet(sheet)

    def process_spreadsheet(
        self, sheet
    ) -> Tuple[List[SpreadsheetModel], List[dict], List[dict], List[str]]:
        model = []
        errors = []
        raw_data = []

        # Skip the first 3 rows (title/header rows); sample rows start at row 4
        rows = list(sheet.iter_rows(min_row=4, values_only=True))
        logger.debug(f"Starting to process {len(rows)} rows from the sheet")

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        expected_columns = 32  # Number of columns expected based on the model

        # Column headers in spreadsheet order, mapped explicitly rather than
        # read from the sheet
        headers = [
            "dewarname",
            "puckname",
            "pucktype",
            "crystalname",
            "positioninpuck",
            "priority",
            "comments",
            "directory",
            "proteinname",
            "oscillation",
            "aperture",
            "exposure",
            "totalrange",
            "transmission",
            "dose",
            "targetresolution",
            "datacollectiontype",
            "processingpipeline",
            "spacegroupnumber",
            "cellparameters",
            "rescutkey",
            "rescutvalue",
            "userresolution",
            "pdbid",
            "autoprocfull",
            "procfull",
            "adpenabled",
            "noano",
            "ffcscampaign",
            "trustedhigh",
            "autoprocextraparams",
            "chiphiangles",
        ]

        for index, row in enumerate(rows):
            if not any(row):
                logger.debug(f"Skipping empty row at index {index}")
                continue

            # Record the raw row for later use
            raw_data.append({"row_num": index + 4, "data": list(row)})

            # Pad short rows so every expected column is addressable
            if len(row) < expected_columns:
                row = list(row) + [None] * (expected_columns - len(row))

            # Build the record dynamically from the header mapping
            record = {}
            for col_idx, column_name in enumerate(headers):
                original_value = row[col_idx] if col_idx < len(row) else None
                expected_type = self.get_expected_type(column_name)
                # Call _clean_value with the correct column_name
                try:
                    record[column_name] = self._clean_value(
                        original_value, expected_type, column_name
                    )
                except (ValueError, TypeError) as e:
                    logger.error(
                        f"Validation error for row {index + 4}, "
                        f"column '{column_name}': {str(e)}"
                    )
                    errors.append(
                        {
                            "row": index + 4,
                            "column": column_name,
                            "value": original_value,
                            "message": str(e),
                        }
                    )

            # Group the data-collection columns under a nested key
            dc_fields = (
                "directory", "oscillation", "aperture", "exposure",
                "totalrange", "transmission", "dose", "targetresolution",
                "datacollectiontype", "processingpipeline", "spacegroupnumber",
                "cellparameters", "rescutkey", "rescutvalue", "userresolution",
                "pdbid", "autoprocfull", "procfull", "adpenabled", "noano",
                "ffcscampaign", "trustedhigh", "autoprocextraparams",
                "chiphiangles",
            )
            record["data_collection_parameters"] = {
                field: record.get(field) for field in dc_fields
            }

            try:
                # Validate the record against the Pydantic model
                validated_record = SpreadsheetModel(**record)
                model.append(validated_record)
            except ValidationError as e:
                logger.error(f"Validation error in row {index + 4}: {e}")
                for error in e.errors():
                    field_path = error["loc"]
                    msg = error["msg"]
                    # Map the failing field back to its spreadsheet column
                    if field_path[0] == "data_collection_parameters":
                        column_index = headers.index(field_path[1])
                    else:
                        column_index = headers.index(field_path[0])
                    errors.append(
                        {
                            "row": index + 4,
                            "cell": column_index,
                            "value": row[column_index],
                            "message": msg,
                        }
                    )

        self.model = model
        logger.info(
            f"Finished processing {len(model)} records with {len(errors)} errors"
        )
        # Include headers in the response
        return self.model, errors, raw_data, headers