Files
x06da/script/cpython/VmbPyServer.py
alexgobbo 75400ca453
2024-09-11 16:17:17 +02:00

276 lines
11 KiB
Python

import sys
import time
import argparse
import socket
from bsread.sender import sender, PUSH, PUB
from typing import Optional, Tuple
from logging import getLogger
from vmbpy import *
from flask import Flask, request, jsonify
import threading
_logger = getLogger(__name__)
def can_bind_to_port(port, host='0.0.0.0'):
    """Report whether a TCP socket can currently be bound to ``(host, port)``.

    A throw-away IPv4 stream socket is created, a bind is attempted, and the
    socket is closed again before returning, so the port is not kept busy.

    Args:
        port: Port number to probe.
        host: Local address to bind to (defaults to all interfaces).

    Returns:
        True when the bind succeeds, False when it fails with ``OSError``.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind((host, port))
    except OSError:
        # The address is in use or otherwise cannot be bound.
        return False
    else:
        return True
    finally:
        probe.close()
class VmbPyStreamServer():
    """Grab frames from a VmbPy camera and publish them on a bsread PUB stream.

    The blocking ``run`` loop owns the camera. Other threads (for example the
    config server) talk to it through ``set_streaming``, ``exit`` and the
    feature accessors.
    """

    def __init__(self, id=None, port=None, alloc=None, start = None):
        """Store the configuration and verify that the stream port is free.

        Args:
            id: Camera ID; a falsy value selects the first camera found.
            port: Stream port as int or numeric string. Falsy values and 0
                select the default 9000.
            alloc: Truthy selects ``AllocationMode.AllocAndAnnounceFrame``,
                otherwise ``AllocationMode.AnnounceFrame`` is used.
            start: Start streaming once the camera is connected. ``None`` is
                treated like True.

        Raises:
            Exception: If the (positive) stream port cannot be bound.
        """
        self.id = id
        # The command line delivers the port as text; normalise it to int so
        # that the numeric comparison below cannot raise TypeError.
        self.port = (int(port) if port else 0) or 9000
        self.alloc = AllocationMode.AllocAndAnnounceFrame if alloc else AllocationMode.AnnounceFrame
        self.camera = None
        self.handler_exception = None
        self.exit_cmd = False
        # Pending one-shot command: True = start, False = stop, None = nothing to do.
        self.start_streaming = True if (start or (start is None)) else None
        # Last streaming command that was applied; used by run() to resume the
        # stream after the camera connection has been re-established.
        self._resume_streaming = False
        if self.port > 0:
            if not can_bind_to_port(self.port):
                raise Exception(f"Stream port {self.port} is not free")

    def open_camera(self, camera_id: Optional[str]) -> "Camera":
        """Look up the camera to use and return it (not yet entered as context).

        Resets ``self.camera`` and ``self.handler_exception``. When the camera
        cannot be found the module-level ``abort`` is called, which terminates
        the process.

        Args:
            camera_id: ID of the wanted camera; falsy picks the first camera.

        Returns:
            The selected camera object.
        """
        self.camera = None
        self.handler_exception = None
        with VmbSystem.get_instance() as vmb:
            if camera_id:
                try:
                    return vmb.get_camera_by_id(camera_id)
                except VmbCameraError:
                    abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))
            else:
                cams = vmb.get_all_cameras()
                if not cams:
                    abort('No Cameras accessible. Abort.')
                return cams[0]

    def init_camera(self):
        """Select the Mono8 pixel format and try to adjust the GigE packet size."""
        #cam.set_pixel_format(PixelFormat.BayerGR8)
        self.camera.set_pixel_format(PixelFormat.Mono8)
        # Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
        try:
            stream = self.camera.get_streams()[0]
            stream.GVSPAdjustPacketSize.run()
            while not stream.GVSPAdjustPacketSize.is_done():
                pass
        except (AttributeError, VmbFeatureError):
            pass

    def get_camera_features(self, readonly=True, readwrite=True):
        """Return a ``{name: value}`` dict of the camera features.

        Args:
            readonly: Include features that are not writable.
            readwrite: Include features that are writable.

        Returns:
            Dict of feature values; ``EnumEntry`` values are converted to str.
            Features whose name, value or access mode cannot be read are
            skipped (best effort).

        Raises:
            Exception: If no camera is currently open.
        """
        if self.camera is None:
            raise Exception("No camera instantiated")
        pars = {}
        for f in self.camera.get_all_features():
            try:
                name, v = f.get_name(), f.get()
                _, w = f.get_access_mode()
                if (readonly and not w) or (readwrite and w):
                    pars[name] = str(v) if isinstance(v, EnumEntry) else v
            except Exception:
                # Deliberate best effort, but no longer a bare ``except`` so
                # KeyboardInterrupt/SystemExit are not swallowed.
                continue
        return pars

    def get_camera_feature(self,name):
        """Return the value of one feature.

        Raises:
            Exception: If no camera is open or ``name`` is not a readable feature.
        """
        pars = self.get_camera_features()
        if name in pars:
            return pars[name]
        else:
            raise Exception( f"Invalid parameter: {name}")

    def set_camera_feature(self, name, value):
        """Write ``value`` to the feature called ``name``.

        Raises:
            Exception: If no camera is currently open (same message as
                ``get_camera_features``).
        """
        if self.camera is None:
            raise Exception("No camera instantiated")
        feature = self.camera.get_feature_by_name(name)
        feature.set(value)

    def start_stream(self, publisher):
        """Start camera streaming, sending every frame through ``publisher``.

        Does nothing when the camera is already streaming.
        """
        def frame_handler(cam: "Camera", stream: "Stream", frame: "Frame"):
            # Any failure is stored for run(), which re-raises it in its own
            # loop and thereby tears the connection down.
            try:
                fmt = frame.get_pixel_format()
                if fmt == PixelFormat.Mono8:
                    arr = frame.as_numpy_ndarray()
                else:
                    arr = frame.convert_pixel_format(PixelFormat.Mono8).as_numpy_ndarray()
                # NOTE(review): the original indentation was lost; this line is
                # assumed to apply to both branches (drop the channel axis) - confirm.
                arr = arr[:, :, 0]
                data = {"image": arr}
                publisher.send(data=data, pulse_id=None, timestamp=time.time(), check_data=True)
                cam.queue_frame(frame)
            except Exception as e:
                self.handler_exception = e

        if not self.is_streaming():
            self.camera.start_streaming(handler=frame_handler,
                                        buffer_count=10,
                                        allocation_mode=self.alloc)

    def stop_stream(self):
        """Stop camera streaming if it is active."""
        if self.is_streaming():
            self.camera.stop_streaming()

    def is_streaming(self):
        """Return True if a camera is open and reports that it is streaming."""
        # Without a camera there is nothing streaming; report False instead of
        # failing with AttributeError on None.
        return self.camera is not None and self.camera.is_streaming()

    def set_streaming(self, value):
        """Request streaming on/off; the request is applied by the run() loop."""
        if value!= self.is_streaming():
            self.start_streaming = value

    def run(self, retry=False):
        """Connect to the camera and serve frames until exit or failure.

        Opens the bsread sender and the camera, then polls every 0.1 s for
        pending start/stop commands, frame-handler errors and the exit flag.

        Args:
            retry: When True, a failed session is logged and a new one is
                attempted after 3 s; otherwise the method returns after the
                first session ends.
        """
        while True:
            try:
                self.camera = None
                self.handler_exception = None
                with sender(queue_size=10, port=self.port, mode=PUB, start_pulse_id=1,
                            data_header_compression=None) as publisher:
                    with VmbSystem.get_instance():
                        with self.open_camera(self.id) as self.camera:
                            self.init_camera()
                            _logger.info(f"Connected to {self.camera}")
                            # The start command is one-shot and was consumed by
                            # the previous session; re-arm it so that a restart
                            # brings the stream back if it had been running.
                            if self._resume_streaming and self.start_streaming is None:
                                self.start_streaming = True
                            try:
                                while True:
                                    if self.start_streaming is not None:
                                        if self.start_streaming:
                                            self.start_stream(publisher)
                                        else:
                                            self.stop_stream()
                                        self._resume_streaming = bool(self.start_streaming)
                                        self.start_streaming = None
                                    if self.handler_exception is not None:
                                        raise self.handler_exception
                                    time.sleep(0.1)
                                    if self.exit_cmd:
                                        break
                            finally:
                                self.stop_stream()
            except Exception as e:
                _logger.exception(e)
            finally:
                # Do not leave a closed camera handle visible to other threads.
                self.camera = None
            if not retry or self.exit_cmd:
                break
            time.sleep(3.0)
            _logger.info("Restarting")

    def start(self):
        """Run ``run()`` (without retry) in a daemon thread."""
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def exit(self):
        """Ask the run() loop to terminate."""
        _logger.info(f"Received exit command")
        self.exit_cmd = True
class VmbPyConfigServer():
    """HTTP (Flask) interface for inspecting and controlling a stream server.

    Routes expose the camera features, the streaming state and an exit
    command of the wrapped ``stream_server``.
    """

    def __init__(self, stream_server, port=None):
        """Store the configuration and verify that the config port is free.

        Args:
            stream_server: Object providing the camera feature, streaming and
                exit methods used by the routes.
            port: Config port as int or numeric string. Falsy values and 0
                select the default 9010; a negative value disables the
                interface (``run`` then does nothing).

        Raises:
            Exception: If the (positive) config port cannot be bound.
        """
        self.stream_server = stream_server
        # The command line delivers the port as text; normalise it to int so
        # that the numeric comparisons cannot raise TypeError.
        self.port = (int(port) if port else 0) or 9010
        if self.port>0:
            if not can_bind_to_port(self.port):
                raise Exception(f"Config port {self.port} is not free")

    @staticmethod
    def _str_to_bool(val):
        """Interpret a query-string flag: 'false', '0' and 'no' mean False."""
        # The original list contained '1' instead of '0', which turned
        # ``?ro=1`` into False.
        return str(val).strip().lower() not in ('false', '0', 'no')

    def run(self):
        """Create the Flask app and serve it (blocking) if the port is positive."""
        if self.port > 0:
            app = Flask(__name__)

            # Route to list parameters, optionally filtered with ?ro=...&rw=...
            @app.route('/api/parameters', methods=['GET'])
            def get_parameters():
                try:
                    ro = self._str_to_bool(request.args.get("ro", True))
                    rw = self._str_to_bool(request.args.get("rw", True))
                    return jsonify(self.stream_server.get_camera_features(ro, rw))
                except Exception as e:
                    return jsonify({"error": str(e)}), 404

            # Route to get a specific parameter
            @app.route('/api/parameters/<name>', methods=['GET'])
            def get_parameter(name):
                try:
                    value = self.stream_server.get_camera_feature(name)
                    return jsonify({name: value})
                except Exception as e:
                    return jsonify({"error": str(e)}), 404

            # Route to update a specific parameter
            @app.route('/api/parameters/<name>', methods=['PUT'])
            def update_parameter(name):
                try:
                    # Only features reported as writable may be set.
                    pars = self.stream_server.get_camera_features(False, True)
                    if name in pars:
                        new_value = request.json.get('value')
                        self.stream_server.set_camera_feature(name, new_value)
                        updated_value = self.stream_server.get_camera_feature(name)
                        return jsonify({name: updated_value})
                    else:
                        return jsonify({"error": f"Not a writable parameter: {name}"}), 404
                except Exception as e:
                    return jsonify({"error": str(e)}), 404

            # Route to query the streaming state
            @app.route('/api/streaming', methods=['GET'])
            def get_streaming():
                try:
                    return jsonify({"value": self.stream_server.is_streaming()})
                except Exception as e:
                    return jsonify({"error": str(e)}), 404

            # Route to start or stop streaming
            @app.route('/api/streaming', methods=['PUT'])
            def update_streaming():
                try:
                    self.stream_server.set_streaming(request.json.get('value'))
                    # The request is applied by the stream server's own loop,
                    # so the state returned here may still be the old one.
                    return jsonify({"value": self.stream_server.is_streaming()})
                except Exception as e:
                    return jsonify({"error": str(e)}), 404

            # Route to ask the stream server to terminate
            @app.route('/api/exit', methods=['GET'])
            def exit():
                self.stream_server.exit()
                return jsonify({"exit": "ok"})

            # NOTE(review): listens on all interfaces without authentication,
            # including the exit and parameter-write routes - confirm this is
            # only reachable from a trusted network.
            app.run(host='0.0.0.0', port=self.port, debug=False, use_reloader=False)

    def start(self):
        """Serve the config interface from a daemon thread."""
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()
def print_preamble():
    """Write the start-up banner, followed by an empty line, to stdout."""
    banner = (
        '///////////////////////////////////////',
        '/// VmbPy Camera Server ///',
        '///////////////////////////////////////\n',
    )
    print('\n'.join(banner))
def abort(reason: str, return_code: int = 1, usage: bool = False):
    """Print ``reason`` followed by a blank line and terminate the process.

    Args:
        reason: Message written to stdout.
        return_code: Exit status handed to ``SystemExit``.
        usage: Accepted but not used by this function.

    Raises:
        SystemExit: Always, carrying ``return_code``.
    """
    message = reason + '\n'
    print(message)
    raise SystemExit(return_code)
def parse_args():
    """Parse the command-line options of the camera server.

    Returns:
        argparse.Namespace with the attributes ``id`` (str or None), ``port``
        and ``config`` (int or None), ``alloc`` (bool) and ``start`` (bool,
        True unless ``--no-start`` is given).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--id", help="Camera ID (default first camera)", required=False)
    # Ports are parsed as integers: the servers compare them numerically, and
    # a string would raise TypeError there.
    parser.add_argument("-p", "--port", type=int, help="Output stream port (default 9000)", required=False)
    parser.add_argument("-c", "--config", type=int,
                        help="Config port (default 9010, -1 for disabling config interface)", required=False)
    parser.add_argument("-a", "--alloc", action='store_true', help="Alloc frames", required=False)
    parser.add_argument("-s", "--start", action='store_true', help="Start streaming (default true)",
                        default= True, required=False)
    # "-s" alone can never yield False (store_true with default True), so an
    # explicit opt-out writing to the same destination is provided.
    parser.add_argument("--no-start", dest="start", action='store_false',
                        help="Do not start streaming at launch", required=False)
    return parser.parse_args()
def main():
    """Program entry point: start the config interface and serve the stream.

    The config server runs in a background thread while the stream server
    occupies the calling thread (with retry enabled). The process exits with
    status 0 once the stream server returns.
    """
    print_preamble()
    options = parse_args()
    streamer = VmbPyStreamServer(
        id=options.id,
        port=options.port,
        alloc=options.alloc,
        start=options.start,
    )
    VmbPyConfigServer(streamer, port=options.config).start()
    streamer.run(retry=True)
    raise SystemExit(0)
if __name__ == '__main__':
    # Run the server only when this file is executed as a script.
    # Debugging aid: uncomment the two lines below to append extra
    # command-line arguments (here "-h") before main() parses them.
    #args = ["-h"]
    #sys.argv = sys.argv + args
    main()