44 lines
1.2 KiB
Python
44 lines
1.2 KiB
Python
import numpy
|
|
|
|
from cam_server.camera.source.camera import *
|
|
|
|
# Module-level logger, named after this module via __name__.
# NOTE(review): getLogger is not imported explicitly in this file; presumably
# it is re-exported by the wildcard camera import above -- confirm.
_logger = getLogger(__name__)
|
|
|
|
|
|
class source_test(Camera):
    """Test camera source that serves randomly generated frames.

    No hardware is contacted: connecting and disconnecting do nothing, and
    every call to ``read`` produces a fresh random image of the fixed raw
    geometry set in ``__init__``.
    """

    def __init__(self, camera_config):
        """Initialise the base camera and set the fixed raw image geometry.

        Args:
            camera_config: Configuration object, forwarded to the ``Camera``
                initialiser and also stored on the instance.
        """
        super().__init__(camera_config)
        self.camera_config = camera_config

        # Raw image geometry: 80 pixels wide, 40 pixels high.
        self.width_raw, self.height_raw = 80, 40

        self.check_data = True

    def get_raw_geometry(self):
        """Return the raw image size as a ``(width, height)`` tuple."""
        return (self.width_raw, self.height_raw)

    def verify_camera_online(self):
        """Perform no check; always returns ``None``."""
        return None

    def connect(self):
        """Do nothing; this source has no connection to open."""

    def disconnect(self):
        """Do nothing; this source has no connection to close."""

    def read(self):
        """Generate one random frame.

        Returns:
            A ``uint16`` numpy array of shape ``(height, width)`` whose
            values are drawn uniformly from 1 to 100 inclusive.
        """
        columns, rows = self.get_raw_geometry()
        # randint's upper bound is exclusive, hence 101 to include 100.
        flat_pixels = numpy.random.randint(1, 101, size=columns * rows, dtype="uint16")
        return flat_pixels.reshape((rows, columns))

    def register_channels(self, sender):
        """Register the base camera channels plus an extra ``type`` channel.

        Args:
            sender: Object the channels are registered on; it must provide
                ``add_channel``.
        """
        super().register_channels(sender)

        type_metadata = {
            "compression": config.CAMERA_BSREAD_SCALAR_COMPRESSION,
            "type": "string",
        }
        sender.add_channel("type", metadata=type_metadata)

    def get_send_channels(self, default_channels):
        """Add the ``type`` entry to the outgoing channels.

        Args:
            default_channels: Mapping of channel names to values. It is
                modified in place.

        Returns:
            The same ``default_channels`` object, with ``"type"`` set to
            ``"Test"``.
        """
        default_channels["type"] = "Test"
        return default_channels

    def process(self, stop_event, statistics, parameter_queue, logs_queue, port):
        """Delegate to the base ``Camera.process`` and return its result."""
        return super().process(stop_event, statistics, parameter_queue, logs_queue, port)
|
|
|