Add column type mapping and enhance validation

Introduced a backend mapping from spreadsheet column names to their expected data types, improving validation and error handling. Updated the UI to highlight default and corrected values, and added more detailed validation for data collection parameters.
GotthardG
2025-01-07 15:45:08 +01:00
parent 54975b5919
commit 92306fcfa6
5 changed files with 503 additions and 401 deletions

@@ -18,6 +18,49 @@ class SampleSpreadsheetImporter:
         self.filename = None
         self.model = None
 
+    def get_expected_type(self, column_name: str) -> type:
+        """
+        Returns the expected data type for a given column name.
+        """
+        # Define a mapping of column names to expected types
+        column_type_mapping = {
+            "dewarname": str,
+            "puckname": str,
+            "pucktype": str,
+            "crystalname": str,
+            "positioninpuck": int,
+            "priority": int,
+            "comments": str,
+            "proteinname": str,
+            "directory": str,
+            "oscillation": float,
+            "exposure": float,
+            "totalrange": int,
+            "transmission": int,
+            "targetresolution": float,
+            "aperture": str,
+            "datacollectiontype": str,
+            "processingpipeline": str,
+            "spacegroupnumber": int,
+            "cellparameters": str,
+            "rescutkey": str,
+            "rescutvalue": float,
+            "userresolution": float,
+            "pdbid": str,
+            "autoprocfull": bool,
+            "procfull": bool,
+            "adpenabled": bool,
+            "noano": bool,
+            "ffcscampaign": bool,
+            "trustedhigh": float,
+            "autoprocextraparams": str,
+            "chiphiangles": float,
+            "dose": float,
+        }
+        # Return type if column exists, else default to str
+        return column_type_mapping.get(column_name, str)
+
     def _clean_value(self, value, expected_type=None):
         """Clean value by converting it to the expected type and handle edge cases."""
         if value is None:
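With this mapping, per-cell cleaning can be driven by the sheet's header row instead of hard-coded types. A minimal usage sketch of how get_expected_type pairs with _clean_value (the headers and row values are illustrative, and the no-argument constructor is assumed from the snippet above):

    importer = SampleSpreadsheetImporter()
    headers = ["dewarname", "positioninpuck", "oscillation"]  # illustrative subset
    row = ["Dewar-001", "3", "0.1"]  # raw cell values as read from the sheet

    # Look up each column's expected type, then coerce the raw value
    cleaned = [
        importer._clean_value(value, importer.get_expected_type(header))
        for header, value in zip(headers, row)
    ]
    # Expected to yield ["Dewar-001", 3, 0.1] once _clean_value coerces each cell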
@@ -139,13 +182,13 @@ class SampleSpreadsheetImporter:
                 continue
 
             # Record raw data for later use
-            raw_data.append({"row_num": index + 4, "data": row})
+            raw_data.append({"row_num": index + 4, "data": list(row)})
 
-            # Pad the row to ensure it has the expected number of columns
+            # Ensure row has the expected number of columns
             if len(row) < expected_columns:
                 row = list(row) + [None] * (expected_columns - len(row))
 
-            # Prepare the record with the cleaned values
+            # Prepare the record with cleaned values
             record = {
                 "dewarname": self._clean_value(row[0], str),
                 "puckname": self._clean_value(row[1], str),
@@ -154,8 +197,10 @@ class SampleSpreadsheetImporter:
                 "positioninpuck": self._clean_value(row[4], int),
                 "priority": self._clean_value(row[5], int),
                 "comments": self._clean_value(row[6], str),
-                "directory": self._clean_value(row[7], str),
                 "proteinname": self._clean_value(row[8], str),
+            }
+            record["data_collection_parameters"] = {
+                "directory": self._clean_value(row[7], str),
                 "oscillation": self._clean_value(row[9], float),
                 "aperture": self._clean_value(row[10], str),
                 "exposure": self._clean_value(row[11], float),
@@ -182,69 +227,45 @@ class SampleSpreadsheetImporter:
             }
 
             try:
                 # Validate the record
                 validated_record = SpreadsheetModel(**record)
 
-                # Update the raw data with assigned default values
-                if (
-                    validated_record.directory == "{sgPuck}/{sgPosition}"
-                    and row[7] is None
-                ):
-                    row_list = list(row)
-                    row_list[7] = validated_record.directory  # Set the field to the default value
-                    raw_data[-1]["data"] = row_list
-                    raw_data[-1]["default_set"] = True  # Mark this row as having a default value assigned
+                # Get the corrected `directory`
+                corrected_directory = (
+                    validated_record.data_collection_parameters.directory
+                )
+
+                # Update `raw_data` to reflect the corrected value
+                raw_data[-1]["data"][7] = corrected_directory  # Replace directory in raw data
+                raw_data[-1]["directory"] = corrected_directory  # Add a top-level "directory" key
+                raw_data[-1]["default_set"] = (
+                    corrected_directory == "{sgPuck}/{sgPosition}"
+                )
 
                 # Add validated record to the model
                 model.append(validated_record)
                 logger.debug(f"Row {index + 4} processed and validated successfully")
             except ValidationError as e:
                 logger.error(f"Validation error in row {index + 4}: {e}")
                 for error in e.errors():
-                    field = error["loc"][0]
+                    field_path = error["loc"]
                     msg = error["msg"]
 
-                    # Map field name (which is the key in `record`) to its index in the row
-                    field_to_col = {
-                        "dewarname": 0,
-                        "puckname": 1,
-                        "pucktype": 2,
-                        "crystalname": 3,
-                        "positioninpuck": 4,
-                        "priority": 5,
-                        "comments": 6,
-                        "directory": 7,
-                        "proteinname": 8,
-                        "oscillation": 9,
-                        "aperture": 10,
-                        "exposure": 11,
-                        "totalrange": 12,
-                        "transmission": 13,
-                        "dose": 14,
-                        "targetresolution": 15,
-                        "datacollectiontype": 16,
-                        "processingpipeline": 17,
-                        "spacegroupnumber": 18,
-                        "cellparameters": 19,
-                        "rescutkey": 20,
-                        "rescutvalue": 21,
-                        "userresolution": 22,
-                        "pdbid": 23,
-                        "autoprocfull": 24,
-                        "procfull": 25,
-                        "adpenabled": 26,
-                        "noano": 27,
-                        "ffcscampaign": 28,
-                        "trustedhigh": 29,
-                        "autoprocextraparams": 30,
-                        "chiphiangles": 31,
-                    }
-                    column_index = field_to_col[field]
+                    if field_path[0] == "data_collection_parameters":
+                        subfield = field_path[1]
+                        column_index = headers.index(subfield)
+                    else:
+                        field = field_path[0]
+                        column_index = headers.index(field)
 
                     error_info = {
                         "row": index + 4,
                         "cell": column_index,
-                        "value": row[column_index],  # Value that caused the error
+                        "value": row[column_index],
                         "message": msg,
                     }
                     errors.append(error_info)
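Replacing the hand-maintained field_to_col dict with headers.index(...) derives the cell lookup from the sheet's own header row, so it stays correct if columns are reordered and also resolves errors raised inside the nested sub-model. A sketch of that mapping, assuming headers holds lower-cased column names in sheet order and the error dict mimics one entry of Pydantic's e.errors():

    headers = ["dewarname", "puckname", "directory", "oscillation"]  # illustrative
    error = {
        "loc": ("data_collection_parameters", "oscillation"),
        "msg": "value is not a valid float",
    }

    field_path = error["loc"]
    if field_path[0] == "data_collection_parameters":
        column_index = headers.index(field_path[1])  # nested field -> its column
    else:
        column_index = headers.index(field_path[0])  # top-level field -> its column
    assert column_index == 3  # points at the "oscillation" cell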