130 lines
3.7 KiB
Python
130 lines
3.7 KiB
Python
"""Abstract base class and shared data structures for AYN detectors."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from abc import ABC, abstractmethod
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Detection result
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class DetectionResult:
|
||
"""A single detection produced by a detector.
|
||
|
||
Attributes
|
||
----------
|
||
threat_name:
|
||
Short identifier for the threat (e.g. ``"Trojan.Miner.XMRig"``).
|
||
threat_type:
|
||
Category string — ``VIRUS``, ``MALWARE``, ``SPYWARE``, ``MINER``,
|
||
``ROOTKIT``, ``HEURISTIC``, etc.
|
||
severity:
|
||
One of ``CRITICAL``, ``HIGH``, ``MEDIUM``, ``LOW``.
|
||
confidence:
|
||
How confident the detector is in the finding (0–100).
|
||
details:
|
||
Human-readable explanation.
|
||
detector_name:
|
||
Which detector produced this result.
|
||
"""
|
||
|
||
threat_name: str
|
||
threat_type: str
|
||
severity: str
|
||
confidence: int
|
||
details: str
|
||
detector_name: str
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Abstract base
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class BaseDetector(ABC):
|
||
"""Interface that every AYN detector must implement.
|
||
|
||
Detectors receive a file path (and optionally pre-read content / hash)
|
||
and return zero or more :class:`DetectionResult` instances.
|
||
"""
|
||
|
||
# ------------------------------------------------------------------
|
||
# Identity
|
||
# ------------------------------------------------------------------
|
||
|
||
@property
|
||
@abstractmethod
|
||
def name(self) -> str:
|
||
"""Machine-friendly detector identifier."""
|
||
...
|
||
|
||
@property
|
||
@abstractmethod
|
||
def description(self) -> str:
|
||
"""One-line human-readable summary."""
|
||
...
|
||
|
||
# ------------------------------------------------------------------
|
||
# Detection
|
||
# ------------------------------------------------------------------
|
||
|
||
@abstractmethod
|
||
def detect(
|
||
self,
|
||
file_path: str | Path,
|
||
file_content: Optional[bytes] = None,
|
||
file_hash: Optional[str] = None,
|
||
) -> List[DetectionResult]:
|
||
"""Run detection logic against a single file.
|
||
|
||
Parameters
|
||
----------
|
||
file_path:
|
||
Path to the file on disk.
|
||
file_content:
|
||
Optional pre-read bytes of the file (avoids double-read).
|
||
file_hash:
|
||
Optional pre-computed SHA-256 hex digest.
|
||
|
||
Returns
|
||
-------
|
||
list[DetectionResult]
|
||
Empty list when the file is clean.
|
||
"""
|
||
...
|
||
|
||
# ------------------------------------------------------------------
|
||
# Helpers
|
||
# ------------------------------------------------------------------
|
||
|
||
def _read_content(
|
||
self,
|
||
file_path: Path,
|
||
file_content: Optional[bytes],
|
||
max_bytes: int = 10 * 1024 * 1024,
|
||
) -> bytes:
|
||
"""Return *file_content* if provided, otherwise read from disk.
|
||
|
||
Reads at most *max_bytes* to avoid unbounded memory usage.
|
||
"""
|
||
if file_content is not None:
|
||
return file_content
|
||
with open(file_path, "rb") as fh:
|
||
return fh.read(max_bytes)
|
||
|
||
def _log(self, msg: str, *args) -> None:
|
||
logger.info("[%s] " + msg, self.name, *args)
|
||
|
||
def _warn(self, msg: str, *args) -> None:
|
||
logger.warning("[%s] " + msg, self.name, *args)
|
||
|
||
def _error(self, msg: str, *args) -> None:
|
||
logger.error("[%s] " + msg, self.name, *args)
|