"""YARA-rule detector for AYN Antivirus. Compiles and caches YARA rule files from the configured rules directory, then matches them against scanned files. ``yara-python`` is treated as an optional dependency — if it is missing the detector logs a warning and returns no results. """ from __future__ import annotations import logging from pathlib import Path from typing import Any, List, Optional from ayn_antivirus.constants import DEFAULT_YARA_RULES_DIR from ayn_antivirus.detectors.base import BaseDetector, DetectionResult logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Conditional import — yara-python is optional. # --------------------------------------------------------------------------- try: import yara # type: ignore[import-untyped] _YARA_AVAILABLE = True except ImportError: _YARA_AVAILABLE = False yara = None # type: ignore[assignment] # Severity mapping for YARA rule meta tags. _META_SEVERITY_MAP = { "critical": "CRITICAL", "high": "HIGH", "medium": "MEDIUM", "low": "LOW", } class YaraDetector(BaseDetector): """Detect threats by matching YARA rules against file contents. Parameters ---------- rules_dir: Directory containing ``.yar`` / ``.yara`` rule files. Defaults to the bundled ``signatures/yara_rules/`` directory. """ def __init__(self, rules_dir: str | Path = DEFAULT_YARA_RULES_DIR) -> None: self.rules_dir = Path(rules_dir) self._rules: Any = None # compiled yara.Rules object self._rule_count: int = 0 self._loaded = False # ------------------------------------------------------------------ # BaseDetector interface # ------------------------------------------------------------------ @property def name(self) -> str: return "yara_detector" @property def description(self) -> str: return "Pattern matching using compiled YARA rules" def detect( self, file_path: str | Path, file_content: Optional[bytes] = None, file_hash: Optional[str] = None, ) -> List[DetectionResult]: """Match all loaded YARA rules against *file_path*. Falls back to in-memory matching if *file_content* is provided. """ if not _YARA_AVAILABLE: self._warn("yara-python is not installed — skipping YARA detection") return [] if not self._loaded: self.load_rules() if self._rules is None: return [] file_path = Path(file_path) results: List[DetectionResult] = [] try: if file_content is not None: matches = self._rules.match(data=file_content) else: matches = self._rules.match(filepath=str(file_path)) except yara.Error as exc: self._warn("YARA scan failed for %s: %s", file_path, exc) return results for match in matches: meta = match.meta or {} severity = _META_SEVERITY_MAP.get( str(meta.get("severity", "")).lower(), "HIGH" ) threat_type = meta.get("threat_type", "MALWARE").upper() threat_name = meta.get("threat_name") or match.rule matched_strings = [] try: for offset, identifier, data in match.strings: matched_strings.append( f"{identifier} @ 0x{offset:x}" ) except (TypeError, ValueError): # match.strings format varies between yara-python versions. pass detail_parts = [f"YARA rule '{match.rule}' matched"] if match.namespace and match.namespace != "default": detail_parts.append(f"namespace={match.namespace}") if matched_strings: detail_parts.append( f"strings=[{', '.join(matched_strings[:5])}]" ) if meta.get("description"): detail_parts.append(meta["description"]) results.append(DetectionResult( threat_name=threat_name, threat_type=threat_type, severity=severity, confidence=int(meta.get("confidence", 90)), details=" | ".join(detail_parts), detector_name=self.name, )) return results # ------------------------------------------------------------------ # Rule management # ------------------------------------------------------------------ def load_rules(self, rules_dir: Optional[str | Path] = None) -> None: """Compile all ``.yar`` / ``.yara`` files in *rules_dir*. Compiled rules are cached in ``self._rules``. Call this again after updating rule files to pick up changes. """ if not _YARA_AVAILABLE: self._warn("yara-python is not installed — cannot load rules") return directory = Path(rules_dir) if rules_dir else self.rules_dir if not directory.is_dir(): self._warn("YARA rules directory does not exist: %s", directory) return rule_files = sorted( p for p in directory.iterdir() if p.suffix.lower() in (".yar", ".yara") and p.is_file() ) if not rule_files: self._log("No YARA rule files found in %s", directory) self._rules = None self._rule_count = 0 self._loaded = True return # Build a filepaths dict for yara.compile(filepaths={...}). filepaths = {} for idx, rf in enumerate(rule_files): namespace = rf.stem filepaths[namespace] = str(rf) try: self._rules = yara.compile(filepaths=filepaths) self._rule_count = len(rule_files) self._loaded = True self._log( "Compiled %d YARA rule file(s) from %s", self._rule_count, directory, ) except yara.SyntaxError as exc: self._error("YARA compilation error: %s", exc) self._rules = None except yara.Error as exc: self._error("YARA error: %s", exc) self._rules = None @property def rule_count(self) -> int: """Number of rule files currently compiled.""" return self._rule_count @property def available(self) -> bool: """Return ``True`` if ``yara-python`` is installed.""" return _YARA_AVAILABLE