Files
pide/script/test/test_stream_out.py
2022-08-25 10:26:51 +02:00

41 lines
1.0 KiB
Python

#stop_event = multiprocessing.Event()
#parameter_queue = multiprocessing.Queue()
#manager = multiprocessing.Manager()
#statistics = manager.Namespace()
stop_event = threading.Event()
parameter_queue = queue.Queue()
statistics=Namespace()
OUTPUT_PORT = 12005
viewer.show_stream("tcp://localhost:" + str(OUTPUT_PORT))
pipeline_config = PipelineConfig("test_pipeline", parameters={"camera_name": "simulation", "function":"transparent"})
def send(port):
print ("Startint pipeline on port: ", port)
try:
processing_pipeline(stop_event, statistics, parameter_queue, camera_client, pipeline_config, port, MockBackgroundManager())
except:
traceback.print_exc()
thread = Thread(target=send, args=(OUTPUT_PORT,))
thread.start()
with source(host="127.0.0.1", port=OUTPUT_PORT, mode=SUB, receive_timeout = 3000) as stream:
data = stream.receive()
if not data:
raise Exception("Received None message.")
print (data.data.data.keys())
time.sleep(2.0)
print("Stopping")
stop_event.set()
thread.join(5.0)
print ("Done")