Files

260 lines
8.8 KiB
Python

"""SQLite-backed Indicator of Compromise (IOC) database for AYN Antivirus.
Stores malicious IPs, domains, and URLs sourced from threat-intelligence
feeds so that the network scanner and detectors can perform real-time
lookups.
"""
from __future__ import annotations
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
from ayn_antivirus.constants import DEFAULT_DB_PATH
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
_SCHEMA = """
CREATE TABLE IF NOT EXISTS ioc_ips (
ip TEXT PRIMARY KEY,
threat_name TEXT NOT NULL DEFAULT '',
type TEXT NOT NULL DEFAULT 'C2',
source TEXT NOT NULL DEFAULT '',
added_date TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_ioc_ips_source ON ioc_ips(source);
CREATE TABLE IF NOT EXISTS ioc_domains (
domain TEXT PRIMARY KEY,
threat_name TEXT NOT NULL DEFAULT '',
type TEXT NOT NULL DEFAULT 'C2',
source TEXT NOT NULL DEFAULT '',
added_date TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_ioc_domains_source ON ioc_domains(source);
CREATE TABLE IF NOT EXISTS ioc_urls (
url TEXT PRIMARY KEY,
threat_name TEXT NOT NULL DEFAULT '',
type TEXT NOT NULL DEFAULT 'malware_distribution',
source TEXT NOT NULL DEFAULT '',
added_date TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_ioc_urls_source ON ioc_urls(source);
"""
class IOCDatabase:
"""Manage a local SQLite store of Indicators of Compromise.
Parameters
----------
db_path:
Path to the SQLite file. Shares the same file as
:class:`HashDatabase` by default; each uses its own tables.
"""
_VALID_TABLES: frozenset = frozenset({"ioc_ips", "ioc_domains", "ioc_urls"})
def __init__(self, db_path: str | Path = DEFAULT_DB_PATH) -> None:
self.db_path = Path(db_path)
self._conn: Optional[sqlite3.Connection] = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def initialize(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
self._conn.commit()
logger.info(
"IOCDatabase opened: %s (IPs=%d, domains=%d, URLs=%d)",
self.db_path,
self._count("ioc_ips"),
self._count("ioc_domains"),
self._count("ioc_urls"),
)
def close(self) -> None:
if self._conn:
self._conn.close()
self._conn = None
@property
def conn(self) -> sqlite3.Connection:
if self._conn is None:
self.initialize()
assert self._conn is not None
return self._conn
def _count(self, table: str) -> int:
if table not in self._VALID_TABLES:
raise ValueError(f"Invalid table name: {table}")
return self.conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
# ------------------------------------------------------------------
# IPs
# ------------------------------------------------------------------
def add_ip(
self,
ip: str,
threat_name: str = "",
type: str = "C2",
source: str = "",
) -> None:
self.conn.execute(
"INSERT OR REPLACE INTO ioc_ips (ip, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
(ip, threat_name, type, source, datetime.utcnow().isoformat()),
)
self.conn.commit()
def bulk_add_ips(
self,
records: Sequence[Tuple[str, str, str, str]],
) -> int:
"""Bulk-insert IPs. Each tuple: ``(ip, threat_name, type, source)``.
Returns the number of **new** rows actually inserted.
"""
if not records:
return 0
now = datetime.utcnow().isoformat()
rows = [(ip, tn, t, src, now) for ip, tn, t, src in records]
before = self._count("ioc_ips")
self.conn.executemany(
"INSERT OR IGNORE INTO ioc_ips (ip, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
rows,
)
self.conn.commit()
return self._count("ioc_ips") - before
def lookup_ip(self, ip: str) -> Optional[Dict[str, Any]]:
row = self.conn.execute(
"SELECT * FROM ioc_ips WHERE ip = ?", (ip,)
).fetchone()
return dict(row) if row else None
def get_all_malicious_ips(self) -> Set[str]:
"""Return every stored malicious IP as a set for fast membership tests."""
rows = self.conn.execute("SELECT ip FROM ioc_ips").fetchall()
return {row[0] for row in rows}
# ------------------------------------------------------------------
# Domains
# ------------------------------------------------------------------
def add_domain(
self,
domain: str,
threat_name: str = "",
type: str = "C2",
source: str = "",
) -> None:
self.conn.execute(
"INSERT OR REPLACE INTO ioc_domains (domain, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
(domain.lower(), threat_name, type, source, datetime.utcnow().isoformat()),
)
self.conn.commit()
def bulk_add_domains(
self,
records: Sequence[Tuple[str, str, str, str]],
) -> int:
"""Bulk-insert domains. Each tuple: ``(domain, threat_name, type, source)``.
Returns the number of **new** rows actually inserted.
"""
if not records:
return 0
now = datetime.utcnow().isoformat()
rows = [(d.lower(), tn, t, src, now) for d, tn, t, src in records]
before = self._count("ioc_domains")
self.conn.executemany(
"INSERT OR IGNORE INTO ioc_domains (domain, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
rows,
)
self.conn.commit()
return self._count("ioc_domains") - before
def lookup_domain(self, domain: str) -> Optional[Dict[str, Any]]:
row = self.conn.execute(
"SELECT * FROM ioc_domains WHERE domain = ?", (domain.lower(),)
).fetchone()
return dict(row) if row else None
def get_all_malicious_domains(self) -> Set[str]:
"""Return every stored malicious domain as a set."""
rows = self.conn.execute("SELECT domain FROM ioc_domains").fetchall()
return {row[0] for row in rows}
# ------------------------------------------------------------------
# URLs
# ------------------------------------------------------------------
def add_url(
self,
url: str,
threat_name: str = "",
type: str = "malware_distribution",
source: str = "",
) -> None:
self.conn.execute(
"INSERT OR REPLACE INTO ioc_urls (url, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
(url, threat_name, type, source, datetime.utcnow().isoformat()),
)
self.conn.commit()
def bulk_add_urls(
self,
records: Sequence[Tuple[str, str, str, str]],
) -> int:
"""Bulk-insert URLs. Each tuple: ``(url, threat_name, type, source)``.
Returns the number of **new** rows actually inserted.
"""
if not records:
return 0
now = datetime.utcnow().isoformat()
rows = [(u, tn, t, src, now) for u, tn, t, src in records]
before = self._count("ioc_urls")
self.conn.executemany(
"INSERT OR IGNORE INTO ioc_urls (url, threat_name, type, source, added_date) "
"VALUES (?, ?, ?, ?, ?)",
rows,
)
self.conn.commit()
return self._count("ioc_urls") - before
def lookup_url(self, url: str) -> Optional[Dict[str, Any]]:
row = self.conn.execute(
"SELECT * FROM ioc_urls WHERE url = ?", (url,)
).fetchone()
return dict(row) if row else None
# ------------------------------------------------------------------
# Aggregate stats
# ------------------------------------------------------------------
def get_stats(self) -> Dict[str, Any]:
return {
"ips": self._count("ioc_ips"),
"domains": self._count("ioc_domains"),
"urls": self._count("ioc_urls"),
}