mirror of
https://github.com/ivan-usov-org/bec.git
synced 2025-04-22 02:20:02 +02:00
refactor(bec_lib): changed connector to use abstract methods
This commit is contained in:
parent
7ca93d7412
commit
d35b992262
@ -50,50 +50,53 @@ class StoreInterface(abc.ABC):
|
||||
def __init__(self, store):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def pipeline(self):
|
||||
"""Create a pipeline for batch operations"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def execute_pipeline(self, pipeline):
|
||||
"""Execute a pipeline"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def lpush(
|
||||
self, topic: str, msg: str, pipe=None, max_size: int = None, expire: int = None
|
||||
) -> None:
|
||||
"""Push a message to the left of the list"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def lset(self, topic: str, index: int, msg: str, pipe=None) -> None:
|
||||
"""Set a value in the list at the given index"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def rpush(self, topic: str, msg: str, pipe=None) -> int:
|
||||
"""Push a message to the right of the list"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def lrange(self, topic: str, start: int, end: int, pipe=None):
|
||||
"""Get a range of values from the list"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def set(self, topic: str, msg, pipe=None, expire: int = None) -> None:
|
||||
"""Set a value"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def keys(self, pattern: str) -> list:
|
||||
"""Get keys that match the pattern"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, topic, pipe=None):
|
||||
"""Delete a key"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get(self, topic: str, pipe=None):
|
||||
"""Get a value"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def xadd(self, topic: str, msg_dict: dict, max_size=None, pipe=None, expire: int = None):
|
||||
"""Add a message to the stream"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def xread(
|
||||
self,
|
||||
topic: str,
|
||||
@ -104,35 +107,26 @@ class StoreInterface(abc.ABC):
|
||||
from_start=False,
|
||||
) -> list:
|
||||
"""Read from the stream"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def xrange(self, topic: str, min: str, max: str, count: int = None, pipe=None):
|
||||
"""Read from the stream"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PubSubInterface(abc.ABC):
|
||||
"""PubSubBase defines the interface for a pub/sub connector"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def raw_send(self, topic: str, msg: bytes) -> None:
|
||||
"""Send a raw message without using the BECMessage class"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def send(self, topic: str, msg: BECMessage) -> None:
|
||||
"""Send a message"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def register(self, topics=None, patterns=None, cb=None, start_thread=True, **kwargs):
|
||||
"""Register a callback for a topic or pattern"""
|
||||
raise NotImplementedError
|
||||
|
||||
def poll_messages(self, timeout=None):
|
||||
"""Poll for new messages, receive them and execute callbacks"""
|
||||
raise NotImplementedError
|
||||
|
||||
def run_messages_loop(self):
|
||||
"""Run the message loop"""
|
||||
raise NotImplementedError
|
||||
|
||||
def shutdown(self):
|
||||
"""Shutdown the connector"""
|
||||
|
@ -7,9 +7,10 @@ import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest import mock
|
||||
|
||||
import bec_lib
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
import bec_lib
|
||||
from bec_lib import BECClient, messages
|
||||
from bec_lib.connector import ConnectorBase
|
||||
from bec_lib.devicemanager import DeviceManagerBase
|
||||
@ -613,6 +614,29 @@ class ConnectorMock(ConnectorBase): # pragma: no cover
|
||||
def producer(self):
|
||||
return self
|
||||
|
||||
def execute_pipeline(self, pipeline):
|
||||
pipeline.execute()
|
||||
|
||||
def xadd(self, topic, msg_dict, max_size=None, pipe=None, expire: int = None):
|
||||
if pipe:
|
||||
pipe._pipe_buffer.append(("xadd", (topic, msg_dict), {"expire": expire}))
|
||||
return
|
||||
pass
|
||||
|
||||
def xread(self, topic, id=None, count=None, block=None, pipe=None, from_start=False):
|
||||
if pipe:
|
||||
pipe._pipe_buffer.append(
|
||||
("xread", (topic, id, count, block), {"from_start": from_start})
|
||||
)
|
||||
return
|
||||
return []
|
||||
|
||||
def xrange(self, topic, min="-", max="+", pipe=None):
|
||||
if pipe:
|
||||
pipe._pipe_buffer.append(("xrange", (topic, min, max), {}))
|
||||
return
|
||||
return []
|
||||
|
||||
|
||||
def create_session_from_config(config: dict) -> dict:
|
||||
device_configs = []
|
||||
|
Loading…
x
Reference in New Issue
Block a user