fix: remove pytest dependency for eiger, falcon and pilatus; closes #18 and #9

This commit is contained in:
appel_c 2023-11-09 13:48:31 +01:00
parent a4efb59589
commit c6e6737547

View File

@ -2,7 +2,6 @@ import time
from bec_lib.core import bec_logger
from bec_lib.core.devicemanager import DeviceContainer
from bec_lib.core.tests.utils import ProducerMock
from ophyd import Signal, Kind
@ -53,6 +52,93 @@ class DeviceMock:
return self
class ProducerMock:
def __init__(self, store_data=True) -> None:
self.message_sent = []
self._get_buffer = {}
self.store_data = store_data
def set(self, topic, msg, pipe=None, expire: int = None):
if pipe:
pipe._pipe_buffer.append(("set", (topic, msg), {"expire": expire}))
return
self.message_sent.append({"queue": topic, "msg": msg, "expire": expire})
def send(self, topic, msg, pipe=None):
if pipe:
pipe._pipe_buffer.append(("send", (topic, msg), {}))
return
self.message_sent.append({"queue": topic, "msg": msg})
def set_and_publish(self, topic, msg, pipe=None, expire: int = None):
if pipe:
pipe._pipe_buffer.append(("set_and_publish", (topic, msg), {"expire": expire}))
return
self.message_sent.append({"queue": topic, "msg": msg, "expire": expire})
def lpush(self, topic, msg, pipe=None):
if pipe:
pipe._pipe_buffer.append(("lpush", (topic, msg), {}))
return
def rpush(self, topic, msg, pipe=None):
if pipe:
pipe._pipe_buffer.append(("rpush", (topic, msg), {}))
return
pass
def lrange(self, topic, start, stop, pipe=None):
if pipe:
pipe._pipe_buffer.append(("lrange", (topic, start, stop), {}))
return
return []
def get(self, topic, pipe=None):
if pipe:
pipe._pipe_buffer.append(("get", (topic,), {}))
return
val = self._get_buffer.get(topic)
if isinstance(val, list):
return val.pop(0)
self._get_buffer.pop(topic, None)
return val
def keys(self, pattern: str) -> list:
return []
def pipeline(self):
return PipelineMock(self)
def delete(self, topic, pipe=None):
if pipe:
pipe._pipe_buffer.append(("delete", (topic,), {}))
return
def lset(self, topic: str, index: int, msgs: str, pipe=None) -> None:
if pipe:
pipe._pipe_buffer.append(("lrange", (topic, index, msgs), {}))
return
class PipelineMock:
_pipe_buffer = []
_producer = None
def __init__(self, producer) -> None:
self._producer = producer
def execute(self):
if not self._producer.store_data:
self._pipe_buffer = []
return []
res = [
getattr(self._producer, method)(*args, **kwargs)
for method, args, kwargs in self._pipe_buffer
]
self._pipe_buffer = []
return res
class DMMock:
"""Mock for DeviceManager