test(bec_lib): added test for unregistering callbacks

This commit is contained in:
wakonig_k 2024-04-26 09:46:38 +02:00
parent 6b89240f46
commit 6e14de35dc

View File

@ -157,6 +157,92 @@ def test_redis_connector_register_identical(connected_connector):
assert received_event1.call_count == 2
def test_redis_connector_unregister_cb_not_topic(connected_connector):
connector = connected_connector
topic1 = EndpointInfo("topic1", TestMessage(), MessageOp.SEND)
topic2 = EndpointInfo("topic2", TestMessage(), MessageOp.SEND)
received_event1 = mock.Mock(spec=[])
received_event2 = mock.Mock(spec=[])
connector.register(topics=topic1, cb=received_event1, start_thread=False)
connector.register(topics=topic1, cb=received_event2, start_thread=False)
connector.register(topics=topic2, cb=received_event1, start_thread=False)
# normal behavior with two callbacks for the same topic
connector.send(topic1, TestMessage())
connector.poll_messages(timeout=1)
assert received_event1.call_count == 1
assert received_event2.call_count == 1
# unregistering one callback for a topic should not remove the topic from the list
connector.unregister(topic1, cb=received_event1)
connector.send(topic1, TestMessage())
connector.poll_messages(timeout=1)
assert received_event1.call_count == 1
assert received_event2.call_count == 2
# unregistering the last callback for a topic should remove the topic from the list
connector.unregister(topic1, cb=received_event2)
connector.send(topic1, TestMessage())
try:
connector.poll_messages(timeout=1)
except TimeoutError:
pass
assert received_event1.call_count == 1
assert received_event2.call_count == 2
def test_redis_connector_unregister_topic_keeps_others_alive(connected_connector):
def poll_until_timeout():
while True:
try:
connector.poll_messages(timeout=0.5)
except TimeoutError:
break
def send_msgs_and_poll():
connector.send(topic1, TestMessage())
connector.send(topic2, TestMessage())
poll_until_timeout()
connector = connected_connector
topic1 = EndpointInfo("topic1", TestMessage(), MessageOp.SEND)
topic2 = EndpointInfo("topic2", TestMessage(), MessageOp.SEND)
received_event1 = mock.Mock(spec=[])
received_event2 = mock.Mock(spec=[])
connector.register(topics=topic1, cb=received_event1, start_thread=False)
connector.register(topics=topic2, cb=received_event2, start_thread=False)
# normal behavior with two callbacks for the two topics
send_msgs_and_poll()
assert received_event1.call_count == 1
assert received_event2.call_count == 1
# unregistering one callback/topic should leave the other topic alive
connector.unregister(topic1, cb=received_event1)
send_msgs_and_poll()
assert received_event1.call_count == 1
assert received_event2.call_count == 2
# unregistering the last callback for a topic should remove the topic from the list
connector.unregister(topic2, cb=received_event2)
send_msgs_and_poll()
assert received_event1.call_count == 1
assert received_event2.call_count == 2
# adding a new callback for the topic should make it work again
connector.register(topics=topic1, cb=received_event1, start_thread=False)
send_msgs_and_poll()
assert received_event1.call_count == 2
assert received_event2.call_count == 2
def test_redis_register_poll_messages(connected_connector):
connector = connected_connector
cb_fcn_has_been_called = False