225 lines
8.3 KiB
Python
225 lines
8.3 KiB
Python
# Execute the companion image-processing scripts before the device classes
# below are defined.
# NOTE(review): presumably these provide the helpers this file calls without
# defining them (detect, integrate, load_image, get_correlation,
# save_ref_img) - confirm against the two scripts.
run("imgproc/NewCoverDetection")
run("imgproc/NewCoverCorrelation")
|
|
|
|
|
|
class CoverDetectionConfig(RegisterConfig):
    """Tunable parameters for the CoverDetection device defined in this file.

    Every attribute is assigned a default in the constructor; the comments
    describe how CoverDetection reads each one.
    """

    def __init__(self):
        """Populate all configuration entries with their default values."""
        # --- frame buffering -------------------------------------------------
        # Number of frames buffered before a detection pass runs; when more
        # than one, the buffered frames are integrated first.
        self.number_images = 1
        # When False the frame buffer is emptied after every detection pass.
        self.continuous = False
        # Only frames whose running count is a multiple of this are used.
        self.modulo = 1

        # --- geometry --------------------------------------------------------
        # Reference centre: handed to detect() and subtracted from the
        # detected position to obtain the offset.
        self.center_x = 820.0
        self.center_y = 570.0
        # Factors applied to the rotated offset in pixel_to_millis().
        self.scale_x = 1.0
        self.scale_y = 1.0
        # Rotation applied to the offset before scaling, in degrees.
        self.rotation_degrees = 0.0

        # --- correlation check -----------------------------------------------
        # A correlation value below this marks the image as anomalous.
        self.correlation_threshold = 0.60

        # --- values forwarded unchanged to detect() --------------------------
        # NOTE(review): their exact meaning and units are defined by detect(),
        # which is not visible in this file - confirm there.
        self.diameter_target = 95
        self.diameter_disk = 670
        self.diameter_dewar = 1800
        self.size_mask_disk = 0.8
        self.size_mask_dewar = 0.98
        self.max_detect_offset = 10
        self.open_strategy = "both"
|
|
|
|
class CoverCorrelation(ReadonlyAsyncRegisterBase):
    """Read-only register that republishes values pushed to it via set().

    CoverDetection creates one instance and feeds it the correlation value
    computed for each processed image.
    """

    def __init__(self, name):
        """Create the register.

        Args:
            name: device name handed to the base-class constructor.
        """
        ReadonlyAsyncRegisterBase.__init__(self, name)
        self.precision = 3

    def set(self, value):
        """Forward ``value`` to the base class as a new readout."""
        self.onReadout(value)
|
|
|
|
|
|
class CoverDetection(ReadonlyAsyncRegisterBase, ReadonlyRegisterArray):
    """Register device that runs cover detection on frames of an image source.

    While enabled, a listener attached to ``image`` feeds every received frame
    to append(). Frames are buffered, optionally integrated, passed to
    detect() and to get_correlation(), and the outcome is published as a
    readout array ``[x, y, offset_px_x, offset_px_y, offset_x, offset_y, ms]``
    (or ``None`` on error). When a renderer has been set, a crosshair marker
    and an error text overlay are kept up to date on it.

    NOTE(review): detect, integrate, load_image, get_correlation and
    save_ref_img are not defined in this file; they are presumably provided by
    the scripts executed at the top of the file - confirm.
    """

    def __init__(self, name, image):
        """Create the device.

        Args:
            name: device name handed to the base-class constructor.
            image: image source; must offer addListener/removeListener and,
                for grab_ref_image(), ``output`` and ``waitNext``.
        """
        ReadonlyAsyncRegisterBase.__init__(self, name, CoverDetectionConfig())
        self.image = image

        # The listener's own instance is bound to ``_listener`` so that
        # ``self`` keeps referring to the enclosing CoverDetection (closure).
        class ImgListener(ImageListener):
            def onImage(_listener, origin, image, data):
                self.append(image)

            def onError(_listener, origin, ex):
                # Parenthesised form prints the same text on Python 2.
                print(ex)

        self.listener = ImgListener()
        self.enabled = False
        self.images = []
        self.processing_time = None
        self.renderer = None
        self.error = None
        self.error_overlay = None
        self.img_counter, self.proc_counter = 0, 0
        self.offset_px = self.offset_um = None
        self.correlation = CoverCorrelation("cover_correlation")
        self.correlation_error = False
        # Last correlation value; defined here so the attribute exists even
        # before set_correlation() has been called.
        self.corr = None

    def doInitialize(self):
        """Enable frame listening and reset overlays and correlation state.

        Any exception is printed with its traceback and then re-raised.
        """
        try:
            self.enable()
            self.clear()
            #self.grab_ref_image()
        except:
            traceback.print_exc()
            raise

    def doClose(self):
        """Stop listening for frames and close the correlation register."""
        self.disable()
        self.correlation.close()

    def set_renderer(self, renderer):
        """Switch the renderer used for the marker and the error overlay.

        clear() runs first, so overlays are removed from the previous
        renderer (if any) before the new one is stored.
        """
        self.clear()
        self.renderer = renderer

    def enable(self):
        """Attach the frame listener and reset counters; no-op if enabled."""
        if not self.enabled:
            self.enabled = True
            self.image.addListener(self.listener)
            self.img_counter, self.proc_counter = 0, 0
            self.offset_px = self.offset_um = None

    def disable(self):
        """Detach the frame listener, publish ``None`` and clear overlays.

        No-op when the device is not enabled.
        """
        if self.enabled:
            self.image.removeListener(self.listener)
            self.enabled = False
            self.onReadout(None)
            self.clear()

    def clear(self):
        """Remove marker and error overlay and reset the correlation state."""
        self.set_marker(None)
        self.set_error_overlay(None)
        self.set_correlation(None)

    def append(self, image):
        """Handle one received frame.

        Every ``config.modulo``-th frame is converted to grayscale, loaded and
        buffered. Once ``config.number_images`` frames are buffered, they are
        integrated (if more than one), detection and correlation are run, and
        the result is published through set(). Exceptions raised by either
        step are stored in ``self.error`` (the detection error wins) instead
        of propagating.

        Args:
            image: the frame delivered by the image source listener.
        """
        self.img_counter += 1
        if self.img_counter % self.config.modulo != 0:
            return

        self.images.append(load_image(ImagingUtils.grayscale(image, None)))
        # Keep only the newest ``number_images`` frames.
        while len(self.images) > self.config.number_images:
            self.images.pop(0)
        if len(self.images) != self.config.number_images:
            return

        start = time.time()
        if len(self.images) == 1:
            ip = self.images[0]
        else:
            ip = integrate(self.images)

        try:
            self.error = None
            center = (self.config.center_x, self.config.center_y)
            x, y = detect(
                ip, center,
                diam_target=self.config.diameter_target,
                diam_disk=self.config.diameter_disk,
                diam_dewar=self.config.diameter_dewar,
                mask_disk=self.config.size_mask_disk,
                mask_dewar=self.config.size_mask_dewar,
                offset_threshold=self.config.max_detect_offset,
                open_strategy=self.config.open_strategy)
        except Exception as e:
            self.error = e
            x, y = -1, -1
        # Only the detection step is timed; correlation is excluded.
        self.processing_time = time.time() - start
        self.proc_counter += 1

        try:
            corr = get_correlation(ip.getBufferedImage())
            self.correlation.set(corr)
            # Raises when the correlation is below the configured threshold.
            self.set_correlation(corr)
        except Exception as e:
            #traceback.print_exc()
            # A detection error, if any, takes precedence.
            if self.error is None:
                self.error = e

        # Processing time is published in milliseconds.
        self.set(x, y, self.processing_time * 1000, self.error)

        if not self.config.continuous:
            # Start a fresh buffer for the next detection pass.
            self.images = []

    def calculate_factors(self, x1, y1, x2, y2, mx1, my1, mx2, my2):
        """Derive scale factors and rotation from two corresponding points.

        The same two points are given in two coordinate systems: ``(x, y)``
        and ``(mx, my)``.

        Returns:
            Tuple ``(sx, sy, rotation_degrees)`` where ``sx``/``sy`` are the
            per-axis ratios of the ``m`` deltas to the plain deltas and the
            rotation is the angle of the ``m`` segment minus the angle of the
            plain segment.

        Raises:
            ZeroDivisionError: if the x or y delta of the plain points, or the
                x delta of the ``m`` points, is zero.
        """
        # float() prevents Python 2 integer (floor) division when callers
        # pass integer coordinates.
        dx = float(x2 - x1)
        dy = float(y2 - y1)
        dmx = float(mx2 - mx1)
        dmy = float(my2 - my1)
        sx = dmx / dx
        sy = dmy / dy
        angle_plain = math.atan(dy / dx)
        angle_m = math.atan(dmy / dmx)
        return sx, sy, math.degrees(angle_m - angle_plain)

    def pixel_to_millis(self, x, y):
        """Convert a pixel position into a scaled offset from the centre.

        The offset from ``(config.center_x, config.center_y)`` is rotated by
        ``config.rotation_degrees`` and multiplied by ``config.scale_x`` /
        ``config.scale_y``.

        Returns:
            Tuple ``(mx, my)`` rounded to 3 decimals, or ``(nan, nan)`` if
            anything fails.
        """
        try:
            t = math.radians(self.config.rotation_degrees)
            cos_t, sin_t = math.cos(t), math.sin(t)
            offx, offy = x - self.config.center_x, y - self.config.center_y
            rx = offx * cos_t + offy * sin_t
            ry = -offx * sin_t + offy * cos_t
            mx, my = self.config.scale_x * rx, self.config.scale_y * ry
            return round(mx, 3), round(my, 3)
        except:
            # Broad catch kept from the original: any failure yields NaN.
            return float('nan'), float('nan')

    def set(self, x, y, tm, error):
        """Publish a detection result and refresh the overlays.

        Does nothing unless the device is enabled.

        Args:
            x, y: detected position in pixels.
            tm: processing time, as passed by append() in milliseconds.
            error: exception describing a failure, or ``None`` on success.
        """
        if not self.enabled:
            return

        if error:
            self.offset_px = self.offset_um = None
            value = None
        else:
            offx, offy = x - self.config.center_x, y - self.config.center_y
            self.offset_px = opx, opy = round(offx), round(offy)
            self.offset_um = omx, omy = self.pixel_to_millis(x, y)
            # NOTE(review): 'i' presumably requests an integer array, which
            # would truncate the fractional offsets and time - confirm.
            value = to_array([x, y, opx, opy, omx, omy, tm], 'i')
        self.onReadout(value)

        # Use the instance's renderer, not a module-level global, so the
        # marker is drawn on whatever was passed to set_renderer().
        renderer = self.renderer
        if renderer is not None:
            if error is None:
                marker = Overlays.Crosshairs(renderer.getPenProfile(), Point(int(x), int(y)), Dimension(-1, -1))
            else:
                marker = None
            self.set_marker(marker)
            self.set_error_overlay(error)

    def set_marker(self, marker):
        """Show ``marker`` on the renderer (``None`` removes it), if any."""
        if self.renderer is not None:
            self.renderer.setMarker(marker)

    def set_error_overlay(self, error):
        """Replace the error text overlay on the renderer.

        Args:
            error: object whose ``str()`` is displayed, or ``None`` to remove
                the current overlay.
        """
        # Use the instance's renderer, not a module-level global.
        renderer = self.renderer
        if renderer is None:
            self.error_overlay = None
            return

        if error is None:
            error_overlay = None
        else:
            error_overlay = Overlays.Text(renderer.getPenErrorText(), str(error), Font("Verdana", Font.PLAIN, 12), Point(20, 20))
            error_overlay.setFixed(True)
            error_overlay.setAnchor(Overlay.ANCHOR_VIEWPORT_TOP_LEFT)
        # Swap the previous overlay for the new one in a single call.
        renderer.updateOverlays(error_overlay, self.error_overlay)
        self.error_overlay = error_overlay

    def grab_ref_image(self):
        """Capture a reference image from the source and save it.

        With ``config.number_images`` > 1 that many frames are collected
        (waiting for the next frame after each one) and integrated; otherwise
        the current output of the image source is saved directly.
        """
        if self.config.number_images > 1:
            images = []
            for i in range(self.config.number_images):
                ip = load_image(ImagingUtils.grayscale(self.image.output, None))
                images.append(ip)
                self.image.waitNext(-1)
            ip = integrate(images)
            img = ip.getBufferedImage()
        else:
            img = self.image.output
        save_ref_img(img)

    def set_correlation(self, corr):
        """Store the correlation value and flag anomalous images.

        A message is printed only when the anomaly state changes.

        Args:
            corr: correlation value, or ``None`` to reset the state.

        Raises:
            Exception: whenever ``corr`` is below
                ``config.correlation_threshold``.
        """
        self.corr = corr
        error = corr is not None and corr < self.config.correlation_threshold
        if error != self.correlation_error:
            self.correlation_error = error
            if error:
                print ("Image anomally detected")
            else:
                print ("Image ok")
        if error:
            raise Exception ("Image anomally detected")
|
|
|
|
|
|
|
|
# Register the detector and its correlation register as devices.
# NOTE(review): `cover_detection` is never assigned in this file; presumably
# add_device() publishes the device under its name - confirm.
add_device(CoverDetection("cover_detection", image), force=True)
add_device(cover_detection.correlation, force=True)

# Show the image source in a panel and hand the panel's renderer to the
# detector, which draws its marker and error overlay on it.
renderer = show_panel(image)
cover_detection.set_renderer(renderer)
|