This commit is contained in:
2024-12-16 19:38:20 +01:00
parent 3479e579ab
commit f06f4962e8
3 changed files with 28 additions and 23 deletions

View File

@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any
import socketio import socketio
from bec_lib.endpoints import EndpointInfo, MessageEndpoints, MessageOp from bec_lib.endpoints import EndpointInfo, MessageEndpoints, MessageOp
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from bec_lib.serialization import json_ext
from fastapi import APIRouter from fastapi import APIRouter
from bec_atlas.router.base_router import BaseRouter from bec_atlas.router.base_router import BaseRouter
@ -54,7 +55,7 @@ class RedisAtlasEndpoints:
return f"internal/deployment/{deployment}/data/{endpoint}" return f"internal/deployment/{deployment}/data/{endpoint}"
@staticmethod @staticmethod
def socketio_endpoint_room(endpoint: str): def socketio_endpoint_room(deployment: str, endpoint: str):
""" """
Endpoint for the socketio room for an endpoint. Endpoint for the socketio room for an endpoint.
@ -64,7 +65,7 @@ class RedisAtlasEndpoints:
Returns: Returns:
str: The endpoint for the socketio room str: The endpoint for the socketio room
""" """
return f"ENDPOINT::{endpoint}" return f"socketio/rooms/{deployment}/{endpoint}"
class RedisRouter(BaseRouter): class RedisRouter(BaseRouter):
@ -333,7 +334,7 @@ class RedisWebsocket:
RedisAtlasEndpoints.redis_data(deployment, endpoint), Any, MessageOp.STREAM RedisAtlasEndpoints.redis_data(deployment, endpoint), Any, MessageOp.STREAM
) )
room = RedisAtlasEndpoints.socketio_endpoint_room(endpoint) room = RedisAtlasEndpoints.socketio_endpoint_room(deployment, endpoint)
self.redis.register(endpoint_info, cb=self.on_redis_message, parent=self, room=room) self.redis.register(endpoint_info, cb=self.on_redis_message, parent=self, room=room)
if endpoint not in self.users[sid]["subscriptions"]: if endpoint not in self.users[sid]["subscriptions"]:
await self.socket.enter_room(sid, room) await self.socket.enter_room(sid, room)
@ -348,7 +349,7 @@ class RedisWebsocket:
else: else:
msg = message["data"] msg = message["data"]
outgoing = {"data": msg.content, "metadata": msg.metadata} outgoing = {"data": msg.content, "metadata": msg.metadata}
outgoing = json.dumps(outgoing) outgoing = json_ext.dumps(outgoing)
await parent.socket.emit("message", data=outgoing, room=room) await parent.socket.emit("message", data=outgoing, room=room)
# Run the coroutine in this loop # Run the coroutine in this loop

View File

@ -2,6 +2,8 @@ import json
from unittest import mock from unittest import mock
import pytest import pytest
from bec_atlas.router.redis_router import RedisAtlasEndpoints
from bec_lib.endpoints import MessageEndpoints
@pytest.fixture @pytest.fixture
@ -18,7 +20,7 @@ def backend_client(backend):
async def test_redis_websocket_connect(backend_client): async def test_redis_websocket_connect(backend_client):
client, app = backend_client client, app = backend_client
await app.redis_websocket.socket.handlers["/"]["connect"]( await app.redis_websocket.socket.handlers["/"]["connect"](
"sid", {"HTTP_QUERY": '{"user": "test"}'} "sid", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
) )
assert "sid" in app.redis_websocket.users assert "sid" in app.redis_websocket.users
@ -35,10 +37,10 @@ async def test_redis_websocket_disconnect(backend_client):
async def test_redis_websocket_multiple_connect(backend_client): async def test_redis_websocket_multiple_connect(backend_client):
client, app = backend_client client, app = backend_client
await app.redis_websocket.socket.handlers["/"]["connect"]( await app.redis_websocket.socket.handlers["/"]["connect"](
"sid1", {"HTTP_QUERY": '{"user": "test1"}'} "sid1", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
) )
await app.redis_websocket.socket.handlers["/"]["connect"]( await app.redis_websocket.socket.handlers["/"]["connect"](
"sid2", {"HTTP_QUERY": '{"user": "test2"}'} "sid2", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
) )
assert "sid1" in app.redis_websocket.users assert "sid1" in app.redis_websocket.users
assert "sid2" in app.redis_websocket.users assert "sid2" in app.redis_websocket.users
@ -48,10 +50,10 @@ async def test_redis_websocket_multiple_connect(backend_client):
async def test_redis_websocket_multiple_connect_same_sid(backend_client): async def test_redis_websocket_multiple_connect_same_sid(backend_client):
client, app = backend_client client, app = backend_client
await app.redis_websocket.socket.handlers["/"]["connect"]( await app.redis_websocket.socket.handlers["/"]["connect"](
"sid", {"HTTP_QUERY": '{"user": "test"}'} "sid", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
) )
await app.redis_websocket.socket.handlers["/"]["connect"]( await app.redis_websocket.socket.handlers["/"]["connect"](
"sid", {"HTTP_QUERY": '{"user": "test"}'} "sid", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
) )
assert "sid" in app.redis_websocket.users assert "sid" in app.redis_websocket.users
@ -84,16 +86,19 @@ async def test_redis_websocket_register(backend_client):
client, app = backend_client client, app = backend_client
with mock.patch.object(app.redis_websocket.socket, "emit") as emit: with mock.patch.object(app.redis_websocket.socket, "emit") as emit:
with mock.patch.object(app.redis_websocket.socket, "enter_room") as enter_room: with mock.patch.object(app.redis_websocket.socket, "enter_room") as enter_room:
with mock.patch.object(app.redis_websocket.socket.manager, "rooms") as rooms: await app.redis_websocket.socket.handlers["/"]["connect"](
rooms.__getitem__.return_value = {"ENDPOINT::scan_status": "sid"} "sid", {"HTTP_QUERY": '{"user": "test", "deployment": "test"}'}
await app.redis_websocket.socket.handlers["/"]["connect"]( )
"sid", {"HTTP_QUERY": '{"user": "test"}'}
)
await app.redis_websocket.socket.handlers["/"]["register"]( await app.redis_websocket.socket.handlers["/"]["register"](
"sid", json.dumps({"endpoint": "scan_status"}) "sid", json.dumps({"endpoint": "scan_status"})
) )
assert mock.call("error", mock.ANY, room="sid") not in emit.mock_calls assert mock.call("error", mock.ANY, room="sid") not in emit.mock_calls
enter_room.assert_called_with("sid", "ENDPOINT::scan_status") enter_room.assert_called_with(
"sid",
RedisAtlasEndpoints.socketio_endpoint_room(
"test", MessageEndpoints.scan_status().endpoint
),
)
assert mock.call("error", mock.ANY, room="sid") not in emit.mock_calls assert mock.call("error", mock.ANY, room="sid") not in emit.mock_calls

View File

@ -1,7 +1,6 @@
import pytest import pytest
from bec_lib import messages
from bec_atlas.ingestor.data_ingestor import DataIngestor from bec_atlas.ingestor.data_ingestor import DataIngestor
from bec_lib import messages
@pytest.fixture @pytest.fixture
@ -77,7 +76,7 @@ def test_scan_ingestor_create_scan(scan_ingestor, backend):
}, },
timestamp=1732610545.15924, timestamp=1732610545.15924,
) )
scan_ingestor.update_scan_status(msg) scan_ingestor.update_scan_status(msg, deployment_id="5cc67967-744d-4115-a46b-13246580cb3f")
response = client.post( response = client.post(
"/api/v1/user/login", json={"username": "admin@bec_atlas.ch", "password": "admin"} "/api/v1/user/login", json={"username": "admin@bec_atlas.ch", "password": "admin"}
@ -94,7 +93,7 @@ def test_scan_ingestor_create_scan(scan_ingestor, backend):
assert out["status"] == "open" assert out["status"] == "open"
msg.status = "closed" msg.status = "closed"
scan_ingestor.update_scan_status(msg) scan_ingestor.update_scan_status(msg, deployment_id="5cc67967-744d-4115-a46b-13246580cb3f")
response = client.get(f"/api/v1/scans/id/{scan_id}") response = client.get(f"/api/v1/scans/id/{scan_id}")
assert response.status_code == 200 assert response.status_code == 200
out = response.json() out = response.json()