aaredb/backend/app/services/spreadsheet_service.py
2024-12-16 22:50:04 +01:00

254 lines
9.8 KiB
Python

import logging
import openpyxl
from pydantic import ValidationError
from typing import List, Tuple
from io import BytesIO
from app.sample_models import SpreadsheetModel
# NOTE(review): basicConfig() configures the *root* logger as an import-time
# side effect, so merely importing this module switches the whole process to
# DEBUG output when no handlers were configured before. Consider moving this
# call to the application entry point — confirm nothing relies on it first.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module; used by the importer below.
logger = logging.getLogger(__name__)
class SpreadsheetImportError(Exception):
    """Raised when an uploaded spreadsheet cannot be read or has no usable data."""
class SampleSpreadsheetImporter:
    """Import sample rows from the 'Samples' worksheet of an uploaded .xlsx file.

    Every non-empty data row is coerced column by column (see ``_COLUMNS``),
    validated through ``SpreadsheetModel`` and either collected as a validated
    record or reported as one error entry per failing field.

    Attributes:
        filename: Name of the most recently imported upload, or ``None``.
        model: Validated records of the most recent import, or ``None``.
    """

    # 1-based worksheet row at which sample data starts (rows 1-3 are skipped).
    _FIRST_DATA_ROW = 4

    # Single source of truth for the worksheet layout: the position in this
    # tuple is the 0-based column index, the second item is the type the cell
    # value is coerced to before validation.
    _COLUMNS = (
        ("dewarname", str),
        ("puckname", str),
        ("pucktype", str),
        ("crystalname", str),
        ("positioninpuck", int),
        ("priority", int),
        ("comments", str),
        ("directory", str),
        ("proteinname", str),
        ("oscillation", float),
        ("aperture", str),
        ("exposure", float),
        ("totalrange", float),
        ("transmission", int),
        ("dose", float),
        ("targetresolution", float),
        ("datacollectiontype", str),
        ("processingpipeline", str),
        ("spacegroupnumber", int),
        ("cellparameters", str),
        ("rescutkey", str),
        ("rescutvalue", str),
        ("userresolution", str),
        ("pdbid", str),
        ("autoprocfull", str),
        ("procfull", str),
        ("adpenabled", str),
        ("noano", str),
        ("ffcscampaign", str),
        ("trustedhigh", str),
        ("autoprocextraparams", str),
        ("chiphiangles", str),
    )
    # Lookup tables derived once from _COLUMNS instead of being rebuilt per row
    # or per validation error.
    _COLUMN_TYPES = dict(_COLUMNS)
    _COLUMN_INDEX = {name: position for position, (name, _) in enumerate(_COLUMNS)}

    def __init__(self):
        self.filename = None
        self.model = None

    def _clean_value(self, value, expected_type=None):
        """Coerce a raw cell value to ``expected_type``.

        Args:
            value: Raw cell value; ``None`` is passed through unchanged.
            expected_type: ``str``, ``int``, ``float`` or ``None``. With
                ``None``, strings are parsed as numbers when possible and
                stripped otherwise; non-string values are returned as is.

        Returns:
            The coerced value, or ``None`` when an ``int``/``float`` conversion
            fails. NOTE: a failed conversion is therefore indistinguishable
            from an empty cell once the record reaches validation.
        """
        if value is None:
            return None

        if expected_type == str:
            return str(value).strip()

        if expected_type in (float, int):
            try:
                return expected_type(value)
            except (ValueError, TypeError, OverflowError):
                # OverflowError covers int(float("inf")); treat it like any
                # other value that cannot be converted.
                return None

        if isinstance(value, str):
            try:
                # Numeric strings: a dot selects float parsing, otherwise int.
                return float(value) if "." in value else int(value)
            except ValueError:
                # Not a number after all; keep the text without padding.
                return value.strip()

        # No expected type and not a string: nothing to clean.
        return value

    def import_spreadsheet(self, file):
        """Import ``file``; alias of :meth:`import_spreadsheet_with_errors`."""
        return self.import_spreadsheet_with_errors(file)

    def get_expected_type(self, col_name):
        """Return the type a column is coerced to on import.

        The answer comes from the same table ``process_spreadsheet`` uses, so
        both always agree. Unknown column names default to ``str``.
        """
        return self._COLUMN_TYPES.get(col_name, str)

    def import_spreadsheet_with_errors(
        self, file
    ) -> Tuple[List["SpreadsheetModel"], List[dict], List[dict], List[str]]:
        """Read the uploaded workbook and process its 'Samples' worksheet.

        Args:
            file: Upload object exposing ``filename`` and a binary file-like
                ``file`` attribute (``read``/``seek``).

        Returns:
            ``(records, errors, raw_data, headers)`` as produced by
            :meth:`process_spreadsheet`.

        Raises:
            SpreadsheetImportError: If the upload is empty, cannot be opened as
                a workbook, has no 'Samples' worksheet, or that worksheet
                contains no data rows.
        """
        self.model = []
        self.filename = file.filename
        logger.info(f"Importing spreadsheet from .xlsx file: {self.filename}")

        contents = file.file.read()
        file.file.seek(0)  # Leave the upload readable for later consumers.

        if not contents:
            logger.error("The uploaded file is empty.")
            raise SpreadsheetImportError("The uploaded file is empty.")

        # Only the workbook load is guarded: wrapping the worksheet check too
        # would re-wrap our own "missing worksheet" error as a read failure.
        try:
            workbook = openpyxl.load_workbook(BytesIO(contents))
        except Exception as e:
            logger.error(f"Failed to read the file: {str(e)}")
            raise SpreadsheetImportError(f"Failed to read the file: {str(e)}") from e
        logger.debug("Workbook loaded successfully")

        if "Samples" not in workbook.sheetnames:
            logger.error("The file is missing 'Samples' worksheet.")
            raise SpreadsheetImportError("The file is missing 'Samples' worksheet.")

        return self.process_spreadsheet(workbook["Samples"])

    def _build_error_entries(self, validation_errors, row, row_num):
        """Translate validation error dicts into per-cell error entries.

        Args:
            validation_errors: Iterable of dicts with ``"loc"`` and ``"msg"``
                keys, as returned by ``ValidationError.errors()``.
            row: The (padded) row the errors belong to.
            row_num: 1-based worksheet row number of ``row``.

        Returns:
            One dict per error with keys ``row``, ``cell`` (0-based column
            index), ``value`` (raw cell value) and ``message``. Errors that do
            not point at a known column get ``cell`` and ``value`` of ``None``
            instead of aborting the import.
        """
        entries = []
        for error in validation_errors:
            location = error.get("loc") or ()
            field = location[0] if location else None
            column_index = self._COLUMN_INDEX.get(field)
            entries.append(
                {
                    "row": row_num,
                    "cell": column_index,
                    # Value that caused the error, when it maps to a cell.
                    "value": row[column_index] if column_index is not None else None,
                    "message": error["msg"],
                }
            )
        return entries

    def process_spreadsheet(
        self, sheet
    ) -> Tuple[List["SpreadsheetModel"], List[dict], List[dict], List[str]]:
        """Validate every non-empty data row of ``sheet``.

        Args:
            sheet: Worksheet providing ``iter_rows(min_row=..., values_only=True)``.

        Returns:
            ``(records, errors, raw_data, headers)`` where ``records`` are the
            validated ``SpreadsheetModel`` instances, ``errors`` the entries
            built by :meth:`_build_error_entries`, ``raw_data`` one
            ``{"row_num", "data"}`` dict per non-empty row (unpadded values),
            and ``headers`` the column names in worksheet order.

        Raises:
            SpreadsheetImportError: If there are no rows from row 4 onwards.
        """
        model = []
        errors = []
        raw_data = []

        # The first three worksheet rows are skipped.
        rows = list(sheet.iter_rows(min_row=self._FIRST_DATA_ROW, values_only=True))
        logger.debug(f"Starting to process {len(rows)} rows from the sheet")

        if not rows:
            logger.error("The 'Samples' worksheet is empty.")
            raise SpreadsheetImportError("The 'Samples' worksheet is empty.")

        headers = [name for name, _ in self._COLUMNS]
        expected_columns = len(self._COLUMNS)

        for index, row in enumerate(rows):
            row_num = index + self._FIRST_DATA_ROW

            if not any(row):
                logger.debug(f"Skipping empty row at index {index}")
                continue

            # Record raw data (as read, before padding) for later use.
            raw_data.append({"row_num": row_num, "data": row})

            # Pad short rows so every expected column can be indexed.
            if len(row) < expected_columns:
                row = list(row) + [None] * (expected_columns - len(row))

            record = {
                name: self._clean_value(row[position], expected_type)
                for position, (name, expected_type) in enumerate(self._COLUMNS)
            }

            try:
                validated_record = SpreadsheetModel(**record)
            except ValidationError as e:
                logger.error(f"Validation error in row {row_num}: {e}")
                errors.extend(self._build_error_entries(e.errors(), row, row_num))
            else:
                model.append(validated_record)
                logger.debug(f"Row {row_num} processed and validated successfully")

        self.model = model
        logger.info(
            f"Finished processing {len(model)} records with {len(errors)} errors"
        )
        return self.model, errors, raw_data, headers