"""Real-time file-system monitor for AYN Antivirus. Uses the ``watchdog`` library to observe directories for file creation, modification, and move events, then immediately scans the affected files through the :class:`ScanEngine`. Supports debouncing, auto-quarantine, and thread-safe operation. """ from __future__ import annotations import logging import threading import time from pathlib import Path from typing import Any, Dict, List, Optional, Set from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer from ayn_antivirus.config import Config from ayn_antivirus.core.engine import ScanEngine, FileScanResult from ayn_antivirus.core.event_bus import EventType, event_bus from ayn_antivirus.quarantine.vault import QuarantineVault logger = logging.getLogger(__name__) # File suffixes that are almost always transient / editor artefacts. _SKIP_SUFFIXES = frozenset(( ".tmp", ".swp", ".swx", ".swo", ".lock", ".part", ".crdownload", ".kate-swp", ".~lock.", ".bak~", )) # Minimum seconds between re-scanning the same path (debounce). _DEBOUNCE_SECONDS = 2.0 # --------------------------------------------------------------------------- # Watchdog event handler # --------------------------------------------------------------------------- class _FileEventHandler(FileSystemEventHandler): """Internal handler that bridges watchdog events to the scan engine. Parameters ---------- monitor: The owning :class:`RealtimeMonitor` instance. """ def __init__(self, monitor: RealtimeMonitor) -> None: super().__init__() self._monitor = monitor # Only react to file events (not directories). def on_created(self, event: FileSystemEvent) -> None: if not event.is_directory: self._monitor._on_file_event(event.src_path, "created") def on_modified(self, event: FileSystemEvent) -> None: if not event.is_directory: self._monitor._on_file_event(event.src_path, "modified") def on_moved(self, event: FileSystemEvent) -> None: if not event.is_directory: dest = getattr(event, "dest_path", None) if dest: self._monitor._on_file_event(dest, "moved") # --------------------------------------------------------------------------- # RealtimeMonitor # --------------------------------------------------------------------------- class RealtimeMonitor: """Watch directories and scan new / changed files in real time. Parameters ---------- config: Application configuration. scan_engine: A pre-built :class:`ScanEngine` instance used to scan files. """ def __init__(self, config: Config, scan_engine: ScanEngine) -> None: self.config = config self.engine = scan_engine self._observer: Optional[Observer] = None self._lock = threading.Lock() self._recent: Dict[str, float] = {} # path → last-scan timestamp self._running = False # Optional auto-quarantine vault. self._vault: Optional[QuarantineVault] = None if config.auto_quarantine: self._vault = QuarantineVault(config.quarantine_path) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def start(self, paths: Optional[List[str]] = None, recursive: bool = True) -> None: """Begin monitoring *paths* (defaults to ``config.scan_paths``). Parameters ---------- paths: Directories to watch. recursive: Watch subdirectories as well. """ watch_paths = paths or self.config.scan_paths with self._lock: if self._running: logger.warning("RealtimeMonitor is already running") return self._observer = Observer() handler = _FileEventHandler(self) for p in watch_paths: pp = Path(p) if not pp.is_dir(): logger.warning("Skipping non-existent path: %s", p) continue self._observer.schedule(handler, str(pp), recursive=recursive) logger.info("Watching: %s (recursive=%s)", pp, recursive) self._observer.start() self._running = True logger.info("RealtimeMonitor started — watching %d path(s)", len(watch_paths)) event_bus.publish(EventType.SCAN_STARTED, { "type": "realtime_monitor", "paths": watch_paths, }) def stop(self) -> None: """Stop monitoring and wait for the observer thread to exit.""" with self._lock: if not self._running or self._observer is None: return self._observer.stop() self._observer.join(timeout=10) with self._lock: self._running = False self._observer = None logger.info("RealtimeMonitor stopped") @property def is_running(self) -> bool: with self._lock: return self._running # ------------------------------------------------------------------ # Event callbacks (called by _FileEventHandler) # ------------------------------------------------------------------ def on_file_created(self, path: str) -> None: """Scan a newly created file.""" self._scan_file(path, "created") def on_file_modified(self, path: str) -> None: """Scan a modified file.""" self._scan_file(path, "modified") def on_file_moved(self, path: str) -> None: """Scan a file that was moved/renamed into a watched directory.""" self._scan_file(path, "moved") # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _on_file_event(self, path: str, event_type: str) -> None: """Central dispatcher invoked by the watchdog handler.""" if self._should_skip(path): return if self._is_debounced(path): return logger.debug("File event: %s %s", event_type, path) # Dispatch to the named callback (also usable directly). if event_type == "created": self.on_file_created(path) elif event_type == "modified": self.on_file_modified(path) elif event_type == "moved": self.on_file_moved(path) def _scan_file(self, path: str, reason: str) -> None: """Run the scan engine against a single file and handle results.""" fp = Path(path) if not fp.is_file(): return try: result: FileScanResult = self.engine.scan_file(fp) except Exception: logger.exception("Error scanning %s", fp) return if result.threats: logger.warning( "THREAT detected (%s) in %s: %s", reason, path, ", ".join(t.threat_name for t in result.threats), ) # Auto-quarantine if enabled. if self._vault and fp.exists(): try: threat = result.threats[0] qid = self._vault.quarantine_file(fp, { "threat_name": threat.threat_name, "threat_type": threat.threat_type.name if hasattr(threat.threat_type, "name") else str(threat.threat_type), "severity": threat.severity.name if hasattr(threat.severity, "name") else str(threat.severity), "file_hash": result.file_hash, }) logger.info("Auto-quarantined %s → %s", path, qid) except Exception: logger.exception("Auto-quarantine failed for %s", path) else: logger.debug("Clean: %s (%s)", path, reason) # ------------------------------------------------------------------ # Debounce & skip logic # ------------------------------------------------------------------ def _is_debounced(self, path: str) -> bool: """Return ``True`` if *path* was scanned within the debounce window.""" now = time.monotonic() with self._lock: last = self._recent.get(path, 0.0) if now - last < _DEBOUNCE_SECONDS: return True self._recent[path] = now # Prune stale entries periodically. if len(self._recent) > 5000: cutoff = now - _DEBOUNCE_SECONDS * 2 self._recent = { k: v for k, v in self._recent.items() if v > cutoff } return False @staticmethod def _should_skip(path: str) -> bool: """Return ``True`` for temporary / lock / editor backup files.""" name = Path(path).name.lower() if any(name.endswith(s) for s in _SKIP_SUFFIXES): return True # Hidden editor temp files like .#foo or 4913 (vim temp). if name.startswith(".#"): return True return False