Files

428 lines
16 KiB
Python

"""AYN Antivirus Dashboard — Web Server with Password Auth.
Lightweight aiohttp server that serves the dashboard SPA and REST API.
All access, including from localhost, requires username/password
authentication via a session cookie obtained through ``POST /login``.
"""
from __future__ import annotations
import logging
import secrets
import time
from typing import Dict, Optional
from urllib.parse import urlparse
from aiohttp import web
from ayn_antivirus.config import Config
from ayn_antivirus.constants import QUARANTINE_ENCRYPTION_KEY_FILE
from ayn_antivirus.dashboard.api import setup_routes
from ayn_antivirus.dashboard.collector import MetricsCollector
from ayn_antivirus.dashboard.store import DashboardStore
from ayn_antivirus.dashboard.templates import get_dashboard_html
logger = logging.getLogger("ayn_antivirus.dashboard.server")
# ------------------------------------------------------------------
# JSON error handler — prevent aiohttp returning HTML on /api/* routes
# ------------------------------------------------------------------
@web.middleware
async def json_error_middleware(
    request: web.Request,
    handler,
) -> web.StreamResponse:
    """Turn errors on ``/api/*`` routes into JSON responses.

    Without this, aiohttp's default error handling returns HTML error
    pages, which break frontend ``fetch().json()`` calls.

    Behaviour:

    * ``web.HTTPException`` with status >= 400 on an ``/api/`` path is
      converted to ``{"error": <reason>}`` with the same status code.
    * Any other ``web.HTTPException`` (non-API path, or a 2xx/3xx response
      raised as an exception) is re-raised unchanged.
    * Any other ``Exception`` is logged with its traceback and answered
      with a 500: JSON on ``/api/`` paths, a minimal HTML page elsewhere.

    Args:
        request: The incoming request.
        handler: The next handler in the middleware chain.

    Returns:
        The handler's response, or a substitute error response.
    """
    is_api = request.path.startswith("/api/")
    try:
        return await handler(request)
    except web.HTTPException as exc:
        # HTTPException is also the base class of successful and redirect
        # responses (e.g. HTTPFound).  Those are not errors, and rewriting
        # them as a JSON body would drop headers such as ``Location``.
        if is_api and exc.status >= 400:
            return web.json_response(
                {"error": exc.reason or "Request failed"},
                status=exc.status,
            )
        raise
    except Exception as exc:
        logger.exception("Unhandled error on %s %s", request.method, request.path)
        if is_api:
            # NOTE(review): the exception text is echoed to the client;
            # confirm that is acceptable for this deployment.
            return web.json_response(
                {"error": f"Internal server error: {exc}"},
                status=500,
            )
        return web.Response(
            text="<h1>500 Internal Server Error</h1>",
            status=500,
            content_type="text/html",
        )
# ------------------------------------------------------------------
# Rate limiting state
# ------------------------------------------------------------------
# Minimum number of seconds between two accepted POSTs to the same
# ``/api/actions/*`` path.
_RATE_LIMIT_SECONDS = 10
# ``time.time()`` of the last accepted action POST, keyed by request path.
_action_timestamps: Dict[str, float] = dict()
# ------------------------------------------------------------------
# Authentication middleware
# ------------------------------------------------------------------
@web.middleware
async def auth_middleware(
    request: web.Request,
    handler,
) -> web.StreamResponse:
    """Authenticate all requests.

    * ``/login`` and ``/favicon.ico`` are always allowed.
    * All other routes require a valid ``ayn_session`` cookie.
    * Unauthenticated HTML routes are answered with the login page.
    * Unauthenticated ``/api/*`` returns 401.
    * POST ``/api/actions/*`` enforces an Origin check (CSRF) and rate
      limiting, one accepted request per path every
      ``_RATE_LIMIT_SECONDS`` seconds.

    Args:
        request: The incoming request.
        handler: The next handler in the middleware chain.

    Returns:
        The handler's response, or a 401/403/429 JSON response, or the
        login page.
    """
    # Login route is always open.
    if request.path in ("/login", "/favicon.ico"):
        return await handler(request)

    # All requests require auth (no localhost bypass — behind reverse proxy).
    session_token = request.app.get("_session_token", "")
    cookie = request.cookies.get("ayn_session", "")
    # Compare as bytes: compare_digest() raises TypeError for ``str``
    # arguments containing non-ASCII characters, which would turn a
    # malformed cookie into a 500 instead of a 401.  ``surrogatepass``
    # keeps the encode step from raising on lone surrogates.
    authenticated = bool(
        cookie
        and session_token
        and secrets.compare_digest(
            cookie.encode("utf-8", "surrogatepass"),
            session_token.encode("utf-8", "surrogatepass"),
        )
    )
    if not authenticated:
        if request.path.startswith("/api/"):
            return web.json_response(
                {"error": "Unauthorized. Please login."}, status=401,
            )
        # Serve login page for HTML routes.
        return web.Response(
            text=request.app["_login_html"], content_type="text/html",
        )

    # CSRF + rate-limiting for POST action endpoints.
    if request.method == "POST" and request.path.startswith("/api/actions/"):
        origin = request.headers.get("Origin", "")
        if origin:
            # A malformed Origin is treated as a mismatch rather than
            # letting urlparse()'s ValueError surface as a 500.
            try:
                origin_host = urlparse(origin).hostname or ""
            except ValueError:
                origin_host = ""
            host = request.headers.get("Host", "")
            # Parse Host as a network location so the port is stripped
            # correctly for bracketed IPv6 literals and the name is
            # lower-cased the same way as the Origin hostname.
            expected = ""
            if host:
                try:
                    expected = urlparse(f"//{host}").hostname or ""
                except ValueError:
                    expected = ""
            allowed = {expected, "localhost", "127.0.0.1", "::1"}
            allowed.discard("")
            if origin_host not in allowed:
                return web.json_response(
                    {"error": "CSRF: Origin mismatch"}, status=403,
                )
        now = time.time()
        last = _action_timestamps.get(request.path, 0)
        if now - last < _RATE_LIMIT_SECONDS:
            return web.json_response(
                {"error": "Rate limited. Try again in a few seconds."},
                status=429,
            )
        # Recorded before the handler runs, so a failing action still
        # counts against the limit.
        _action_timestamps[request.path] = now

    return await handler(request)
# ------------------------------------------------------------------
# Dashboard server
# ------------------------------------------------------------------
class DashboardServer:
    """AYN Antivirus dashboard with username/password authentication.

    Builds an aiohttp application with the JSON-error and authentication
    middlewares, registers the API and HTML routes, and offers both a
    blocking (:meth:`run`) and a non-blocking
    (:meth:`start_async` / :meth:`stop_async`) way to serve it.

    A single random session token is generated per server instance; every
    successful login receives that same token as its ``ayn_session`` cookie.
    """

    def __init__(self, config: Optional[Config] = None) -> None:
        """Create the store, collector and aiohttp app.

        Args:
            config: Configuration to use; a default ``Config()`` is created
                when omitted.
        """
        self.config = config or Config()
        self.store = DashboardStore(self.config.dashboard_db_path)
        self.collector = MetricsCollector(self.store)
        self.app = web.Application(middlewares=[json_error_middleware, auth_middleware])
        self._session_token: str = secrets.token_urlsafe(32)
        self._runner: Optional[web.AppRunner] = None
        self._site: Optional[web.TCPSite] = None
        # Set once the collector has been stopped and the store closed, so
        # the shutdown hook and stop_async() never tear down twice.
        self._closed: bool = False
        self._setup()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    def _setup(self) -> None:
        """Configure the aiohttp application."""
        self.app["_session_token"] = self._session_token
        self.app["_login_html"] = self._build_login_page()
        self.app["store"] = self.store
        self.app["collector"] = self.collector
        self.app["config"] = self.config

        # Quarantine vault (best-effort): the dashboard still starts when
        # the vault cannot be imported or constructed.
        try:
            from ayn_antivirus.quarantine.vault import QuarantineVault

            self.app["vault"] = QuarantineVault(
                quarantine_dir=self.config.quarantine_path,
                key_file_path=QUARANTINE_ENCRYPTION_KEY_FILE,
            )
        except Exception as exc:
            logger.warning("Quarantine vault not available: %s", exc)

        # API routes (``/api/*``).
        setup_routes(self.app)

        # HTML routes.
        self.app.router.add_get("/", self._serve_dashboard)
        self.app.router.add_get("/dashboard", self._serve_dashboard)
        self.app.router.add_get("/login", self._serve_login)
        self.app.router.add_post("/login", self._handle_login)

        # Lifecycle hooks.
        self.app.on_startup.append(self._on_startup)
        self.app.on_shutdown.append(self._on_shutdown)

    # ------------------------------------------------------------------
    # Request handlers
    # ------------------------------------------------------------------
    @staticmethod
    def _constant_time_equals(supplied: str, expected: str) -> bool:
        """Compare two strings in constant time.

        ``secrets.compare_digest`` raises ``TypeError`` for ``str``
        arguments containing non-ASCII characters, so both values are
        encoded to UTF-8 bytes first.  ``surrogatepass`` keeps the encode
        step from raising on lone surrogates in client-supplied text.
        """
        return secrets.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    async def _serve_login(self, request: web.Request) -> web.Response:
        """``GET /login`` — render the login page."""
        return web.Response(
            text=self.app["_login_html"], content_type="text/html",
        )

    async def _serve_dashboard(self, request: web.Request) -> web.Response:
        """``GET /`` or ``GET /dashboard`` — render the SPA.

        The app is created with ``auth_middleware``, which answers
        unauthenticated requests itself, so a request that reaches this
        handler carries a valid session cookie.
        """
        html = get_dashboard_html()
        return web.Response(text=html, content_type="text/html")

    async def _handle_login(self, request: web.Request) -> web.Response:
        """``POST /login`` — validate username/password, set session cookie.

        Expects a JSON object with string ``username`` and ``password``
        fields; both are stripped of surrounding whitespace.

        Returns:
            * 400 when the body is not usable JSON or a field is missing
              or empty,
            * 401 when the credentials do not match the configured ones,
            * 200 with the ``ayn_session`` cookie set on success.
        """
        # Deliberately broad: any failure to parse the body, a non-object
        # body, or non-string fields are all reported as a bad request.
        try:
            body = await request.json()
            username = body.get("username", "").strip()
            password = body.get("password", "").strip()
        except Exception:
            return web.json_response({"error": "Invalid request"}, status=400)

        if not username or not password:
            return web.json_response(
                {"error": "Username and password required"}, status=400,
            )

        # Both comparisons always run (no short-circuit) and are performed
        # on bytes so non-ASCII input yields a 401 rather than a TypeError.
        valid_user = self._constant_time_equals(
            username, self.config.dashboard_username,
        )
        valid_pass = self._constant_time_equals(
            password, self.config.dashboard_password,
        )
        if not (valid_user and valid_pass):
            self.store.log_activity(
                f"Failed login attempt from {request.remote}: user={username}",
                "WARNING",
                "auth",
            )
            return web.json_response(
                {"error": "Invalid username or password"}, status=401,
            )

        self.store.log_activity(
            f"Successful login from {request.remote}: user={username}",
            "INFO",
            "auth",
        )
        response = web.json_response(
            {"status": "ok", "message": "Welcome to AYN Antivirus"},
        )
        # Session lasts one day; not readable from JS and never sent on
        # cross-site requests.
        response.set_cookie(
            "ayn_session",
            self._session_token,
            httponly=True,
            max_age=86400,
            samesite="Strict",
        )
        return response

    # ------------------------------------------------------------------
    # Login page
    # ------------------------------------------------------------------
    @staticmethod
    def _build_login_page() -> str:
        """Return a polished HTML login form with username + password fields.

        The page posts the credentials as JSON to ``/login`` via ``fetch``
        and redirects to ``/dashboard`` on success.  The form's inline
        handler returns a literal ``false`` so the browser never performs a
        native form submission alongside the ``fetch``.
        """
        return '''<!DOCTYPE html>
<html lang="en"><head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<title>AYN Antivirus \u2014 Login</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
body{background:#0a0e17;color:#e2e8f0;font-family:'Segoe UI',system-ui,-apple-system,sans-serif;
display:flex;justify-content:center;align-items:center;min-height:100vh;
background-image:radial-gradient(circle at 50% 50%,#111827 0%,#0a0e17 70%)}
.login-box{background:#111827;padding:2.5rem;border-radius:16px;border:1px solid #2a3444;
width:420px;max-width:90vw;box-shadow:0 25px 80px rgba(0,0,0,0.6)}
.logo{text-align:center;margin-bottom:2rem}
.logo .shield{font-size:3.5rem;display:block;margin-bottom:0.5rem}
.logo h1{font-size:1.8rem;background:linear-gradient(135deg,#3b82f6,#06b6d4);
-webkit-background-clip:text;-webkit-text-fill-color:transparent;font-weight:800}
.logo .subtitle{color:#6b7280;font-size:0.8rem;margin-top:0.3rem;letter-spacing:2px;text-transform:uppercase}
.field{margin-bottom:1.2rem}
.field label{display:block;font-size:0.75rem;color:#9ca3af;margin-bottom:0.4rem;
text-transform:uppercase;letter-spacing:1px;font-weight:600}
.field input{width:100%;padding:12px 16px;background:#0d1117;border:1px solid #2a3444;
color:#e2e8f0;border-radius:10px;font-size:15px;transition:all 0.2s;outline:none}
.field input:focus{border-color:#3b82f6;box-shadow:0 0 0 3px rgba(59,130,246,0.15)}
.field input::placeholder{color:#4b5563}
.btn{width:100%;padding:13px;background:linear-gradient(135deg,#3b82f6,#2563eb);color:white;
border:none;border-radius:10px;cursor:pointer;font-size:15px;font-weight:700;
transition:all 0.2s;margin-top:0.8rem;letter-spacing:0.5px}
.btn:hover{transform:translateY(-2px);box-shadow:0 8px 25px rgba(59,130,246,0.4)}
.btn:active{transform:translateY(0)}
.btn:disabled{opacity:0.5;cursor:not-allowed;transform:none}
.error{color:#fca5a5;font-size:0.85rem;text-align:center;margin-top:1rem;padding:10px 16px;
background:rgba(239,68,68,0.1);border-radius:8px;display:none;border:1px solid rgba(239,68,68,0.2)}
.footer{text-align:center;margin-top:2rem;padding-top:1.5rem;border-top:1px solid #1e293b}
.footer p{color:#4b5563;font-size:0.7rem;line-height:1.8}
.spinner{display:inline-block;width:16px;height:16px;border:2px solid #fff;
border-top-color:transparent;border-radius:50%;animation:spin 0.6s linear infinite;
vertical-align:middle;margin-right:8px}
@keyframes spin{to{transform:rotate(360deg)}}
</style></head>
<body>
<div class="login-box">
<div class="logo">
<span class="shield">\U0001f6e1\ufe0f</span>
<h1>AYN ANTIVIRUS</h1>
<div class="subtitle">Security Operations Dashboard</div>
</div>
<form id="loginForm" onsubmit="doLogin();return false">
<div class="field">
<label>Username</label>
<input type="text" id="username" placeholder="Enter username" autocomplete="username" autofocus required>
</div>
<div class="field">
<label>Password</label>
<input type="password" id="password" placeholder="Enter password" autocomplete="current-password" required>
</div>
<button type="submit" class="btn" id="loginBtn">\U0001f510 Sign In</button>
</form>
<div class="error" id="errMsg">Invalid credentials</div>
<div class="footer">
<p>AYN Antivirus v1.0.0 \u2014 Server Protection Suite<br>
Secure Access Portal</p>
</div>
</div>
<script>
async function doLogin(){
var btn=document.getElementById('loginBtn');
var err=document.getElementById('errMsg');
var user=document.getElementById('username').value.trim();
var pass=document.getElementById('password').value;
if(!user||!pass)return false;
err.style.display='none';
btn.disabled=true;
btn.innerHTML='<span class="spinner"></span>Signing in...';
try{
var r=await fetch('/login',{method:'POST',
headers:{'Content-Type':'application/json'},
body:JSON.stringify({username:user,password:pass})});
if(r.ok){window.location.href='/dashboard';}
else{var d=await r.json();err.textContent=d.error||'Invalid credentials';err.style.display='block';}
}catch(e){err.textContent='Connection failed. Check server.';err.style.display='block';}
btn.disabled=false;btn.innerHTML='\U0001f510 Sign In';
return false;
}
document.querySelectorAll('input').forEach(function(i){i.addEventListener('input',function(){
document.getElementById('errMsg').style.display='none';
});});
</script>
</body></html>'''

    # ------------------------------------------------------------------
    # Lifecycle hooks
    # ------------------------------------------------------------------
    async def _on_startup(self, app: web.Application) -> None:
        """Startup hook: start the collector and record the start."""
        await self.collector.start()
        self.store.log_activity("Dashboard server started", "INFO", "server")
        logger.info(
            "Dashboard on http://%s:%d",
            self.config.dashboard_host,
            self.config.dashboard_port,
        )

    async def _on_shutdown(self, app: web.Application) -> None:
        """Shutdown hook: stop the collector, record the stop, close the store.

        Does nothing if teardown has already happened.
        """
        if self._closed:
            return
        self._closed = True
        await self.collector.stop()
        self.store.log_activity("Dashboard server stopped", "INFO", "server")
        self.store.close()

    # ------------------------------------------------------------------
    # Blocking run
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Run the dashboard server (blocking)."""
        host = self.config.dashboard_host
        port = self.config.dashboard_port
        # NOTE(review): the configured password is printed in clear text;
        # confirm this is intended when stdout is captured by a log system.
        print("\n \U0001f6e1\ufe0f AYN Antivirus Dashboard")
        print(f" \U0001f310 http://{host}:{port}")
        print(f" \U0001f464 Username: {self.config.dashboard_username}")
        print(f" \U0001f511 Password: {self.config.dashboard_password}")
        print(" Press Ctrl+C to stop\n")
        # print=None silences aiohttp's own startup banner.
        web.run_app(self.app, host=host, port=port, print=None)

    # ------------------------------------------------------------------
    # Async start / stop (non-blocking)
    # ------------------------------------------------------------------
    async def start_async(self) -> None:
        """Start the server without blocking."""
        self._runner = web.AppRunner(self.app)
        await self._runner.setup()
        self._site = web.TCPSite(
            self._runner,
            self.config.dashboard_host,
            self.config.dashboard_port,
        )
        await self._site.start()
        self.store.log_activity(
            "Dashboard server started (async)", "INFO", "server",
        )

    async def stop_async(self) -> None:
        """Stop a server previously started with :meth:`start_async`.

        Stopping the site / cleaning up the runner is expected to fire the
        app's shutdown hook, which already stops the collector and closes
        the store.  The explicit teardown below therefore only runs when
        that hook did not, so neither resource is torn down twice.  The
        runner and site handles are cleared so a repeated call is harmless.
        """
        site, runner = self._site, self._runner
        self._site = None
        self._runner = None
        if site:
            await site.stop()
        if runner:
            await runner.cleanup()
        if not self._closed:
            self._closed = True
            await self.collector.stop()
            self.store.close()
# ------------------------------------------------------------------
# Convenience entry point
# ------------------------------------------------------------------
def run_dashboard(config: Optional[Config] = None) -> None:
    """Build a :class:`DashboardServer` from *config* and serve it.

    Blocks until the server's ``run()`` call returns.
    """
    server = DashboardServer(config)
    server.run()