918 lines
30 KiB
Python
918 lines
30 KiB
Python
"""Core scan engine for AYN Antivirus.
|
|
|
|
Orchestrates file-system, process, and network scanning by delegating to
|
|
pluggable detectors (hash lookup, YARA, heuristic) and emitting events via
|
|
the :pymod:`event_bus`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Protocol
|
|
|
|
from ayn_antivirus.config import Config
|
|
from ayn_antivirus.core.event_bus import EventType, event_bus
|
|
from ayn_antivirus.utils.helpers import hash_file as _hash_file_util
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Enums
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ThreatType(Enum):
|
|
"""Classification of a detected threat."""
|
|
|
|
VIRUS = auto()
|
|
MALWARE = auto()
|
|
SPYWARE = auto()
|
|
MINER = auto()
|
|
ROOTKIT = auto()
|
|
|
|
|
|
class Severity(Enum):
|
|
"""Threat severity level, ordered low → critical."""
|
|
|
|
LOW = 1
|
|
MEDIUM = 2
|
|
HIGH = 3
|
|
CRITICAL = 4
|
|
|
|
|
|
class ScanType(Enum):
|
|
"""Kind of scan that was executed."""
|
|
|
|
FULL = "full"
|
|
QUICK = "quick"
|
|
DEEP = "deep"
|
|
SINGLE_FILE = "single_file"
|
|
TARGETED = "targeted"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class ThreatInfo:
|
|
"""A single threat detected during a file scan."""
|
|
|
|
path: str
|
|
threat_name: str
|
|
threat_type: ThreatType
|
|
severity: Severity
|
|
detector_name: str
|
|
details: str = ""
|
|
timestamp: datetime = field(default_factory=datetime.utcnow)
|
|
file_hash: str = ""
|
|
|
|
|
|
@dataclass
|
|
class FileScanResult:
|
|
"""Result of scanning a single file."""
|
|
|
|
path: str
|
|
scanned: bool = True
|
|
file_hash: str = ""
|
|
size: int = 0
|
|
threats: List[ThreatInfo] = field(default_factory=list)
|
|
error: Optional[str] = None
|
|
|
|
@property
|
|
def is_clean(self) -> bool:
|
|
return len(self.threats) == 0 and self.error is None
|
|
|
|
|
|
@dataclass
|
|
class ProcessThreat:
|
|
"""A suspicious process discovered at runtime."""
|
|
|
|
pid: int
|
|
name: str
|
|
cmdline: str
|
|
cpu_percent: float
|
|
memory_percent: float
|
|
threat_type: ThreatType
|
|
severity: Severity
|
|
details: str = ""
|
|
|
|
|
|
@dataclass
|
|
class NetworkThreat:
|
|
"""A suspicious network connection."""
|
|
|
|
local_addr: str
|
|
remote_addr: str
|
|
pid: Optional[int]
|
|
process_name: str
|
|
threat_type: ThreatType
|
|
severity: Severity
|
|
details: str = ""
|
|
|
|
|
|
@dataclass
|
|
class ScanResult:
|
|
"""Aggregated result of a path / multi-file scan."""
|
|
|
|
scan_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
|
|
start_time: datetime = field(default_factory=datetime.utcnow)
|
|
end_time: Optional[datetime] = None
|
|
files_scanned: int = 0
|
|
files_skipped: int = 0
|
|
threats: List[ThreatInfo] = field(default_factory=list)
|
|
scan_path: str = ""
|
|
scan_type: ScanType = ScanType.FULL
|
|
|
|
@property
|
|
def duration_seconds(self) -> float:
|
|
if self.end_time is None:
|
|
return 0.0
|
|
return (self.end_time - self.start_time).total_seconds()
|
|
|
|
@property
|
|
def is_clean(self) -> bool:
|
|
return len(self.threats) == 0
|
|
|
|
|
|
@dataclass
|
|
class ProcessScanResult:
|
|
"""Aggregated result of a process scan."""
|
|
|
|
processes_scanned: int = 0
|
|
threats: List[ProcessThreat] = field(default_factory=list)
|
|
scan_duration: float = 0.0
|
|
|
|
@property
|
|
def total_processes(self) -> int:
|
|
"""Alias for processes_scanned (backward compat)."""
|
|
return self.processes_scanned
|
|
|
|
@property
|
|
def is_clean(self) -> bool:
|
|
return len(self.threats) == 0
|
|
|
|
|
|
@dataclass
|
|
class NetworkScanResult:
|
|
"""Aggregated result of a network scan."""
|
|
|
|
connections_scanned: int = 0
|
|
threats: List[NetworkThreat] = field(default_factory=list)
|
|
scan_duration: float = 0.0
|
|
|
|
@property
|
|
def total_connections(self) -> int:
|
|
"""Alias for connections_scanned (backward compat)."""
|
|
return self.connections_scanned
|
|
|
|
@property
|
|
def is_clean(self) -> bool:
|
|
return len(self.threats) == 0
|
|
|
|
|
|
@dataclass
|
|
class FullScanResult:
|
|
"""Combined results from a full scan (files + processes + network + containers)."""
|
|
|
|
file_scan: ScanResult = field(default_factory=ScanResult)
|
|
process_scan: ProcessScanResult = field(default_factory=ProcessScanResult)
|
|
network_scan: NetworkScanResult = field(default_factory=NetworkScanResult)
|
|
container_scan: Any = None # Optional[ContainerScanResult]
|
|
|
|
@property
|
|
def total_threats(self) -> int:
|
|
count = (
|
|
len(self.file_scan.threats)
|
|
+ len(self.process_scan.threats)
|
|
+ len(self.network_scan.threats)
|
|
)
|
|
if self.container_scan is not None:
|
|
count += len(self.container_scan.threats)
|
|
return count
|
|
|
|
@property
|
|
def is_clean(self) -> bool:
|
|
return self.total_threats == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Detector protocol (for type hints & documentation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _Detector(Protocol):
|
|
"""Any object with a ``detect()`` method matching the BaseDetector API."""
|
|
|
|
def detect(
|
|
self,
|
|
file_path: str | Path,
|
|
file_content: Optional[bytes] = None,
|
|
file_hash: Optional[str] = None,
|
|
) -> list: ...
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helper: file hashing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _hash_file(filepath: Path, algo: str = "sha256") -> str:
|
|
"""Return the hex digest of *filepath*.
|
|
|
|
Delegates to :func:`ayn_antivirus.utils.helpers.hash_file`.
|
|
"""
|
|
return _hash_file_util(filepath, algo)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Detector result → engine dataclass mapping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_THREAT_TYPE_MAP = {
|
|
"VIRUS": ThreatType.VIRUS,
|
|
"MALWARE": ThreatType.MALWARE,
|
|
"SPYWARE": ThreatType.SPYWARE,
|
|
"MINER": ThreatType.MINER,
|
|
"ROOTKIT": ThreatType.ROOTKIT,
|
|
"HEURISTIC": ThreatType.MALWARE,
|
|
}
|
|
|
|
_SEVERITY_MAP = {
|
|
"CRITICAL": Severity.CRITICAL,
|
|
"HIGH": Severity.HIGH,
|
|
"MEDIUM": Severity.MEDIUM,
|
|
"LOW": Severity.LOW,
|
|
}
|
|
|
|
|
|
def _map_threat_type(raw: str) -> ThreatType:
|
|
"""Convert a detector's threat-type string to :class:`ThreatType`."""
|
|
return _THREAT_TYPE_MAP.get(raw.upper(), ThreatType.MALWARE)
|
|
|
|
|
|
def _map_severity(raw: str) -> Severity:
|
|
"""Convert a detector's severity string to :class:`Severity`."""
|
|
return _SEVERITY_MAP.get(raw.upper(), Severity.MEDIUM)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Quick-scan target directories
|
|
# ---------------------------------------------------------------------------
|
|
|
|
QUICK_SCAN_PATHS = [
|
|
"/tmp",
|
|
"/var/tmp",
|
|
"/dev/shm",
|
|
"/usr/local/bin",
|
|
"/var/spool/cron",
|
|
"/etc/cron.d",
|
|
"/etc/cron.daily",
|
|
"/etc/crontab",
|
|
"/var/www",
|
|
"/srv",
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ScanEngine
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ScanEngine:
|
|
"""Central orchestrator for all AYN scanning activities.
|
|
|
|
The engine walks the file system, delegates to pluggable detectors, tracks
|
|
statistics, and publishes events on the global :pydata:`event_bus`.
|
|
|
|
Parameters
|
|
----------
|
|
config:
|
|
Application configuration instance.
|
|
max_workers:
|
|
Thread pool size for parallel file scanning. Defaults to
|
|
``min(os.cpu_count(), 8)``.
|
|
"""
|
|
|
|
def __init__(self, config: Config, max_workers: int | None = None) -> None:
|
|
self.config = config
|
|
self.max_workers = max_workers or min(os.cpu_count() or 4, 8)
|
|
|
|
# Detector registry — populated by external plug-ins via register_detector().
|
|
# Each detector is a callable: (filepath: Path, cfg: Config) -> List[ThreatInfo]
|
|
self._detectors: List[_Detector] = []
|
|
|
|
self._init_builtin_detectors()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Detector registration
|
|
# ------------------------------------------------------------------
|
|
|
|
def register_detector(self, detector: _Detector) -> None:
|
|
"""Add a detector to the scanning pipeline."""
|
|
self._detectors.append(detector)
|
|
|
|
def _init_builtin_detectors(self) -> None:
|
|
"""Register all built-in detection engines."""
|
|
from ayn_antivirus.detectors.signature_detector import SignatureDetector
|
|
from ayn_antivirus.detectors.heuristic_detector import HeuristicDetector
|
|
from ayn_antivirus.detectors.cryptominer_detector import CryptominerDetector
|
|
from ayn_antivirus.detectors.spyware_detector import SpywareDetector
|
|
from ayn_antivirus.detectors.rootkit_detector import RootkitDetector
|
|
|
|
try:
|
|
sig_det = SignatureDetector(db_path=self.config.db_path)
|
|
self.register_detector(sig_det)
|
|
except Exception as e:
|
|
logger.warning("Failed to load SignatureDetector: %s", e)
|
|
|
|
try:
|
|
self.register_detector(HeuristicDetector())
|
|
except Exception as e:
|
|
logger.warning("Failed to load HeuristicDetector: %s", e)
|
|
|
|
try:
|
|
self.register_detector(CryptominerDetector())
|
|
except Exception as e:
|
|
logger.warning("Failed to load CryptominerDetector: %s", e)
|
|
|
|
try:
|
|
self.register_detector(SpywareDetector())
|
|
except Exception as e:
|
|
logger.warning("Failed to load SpywareDetector: %s", e)
|
|
|
|
try:
|
|
self.register_detector(RootkitDetector())
|
|
except Exception as e:
|
|
logger.warning("Failed to load RootkitDetector: %s", e)
|
|
|
|
if self.config.enable_yara:
|
|
try:
|
|
from ayn_antivirus.detectors.yara_detector import YaraDetector
|
|
yara_det = YaraDetector()
|
|
self.register_detector(yara_det)
|
|
except Exception as e:
|
|
logger.debug("YARA detector not available: %s", e)
|
|
|
|
logger.info("Registered %d detectors", len(self._detectors))
|
|
|
|
# ------------------------------------------------------------------
|
|
# File scanning
|
|
# ------------------------------------------------------------------
|
|
|
|
def scan_file(self, filepath: str | Path) -> FileScanResult:
|
|
"""Scan a single file through every registered detector.
|
|
|
|
Parameters
|
|
----------
|
|
filepath:
|
|
Absolute or relative path to the file.
|
|
|
|
Returns
|
|
-------
|
|
FileScanResult
|
|
"""
|
|
filepath = Path(filepath)
|
|
result = FileScanResult(path=str(filepath))
|
|
|
|
if not filepath.is_file():
|
|
result.scanned = False
|
|
result.error = "Not a file or does not exist"
|
|
return result
|
|
|
|
try:
|
|
stat = filepath.stat()
|
|
except OSError as exc:
|
|
result.scanned = False
|
|
result.error = str(exc)
|
|
return result
|
|
|
|
result.size = stat.st_size
|
|
|
|
if result.size > self.config.max_file_size:
|
|
result.scanned = False
|
|
result.error = f"File exceeds max size ({result.size} > {self.config.max_file_size})"
|
|
return result
|
|
|
|
# Hash the file — needed by hash-based detectors and for recording.
|
|
try:
|
|
result.file_hash = _hash_file(filepath)
|
|
except OSError as exc:
|
|
result.scanned = False
|
|
result.error = f"Cannot read file: {exc}"
|
|
return result
|
|
|
|
# Enrich with FileScanner metadata (type classification).
|
|
try:
|
|
from ayn_antivirus.scanners.file_scanner import FileScanner
|
|
file_scanner = FileScanner(max_file_size=self.config.max_file_size)
|
|
file_info = file_scanner.scan(str(filepath))
|
|
result._file_info = file_info # type: ignore[attr-defined]
|
|
except Exception:
|
|
logger.debug("FileScanner enrichment skipped for %s", filepath)
|
|
|
|
# Run every registered detector.
|
|
for detector in self._detectors:
|
|
try:
|
|
detections = detector.detect(filepath, file_hash=result.file_hash)
|
|
for d in detections:
|
|
threat = ThreatInfo(
|
|
path=str(filepath),
|
|
threat_name=d.threat_name,
|
|
threat_type=_map_threat_type(d.threat_type),
|
|
severity=_map_severity(d.severity),
|
|
detector_name=d.detector_name,
|
|
details=d.details,
|
|
file_hash=result.file_hash,
|
|
)
|
|
result.threats.append(threat)
|
|
except Exception:
|
|
logger.exception("Detector %r failed on %s", detector, filepath)
|
|
|
|
# Publish per-file events.
|
|
event_bus.publish(EventType.FILE_SCANNED, result)
|
|
if result.threats:
|
|
for threat in result.threats:
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Path scanning (recursive)
|
|
# ------------------------------------------------------------------
|
|
|
|
def scan_path(
|
|
self,
|
|
path: str | Path,
|
|
recursive: bool = True,
|
|
quick: bool = False,
|
|
callback: Optional[Callable[[FileScanResult], None]] = None,
|
|
) -> ScanResult:
|
|
"""Walk *path* and scan every eligible file.
|
|
|
|
Parameters
|
|
----------
|
|
path:
|
|
Root directory (or single file) to scan.
|
|
recursive:
|
|
Descend into subdirectories.
|
|
quick:
|
|
If ``True``, only scan :pydata:`QUICK_SCAN_PATHS` that exist
|
|
under *path* (or the quick-scan list itself when *path* is ``/``).
|
|
callback:
|
|
Optional function called after each file is scanned — useful for
|
|
progress reporting.
|
|
|
|
Returns
|
|
-------
|
|
ScanResult
|
|
"""
|
|
scan_type = ScanType.QUICK if quick else ScanType.FULL
|
|
result = ScanResult(
|
|
scan_path=str(path),
|
|
scan_type=scan_type,
|
|
start_time=datetime.utcnow(),
|
|
)
|
|
|
|
event_bus.publish(EventType.SCAN_STARTED, {
|
|
"scan_id": result.scan_id,
|
|
"scan_type": scan_type.value,
|
|
"path": str(path),
|
|
})
|
|
|
|
# Collect files to scan.
|
|
files = self._collect_files(Path(path), recursive=recursive, quick=quick)
|
|
|
|
# Parallel scan.
|
|
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
|
|
futures = {pool.submit(self.scan_file, fp): fp for fp in files}
|
|
for future in as_completed(futures):
|
|
try:
|
|
file_result = future.result()
|
|
except Exception:
|
|
result.files_skipped += 1
|
|
logger.exception("Unhandled error scanning %s", futures[future])
|
|
continue
|
|
|
|
if file_result.scanned:
|
|
result.files_scanned += 1
|
|
else:
|
|
result.files_skipped += 1
|
|
|
|
result.threats.extend(file_result.threats)
|
|
|
|
if callback is not None:
|
|
try:
|
|
callback(file_result)
|
|
except Exception:
|
|
logger.exception("Scan callback raised an exception")
|
|
|
|
result.end_time = datetime.utcnow()
|
|
|
|
event_bus.publish(EventType.SCAN_COMPLETED, {
|
|
"scan_id": result.scan_id,
|
|
"files_scanned": result.files_scanned,
|
|
"threats": len(result.threats),
|
|
"duration": result.duration_seconds,
|
|
})
|
|
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Process scanning
|
|
# ------------------------------------------------------------------
|
|
|
|
def scan_processes(self) -> ProcessScanResult:
|
|
"""Inspect all running processes for known miners and anomalies.
|
|
|
|
Delegates to :class:`~ayn_antivirus.scanners.process_scanner.ProcessScanner`
|
|
for detection and converts results to engine dataclasses.
|
|
|
|
Returns
|
|
-------
|
|
ProcessScanResult
|
|
"""
|
|
from ayn_antivirus.scanners.process_scanner import ProcessScanner
|
|
|
|
result = ProcessScanResult()
|
|
start = time.monotonic()
|
|
|
|
proc_scanner = ProcessScanner()
|
|
scan_data = proc_scanner.scan()
|
|
|
|
result.processes_scanned = scan_data.get("total", 0)
|
|
|
|
# Known miner matches.
|
|
for s in scan_data.get("suspicious", []):
|
|
threat = ProcessThreat(
|
|
pid=s["pid"],
|
|
name=s.get("name", ""),
|
|
cmdline=" ".join(s.get("cmdline") or []),
|
|
cpu_percent=s.get("cpu_percent", 0.0),
|
|
memory_percent=0.0,
|
|
threat_type=ThreatType.MINER,
|
|
severity=Severity.CRITICAL,
|
|
details=s.get("reason", "Known miner process"),
|
|
)
|
|
result.threats.append(threat)
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
# High-CPU anomalies (skip duplicates already caught as miners).
|
|
miner_pids = {t.pid for t in result.threats}
|
|
for h in scan_data.get("high_cpu", []):
|
|
if h["pid"] in miner_pids:
|
|
continue
|
|
threat = ProcessThreat(
|
|
pid=h["pid"],
|
|
name=h.get("name", ""),
|
|
cmdline=" ".join(h.get("cmdline") or []),
|
|
cpu_percent=h.get("cpu_percent", 0.0),
|
|
memory_percent=0.0,
|
|
threat_type=ThreatType.MINER,
|
|
severity=Severity.HIGH,
|
|
details=h.get("reason", "Abnormally high CPU usage"),
|
|
)
|
|
result.threats.append(threat)
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
# Hidden processes (possible rootkit).
|
|
for hp in scan_data.get("hidden", []):
|
|
threat = ProcessThreat(
|
|
pid=hp["pid"],
|
|
name=hp.get("name", ""),
|
|
cmdline=hp.get("cmdline", ""),
|
|
cpu_percent=0.0,
|
|
memory_percent=0.0,
|
|
threat_type=ThreatType.ROOTKIT,
|
|
severity=Severity.CRITICAL,
|
|
details=hp.get("reason", "Hidden process"),
|
|
)
|
|
result.threats.append(threat)
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
# Optional memory scan for suspicious PIDs.
|
|
try:
|
|
from ayn_antivirus.scanners.memory_scanner import MemoryScanner
|
|
mem_scanner = MemoryScanner()
|
|
suspicious_pids = {t.pid for t in result.threats}
|
|
for pid in suspicious_pids:
|
|
try:
|
|
mem_result = mem_scanner.scan(pid)
|
|
rwx_regions = mem_result.get("rwx_regions") or []
|
|
if rwx_regions:
|
|
result.threats.append(ProcessThreat(
|
|
pid=pid,
|
|
name="",
|
|
cmdline="",
|
|
cpu_percent=0.0,
|
|
memory_percent=0.0,
|
|
threat_type=ThreatType.ROOTKIT,
|
|
severity=Severity.HIGH,
|
|
details=(
|
|
f"Injected code detected in PID {pid}: "
|
|
f"{len(rwx_regions)} RWX region(s)"
|
|
),
|
|
))
|
|
except Exception:
|
|
pass # Memory scan for individual PID is best-effort
|
|
except Exception as exc:
|
|
logger.debug("Memory scan skipped: %s", exc)
|
|
|
|
result.scan_duration = time.monotonic() - start
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Network scanning
|
|
# ------------------------------------------------------------------
|
|
|
|
def scan_network(self) -> NetworkScanResult:
|
|
"""Scan active network connections for mining pool traffic.
|
|
|
|
Delegates to :class:`~ayn_antivirus.scanners.network_scanner.NetworkScanner`
|
|
for detection and converts results to engine dataclasses.
|
|
|
|
Returns
|
|
-------
|
|
NetworkScanResult
|
|
"""
|
|
from ayn_antivirus.scanners.network_scanner import NetworkScanner
|
|
|
|
result = NetworkScanResult()
|
|
start = time.monotonic()
|
|
|
|
net_scanner = NetworkScanner()
|
|
scan_data = net_scanner.scan()
|
|
|
|
result.connections_scanned = scan_data.get("total", 0)
|
|
|
|
# Suspicious connections (mining pools, suspicious ports).
|
|
for s in scan_data.get("suspicious", []):
|
|
sev = _map_severity(s.get("severity", "HIGH"))
|
|
threat = NetworkThreat(
|
|
local_addr=s.get("local_addr", "?"),
|
|
remote_addr=s.get("remote_addr", "?"),
|
|
pid=s.get("pid"),
|
|
process_name=(s.get("process", {}) or {}).get("name", ""),
|
|
threat_type=ThreatType.MINER,
|
|
severity=sev,
|
|
details=s.get("reason", "Suspicious connection"),
|
|
)
|
|
result.threats.append(threat)
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
# Unexpected listening ports.
|
|
for lp in scan_data.get("unexpected_listeners", []):
|
|
threat = NetworkThreat(
|
|
local_addr=lp.get("local_addr", f"?:{lp.get('port', '?')}"),
|
|
remote_addr="",
|
|
pid=lp.get("pid"),
|
|
process_name=lp.get("process_name", ""),
|
|
threat_type=ThreatType.MALWARE,
|
|
severity=_map_severity(lp.get("severity", "MEDIUM")),
|
|
details=lp.get("reason", "Unexpected listener"),
|
|
)
|
|
result.threats.append(threat)
|
|
event_bus.publish(EventType.THREAT_FOUND, threat)
|
|
|
|
# Enrich with IOC database lookups — flag connections to known-bad IPs.
|
|
try:
|
|
from ayn_antivirus.signatures.db.ioc_db import IOCDatabase
|
|
ioc_db = IOCDatabase(self.config.db_path)
|
|
ioc_db.initialize()
|
|
malicious_ips = ioc_db.get_all_malicious_ips()
|
|
|
|
if malicious_ips:
|
|
import psutil as _psutil
|
|
already_flagged = {
|
|
t.remote_addr for t in result.threats
|
|
}
|
|
try:
|
|
for conn in _psutil.net_connections(kind="inet"):
|
|
if not conn.raddr:
|
|
continue
|
|
remote_ip = conn.raddr.ip
|
|
remote_str = f"{remote_ip}:{conn.raddr.port}"
|
|
if remote_ip in malicious_ips and remote_str not in already_flagged:
|
|
ioc_info = ioc_db.lookup_ip(remote_ip) or {}
|
|
result.threats.append(NetworkThreat(
|
|
local_addr=(
|
|
f"{conn.laddr.ip}:{conn.laddr.port}"
|
|
if conn.laddr else ""
|
|
),
|
|
remote_addr=remote_str,
|
|
pid=conn.pid or 0,
|
|
process_name=self._get_proc_name(conn.pid),
|
|
threat_type=ThreatType.MALWARE,
|
|
severity=Severity.CRITICAL,
|
|
details=(
|
|
f"Connection to known malicious IP {remote_ip} "
|
|
f"(threat: {ioc_info.get('threat_name', 'IOC match')})"
|
|
),
|
|
))
|
|
except (_psutil.AccessDenied, OSError):
|
|
pass
|
|
|
|
ioc_db.close()
|
|
except Exception as exc:
|
|
logger.debug("IOC network enrichment skipped: %s", exc)
|
|
|
|
result.scan_duration = time.monotonic() - start
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _get_proc_name(pid: int) -> str:
|
|
"""Best-effort process name lookup for a PID."""
|
|
if not pid:
|
|
return ""
|
|
try:
|
|
import psutil as _ps
|
|
return _ps.Process(pid).name()
|
|
except Exception:
|
|
return ""
|
|
|
|
# ------------------------------------------------------------------
|
|
# Container scanning
|
|
# ------------------------------------------------------------------
|
|
|
|
def scan_containers(
|
|
self,
|
|
runtime: str = "all",
|
|
container_id: Optional[str] = None,
|
|
):
|
|
"""Scan containers for threats.
|
|
|
|
Parameters
|
|
----------
|
|
runtime:
|
|
Container runtime to target (``"all"``, ``"docker"``,
|
|
``"podman"``, ``"lxc"``).
|
|
container_id:
|
|
If provided, scan only this specific container.
|
|
|
|
Returns
|
|
-------
|
|
ContainerScanResult
|
|
"""
|
|
from ayn_antivirus.scanners.container_scanner import ContainerScanner
|
|
|
|
scanner = ContainerScanner()
|
|
if container_id:
|
|
return scanner.scan_container(container_id)
|
|
return scanner.scan(runtime)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Composite scans
|
|
# ------------------------------------------------------------------
|
|
|
|
def full_scan(
|
|
self,
|
|
callback: Optional[Callable[[FileScanResult], None]] = None,
|
|
) -> FullScanResult:
|
|
"""Run a complete scan: files, processes, and network.
|
|
|
|
Parameters
|
|
----------
|
|
callback:
|
|
Optional per-file progress callback.
|
|
|
|
Returns
|
|
-------
|
|
FullScanResult
|
|
"""
|
|
full = FullScanResult()
|
|
|
|
# File scan across all configured paths.
|
|
aggregate = ScanResult(scan_type=ScanType.FULL, start_time=datetime.utcnow())
|
|
for scan_path in self.config.scan_paths:
|
|
partial = self.scan_path(scan_path, recursive=True, quick=False, callback=callback)
|
|
aggregate.files_scanned += partial.files_scanned
|
|
aggregate.files_skipped += partial.files_skipped
|
|
aggregate.threats.extend(partial.threats)
|
|
aggregate.end_time = datetime.utcnow()
|
|
full.file_scan = aggregate
|
|
|
|
# Process + network.
|
|
full.process_scan = self.scan_processes()
|
|
full.network_scan = self.scan_network()
|
|
|
|
# Containers (best-effort — skipped if no runtimes available).
|
|
try:
|
|
container_result = self.scan_containers()
|
|
if container_result.containers_found > 0:
|
|
full.container_scan = container_result
|
|
except Exception:
|
|
logger.debug("Container scanning skipped", exc_info=True)
|
|
|
|
return full
|
|
|
|
def quick_scan(
|
|
self,
|
|
callback: Optional[Callable[[FileScanResult], None]] = None,
|
|
) -> ScanResult:
|
|
"""Scan only high-risk directories.
|
|
|
|
Targets :pydata:`QUICK_SCAN_PATHS` and any additional web roots
|
|
or crontab locations.
|
|
|
|
Returns
|
|
-------
|
|
ScanResult
|
|
"""
|
|
aggregate = ScanResult(scan_type=ScanType.QUICK, start_time=datetime.utcnow())
|
|
|
|
event_bus.publish(EventType.SCAN_STARTED, {
|
|
"scan_id": aggregate.scan_id,
|
|
"scan_type": "quick",
|
|
"paths": QUICK_SCAN_PATHS,
|
|
})
|
|
|
|
for scan_path in QUICK_SCAN_PATHS:
|
|
p = Path(scan_path)
|
|
if not p.exists():
|
|
continue
|
|
partial = self.scan_path(scan_path, recursive=True, quick=False, callback=callback)
|
|
aggregate.files_scanned += partial.files_scanned
|
|
aggregate.files_skipped += partial.files_skipped
|
|
aggregate.threats.extend(partial.threats)
|
|
|
|
aggregate.end_time = datetime.utcnow()
|
|
|
|
event_bus.publish(EventType.SCAN_COMPLETED, {
|
|
"scan_id": aggregate.scan_id,
|
|
"files_scanned": aggregate.files_scanned,
|
|
"threats": len(aggregate.threats),
|
|
"duration": aggregate.duration_seconds,
|
|
})
|
|
|
|
return aggregate
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _collect_files(
|
|
self,
|
|
root: Path,
|
|
recursive: bool = True,
|
|
quick: bool = False,
|
|
) -> List[Path]:
|
|
"""Walk *root* and return a list of scannable file paths.
|
|
|
|
Respects ``config.exclude_paths`` and ``config.max_file_size``.
|
|
"""
|
|
targets: List[Path] = []
|
|
|
|
if quick:
|
|
# In quick mode, only descend into known-risky subdirectories.
|
|
roots = [
|
|
root / rel
|
|
for rel in (
|
|
"tmp", "var/tmp", "dev/shm", "usr/local/bin",
|
|
"var/spool/cron", "etc/cron.d", "etc/cron.daily",
|
|
"var/www", "srv",
|
|
)
|
|
if (root / rel).exists()
|
|
]
|
|
# Also include the quick-scan list itself if root is /.
|
|
if str(root) == "/":
|
|
roots = [Path(p) for p in QUICK_SCAN_PATHS if Path(p).exists()]
|
|
else:
|
|
roots = [root]
|
|
|
|
exclude = set(self.config.exclude_paths)
|
|
|
|
for r in roots:
|
|
if r.is_file():
|
|
targets.append(r)
|
|
continue
|
|
iterator = r.rglob("*") if recursive else r.iterdir()
|
|
try:
|
|
for entry in iterator:
|
|
if not entry.is_file():
|
|
continue
|
|
# Exclude check.
|
|
entry_str = str(entry)
|
|
if any(entry_str.startswith(ex) for ex in exclude):
|
|
continue
|
|
try:
|
|
if entry.stat().st_size > self.config.max_file_size:
|
|
continue
|
|
except OSError:
|
|
continue
|
|
targets.append(entry)
|
|
except PermissionError:
|
|
logger.warning("Permission denied: %s", r)
|
|
|
|
return targets
|