mirror of
https://github.com/bec-project/bec_widgets.git
synced 2026-03-08 01:37:52 +01:00
fix: bec_dispatcher.py can take multiple workers as a list
This commit is contained in:
@@ -128,31 +128,36 @@ class _BECDispatcher(QObject):
|
||||
self._connections[topic].consumer.shutdown()
|
||||
del self._connections[topic]
|
||||
|
||||
def connect_dap_slot(self, slot, dap_name):
|
||||
if dap_name not in self._daps:
|
||||
# create a new consumer and connect slot
|
||||
def connect_dap_slot(self, slot, dap_names):
|
||||
if not isinstance(dap_names, list):
|
||||
dap_names = [dap_names]
|
||||
|
||||
def _dap_cb(msg):
|
||||
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
|
||||
if not isinstance(msg, list):
|
||||
msg = [msg]
|
||||
for i in msg:
|
||||
self.new_dap_data.emit(i.content["data"], i.metadata)
|
||||
for dap_name in dap_names:
|
||||
if dap_name not in self._daps: # create a new consumer and connect slot
|
||||
self.add_new_dap_connection(slot, dap_name)
|
||||
|
||||
dap_ep = MessageEndpoints.processed_data(dap_name)
|
||||
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
|
||||
consumer.start()
|
||||
else:
|
||||
# connect slot if it's not yet connected
|
||||
if slot not in self._daps[dap_name].slots:
|
||||
self.new_dap_data.connect(slot)
|
||||
self._daps[dap_name].slots.add(slot)
|
||||
|
||||
self.new_dap_data.connect(slot)
|
||||
def add_new_dap_connection(self, slot, dap_name):
|
||||
def _dap_cb(msg):
|
||||
msg = BECMessage.ProcessedDataMessage.loads(msg.value)
|
||||
if not isinstance(msg, list):
|
||||
msg = [msg]
|
||||
for i in msg:
|
||||
self.new_dap_data.emit(i.content["data"], i.metadata)
|
||||
|
||||
self._daps[dap_name] = _BECDap(consumer)
|
||||
self._daps[dap_name].slots.add(slot)
|
||||
dap_ep = MessageEndpoints.processed_data(dap_name)
|
||||
consumer = self.client.connector.consumer(topics=dap_ep, cb=_dap_cb)
|
||||
consumer.start()
|
||||
|
||||
else:
|
||||
# connect slot if it's not yet connected
|
||||
if slot not in self._daps[dap_name].slots:
|
||||
self.new_dap_data.connect(slot)
|
||||
self._daps[dap_name].slots.add(slot)
|
||||
self.new_dap_data.connect(slot)
|
||||
|
||||
self._daps[dap_name] = _BECDap(consumer)
|
||||
self._daps[dap_name].slots.add(slot)
|
||||
|
||||
def disconnect_dap_slot(self, slot, dap_name):
|
||||
if dap_name not in self._daps:
|
||||
|
||||
Reference in New Issue
Block a user