This commit is contained in:
2019-08-16 11:43:23 +02:00
parent 3e3afde314
commit 9ece2a1d4e
7 changed files with 5267 additions and 0 deletions

900
script/___Lib/diffutils.py Normal file
View File

@@ -0,0 +1,900 @@
###################################################################################################\
# Diffcalc utilities
###################################################################################################
###################################################################################################\
# Installing
###################################################################################################
#1- Download from: https://github.com/DiamondLightSource/diffcalc/archive/v2.1.zip
#2- Extract the contents to {script}/Lib/diffcalc
#3- Download http://central.maven.org/maven2/gov/nist/math/jama/1.0.3/jama-1.0.3.jar
# to the extensions folder.
#4- On {script}/Lib/diffcalc/diffcalc/gdasupport/you.py, the line " wl.asynchronousMoveTo(1)"
# must be commented for the energy not to move when the library is loaded.
###################################################################################################\
# Library loading and Hardware setup
###################################################################################################
#1- Create a MotorGroup with the diffractometer motors
# e.g. 'sixc', containing mu, delta, gam, eta, chi, phi motors (gam = nu)
# or 'fivec', containing delta, gam, eta, chi, phi motors
# or 'fourc', containing delta, eta, chi, phi motors
#2- Create positioner to read/set the energy in kEv, e.g. named 'en'
#3- Execute: run("diffutils")
#4- Execute: setup_diff(sixc, en)
###################################################################################################\
# API
###################################################################################################
# Orientation commands defined in https://github.com/DiamondLightSource/diffcalc#id19 are
# defined heren with identical signatures, and so the constraint commands.
# Motion command names were changed because thge original can collide with other globals:
# hklci, hklca, hklwh, hklget, hklmv and hklsim(hkl).
from __future__ import absolute_import
import traceback
import Jama.Matrix
diffcalc_path = os.path.abspath(get_context().setup.expandPath("{script}/Lib/diffcalc"))
if not diffcalc_path in sys.path:
sys.path.append(diffcalc_path)
import diffcalc
import math
import os
from diffcalc import settings
from diffcalc.hkl.you.geometry import YouGeometry,SixCircle, FiveCircle, FourCircle, YouPosition
from diffcalc.hardware import HardwareAdapter
from diffcalc.ub.persistence import UBCalculationJSONPersister, UbCalculationNonPersister
from diffcalc.gdasupport.minigda.scannable import ScannableBase, ScannableGroup
#from diffcalc.gdasupport.minigda import command
from diffcalc.hardware import HardwareAdapter
import ch.psi.pshell.device.PositionerConfig as PositionerConfig
import ch.psi.pshell.device.RegisterConfig as RegisterConfig
import ch.psi.pshell.device.Register as Register
_difcalc_names = {}
#
# Disable error handling designed for interactive use
#diffcalc.util.DEBUG = True
# Disable console bold charcters
diffcalc.util.COLOURISE_TERMINAL_OUTPUT = False
###################################################################################################
# Device mapping to difcalc
###################################################################################################
class PositionerScannable(ScannableBase):
def __init__(self, positioner, name = None):
self.positioner = positioner
self.name = positioner.name if name is None else name
self.inputNames = [self.name]
self.outputFormat = ['% 6.4f']
self.level = 3
def isBusy(self):
return self.positioner.state == State.Busy
def waitWhileBusy(self):
self.positioner.waitReady(-1)
def asynchronousMoveTo(self, new_position):
#print "Moving " , self.name, " to: ", new_position
self.positioner.moveAsync(float(new_position), -1)
def getPosition(self):
return self.positioner.getPosition()
def _get_diffcalc_axis_names():
nu_name=diffcalc.hkl.you.constraints.NUNAME
return ("mu", "delta", nu_name, "eta", "chi", "phi")
class PositionerScannableGroup(ScannableGroup):
def __init__(self, name, motors, diffcalc_axis_names=None):
self.name = name
global _difcalc_names
_difcalc_names = {}
positioners = []
if diffcalc_axis_names is None:
if len(motors) == 6: diffcalc_axis_names = _get_diffcalc_axis_names()
elif len(motors) == 5: diffcalc_axis_names = ("delta", "gam", "eta", "chi", " phi")
elif len(motors) == 4: diffcalc_axis_names = ("delta", "eta", "chi", " phi")
self.diffcalc_axis_names = diffcalc_axis_names
for i in range(len(motors)):
_difcalc_names[motors[i]] = diffcalc_axis_names[i]
exec('self.' + diffcalc_axis_names[i] + ' = PositionerScannable(' + motors[i].name + ', "' +diffcalc_axis_names[i] + '")')
exec('positioners.append(self.' + diffcalc_axis_names[i] + ')' )
#for m in motors:
# exec('self.' + m.name + ' = PositionerScannable(' + m.name + ', "' + m.name + '")')
# exec('positioners.append(self.' + m.name + ')' )
ScannableGroup.__init__(self, self.name, positioners)
class MotorGroupScannable(PositionerScannableGroup):
def __init__(self, motor_group, diffcalc_axis_names=None, simultaneous_move=False):
self.simultaneous_move = simultaneous_move
self.motor_group = motor_group
PositionerScannableGroup.__init__(self, motor_group.name, motor_group.motors, diffcalc_axis_names)
self.motor_group.restoreSpeedAfterMove = self.simultaneous_move
#Make sync moves (default implementation trigger each motor individually)
def asynchronousMoveTo(self, position):
if self.simultaneous_move:
position = [(float('nan') if v is None else v) for v in position]
self.motor_group.write(position)
else:
PositionerScannableGroup.asynchronousMoveTo(self, position)
class ScannableAdapter(HardwareAdapter):
def __init__(self, diffractometer, energy, energy_multiplier_to_kev=1):
self.diffractometer = diffractometer
self.energy = energy
self.energy_multiplier_to_kev = energy_multiplier_to_kev
input_names = diffractometer.getInputNames()
HardwareAdapter.__init__(self, input_names)
#Returns the current physical POSITIONS
def get_position(self):
"""
pos = getDiffractometerPosition() -- returns the current physical
diffractometer position as a list in degrees
"""
return self.diffractometer.getPosition()
#returns energy in kEv
def get_energy(self):
"""energy = get_energy() -- returns energy in kEv (NOT eV!) """
multiplier = self.energy_multiplier_to_kev
energy = self.energy.getPosition() * multiplier
if energy is None:
raise DiffcalcException("Energy has not been set")
return energy
def get_motor(self,name):
global _motor_group
global _difcalc_names
for m in _difcalc_names.keys():
if _difcalc_names[m] == name:
return m
for m in _motor_group.motors:
if m.name == name:
return m
raise Exception("Invalid axis name: " + str(name))
def get_lower_limit(self, name):
'''returns lower limits by axis name. Limit may be None if not set
'''
m = self.get_motor(name)
ret = m.getMinValue()
if ret == float("NaN"): ret = None
return ret
def get_upper_limit(self, name):
'''returns upper limit by axis name. Limit may be None if not set
'''
m = self.get_motor(name)
ret = m.getMaxValue()
if ret == float("NaN"): ret = None
return ret
def set_lower_limit(self, name, value):
"""value may be None to remove limit"""
if value is None: value = float("NaN")
m = self.get_motor(name)
m.config.minValue =value
def set_upper_limit(self, name, value):
"""value may be None to remove limit"""
if value is None: value = float("NaN")
m = self.get_motor(name)
m.config.maxValue =value
def is_axis_value_within_limits(self, axis_name, value):
m = self.get_motor(axis_name)
upper = self.get_upper_limit(axis_name)
lower = self.get_lower_limit(axis_name)
if (upper is None) or (math.isnan(upper)): upper = sys.float_info.max
if (lower is None) or (math.isnan(lower)): lower = -sys.float_info.max
return lower <= value <= upper
@property
def name(self):
return self.diffractometer.getName()
class MotorGroupAdapter(ScannableAdapter):
def __init__(self, diffractometer, energy, energy_multiplier_to_kev=1, diffcalc_axis_names=None, simultaneous_move=False):
self.diffractometer = MotorGroupScannable(diffractometer, diffcalc_axis_names, simultaneous_move)
self.energy = PositionerScannable(energy)
self.energy.level = 3
ScannableAdapter.__init__(self, self.diffractometer, self.energy, energy_multiplier_to_kev)
class Wavelength(RegisterBase):
def doRead(self):
try:
return get_wavelength().getPosition()
except:
return None
def doWrite(self, val):
get_wavelength().asynchronousMoveTo(val)
###################################################################################################
# HKL Pseudo-devices
###################################################################################################
class HklPositoner (PositionerBase):
def __init__(self, name, index, hkl_group):
PositionerBase.__init__(self, name, PositionerConfig())
self.setParent(hkl_group)
self.index = index
def isReady(self):
return PositionerBase.isReady(self) and self.getParent().isReady()
def doRead(self):
return self.getParent()._setpoint[self.index]
def doWrite(self, value):
#print "Setting " , self.getName(), "to: ", value
pos = [None, None, None]
pos[self.index] = value
self.getParent().write(pos)
def doReadReadback(self):
if java.lang.Thread.currentThread() != self.getParent()._updating_thread:
self.getParent().update()
return self.getParent()._readback[self.index]
class HklGroup(RegisterBase, Register.RegisterArray):
def __init__(self, name):
RegisterBase.__init__(self, name, RegisterConfig())
self.hkl=get_hkl()
self.h, self.k, self.l = HklPositoner("h", 0, self), HklPositoner("k", 1, self), HklPositoner("l", 2, self)
add_device(self.h, True)
add_device(self.k, True)
add_device(self.l, True)
self._setpoint = self.doRead()
self._updating = False
def getSize(self):
return 3
def doRead(self):
try:
self._readback = self.hkl.getPosition()
self._updating_thread = java.lang.Thread.currentThread()
self.h.update()
self.k.update()
self.l.update()
except:
#traceback.print_exc()
self._readback = (None, None, None)
finally:
self._updating_thread = None
return self._readback
def doWrite(self, pos):
self._setpoint = None if (pos is None) else [(None if v is None else float(v)) for v in pos]
#print "Moving to: " + str(pos)
self.hkl.asynchronousMoveTo(pos)
def sim(self, pos):
return self.hkl.simulateMoveTo(pos)
###################################################################################################
# System setup
###################################################################################################
you = None
dc, ub, hardware, hkl = None, None, None, None
_motor_group = None
def setup_diff(diffractometer= None, energy= None, diffcalc_axis_names = None, geometry=None, persist_ub=True, simultaneous_move=False):
"""
configure diffractometer. Display configuration if no parameter is given
diffractometer: Diffraction motor group
energy: Positioner having energy in kev
geometry: YouGeometry extension. If none, uses default
diffcalc_axis_names: if None use defaults:
- mu, delta, gam, eta, chi, phi (six circle)
- delta, gam, eta, chi, phi (ficve circle)
- delta, eta, chi, phi (four circle)
"""
global you, dc, ub, hardware, hkl, _motor_group
if diffractometer is not None:
_motor_group = diffractometer
you = None
if geometry is not None:
settings.geometry = geometry
elif diffcalc_axis_names is not None:
class CustomGeometry(YouGeometry):
def __init__(self):
self.all_axis_names = _get_diffcalc_axis_names()
self.my_axis_names = diffcalc_axis_names
fixed_constraints = {}
for axis in self.all_axis_names:
if not axis in self.my_axis_names:
fixed_constraints[axis] = 0
YouGeometry.__init__(self, diffractometer.name, fixed_constraints)
def physical_angles_to_internal_position(self, physical_angle_tuple):
pos=[]
index = 0
for axis in self.all_axis_names:
pos.append(physical_angle_tuple[index] if (axis in self.my_axis_names) else 0)
index = index+1
pos.append("DEG")#units
return YouPosition(*pos)
def internal_position_to_physical_angles(self, internal_position):
pos = internal_position.clone()
pos.changeToDegrees()
pos = pos.totuple()
ret = []
for i in range (len(self.all_axis_names)):
if self.all_axis_names[i] in self.my_axis_names:
ret.append(pos[i])
return tuple(ret)
settings.geometry = CustomGeometry()
elif len(diffractometer.motors) == 6:
settings.geometry = SixCircle()
elif len(diffractometer.motors) == 5:
settings.geometry = FiveCircle()
elif len(diffractometer.motors) == 4:
settings.geometry = FourCircle()
else:
raise Exception("Invalid motor group")
settings.hardware = MotorGroupAdapter(diffractometer, energy, 1, diffcalc_axis_names, simultaneous_move)
if persist_ub:
settings.persistence_path = os.path.abspath(get_context().setup.expandPath("{config}/diffcalc"))
if not os.path.exists(settings.persistence_path):
os.makedirs(settings.persistence_path)
print "UB calculations persistence path: " + settings.persistence_path
settings.ubcalc_persister = UBCalculationJSONPersister(settings.persistence_path)
else:
print "UB calculations are not persisteds"
settings.ubcalc_persister = UbCalculationNonPersister()
settings.axes_scannable_group = settings.hardware.diffractometer
settings.energy_scannable = settings.hardware.energy
settings.ubcalc_strategy = diffcalc.hkl.you.calc.YouUbCalcStrategy()
settings.angles_to_hkl_function = diffcalc.hkl.you.calc.youAnglesToHkl
from diffcalc.gdasupport import you
reload(you)
# These must be imported AFTER the settings have been configured
from diffcalc.dc import dcyou as dc
from diffcalc.ub import ub
from diffcalc import hardware
from diffcalc.hkl.you import hkl
add_device(HklGroup("hkl_group"), True)
add_device(Wavelength("wavelength", 6), True)
hkl_group.polling = 250
wavelength.polling = 250
if settings.hardware is not None:
print "Diffractometer defined with:"
print " \t" + "Motor group: " + str(settings.hardware.diffractometer.name)
print " \t" + "Energy: " + str(settings.hardware.energy.name)
print "\nDiffcalc axis names:"
for m in _difcalc_names.keys():
print " \t Motor " + m.name + " = Axis " + _difcalc_names[m]
else:
print "Diffractometer is not defined\n"
print
def setup_axis(motor = None, min=None, max=None, cut=None):
"""
configure axis range and cut.
displays ranges if motor is None
"""
if motor is not None:
name = get_axis_name(motor)
if min is not None: hardware.setmin(name, min)
if max is not None: hardware.setmax(name, max)
if cut is not None: hardware.setcut(name, cut)
else:
print "Axis range configuration:"
hardware.hardware()
print
###################################################################################################
# Acceess functions
###################################################################################################
def get_diff():
return settings.hardware.diffractometer
def get_energy():
return settings.hardware.energy
def get_adapter():
return settings.hardware
def get_motor_group():
return _motor_group
def get_wavelength():
return you.wl
def get_hkl():
return you.hkl
def get_axis_name(motor):
if is_string(motor):
motor = get_adapter().get_motor(motor)
return _difcalc_names[motor]
###################################################################################################
# Orientation Commands
###################################################################################################
# State
def newub(name):
"""
start a new ub calculation name
"""
try:
rmub(name)
except:
pass
return ub.newub(name)
def loadub(name_or_num):
"""
load an existing ub calculation
"""
return ub.loadub(name_or_num)
def lastub():
"""
load the last used ub calculation
"""
return ub.lastub()
def listub():
"""
list the ub calculations available to load
"""
return ub.listub()
def rmub(name_or_num):
"""
remove existing ub calculation
"""
return ub.rmub(name_or_num)
def saveubas(name):
"""
save the ub calculation with a new name
"""
return ub.saveubas(name)
# Lattice
def setlat(name=None, *args):
"""
set lattice parameters (Angstroms and Deg)
setlat -- interactively enter lattice parameters (Angstroms and Deg)
setlat name a -- assumes cubic
setlat name a b -- assumes tetragonal
setlat name a b c -- assumes ortho
setlat name a b c gamma -- assumes mon/hex with gam not equal to 90
setlat name a b c alpha beta gamma -- arbitrary
"""
return ub.setlat(name, *args)
def c2th(hkl, en=None):
"""
calculate two-theta angle for reflection
"""
return ub.c2th(hkl, en)
def hklangle(hkl1, hkl2):
"""
calculate angle between [h1 k1 l1] and [h2 k2 l2] crystal planes
"""
return ub.hklangle(hkl1, hkl2)
# Reference (surface)
def setnphi(xyz = None):
"""
sets or displays (xyz=None) n_phi reference
"""
return ub.setnphi(xyz)
def setnhkl(hkl = None):
"""
sets or displays (hkl=None) n_hkl reference
"""
return ub.setnhkl(hkl)
# Reflections
def showref():
"""
shows full reflection list
"""
return ub.showref()
def addref(*args):
"""
Add reflection
addref -- add reflection interactively
addref [h k l] {'tag'} -- add reflection with current position and energy
addref [h k l] (p1, .., pN) energy {'tag'} -- add arbitrary reflection
"""
return ub.addref(*args)
def editref(idx):
"""
interactively edit a reflection (idx is tag or index numbered from 1)
"""
return ub.editref(idx)
def delref(idx):
"""
deletes a reflection (idx is tag or index numbered from 1)
"""
return ub.delref(idx)
def clearref():
"""
deletes all the reflections
"""
return ub.clearref()
def swapref(idx1=None, idx2=None):
"""
swaps two reflections
swapref -- swaps first two reflections used for calculating U matrix
swapref {num1 | 'tag1'} {num2 | 'tag2'} -- swaps two reflections
"""
return ub.swapref(idx1, idx2)
# Crystal Orientations
def showorient():
"""
shows full list of crystal orientations
"""
#TODO: Workaround of bug on Diffcalc (str_lines needs parameter)
if ub.ubcalc._state.orientlist:
print '\n'.join(ub.ubcalc._state.orientlist.str_lines(None))
return
return ub.showorient()
def addorient(*args):
"""
addorient -- add crystal orientation interactively
addorient [h k l] [x y z] {'tag'} -- add crystal orientation in laboratory frame
"""
return ub.addorient(*args)
def editorient(idx):
"""
interactively edit a crystal orientation (idx is tag or index numbered from 1)
"""
return ub.editorient(tag_or_num)
def delorient(idx):
"""
deletes a crystal orientation (idx is tag or index numbered from 1)
"""
return ub.delorient(tag_or_num)
def clearorient():
"""
deletes all the crystal orientations
"""
return ub.clearorient()
def swaporient(idx1=None, idx2=None):
"""
swaps two swaporient
swaporient -- swaps first two crystal orientations used for calculating U matrix
swaporient {num1 | 'tag1'} {num2 | 'tag2'} -- swaps two crystal orientations
"""
return ub.swaporient(idx1, idx2)
# UB Matrix
def showub():
"""
show the complete state of the ub calculation
NOT A DIFFCALC COMMAND
"""
return ub.ub()
def checkub():
"""
show calculated and entered hkl values for reflections
"""
return ub.checkub()
def setu(U=None):
"""
manually set U matrix
setu -- set U matrix interactively
setu [[..][..][..]] -- manually set U matrix
"""
return ub.setu(U)
def setub(UB=None):
"""
manually set UB matrix
setub -- set UB matrix interactively
setub [[..][..][..]] -- manually set UB matrix
"""
return ub.setub(UB)
def getub():
"""
returns current UB matrix
NOT A DIFFCALC COMMAND
"""
return None if ub.ubcalc._UB is None else ub.ubcalc._UB.tolist()
def calcub(idx1=None, idx2=None):
"""
(re)calculate u matrix
calcub -- (re)calculate U matrix from the first two reflections and/or orientations.
calcub idx1 idx2 -- (re)calculate U matrix from reflections and/or orientations referred by indices and/or tags idx1 and idx2.
"""
return ub.calcub(idx1, idx2)
def trialub(idx=1):
"""
(re)calculate u matrix using one reflection only
Use indice or tags idx1. Default: use first reflection.
"""
return ub.trialub(idx)
def refineub(*args):
"""
refine unit cell dimensions and U matrix to match diffractometer angles for a given hkl value
refineub -- interactively
refineub [h k l] {pos}
"""
return ub.refineub(*args)
def fitub(*args):
"""
fitub ref1, ref2, ref3... -- fit UB matrix to match list of provided reference reflections.
"""
return ub.fitub(*args)
def addmiscut(angle, xyz=None):
"""
apply miscut to U matrix using a specified miscut angle in degrees and a rotation axis (default: [0 1 0])
"""
return ub.addmiscut(angle, xyz)
def setmiscut(angle, xyz=None):
"""
manually set U matrix using a specified miscut angle in degrees and a rotation axis (default: [0 1 0])
"""
return ub.setmiscut(angle, xyz)
###################################################################################################
# Motion Commands
###################################################################################################
#Constraints
def con(*args):
"""
list or set available constraints and values
con -- list available constraints and values
con <name> {val} -- constrains and optionally sets one constraint
con <name> {val} <name> {val} <name> {val} -- clears and then fully constrains
"""
return hkl.con(*args)
def uncon(name):
"""
remove constraint
"""
return hkl.uncon(name)
# HKL
def allhkl(_hkl, wavelength=None):
"""
print all hkl solutions ignoring limits
"""
return hkl.allhkl(_hkl, wavelength)
#Hardware
def setmin(axis, val=None):
"""
set lower limits used by auto sector code (nan to clear)
"""
name = get_axis_name(axis)
return hardware.setmin(name, val)
def setmax(axis, val=None):
"""
set upper limits used by auto sector code (nan to clear)
"""
name = get_axis_name(axis)
return hardware.setmax(name, val)
def setcut(axis, val):
"""
sets cut angle
"""
name = get_axis_name(axis)
return hardware.setcut(name, val)
###################################################################################################
# Motion commands: not standard Diffcalc names
###################################################################################################
def hklci(positions, energy=None):
"""
converts positions of motors to reciprocal space coordinates (H K L)
"""
return dc.angles_to_hkl(positions, energy)
def hklca(hkl, energy=None):
"""
converts reciprocal space coordinates (H K L) to positions of motors.
"""
return dc.hkl_to_angles(hkl[0], hkl[1], hkl[2], energy)
def hklwh():
"""
prints the current reciprocal space coordinates (H K L) and positions of motors.
"""
hkl = hklget()
print "HKL: " + str(hkl)
for m in _difcalc_names.keys():
print _difcalc_names[m] + " [" + m.name + "] :" + str(m.take())
def hklget():
"""
get current hkl position
"""
return hkl_group.read()
def hklmv(hkl):
"""
move to hkl position
"""
hkl_group.write(hkl)
def hklsim(hkl):
"""
simulates moving diffractometer
"""
return hkl_group.sim(hkl)
###################################################################################################
# HKL Combined Scan
###################################################################################################
def hklscan(vector, readables,latency = 0.0, passes = 1, **pars):
"""
HKL Scan:
Args:
vector(list of lists): HKL values to be scanned
readables(list of Readable): Sensors to be sampled on each step.
latency(float, optional): settling time for each step before readout, defaults to 0.0.
passes(int, optional): number of passes
pars(keyworded variable length arguments, optional): scan optional named arguments:
- title(str, optional): plotting window name.
- hidden(bool, optional): if true generates no effects on user interface.
- before_read (function, optional): callback on each step, before sampling. Arguments: positions, scan
- after_read (function, optional): callback on each step, after sampling. Arguments: record, scan.
- before_pass (function, optional): callback before each scan pass execution. Arguments: pass_num, scan.
- after_pass (function, optional): callback after each scan pass execution. Arguments: pass_num, scan.
- Aditional arguments defined by set_exec_pars.
Returns:
ScanResult object.
"""
readables=to_list(string_to_obj(readables))
pars["initial_move"] = False
scan = ManualScan([h,k,l], readables ,vector[0], vector[-1], [len(vector)-1] * 3, dimensions = 1)
if not "domain_axis" in pars.keys():
pars["domain_axis"] = "Index"
processScanPars(scan, pars)
scan.start()
try:
for pos in vector:
#print "Writing ", pos
hkl_group.write(pos)
time.sleep(0.1) #Make sure is busy
get_motor_group().update()
get_motor_group().waitReady(-1)
time.sleep(latency)
hkl_group.update()
if scan.before_read: scan.before_read(pos,scan)
scan.append ([h.take(), k.take(), l.take()], [h.getPosition(), k.getPosition(), l.getPosition()], [readable.read() for readable in readables ])
if scan.after_read: scan.after_read(scan.currentRecord,scan)
finally:
scan.end()
return scan.result
def test_diffcalc():
print "Start test"
energy.move(20.0)
delta.config.maxSpeed = 50.0
delta.speed = 50.0
delta.move(1.0)
#Setup
setup_diff(sixc, energy)
setup_axis('gam', 0, 179)
setup_axis('delta', 0, 179)
setup_axis('delta', min=0)
setup_axis('phi', cut=-180.0)
setup_axis()
#Orientation
listub()
# Create a new ub calculation and set lattice parameters
newub('test')
setlat('cubic', 1, 1, 1, 90, 90, 90)
# Add 1st reflection (demonstrating the hardware adapter)
settings.hardware.wavelength = 1
c2th([1, 0, 0]) # energy from hardware
settings.hardware.position = 0, 60, 0, 30, 0, 0
addref([1, 0, 0])# energy and position from hardware
# Add 2nd reflection (this time without the harware adapter)
c2th([0, 1, 0], 12.39842)
addref([0, 1, 0], [0, 60, 0, 30, 0, 90], 12.39842)
# check the state
showub()
checkub()
#Constraints
con('qaz', 90)
con('a_eq_b')
con('mu', 0)
con()
#Motion
print hklci((0., 60., 0., 30., 0., 0.))
print hklca((1, 0, 0))
sixc.write([0, 60, 0, 30, 90, 0])
print "sixc=" , sixc.position
wavelength.write(1.0)
print "wavelength = ", wavelength.read()
lastub()
setu ([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
showref()
swapref(1,2)
hklwh()
hklsim([0.0,1.0,1.0])
hklmv([0.0,1.0,1.0])
#Scans
lscan(l, [sin], 1.0, 1.5, 0.1)
ascan([k,l], [sin], [1.0, 1.0], [1.2, 1.3], [0.1, 0.1], zigzag=True, parallel_positioning = False)
vector = [[1.0,1.0,1.0], [1.0,1.0,1.1], [1.0,1.0,1.2], [1.0,1.0,1.4]]
hklscan(vector, [sin, arr], 0.9)

704
script/___Lib/ijutils.py Normal file
View File

@@ -0,0 +1,704 @@
####################################################################################################
# Facade to ImageJ functionality
####################################################################################################
#More information on:
# Image: https://imagej.nih.gov/ij/docs/guide/146-28.html#toc-Section-28
# Process: https://imagej.nih.gov/ij/docs/guide/146-29.html#toc-Section-29
# Analyze: https://imagej.nih.gov/ij/docs/guide/146-30.html#toc-Section-30
import ch.psi.utils.Convert as Convert
import ch.psi.pshell.imaging.Utils as Utils
from startup import get_context
import java.awt.image.BufferedImage as BufferedImage
import jarray
import ij.IJ as IJ
import ij.ImageJ as ImageJ
import ij.WindowManager as WindowManager
import ij.ImagePlus as ImagePlus
import ij.Prefs as Prefs
import ij.io.FileSaver as FileSaver
import ij.process.ImageProcessor as ImageProcessor
import ij.process.ByteProcessor as ByteProcessor
import ij.process.ShortProcessor as ShortProcessor
import ij.process.ColorProcessor as ColorProcessor
import ij.process.FloatProcessor as FloatProcessor
import ij.process.ImageConverter as ImageConverter
import ij.process.AutoThresholder as AutoThresholder
import ij.process.LUT as LUT
import ij.measure.Measurements as Measurements
import ij.measure.ResultsTable as ResultsTable
import ij.plugin.filter.Analyzer as Analyzer
import ij.plugin.filter.GaussianBlur as GaussianBlur
import ij.plugin.filter.Filters as Filters
import ij.plugin.filter.FFTFilter as FFTFilter
import ij.plugin.filter.BackgroundSubtracter as BackgroundSubtracter
import ij.plugin.filter.EDM as EDM
import ij.plugin.filter.Shadows as Shadows
import ij.plugin.filter.UnsharpMask as UnsharpMask
import ij.plugin.filter.MaximumFinder as MaximumFinder
import ij.plugin.filter.EDM as EDM
import ij.plugin.filter.Shadows as Shadows
import ij.plugin.filter.UnsharpMask as UnsharpMask
import ij.plugin.filter.RankFilters as RankFilters
import ij.plugin.filter.Convolver as Convolver
import ij.plugin.filter.ParticleAnalyzer as ParticleAnalyzer
import ij.plugin.ContrastEnhancer as ContrastEnhancer
import ij.plugin.Thresholder as Thresholder
import ij.plugin.ImageCalculator as ImageCalculator
import ij.plugin.FFT as FFT
import ij.plugin.Concatenator as Concatenator
#ImageJ customizations
import ch.psi.pshell.imaging.ij.FFTMath as FFTMath
import ch.psi.pshell.imaging.ij.FFTFilter as FFTFilter
import ch.psi.pshell.imaging.ij.Binary as Binary
import ch.psi.pshell.imaging.ij.Slicer as Slicer
#This eliminates the error messages due to the bug on ij.gui.ImageWindow row 555 (ij is null)
if not "_image_j" in globals().keys():
_image_j = ImageJ(None, ImageJ.NO_SHOW)
###################################################################################################
#Image creation, copying & saving
###################################################################################################
def load_image(image, title = "img"):
"""
image: file name or BufferedImage
"""
if isinstance(image, str):
try:
file = get_context().setup.expandPath(image)
except:
pass
try:
image = Utils.newImage(file)
except:
#try loading from assembly
image = get_context().setup.getAssemblyImage(image)
return ImagePlus(title, image)
def load_array(array, width=None, height=None, title = "img"):
"""
array: 1d array if width and height defined , or else 2d array to be flattened.
"""
#2D
if (width==None) and (height==None):
if array.typecode == '[B': proc = ByteProcessor(len(array[0]), len(array), Convert.flatten(array))
elif array.typecode == '[S': proc = ShortProcessor(len(array[0]), len(array), Convert.flatten(array), None)
elif array.typecode in ['[I','[F', '[D']: proc = FloatProcessor(len(array[0]), len(array), Convert.flatten(array))
else: raise Exception("Invalid array type")
#1D
else:
if (len(array) > width*height):
array = array[:(width*height)]
if array.typecode == 'b': proc = ByteProcessor(width, height, array)
elif array.typecode == 'h': proc = ShortProcessor(width, height, array, None)
elif array.typecode in ['i','f','d']: proc = FloatProcessor(width, height, array)
else: raise Exception("Invalid array type")
return ImagePlus(title, proc)
def save_image(ip, path=None, format = None):
"""
Saves image or stack
If parameters omitted, saves image again in same location, with same format.
"""
fs = FileSaver(ip)
if path == None: fs.save()
else:
try:
path = get_context().setup.expandPath(path)
except:
pass
if format == "bmp": fs.saveAsBmp(path)
elif format == "fits": fs.saveAsFits(path)
elif format == "gif": fs.saveAsGif(path)
elif format == "jpeg": fs.saveAsJpeg(path)
elif format == "lut": fs.saveAsLut(path)
elif format == "pgm": fs.saveAsPgm(path)
elif format == "png": fs.saveAsPng(path)
elif format == "raw" and ip.getImageStackSize()>1: fs.saveAsRawStack(path)
elif format == "raw": fs.saveAsRaw(path)
elif format == "txt": fs.saveAsText(path)
elif format == "tiff" and ip.getImageStackSize()>1: fs.saveAsTiffStack(path)
elif format == "tiff": fs.saveAsTiff(path)
elif format == "zip": fs.saveAsZip(path)
def new_image(width, height, image_type="byte", title = "img", fill_color = None):
"""
type = "byte", "short", "color" or "float"
"""
if image_type == "byte": p=ByteProcessor(width, height)
elif image_type == "short": p=ShortProcessor(width, height)
elif image_type == "color": p=ColorProcessor(width, height)
elif image_type == "float": p=FloatProcessor(width, height)
else: raise Exception("Invalid image type " + str(image_type))
ret = ImagePlus(title, p)
if fill_color is not None:
p.setColor(fill_color)
p.resetRoi()
p.fill()
return ret
def sub_image(ip, x, y, width, height):
"""
Returns new ImagePlus
"""
ip.setRoi(x, y, width, height)
p=ip.getProcessor().crop()
return ImagePlus(ip.getTitle() + " subimage", p)
def copy_image(ip):
return ip.duplicate()
def copy_image_to(ip_source, ip_dest, x, y):
ip_source.deleteRoi()
ip_source.copy()
ip_dest.setRoi(x, y, ip_source.getWidth(), ip_source.getHeight())
ip_dest.paste()
ip_dest.changes = False
ip_dest.deleteRoi()
def pad_image(ip, left=0, right=0, top=0, bottom=0, fill_color = None):
p=ip.getProcessor()
width = p.getWidth() + left + right
height = p.getHeight() + top + bottom
image_type = get_image_type(ip)
ret = new_image(width, height, image_type, ip.getTitle() + " padded", fill_color)
ip.deleteRoi()
ip.copy()
ret.setRoi(left, top, p.getWidth(), p.getHeight())
ret.paste()
ret.changes = False
ret.deleteRoi()
return ret
def get_image_type(ip):
"""
Returns: "byte", "short", "color" or "float"
"""
p=ip.getProcessor()
if type(p) == ShortProcessor: return "short"
elif type(p) == ColorProcessor: return "color"
elif type(p) == FloatProcessor: return "float"
return "byte"
###################################################################################################
#Image type conversion
###################################################################################################
def grayscale(ip, do_scaling=None, in_place=True):
ip = ip if in_place else ip.duplicate()
ic = ImageConverter(ip)
if do_scaling is not None:
ic.setDoScaling(do_scaling)
ic.convertToGray8()
return ip
def get_channel(ip, channel):
"""
Return a channel from a color image as a new ImagePlus.
channel: "red", "green","blue", "alpha", "brightness",
"""
proc = ip.getProcessor()
if channel == "red": ret = proc.getChannel(1, None)
elif channel == "green": ret = proc.getChannel(2, None)
elif channel == "blue": ret = proc.getChannel(3, None)
elif channel == "alpha": ret = proc.getChannel(4, None)
elif channel == "brightness": ret = proc.getBrightness()
else: raise Exception("Invalid channel " + str(channel))
return ImagePlus(ip.getTitle() + " channel: " + channel, ret)
###################################################################################################
#Thresholder
###################################################################################################
def threshold(ip, min_threshold, max_threshold, in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().setThreshold(min_threshold, max_threshold, ImageProcessor.NO_LUT_UPDATE)
WindowManager.setTempCurrentImage(ip)
Thresholder().run("mask")
return ip
def auto_threshold(ip, dark_background = False, method = AutoThresholder.getMethods()[0], in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().setAutoThreshold(method, dark_background , ImageProcessor.NO_LUT_UPDATE)
WindowManager.setTempCurrentImage(ip)
thresholder=Thresholder().run("mask")
return ip
###################################################################################################
#Binary functions
###################################################################################################
def binary_op(ip, op, dark_background=False, iterations=1, count=1, in_place=True):
"""
op = "erode","dilate", "open","close", "outline", "fill holes", "skeletonize"
"""
ip = ip if in_place else ip.duplicate()
binary = Binary(count, iterations, dark_background )
binary.setup(op, ip)
binary.run(ip.getProcessor())
return ip
def binary_erode(ip, dark_background=False, iterations=1, count=1, in_place=True):
return binary_op(ip, "erode", dark_background, iterations, count, in_place)
def binary_dilate(ip, dark_background=False, iterations=1, count=1, in_place=True):
return binary_op(ip, "dilate", dark_background, iterations, count, in_place)
def binary_open(ip, dark_background=False, iterations=1, count=1, in_place=True):
return binary_op(ip, "open", dark_background, iterations, count, in_place)
def binary_close(ip, dark_background=False, iterations=1, count=1, in_place=True):
return binary_op(ip, "close", dark_background, iterations, count)
def binary_outline(ip, dark_background=False, in_place=True):
return binary_op(ip, "outline", dark_background, in_place=in_place)
def binary_fill_holes(ip, dark_background=False, in_place=True):
return binary_op(ip, "fill holes", dark_background, in_place=in_place)
def binary_skeletonize(ip, dark_background=False, in_place=True):
return binary_op(ip, "skeletonize", dark_background, in_place=in_place)
def analyse_particles(ip, min_size, max_size, fill_holes = True, exclude_edges = True, extra_measurements = 0, \
print_table = False, output_image = "outlines", minCirc = 0.0, maxCirc = 1.0):
"""
Returns: tuple (ResultsTable results_table, ImagePlus output_image)
output_image = "outlines", "overlay_outlines", "masks", "overlay_masks", "roi_masks" or None
extra_measurements = mask with Measurements.CENTROID, PERIMETER, RECT, MIN_MAX, ELLIPSE, CIRCULARITY, AREA_FRACTION, INTEGRATED_DENSITY, INVERT_Y, FERET, KURTOSIS, MEDIAN, MODE, SKEWNESS, STD_DEV
Measurements is a mask of flags: https://imagej.nih.gov/ij/developer/api/ij/measure/Measurements.html.
Returned ResultsTable hold public fields: https://imagej.nih.gov/ij/developer/api/ij/measure/ResultsTable.html
"""
rt = ResultsTable()
show_summary = False
options = ParticleAnalyzer.SHOW_RESULTS | ParticleAnalyzer.CLEAR_WORKSHEET
"""
ParticleAnalyzer.SHOW_ROI_MASKS | \
#ParticleAnalyzer.RECORD_STARTS | \
#ParticleAnalyzer.ADD_TO_MANAGER | \
#ParticleAnalyzer.FOUR_CONNECTED | \
#ParticleAnalyzer.IN_SITU_SHOW | \
#ParticleAnalyzer.SHOW_NONE | \
"""
if show_summary: options = options | ParticleAnalyzer.DISPLAY_SUMMARY
if output_image == "outlines": options = options | ParticleAnalyzer.SHOW_OUTLINES
elif output_image == "overlay_outlines": options = options | ParticleAnalyzer.SHOW_OVERLAY_OUTLINES
elif output_image == "masks": options = options | ParticleAnalyzer.SHOW_MASKS
elif output_image == "overlay_masks": options = options | ParticleAnalyzer.SHOW_OVERLAY_MASKS
elif output_image == "roi_masks": options = options | ParticleAnalyzer.SHOW_ROI_MASKS
#ParticleAnalyzer.SHOW_ROI_MASKS
if exclude_edges: options = options | ParticleAnalyzer.EXCLUDE_EDGE_PARTICLES
if fill_holes: options = options | ParticleAnalyzer.INCLUDE_HOLES
measurements = Measurements.AREA | Measurements.MEAN | Measurements.CENTER_OF_MASS | Measurements.RECT
pa = ParticleAnalyzer(options, measurements, rt, min_size, max_size, minCirc, maxCirc)
pa.setHideOutputImage(True)
pa.setResultsTable(rt)
if pa.analyze(ip):
if print_table:
print rt.getColumnHeadings()
for row in range (rt.counter):
print rt.getRowAsString(row)
return (rt, pa.getOutputImage())
###################################################################################################
#Image operators
###################################################################################################
def op_image(ip1, ip2, op, float_result=False, in_place=True):
"""
op = "add","subtract", "multiply","divide", "and", "or", "xor", "min", "max", "average", "difference" or "copy"
"""
ip1 = ip1 if in_place else ip1.duplicate()
ic = ImageCalculator()
pars = op
if float_result:
op = op + " float"
ic.run(pars, ip1, ip2)
return ip1
def op_const(ip, op, val, in_place=True):
"""
op = "add","subtract", "multiply","divide", "and", "or", "xor", "min", "max", "gamma", "set" or "log", "exp", "sqr", "sqrt","abs"
"""
ip = ip if in_place else ip.duplicate()
pr = ip.getProcessor()
if op == 'add': pr.add(val)
elif op == 'sub': pr.subtract(val)
elif op == 'multiply': pr.multiply(val)
elif op == 'divide' and val!=0: pr.multiply(1.0/val)
elif op == 'and': pr.and(val)
elif op == 'or': pr.or(val)
elif op == 'xor': pr.xor(val)
elif op == 'min': pr.min(val);pr.resetMinAndMax()
elif op == 'max': pr.max(val);pr.resetMinAndMax()
elif op == 'gamma' and 0.05 < val < 5.0: pr.gamma(val)
elif op == 'set': pr.set(val)
elif op == 'log': pr.log()
elif op == 'exp': pr.exp()
elif op == 'sqr': pr.sqr()
elif op == 'sqrt': pr.sqrt()
elif op == 'abs': pr.abs();pr.resetMinAndMax()
else: raise Exception("Invalid operation " + str(op))
return ip
def op_fft(ip1, ip2, op, do_inverse = True) :
"""
Images must have same sizes, and multiple of 2 height and width.
op = "correlate" (complex conjugate multiply), "convolve" (Fourier domain multiply), "deconvolve" (Fourier domain divide)
"""
if op == "correlate": op_index = 0
elif op == "convolve": op_index = 1
elif op == "deconvolve": op_index = 2
else: raise Exception("Invalid operation " + str(op))
return FFTMath().doMath(ip1, ip2, op_index, do_inverse)
def op_rank(ip, op, kernel_radius =1 , dark_outliers = False ,threshold = 50, in_place=True):
"""
op = "mean", "min", "max", "variance", "median", "close_maxima", "open_maxima", "remove_outliers", "remove_nan", "despeckle"
"""
if op == "mean": filter_type = RankFilters.MEAN
elif op == "min": filter_type = RankFilters.MIN
elif op == "max": filter_type = RankFilters.MAX
elif op == "variance": filter_type = RankFilters.VARIANCE
elif op == "median": filter_type = RankFilters.MEDIAN
elif op == "close_maxima": filter_type = RankFilters.CLOSE
elif op == "open_maxima": filter_type = RankFilters.OPEN
elif op == "remove_outliers": filter_type = RankFilters.OUTLIERS
elif op == "remove_nan": filter_type = RankFilters.REMOVE_NAN
elif op == "despeckle": filter_type, kernel_radius = RankFilters.MEDIAN, 1
else: raise Exception("Invalid operation " + str(op))
ip = ip if in_place else ip.duplicate()
RankFilters().rank(ip.getProcessor(), kernel_radius, filter_type, RankFilters.DARK_OUTLIERS if dark_outliers else RankFilters.BRIGHT_OUTLIERS ,threshold)
return ip
def op_edm(ip, op="edm", dark_background=False, in_place=True):
"""
Euclidian distance map & derived operations
op ="edm", "watershed","points", "voronoi"
"""
ip = ip if in_place else ip.duplicate()
pr = ip.getProcessor()
edm=EDM()
Prefs.blackBackground=dark_background
if op=="edm":
#pr.setPixels(0, edm.makeFloatEDM(pr, 0, False));
#pr.resetMinAndMax();
if dark_background:
pr.invert()
edm.toEDM(pr)
else:
edm.setup(op, ip)
edm.run(pr)
return ip
def watershed(ip, dark_background=False, in_place=True):
return op_edm(ip, "watershed", dark_background, in_place)
def ultimate_points(ip, dark_background=False, in_place=True):
return op_edm(ip, "points", dark_background, in_place)
def veronoi(ip, dark_background=False, in_place=True):
return op_edm(ip, "voronoi", dark_background, in_place)
def edm(ip, dark_background=False, in_place=True):
return op_edm(ip, "edm", dark_background, in_place)
def op_filter(ip, op, in_place=True):
"""
This is redundant as just calls processor methods.
op ="invert", "smooth", "sharpen", "edge", "add"
"""
ip = ip if in_place else ip.duplicate()
f = Filters()
f.setup(op, ip )
f.run(ip.getProcessor())
return ip
###################################################################################################
#Other operations
###################################################################################################
def gaussian_blur(ip, sigma_x=3.0, sigma_y=3.0, accuracy = 0.01, in_place=True):
ip = ip if in_place else ip.duplicate()
GaussianBlur().blurGaussian(ip.getProcessor(), sigma_x, sigma_y, accuracy)
return ip
def find_maxima(ip, tolerance=25, threshold = ImageProcessor.NO_THRESHOLD, output_type=MaximumFinder.IN_TOLERANCE, exclude_on_edges = False, is_edm = False):
"""
Returns new ImagePlus
tolerance: maxima are accepted only if protruding more than this value from the ridge to a higher maximum
threshhold: minimum height of a maximum (uncalibrated);
output_type = SINGLE_POINTS, IN_TOLERANCE or SEGMENTED. No output image is created for output types POINT_SELECTION, LIST and COUNT.
"""
byte_processor = MaximumFinder().findMaxima(ip.getProcessor(), tolerance, threshold, output_type, exclude_on_edges, is_edm)
return ImagePlus(ip.getTitle() + " maxima", byte_processor)
def get_maxima_points(ip, tolerance=25, exclude_on_edges = False):
polygon = MaximumFinder().getMaxima(ip.getProcessor(), tolerance, exclude_on_edges)
return (polygon.xpoints, polygon.ypoints)
def enhance_contrast(ip, equalize_histo = True, saturated_pixels = 0.5, normalize = False, stack_histo = False, in_place=True):
ip = ip if in_place else ip.duplicate()
ce = ContrastEnhancer()
if equalize_histo:
ce.equalize(ip.getProcessor());
else:
ce.stretchHistogram(ip.getProcessor(), saturated_pixels)
if normalize:
ip.getProcessor().setMinAndMax(0,1.0 if (ip.getProcessor().getBitDepth()==32) else ip.getProcessor().maxValue())
return ip
def shadows(ip, op, in_place=True):
"""
op ="north","northeast", "east", "southeast","south", "southwest", "west","northwest"
"""
ip = ip if in_place else ip.duplicate()
shadows= Shadows()
shadows.setup(op, ip)
shadows.run(ip.getProcessor())
return ip
def unsharp_mask(ip, sigma, weight, in_place=True):
"""
Float processor
"""
ip = ip if in_place else ip.duplicate()
ip.getProcessor().snapshot()
unsharp=UnsharpMask()
USmask.setup(" ", ip)
USmask.sharpenFloat( ip.getProcessor(),sigma, weight)
return ip
def subtract_background(ip, radius = 50, create_background=False, dark_background=False, use_paraboloid =True, do_presmooth = True, correctCorners = True, rgb_brightness=False, in_place=True):
ip = ip if in_place else ip.duplicate()
if rgb_brightness:
BackgroundSubtracter().rollingBallBrightnessBackground(ip.getProcessor(), radius, create_background,not dark_background, use_paraboloid, do_presmooth, correctCorners)
else:
BackgroundSubtracter().rollingBallBackground(ip.getProcessor(), radius, create_background, not dark_background, use_paraboloid, do_presmooth, correctCorners)
return ip
###################################################################################################
#FFT
###################################################################################################
def image_fft(ip, show = True):
WindowManager.setTempCurrentImage(ip)
fft = FFT()
fft.run("fft")
#TODO: how to avoid it to be created?
#ret = ImagePlus("FHT of " + ip.getTitle(), WindowManager.getCurrentImage().getProcessor())
ret = WindowManager.getCurrentImage()
if not show:
WindowManager.getCurrentImage().hide()
return ret
def image_ffti(ip, show = True):
WindowManager.setTempCurrentImage(ip)
fft = FFT()
fft.run("inverse")
#WindowManager.getCurrentImage().hide()
#TODO: how to avoid it to be created?
#ret = WindowManager.getCurrentImage()
#WindowManager.getCurrentImage().hide()
#ret = ImagePlus(ip.getTitle() + " ffti", WindowManager.getCurrentImage().getProcessor())
ret = WindowManager.getCurrentImage()
if not show:
WindowManager.getCurrentImage().hide()
return ret
def bandpass_filter(ip, small_dia_px, large_dia_px, suppress_stripes = 0, stripes_tolerance_direction = 5.0, autoscale_after_filtering = False, saturate_if_autoscale = False, display_filter = False, in_place=True):
"""
suppress_stripes = 0 for none, 1 for horizontal, 2 for vertical
"""
ip = ip if in_place else ip.duplicate()
filter= FFTFilter();
FFTFilter.filterLargeDia = large_dia_px
FFTFilter.filterSmallDia = small_dia_px
FFTFilter.choiceIndex = suppress_stripes
FFTFilter.toleranceDia = stripes_tolerance_direction
FFTFilter.doScalingDia = autoscale_after_filtering
FFTFilter.saturateDia = saturate_if_autoscale
FFTFilter.displayFilter =display_filter
filter.setup(None, ip);
filter.run(ip.getProcessor())
return ip
###################################################################################################
#Convolution
###################################################################################################
KERNEL_BLUR = [[0.1111, 0.1111, 0.1111], [0.1111, 0.1111, 0.1111], [0.1111, 0.1111, 0.1111]]
KERNEL_SHARPEN = [[0.0, -0.75, 0.0], [-0.75, 4.0, -0.75], [0.0, -0.75, 0.0]]
KERNEL_SHARPEN_2 = [[-1.0, -1.0, -1.0], [-1.0, 9.0, -1.0], [-1.0, -1.0, -1.0]]
KERNEL_LIGHT = [[0.1, 0.1, 0.1], [0.1, 1.0, 0.1],[0.1, 0.1, 0.1]]
KERNEL_DARK = [[0.01, 0.01, 0.01],[0.01, 0.5, 0.01],[0.01, 0.01, 0.01]]
KERNEL_EDGE_DETECT = [[0.0, -0.75, 0.0], [-0.75, 3.0, -0.75], [0.0, -0.75, 0.0]]
KERNEL_EDGE_DETECT_2 = [[-0.5, -0.5, -0.5], [-0.5, 4.0, -0.5], [-0.5, -0.5, -0.5]]
KERNEL_DIFFERENTIAL_EDGE_DETECT = [[-1.0, 0.0, 1.0], [0.0, 0.0, 0.0], [1.0, 0.0, -1.0]]
KERNEL_PREWITT = [[-2.0, -1.0, 0.0], [-1.0, 0.0, 1.0 ], [0.0, 1.0, 2.0]]
KERNEL_SOBEL = [[2.0, 2.0, 0.0], [2.0, 0.0, -2.0 ], [0.0, -2.0, -2.0]]
def convolve(ip, kernel, in_place=True):
"""
kernel: list of lists
"""
ip = ip if in_place else ip.duplicate()
kernel_width = len(kernel)
kernel_height= len(kernel[0])
kernel = [item for row in kernel for item in row]
#Convolver().convolve(ip.getProcessor(), kernel, kernel_width, kernel_height)
ip.getProcessor().convolve(kernel, kernel_width, kernel_height)
return ip
###################################################################################################
#Shortcut to ImageProcessor methods
###################################################################################################
def invert(ip, in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().invert()
return ip
def smooth(ip, in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().smooth()
return ip
def sharpen(ip, in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().sharpen()
return ip
def edges(ip, in_place=True): #Sobel
ip = ip if in_place else ip.duplicate()
ip.getProcessor().findEdges()
return ip
def noise(ip, sigma = 25.0, in_place=True):
ip = ip if in_place else ip.duplicate()
ip.getProcessor().noise(sigma)
return ip
def remap(ip, min=None, max=None, in_place=True):
ip = ip if in_place else ip.duplicate()
if min is None or max is None:
stats = get_statistics(ip, Measurements.MIN_MAX)
if min is None: min = stats.min
if max is None: max = stats.max
ip.getProcessor().setMinAndMax(min, max)
return ip
def set_lut(ip, r, g, b):
"""
r,g and b are lists of 256 integers
"""
r = [x if x<128 else x-256 for x in r]
g = [x if x<128 else x-256 for x in g]
b = [x if x<128 else x-256 for x in b]
ip.setLut(LUT(jarray.array(r,'b'),jarray.array(g,'b'),jarray.array(b,'b')))
def resize(ip, width, height):
"""
Returns new ImagePlus
"""
p = ip.getProcessor().resize(width, height)
return ImagePlus(ip.getTitle() + " resized", p)
def binning(ip, factor):
p=ip.getProcessor().bin(factor)
return ImagePlus(ip.getTitle() + " resized", p)
def get_histogram(ip, hist_min = 0, hist_max = 0, hist_bins = 256, roi=None):
"""
hist_min, hist_max, hist_bins used only for float images (otherwise fixed to 0,255,256)
roi is list [x,y,w,h]
"""
if roi == None: ip.deleteRoi()
else: ip.setRoi(roi[0],roi[1],roi[2],roi[3])
image_statistics = ip.getStatistics(0, hist_bins, hist_min, hist_max)
return image_statistics.getHistogram()
def get_array(ip):
return ip.getProcessor().getIntArray()
def get_line(ip, x1, y1, x2, y2):
return ip.getProcessor().getLine(x1, y1, x2, y2)
def get_pixel_range(ip):
return (ip.getProcessor().getMin(), ip.getProcessor().getMax())
def get_num_channels(ip):
return ip.getProcessor().getNChannels()
def is_binary(ip):
return ip.getProcessor().isBinary()
def get_pixel(ip, x, y):
return ip.getProcessor().getPixel(x,y)
def get_pixel_array(ip, x, y):
a = [0]*get_num_channels(ip)
return ip.getProcessor().getPixel(x,y,a)
def get_pixels(ip):
return ip.getProcessor().getPixels()
def get_width(ip):
return ip.getProcessor().getWidth()
def get_height(ip):
return ip.getProcessor().getHeight()
def get_row(ip, y):
a = [0]*get_width(ip)
array = jarray.array(a,'i')
ip.getProcessor().getRow(0, y, array, get_width(ip))
return array
def get_col(ip, x):
a = [0]*get_height(ip)
array = jarray.array(a,'i')
ip.getProcessor().getColumn(x, 0, array, get_height(ip))
return array
def get_statistics(ip, measurements = None):
"""
Measurements is a mask of flags: https://imagej.nih.gov/ij/developer/api/ij/measure/Measurements.html.
Statistics object hold public fields: https://imagej.nih.gov/ij/developer/api/ij/process/ImageStatistics.html
"""
if measurements is None:
return ip.getStatistics()
else:
return ip.getStatistics(measurements)
###################################################################################################
#Image stack functions
###################################################################################################
def create_stack(ip_list, keep=True, title = None):
stack = Concatenator().concatenate(ip_list, keep)
if title is not None:
stack.setTitle(title)
return stack
def reslice(stack, start_at = "Top", vertically = True, flip = True, output_pixel_spacing=1.0, avoid_interpolation = True, title = None):
ss = Slicer()
ss.rotate = vertically
ss.startAt = start_at
ss.flip = flip
ss.nointerpolate = avoid_interpolation
ss.outputZSpacing = output_pixel_spacing
stack = ss.reslice(stack)
if title is not None:
stack.setTitle(title)
return stack

128
script/___Lib/jeputils.py Normal file
View File

@@ -0,0 +1,128 @@
###################################################################################################
# Facade to JEP: Embedded Python
###################################################################################################
#Matplotlib won't work out of the box because it's default backend (Qt) uses signals, which only works in
#the main thread. Ideally should find a fix, in order to mark the running thread as the main.
#As a workaround, one can use the Tk backend:
#
#import matplotlib
#matplotlib.use('TkAgg')
import sys
import os
import jep.Jep
import jep.NDArray
import java.lang.Thread
from startup import to_array, get_context
__jep = {}
def __get_jep():
t = java.lang.Thread.currentThread()
if not t in __jep:
init_jep()
return __jep[t]
def __close_jep():
t = java.lang.Thread.currentThread()
if t in __jep:
__jep[t].close()
def init_jep():
#TODO: Should do it but generates errors
#__close_jep()
j = jep.Jep(False)
#Faster, but statements must be complete
j.setInteractive(False)
__jep[java.lang.Thread.currentThread()] = j
j.eval("import sys")
#sys.argv is not present in JEP and may be needed for certain modules (as Tkinter)
j.eval("sys.argv = ['PShell']");
#Add standard script path to python path
j.eval("sys.path.append('" + get_context().setup.getScriptPath() + "')")
#Redirect stdout
j.eval("class JepStdout:\n" +
" def write(self, str):\n" +
" self.str += str\n" +
" def clear(self):\n" +
" self.str = ''\n" +
" def flush(self):\n" +
" pass\n")
j.eval("sys.stdout=JepStdout()");
j.eval("sys.stderr=JepStdout()");
j.eval("sys.stdout.clear()")
j.eval("sys.stderr.clear()")
def __print_stdout():
j=__get_jep()
output = j.getValue("sys.stdout.str")
err = j.getValue("sys.stderr.str")
j.eval("sys.stdout.clear()")
j.eval("sys.stderr.clear()")
if (output is not None) and len(output)>0:
print output
if (err is not None) and len(err)>0:
print >> sys.stderr, err
def run_jep(script_name, vars = {}):
global __jep
script = get_context().scriptManager.library.resolveFile(script_name)
if script is None :
script= os.path.abspath(script_name)
j=__get_jep()
for v in vars:
j.set(v, vars[v])
try:
j.runScript(script)
finally:
__print_stdout()
def eval_jep(line):
j=__get_jep()
try:
j.eval(line)
finally:
__print_stdout()
def set_jep(var, value):
j=__get_jep()
j.set(var, value)
def get_jep(var):
j=__get_jep()
return j.getValue(var)
def call_jep(module, function, args = []):
j=__get_jep()
if "/" in module:
script = get_context().scriptManager.library.resolveFile(module)
if "\\" in script:
#Windows paths
module_path = script[0:script.rfind("\\")]
module = script[script.rfind("\\")+1:]
else:
#Linux paths
module_path = script[0:script.rfind("/")]
module = script[script.rfind("/")+1:]
eval_jep("import sys")
eval_jep("sys.path.append('" + module_path + "')")
if module.endswith(".py"):
module = module[0:-3]
f = module+"_" + function+"_"+str(j.hashCode())
try:
eval_jep("from " + module + " import " + function + " as " + f)
ret = j.invoke(f, args)
finally:
__print_stdout()
return ret
#Converts pythonlist or Java array to numpy array
def to_npa(data, dimensions = None, type = None):
data = to_array(data,'d' if type is None else type)
return jep.NDArray(data, dimensions)

655
script/___Lib/mathutils.py Normal file
View File

@@ -0,0 +1,655 @@
###################################################################################################
# Facade to Apache Commons Math
###################################################################################################
import sys
import math
import operator
import java.util.List
import java.lang.reflect.Array
import java.lang.Class as Class
import jarray
import org.python.core.PyArray as PyArray
import ch.psi.utils.Convert as Convert
import org.apache.commons.math3.util.FastMath as FastMath
import org.apache.commons.math3.util.Pair as Pair
import org.apache.commons.math3.complex.Complex as Complex
import org.apache.commons.math3.analysis.DifferentiableUnivariateFunction as DifferentiableUnivariateFunction
import org.apache.commons.math3.analysis.function.Gaussian as Gaussian
import org.apache.commons.math3.analysis.function.HarmonicOscillator as HarmonicOscillator
import org.apache.commons.math3.analysis.differentiation.DerivativeStructure as DerivativeStructure
import org.apache.commons.math3.analysis.differentiation.FiniteDifferencesDifferentiator as FiniteDifferencesDifferentiator
import org.apache.commons.math3.analysis.integration.SimpsonIntegrator as SimpsonIntegrator
import org.apache.commons.math3.analysis.integration.TrapezoidIntegrator as TrapezoidIntegrator
import org.apache.commons.math3.analysis.integration.RombergIntegrator as RombergIntegrator
import org.apache.commons.math3.analysis.integration.MidPointIntegrator as MidPointIntegrator
import org.apache.commons.math3.analysis.polynomials.PolynomialFunction as PolynomialFunction
import org.apache.commons.math3.analysis.polynomials.PolynomialFunctionLagrangeForm as PolynomialFunctionLagrangeForm
import org.apache.commons.math3.analysis.solvers.LaguerreSolver as LaguerreSolver
import org.apache.commons.math3.analysis.UnivariateFunction as UnivariateFunction
import org.apache.commons.math3.analysis.interpolation.SplineInterpolator as SplineInterpolator
import org.apache.commons.math3.analysis.interpolation.LinearInterpolator as LinearInterpolator
import org.apache.commons.math3.analysis.interpolation.NevilleInterpolator as NevilleInterpolator
import org.apache.commons.math3.analysis.interpolation.LoessInterpolator as LoessInterpolator
import org.apache.commons.math3.analysis.interpolation.DividedDifferenceInterpolator as DividedDifferenceInterpolator
import org.apache.commons.math3.analysis.interpolation.AkimaSplineInterpolator as AkimaSplineInterpolator
import org.apache.commons.math3.fitting.GaussianCurveFitter as GaussianCurveFitter
import org.apache.commons.math3.fitting.PolynomialCurveFitter as PolynomialCurveFitter
import org.apache.commons.math3.fitting.HarmonicCurveFitter as HarmonicCurveFitter
import org.apache.commons.math3.fitting.WeightedObservedPoint as WeightedObservedPoint
import org.apache.commons.math3.fitting.leastsquares.MultivariateJacobianFunction as MultivariateJacobianFunction
import org.apache.commons.math3.fitting.leastsquares.LeastSquaresBuilder as LeastSquaresBuilder
import org.apache.commons.math3.fitting.leastsquares.LevenbergMarquardtOptimizer as LevenbergMarquardtOptimizer
import org.apache.commons.math3.fitting.leastsquares.GaussNewtonOptimizer as GaussNewtonOptimizer
import org.apache.commons.math3.stat.regression.SimpleRegression as SimpleRegression
import org.apache.commons.math3.transform.FastFourierTransformer as FastFourierTransformer
import org.apache.commons.math3.transform.DftNormalization as DftNormalization
import org.apache.commons.math3.transform.TransformType as TransformType
import org.apache.commons.math3.linear.ArrayRealVector as ArrayRealVector
import org.apache.commons.math3.linear.Array2DRowRealMatrix as Array2DRowRealMatrix
import org.apache.commons.math3.linear.MatrixUtils as MatrixUtils
###################################################################################################
#Derivative and interpolation
###################################################################################################
def get_values(f, xdata):
"""Return list of values of a function
Args:
f(UnivariateFunction): function
xdata(float array or list): Domain values
Returns:
List of doubles
"""
v = []
for x in xdata:
v.append(f.value(x))
return v
def interpolate(data, xdata = None, interpolation_type = "linear"):
"""Interpolate data array or list to a UnivariateFunction
Args:
data(float array or list): The values to interpolate
xdata(float array or list, optional): Domain values
interpolation_type(str , optional): "linear", "cubic", "akima", "neville", "loess", "newton"
Returns:
UnivariateDifferentiableFunction object
"""
if xdata is None:
from startup import frange
xdata = frange(0, len(data), 1.0)
else:
#X must be ordered
xy = sorted(zip(xdata,data), key=operator.itemgetter(0))
xdata, data = zip(*xy)
if len(data) != len(xdata) or len(data)<2:
raise Exception("Dimension mismatch")
if interpolation_type == "cubic":
i = SplineInterpolator()
elif interpolation_type == "linear":
i = LinearInterpolator()
elif interpolation_type == "akima":
i = AkimaSplineInterpolator()
elif interpolation_type == "neville":
i = NevilleInterpolator()
elif interpolation_type == "loess":
i = LoessInterpolator()
elif interpolation_type == "newton":
i = DividedDifferenceInterpolator()
else:
raise Exception("Invalid interpolation type")
from startup import to_array
return i.interpolate(to_array(xdata,'d'), to_array(data,'d'))
def deriv(f, xdata = None, interpolation_type = "linear"):
"""Calculate derivative of UnivariateFunction, array or list.
Args:
f(UnivariateFunction or array): The function object. If array it is interpolated.
xdata(float array or list, optional): Domain values to process.
interpolation_type(str , optional): "linear", "cubic", "akima", "neville", "loess", "newton"
Returns:
List with the derivative values for xdata
"""
if not isinstance(f,UnivariateFunction):
if xdata is None:
from startup import frange
xdata = frange(0, len(f), 1.0)
f = interpolate(f, xdata, interpolation_type)
if xdata is None:
if isinstance(f,DifferentiableUnivariateFunction):
return f.derivative()
raise Exception("Domain range not defined")
d = []
for x in xdata:
xds = DerivativeStructure(1, 2, 0, x)
yds = f.value(xds)
d.append( yds.getPartialDerivative(1))
return d
def integrate(f, range = None, xdata = None, interpolation_type = "linear", integrator_type = "simpson"):
"""Integrate UnivariateFunction, array or list in an interval.
Args:
f(UnivariateFunction or array): The function object. If array it is interpolated.
range(list, optional): integration range ([min, max]).
xdata(float array or list, optional): disregarded if f is UnivariateFunction.
interpolation_type(str , optional): "linear", "cubic", "akima", "neville", "loess", "newton"
integrator_type(str , optional): "simpson", "trapezoid", "romberg" or "midpoint"
Returns:
Integrated value (Float)
"""
if not isinstance(f, UnivariateFunction):
from startup import frange
if xdata is None:
xdata = frange(0, len(f), 1.0)
if range is None:
range = xdata
f = interpolate(f, xdata, interpolation_type)
if range is None:
raise Exception("Domain range not defined")
d = []
if integrator_type == "simpson":
integrator = SimpsonIntegrator()
elif integrator_type == "trapezoid":
integrator = TrapezoidIntegrator()
elif integrator_type == "romberg":
integrator = RombergIntegrator()
elif integrator_type == "midpoint":
integrator = MidPointIntegrator()
raise Exception("Invalid integrator type")
lower = min(range)
upper = max(range)
return integrator.integrate(MAX_EVALUATIONS, f, lower, upper)
def trapz(y, xdata=None):
"""Integrate an array or list using the composite trapezoidal rule.
Args:
y(array or list)
xdata(float array or list, optional)
"""
return integrate(y, range = None, xdata = xdata, interpolation_type = "linear", integrator_type = "trapezoid")
###################################################################################################
#Fitting and peak search
###################################################################################################
try:
MAX_FLOAT = sys.float_info.max
except: # Python 2.5
MAX_FLOAT = 1.7976931348623157e+308
MAX_ITERATIONS = 1000
MAX_EVALUATIONS = 1000000
def calculate_peaks(function, start_value, end_value = MAX_FLOAT, positive=True):
"""Calculate peaks of a DifferentiableUnivariateFunction in a given range by finding the roots of the derivative
Args:
function(DifferentiableUnivariateFunction): The function object.
start_value(float): start of range
end_value(float, optional): end of range
positive (boolean, optional): True for searching positive peaks, False for negative.
Returns:
List of peaks in the interval
"""
derivative = function.derivative()
derivative2 = derivative.derivative()
ret = []
solver = LaguerreSolver()
for complex in solver.solveAllComplex(derivative.coefficients, start_value):
r = complex.real
if start_value < r < end_value:
if (positive and (derivative2.value(r) < 0)) or ( (not positive) and (derivative2.value(r) > 0)):
ret.append(r)
return ret
def estimate_peak_indexes(data, xdata = None, threshold = None, min_peak_distance = None, positive = True):
"""Estimation of peaks in an array by ordering local maxima according to given criteria.
Args:
data(float array or list)
xdata(float array or list, optional): if not None must have the same length as data.
threshold(float, optional): if specified filter peaks below this value
min_peak_distance(float, optional): if specified defines minimum distance between two peaks.
if xdata == None, it represents index counts, otherwise in xdata units.
positive (boolean, optional): True for searching positive peaks, False for negative.
Returns:
List of peaks indexes.
"""
peaks = []
indexes = sorted(range(len(data)),key=lambda x:data[x])
if positive:
indexes = reversed(indexes)
for index in indexes:
first = (index == 0)
last = (index == (len(data)-1))
val=data[index]
prev = float('NaN') if first else data[index-1]
next = float('NaN') if last else data[index+1]
if threshold is not None:
if (positive and (val<threshold)) or ((not positive) and (val>threshold)):
break
if ( positive and (first or val>prev ) and (last or val>=next ) ) or (
(not positive) and (first or val<prev ) and (last or val<=next ) ):
append = True
if min_peak_distance is not None:
for peak in peaks:
if ((xdata is None) and (abs(peak-index) < min_peak_distance)) or (
(xdata is not None) and (abs(xdata[peak]-xdata[index]) < min_peak_distance)):
append = False
break
if append:
peaks.append(index)
return peaks
def _assert_valid_for_fit(y,x):
if len(y)<2 or ((x is not None) and (len(x)>len(y))):
raise Exception("Invalid data for fit")
def fit_gaussians(y, x, peak_indexes):
"""Fits data on multiple gaussians on the given peak indexes.
Args:
x(float array or list)
y(float array or list)
peak_indexes(list of int)
Returns:
List of tuples of gaussian parameters: (normalization, mean, sigma)
"""
_assert_valid_for_fit(y,x)
ret = []
minimum = min(y)
for peak in peak_indexes:
#Copy data
data = y[:]
#Remover data from other peaks
for p in peak_indexes:
limit = int(round((p+peak)/2))
if (p > peak):
data[limit : len(y)] =[minimum] * (len(y)-limit)
elif (p < peak):
data[0:limit] = [minimum] *limit
#Build fit point list
values = create_fit_point_list(data, x)
maximum = max(data)
gaussian_fitter = GaussianCurveFitter.create().withStartPoint([(maximum-minimum)/2,x[peak],1.0]).withMaxIterations(MAX_ITERATIONS)
#Fit return parameters: (normalization, mean, sigma)
try:
ret.append(gaussian_fitter.fit(values).tolist())
except:
ret.append(None) #Fitting error
return ret
def create_fit_point_list(y, x, weights = None):
values = []
for i in sorted(range(len(x)),key=lambda v:x[v]): #Creating list ordered by x, needed for gauss fit
if weights is None:
values.append(WeightedObservedPoint(1.0, x[i], y[i]))
else:
values.append(WeightedObservedPoint(weights[i], x[i], y[i]))
return values
def fit_polynomial(y, x, order, start_point = None, weights = None):
"""Fits data into a polynomial.
Args:
x(float array or list): observed points x
y(float array or list): observed points y
order(int): if start_point is provided order parameter is disregarded - set to len(start_point)-1.
start_point(optional tuple of float): initial parameters (a0, a1, a2, ...)
weights(optional float array or list): weight for each observed point
Returns:
Tuples of polynomial parameters: (a0, a1, a2, ...)
"""
_assert_valid_for_fit(y,x)
fit_point_list = create_fit_point_list(y, x, weights)
if start_point is None:
polynomial_fitter = PolynomialCurveFitter.create(order).withMaxIterations(MAX_ITERATIONS)
else:
polynomial_fitter = PolynomialCurveFitter.create(0).withStartPoint(start_point).withMaxIterations(MAX_ITERATIONS)
try:
return polynomial_fitter.fit(fit_point_list).tolist()
except:
raise Exception("Fitting failure")
def fit_gaussian(y, x, start_point = None, weights = None):
"""Fits data into a gaussian.
Args:
x(float array or list): observed points x
y(float array or list): observed points y
start_point(optional tuple of float): initial parameters (normalization, mean, sigma)
If None, use a custom initial estimation.
Set to "default" to force Commons.Math the default (GaussianCurveFitter.ParameterGuesser).
weights(optional float array or list): weight for each observed point
Returns:
Tuples of gaussian parameters: (normalization, mean, sigma)
"""
_assert_valid_for_fit(y,x)
fit_point_list = create_fit_point_list(y, x, weights)
#If start point not provided, start on peak
if start_point is None:
maximum, minimum = max(y), min(y)
norm = maximum - minimum
mean = x[y.index(maximum)]
sigma = trapz([v-minimum for v in y], x) / (norm*math.sqrt(2*math.pi))
start_point = (norm, mean, sigma)
elif start_point == "simple":
start_point = [(max(y)-min(y))/2, x[y.index(max(y))], 1.0]
elif start_point == "default":
start_point = GaussianCurveFitter.ParameterGuesser(fit_point_list).guess().tolist()
gaussian_fitter = GaussianCurveFitter.create().withStartPoint(start_point).withMaxIterations(MAX_ITERATIONS)
try:
return gaussian_fitter.fit(fit_point_list).tolist() # (normalization, mean, sigma)
except:
raise Exception("Fitting failure")
def fit_harmonic(y, x, start_point = None, weights = None):
"""Fits data into an harmonic.
Args:
x(float array or list): observed points x
y(float array or list): observed points y
start_point(optional tuple of float): initial parameters (amplitude, angular_frequency, phase)
weights(optional float array or list): weight for each observed point
Returns:
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
"""
_assert_valid_for_fit(y,x)
fit_point_list = create_fit_point_list(y, x, weights)
if start_point is None:
harmonic_fitter = HarmonicCurveFitter.create().withMaxIterations(MAX_ITERATIONS)
else:
harmonic_fitter = HarmonicCurveFitter.create().withStartPoint(start_point).withMaxIterations(MAX_ITERATIONS)
try:
return harmonic_fitter.fit(fit_point_list).tolist() # (amplitude, angular_frequency, phase)
except:
raise Exception("Fitting failure")
def fit_gaussian_offset(y, x, start_point = None, weights = None):
"""Fits data into a gaussian with offset (constant background).
f(x) = a + b * exp(-(pow((x - c), 2) / (2 * pow(d, 2))))
Args:
x(float array or list): observed points x
y(float array or list): observed points y
start_point(optional tuple of float): initial parameters (normalization, mean, sigma)
weights(optional float array or list): weight for each observed point
Returns:
Tuples of gaussian parameters: (offset, normalization, mean, sigma)
"""
# For normalised gauss curve sigma=1/(amp*sqrt(2*pi))
if start_point is None:
off = min(y) # good enough starting point for offset
com = x[y.index(max(y))]
amp = max(y) - off
sigma = trapz([v-off for v in y], x) / (amp*math.sqrt(2*math.pi))
start_point = [off, amp, com , sigma]
class Model(MultivariateJacobianFunction):
def value(self, variables):
value = ArrayRealVector(len(x))
jacobian = Array2DRowRealMatrix(len(x), 4)
for i in range(len(x)):
(a,b,c,d) = (variables.getEntry(0), variables.getEntry(1), variables.getEntry(2), variables.getEntry(3))
v = math.exp(-(math.pow((x[i] - c), 2) / (2 * math.pow(d, 2))))
model = a + b * v
value.setEntry(i, model)
jacobian.setEntry(i, 0, 1) # derivative with respect to p0 = a
jacobian.setEntry(i, 1, v) # derivative with respect to p1 = b
v2 = b*v*((x[i] - c)/math.pow(d, 2))
jacobian.setEntry(i, 2, v2) # derivative with respect to p2 = c
jacobian.setEntry(i, 3, v2*(x[i] - c)/d ) # derivative with respect to p3 = d
return Pair(value, jacobian)
model = Model()
target = [v for v in y] #the target is to have all points at the positios
(parameters, residuals, rms, evals, iters) = optimize_least_squares(model, target, start_point, weights)
return parameters
def fit_gaussian_linear(y, x, start_point = None, weights = None):
"""Fits data into a gaussian with linear background.
f(x) = a * x + b + c * exp(-(pow((x - d), 2) / (2 * pow(e, 2))))
Args:
x(float array or list): observed points x
y(float array or list): observed points y
start_point(optional tuple of float): initial parameters (normalization, mean, sigma)
weights(optional float array or list): weight for each observed point
Returns:
Tuples of gaussian parameters: (a, b, normalization, mean, sigma)
"""
# For normalised gauss curve sigma=1/(amp*sqrt(2*pi))
if start_point is None:
off = min(y) # good enough starting point for offset
com = x[y.index(max(y))]
amp = max(y) - off
sigma = trapz([v-off for v in y], x) / (amp*math.sqrt(2*math.pi))
start_point = [0, off, amp, com, sigma]
class Model(MultivariateJacobianFunction):
def value(self, variables):
value = ArrayRealVector(len(x))
jacobian = Array2DRowRealMatrix(len(x), 5)
for i in range(len(x)):
(a,b,c,d,e) = (variables.getEntry(0), variables.getEntry(1), variables.getEntry(2), variables.getEntry(3), variables.getEntry(4))
v = math.exp(-(math.pow((x[i] - d), 2) / (2 * math.pow(e, 2))))
model = a*x[i] + b + c * v
value.setEntry(i, model)
jacobian.setEntry(i, 0, x[i]) # derivative with respect to p0 = a
jacobian.setEntry(i, 1, 1) # derivative with respect to p1 = b
jacobian.setEntry(i, 2, v) # derivative with respect to p2 = c
v2 = c*v*((x[i] - d)/math.pow(e, 2))
jacobian.setEntry(i, 3, v2) # derivative with respect to p3 = d
jacobian.setEntry(i, 4, v2*(x[i] - d)/e ) # derivative with respect to p4 = e
return Pair(value, jacobian)
model = Model()
target = [v for v in y] #the target is to have all points at the positios
(parameters, residuals, rms, evals, iters) = optimize_least_squares(model, target, start_point, weights)
return parameters
def fit_gaussian_exp_bkg(y, x, start_point = None, weights = None):
"""Fits data into a gaussian with exponential background.
f(x) = a * math.exp(-(x/b)) + c * exp(-(pow((x - d), 2) / (2 * pow(e, 2))))
Args:
x(float array or list): observed points x
y(float array or list): observed points y
start_point(optional tuple of float): initial parameters (normalization, mean, sigma)
weights(optional float array or list): weight for each observed point
Returns:
Tuples of gaussian parameters: (a,b , normalization, mean, sigma)
"""
# For normalised gauss curve sigma=1/(amp*sqrt(2*pi))
if start_point is None:
off = min(y) # good enough starting point for offset
com = x[len(x)/2]
#com = 11.9
amp = max(y) - off
sigma = trapz([v-off for v in y], x) / (amp*math.sqrt(2*math.pi))
start_point = [1, 1, amp, com, sigma]
class Model(MultivariateJacobianFunction):
def value(self, variables):
value = ArrayRealVector(len(x))
jacobian = Array2DRowRealMatrix(len(x), 5)
for i in range(len(x)):
(a,b,c,d,e) = (variables.getEntry(0), variables.getEntry(1), variables.getEntry(2), variables.getEntry(3), variables.getEntry(4))
v = math.exp(-(math.pow((x[i] - d), 2) / (2 * math.pow(e, 2))))
bkg=math.exp(-(x[i]/b))
model = a*bkg + c * v
value.setEntry(i, model)
jacobian.setEntry(i, 0, bkg) # derivative with respect to p0 = a
jacobian.setEntry(i, 1, a*x[i]*bkg/math.pow(b, 2)) # derivative with respect to p1 = b
jacobian.setEntry(i, 2, v) # derivative with respect to p2 = c
v2 = c*v*((x[i] - d)/math.pow(e, 2))
jacobian.setEntry(i, 3, v2) # derivative with respect to p3 = d
jacobian.setEntry(i, 4, v2*(x[i] - d)/e ) # derivative with respect to p4 = e
return Pair(value, jacobian)
model = Model()
target = [v for v in y] #the target is to have all points at the positios
(parameters, residuals, rms, evals, iters) = optimize_least_squares(model, target, start_point, weights)
return parameters
###################################################################################################
#Least squares problem
###################################################################################################
def optimize_least_squares(model, target, initial, weights):
"""Fits a parametric model to a set of observed values by minimizing a cost function.
Args:
model(MultivariateJacobianFunction): observed points x
target(float array or list): observed data
initial(optional tuple of float): initial guess
weights(optional float array or list): weight for each observed point
Returns:
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
"""
if isinstance(weights,tuple) or isinstance(weights,list):
weights = MatrixUtils.createRealDiagonalMatrix(weights)
problem = LeastSquaresBuilder().start(initial).model(model).target(target).lazyEvaluation(False).maxEvaluations(MAX_EVALUATIONS).maxIterations(MAX_ITERATIONS).weight(weights).build()
optimizer = LevenbergMarquardtOptimizer()
optimum = optimizer.optimize(problem)
parameters=optimum.getPoint().toArray().tolist()
residuals = optimum.getResiduals().toArray().tolist()
rms = optimum.getRMS()
evals = optimum.getEvaluations()
iters = optimum.getIterations()
return (parameters, residuals, rms, evals, iters)
###################################################################################################
#FFT
###################################################################################################
def is_power(num, base):
if base<=1: return num == 1
power = int (math.log (num, base) + 0.5)
return base ** power == num
def pad_to_power_of_two(data):
if is_power(len(data),2):
return data
pad =(1 << len(data).bit_length()) - len(data)
elem = complex(0,0) if type(data[0]) is complex else [0.0,]
return data + elem * pad
def get_real(values):
"""Returns real part of a complex numbers vector.
Args:
values: List of complex.
Returns:
List of float
"""
ret = []
for c in values:
ret.append(c.real)
return ret
def get_imag(values):
"""Returns imaginary part of a complex numbers vector.
Args:
values: List of complex.
Returns:
List of float
"""
ret = []
for c in values:
ret.append(c.imag)
return ret
def get_modulus(values):
"""Returns the modulus of a complex numbers vector.
Args:
values: List of complex.
Returns:
List of float
"""
ret = []
for c in values:
ret.append(math.hypot(c.imag,c.real))
return ret
def get_phase(values):
"""Returns the phase of a complex numbers vector.
Args:
values: List of complex.
Returns:
List of float
"""
ret = []
for c in values:
ret.append(math.atan(c.imag/c.real))
return ret
def fft(f):
"""Calculates the Fast Fourrier Transform of a vector, padding to the next power of 2 elements.
Args:
values(): List of float or complex
Returns:
List of complex
"""
f = pad_to_power_of_two(f)
if type(f[0]) is complex:
aux = []
for c in f:
aux.append(Complex(c.real, c.imag))
f = aux
fftt = FastFourierTransformer(DftNormalization.STANDARD)
ret = []
for c in fftt.transform(f,TransformType.FORWARD ):
ret.append(complex(c.getReal(),c.getImaginary()))
return ret
def ffti(f):
"""Calculates the Inverse Fast Fourrier Transform of a vector, padding to the next power of 2 elements.
Args:
values(): List of float or complex
Returns:
List of complex
"""
f = pad_to_power_of_two(f)
if type(f[0]) is complex:
aux = []
for c in f:
aux.append(Complex(c.real, c.imag))
f = aux
fftt = FastFourierTransformer(DftNormalization.STANDARD)
ret = []
for c in fftt.transform(f,TransformType.INVERSE ):
ret.append(complex(c.getReal(),c.getImaginary()))
return ret

119
script/___Lib/plotutils.py Normal file
View File

@@ -0,0 +1,119 @@
###################################################################################################
# Plot utilities
###################################################################################################
import ch.psi.pshell.plot.LinePlotSeries as LinePlotSeries
import ch.psi.pshell.plot.LinePlotErrorSeries as LinePlotErrorSeries
import math
from startup import frange, to_array
def plot_function(plot, function, name, range, show_points = True, show_lines = True, color = None):
"""Plots a function to a plot.
Args:
plot(LinePlot)
function(UnivariateFunction): Gaussian, PolynomialFunction, HarmonicOscillator...
name(str): name of the series
range(list or array of floats): x values to plot
Returns:
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
"""
if plot.style.isError():
s = LinePlotErrorSeries(name, color)
else:
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setPointsVisible(show_points)
s.setLinesVisible(show_lines)
for x in range:
s.appendData(x, function.value(x))
return s
def plot_data(plot, data, name, xdata = None, error = None, show_points = True, show_lines = True, color = None):
"""Plots a subscriptable object to a plot.
Args:
plot(LinePlot)
data(subscriptable): Y data
name(str): name of the series
xdata(subscriptable): X data
error(subscriptable): Error data (only for error plots)
Returns:
Tuples of harmonic parameters: (amplitude, angular_frequency, phase)
"""
if plot.style.isError():
s = LinePlotErrorSeries(name, color)
else:
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setPointsVisible(show_points)
s.setLinesVisible(show_lines)
if xdata is None:
xdata = range(len(data))
xdata = to_array(xdata, 'd')
data = to_array(data, 'd')
if plot.style.isError():
error = to_array(error, 'd')
s.setData(xdata, data, error)
else:
s.setData(xdata, data)
return s
def plot_point(plot, x, y, size = 3, color = None, name = "Point"):
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setPointSize(size)
s.appendData(x, y)
return s
def plot_line(plot, x1, y1, x2, y2, width = 1, color = None, name = "Line"):
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setLineWidth(width)
s.setPointsVisible(False)
s.appendData(x1, y1)
s.appendData(x2, y2)
return s
def plot_cross(plot, x, y, size = 1.0, width = 1, color = None, name = "Cross"):
size = float(size)
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setLineWidth(width)
s.setPointsVisible(False)
s.appendData(float('nan'), float('nan'))
s.appendData(x-size/2, y)
s.appendData(x+size/2, y)
s.appendData(float('nan'), float('nan'))
s.appendData(x, y-size/2)
s.appendData(x, y+size/2)
return s
def plot_rectangle(plot, x1, y1, x2, y2, width = 1, color = None, name = "Rectangle"):
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setLineWidth(width)
s.setPointsVisible(False)
s.appendData(x1, y1)
s.appendData(x1, y2)
s.appendData(x2, y2)
s.appendData(x2, y1)
s.appendData(x1, y1)
return s
def plot_circle(plot, cx, cy, radius, width = 1, color = None, name = "Circle"):
s = LinePlotSeries(name, color)
plot.addSeries(s)
s.setLineWidth(width)
s.setPointsVisible(False)
res=float(radius) / 100.0
epson = 1e-12
for xp in frange (cx+radius-epson , cx-radius+epson , -res):
yp = math.sqrt(math.pow(radius, 2) - math.pow(xp - cx, 2)) + cy
s.appendData(xp, yp)
for xp in frange (cx-radius+epson , cx+radius-epson, res):
yp = -math.sqrt(math.pow(radius, 2) - math.pow(xp - cx, 2)) + cy
s.appendData(xp, yp)
if s.getCount()>0:
s.appendData(s.getX()[0], s.getY()[0])
return s

2570
script/___Lib/startup.py Normal file

File diff suppressed because it is too large Load Diff

191
script/___Lib/statsutils.py Normal file
View File

@@ -0,0 +1,191 @@
###################################################################################################
# Utilities for generating reports from command statistics files
###################################################################################################
#CsvJdbc JAR file must be downloaded to extensions folder:
#http://central.maven.org/maven2/net/sourceforge/csvjdbc/csvjdbc/1.0.34/csvjdbc-1.0.34.jar
import java.sql.DriverManager as DriverManager
import java.sql.ResultSet as ResultSet
import java.util.Properties as Properties
import java.lang.Class as Class
import os
from startup import get_context
import ch.psi.pshell.core.CommandManager.CommandStatisticsFileRange as CommandStatisticsFileRange
stmt = None
STAT_COLUMN_NAMES = ["Command","Args","Source","Start","End","Background","Result","Return"]
def get_stats_connection():
global stmt
Class.forName("org.relique.jdbc.csv.CsvDriver");
db = os.path.abspath(get_context().setup.expandPath("{home}/statistics"))
props = Properties()
props.put("fileExtension", ".csv")
props.put("separator", ";")
props.put("timestampFormat", "dd/MM/yy HH:mm:ss.SSS")
props.put("indexedFiles", "true");
props.put("columnTypes", "String,String,String,Timestamp,Timestamp,Boolean,String,String");
fileRange = get_context().commandManager.commandStatisticsConfig.fileRange
if fileRange==CommandStatisticsFileRange.Daily:
props.put("fileTailPattern", "(\\d+)_(\\d+)_(\\d+)");
props.put("fileTailParts", "Year,Month,Day");
elif fileRange==CommandStatisticsFileRange.Monthly:
props.put("fileTailPattern", "(\\d+)_(\\d+)"); #props.put("fileTailPattern", "-(\\d+)_(\\d+)");
props.put("fileTailParts", "Year,Month");
elif fileRange==CommandStatisticsFileRange.Yearly:
props.put("fileTailPattern", "(\\d+)");
props.put("fileTailParts", "Year");
conn = DriverManager.getConnection("jdbc:relique:csv:" + db, props);
stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE,ResultSet.CONCUR_READ_ONLY);
return conn
def _get_count(sql):
ret = 0
results = stmt.executeQuery("SELECT COUNT(*) AS count FROM . WHERE " + sql)
if results.first():
ret = results.getInt("count")
return ret
def _add_sql_time(sql, start, end):
if start:
if len(start)==8:
start = start + " 00:00:00.000"
sql = sql + " AND Start>='" + start + "'"
if end:
if len(end)==8:
end = end + " 00:00:00.000"
sql = sql + " AND (\"End\"<'" + end + "')"
return sql
def get_count(command= "%%", start = None, end = None, result= "%%"):
sql = "Command LIKE '"+ command +"' AND Result LIKE '"+ result +"'"
sql = _add_sql_time(sql, start, end)
return _get_count(sql)
def get_return_count(command= "%%", start = None, end = None, ret= "%%"):
sql = "Command LIKE '"+ command +"' AND Return = '"+ ret +"'"
sql = _add_sql_time(sql, start, end)
return _get_count(sql)
def get_cmd_stats(command = "%", start = None, end = None):
s = get_count(command, start, end, "success")
a = get_count(command, start, end, "abort")
e = get_count(command, start, end, "error")
return (s,a,e)
def get_errors(command = "%", start = None, end = None):
sql = "SELECT Return, Count(Return) as count FROM . WHERE Command LIKE '"+ command +"' AND Result='error'"
sql = _add_sql_time(sql, start, end)
sql = sql + " GROUP BY Return ORDER BY count DESC"
results = stmt.executeQuery(sql)
ret = []
while results.next():
ret.append((results.getInt("count"), results.getString("Return")))
return ret
def get_cmd_records(command = "%", start = None, end = None, result= "%%"):
sql = "SELECT * FROM . WHERE Command LIKE '"+ command +"' AND Result LIKE '"+ result +"'"
sql = _add_sql_time(sql, start, end)
results = stmt.executeQuery(sql)
ret = []
while results.next():
rec={}
for col in STAT_COLUMN_NAMES:
rec[col]= results.getString(col)
ret.append(rec)
return ret
def get_commands(commands =None, start = None, end = None):
ret = []
if (commands is None) or (len(commands)==0):
sql = "SELECT * FROM . WHERE Command != ''"
sql = _add_sql_time(sql, start, end)
sql = sql + " GROUP BY Command"
results = stmt.executeQuery(sql)
while results.next():
cmd = results.getString("Command")
if cmd and not " " in cmd:
ret.append(cmd)
else:
for cmd in commands:
if get_count(cmd, start, end) >0 :
ret.append(cmd)
return ret
def print_cmd_stats(command = "%", start = None, end = None):
print "-----------------------------------------------------------"
print "Statistics from ", start , " to ", end
(s,a,e) = get_cmd_stats(command, start, end)
t=s+a+e #get_count(command, start, end, "%")
print "Command: " , command , " Records: ", t
if t>0:
print "%-10s %7.2f%% - %d" % ("Success", (float(s)/t) * 100, s)
print "%-10s %7.2f%% - %d" % ("Abort", (float(a)/t) * 100, a)
print "%-10s %7.2f%% - %d" % ("Error", (float(e)/t) * 100, e)
print "\nErrors:"
print "%5s %s" % ("Count", "Error")
errors = get_errors(command, start, end)
for error in errors:
print "%5d %s" % (error[0], error[1])
print "-----------------------------------------------------------"
def print_cmd_records(command = "%", start = None, end = None, result= "%%"):
print "-----------------------------------------------------------"
print "Records from ", start , " to ", end
info = get_cmd_records(command, start, end, result)
print "Command: " , command , " Result: ", result, " Records: ", len(info)
for col in STAT_COLUMN_NAMES:
print col+ "; " ,
print
for cmd in info:
s = ""
for col in STAT_COLUMN_NAMES:
s = s + cmd[col]+ "; "
print s
print "-----------------------------------------------------------"
def print_stats(commands = None, start = None, end = None):
print "-----------------------------------------------------------"
print "Statistics from ", start , " to ", end
print "%-20s %-5s %8s %8s %8s" % ("Command", "Total", "Success", "Abort", "Error")
cmds = get_commands(commands)
for cmd in cmds:
(s,a,e) = get_cmd_stats(cmd, start, end)
t=s+a+e
if t>0:
print "%-20s %-5d %7.2f%% %7.2f%% %7.2f%%" % (cmd, t, (float(s)/t) * 100, (float(a)/t) * 100, (float(e)/t) * 100)
else:
print "%-20s %-5d" % (cmd, t)
print "-----------------------------------------------------------"
if __name__=='__main__':
conn = get_stats_connection()
#Print stats of all commands, with no time range
print_stats()
cmds = ["%scan1%", "%scan2%"]
start= "01/03/19"
end= "01/04/19"
#Print stats all commands containing 'scan1' and 'scan2' in the month 03.2019
print_stats(cmds, start, end)
#Print individual statistics, including error count, for commands containing 'scan1' and 'scan2'
for cmd in cmds:
print_cmd_stats (cmd, start, end)
#Print all records for commands containing 'scan1'
print_cmd_records("%scan1%%", start, end, "error")
conn.close()