Files
sf-op/script/Devices/ImageStats.py
2017-05-26 16:31:21 +02:00

205 lines
6.9 KiB
Python

from startup import *
from ijutils import *
from ch.psi.pshell.imaging.Overlays import *
import ch.psi.pshell.imaging.Pen as Pen
import java.awt.Color as Color
import random
import ch.psi.pshell.imaging.ImageListener as ImageListener
from operator import add, mul, sub, truediv
def arrmul(a, b):
"""Multiply 2 series of the same size.
Args:
a(list, tuple, array ...): subscriptable object containing numbers
b(list, tuple, array ...): subscriptable object containing numbers
Returns:
List
"""
return map(mul, a, b)
def center_of_mass(data, x = None):
"""Calculate the center of mass of a series, and its rms.
Args:
data(list, tuple, array ...): subscriptable object containing numbers
x(list, tuple, array ..., optional): x coordinates
Returns:
Tuple (com, rms)
"""
if x is None:
x = Arr.indexesDouble(len(data))
data_sum = sum(data)
if (data_sum==0):
return float('nan')
xmd = arrmul( x, data)
com = sum(xmd) / data_sum
xmd2 = arrmul( x, xmd)
com2 = sum(xmd2) / data_sum
rms = math.sqrt(abs(com2 - com * com))
return (com, rms)
def get_centroid(source):
bi = source.getImage()
if bi is None:
return None
op = show_panel(bi, "Original")
ip = load_image(bi)
plot(get_histogram(ip), title = "Histogram")
grayscale(ip)
invert(ip)
gaussian_blur(ip)
auto_threshold(ip)
#binary_erode(ip)
show_panel(ip.getBufferedImage(), "Image")
(results,output_img)=analyse_particles(ip, 2000,20000, exclude_edges=False, print_table=True)
op.clearOverlays()
show_panel(output_img.getBufferedImage(), "Outlines")
if results.size()>0:
centroid = (results.getValue("XM",0), results.getValue("YM",0))
ov = Crosshairs(Pen(Color.ORANGE), java.awt.Point(int(centroid[0]),int(centroid[1])), java.awt.Dimension(15,15))
op.addOverlay(ov)
return centroid
import ch.psi.pshell.imaging.Filter as Filter
class SimulatedSource(Filter):
def process(self, img, data):
self.img=img
if img is None:
return None
ip = load_image(img)
pad_h = int((random.random()-0.5) * 500)
pad_v = int((random.random()-0.5) * 500)
#print "Pad = " , (pad_h, pad_v)
ip = pad_image(ip, -pad_h, pad_h, pad_v, -pad_v, fill_color = Color.BLACK)
return ip.getBufferedImage()
#return img
def waitNext(self, timeout):
self.pushImage(self.process(self.img, None))
class ImageStats(DeviceBase):
def __init__(self, name, source):
DeviceBase.__init__(self, name)
self.source = source
self.com_x_samples, self.com_y_samples = [], []
class ComX(Readable):
def read(self):
if len(self.image_stats.com_x_samples)==0: return None
return mean(self.image_stats.com_x_samples)
self.com_x_mean = ComX(); self.com_x_mean.image_stats = self
class ComY(Readable):
def read(self):
if len(self.image_stats.com_y_samples)==0: return None
return mean(self.image_stats.com_y_samples)
self.com_y_mean = ComY(); self.com_y_mean.image_stats = self
class ComXVar(Readable):
def read(self):
if len(self.image_stats.com_x_samples)==0: return None
return stdev(self.image_stats.com_x_samples)
self.com_x_stdev = ComXVar(); self.com_x_stdev.image_stats = self
class ComXVar(Readable):
def read(self):
if len(self.image_stats.com_y_samples)==0: return None
return stdev(self.image_stats.com_y_samples)
self.com_y_stdev = ComXVar(); self.com_y_stdev.image_stats = self
set_device_alias(self.com_x_mean, name + " com x mean")
set_device_alias(self.com_y_mean, name + " com y mean")
set_device_alias(self.com_x_stdev, name + " com x stdev")
set_device_alias(self.com_y_stdev, name + " com y stdev")
self.bg_en = False
self.num_images = 1
self.initialize()
class SourceListener (ImageListener):
def __init__(self, dev):
self.dev=dev
def onImage(self, origin, image, data):
print "On image"
self.dev.doUpdate()
def onError(self, origin, ex):
self.dev.com_x_samples, self.dev.com_y_samples = [], []
self.listener = SourceListener(self)
self.source.addListener(self.listener)
def doUpdate(self):
print "Update"
self.com_x_samples, self.com_y_samples = [], []
for i in range(self.num_images):
#if type(self.source) is not ch.psi.pshell.imaging.FileSource:
# self.source.waitNext(5000)
#centroid = get_centroid(self.source)
#print "cent ", centroid
#if centroid is not None:
# self.com_x_samples.append(centroid[0])
# self.com_y_samples.append(centroid[1])
x_profile = self.source.data.integrateVertically(True)
y_profile = self.source.data.integrateHorizontally(True)
com_x,rms_x = center_of_mass(x_profile)
com_y,rms_y = center_of_mass(y_profile)
self.com_x_samples.append(com_x)
self.com_y_samples.append(com_y)
def setNumberOfImages(self, value):
self.num_images = value
def enableBackground(self, value):
self.bg_en = value
def captureBackground(self, images):
self.doInitialize()
def doClose(self):
print "close"
self.source.removeListener(self.listener)
def start(self):
pass
def stop(self):
pass
def get_simulated_source(img):
simulated_source = SimulatedSource()
simulated_source.img=None
simulated_source.initialize()
img.addListener(simulated_source)
show_panel(simulated_source)
return simulated_source
if __name__ == "__builtin__":
#simulated_source = get_simulated_source(image)
#print get_centroid(simulated_source)
add_device(ImageStats("image_stats", cam2), True)
cam2.waitNext(5000)
image_stats.enableBackground(False)
#for i in range (10):
# image_stats.update()
# print image_stats.take(), image_stats.com_x_mean.read(), image_stats.com_y_mean.read()
# time.sleep(1)
image_stats.setNumberOfImages(3)
sensors = [image_stats.com_x_mean, image_stats.com_y_mean, image_stats.com_x_stdev, image_stats.com_y_stdev]
try:
tscan(sensors, 10, 0.1)
finally:
image_stats.close()