"""AYN Antivirus — Signature-based Detector. Looks up file hashes against the threat signature database populated by the feed update pipeline (MalwareBazaar, ThreatFox, etc.). Uses :class:`~ayn_antivirus.signatures.db.hash_db.HashDatabase` so that definitions written by ``ayn-antivirus update`` are immediately available for detection. """ from __future__ import annotations import logging from pathlib import Path from typing import Dict, List, Optional from ayn_antivirus.constants import DEFAULT_DB_PATH from ayn_antivirus.detectors.base import BaseDetector, DetectionResult from ayn_antivirus.utils.helpers import hash_file as _hash_file_util logger = logging.getLogger("ayn_antivirus.detectors.signature") _VALID_SEVERITIES = {"CRITICAL", "HIGH", "MEDIUM", "LOW"} class SignatureDetector(BaseDetector): """Detect known malware by matching file hashes against the signature DB. Parameters ---------- db_path: Path to the shared SQLite database that holds the ``threats``, ``ioc_ips``, ``ioc_domains``, and ``ioc_urls`` tables. """ def __init__(self, db_path: str | Path = DEFAULT_DB_PATH) -> None: self.db_path = str(db_path) self._hash_db = None self._ioc_db = None self._loaded = False # ------------------------------------------------------------------ # BaseDetector interface # ------------------------------------------------------------------ @property def name(self) -> str: return "signature_detector" @property def description(self) -> str: return "Hash-based signature detection using threat intelligence feeds" def detect( self, file_path: str | Path, file_content: Optional[bytes] = None, file_hash: Optional[str] = None, ) -> List[DetectionResult]: """Check the file's hash against the ``threats`` table. If *file_hash* is not supplied it is computed on the fly. """ self._ensure_loaded() results: List[DetectionResult] = [] if not self._hash_db: return results # Compute hash if not provided. if not file_hash: try: file_hash = _hash_file_util(str(file_path), algo="sha256") except Exception: return results # Also compute MD5 for VirusShare lookups. md5_hash = None try: md5_hash = _hash_file_util(str(file_path), algo="md5") except Exception: pass # Look up SHA256 first, then MD5. threat = self._hash_db.lookup(file_hash) if not threat and md5_hash: threat = self._hash_db.lookup(md5_hash) if threat: severity = (threat.get("severity") or "HIGH").upper() if severity not in _VALID_SEVERITIES: severity = "HIGH" results.append(DetectionResult( threat_name=threat.get("threat_name", "Malware.Known"), threat_type=threat.get("threat_type", "MALWARE"), severity=severity, confidence=100, details=( f"Known threat signature match " f"(source: {threat.get('source', 'unknown')}). " f"Hash: {file_hash[:16]}... " f"Details: {threat.get('details', '')}" ), detector_name=self.name, )) return results # ------------------------------------------------------------------ # IOC lookup helpers (used by engine for network enrichment) # ------------------------------------------------------------------ def lookup_hash(self, file_hash: str) -> Optional[Dict]: """Look up a single hash. Returns threat info dict or ``None``.""" self._ensure_loaded() if not self._hash_db: return None return self._hash_db.lookup(file_hash) def lookup_ip(self, ip: str) -> Optional[Dict]: """Look up an IP against the IOC database.""" self._ensure_loaded() if not self._ioc_db: return None return self._ioc_db.lookup_ip(ip) def lookup_domain(self, domain: str) -> Optional[Dict]: """Look up a domain against the IOC database.""" self._ensure_loaded() if not self._ioc_db: return None return self._ioc_db.lookup_domain(domain) # ------------------------------------------------------------------ # Statistics # ------------------------------------------------------------------ def get_stats(self) -> Dict: """Return signature / IOC database statistics.""" self._ensure_loaded() stats: Dict = {"hash_count": 0, "loaded": self._loaded} if self._hash_db: stats["hash_count"] = self._hash_db.count() stats.update(self._hash_db.get_stats()) if self._ioc_db: stats["ioc_ips"] = len(self._ioc_db.get_all_malicious_ips()) stats["ioc_domains"] = len(self._ioc_db.get_all_malicious_domains()) return stats @property def signature_count(self) -> int: """Number of hash signatures currently loaded.""" self._ensure_loaded() return self._hash_db.count() if self._hash_db else 0 # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def close(self) -> None: """Close database connections.""" if self._hash_db: self._hash_db.close() self._hash_db = None if self._ioc_db: self._ioc_db.close() self._ioc_db = None self._loaded = False # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _ensure_loaded(self) -> None: """Lazy-load the database connections on first use.""" if self._loaded: return if not self.db_path: logger.warning("No signature DB path configured") self._loaded = True return try: from ayn_antivirus.signatures.db.hash_db import HashDatabase from ayn_antivirus.signatures.db.ioc_db import IOCDatabase self._hash_db = HashDatabase(self.db_path) self._hash_db.initialize() self._ioc_db = IOCDatabase(self.db_path) self._ioc_db.initialize() count = self._hash_db.count() logger.info("Signature DB loaded: %d hash signatures", count) except Exception as exc: logger.error("Failed to load signature DB: %s", exc) self._loaded = True