fix: init backend

This commit is contained in:
2024-11-20 09:35:03 +01:00
parent ce967a7265
commit e7c8c39726
15 changed files with 511 additions and 88 deletions

View File

@ -9,20 +9,12 @@ variables:
SCYLLA_KEYSPACE: test_bec_atlas
services:
- name: scylladb/scylla:latest
- name: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/scylladb/scylla:latest
alias: scylla
before_script:
# Check if ScyllaDB is ready (retry until successful)
- pip install ./backend
# - |
# echo "Waiting for ScyllaDB to be ready..."
# until python -c "from cassandra.cluster import Cluster; cluster = Cluster(['scylla']); session = cluster.connect(); session.set_keyspace('test_bec_atlas');" 2>/dev/null; do
# echo "ScyllaDB is not ready yet, retrying in 5 seconds..."
# sleep 5
# done
# echo "ScyllaDB is up and running."
- python -u ./backend/ci/healthchecks.py
- python -u ./backend/ci/setup_database.py
test:

View File

@ -3,13 +3,12 @@ from datetime import datetime, timedelta
from typing import Annotated
import jwt
from bec_atlas.datasources.scylladb import scylladb_schema as schema
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from bec_atlas.datasources.scylladb import scylladb_schema as schema
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@ -26,6 +25,10 @@ def verify_password(plain_password, hashed_password):
return password_hash.verify(plain_password, hashed_password)
def get_password_hash(password):
return password_hash.hash(password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:

View File

@ -1,15 +1,12 @@
import json
import os
import uuid
from datetime import datetime
from cassandra.cluster import Cluster
from cassandra.cqlengine import columns, connection
from cassandra.cqlengine.management import create_keyspace_simple, sync_table
from pydantic import BaseModel
from bec_atlas.authentication import get_password_hash
from bec_atlas.datasources.scylladb import scylladb_schema as schema
from cassandra.cluster import Cluster
from cassandra.cqlengine import connection
from cassandra.cqlengine.management import create_keyspace_simple, sync_table
from pydantic import BaseModel
class ScylladbDatasource:
@ -71,8 +68,28 @@ class ScylladbDatasource:
functional_accounts_file = os.path.join(
os.path.dirname(__file__), "functional_accounts.json"
)
with open(functional_accounts_file, "r", encoding="utf-8") as file:
functional_accounts = json.load(file)
if os.path.exists(functional_accounts_file):
with open(functional_accounts_file, "r", encoding="utf-8") as file:
functional_accounts = json.load(file)
else:
print("Functional accounts file not found. Using default demo accounts.")
# Demo accounts
functional_accounts = [
{
"email": "admin@bec_atlas.ch",
"password": "admin",
"groups": ["demo"],
"first_name": "Admin",
"last_name": "Admin",
},
{
"email": "jane.doe@bec_atlas.ch",
"password": "atlas",
"groups": ["demo_user"],
"first_name": "Jane",
"last_name": "Doe",
},
]
for account in functional_accounts:
# check if the account already exists in the database

View File

@ -1,16 +1,15 @@
import socketio
import uvicorn
from fastapi import FastAPI
from bec_atlas.datasources.datasource_manager import DatasourceManager
from bec_atlas.router.redis_router import RedisRouter, RedisWebsocket
from bec_atlas.router.scan_router import ScanRouter
from bec_atlas.router.user import UserRouter
from fastapi import FastAPI
CONFIG = {"redis": {"host": "localhost", "port": 6379}, "scylla": {"hosts": ["localhost"]}}
CONFIG = {"redis": {"host": "localhost", "port": 6380}, "scylla": {"hosts": ["localhost"]}}
class HorizonApp:
class AtlasApp:
API_VERSION = "v1"
def __init__(self):
@ -43,10 +42,20 @@ class HorizonApp:
self.redis_websocket = RedisWebsocket(prefix=self.prefix, datasources=self.datasources)
self.app.mount("/", self.redis_websocket.app)
def run(self):
uvicorn.run(self.app, host="localhost", port=8000)
def run(self, port=8000):
uvicorn.run(self.app, host="localhost", port=port)
def main():
import argparse
parser = argparse.ArgumentParser(description="Run the BEC Atlas API")
parser.add_argument("--port", type=int, default=8000, help="Port to run the API on")
args = parser.parse_args()
horizon_app = AtlasApp()
horizon_app.run(port=args.port)
if __name__ == "__main__":
horizon_app = HorizonApp()
horizon_app.run()
main()

View File

@ -4,11 +4,10 @@ import json
from typing import TYPE_CHECKING
import socketio
from bec_atlas.router.base_router import BaseRouter
from bec_lib.endpoints import MessageEndpoints
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from bec_atlas.router.base_router import BaseRouter
if TYPE_CHECKING:
from bec_lib.redis_connector import RedisConnector
@ -55,18 +54,19 @@ class RedisWebsocket:
self.socket.on("register", self.redis_register)
self.socket.on("disconnect", self.disconnect_client)
def connect_client(self, sid, environ):
def connect_client(self, sid, environ=None):
print("Client connected")
self.active_connections.add(sid)
def disconnect_client(self, sid, _environ):
def disconnect_client(self, sid, _environ=None):
print("Client disconnected")
self.active_connections.pop(sid)
self.active_connections.remove(sid)
async def redis_register(self, sid: str, msg: str):
if sid not in self.active_connections:
self.active_connections.add(sid)
try:
print(msg)
data = json.loads(msg)
except json.JSONDecodeError:
return

View File

@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends
from bec_atlas.authentication import get_current_user
from bec_atlas.datasources.scylladb import scylladb_schema as schema
from bec_atlas.router.base_router import BaseRouter
from fastapi import APIRouter, Depends
class ScanRouter(BaseRouter):
@ -12,7 +12,7 @@ class ScanRouter(BaseRouter):
self.router.add_api_route("/scan", self.scan, methods=["GET"])
self.router.add_api_route("/scan/{scan_id}", self.scan_with_id, methods=["GET"])
async def scan(self, current_user: User = Depends(get_current_user)):
async def scan(self, current_user: schema.User = Depends(get_current_user)):
return self.scylla.get("scan", current_user=current_user)
async def scan_with_id(self, scan_id: str):

View File

@ -1,12 +1,17 @@
from typing import Annotated
from fastapi import APIRouter, Depends
from fastapi.exceptions import HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from bec_atlas.authentication import create_access_token, get_current_user, verify_password
from bec_atlas.datasources.scylladb import scylladb_schema as schema
from bec_atlas.router.base_router import BaseRouter
from fastapi import APIRouter, Depends
from fastapi.exceptions import HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
class UserLoginRequest(BaseModel):
username: str
password: str
class UserRouter(BaseRouter):
@ -20,18 +25,19 @@ class UserRouter(BaseRouter):
"/user/login/form", self.form_login, methods=["POST"], dependencies=[]
)
async def user_me(self, user: User = Depends(get_current_user)):
async def user_me(self, user: schema.User = Depends(get_current_user)):
data = schema.User.objects.filter(email=user.email)
if data.count() == 0:
raise HTTPException(status_code=404, detail="User not found")
return data.first()
async def form_login(self, form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
out = await self.user_login(form_data.username, form_data.password)
user_login = UserLoginRequest(username=form_data.username, password=form_data.password)
out = await self.user_login(user_login)
return {"access_token": out, "token_type": "bearer"}
async def user_login(self, username: str, password: str):
result = schema.User.objects.filter(email=username)
async def user_login(self, user_login: UserLoginRequest):
result = schema.User.objects.filter(email=user_login.username)
if result.count() == 0:
raise HTTPException(status_code=404, detail="User not found")
user: schema.User = result.first()
@ -39,7 +45,7 @@ class UserRouter(BaseRouter):
if credentials.count() == 0:
raise HTTPException(status_code=404, detail="User not found")
user_credentials = credentials.first()
if not verify_password(password, user_credentials.password):
if not verify_password(user_login.password, user_credentials.password):
raise HTTPException(status_code=401, detail="Invalid password")
return create_access_token(data={"groups": list(user.groups), "email": user.email})

View File

View File

@ -0,0 +1,53 @@
import argparse
import os
import libtmux
from bec_atlas.utils.service_handler import ServiceHandler
def main():
"""
Launch the BEC Atlas server in a tmux session. All services are launched in separate panes.
"""
parser = argparse.ArgumentParser(description="Utility tool managing the BEC Atlas server")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start the BEC Atlas server")
# start.add_argument(
# "--config", type=str, default=None, help="Path to the BEC service config file"
# )
# start.add_argument(
# "--no-tmux", action="store_true", default=False, help="Do not start processes in tmux"
# )
command.add_parser("stop", help="Stop the BEC Atlas server")
restart = command.add_parser("restart", help="Restart the BEC Atlas server")
command.add_parser("attach", help="Open the currently running BEC Atlas server session")
args = parser.parse_args()
try:
# 'stop' has no config
config = args.config
except AttributeError:
config = None
service_handler = ServiceHandler(
bec_path=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
config_path=config,
no_tmux=args.no_tmux if "no_tmux" in args else False,
)
if args.command == "start":
service_handler.start()
elif args.command == "stop":
service_handler.stop()
elif args.command == "restart":
service_handler.restart()
elif args.command == "attach":
server = libtmux.Server()
session = server.find_where({"session_name": "bec_atlas"})
if session is None:
print("No BEC Atlas session found")
return
session.attach_session()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,107 @@
import sys
from string import Template
from bec_atlas.utils.tmux_launch import tmux_start, tmux_stop
class bcolors:
"""
Colors for the terminal output.
"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
class ServiceHandler:
"""
Service handler for the BEC server. This class is used to start, stop and restart the BEC server.
Depending on the platform, the server is launched in a tmux session or in an iTerm2 session.
"""
SERVICES = {
"fastapi-8000": {
"path": Template("$base_path"),
"command": "bec-atlas-fastapi --port 8000",
},
"fastapi-8001": {
"path": Template("$base_path"),
"command": "bec-atlas-fastapi --port 8001",
},
"redis": {"path": Template("$base_path"), "command": "redis-server --port 6380"},
}
def __init__(self, bec_path: str, config_path: str, no_tmux: bool = False):
"""
Args:
bec_path (str): Path to the BEC source code
config_path (str): Path to the config file
"""
self.bec_path = bec_path
self.config_path = config_path
self.no_tmux = no_tmux
self._detect_available_interfaces()
def _detect_available_interfaces(self):
if self.no_tmux:
self.interface = None
return
# check if we are on MacOS and if so, check if we have iTerm2 installed
if sys.platform == "darwin":
try:
import iterm2
except ImportError:
self.interface = "tmux"
else:
self.interface = "iterm2"
# if we are not on MacOS, we can only use tmux
else:
self.interface = "tmux"
def start(self):
"""
Start the BEC Atlas server using the available interface.
"""
if self.interface == "tmux":
print("Starting BEC Atlas server using tmux...")
tmux_start(self.bec_path, self.config_path, self.SERVICES)
print(
f"{bcolors.OKCYAN}{bcolors.BOLD}Use `bec-atlas attach` to attach to the BEC Atlas server. Once connected, use `ctrl+b d` to detach again.{bcolors.ENDC}"
)
elif self.interface == "iterm2":
pass
else:
# no tmux
raise ValueError("No valid interface available")
def stop(self):
"""
Stop the BEC server using the available interface.
"""
print("Stopping BEC Atlas server...")
if self.interface == "tmux":
tmux_stop()
elif self.interface == "iterm2":
pass
else:
# no tmux
raise ValueError("No valid interface available")
def restart(self):
"""
Restart the BEC Atlas server using the available interface.
"""
print("Restarting BEC Atlas server...")
self.stop()
self.start()

View File

@ -0,0 +1,97 @@
import os
import time
import libtmux
from libtmux.exc import LibTmuxException
def activate_venv(pane, service_name, service_path):
"""
Activate the python environment for a service.
"""
# check if the current file was installed with pip install -e (editable mode)
# if so, the venv is the service directory and it's called <service_name>_venv
# otherwise, we simply take the currently running venv ;
# in case of no venv, maybe it is running within a Conda environment
if "site-packages" in __file__:
venv_base_path = os.path.dirname(
os.path.dirname(os.path.dirname(__file__.split("site-packages", maxsplit=1)[0]))
)
pane.send_keys(f"source {venv_base_path}/bin/activate")
return
if os.path.exists(f"{service_path}/{service_name}_venv"):
pane.send_keys(f"source {service_path}/{service_name}_venv/bin/activate")
return
if os.getenv("CONDA_PREFIX"):
pane.send_keys(f"conda activate {os.path.basename(os.environ['CONDA_PREFIX'])}")
return
def tmux_start(bec_path: str, config_path: str, services: dict):
"""
Launch the BEC server in a tmux session. All services are launched in separate panes.
Args:
bec_path (str): Path to the BEC source code
config (str): Path to the config file
services (dict): Dictionary of services to launch. Keys are the service names, values are path and command templates.
"""
def get_new_session():
tmux_server = libtmux.Server()
session = tmux_server.new_session(
"bec_atlas",
window_name="BEC Atlas server. Use `ctrl+b d` to detach.",
kill_session=True,
)
return session
try:
session = get_new_session()
except LibTmuxException:
# retry once... sometimes there is a hiccup in creating the session
time.sleep(1)
session = get_new_session()
# create panes and run commands
panes = []
for ii, service_info in enumerate(services.items()):
service, service_config = service_info
if ii == 0:
pane = session.attached_window.attached_pane
else:
pane = session.attached_window.split_window(vertical=False)
panes.append(pane)
activate_venv(
pane,
service_name=service,
service_path=service_config["path"].substitute(base_path=bec_path),
)
if config_path:
pane.send_keys(f"{service_config['command']} --config {config_path}")
else:
pane.send_keys(f"{service_config['command']}")
session.attached_window.select_layout("tiled")
session.mouse_all_flag = True
session.set_option("mouse", "on")
def tmux_stop():
"""
Stop the BEC server.
"""
tmux_server = libtmux.Server()
avail_sessions = tmux_server.sessions.filter(session_name="bec_atlas")
if len(avail_sessions) != 0:
# send ctrl+c to all panes
for window in avail_sessions[0].windows:
for pane in window.panes:
pane.send_keys("C-c")
avail_sessions[0].kill_session()

View File

@ -1,42 +0,0 @@
import sys
import time
from cassandra.cluster import Cluster
SCYLLA_HOST = "scylla"
SCYLLA_KEYSPACE = "test_bec_atlas"
def wait_for_scylladb():
print("Waiting for ScyllaDB to be ready...")
while True:
try:
cluster = Cluster([SCYLLA_HOST])
session = cluster.connect()
print("Connected to ScyllaDB")
break
except Exception as e:
print(f"ScyllaDB not ready yet: {e}")
time.sleep(5)
def create_keyspace():
print(f"Creating keyspace '{SCYLLA_KEYSPACE}' if not exists...")
try:
cluster = Cluster([SCYLLA_HOST])
session = cluster.connect()
session.execute(
f"""
CREATE KEYSPACE IF NOT EXISTS {SCYLLA_KEYSPACE}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""
)
print(f"Keyspace '{SCYLLA_KEYSPACE}' created successfully.")
except Exception as e:
print(f"Failed to create keyspace: {e}")
sys.exit(1)
if __name__ == "__main__":
wait_for_scylladb()
create_keyspace()

View File

@ -0,0 +1,60 @@
import sys
import time
from cassandra.cluster import Cluster
SCYLLA_HOST = "scylla"
SCYLLA_KEYSPACE = "test_bec_atlas"
def wait_for_scylladb(scylla_host: str = SCYLLA_HOST):
"""
Wait for ScyllaDB to be ready by trying to connect to it.
Args:
scylla_host(str): The ScyllaDB host.
"""
print("Waiting for ScyllaDB to be ready...")
while True:
try:
cluster = Cluster([scylla_host])
session = cluster.connect()
print("Connected to ScyllaDB")
break
except Exception as e:
print(f"ScyllaDB not ready yet: {e}")
time.sleep(5)
def create_keyspace(scylla_host: str = SCYLLA_HOST, keyspace: str = SCYLLA_KEYSPACE):
"""
Create a new keyspace in ScyllaDB if it does not exist.
Args:
scylla_host(str): The ScyllaDB host.
keyspace(str): The keyspace to create.
"""
print(f"Creating keyspace '{keyspace}' if not exists...")
try:
cluster = Cluster([scylla_host])
session = cluster.connect()
session.execute(
f"""
CREATE KEYSPACE IF NOT EXISTS {keyspace}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""
)
print(f"Keyspace '{keyspace}' created successfully.")
except Exception as e:
print(f"Failed to create keyspace: {e}")
sys.exit(1)
def setup_database():
wait_for_scylladb()
create_keyspace()
if __name__ == "__main__":
wait_for_scylladb()
create_keyspace()

View File

@ -19,6 +19,8 @@ dependencies = [
"scylla-driver",
"bec_lib",
"python-socketio[asyncio_client]",
"libtmux",
"websocket-client",
]
@ -28,8 +30,13 @@ dev = [
"pytest-random-order~=1.1",
"pytest-timeout~=2.2",
"pytest~=8.0",
"pytest-docker",
"isort~=5.13, >=5.13.2",
]
[project.scripts]
bec-atlas-fastapi = "bec_atlas.main:main"
bec-atlas = "bec_atlas.utils.launch:main"
[project.urls]
"Bug Tracker" = "https://gitlab.psi.ch/bec/bec_atlas/issues"

114
backend/utils/nginx.conf Normal file
View File

@ -0,0 +1,114 @@
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
# include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
upstream fastapi_backend {
hash $remote_addr consistent; # Enable session persistence based on client IP on the same instance
server 127.0.0.1:8000; # First instance
server 127.0.0.1:8001; # Second instance
}
#gzip on;
server {
listen 80;
server_name yourdomain.com; # Replace with your domain or IP
# Configure SSL if needed
# listen 443 ssl;
# ssl_certificate /path/to/cert.pem;
# ssl_certificate_key /path/to/key.pem;
location / {
proxy_pass http://fastapi_backend;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
# enable WebSockets
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location /nginx_status {
stub_status on;
access_log off;
allow 127.0.0.1; # Allow localhost
deny all; # Deny all others
}
# Optional: Serve static files
location /static/ {
root /path/to/static/files; # Adjust the path
}
# Optional: Add caching for static files
location ~* \.(jpg|jpeg|png|gif|css|js|ico|svg|woff|woff2|ttf|otf|eot|ttf|otf|html)$ {
expires 30d;
access_log off;
}
}
# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;
# location / {
# root html;
# index index.html index.htm;
# }
#}
# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;
# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;
# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
# location / {
# root html;
# index index.html index.htm;
# }
#}
include servers/*;
}