Files
x06da/script/devices/VmbCamera.py
alexgobbo 0692cd7cdc
2024-09-17 18:10:45 +02:00

291 lines
9.8 KiB
Python

import requests
import json
class VmbCamera (CameraBase):
def __init__(self, name, config_url="localhost:9010", stream_url="localhost:9000", image_name=None):
CameraBase.__init__(self,name)
self.config_url = config_url
self.base_url = "http://" + config_url + "/api"
self.data_type = None
self.color_mode= None
self.stream_url = stream_url
self.stream= Stream(name + "_stream",stream_url, SocketType.SUB)
self.stream.createMatrix = True
self.register = None
self.image = None
self.image_name = image_name if image_name else name + "_image"
def _get_response(self, response, is_json=True):
if response.status_code != 200:
raise Exception(response.text)
return json.loads(response.text) if is_json else response.text
def read_parameters(self):
return self._get_response(requests.get(url=self.base_url + "/parameters"))
def read_parameter(self, name):
return self._get_response(requests.get(url=self.base_url + "/parameters/"+name))[name]
def write_parameter(self, name, value):
return self._get_response(requests.put(url=self.base_url + "/parameters/"+name, json={"value": value}))
def doInitialize(self):
#super(SourceBase, self).doInitialize()
self.stream.initialize()
self.stream.start()
def init_img():
try:
self.stream.waitCacheChange(0)
if (self.stream.state == State.Busy) and (get_context().state.isActive()):
self.register = self.stream.getChild("image")
self.image = RegisterMatrixSource(self.image_name , self.register)
#self.image = RegisterArraySource(name, )
#self.image.config.imageWidth =self.register.shape[0]
#self.image.config.imageHeight = self.register.shape[1]
log("Successful VmbCamera Initialization")
self.setState(State.Ready)
except Exception as e:
log(str(e))
fork(init_img)
self.setState(State.Disabled)
def add_initialized_callback(self, callback):
def wait_init():
try:
self.waitStateNot(State.Disabled, -1)
if (self.stream.state == State.Busy) and (get_context().state.isActive()):
callback()
except Exception as e:
log(str(e))
fork(wait_init)
def getInfo(self):
try:
pars = self.read_parameters()
return pars["DeviceModelName"] + " - uid: " + pars["DeviceID"]
except:
return ""
def getSensorSize(self):
pars = self.read_parameters()
return [pars["SensorWidth"], pars["SensorHeight"]]
def setBinningX(self, value):
self.write_parameter("BinningHorizontal", value)
def getBinningX(self):
return self.read_parameter("BinningHorizontal")
def setBinningY(self, value):
self.write_parameter("BinningVertical", value)
def getBinningY(self):
return self.read_parameter("BinningVertical")
def setROI(self, x, y, w, h):
self.write_parameter("OffsetX", 0)
self.write_parameter("OffsetY", 0)
self.write_parameter("Height", h)
self.write_parameter("Width", w)
self.write_parameter("OffsetX", x)
self.write_parameter("OffsetY", y)
def getROI(self):
pars = self.read_parameters()
return [pars["OffsetX"], pars["OffsetY"], pars["Width"], pars["Height"]]
def setGain(self, value):
self.write_parameter("GainRaw", value);
def getGain(self):
return self.read_parameter("GainRaw");
def setDataType(self, data_type):
self.data_type = data_type;
self.setPixelFormat();
def getDataType(self):
if self.data_type:
return self.data_type
fmt = self.read_parameter("PixelFormat")
if fmt in ("Mono8", "Bayer8"):
return Camera.DataType.UInt8
if fmt in ("Mono16", "Bayer16"):
return Camera.DataType.UInt16
if fmt in ("Rgb24", "Brg24"):
return Camera.DataType.UInt24
if fmt in ("Rgba32", "Brga32"):
return Camera.DataType.UInt32
return None
def setPixelFormat(self):
pixelFormat = None
if not self.color_mode:
self.color_mode = self.getColorMode()
if not self.color_mode:
raise IOException("Invalid color mode")
if self.data_type is None:
self.data_type = self.getDataType();
if self.data_type is None:
raise IOException("Invalid data type")
if self.color_mode == Camera.ColorMode.Mono:
pixelFormat = "Mono"
elif self.color_mode == Camera.ColorMode. RGB1:
pixelFormat = "Rgb"
elif self.color_mode == Camera.ColorMode.RGB2:
pixelFormat = "Brg"
elif self.color_mode == Camera.ColorMode. RGB3:
pixelFormat = "Bayer"
size =self.data_type.getSize()
if size==1:
pixelFormat += "8"
elif size== 2:
pixelFormat += "16"
elif size==3:
pixelFormat += "24"
elif size==4:
pixelFormat += "a32"
self.write_parameter("PixelFormat", pixelFormat);
def setColorMode(self, mode):
self.color_mode = mode;
self.setPixelFormat();
def getColorMode(self):
if self.color_mode:
return self.color_mode
fmt = self.read_parameter("PixelFormat")
if fmt in ("Mono8", "Mono16"):
return Camera.ColorMode.Mono
if fmt in ("Rgb24", "Rgba32"):
return Camera.ColorMode.RGB1
if fmt in ("Brga24", "Brga32"):
return Camera.ColorMode.RGB2
if fmt in ("Bayer8", "Bayer16"):
return Camera.ColorMode.RGB3
return None
def getImageSize(self):
pars = self.read_parameters()
return [pars["Width"], pars["Height"]]
def setExposure(self, value):
self.write_parameter("ExposureTimeAbs", int(value * 1000))
def getExposure(self):
return float(self.read_parameter("ExposureTimeAbs"))/1000
def setAcquirePeriod(self, value):
self.write_parameter("AcquisitionFrameRateAbs", 1000.0 / value)
def getAcquirePeriod(self):
return 1000.0 / self.read_parameter("AcquisitionFrameRateAbs")
def setNumImages(self, value):
self.write_parameter("AcquisitionFrameCount", value)
def getNumImages(self):
return self.read_parameter("AcquisitionFrameCount")
def setIterations(self, value):
pass
def getIterations (self):
return 1;
def getImagesComplete(self):
return 0
def setGrabMode(self, value):
if value==Camera.GrabMode.Continuous:
mode = "Continuous"
elif value==Camera.GrabMode.Single:
mode = "SingleFrame"
elif value==Camera.GrabMode.Multiple:
mode = "MultiFrame"
else:
raise IOException("Not supported mode: ", str(value))
self.write_parameter("AcquisitionMode", mode)
def getGrabMode(self):
mode = self.read_parameter("AcquisitionMode")
if mode == "Continuous":
return Camera.GrabMode.Continuous
elif mode == "SingleFrame":
return Camera.GrabMode.Single
elif mode == "MultiFrame":
return Camera.GrabMode.Multiple
return None
def setTriggerMode(self, value):
if value == Camera.TriggerMode.Free_Run:
triggerMode = "Freerun";
elif value == Camera.TriggerMode.External:
triggerMode = "SyncIn1"
elif value == Camera.TriggerMode.Software:
triggerMode = "Software"
elif value == Camera.TriggerMode.Fixed_Rate:
triggerMode = "FixedRate"
else:
return
self.write_parameter("TriggerSource", triggerMode);
def getTriggerMode(self):
mode = self.read_parameter("TriggerSource")
if mode == "Freerun":
return Camera.TriggerMode.Free_Run
if mode == "SyncIn1":
return Camera.TriggerMode.External
if mode == "Software":
return Camera.TriggerMode.Software
if mode == "FixedRate":
return Camera.TriggerMode.Fixed_Rate
return None
def trigger(self):
pass
def doStart(self):
self._get_response(requests.put(url=self.base_url + "/streaming", json={"value": True}))
def doStop(self):
self._get_response(requests.put(url=self.base_url + "/streaming", json={"value": False}))
def getStarted(self):
return self._get_response(requests.get(url=self.base_url + "/streaming"))["value"]
def start_camera_server():
try:
python_name = "{home}/../anaconda3/envs/py38/python.exe"
script_name = "{home}/script/cpython/VmbPyServer.py"
python_name = expand_path(python_name)
script_name = expand_path(script_name)
print("Starting camera Server")
r=exec_cpython(script_name, python_name = python_name)
log("Start camera server = ret: " + str(e))
except Exception as e:
log(str(e))
fork(start_camera_server)
c = VmbCamera("cam", image_name="image")
add_device(c, True)
add_device(c.stream, True)
def initialized_callback():
add_device(c.image, True)
add_device(c.image.contrast, force = True)
log("Added VmbCamera image device")
run( "devices/CoverDetection")
c.add_initialized_callback(initialized_callback)