diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index 1ac07a8..382716d 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -2,7 +2,6 @@ import time from bec_lib.core import bec_logger from bec_lib.core.devicemanager import DeviceContainer -from bec_lib.core.tests.utils import ProducerMock from ophyd import Signal, Kind @@ -53,6 +52,93 @@ class DeviceMock: return self +class ProducerMock: + def __init__(self, store_data=True) -> None: + self.message_sent = [] + self._get_buffer = {} + self.store_data = store_data + + def set(self, topic, msg, pipe=None, expire: int = None): + if pipe: + pipe._pipe_buffer.append(("set", (topic, msg), {"expire": expire})) + return + self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) + + def send(self, topic, msg, pipe=None): + if pipe: + pipe._pipe_buffer.append(("send", (topic, msg), {})) + return + self.message_sent.append({"queue": topic, "msg": msg}) + + def set_and_publish(self, topic, msg, pipe=None, expire: int = None): + if pipe: + pipe._pipe_buffer.append(("set_and_publish", (topic, msg), {"expire": expire})) + return + self.message_sent.append({"queue": topic, "msg": msg, "expire": expire}) + + def lpush(self, topic, msg, pipe=None): + if pipe: + pipe._pipe_buffer.append(("lpush", (topic, msg), {})) + return + + def rpush(self, topic, msg, pipe=None): + if pipe: + pipe._pipe_buffer.append(("rpush", (topic, msg), {})) + return + pass + + def lrange(self, topic, start, stop, pipe=None): + if pipe: + pipe._pipe_buffer.append(("lrange", (topic, start, stop), {})) + return + return [] + + def get(self, topic, pipe=None): + if pipe: + pipe._pipe_buffer.append(("get", (topic,), {})) + return + val = self._get_buffer.get(topic) + if isinstance(val, list): + return val.pop(0) + self._get_buffer.pop(topic, None) + return val + + def keys(self, pattern: str) -> list: + return [] + + def pipeline(self): + return PipelineMock(self) + + def delete(self, topic, pipe=None): + if pipe: + pipe._pipe_buffer.append(("delete", (topic,), {})) + return + + def lset(self, topic: str, index: int, msgs: str, pipe=None) -> None: + if pipe: + pipe._pipe_buffer.append(("lrange", (topic, index, msgs), {})) + return + + +class PipelineMock: + _pipe_buffer = [] + _producer = None + + def __init__(self, producer) -> None: + self._producer = producer + + def execute(self): + if not self._producer.store_data: + self._pipe_buffer = [] + return [] + res = [ + getattr(self._producer, method)(*args, **kwargs) + for method, args, kwargs in self._pipe_buffer + ] + self._pipe_buffer = [] + return res + + class DMMock: """Mock for DeviceManager