import logging
import numpy
import os
import random
import struct
import time
import yaml
#from scipy.misc import bytescale
import zmq
from os.path import join
from pathlib import Path
#import qsingleton #ZAC: orig. code
#from CoordinatesModel import CoordinatesModel #ZAC: orig. code
from app_config import AppCfg #settings, option, appsconf
EMBL_RESULTS_TIMEOUT = "embl/results_timeout"
EMBL_SAVE_FILES = "embl/save_files"
#embl = appsconf["embl"]
#import imageio
import numpy as np
from PyQt5 import QtCore
from PyQt5.QtCore import pyqtSignal, Qt, QObject, pyqtSlot, QPoint
from PyQt5.QtGui import QStandardItemModel, QCursor
from PyQt5.QtWidgets import (
QWidget,
QVBoxLayout,
QHeaderView,
QPushButton,
QFormLayout,
QDoubleSpinBox,
QLabel,
QTableWidgetItem,
QLineEdit,
QTableWidget,
QTextEdit,
QProgressBar,
QHBoxLayout,
QSpinBox,
QTabWidget,
QTableView,
QMenu,
QProgressDialog,
)
#import storage
from app_utils import assert_motor_positions
#folders = storage.Folders()
import logging
logger = logging.getLogger(__name__)
class EmblTaskResultTimeoutException(Exception):
pass
ZPOS, FPATH = range(2)
# record separator
RS = 0x1e
#class EmblMessage(QObject, metaclass=qsingleton.Singleton): #ZAC: orig. code
class EmblMessage(QObject):
def __init__(self, parent=None, **kwargs):
super(EmblMessage, self).__init__(parent, **kwargs)
return #ZAC: orig. code
if not embl["enabled"]:
return
context = zmq.Context()
logger.info("Connecting to server...")
self.socket = context.socket(zmq.REQ)
uri = embl["uri"]
logger.info(f"EMBL alignment server at: {uri}")
self.socket.connect(uri)
def make_PAR(self, min: float, max: float, step: float, ppm: int, beam_xy: tuple):
puckid, sampleid = folders._prefix.split("_")
beam_x, beam_y = beam_xy
msg = (
f"PAR{RS:c}{min}{RS:c}{max}{RS:c}{step}{RS:c}{ppm}{RS:c}{beam_x:.0f}{RS:c}{beam_y:.0f}{RS:c}{puckid}{RS:c}{sampleid}"
)
logger.debug(f"PAR = {msg}")
self.socket.send_string(msg)
ans = self.socket.recv_string()
def make_IMG(self, pos: float, img):
width, height = img.shape
msg = f"IMG{RS:c}{pos:.3f}{RS:c}{width}{RS:c}{height}"
logger.info(f"sending img header: {msg}")
self.socket.send_string(msg)
self.socket.recv_string()
logger.info(
f"about to send img shape={img.shape} type={img.dtype} pos = {pos} mm"
)
self.socket.send(numpy.ascontiguousarray(img))
logger.info(f"waiting for recv")
ans = self.socket.recv_string()
logger.info(f"received!")
def finished(self):
self.make_STS("Finished")
def aborted(self):
self.make_STS("Aborted")
def make_STS(self, status):
msg = f"STS{RS:c}{status}"
self.socket.send_string(msg)
ans = self.socket.recv_string()
def make_RES(self):
n = 1
timeout = settings.value(EMBL_RESULTS_TIMEOUT, type=float) + time.time()
result = "failed"
while time.time() < timeout:
time.sleep(2.0)
logger.info(f"#{n}.requesting RES")
self.socket.send_string("RES")
result = self.socket.recv_string()
logger.info(f" RES = {result}")
if "Pending" != result:
break
n += 1
return result
embl_tube = EmblMessage()
class EmblWidget(QWidget):
def __init__(self, parent):
super(EmblWidget, self).__init__(parent)
#parent.pixelsPerMillimeter.connect(self.save_ppm)
#parent.beamCameraCoordinatesChanged.connect(self.save_beam_coordinates)
self.setLayout(QVBoxLayout())
w = QWidget()
self.layout().addWidget(w)
w.setLayout(QFormLayout())
w.layout().setLabelAlignment(Qt.AlignRight)
self._startZ = QDoubleSpinBox()
self._startZ.setValue(0.6)
self._startZ.setSuffix(" mm")
self._startZ.setRange(-5, 5)
self._startZ.setSingleStep(0.1)
self._startZ.setDecimals(3)
self._endZ = QDoubleSpinBox()
self._endZ.setValue(1.4)
self._endZ.setSuffix(" mm")
self._endZ.setRange(-5, 5)
self._endZ.setSingleStep(0.1)
self._endZ.setDecimals(3)
self._stepZ = QDoubleSpinBox()
self._stepZ.setValue(0.050)
self._stepZ.setSuffix(" mm")
self._stepZ.setRange(0.005, 1.000)
self._stepZ.setSingleStep(0.010)
self._stepZ.setDecimals(3)
self._resultsTimeout = QDoubleSpinBox()
self._resultsTimeout.setRange(1., 2000.)
self._resultsTimeout.setValue(60)
self._resultsTimeout.setSuffix(" seconds")
self._resultsTimeout.setDecimals(0)
self._factors = QLineEdit("1 1 1")
but = QPushButton("update results")
but.clicked.connect(self.apply_factors)
w.layout().addRow(QLabel("Start Z"), self._startZ)
w.layout().addRow(QLabel("End Z"), self._endZ)
w.layout().addRow(QLabel("Step"), self._stepZ)
w.layout().addRow(QLabel("Task Result Timeout"), self._resultsTimeout)
w.layout().addRow(QLabel("Factors"), self._factors)
w.layout().addRow(QLabel("Apply Factors"), but)
but = QPushButton("Scan Z")
but.clicked.connect(self.executeScan)
but.setAccessibleName("emblScanZ")
self.layout().addWidget(but)
tabs = QTabWidget(self)
self.layout().addWidget(tabs)
self._table = QTableWidget()
self._table.setColumnCount(2)
self._table.setHorizontalHeaderLabels(("Z (mm)", "File Path"))
self._table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
self.layout().addWidget(self._table)
tabs.addTab(self._table, "Z Scans")
tabs.setAccessibleName("embl_tabs")
tabs.setObjectName("embl_tabs")
view = self._results_view = QTableView(self)
#view.setModel(CoordinatesModel()) #ZAC: orig. code
view.verticalHeader().show()
view.horizontalHeader().show()
view.setContextMenuPolicy(Qt.CustomContextMenu)
view.customContextMenuRequested.connect(self.results_menu)
tabs.addTab(view, "Results")
self._tabs = tabs
def apply_factors(self):
self.populate_results(self._results)
@pyqtSlot(float)
def save_ppm(self, ppm: float):
self._ppm = int(ppm)
@pyqtSlot(float, float)
def save_beam_coordinates(self, bx: float, by: float):
logger.debug(f"saving beam_xy {bx} & {by}")
self._beam_xy = (int(bx), int(by))
@pyqtSlot(QPoint)
def results_menu(self, at_point):
table = self.sender()
model = table.model()
index = table.indexAt(at_point)
if not index.isValid():
return
coords = model.coords_at(index)
logger.debug(f"coord: {coords}")
menu = QMenu(table)
gotoAction = menu.addAction("Go here")
if menu.exec_(QCursor.pos()) is gotoAction:
logger.info(f"going to: {index.row()} => {coords}")
self.goto_position(coords)
def configure(self, motors, camera, zoom_device):
self._camera = camera
self._zoom_device = zoom_device
self.motors = motors
def executeScan(self):
fx, fy, cx, cz, omega = self.motors
x, y, z = fx.get_position(), fy.get_position(), cz.get_position()
logger.info(f"scan started at gonio: x,y,z = {x}, {y}, {z}")
self._table.setRowCount(0) # clear previous information
zs = self._startZ.value()
ze = self._endZ.value()
step = self._stepZ.value()
scan_config = {
"z-start": zs,
"z-end": ze + step, # inclusive last value
"z-step": step,
"ppm": self._ppm,
"beam_xy": self._beam_xy,
}
self._gonio_at_start = (x, y, ze + step)
base_Z = self.motors[3] # base_Z
camera = self._camera
zvals = np.arange(zs, ze, step)
numZs = len(zvals)
dlg = QProgressDialog(self)
dlg.setWindowModality(Qt.WindowModal)
dlg.setMinimumDuration(0)
dlg.setCancelButton(None)
dlg.setRange(0, 0)
dlg.setLabelText(f"EMBL Z-Scan
")
dlg.setAutoClose(True)
dlg.show()
dlg.setValue(random.randint(1, 20))
self._progress_dialog = dlg
self._acq_thread = AcquisitionThread(base_Z, camera, scan_config)
self._acq_thread.image_acquired.connect(self.addImage)
self._acq_thread.acquisition_finished.connect(self.acquisition_finished)
self._acq_thread.results_fetched.connect(self.populate_results)
self._acq_thread.message.connect(
lambda msg: dlg.setLabelText(f"EMBL Z-Scan
{msg}")
)
self._acq_thread.start()
def createModel(self, parent):
model = QStandardItemModel(0, 2, parent)
model.setHeaderData(ZPOS, QtCore.Qt.Horizontal, "Z (mm)")
model.setHeaderData(FPATH, QtCore.Qt.Horizontal, "Image Path")
self._model = model
return model
def addImage(self, idx, zpos, fpath):
logger.info("zpos = {}, fpath = {}".format(zpos, fpath))
rows = self._table.rowCount()
# table.setRowCount(rows + 1)
self._table.insertRow(rows)
item = QTableWidgetItem()
item.setText("{:.03f}".format(zpos))
self._table.setItem(rows, ZPOS, item)
item = QTableWidgetItem()
item.setText(str(fpath))
self._table.setItem(rows, FPATH, item)
self._table.resizeColumnsToContents()
def acquisition_finished(self):
self._progress_dialog.reset()
def goto_position(self, coords):
fx, fy, cx, cz, o = self.motors
tx, ty, bb, tz, to = coords
logger.info(f"moving gonio to: {tx}, {ty}, {tz}")
fx.move(tx, wait=True, ignore_limits=True)
fy.move(ty, wait=True, ignore_limits=True)
cz.move(tz, wait=True, ignore_limits=True)
@pyqtSlot(str)
def populate_results(self, results):
"""Results.txt
60.624%363.4026940025185%-768.32%-1052.52
angle between sample and camera, x, y, z
:return:
"""
if "no results" in results:
return
s1, s2, s3 = [int(c) for c in self._factors.text().split()]
self._results = results
self.coords = []
ox, oy, oz = self._gonio_at_start
oz = self._endZ.value()
for n, line in enumerate(results.split("\n")):
# o, z, y, x = [float(n) for n in line.split("%")]
omega, x, y, z = [float(n) for n in line.split("%")]
no, nx, ny, nz = [omega, ox + s1*x, oy + s2*y, oz + s3*z]
self.coords.append([nx, ny, 0, nz, no])
model = self._results_view.model()
model.update_coordinates(self.coords)
self._tabs.setCurrentIndex(1)
class AcquisitionThread(QtCore.QThread):
image_acquired = pyqtSignal(int, float, str)
results_fetched = pyqtSignal(str)
message = pyqtSignal(str)
acquisition_finished = pyqtSignal()
def __init__(self, z_motor, camera, scan_config):
QtCore.QThread.__init__(self)
self._scanconfig = scan_config
logger.debug(f"scanconfig: {scan_config}")
self.z_motor = z_motor
self.camera = camera
def save_params(self, params):
fname = Path(folders.res_folder) / "z_scan" / "parameters.yaml"
try:
with open(fname, "w") as fp:
yaml.dump(params, fp)
except:
logger.warning(f"failed to save z-scan parameter file: {fname}")
def save_image(self, n, z, img):
if not option(EMBL_SAVE_FILES):
return
zp = f"{n:02d}_Z{z:.3f}".replace(".", "p") + ".png"
fname = Path(folders.res_folder) / "z_scan" / zp
try:
imageio.imsave(fname, img)
except:
logger.error(f"failed to write: {fname}")
return f"failed to write image {n}"
return fname
def save_result(self, result):
if not option(EMBL_SAVE_FILES):
return
fname = Path(folders.res_folder) / "z_scan" / "result.txt"
with open(fname, "w") as fp:
fp.write(result)
def make_folders(self):
if not option(EMBL_SAVE_FILES):
return
fold = Path(folders.res_folder) / "z_scan"
fold.mkdir(parents=True, exist_ok=True)
def run(self):
self.make_folders()
cnf = self._scanconfig
zmin, zmax, zstep = cnf["z-start"], cnf["z-end"], cnf["z-step"]
zvals = np.arange(zmin, zmax, zstep)
num = len(zvals)
logger.debug(
f"Z-scan: min/max/step {zmin}/{zmax}/{zstep} ppm={cnf['ppm']} beam_xy={cnf['beam_xy']}"
)
logger.info(f"Z-vals: {', '.join([f'{z:.2f}' for z in zvals])}")
self.save_params(cnf)
embl_tube.make_PAR(zmin, zmax, zstep, cnf["ppm"], cnf["beam_xy"])
for n, z in enumerate(zvals):
idx = n + 1
self.message.emit(f"{idx}/{num} Z ➠ {z:.2f}")
self.z_motor.move(z, ignore_limits=True, wait=True)
assert_motor_positions([(self.z_motor, z, 0.1)], timeout=3.)
self.message.emit(f"{idx}/{num} acquiring")
img = self.camera.get_image()
self.message.emit(f"{idx}/{num} acquired")
img = np.rot90(img, 1)
img = bytescale(img)
fname = self.save_image(idx, z, img)
self.image_acquired.emit(idx, z, str(fname)) # just to populate table
self.message.emit(f"{idx}/{num} sending")
embl_tube.make_IMG(z, img)
self.message.emit(f"{idx}/{num} sent")
self.message.emit(f"acquisition finished")
embl_tube.finished()
self.message.emit(f"waiting for results")
result = embl_tube.make_RES()
self.save_result(result)
self.results_fetched.emit(result)
self.acquisition_finished.emit()