"""AYN Antivirus Dashboard — REST API Handlers.""" from __future__ import annotations import asyncio import json import logging import platform import time from datetime import datetime from aiohttp import web logger = logging.getLogger("ayn_antivirus.dashboard.api") def setup_routes(app: web.Application) -> None: """Register all API routes on the aiohttp app.""" app.router.add_get("/api/health", handle_health) app.router.add_get("/api/status", handle_status) app.router.add_get("/api/threats", handle_threats) app.router.add_get("/api/threat-stats", handle_threat_stats) app.router.add_get("/api/scans", handle_scans) app.router.add_get("/api/scan-chart", handle_scan_chart) app.router.add_get("/api/quarantine", handle_quarantine) app.router.add_get("/api/signatures", handle_signatures) app.router.add_get("/api/sig-updates", handle_sig_updates) app.router.add_get("/api/definitions", handle_definitions) app.router.add_get("/api/logs", handle_logs) app.router.add_get("/api/metrics-history", handle_metrics_history) # Action endpoints app.router.add_post("/api/actions/quick-scan", handle_action_quick_scan) app.router.add_post("/api/actions/full-scan", handle_action_full_scan) app.router.add_post("/api/actions/update-sigs", handle_action_update_sigs) app.router.add_post("/api/actions/update-feed", handle_action_update_feed) # Threat action endpoints app.router.add_post("/api/actions/quarantine", handle_action_quarantine) app.router.add_post("/api/actions/delete-threat", handle_action_delete_threat) app.router.add_post("/api/actions/whitelist", handle_action_whitelist) app.router.add_post("/api/actions/restore", handle_action_restore) app.router.add_post("/api/actions/ai-analyze", handle_action_ai_analyze) # Container endpoints app.router.add_get("/api/containers", handle_containers) app.router.add_get("/api/container-scan", handle_container_scan_results) app.router.add_post("/api/actions/scan-containers", handle_action_scan_containers) 
app.router.add_post("/api/actions/scan-container", handle_action_scan_single_container) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _json(data: object, status: int = 200) -> web.Response: return web.json_response(data, status=status) def _get_ai_analyzer(app): """Lazy-init the AI analyzer singleton on the app.""" if "_ai_analyzer" not in app: from ayn_antivirus.detectors.ai_analyzer import AIAnalyzer import os key = os.environ.get("ANTHROPIC_API_KEY", "") app["_ai_analyzer"] = AIAnalyzer(api_key=key) if key else None return app.get("_ai_analyzer") def _ai_filter_threats(app, store, threats_data: list) -> list: """Run AI analysis on detections. Returns only real threats.""" ai = _get_ai_analyzer(app) if not ai or not ai.available: return threats_data # No AI — pass all through filtered = [] for t in threats_data: verdict = ai.analyze( file_path=t["file_path"], threat_name=t["threat_name"], threat_type=t["threat_type"], severity=t["severity"], detector=t["detector"], confidence=t.get("confidence", 50), ) t["ai_verdict"] = verdict.verdict t["ai_confidence"] = verdict.confidence t["ai_reason"] = verdict.reason t["ai_action"] = verdict.recommended_action if verdict.is_safe: store.log_activity( f"AI dismissed: {t['file_path']} ({t['threat_name']}) — {verdict.reason}", "INFO", "ai_analyzer", ) continue # Skip false positive filtered.append(t) dismissed = len(threats_data) - len(filtered) if dismissed: store.log_activity( f"AI filtered {dismissed}/{len(threats_data)} false positives", "INFO", "ai_analyzer", ) return filtered def _auto_quarantine(store, vault, file_path: str, threat_name: str, severity: str) -> str: """Quarantine a file automatically. 
Returns quarantine ID or empty string.""" if not vault: return "" import os if not os.path.isfile(file_path): return "" try: qid = vault.quarantine_file( file_path=file_path, threat_name=threat_name, threat_type="auto", severity=severity, ) store.log_activity( f"Auto-quarantined: {file_path} ({threat_name})", "WARNING", "quarantine", ) return qid except Exception as exc: logger.warning("Auto-quarantine failed for %s: %s", file_path, exc) return "" def _safe_int( val: str, default: int, min_val: int = 1, max_val: int = 1000, ) -> int: """Parse an integer query param with clamping and fallback.""" try: n = int(val) return max(min_val, min(n, max_val)) except (ValueError, TypeError): return default def _threat_type_str(tt: object) -> str: """Convert a ThreatType enum (or anything) to a string.""" return tt.name if hasattr(tt, "name") else str(tt) def _severity_str(sev: object) -> str: """Convert a Severity enum (or anything) to a string.""" return sev.name if hasattr(sev, "name") else str(sev) # ------------------------------------------------------------------ # Read-only endpoints # ------------------------------------------------------------------ async def handle_health(request: web.Request) -> web.Response: """GET /api/health - System health metrics (live snapshot).""" collector = request.app["collector"] snapshot = await asyncio.to_thread(collector.get_snapshot) return _json(snapshot) async def handle_status(request: web.Request) -> web.Response: """GET /api/status - Protection status overview.""" store = request.app["store"] def _get() -> dict: threat_stats = store.get_threat_stats() scans = store.get_recent_scans(1) sig_stats = store.get_sig_stats() latest_metrics = store.get_latest_metrics() quarantine_count = 0 try: vault = request.app.get("vault") if vault: quarantine_count = vault.count() except Exception: pass try: import psutil uptime_secs = int(time.time() - psutil.boot_time()) except Exception: uptime_secs = 0 last_scan = scans[0] if scans else None 
return { "hostname": platform.node(), "os": f"{platform.system()} {platform.release()}", "arch": platform.machine(), "uptime_seconds": uptime_secs, "protection_active": True, "last_scan": last_scan, "threats": threat_stats, "signatures": sig_stats, "quarantine_count": quarantine_count, "metrics": latest_metrics, "server_time": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), } data = await asyncio.to_thread(_get) return _json(data) async def handle_threats(request: web.Request) -> web.Response: """GET /api/threats?limit=50 - Recent threats list.""" store = request.app["store"] limit = _safe_int(request.query.get("limit", "50"), 50, max_val=500) threats = await asyncio.to_thread(store.get_recent_threats, limit) return _json({"threats": threats, "count": len(threats)}) async def handle_threat_stats(request: web.Request) -> web.Response: """GET /api/threat-stats - Threat statistics.""" store = request.app["store"] stats = await asyncio.to_thread(store.get_threat_stats) return _json(stats) async def handle_scans(request: web.Request) -> web.Response: """GET /api/scans?limit=30 - Recent scan history.""" store = request.app["store"] limit = _safe_int(request.query.get("limit", "30"), 30, max_val=500) scans = await asyncio.to_thread(store.get_recent_scans, limit) return _json({"scans": scans, "count": len(scans)}) async def handle_scan_chart(request: web.Request) -> web.Response: """GET /api/scan-chart?days=30 - Scan history chart data.""" store = request.app["store"] days = _safe_int(request.query.get("days", "30"), 30, max_val=365) data = await asyncio.to_thread(store.get_scan_chart_data, days) return _json({"chart": data}) async def handle_quarantine(request: web.Request) -> web.Response: """GET /api/quarantine - Quarantine vault status.""" vault = request.app.get("vault") if not vault: return _json({"count": 0, "items": [], "total_size": 0}) def _get() -> dict: items = vault.list_quarantined() total_size = sum( item.get("file_size", 0) or item.get("size", 0) for item 
in items ) return {"count": len(items), "items": items[:20], "total_size": total_size} data = await asyncio.to_thread(_get) return _json(data) async def handle_signatures(request: web.Request) -> web.Response: """GET /api/signatures - Signature database stats.""" store = request.app["store"] def _get() -> dict: sig_stats = store.get_sig_stats() config = request.app.get("config") if config: try: from ayn_antivirus.signatures.db.hash_db import HashDatabase from ayn_antivirus.signatures.db.ioc_db import IOCDatabase hdb = HashDatabase(config.db_path) hdb.initialize() idb = IOCDatabase(config.db_path) idb.initialize() sig_stats["db_hash_count"] = hdb.count() sig_stats["db_hash_stats"] = hdb.get_stats() sig_stats["db_ioc_stats"] = idb.get_stats() sig_stats["db_malicious_ips"] = len(idb.get_all_malicious_ips()) sig_stats["db_malicious_domains"] = len( idb.get_all_malicious_domains() ) hdb.close() idb.close() except Exception as exc: sig_stats["db_error"] = str(exc) return sig_stats data = await asyncio.to_thread(_get) return _json(data) async def handle_sig_updates(request: web.Request) -> web.Response: """GET /api/sig-updates?limit=20 - Recent signature update history.""" store = request.app["store"] limit = _safe_int(request.query.get("limit", "20"), 20, max_val=200) updates = await asyncio.to_thread(store.get_recent_sig_updates, limit) return _json({"updates": updates, "count": len(updates)}) async def handle_definitions(request: web.Request) -> web.Response: """GET /api/definitions - Full virus definition database view. Supports pagination (``page``, ``per_page``), search (``search``), and type filtering (``type=hash|ip|domain|url``). 
""" store = request.app["store"] config = request.app.get("config") page = _safe_int(request.query.get("page", "1"), 1, max_val=10000) per_page = _safe_int(request.query.get("per_page", "100"), 100, max_val=500) search = request.query.get("search", "").strip() filter_type = request.query.get("type", "").strip() def _get() -> dict: result: dict = { "hashes": [], "ips": [], "domains": [], "urls": [], "total_hashes": 0, "total_ips": 0, "total_domains": 0, "total_urls": 0, "page": page, "per_page": per_page, "feeds": [], "last_update": None, } if not config: return result try: from ayn_antivirus.signatures.db.hash_db import HashDatabase from ayn_antivirus.signatures.db.ioc_db import IOCDatabase hdb = HashDatabase(config.db_path) hdb.initialize() idb = IOCDatabase(config.db_path) idb.initialize() offset = (page - 1) * per_page conn = hdb.conn # Hash definitions if not filter_type or filter_type == "hash": if search: rows = conn.execute( "SELECT hash, threat_name, threat_type, severity, source, " "added_date, details FROM threats " "WHERE threat_name LIKE ? " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (f"%{search}%", per_page, offset), ).fetchall() else: rows = conn.execute( "SELECT hash, threat_name, threat_type, severity, source, " "added_date, details FROM threats " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (per_page, offset), ).fetchall() result["hashes"] = [dict(r) for r in rows] result["total_hashes"] = hdb.count() # IP definitions ioc_conn = idb.conn if not filter_type or filter_type == "ip": if search: rows = ioc_conn.execute( "SELECT ip, threat_name, type, source, added_date " "FROM ioc_ips " "WHERE ip LIKE ? OR threat_name LIKE ? " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (f"%{search}%", f"%{search}%", per_page, offset), ).fetchall() else: rows = ioc_conn.execute( "SELECT ip, threat_name, type, source, added_date " "FROM ioc_ips " "ORDER BY added_date DESC LIMIT ? 
OFFSET ?", (per_page, offset), ).fetchall() result["ips"] = [dict(r) for r in rows] result["total_ips"] = ioc_conn.execute( "SELECT COUNT(*) FROM ioc_ips" ).fetchone()[0] # Domain definitions if not filter_type or filter_type == "domain": if search: rows = ioc_conn.execute( "SELECT domain, threat_name, type, source, added_date " "FROM ioc_domains " "WHERE domain LIKE ? OR threat_name LIKE ? " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (f"%{search}%", f"%{search}%", per_page, offset), ).fetchall() else: rows = ioc_conn.execute( "SELECT domain, threat_name, type, source, added_date " "FROM ioc_domains " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (per_page, offset), ).fetchall() result["domains"] = [dict(r) for r in rows] result["total_domains"] = ioc_conn.execute( "SELECT COUNT(*) FROM ioc_domains" ).fetchone()[0] # URL definitions if not filter_type or filter_type == "url": if search: rows = ioc_conn.execute( "SELECT url, threat_name, type, source, added_date " "FROM ioc_urls " "WHERE url LIKE ? OR threat_name LIKE ? " "ORDER BY added_date DESC LIMIT ? OFFSET ?", (f"%{search}%", f"%{search}%", per_page, offset), ).fetchall() else: rows = ioc_conn.execute( "SELECT url, threat_name, type, source, added_date " "FROM ioc_urls " "ORDER BY added_date DESC LIMIT ? 
OFFSET ?", (per_page, offset), ).fetchall() result["urls"] = [dict(r) for r in rows] result["total_urls"] = ioc_conn.execute( "SELECT COUNT(*) FROM ioc_urls" ).fetchone()[0] # Feed info sig_updates = store.get_recent_sig_updates(20) result["feeds"] = sig_updates result["last_update"] = ( sig_updates[0]["timestamp"] if sig_updates else None ) hdb.close() idb.close() except Exception as exc: result["error"] = str(exc) logger.error("Error fetching definitions: %s", exc) return result data = await asyncio.to_thread(_get) return _json(data) async def handle_logs(request: web.Request) -> web.Response: """GET /api/logs?limit=20 - Recent activity logs.""" store = request.app["store"] limit = _safe_int(request.query.get("limit", "20"), 20, max_val=500) logs = await asyncio.to_thread(store.get_recent_logs, limit) return _json({"logs": logs, "count": len(logs)}) async def handle_metrics_history(request: web.Request) -> web.Response: """GET /api/metrics-history?hours=1 - Metrics time series.""" store = request.app["store"] hours = _safe_int(request.query.get("hours", "1"), 1, max_val=168) data = await asyncio.to_thread(store.get_metrics_history, hours) return _json({"metrics": data, "count": len(data)}) # ------------------------------------------------------------------ # Action handlers (trigger scans / updates) # ------------------------------------------------------------------ async def handle_action_quick_scan(request: web.Request) -> web.Response: """POST /api/actions/quick-scan - Trigger a quick scan.""" store = request.app["store"] store.log_activity("Quick scan triggered from dashboard", "INFO", "dashboard") def _run() -> dict: from ayn_antivirus.config import Config from ayn_antivirus.core.engine import ScanEngine config = request.app.get("config") or Config() engine = ScanEngine(config) result = engine.quick_scan() store.record_scan( scan_type="quick", scan_path=",".join(config.scan_paths), files_scanned=result.files_scanned, files_skipped=result.files_skipped, 
threats_found=len(result.threats), duration=result.duration_seconds, ) # Build detection list for AI analysis raw_threats = [] for t in result.threats: raw_threats.append({ "file_path": t.path, "threat_name": t.threat_name, "threat_type": _threat_type_str(t.threat_type), "severity": _severity_str(t.severity), "detector": t.detector_name, "file_hash": t.file_hash or "", "confidence": getattr(t, "confidence", 50), }) # AI filters out false positives verified = _ai_filter_threats(request.app, store, raw_threats) vault = request.app.get("vault") quarantined = 0 for t in verified: ai_action = t.get("ai_action", "quarantine") sev = t["severity"] qid = "" if ai_action in ("quarantine", "delete"): qid = _auto_quarantine(store, vault, t["file_path"], t["threat_name"], sev) action = "quarantined" if qid else ("monitoring" if ai_action == "monitor" else "detected") details = t.get("ai_reason", "") store.record_threat( file_path=t["file_path"], threat_name=t["threat_name"], threat_type=t["threat_type"], severity=sev, detector=t["detector"], file_hash=t.get("file_hash", ""), action=action, details=f"[AI: {t.get('ai_verdict','?')} {t.get('ai_confidence',0)}%] {details}", ) if qid: quarantined += 1 return { "status": "completed", "files_scanned": result.files_scanned, "threats_found": len(verified), "ai_dismissed": len(result.threats) - len(verified), "quarantined": quarantined, } try: data = await asyncio.to_thread(_run) return _json(data) except Exception as exc: logger.error("Quick scan failed: %s", exc) store.log_activity(f"Quick scan failed: {exc}", "ERROR", "dashboard") return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_full_scan(request: web.Request) -> web.Response: """POST /api/actions/full-scan - Trigger a full scan.""" store = request.app["store"] store.log_activity("Full scan triggered from dashboard", "INFO", "dashboard") def _run() -> dict: from ayn_antivirus.config import Config from ayn_antivirus.core.engine import ScanEngine config 
= request.app.get("config") or Config() engine = ScanEngine(config) result = engine.full_scan() file_result = result.file_scan store.record_scan( scan_type="full", scan_path=",".join(config.scan_paths), files_scanned=file_result.files_scanned, files_skipped=file_result.files_skipped, threats_found=len(file_result.threats), duration=file_result.duration_seconds, ) raw_threats = [] for t in file_result.threats: raw_threats.append({ "file_path": t.path, "threat_name": t.threat_name, "threat_type": _threat_type_str(t.threat_type), "severity": _severity_str(t.severity), "detector": t.detector_name, "file_hash": t.file_hash or "", "confidence": getattr(t, "confidence", 50), }) verified = _ai_filter_threats(request.app, store, raw_threats) vault = request.app.get("vault") quarantined = 0 for t in verified: ai_action = t.get("ai_action", "quarantine") sev = t["severity"] qid = "" if ai_action in ("quarantine", "delete"): qid = _auto_quarantine(store, vault, t["file_path"], t["threat_name"], sev) action = "quarantined" if qid else ("monitoring" if ai_action == "monitor" else "detected") details = t.get("ai_reason", "") store.record_threat( file_path=t["file_path"], threat_name=t["threat_name"], threat_type=t["threat_type"], severity=sev, detector=t["detector"], file_hash=t.get("file_hash", ""), action=action, details=f"[AI: {t.get('ai_verdict','?')} {t.get('ai_confidence',0)}%] {details}", ) if qid: quarantined += 1 return { "status": "completed", "total_threats": result.total_threats, "ai_verified": len(verified), "ai_dismissed": len(raw_threats) - len(verified), "quarantined": quarantined, } try: data = await asyncio.to_thread(_run) return _json(data) except Exception as exc: logger.error("Full scan failed: %s", exc) store.log_activity(f"Full scan failed: {exc}", "ERROR", "dashboard") return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_update_sigs(request: web.Request) -> web.Response: """POST /api/actions/update-sigs - Update all threat 
signature feeds.""" store = request.app["store"] store.log_activity( "Signature update triggered from dashboard", "INFO", "dashboard" ) def _run() -> dict: from ayn_antivirus.config import Config from ayn_antivirus.signatures.manager import SignatureManager config = request.app.get("config") or Config() manager = SignatureManager(config) summary = manager.update_all() # summary = {"feeds": {name: stats}, "total_new": int, "errors": [...]} for feed_name, feed_result in summary["feeds"].items(): if "error" in feed_result: store.record_sig_update( feed_name=feed_name, status="error", details=feed_result.get("error", ""), ) else: store.record_sig_update( feed_name=feed_name, hashes=feed_result.get("hashes", 0), ips=feed_result.get("ips", 0), domains=feed_result.get("domains", 0), urls=feed_result.get("urls", 0), status="success", details=json.dumps(feed_result), ) manager.close() store.log_activity( f"Signature update completed: {len(summary['feeds'])} feeds, " f"{summary['total_new']} new entries", "INFO", "signatures", ) return { "status": "completed", "feeds_updated": len(summary["feeds"]) - len(summary["errors"]), "total_new": summary["total_new"], "errors": summary["errors"], } try: data = await asyncio.to_thread(_run) return _json(data) except Exception as exc: logger.error("Signature update failed: %s", exc) store.log_activity( f"Signature update failed: {exc}", "ERROR", "signatures" ) return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_update_feed(request: web.Request) -> web.Response: """POST /api/actions/update-feed - Update a single feed. 
Body: ``{"feed": "malwarebazaar"}`` """ store = request.app["store"] try: body = await request.json() feed_name = body.get("feed", "") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not feed_name: return _json({"error": "Missing 'feed' parameter"}, 400) store.log_activity( f"Single feed update triggered: {feed_name}", "INFO", "dashboard" ) def _run() -> dict: from ayn_antivirus.config import Config from ayn_antivirus.signatures.manager import SignatureManager config = request.app.get("config") or Config() manager = SignatureManager(config) result = manager.update_feed(feed_name) store.record_sig_update( feed_name=feed_name, hashes=result.get("hashes", 0), ips=result.get("ips", 0), domains=result.get("domains", 0), urls=result.get("urls", 0), status="success", details=json.dumps(result), ) manager.close() return {"status": "completed", "feed": feed_name, "result": result} try: data = await asyncio.to_thread(_run) return _json(data) except KeyError as exc: return _json({"status": "error", "error": str(exc)}, 404) except Exception as exc: logger.error("Feed update failed for %s: %s", feed_name, exc) return _json({"status": "error", "error": str(exc)}, 500) # ------------------------------------------------------------------ # Threat action handlers (quarantine / delete / whitelist) # ------------------------------------------------------------------ async def handle_action_quarantine(request: web.Request) -> web.Response: """POST /api/actions/quarantine — Move a file to encrypted quarantine vault. 
Body: ``{"file_path": "/path/to/file", "threat_id": 5}`` """ store = request.app["store"] vault = request.app.get("vault") if not vault: return _json({"status": "error", "error": "Quarantine vault not available"}, 500) try: body = await request.json() file_path = body.get("file_path", "").strip() threat_id = body.get("threat_id") threat_name = body.get("threat_name", "Unknown") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not file_path: return _json({"error": "Missing 'file_path'"}, 400) def _run() -> dict: import os if not os.path.exists(file_path): return {"status": "error", "error": f"File not found: {file_path}"} qid = vault.quarantine_file( file_path=file_path, threat_name=threat_name, threat_type="detected", severity="HIGH", ) if threat_id: store.conn.execute( "UPDATE threat_log SET action_taken='quarantined' WHERE id=?", (threat_id,), ) store.conn.commit() store.log_activity( f"Quarantined: {file_path} ({threat_name}) -> {qid}", "WARNING", "quarantine", ) return {"status": "ok", "quarantine_id": qid, "file_path": file_path} try: data = await asyncio.to_thread(_run) return _json(data, 200 if data.get("status") == "ok" else 400) except Exception as exc: logger.error("Quarantine failed: %s", exc) return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_delete_threat(request: web.Request) -> web.Response: """POST /api/actions/delete-threat — Permanently delete a malicious file. 
Body: ``{"file_path": "/path/to/file", "threat_id": 5}`` """ store = request.app["store"] try: body = await request.json() file_path = body.get("file_path", "").strip() threat_id = body.get("threat_id") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not file_path: return _json({"error": "Missing 'file_path'"}, 400) def _run() -> dict: import os if not os.path.exists(file_path): if threat_id: store.conn.execute( "UPDATE threat_log SET action_taken='deleted' WHERE id=?", (threat_id,), ) store.conn.commit() return {"status": "ok", "message": "File already gone", "file_path": file_path} os.remove(file_path) if threat_id: store.conn.execute( "UPDATE threat_log SET action_taken='deleted' WHERE id=?", (threat_id,), ) store.conn.commit() store.log_activity( f"Deleted threat file: {file_path}", "WARNING", "action", ) return {"status": "ok", "file_path": file_path} try: data = await asyncio.to_thread(_run) return _json(data) except Exception as exc: logger.error("Delete failed: %s", exc) return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_whitelist(request: web.Request) -> web.Response: """POST /api/actions/whitelist — Mark a threat as false positive. 
Body: ``{"threat_id": 5}`` """ store = request.app["store"] try: body = await request.json() threat_id = body.get("threat_id") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not threat_id: return _json({"error": "Missing 'threat_id'"}, 400) def _run() -> dict: row = store.conn.execute( "SELECT file_path, threat_name, file_hash FROM threat_log WHERE id=?", (threat_id,), ).fetchone() if not row: return {"status": "error", "error": "Threat not found"} store.conn.execute( "UPDATE threat_log SET action_taken='whitelisted' WHERE id=?", (threat_id,), ) store.conn.commit() store.log_activity( f"Whitelisted: {row['file_path']} ({row['threat_name']})", "INFO", "action", ) return {"status": "ok", "threat_id": threat_id} try: data = await asyncio.to_thread(_run) return _json(data, 200 if data.get("status") == "ok" else 400) except Exception as exc: logger.error("Whitelist failed: %s", exc) return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_ai_analyze(request: web.Request) -> web.Response: """POST /api/actions/ai-analyze — Run AI analysis on a specific threat. Body: ``{"threat_id": 5}`` """ store = request.app["store"] try: body = await request.json() threat_id = body.get("threat_id") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not threat_id: return _json({"error": "Missing 'threat_id'"}, 400) def _run() -> dict: row = store.conn.execute( "SELECT * FROM threat_log WHERE id=?", (threat_id,), ).fetchone() if not row: return {"status": "error", "error": "Threat not found"} ai = _get_ai_analyzer(request.app) if not ai or not ai.available: return {"status": "error", "error": "AI not configured. Set ANTHROPIC_API_KEY."} r = dict(row) verdict = ai.analyze( file_path=r["file_path"], threat_name=r["threat_name"], threat_type=r["threat_type"], severity=r["severity"], detector=r["detector"], ) store.conn.execute( "UPDATE threat_log SET details=? 
WHERE id=?", (f"[AI: {verdict.verdict} {verdict.confidence}%] {verdict.reason}", threat_id), ) store.conn.commit() store.log_activity( f"AI analyzed #{threat_id}: {verdict.verdict} — {verdict.reason}", "INFO", "ai_analyzer", ) return { "status": "ok", "verdict": verdict.verdict, "confidence": verdict.confidence, "reason": verdict.reason, "recommended_action": verdict.recommended_action, } try: data = await asyncio.to_thread(_run) return _json(data, 200 if data.get("status") == "ok" else 400) except Exception as exc: logger.error("AI analysis failed: %s", exc) return _json({"status": "error", "error": str(exc)}, 500) async def handle_action_restore(request: web.Request) -> web.Response: """POST /api/actions/restore — Restore a quarantined file. Body: ``{"file_path": "/original/path", "threat_id": 5}`` """ store = request.app["store"] vault = request.app.get("vault") if not vault: return _json({"status": "error", "error": "Vault not available"}, 500) try: body = await request.json() file_path = body.get("file_path", "").strip() threat_id = body.get("threat_id") except Exception: return _json({"error": "Invalid JSON body"}, 400) if not file_path: return _json({"error": "Missing 'file_path'"}, 400) def _run() -> dict: items = vault.list_quarantined() qid = None for item in items: if item.get("original_path") == file_path: qid = item.get("id") break if not qid: return {"status": "error", "error": f"No quarantine entry for {file_path}"} vault.restore_file(qid) if threat_id: store.conn.execute( "UPDATE threat_log SET action_taken='restored' WHERE id=?", (threat_id,), ) store.conn.commit() store.log_activity( f"Restored from quarantine: {file_path}", "WARNING", "quarantine", ) return {"status": "ok", "file_path": file_path} try: data = await asyncio.to_thread(_run) return _json(data, 200 if data.get("status") == "ok" else 400) except Exception as exc: logger.error("Restore failed: %s", exc) return _json({"status": "error", "error": str(exc)}, 500) # 
# ------------------------------------------------------------------
# Container endpoints
# ------------------------------------------------------------------


async def handle_containers(request: web.Request) -> web.Response:
    """GET /api/containers - List all containers across runtimes.

    Stopped containers are included.  The payload also reports which
    runtimes the scanner found available.
    """

    def _get() -> dict:
        from ayn_antivirus.scanners.container_scanner import ContainerScanner

        scanner = ContainerScanner()
        containers = scanner.list_containers(
            runtime="all",
            include_stopped=True,
        )
        return {
            "containers": [c.to_dict() for c in containers],
            "count": len(containers),
            "runtimes": scanner.available_runtimes,
        }

    data = await asyncio.to_thread(_get)
    return _json(data)


async def handle_container_scan_results(request: web.Request) -> web.Response:
    """GET /api/container-scan - Recent container scan results from store.

    Returns the 10 most recent container scans and up to 50 threats that
    were either found by the container scanner or have a container-related
    threat type.
    """
    store = request.app["store"]

    def _get() -> dict:
        scans = store.conn.execute(
            "SELECT * FROM scan_history "
            "WHERE scan_type LIKE 'container%' "
            "ORDER BY id DESC LIMIT 10",
        ).fetchall()
        threats = store.conn.execute(
            "SELECT * FROM threat_log WHERE "
            "LOWER(threat_type) IN ('miner','misconfiguration','rootkit') "
            "OR LOWER(detector) = 'container_scanner' "
            "ORDER BY id DESC LIMIT 50",
        ).fetchall()
        return {
            "scans": [dict(r) for r in scans],
            "threats": [dict(t) for t in threats],
        }

    data = await asyncio.to_thread(_get)
    return _json(data)


def _record_container_scan(store, result, scan_type: str, scan_path: str) -> None:
    """Persist a container scan summary and every threat it found.

    Shared by the all-containers and single-container handlers so both
    write identical ``scan_history`` / ``threat_log`` rows.

    Args:
        store: Dashboard store exposing ``record_scan`` / ``record_threat``.
        result: Scan result with ``containers_scanned``, ``threats``,
            ``duration_seconds`` and, optionally, ``errors``.
        scan_type: Value stored as the scan type (e.g. ``"container-full"``).
        scan_path: Value stored as the scan target.
    """
    # ``errors`` is read defensively: only the all-containers result is
    # known to carry it.
    had_errors = bool(getattr(result, "errors", None))
    store.record_scan(
        scan_type=scan_type,
        scan_path=scan_path,
        files_scanned=result.containers_scanned,
        files_skipped=0,
        threats_found=len(result.threats),
        duration=result.duration_seconds,
        status="completed_with_errors" if had_errors else "completed",
    )
    for t in result.threats:
        store.record_threat(
            # Threats without a file are keyed by their container name.
            file_path=t.file_path or f"container:{t.container_name}",
            threat_name=t.threat_name,
            threat_type=t.threat_type,
            severity=t.severity,
            detector="container_scanner",
            file_hash="",
            action="detected",
            details=f"[{t.runtime}] {t.container_name}: {t.details}",
        )


async def handle_action_scan_containers(request: web.Request) -> web.Response:
    """POST /api/actions/scan-containers - Scan all containers.

    Records the scan and its threats, logs a completion summary and returns
    the scan result as a dict.  Any failure is logged and reported as
    HTTP 500.
    """
    store = request.app["store"]
    store.log_activity(
        "Container scan triggered from dashboard",
        "INFO",
        "dashboard",
    )

    def _run() -> dict:
        from ayn_antivirus.scanners.container_scanner import ContainerScanner

        scanner = ContainerScanner()
        result = scanner.scan("all")
        _record_container_scan(store, result, "container-full", "all-containers")
        store.log_activity(
            f"Container scan complete: {result.containers_found} found, "
            f"{result.containers_scanned} scanned, "
            f"{len(result.threats)} threats",
            "INFO",
            "container_scanner",
        )
        return result.to_dict()

    try:
        data = await asyncio.to_thread(_run)
        return _json(data)
    except Exception as exc:
        logger.error("Container scan failed: %s", exc)
        store.log_activity(
            f"Container scan failed: {exc}",
            "ERROR",
            "container_scanner",
        )
        return _json({"status": "error", "error": str(exc)}, 500)


async def handle_action_scan_single_container(
    request: web.Request,
) -> web.Response:
    """POST /api/actions/scan-container - Scan a single container.

    Body: ``{"container_id": "abc123"}``

    Responds 400 for a missing/invalid body and 500 when the scan fails.
    """
    store = request.app["store"]
    try:
        body = await request.json()
        container_id = body.get("container_id", "")
    except Exception:
        return _json({"error": "Invalid JSON body"}, 400)

    if not container_id:
        return _json({"error": "Missing 'container_id'"}, 400)

    store.log_activity(
        f"Single container scan: {container_id}",
        "INFO",
        "dashboard",
    )

    def _run() -> dict:
        from ayn_antivirus.scanners.container_scanner import ContainerScanner

        scanner = ContainerScanner()
        result = scanner.scan_container(container_id)
        _record_container_scan(store, result, "container-single", container_id)
        return result.to_dict()

    try:
        data = await asyncio.to_thread(_run)
        return _json(data)
    except Exception as exc:
        logger.error("Container scan failed for %s: %s", container_id, exc)
        return _json({"status": "error", "error": str(exc)}, 500)