Files
pide/script/local.py
2022-08-11 16:24:40 +02:00

101 lines
3.1 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import numpy
from threading import Thread
import threading
import traceback
import queue
#import multiprocessing
# Workaround: zmq also exists as a Java package, which takes import priority over the
# Python zmq package, so the Python package is loaded explicitly from its file path below.
from importlib._bootstrap import _load
from importlib import util
# Build the path of the Python zmq package's __init__.py from the interpreter
# home (PYTHONHOME) and the running "major.minor" version, then load it under
# the module name "zmq".
PYHOME = os.environ["PYTHONHOME"]
PYVER = ".".join(str(part) for part in sys.version_info[:2])
_load(util.spec_from_file_location(
    "zmq",
    "%s/lib/python%s/site-packages/zmq/__init__.py" % (PYHOME, PYVER),
))
from bsread import source, Source, PUB, SUB, PUSH, PULL, DEFAULT_DISPATCHER_URL
from cam_server import CamClient, PipelineClient, ProxyClient, config
from cam_server.utils import get_host_port_from_stream_address
from cam_server.pipeline.configuration import PipelineConfig
#from cam_server.pipeline.types.processing import run as processing_pipeline
from processing import run as processing_pipeline
from cam_server.pipeline.types.store import run as store_pipeline
from cam_server.start_camera_server import start_camera_server
# Camera server client; the server address is read from the "cam_srv_url" application argument.
camera_client = CamClient("http://" + App.getArgumentValue("cam_srv_url"))
class Namespace(object):
    """Plain attribute container filled from keyword arguments."""

    def __init__(self, **kwds):
        """Store every keyword argument as an instance attribute."""
        vars(self).update(kwds)

    def __repr__(self):
        """Return ``Namespace(name=value, ...)``.

        Attributes whose name starts with an underscore are left out; the
        remaining ``name=value`` entries are sorted as strings.
        """
        shown = sorted(
            '%s=%r' % (key, val)
            for key, val in vars(self).items()
            if not key.startswith('_')
        )
        return 'Namespace(%s)' % ', '.join(shown)
class Viewer:
    """Thin wrapper around the "ScreenPanel10" plugin obtained from the context."""

    def __init__(self):
        """Look up the "ScreenPanel10" plugin and keep it as ``self.plugin``."""
        # The returned application instance is not used here; the call itself
        # is kept. NOTE(review): confirm whether it is still needed.
        App.getInstance()
        self.plugin = get_context().getPlugin("ScreenPanel10")

    def show_stream(self, url):
        """Pass ``url`` to the plugin's ``initStream``."""
        self.plugin.initStream(url)

    def show_pipeline(self, name):
        """Pass ``name`` to the plugin's ``initPipeline``."""
        self.plugin.initPipeline(name)

    def show_camera(self, name):
        """Pass ``name`` to the plugin's ``initCamera``."""
        self.plugin.initCamera(name)

    def stop(self):
        """Call the plugin's ``initCamera`` with an empty name.

        NOTE(review): presumably this clears the currently shown source -
        confirm against the plugin.
        """
        self.plugin.initCamera("")
# Module-level Viewer instance, created as soon as this script is executed.
viewer = Viewer()
class MockBackgroundManager:
    """Background manager mock that keeps images in an in-memory dict."""

    def __init__(self):
        # Maps background name -> stored image.
        self.backgrounds = {}

    def get_background(self, background_name):
        """Return the image stored under ``background_name``.

        Returns:
            The stored image, or None when ``background_name`` is falsy
            (for example None or an empty string).

        Raises:
            ValueError: if no background was saved under that name.
        """
        if not background_name:
            return None
        try:
            return self.backgrounds[background_name]
        except KeyError:
            raise ValueError(
                "Requested background '%s' does not exist." % background_name
            ) from None

    def save_background(self, background_name, image, append_timestamp=True):
        """Store ``image`` and return the name it was stored under.

        Args:
            background_name: Base name for the background.
            image: Object to store; it is kept as-is.
            append_timestamp: When true, a ``_YYYYmmdd_HHMMSS_ffffff`` suffix
                built from the current local time is appended to the name.

        Returns:
            The final name used as the key (with the suffix, if appended).
        """
        if append_timestamp:
            # Imported locally: no import of datetime is visible in this file,
            # so this avoids depending on whatever an earlier script bound to
            # the name ``datetime``.
            from datetime import datetime
            background_name += datetime.now().strftime("_%Y%m%d_%H%M%S_%f")
        self.backgrounds[background_name] = image
        return background_name

    def get_latest_background_id(self, background_prefix):
        """Not supported by the mock; always raises NotImplementedError."""
        raise NotImplementedError("This cannot work in the mock.")

    def get_background_ids(self, background_prefix):
        """Not supported by the mock; always raises NotImplementedError."""
        raise NotImplementedError("This cannot work in the mock.")
def dont_exit(status=None):
    """Print the requested exit status and return instead of terminating.

    This function is installed as ``sys.exit`` right after its definition.
    ``status`` defaults to None so that a call without arguments, which
    ``sys.exit`` permits, does not fail with a TypeError.

    Args:
        status: Exit status passed by the caller; only printed.
    """
    print("Exit status: ", status)
# Replace sys.exit with dont_exit, so later calls to sys.exit(...) print the status and return.
sys.exit = dont_exit