266 lines
8.4 KiB
Python
266 lines
8.4 KiB
Python
import os
|
|
import pickle
|
|
import pytest
|
|
import requests
|
|
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
|
from unittest.mock import patch, MagicMock
|
|
from getpass import getpass
|
|
from pathlib import Path
|
|
from slic.utils import DotDir
|
|
from slic.utils.duo import PickledDict, Secrets
|
|
import slic.utils.duo as pg
|
|
|
|
|
|
# Real tests for PickledDict with file manipulation
|
|
class TestPickledDictReal:
    """Exercise PickledDict against real pickle files on disk (no mocking)."""

    # Test set() and get() functionality with persistence
    def test_set_get(self):
        """Values stored with set() come back from get(), also via a second instance."""
        # Create and initialize a temporary pickle file.  The file is closed
        # (and therefore flushed) before PickledDict opens it.
        with NamedTemporaryFile(delete=False) as tmp_file:
            tmp_path = tmp_file.name
            pickle.dump({}, tmp_file)  # Initialize with empty dict

        try:
            pd = PickledDict(tmp_path)

            # 1. set() must leave the backing file in place
            pd.set("test_key", "test_value")
            assert os.path.exists(tmp_path)

            # 2. Verify get()
            assert pd.get("test_key") == "test_value"

            # 3. Store values of several types
            pd.set("key2", 42)
            pd.set("key3", [1, 2, 3])

            # 4. Persistence check: a fresh instance on the same path sees everything
            pd2 = PickledDict(tmp_path)
            assert pd2.get("test_key") == "test_value"
            assert pd2.get("key2") == 42
            assert pd2.get("key3") == [1, 2, 3]

            # 5. Missing key should raise
            with pytest.raises(KeyError):
                pd2.get("missing_key")
        finally:
            # delete=False above means cleanup is this test's responsibility.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    # Test the _load() method
    def test_load(self):
        """_load() returns the pickled dict and reflects subsequent set() calls."""
        with NamedTemporaryFile(delete=False) as tmp_file:
            tmp_path = tmp_file.name
            pickle.dump({"initial": "data"}, tmp_file)  # Initial data

        try:
            pd = PickledDict(tmp_path)

            # 1. Verify initial load
            assert pd._load() == {"initial": "data"}

            # 2. Update
            pd.set("new_key", "new_value")
            assert pd._load() == {"initial": "data", "new_key": "new_value"}

            # 3. Integrity check: two instances on the same path agree
            pd2 = PickledDict(tmp_path)
            assert pd2._load() == pd._load()
        finally:
            # delete=False above means cleanup is this test's responsibility.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
|
|
|
|
|
|
# Tests for Secrets with full getpass mocking
|
|
class TestSecrets:
    """Tests for Secrets with getpass mocked and the home directory redirected."""

    @pytest.fixture
    def setup_env(self, monkeypatch):
        """Provide an isolated home directory containing an empty ``.slic/secrets`` pickle.

        Yields:
            Path: the temporary directory that ``Path.home()`` returns for the
            duration of the test.  The directory is removed on teardown.
        """
        # Fixture to isolate environment completely
        with TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # staticmethod keeps the replacement callable with zero arguments
            # whether home() is looked up on the class or on a Path instance;
            # a plain function would be bound and receive `self` in the latter case.
            # NOTE(review): presumably Secrets locates its file below Path.home()
            # -- confirm against slic.utils.
            monkeypatch.setattr(Path, "home", staticmethod(lambda: temp_path))

            # Create an empty secrets file
            secrets_dir = temp_path / ".slic"
            secrets_dir.mkdir()
            secrets_file = secrets_dir / "secrets"
            with open(secrets_file, "wb") as f:
                pickle.dump({}, f)

            # Yield inside the context manager so the directory outlives the test body.
            yield temp_path

    # Full workflow test with mocked getpass
    def test_secret_workflow(self, setup_env):
        """set() prompts once via getpass and the entered secret is persisted."""
        with patch('slic.utils.duo.getpass') as mock_getpass:
            mock_getpass.return_value = "test_secret"

            secrets = Secrets()
            secrets.set("test_key")

            # Verify getpass was called
            mock_getpass.assert_called_once_with(
                prompt='Please enter the secret "test_key": '
            )

            # Verify persistence
            secrets2 = Secrets()
            assert secrets2.get("test_key") == "test_secret"

    # Test multiple secrets storage
    def test_multiple_secrets(self, setup_env):
        """Two consecutive set() calls store two independent secrets."""
        with patch('slic.utils.duo.getpass') as mock_getpass:
            # One return value per expected prompt, in call order.
            mock_getpass.side_effect = ["secret1", "secret2"]

            secrets = Secrets()
            secrets.set("key1")
            secrets.set("key2")

            assert secrets.get("key1") == "secret1"
            assert secrets.get("key2") == "secret2"

    # Test cancellation by Ctrl+C (KeyboardInterrupt)
    def test_keyboard_interrupt(self, setup_env):
        """A KeyboardInterrupt during the prompt is expected to leave the secrets file untouched."""
        with patch('slic.utils.duo.getpass') as mock_getpass:
            mock_getpass.side_effect = KeyboardInterrupt()

            secrets = Secrets()
            secrets.set("canceled_key")  # Should not crash

            # Verify file remains unchanged
            secrets_file = setup_env / ".slic" / "secrets"
            with open(secrets_file, "rb") as f:
                assert pickle.load(f) == {}
|
|
|
|
|
|
# Dummy response object for mocking requests
|
|
class DummyResponse:
    """Minimal stand-in for a response object: only ``json()`` is provided."""

    def __init__(self, data):
        """Store *data*, the payload that ``json()`` will hand back."""
        self._data = data

    def json(self):
        """Return the stored payload unchanged (same object, no copy)."""
        return self._data
|
|
|
|
|
|
# Test that get_pgroup raises if KEY is missing
|
|
def test_get_pgroup_raises_if_no_key(monkeypatch):
    """get_pgroup() must raise ValueError when the module-level KEY is None."""
    monkeypatch.setattr(pg, "KEY", None)
    with pytest.raises(ValueError, match="no secret for duo known"):
        pg.get_pgroup("p99999")
|
|
|
|
|
|
# Test case where firstname/lastname == pi_firstname/pi_lastname
|
|
def test_get_pgroup_info_with_props_same_name_and_pi(monkeypatch):
    """With a proposal whose proposer equals the PI, the name is expected only once.

    All input strings carry surrounding whitespace; the expected values are stripped.
    """
    monkeypatch.setattr(pg, "KEY", "secret123")

    dummy_data = {
        "group": {"storagetype": "ssd"},
        "proposals": [
            {
                "firstname": " Alice ",
                "lastname": " Smith ",
                "pi_firstname": " Alice ",
                "pi_lastname": " Smith ",
                "title": " CoolTitle ",
                "proposal": " P1 ",
            }
        ],
    }

    # Replace the HTTP call made through pg.requests with a canned response.
    monkeypatch.setattr(pg.requests, "get", lambda url, headers: DummyResponse(dummy_data))
    res = pg.get_pgroup_info("p1")

    assert set(res.keys()) == {"name", "title", "type"}
    assert res["type"] == "ssd"
    assert res["name"] == "Alice Smith"
    assert res["title"] == "CoolTitle (P1)"
|
|
|
|
|
|
# Test case where firstname/lastname != pi_firstname/pi_lastname
|
|
def test_get_pgroup_info_with_props_different_pi(monkeypatch):
    """With a proposal whose PI differs from the proposer, both names are expected.

    The expected format is ``"<proposer> | <PI>"`` with whitespace stripped.
    """
    monkeypatch.setattr(pg, "KEY", "secret123")

    dummy_data = {
        "group": {"storagetype": "ssd"},
        "proposals": [
            {
                "firstname": " Alice ",
                "lastname": " Smith ",
                "pi_firstname": " Bob ",
                "pi_lastname": " Johnson ",
                "title": " CoolTitle ",
                "proposal": " P2 ",
            }
        ],
    }

    # Replace the HTTP call made through pg.requests with a canned response.
    monkeypatch.setattr(pg.requests, "get", lambda url, headers: DummyResponse(dummy_data))
    res = pg.get_pgroup_info("p2")

    assert set(res.keys()) == {"name", "title", "type"}
    assert res["type"] == "ssd"
    assert res["name"] == "Alice Smith | Bob Johnson"
    assert res["title"] == "CoolTitle (P2)"
|
|
|
|
|
|
# Test case without proposals but with owner info
|
|
def test_get_pgroup_info_without_props_with_owner(monkeypatch):
    """Without a "proposals" entry, name and title are expected from the group itself.

    The name comes from the group's owner and the title from its comments,
    both with surrounding whitespace stripped.
    """
    monkeypatch.setattr(pg, "KEY", "secret123")

    dummy_data = {
        "group": {
            "storagetype": "hdd",
            "owner": {"firstname": " John ", "lastname": " Doe "},
            "comments": " SomeComment ",
        }
    }

    # Replace the HTTP call made through pg.requests with a canned response.
    monkeypatch.setattr(pg.requests, "get", lambda url, headers: DummyResponse(dummy_data))
    res = pg.get_pgroup_info("p3")

    assert set(res.keys()) == {"name", "title", "type"}
    assert res["type"] == "hdd"
    assert res["name"] == "John Doe"
    assert res["title"] == "SomeComment"
|
|
|
|
|
|
# Test case without proposals and no owner info
|
|
def test_get_pgroup_info_without_props_no_owner(monkeypatch):
    """Without proposals and without an owner, the name is expected to be "unknown"."""
    monkeypatch.setattr(pg, "KEY", "secret123")

    dummy_data = {
        "group": {
            "storagetype": "tape",
            "comments": " Fallback ",
        }
    }

    # Replace the HTTP call made through pg.requests with a canned response.
    monkeypatch.setattr(pg.requests, "get", lambda url, headers: DummyResponse(dummy_data))
    res = pg.get_pgroup_info("p4")

    assert set(res.keys()) == {"name", "title", "type"}
    assert res["type"] == "tape"
    assert res["name"] == "unknown"
    assert res["title"] == "Fallback"
|
|
|
|
|
|
# Test get_pgroup_info using a mocked get_pgroup
|
|
def test_get_pgroup_info_mock(monkeypatch):
    """get_pgroup_info() yields non-empty string fields when get_pgroup is stubbed out.

    Here "proposals" is present but None, and the group has no owner entry.
    """
    # Mock to bypass Secrets().get()
    monkeypatch.setattr(pg, "KEY", "fake-key")

    def fake_get(p):
        """Stand-in for pg.get_pgroup that ignores its argument."""
        return {
            "group": {"storagetype": "SSD", "comments": "some comments"},
            "proposals": None,
        }

    monkeypatch.setattr(pg, "get_pgroup", fake_get)

    res = pg.get_pgroup_info("p12345")
    assert set(res.keys()) == {"name", "title", "type"}
    for key in ("name", "title", "type"):
        assert isinstance(res[key], str)
        assert res[key]  # must not be an empty string
|