387 lines
14 KiB
Python
387 lines
14 KiB
Python
"""Persistent storage for dashboard metrics, threat logs, and scan history."""
|
|
|
|
from __future__ import annotations

import json
import os
import sqlite3
import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from ayn_antivirus.constants import (
    DASHBOARD_MAX_THREATS_DISPLAY,
    DASHBOARD_METRIC_RETENTION_HOURS,
    DASHBOARD_SCAN_HISTORY_DAYS,
    DEFAULT_DASHBOARD_DB_PATH,
)
|
|
|
|
|
|
class DashboardStore:
|
|
"""SQLite-backed store for all dashboard data.
|
|
|
|
Parameters
|
|
----------
|
|
db_path:
|
|
Path to the SQLite database file. Created automatically if it
|
|
does not exist.
|
|
"""
|
|
|
|
def __init__(self, db_path: str = DEFAULT_DASHBOARD_DB_PATH) -> None:
    """Open the dashboard database at *db_path* and ensure its schema.

    The parent directory of *db_path* is created when missing, one
    connection is opened with ``check_same_thread=False`` and stored on
    ``self.conn``, and the tables are created if they do not exist yet.
    """
    parent_dir = os.path.dirname(db_path)
    if not parent_dir:
        # A bare filename has no directory component: fall back to the CWD.
        parent_dir = "."
    os.makedirs(parent_dir, exist_ok=True)

    self.db_path = db_path
    # Re-entrant lock the other methods take around their use of self.conn.
    self._lock = threading.RLock()

    self.conn = sqlite3.connect(db_path, check_same_thread=False)
    # Rows come back as sqlite3.Row so callers can turn them into dicts.
    self.conn.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        self.conn.execute(pragma)

    self._create_tables()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Schema
|
|
# ------------------------------------------------------------------
|
|
|
|
def _create_tables(self) -> None:
    """Create the dashboard tables and their timestamp indexes if absent.

    Every statement uses ``IF NOT EXISTS``, so running this against an
    already-initialised database leaves existing objects in place.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now')),
            cpu_percent REAL DEFAULT 0,
            mem_percent REAL DEFAULT 0,
            mem_used INTEGER DEFAULT 0,
            mem_total INTEGER DEFAULT 0,
            disk_usage_json TEXT DEFAULT '[]',
            load_avg_json TEXT DEFAULT '[]',
            net_connections INTEGER DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS threat_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now')),
            file_path TEXT,
            threat_name TEXT NOT NULL,
            threat_type TEXT NOT NULL,
            severity TEXT NOT NULL,
            detector TEXT,
            file_hash TEXT,
            action_taken TEXT DEFAULT 'detected',
            details TEXT
        );
        CREATE TABLE IF NOT EXISTS scan_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now')),
            scan_type TEXT NOT NULL,
            scan_path TEXT,
            files_scanned INTEGER DEFAULT 0,
            files_skipped INTEGER DEFAULT 0,
            threats_found INTEGER DEFAULT 0,
            duration_seconds REAL DEFAULT 0,
            status TEXT DEFAULT 'completed'
        );
        CREATE TABLE IF NOT EXISTS signature_updates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now')),
            feed_name TEXT NOT NULL,
            hashes_added INTEGER DEFAULT 0,
            ips_added INTEGER DEFAULT 0,
            domains_added INTEGER DEFAULT 0,
            urls_added INTEGER DEFAULT 0,
            status TEXT DEFAULT 'success',
            details TEXT
        );
        CREATE TABLE IF NOT EXISTS activity_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL DEFAULT (datetime('now')),
            level TEXT NOT NULL DEFAULT 'INFO',
            source TEXT,
            message TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_metrics_ts ON metrics(timestamp);
        CREATE INDEX IF NOT EXISTS idx_threats_ts ON threat_log(timestamp);
        CREATE INDEX IF NOT EXISTS idx_threats_severity ON threat_log(severity);
        CREATE INDEX IF NOT EXISTS idx_scans_ts ON scan_history(timestamp);
        CREATE INDEX IF NOT EXISTS idx_sigs_ts ON signature_updates(timestamp);
        CREATE INDEX IF NOT EXISTS idx_activity_ts ON activity_log(timestamp);
    """
    connection = self.conn
    connection.executescript(schema_sql)
    connection.commit()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Metrics
|
|
# ------------------------------------------------------------------
|
|
|
|
def record_metric(
    self,
    cpu: float,
    mem_pct: float,
    mem_used: int,
    mem_total: int,
    disk_usage: list,
    load_avg: list,
    net_conns: int,
) -> None:
    """Insert one system-metrics sample into the ``metrics`` table.

    *disk_usage* and *load_avg* are serialised with ``json.dumps`` into
    the ``disk_usage_json`` / ``load_avg_json`` columns. The remaining
    arguments are stored as given, and ``timestamp`` is left to the
    column default. The insert is committed before returning.
    """
    insert_sql = (
        "INSERT INTO metrics "
        "(cpu_percent, mem_percent, mem_used, mem_total, "
        "disk_usage_json, load_avg_json, net_connections) "
        "VALUES (?,?,?,?,?,?,?)"
    )
    with self._lock:
        values = (
            cpu,
            mem_pct,
            mem_used,
            mem_total,
            json.dumps(disk_usage),
            json.dumps(load_avg),
            net_conns,
        )
        self.conn.execute(insert_sql, values)
        self.conn.commit()
|
|
|
|
def get_latest_metrics(self) -> Optional[Dict[str, Any]]:
    """Return the metrics row with the highest ``id`` as a dict.

    The ``disk_usage_json`` and ``load_avg_json`` columns are removed
    from the result and their decoded values are exposed under
    ``disk_usage`` and ``load_avg``. A NULL (or empty) value in either
    column decodes to an empty list instead of raising.

    Returns
    -------
    dict or None
        The latest sample, or ``None`` when the table is empty.
    """
    with self._lock:
        row = self.conn.execute(
            "SELECT * FROM metrics ORDER BY id DESC LIMIT 1"
        ).fetchone()
    if row is None:
        return None

    sample = dict(row)
    for column, key in (
        ("disk_usage_json", "disk_usage"),
        ("load_avg_json", "load_avg"),
    ):
        # `or "[]"` covers a stored NULL as well as a missing column;
        # json.loads(None) would raise TypeError.
        sample[key] = json.loads(sample.pop(column, None) or "[]")
    return sample
|
|
|
|
def get_metrics_history(self, hours: int = 1) -> List[Dict[str, Any]]:
    """Return metrics rows from the last *hours* hours, oldest first.

    The cutoff is computed in UTC and formatted as
    ``YYYY-MM-DD HH:MM:SS`` so it compares as text against the
    ``timestamp`` column. Each row is returned as a dict in which
    ``disk_usage_json`` / ``load_avg_json`` are replaced by their decoded
    values under ``disk_usage`` / ``load_avg``; a NULL (or empty) value
    decodes to an empty list instead of raising.
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    with self._lock:
        rows = self.conn.execute(
            "SELECT * FROM metrics WHERE timestamp >= ? ORDER BY timestamp",
            (cutoff,),
        ).fetchall()

    history: List[Dict[str, Any]] = []
    for row in rows:
        sample = dict(row)
        # `or "[]"` guards against NULL columns (json.loads(None) raises).
        sample["disk_usage"] = json.loads(sample.pop("disk_usage_json", None) or "[]")
        sample["load_avg"] = json.loads(sample.pop("load_avg_json", None) or "[]")
        history.append(sample)
    return history
|
|
|
|
# ------------------------------------------------------------------
|
|
# Threats
|
|
# ------------------------------------------------------------------
|
|
|
|
def record_threat(
    self,
    file_path: str,
    threat_name: str,
    threat_type: str,
    severity: str,
    detector: str = "",
    file_hash: str = "",
    action: str = "detected",
    details: str = "",
) -> None:
    """Append one detection to the ``threat_log`` table and commit.

    *action* is stored in the ``action_taken`` column; all other
    arguments map to the column of the same name. ``timestamp`` is left
    to the column default.
    """
    insert_sql = (
        "INSERT INTO threat_log "
        "(file_path, threat_name, threat_type, severity, "
        "detector, file_hash, action_taken, details) "
        "VALUES (?,?,?,?,?,?,?,?)"
    )
    values = (
        file_path,
        threat_name,
        threat_type,
        severity,
        detector,
        file_hash,
        action,
        details,
    )
    with self._lock:
        self.conn.execute(insert_sql, values)
        self.conn.commit()
|
|
|
|
def get_recent_threats(
    self, limit: int = DASHBOARD_MAX_THREATS_DISPLAY,
) -> List[Dict[str, Any]]:
    """Return up to *limit* ``threat_log`` rows as dicts, highest ``id`` first."""
    query = "SELECT * FROM threat_log ORDER BY id DESC LIMIT ?"
    with self._lock:
        cursor = self.conn.execute(query, (limit,))
        fetched = cursor.fetchall()
    return list(map(dict, fetched))
|
|
|
|
def get_threat_stats(self) -> Dict[str, Any]:
    """Return aggregate counts over the ``threat_log`` table.

    Both time windows are measured from a single UTC "now", formatted as
    ``YYYY-MM-DD HH:MM:SS`` so the cutoffs compare as text against the
    ``timestamp`` column.

    Returns
    -------
    dict
        ``total`` (all rows), ``by_severity`` (severity -> row count),
        ``last_24h`` and ``last_7d`` (rows at or after each cutoff).
    """
    # datetime.utcnow() is deprecated; take one aware UTC reference so the
    # 24h and 7d windows are anchored to the same instant.
    now = datetime.now(timezone.utc)
    ts_format = "%Y-%m-%d %H:%M:%S"
    cutoff_24h = (now - timedelta(hours=24)).strftime(ts_format)
    cutoff_7d = (now - timedelta(days=7)).strftime(ts_format)
    count_since_sql = "SELECT COUNT(*) FROM threat_log WHERE timestamp >= ?"

    with self._lock:
        total = self.conn.execute(
            "SELECT COUNT(*) FROM threat_log"
        ).fetchone()[0]
        by_severity: Dict[str, int] = {
            row[0]: row[1]
            for row in self.conn.execute(
                "SELECT severity, COUNT(*) as cnt FROM threat_log GROUP BY severity"
            )
        }
        last_24h = self.conn.execute(count_since_sql, (cutoff_24h,)).fetchone()[0]
        last_7d = self.conn.execute(count_since_sql, (cutoff_7d,)).fetchone()[0]

    return {
        "total": total,
        "by_severity": by_severity,
        "last_24h": last_24h,
        "last_7d": last_7d,
    }
|
|
|
|
# ------------------------------------------------------------------
|
|
# Scans
|
|
# ------------------------------------------------------------------
|
|
|
|
def record_scan(
    self,
    scan_type: str,
    scan_path: str,
    files_scanned: int,
    files_skipped: int,
    threats_found: int,
    duration: float,
    status: str = "completed",
) -> None:
    """Append one finished scan to the ``scan_history`` table and commit.

    *duration* is stored in the ``duration_seconds`` column; the other
    arguments map to the column of the same name. ``timestamp`` is left
    to the column default.
    """
    insert_sql = (
        "INSERT INTO scan_history "
        "(scan_type, scan_path, files_scanned, files_skipped, "
        "threats_found, duration_seconds, status) "
        "VALUES (?,?,?,?,?,?,?)"
    )
    values = (
        scan_type,
        scan_path,
        files_scanned,
        files_skipped,
        threats_found,
        duration,
        status,
    )
    with self._lock:
        self.conn.execute(insert_sql, values)
        self.conn.commit()
|
|
|
|
def get_recent_scans(self, limit: int = 30) -> List[Dict[str, Any]]:
    """Return up to *limit* ``scan_history`` rows as dicts, highest ``id`` first."""
    query = "SELECT * FROM scan_history ORDER BY id DESC LIMIT ?"
    with self._lock:
        cursor = self.conn.execute(query, (limit,))
        fetched = cursor.fetchall()
    return list(map(dict, fetched))
|
|
|
|
def get_scan_chart_data(
    self, days: int = DASHBOARD_SCAN_HISTORY_DAYS,
) -> List[Dict[str, Any]]:
    """Return per-day scan aggregates for the last *days* days.

    The cutoff is computed in UTC and formatted as
    ``YYYY-MM-DD HH:MM:SS`` so it compares as text against the
    ``timestamp`` column.

    Returns
    -------
    list of dict
        One dict per calendar day that has scans, ordered by day, with
        keys ``day``, ``scans`` (row count), ``threats``
        (sum of ``threats_found``) and ``files`` (sum of ``files_scanned``).
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    query = (
        "SELECT DATE(timestamp) as day, "
        "COUNT(*) as scans, "
        "SUM(threats_found) as threats, "
        "SUM(files_scanned) as files "
        "FROM scan_history WHERE timestamp >= ? "
        "GROUP BY DATE(timestamp) ORDER BY day"
    )
    with self._lock:
        rows = self.conn.execute(query, (cutoff,)).fetchall()
    return [dict(row) for row in rows]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Signature Updates
|
|
# ------------------------------------------------------------------
|
|
|
|
def record_sig_update(
    self,
    feed_name: str,
    hashes: int = 0,
    ips: int = 0,
    domains: int = 0,
    urls: int = 0,
    status: str = "success",
    details: str = "",
) -> None:
    """Append one feed-update result to ``signature_updates`` and commit.

    *hashes*, *ips*, *domains* and *urls* are stored in the matching
    ``*_added`` columns. ``timestamp`` is left to the column default.
    """
    insert_sql = (
        "INSERT INTO signature_updates "
        "(feed_name, hashes_added, ips_added, domains_added, "
        "urls_added, status, details) "
        "VALUES (?,?,?,?,?,?,?)"
    )
    values = (feed_name, hashes, ips, domains, urls, status, details)
    with self._lock:
        self.conn.execute(insert_sql, values)
        self.conn.commit()
|
|
|
|
def get_recent_sig_updates(self, limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to *limit* ``signature_updates`` rows as dicts, highest ``id`` first."""
    query = "SELECT * FROM signature_updates ORDER BY id DESC LIMIT ?"
    with self._lock:
        cursor = self.conn.execute(query, (limit,))
        fetched = cursor.fetchall()
    return list(map(dict, fetched))
|
|
|
|
def get_sig_stats(self) -> Dict[str, Any]:
    """Return signature totals, preferring live counts from the signatures DB.

    The signatures database path is derived by replacing ``dashboard.db``
    with ``signatures.db`` in ``self.db_path``. When that yields a
    distinct, existing file, row counts are read from its ``threats``,
    ``ioc_ips``, ``ioc_domains`` and ``ioc_urls`` tables and
    ``last_update`` is ``MAX(added_date)`` of ``threats``. A table that
    cannot be queried keeps its total at 0 (best effort).

    Otherwise the totals are summed from this store's own
    ``signature_updates`` log and ``last_update`` is its newest
    ``timestamp``.

    Returns
    -------
    dict
        Keys ``total_hashes``, ``total_ips``, ``total_domains``,
        ``total_urls`` and ``last_update``.
    """
    result: Dict[str, Any] = {
        "total_hashes": 0,
        "total_ips": 0,
        "total_domains": 0,
        "total_urls": 0,
        "last_update": None,
    }

    sig_db_path = self.db_path.replace("dashboard.db", "signatures.db")
    live_counts_read = False

    # sqlite3.connect() silently creates a missing file, so only open the
    # signatures DB when it already exists. Also skip it when the path
    # substitution was a no-op, which would point at the dashboard DB itself.
    if sig_db_path != self.db_path and os.path.isfile(sig_db_path):
        try:
            sdb = sqlite3.connect(sig_db_path)
        except sqlite3.Error:
            sdb = None
        if sdb is not None:
            try:
                # Table names come from this fixed tuple, never from input.
                for tbl, key in (
                    ("threats", "total_hashes"),
                    ("ioc_ips", "total_ips"),
                    ("ioc_domains", "total_domains"),
                    ("ioc_urls", "total_urls"),
                ):
                    try:
                        result[key] = sdb.execute(
                            f"SELECT COUNT(*) FROM {tbl}"
                        ).fetchone()[0]
                    except sqlite3.Error:
                        # Best effort: an absent/unreadable table stays at 0.
                        pass
                try:
                    result["last_update"] = sdb.execute(
                        "SELECT MAX(added_date) FROM threats"
                    ).fetchone()[0]
                except sqlite3.Error:
                    # Best effort: leave last_update as None.
                    pass
                live_counts_read = True
            finally:
                # Always release the secondary connection, even on error.
                sdb.close()

    if not live_counts_read:
        # Fallback to the dashboard's own update log.
        with self._lock:
            row = self.conn.execute(
                "SELECT SUM(hashes_added), SUM(ips_added), "
                "SUM(domains_added), SUM(urls_added) FROM signature_updates"
            ).fetchone()
            # SUM() over zero rows is NULL, hence the `or 0`.
            result["total_hashes"] = row[0] or 0
            result["total_ips"] = row[1] or 0
            result["total_domains"] = row[2] or 0
            result["total_urls"] = row[3] or 0
            result["last_update"] = self.conn.execute(
                "SELECT MAX(timestamp) FROM signature_updates"
            ).fetchone()[0]

    return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Activity Log
|
|
# ------------------------------------------------------------------
|
|
|
|
def log_activity(
    self,
    message: str,
    level: str = "INFO",
    source: str = "system",
) -> None:
    """Append one entry to the ``activity_log`` table and commit.

    ``timestamp`` is left to the column default.
    """
    insert_sql = "INSERT INTO activity_log (level, source, message) VALUES (?,?,?)"
    values = (level, source, message)
    with self._lock:
        self.conn.execute(insert_sql, values)
        self.conn.commit()
|
|
|
|
def get_recent_logs(self, limit: int = 20) -> List[Dict[str, Any]]:
    """Return up to *limit* ``activity_log`` rows as dicts, highest ``id`` first."""
    query = "SELECT * FROM activity_log ORDER BY id DESC LIMIT ?"
    with self._lock:
        cursor = self.conn.execute(query, (limit,))
        fetched = cursor.fetchall()
    return list(map(dict, fetched))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Cleanup
|
|
# ------------------------------------------------------------------
|
|
|
|
def cleanup_old_metrics(
    self, hours: int = DASHBOARD_METRIC_RETENTION_HOURS,
) -> None:
    """Delete ``metrics`` rows older than *hours* hours and commit.

    The cutoff is computed in UTC and formatted as
    ``YYYY-MM-DD HH:MM:SS`` so it compares as text against the
    ``timestamp`` column; rows strictly before the cutoff are removed.
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    with self._lock:
        self.conn.execute("DELETE FROM metrics WHERE timestamp < ?", (cutoff,))
        self.conn.commit()
|
|
|
|
def close(self) -> None:
    """Close the underlying SQLite connection.

    The store's lock is held while closing, as for every other use of
    ``self.conn`` in this class, so the connection is not closed while
    another thread is inside a locked read or write.
    """
    with self._lock:
        self.conn.close()
|