"""Camera device backed by a REST configuration server and a data stream.

Defines ``VmbCamera``, starts the external camera server script, registers
the camera and its stream as devices and applies the initial acquisition
settings.
"""
import requests
import json


class VmbCamera (CameraBase):
    """Camera whose parameters are read/written over HTTP (``<config_url>/api``)
    and whose frames arrive through a ``Stream`` subscribed to ``stream_url``.
    """

    def __init__(self, name, config_url="localhost:9010", stream_url="localhost:9000", image_name=None):
        """Create the camera and its (not yet initialized) stream.

        name        -- device name; the stream is named ``name + "_stream"``.
        config_url  -- ``host:port`` of the REST configuration server.
        stream_url  -- ``host:port`` the stream subscribes to.
        image_name  -- name of the image source created after initialization;
                       defaults to ``name + "_image"``.
        """
        CameraBase.__init__(self, name)
        self.config_url = config_url
        self.base_url = "http://" + config_url + "/api"
        # Cached values; None means "ask the server on first use".
        self.data_type = None
        self.color_mode = None
        self.stream_url = stream_url
        self.stream = Stream(name + "_stream", stream_url, SocketType.SUB)
        self.stream.createMatrix = True
        # Both are filled in asynchronously by doInitialize().
        self.register = None
        self.image = None
        self.image_name = image_name if image_name else name + "_image"

    def _get_response(self, response, is_json=True):
        """Return the payload of an HTTP response.

        Raises ``Exception`` carrying the response text when the status code
        is not 200. The body is JSON-decoded unless ``is_json`` is false.
        """
        if response.status_code != 200:
            raise Exception(response.text)
        if is_json:
            return json.loads(response.text)
        return response.text

    def read_parameters(self):
        """Return the decoded reply of ``GET /parameters``."""
        return self._get_response(requests.get(url=self.base_url + "/parameters"))

    def read_parameter(self, name):
        """Return entry ``name`` of the decoded reply of ``GET /parameters/<name>``."""
        reply = self._get_response(requests.get(url=self.base_url + "/parameters/" + name))
        return reply[name]

    def write_parameter(self, name, value):
        """``PUT {"value": value}`` to ``/parameters/<name>`` and return the decoded reply."""
        return self._get_response(requests.put(url=self.base_url + "/parameters/" + name, json={"value": value}))

    def doInitialize(self):
        """Start the stream and finish initialization in a forked task.

        The device is put in ``State.Disabled`` here; the forked task sets
        ``State.Ready`` once the stream cache has changed and the image
        source has been created.
        """
        #super(SourceBase, self).doInitialize()
        self.stream.initialize()
        self.stream.start()

        def init_img():
            try:
                self.stream.waitCacheChange(0)
                if (self.stream.state == State.Busy) and (Context.getState().isActive()):
                    self.register = self.stream.getChild("image")
                    self.image = RegisterMatrixSource(self.image_name, self.register)
                    #self.image = RegisterArraySource(name, )
                    #self.image.config.imageWidth =self.register.shape[0]
                    #self.image.config.imageHeight = self.register.shape[1]
                    log("Successful VmbCamera Initialization")
                    self.setState(State.Ready)
            except Exception as e:
                log(str(e))

        # NOTE(review): the task is forked before the state is set to
        # Disabled; if init_img could ever reach setState(Ready) first, the
        # Disabled assignment below would override it - confirm ordering.
        fork(init_img)
        self.setState(State.Disabled)

    def doClose(self):
        """Close the stream; failures are printed and logged, never raised."""
        try:
            self.stream.close()
        except Exception as e:
            print(e)
            log(str(e))

    def add_initialized_callback(self, callback):
        """Call ``callback()`` from a forked task once the device leaves
        ``State.Disabled``, provided the stream is busy and the context is
        active. Errors are logged, not raised.
        """
        def wait_init():
            try:
                self.waitStateNot(State.Disabled, -1)
                if (self.stream.state == State.Busy) and (Context.getState().isActive()):
                    callback()
            except Exception as e:
                log(str(e))

        fork(wait_init)

    def getInfo(self):
        """Return ``"<DeviceModelName> - uid: <DeviceID>"``, or ``""`` on any failure."""
        try:
            pars = self.read_parameters()
            return pars["DeviceModelName"] + " - uid: " + pars["DeviceID"]
        except:
            # Deliberately best-effort: any failure yields an empty description.
            return ""

    def getSensorSize(self):
        """Return ``[SensorWidth, SensorHeight]`` as reported by the server."""
        pars = self.read_parameters()
        return [pars["SensorWidth"], pars["SensorHeight"]]

    def setBinningX(self, value):
        """Write ``BinningHorizontal``."""
        self.write_parameter("BinningHorizontal", value)

    def getBinningX(self):
        """Return ``BinningHorizontal``, or 1 if it cannot be read."""
        try:
            return self.read_parameter("BinningHorizontal")
        except:
            # Deliberately best-effort: fall back to no binning.
            return 1

    def setBinningY(self, value):
        """Write ``BinningVertical``."""
        self.write_parameter("BinningVertical", value)

    def getBinningY(self):
        """Return ``BinningVertical``, or 1 if it cannot be read."""
        try:
            return self.read_parameter("BinningVertical")
        except:
            # Deliberately best-effort: fall back to no binning.
            return 1

    def setROI(self, x, y, w, h):
        """Set the region of interest.

        Offsets are zeroed first, then the size is written, then the
        requested offsets (presumably so the new size is never rejected
        because of the previous offsets - confirm against the server).
        """
        self.write_parameter("OffsetX", 0)
        self.write_parameter("OffsetY", 0)
        self.write_parameter("Height", h)
        self.write_parameter("Width", w)
        self.write_parameter("OffsetX", x)
        self.write_parameter("OffsetY", y)

    def getROI(self):
        """Return ``[OffsetX, OffsetY, Width, Height]``."""
        pars = self.read_parameters()
        return [pars["OffsetX"], pars["OffsetY"], pars["Width"], pars["Height"]]

    def setGain(self, value):
        """Write ``GainRaw``."""
        self.write_parameter("GainRaw", value)

    def getGain(self):
        """Return ``GainRaw``."""
        return self.read_parameter("GainRaw")

    def setDataType(self, data_type):
        """Cache ``data_type`` and push the resulting pixel format to the server."""
        self.data_type = data_type
        self.setPixelFormat()

    def getDataType(self):
        """Return the cached data type, else derive it from ``PixelFormat``.

        Returns None when the pixel format is not one of the known names.
        """
        if self.data_type:
            return self.data_type
        fmt = self.read_parameter("PixelFormat")
        if fmt in ("Mono8", "Bayer8"):
            return Camera.DataType.UInt8
        if fmt in ("Mono16", "Bayer16"):
            return Camera.DataType.UInt16
        if fmt in ("Rgb24", "Brg24"):
            return Camera.DataType.UInt24
        if fmt in ("Rgba32", "Brga32"):
            return Camera.DataType.UInt32
        return None

    def setPixelFormat(self):
        """Compose ``PixelFormat`` from the color mode and data type and write it.

        Missing cached values are first resolved through ``getColorMode`` /
        ``getDataType``. Raises ``IOException`` when the color mode or data
        type cannot be determined or the color mode is not supported.
        """
        if not self.color_mode:
            self.color_mode = self.getColorMode()
        if not self.color_mode:
            raise IOException("Invalid color mode")
        if self.data_type is None:
            self.data_type = self.getDataType()
        if self.data_type is None:
            raise IOException("Invalid data type")

        if self.color_mode == Camera.ColorMode.Mono:
            pixelFormat = "Mono"
        elif self.color_mode == Camera.ColorMode.RGB1:
            pixelFormat = "Rgb"
        elif self.color_mode == Camera.ColorMode.RGB2:
            pixelFormat = "Brg"
        elif self.color_mode == Camera.ColorMode.RGB3:
            pixelFormat = "Bayer"
        else:
            # Previously fell through with None and failed on "+=" below.
            raise IOException("Invalid color mode")

        size = self.data_type.getSize()
        if size == 1:
            pixelFormat += "8"
        elif size == 2:
            pixelFormat += "16"
        elif size == 3:
            pixelFormat += "24"
        elif size == 4:
            pixelFormat += "a32"
        self.write_parameter("PixelFormat", pixelFormat)

    def setColorMode(self, mode):
        """Cache ``mode`` and push the resulting pixel format to the server."""
        self.color_mode = mode
        self.setPixelFormat()

    def getColorMode(self):
        """Return the cached color mode, else derive it from ``PixelFormat``.

        Returns None when the pixel format is not one of the known names.
        """
        if self.color_mode:
            return self.color_mode
        fmt = self.read_parameter("PixelFormat")
        if fmt in ("Mono8", "Mono16"):
            return Camera.ColorMode.Mono
        if fmt in ("Rgb24", "Rgba32"):
            return Camera.ColorMode.RGB1
        # "Brg24" is the 24-bit name setPixelFormat() writes and
        # getDataType() recognizes.
        if fmt in ("Brg24", "Brga32"):
            return Camera.ColorMode.RGB2
        if fmt in ("Bayer8", "Bayer16"):
            return Camera.ColorMode.RGB3
        return None

    def getImageSize(self):
        """Return ``[Width, Height]``."""
        pars = self.read_parameters()
        return [pars["Width"], pars["Height"]]

    def setExposure(self, value):
        """Write ``ExposureTimeAbs`` as ``int(value * 1000)``
        (presumably ms converted to us - confirm units).
        """
        self.write_parameter("ExposureTimeAbs", int(value * 1000))

    def getExposure(self):
        """Return ``ExposureTimeAbs`` divided by 1000, as a float."""
        return float(self.read_parameter("ExposureTimeAbs")) / 1000

    def setAcquirePeriod(self, value):
        """Write ``AcquisitionFrameRateAbs`` as ``1000.0 / value``
        (presumably a period in ms converted to a rate in Hz - confirm units).
        """
        self.write_parameter("AcquisitionFrameRateAbs", 1000.0 / value)

    def getAcquirePeriod(self):
        """Return ``1000.0 / AcquisitionFrameRateAbs``."""
        return 1000.0 / self.read_parameter("AcquisitionFrameRateAbs")

    def setNumImages(self, value):
        """Write ``AcquisitionFrameCount``."""
        self.write_parameter("AcquisitionFrameCount", value)

    def getNumImages(self):
        """Return ``AcquisitionFrameCount``."""
        return self.read_parameter("AcquisitionFrameCount")

    def setIterations(self, value):
        """No-op: the value is ignored."""
        pass

    def getIterations(self):
        """Always 1."""
        return 1

    def getImagesComplete(self):
        """Always 0."""
        return 0

    def setGrabMode(self, value):
        """Write ``AcquisitionMode`` for the given ``Camera.GrabMode``.

        Raises ``IOException`` for any other value.
        """
        if value == Camera.GrabMode.Continuous:
            mode = "Continuous"
        elif value == Camera.GrabMode.Single:
            mode = "SingleFrame"
        elif value == Camera.GrabMode.Multiple:
            mode = "MultiFrame"
        else:
            # Single message string: the exception takes no second text argument.
            raise IOException("Not supported mode: " + str(value))
        self.write_parameter("AcquisitionMode", mode)

    def getGrabMode(self):
        """Return the ``Camera.GrabMode`` matching ``AcquisitionMode``, or None."""
        mode = self.read_parameter("AcquisitionMode")
        if mode == "Continuous":
            return Camera.GrabMode.Continuous
        elif mode == "SingleFrame":
            return Camera.GrabMode.Single
        elif mode == "MultiFrame":
            return Camera.GrabMode.Multiple
        return None

    def setTriggerMode(self, value):
        """Write ``TriggerSource`` for the given ``Camera.TriggerMode``.

        Any other value is silently ignored (nothing is written).
        """
        if value == Camera.TriggerMode.Free_Run:
            triggerMode = "Freerun"
        elif value == Camera.TriggerMode.External:
            triggerMode = "SyncIn1"
        elif value == Camera.TriggerMode.Software:
            triggerMode = "Software"
        elif value == Camera.TriggerMode.Fixed_Rate:
            triggerMode = "FixedRate"
        else:
            return
        self.write_parameter("TriggerSource", triggerMode)

    def getTriggerMode(self):
        """Return the ``Camera.TriggerMode`` matching ``TriggerSource``, or None."""
        mode = self.read_parameter("TriggerSource")
        if mode == "Freerun":
            return Camera.TriggerMode.Free_Run
        if mode == "SyncIn1":
            return Camera.TriggerMode.External
        if mode == "Software":
            return Camera.TriggerMode.Software
        if mode == "FixedRate":
            return Camera.TriggerMode.Fixed_Rate
        return None

    def trigger(self):
        """No-op."""
        pass

    def doStart(self):
        """``PUT {"value": True}`` to ``/streaming``."""
        self._get_response(requests.put(url=self.base_url + "/streaming", json={"value": True}))

    def doStop(self):
        """``PUT {"value": False}`` to ``/streaming``."""
        self._get_response(requests.put(url=self.base_url + "/streaming", json={"value": False}))

    def getStarted(self):
        """Return the ``"value"`` entry of ``GET /streaming``."""
        return self._get_response(requests.get(url=self.base_url + "/streaming"))["value"]


def start_camera_server():
    """Run the camera server script with the configured CPython interpreter.

    The return value of ``exec_cpython`` is logged; any failure is logged
    instead of raised.
    """
    try:
        python_name = expand_path("{home}/../anaconda3/envs/py38/python.exe")
        script_name = expand_path("{home}/script/cpython/VmbPyServer.py")
        print("Starting camera Server")
        r = exec_cpython(script_name, python_name=python_name)
        # Log the result; the original referenced the not-yet-bound name "e".
        log("Start camera server = ret: " + str(r))
    except Exception as e:
        log(str(e))


fork(start_camera_server)

c = VmbCamera("cam", image_name="img")
add_device(c, True)
add_device(c.stream, True)


def initialized_callback():
    """Register the image devices and run the cover-detection script once
    the camera reports it is initialized.
    """
    add_device(c.image, True)
    add_device(c.image.contrast, force=True)
    log("Added VmbCamera image device")
    run("devices/CoverDetection")


c.add_initialized_callback(initialized_callback)

# NOTE(review): "cam" is presumably the global that add_device() creates for
# the device named "cam" (the same object as "c") - confirm.
cam.setGrabMode(Camera.GrabMode.Continuous)
cam.setTriggerMode(Camera.TriggerMode.Fixed_Rate)
cam.setAcquirePeriod(330.00)
cam.setGain(0.0)