mirror of
https://github.com/bec-project/bec_atlas.git
synced 2025-07-14 07:01:48 +02:00
367 lines
11 KiB
Python
367 lines
11 KiB
Python
from unittest import mock
|
|
|
|
import pytest
|
|
from bec_lib import messages
|
|
from bec_lib.serialization import MsgpackSerialization
|
|
|
|
from bec_atlas.model.model import BECAccessProfile, DeploymentAccess, User
|
|
from bec_atlas.router.redis_router import RemoteAccess
|
|
|
|
|
|
@pytest.fixture
|
|
def logged_in_client(backend):
|
|
client, _ = backend
|
|
response = client.post(
|
|
"/api/v1/user/login", json={"username": "admin@bec_atlas.ch", "password": "admin"}
|
|
)
|
|
assert response.status_code == 200
|
|
token = response.json()
|
|
assert isinstance(token, str)
|
|
assert len(token) > 20
|
|
return client
|
|
|
|
|
|
@pytest.fixture
|
|
def deployment(logged_in_client):
|
|
client = logged_in_client
|
|
response = client.get("/api/v1/deployments/realm", params={"realm": "demo_beamline_1"})
|
|
assert response.status_code == 200
|
|
return response.json()[0]
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"key, patterns, access",
|
|
[
|
|
(
|
|
"public/some/key",
|
|
[
|
|
"%R~public/*", # Read-only access
|
|
"%R~info/*", # Read-only access
|
|
"%RW~personal/test_username/*", # Read/Write access
|
|
"%RW~user/*", # Read/Write access
|
|
],
|
|
RemoteAccess.READ,
|
|
),
|
|
(
|
|
"info/some/key",
|
|
[
|
|
"%R~public/*", # Read-only access
|
|
"%R~info/*", # Read-only access
|
|
"%RW~personal/test_username/*", # Read/Write access
|
|
"%RW~user/*", # Read/Write access
|
|
],
|
|
RemoteAccess.READ,
|
|
),
|
|
(
|
|
"personal/test_username/some/key",
|
|
[
|
|
"%R~public/*", # Read-only access
|
|
"%R~info/*", # Read-only access
|
|
"%RW~personal/test_username/*", # Read/Write access
|
|
"%RW~user/*", # Read/Write access
|
|
],
|
|
RemoteAccess.READ_WRITE,
|
|
),
|
|
(
|
|
"user/some/key",
|
|
[
|
|
"%R~public/*", # Read-only access
|
|
"%R~info/*", # Read-only access
|
|
"%RW~personal/test_username/*", # Read/Write access
|
|
"%RW~user/*", # Read/Write access
|
|
],
|
|
RemoteAccess.READ_WRITE,
|
|
),
|
|
("user/some/key", ["*"], RemoteAccess.READ_WRITE),
|
|
("public/some/key", ["%W~public/*"], RemoteAccess.WRITE),
|
|
("some/key", ["%W~public/*"], RemoteAccess.NONE),
|
|
],
|
|
)
|
|
def test_get_key_pattern_access(backend, key, patterns, access):
|
|
_, app = backend
|
|
assert app.redis_router.get_key_pattern_access(key, patterns) == access
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"channel, patterns, access",
|
|
[
|
|
(
|
|
"public/some/channel",
|
|
["public/*", "info/*", "personal/test_username/*", "user/*"],
|
|
RemoteAccess.READ_WRITE,
|
|
),
|
|
("some/channel", ["public/*"], RemoteAccess.NONE),
|
|
],
|
|
)
|
|
def test_get_channel_pattern_access(backend, channel, patterns, access):
|
|
_, app = backend
|
|
assert app.redis_router.get_channel_pattern_access(channel, patterns) == access
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"user, deployment_access, expected_access",
|
|
[
|
|
(
|
|
User(
|
|
owner_groups=["admin"],
|
|
access_groups=["admin"],
|
|
email="admin@bec_atlas.ch",
|
|
groups=["admin"],
|
|
first_name="admin",
|
|
last_name="admin",
|
|
),
|
|
DeploymentAccess(
|
|
owner_groups=["admin"], access_groups=["admin"], user_read_access=["admin"]
|
|
),
|
|
RemoteAccess.NONE,
|
|
),
|
|
(
|
|
User(
|
|
owner_groups=["admin"],
|
|
access_groups=["admin"],
|
|
email="admin@bec_atlas.ch",
|
|
groups=["admin"],
|
|
first_name="admin",
|
|
last_name="admin",
|
|
),
|
|
DeploymentAccess(
|
|
owner_groups=["admin"], access_groups=["admin"], remote_read_access=["admin"]
|
|
),
|
|
RemoteAccess.READ,
|
|
),
|
|
(
|
|
User(
|
|
owner_groups=["admin"],
|
|
access_groups=["admin"],
|
|
email="admin@bec_atlas.ch",
|
|
groups=["admin"],
|
|
first_name="admin",
|
|
last_name="admin",
|
|
),
|
|
DeploymentAccess(
|
|
owner_groups=["admin"], access_groups=["admin"], remote_write_access=["admin"]
|
|
),
|
|
RemoteAccess.READ_WRITE,
|
|
),
|
|
],
|
|
)
|
|
def test_get_access(backend, user, deployment_access, expected_access):
|
|
_, app = backend
|
|
assert app.redis_router.get_access(user, deployment_access) == expected_access
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"bec_access, key, redis_op, raise_exception",
|
|
[
|
|
# Full access profile - should allow all operations
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="admin",
|
|
owner_groups=["admin"],
|
|
keys=["*"],
|
|
channels=["*"],
|
|
commands=["*"],
|
|
),
|
|
"some/key",
|
|
"get",
|
|
False,
|
|
),
|
|
# Read-only access to keys
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="reader",
|
|
owner_groups=["readers"],
|
|
keys=["%R~data/*"],
|
|
channels=["*"],
|
|
commands=["*"],
|
|
),
|
|
"data/sensor1",
|
|
"get",
|
|
False,
|
|
),
|
|
# Write operation with read-only access should fail
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="reader",
|
|
owner_groups=["readers"],
|
|
keys=["%R~data/*"],
|
|
channels=["*"],
|
|
commands=["*"],
|
|
),
|
|
"data/sensor1",
|
|
"set",
|
|
True,
|
|
),
|
|
# Send operation to allowed channel
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="writer",
|
|
owner_groups=["writers"],
|
|
keys=["*"],
|
|
channels=["commands/*"],
|
|
commands=["*"],
|
|
),
|
|
"commands/motor1",
|
|
"send",
|
|
False,
|
|
),
|
|
# Testing set_and_publish with mixed permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["%RW~status/*"],
|
|
channels=["status/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"set_and_publish",
|
|
False,
|
|
),
|
|
# Testing set_and_publish with insufficient key permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["%R~status/*"],
|
|
channels=["status/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"set_and_publish",
|
|
True,
|
|
),
|
|
# Testing invalid operation
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="admin",
|
|
owner_groups=["admin"],
|
|
keys=["*"],
|
|
channels=["*"],
|
|
commands=["*"],
|
|
),
|
|
"some/key",
|
|
"invalid_op",
|
|
True,
|
|
),
|
|
# Test send operation with insufficient channel permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["*"],
|
|
channels=["internal/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"send",
|
|
True,
|
|
),
|
|
# Test set_and_publish with insufficient write permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["%R~status/*"],
|
|
channels=["status/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"set_and_publish",
|
|
True,
|
|
),
|
|
# Test set_and_publish with insufficient channel permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["%RW~status*"],
|
|
channels=["internal/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"set_and_publish",
|
|
True,
|
|
),
|
|
# Test get operation with insufficient read permissions
|
|
(
|
|
BECAccessProfile(
|
|
deployment_id="test_id",
|
|
username="user",
|
|
owner_groups=["users"],
|
|
keys=["%W~status/*"],
|
|
channels=["status/*"],
|
|
commands=["*"],
|
|
),
|
|
"status/device1",
|
|
"get",
|
|
True,
|
|
),
|
|
],
|
|
ids=[
|
|
"Full access profile - should allow all operations",
|
|
"Read-only access to keys",
|
|
"Write operation with read-only access should fail",
|
|
"Send operation to allowed channel",
|
|
"Testing set_and_publish with mixed permissions",
|
|
"Testing set_and_publish with insufficient key permissions",
|
|
"Testing invalid operation",
|
|
"Test send operation with insufficient channel permissions",
|
|
"Test set_and_publish with insufficient write permissions",
|
|
"Test set_and_publish with insufficient channel permissions",
|
|
"Test get operation with insufficient read permissions",
|
|
],
|
|
)
|
|
def test_bec_access_profile_allows_op(backend, bec_access, key, redis_op, raise_exception):
|
|
_, app = backend
|
|
if raise_exception:
|
|
with pytest.raises(Exception):
|
|
app.redis_router.bec_access_profile_allows_op(bec_access, key, redis_op)
|
|
else:
|
|
app.redis_router.bec_access_profile_allows_op(bec_access, key, redis_op)
|
|
|
|
|
|
async def test_redis_get(logged_in_client, deployment, backend):
|
|
client = logged_in_client
|
|
_, app = backend
|
|
response = client.patch(
|
|
"/api/v1/deployment_access",
|
|
params={"deployment_id": deployment["_id"]},
|
|
json={
|
|
"user_read_access": ["admin@bec_atlas.ch"],
|
|
"remote_read_access": ["admin@bec_atlas.ch"],
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
with mock.patch.object(app.redis_router.redis, "pubsub") as pubsub_mock:
|
|
msg = MsgpackSerialization.dumps(
|
|
messages.RawMessage(data={"test_key": "test"}, metadata={"message": "test"})
|
|
)
|
|
response = {
|
|
"type": "message",
|
|
"pattern": None,
|
|
"channel": "internal/deployment",
|
|
"data": msg,
|
|
}
|
|
|
|
pubsub_mock().subscribe = mock.AsyncMock()
|
|
ret_msg = pubsub_mock().get_message = mock.AsyncMock()
|
|
ret_msg.side_effect = [None, response]
|
|
response = client.get(
|
|
"/api/v1/redis", params={"deployment": deployment["_id"], "key": "test_key"}
|
|
)
|
|
assert response.status_code == 200
|
|
test_response = {"data": {"data": {"test_key": "test"}}, "metadata": {"message": "test"}}
|
|
assert response.json() == test_response
|