# File-viewer metadata (not part of the module): 266 lines, 9.0 KiB, Python

"""Real-time file-system monitor for AYN Antivirus.
Uses the ``watchdog`` library to observe directories for file creation,
modification, and move events, then immediately scans the affected files
through the :class:`ScanEngine`. Supports debouncing, auto-quarantine,
and thread-safe operation.
"""
from __future__ import annotations
import logging
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from ayn_antivirus.config import Config
from ayn_antivirus.core.engine import ScanEngine, FileScanResult
from ayn_antivirus.core.event_bus import EventType, event_bus
from ayn_antivirus.quarantine.vault import QuarantineVault
logger = logging.getLogger(__name__)
# File-name endings treated as transient / editor artefacts; events for such
# names are dropped before any scan is attempted.
_SKIP_SUFFIXES = frozenset({
    # Generic temporary, lock and partial-download files.
    ".tmp",
    ".lock",
    ".part",
    ".crdownload",
    # Editor swap files.
    ".swp",
    ".swo",
    ".swx",
    ".kate-swp",
    # Office lock marker and backup files.
    ".~lock.",
    ".bak~",
})

# Debounce window: minimum number of seconds between two scans of one path.
_DEBOUNCE_SECONDS = 2.0
# ---------------------------------------------------------------------------
# Watchdog event handler
# ---------------------------------------------------------------------------
class _FileEventHandler(FileSystemEventHandler):
    """Relay watchdog file events to the owning monitor.

    Directory events are ignored; file events are forwarded to
    ``RealtimeMonitor._on_file_event`` together with a short event label
    (``"created"``, ``"modified"`` or ``"moved"``).

    Parameters
    ----------
    monitor:
        The owning :class:`RealtimeMonitor` instance.
    """

    def __init__(self, monitor: RealtimeMonitor) -> None:
        super().__init__()
        self._monitor = monitor

    def on_created(self, event: FileSystemEvent) -> None:
        """Forward a file-creation event; directories are ignored."""
        if event.is_directory:
            return
        self._monitor._on_file_event(event.src_path, "created")

    def on_modified(self, event: FileSystemEvent) -> None:
        """Forward a file-modification event; directories are ignored."""
        if event.is_directory:
            return
        self._monitor._on_file_event(event.src_path, "modified")

    def on_moved(self, event: FileSystemEvent) -> None:
        """Forward a move/rename event using the destination path.

        Nothing is forwarded when the event has no (or an empty)
        ``dest_path`` attribute.
        """
        if event.is_directory:
            return
        target = getattr(event, "dest_path", None)
        if target:
            self._monitor._on_file_event(target, "moved")
# ---------------------------------------------------------------------------
# RealtimeMonitor
# ---------------------------------------------------------------------------
class RealtimeMonitor:
    """Watch directories and scan new / changed files in real time.

    File events arrive through :class:`_FileEventHandler`, are filtered
    (skip list, then per-path debounce) and handed to the scan engine.
    When ``config.auto_quarantine`` is truthy, files with detected threats
    are moved into a :class:`QuarantineVault`.

    Parameters
    ----------
    config:
        Application configuration.
    scan_engine:
        A pre-built :class:`ScanEngine` instance used to scan files.
    """

    # Size of the debounce table above which stale entries are pruned.
    _RECENT_PRUNE_THRESHOLD = 5000

    def __init__(self, config: Config, scan_engine: ScanEngine) -> None:
        self.config = config
        self.engine = scan_engine
        self._observer: Optional[Observer] = None
        # Guards ``_observer``, ``_running`` and ``_recent``.
        self._lock = threading.Lock()
        self._recent: Dict[str, float] = {}  # path → last-scan timestamp
        self._running = False

        # Optional auto-quarantine vault.
        self._vault: Optional[QuarantineVault] = None
        if config.auto_quarantine:
            self._vault = QuarantineVault(config.quarantine_path)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self, paths: Optional[List[str]] = None, recursive: bool = True) -> None:
        """Begin monitoring *paths* (defaults to ``config.scan_paths``).

        Paths that are not existing directories are skipped with a warning.
        Calling ``start`` while the monitor is already running logs a
        warning and does nothing. On success a ``SCAN_STARTED`` event is
        published on the event bus.

        Parameters
        ----------
        paths:
            Directories to watch.
        recursive:
            Watch subdirectories as well.
        """
        watch_paths = paths or self.config.scan_paths
        watched: List[str] = []

        with self._lock:
            if self._running:
                logger.warning("RealtimeMonitor is already running")
                return

            observer = Observer()
            handler = _FileEventHandler(self)

            for p in watch_paths:
                pp = Path(p)
                if not pp.is_dir():
                    logger.warning("Skipping non-existent path: %s", p)
                    continue
                observer.schedule(handler, str(pp), recursive=recursive)
                watched.append(str(pp))
                logger.info("Watching: %s (recursive=%s)", pp, recursive)

            observer.start()
            # Publish the observer on ``self`` only once it is really running,
            # so a failure in schedule()/start() leaves no half-built state.
            self._observer = observer
            self._running = True

        # Logging and event publication happen outside the lock: the lock is
        # not re-entrant, so a subscriber calling ``is_running`` from its
        # callback would otherwise deadlock.
        logger.info("RealtimeMonitor started — watching %d path(s)", len(watched))
        event_bus.publish(EventType.SCAN_STARTED, {
            "type": "realtime_monitor",
            "paths": watch_paths,
        })

    def stop(self) -> None:
        """Stop monitoring and wait for the observer thread to exit.

        Does nothing when the monitor is not running. The wait for the
        observer thread is bounded to 10 seconds.
        """
        with self._lock:
            observer = self._observer
            if not self._running or observer is None:
                return

        # Stop/join on the local reference and without holding the lock:
        # * a concurrent stop() may reset ``self._observer`` to ``None``;
        # * event callbacks take the same lock in ``_is_debounced``, so
        #   joining while holding it could stall until the timeout.
        observer.stop()
        observer.join(timeout=10)

        with self._lock:
            # Only clear state that still belongs to the observer we stopped.
            if self._observer is observer:
                self._running = False
                self._observer = None
        logger.info("RealtimeMonitor stopped")

    @property
    def is_running(self) -> bool:
        """``True`` between a successful :meth:`start` and :meth:`stop`."""
        with self._lock:
            return self._running

    # ------------------------------------------------------------------
    # Event callbacks (called by _FileEventHandler)
    # ------------------------------------------------------------------

    def on_file_created(self, path: str) -> None:
        """Scan a newly created file."""
        self._scan_file(path, "created")

    def on_file_modified(self, path: str) -> None:
        """Scan a modified file."""
        self._scan_file(path, "modified")

    def on_file_moved(self, path: str) -> None:
        """Scan a file that was moved/renamed into a watched directory."""
        self._scan_file(path, "moved")

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _on_file_event(self, path: str, event_type: str) -> None:
        """Central dispatcher invoked by the watchdog handler.

        Applies the skip list first and the per-path debounce second, then
        routes the event to the matching ``on_file_*`` callback. Unknown
        *event_type* values are ignored (after the debounce timestamp has
        been recorded).
        """
        if self._should_skip(path):
            return
        if self._is_debounced(path):
            return

        logger.debug("File event: %s %s", event_type, path)

        # Dispatch to the named callback (also usable directly). Looking the
        # callbacks up on ``self`` keeps subclass overrides effective.
        callbacks = {
            "created": self.on_file_created,
            "modified": self.on_file_modified,
            "moved": self.on_file_moved,
        }
        callback = callbacks.get(event_type)
        if callback is not None:
            callback(path)

    def _scan_file(self, path: str, reason: str) -> None:
        """Run the scan engine against a single file and handle results.

        Paths that are not regular files (e.g. already deleted) are ignored.
        Scan and quarantine errors are logged and swallowed so that one bad
        file cannot break the monitoring callback.

        Parameters
        ----------
        path:
            File to scan.
        reason:
            Event label (``"created"``, ``"modified"``, ``"moved"``) used in
            log messages only.
        """
        fp = Path(path)
        if not fp.is_file():
            return

        try:
            result: FileScanResult = self.engine.scan_file(fp)
        except Exception:
            logger.exception("Error scanning %s", fp)
            return

        if not result.threats:
            logger.debug("Clean: %s (%s)", path, reason)
            return

        logger.warning(
            "THREAT detected (%s) in %s: %s",
            reason,
            path,
            ", ".join(t.threat_name for t in result.threats),
        )

        # Auto-quarantine if enabled. The file may have vanished since the
        # scan, hence the second existence check.
        if self._vault and fp.exists():
            try:
                # Only the first reported threat is recorded as metadata.
                threat = result.threats[0]
                qid = self._vault.quarantine_file(fp, {
                    "threat_name": threat.threat_name,
                    "threat_type": self._enum_label(threat.threat_type),
                    "severity": self._enum_label(threat.severity),
                    "file_hash": result.file_hash,
                })
                # Separator added: "%s%s" glued the path and quarantine id
                # together into one unreadable token.
                logger.info("Auto-quarantined %s -> %s", path, qid)
            except Exception:
                logger.exception("Auto-quarantine failed for %s", path)

    @staticmethod
    def _enum_label(value: Any) -> str:
        """Return ``value.name`` when present, otherwise ``str(value)``."""
        return value.name if hasattr(value, "name") else str(value)

    # ------------------------------------------------------------------
    # Debounce & skip logic
    # ------------------------------------------------------------------

    def _is_debounced(self, path: str) -> bool:
        """Return ``True`` if *path* was accepted within the debounce window.

        When the path is not debounced, the current time is recorded as its
        new timestamp before ``False`` is returned.
        """
        now = time.monotonic()
        with self._lock:
            # ``None`` (not 0.0) marks "never seen": the reference point of
            # ``time.monotonic()`` is undefined, so a small clock value must
            # not make a first-time path look recently scanned.
            last = self._recent.get(path)
            if last is not None and now - last < _DEBOUNCE_SECONDS:
                return True
            self._recent[path] = now

            # Prune stale entries once the table grows large.
            if len(self._recent) > self._RECENT_PRUNE_THRESHOLD:
                cutoff = now - _DEBOUNCE_SECONDS * 2
                self._recent = {
                    k: v for k, v in self._recent.items() if v > cutoff
                }
        return False

    @staticmethod
    def _should_skip(path: str) -> bool:
        """Return ``True`` for temporary / lock / editor backup files.

        Only the final path component is inspected, case-insensitively.
        """
        name = Path(path).name.lower()

        # Prefix-style markers: hidden editor lock files such as ``.#foo``
        # and office lock files such as ``.~lock.foo#``. The latter can never
        # match as a *suffix*, so it is checked here as a prefix.
        if name.startswith((".#", ".~lock.")):
            return True

        # NOTE: vim's numeric probe file (``4913``) is not filtered; it is
        # short-lived and ``_scan_file`` ignores paths that are not files.
        return name.endswith(tuple(_SKIP_SUFFIXES))