mirror of
https://github.com/bec-project/bec_widgets.git
synced 2025-07-13 19:21:50 +02:00
fix: update redis mock for changes in bec
This commit is contained in:
@ -4,7 +4,6 @@ import time
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import redis
|
||||
from bec_lib.messages import ScanMessage
|
||||
from bec_lib.serialization import MsgpackSerialization
|
||||
|
||||
@ -21,13 +20,13 @@ def bec_dispatcher_w_connector(bec_dispatcher, topics_msg_list, send_msg_event):
|
||||
time.sleep(0.2)
|
||||
yield StopIteration
|
||||
|
||||
with mock.patch("redis.Redis"):
|
||||
pubsub = redis.Redis().pubsub()
|
||||
messages = pubsub_msg_generator()
|
||||
pubsub.get_message.side_effect = lambda timeout: next(messages)
|
||||
connector = QtRedisConnector("localhost:1")
|
||||
bec_dispatcher.client.connector = connector
|
||||
yield bec_dispatcher
|
||||
redis_class_mock = mock.MagicMock()
|
||||
pubsub = redis_class_mock().pubsub()
|
||||
messages = pubsub_msg_generator()
|
||||
pubsub.get_message.side_effect = lambda timeout: next(messages)
|
||||
connector = QtRedisConnector("localhost:1", redis_class_mock)
|
||||
bec_dispatcher.client.connector = connector
|
||||
yield bec_dispatcher
|
||||
|
||||
|
||||
dummy_msg = MsgpackSerialization.dumps(ScanMessage(point_id=0, scan_id="0", data={}))
|
||||
|
Reference in New Issue
Block a user