"""Rootkit detector for AYN Antivirus. Performs system-wide checks for indicators of rootkit compromise: known rootkit files, modified system binaries, hidden processes, hidden kernel modules, LD_PRELOAD hijacking, hidden network ports, and tampered logs. Many checks require **root** privileges. On non-Linux systems, kernel- module and /proc-based checks are gracefully skipped. """ from __future__ import annotations import logging import os import subprocess from pathlib import Path from typing import List, Optional, Set import psutil from ayn_antivirus.constants import ( KNOWN_ROOTKIT_FILES, MALICIOUS_ENV_VARS, ) from ayn_antivirus.detectors.base import BaseDetector, DetectionResult logger = logging.getLogger(__name__) class RootkitDetector(BaseDetector): """System-wide rootkit detection. Unlike other detectors, the *file_path* argument is optional. When called without a path (or with ``file_path=None``) the detector runs every host-level check. When given a file it limits itself to checks relevant to that file. """ # ------------------------------------------------------------------ # BaseDetector interface # ------------------------------------------------------------------ @property def name(self) -> str: return "rootkit_detector" @property def description(self) -> str: return "Detects rootkits via file, process, module, and environment analysis" def detect( self, file_path: str | Path | None = None, file_content: Optional[bytes] = None, file_hash: Optional[str] = None, ) -> List[DetectionResult]: """Run rootkit checks. If *file_path* is ``None``, all system-wide checks are executed. Otherwise only file-specific checks run. """ results: List[DetectionResult] = [] if file_path is not None: fp = Path(file_path) # File-specific: is this a known rootkit artefact? results.extend(self._check_known_rootkit_file(fp)) return results # --- Full system-wide scan --- results.extend(self._check_known_rootkit_files()) results.extend(self._check_ld_preload()) results.extend(self._check_ld_so_preload()) results.extend(self._check_hidden_processes()) results.extend(self._check_hidden_kernel_modules()) results.extend(self._check_hidden_network_ports()) results.extend(self._check_malicious_env_vars()) results.extend(self._check_tampered_logs()) return results # ------------------------------------------------------------------ # Known rootkit files # ------------------------------------------------------------------ def _check_known_rootkit_files(self) -> List[DetectionResult]: """Check every path in :pydata:`KNOWN_ROOTKIT_FILES`.""" results: List[DetectionResult] = [] for path_str in KNOWN_ROOTKIT_FILES: p = Path(path_str) if p.exists(): results.append(DetectionResult( threat_name="Rootkit.KnownFile", threat_type="ROOTKIT", severity="CRITICAL", confidence=90, details=f"Known rootkit artefact present: {path_str}", detector_name=self.name, )) return results def _check_known_rootkit_file(self, file_path: Path) -> List[DetectionResult]: """Check whether *file_path* is a known rootkit file.""" results: List[DetectionResult] = [] path_str = str(file_path) if path_str in KNOWN_ROOTKIT_FILES: results.append(DetectionResult( threat_name="Rootkit.KnownFile", threat_type="ROOTKIT", severity="CRITICAL", confidence=90, details=f"Known rootkit artefact: {path_str}", detector_name=self.name, )) return results # ------------------------------------------------------------------ # LD_PRELOAD / ld.so.preload # ------------------------------------------------------------------ def _check_ld_preload(self) -> List[DetectionResult]: """Flag the ``LD_PRELOAD`` environment variable if set globally.""" results: List[DetectionResult] = [] val = os.environ.get("LD_PRELOAD", "") if val: results.append(DetectionResult( threat_name="Rootkit.LDPreload.EnvVar", threat_type="ROOTKIT", severity="CRITICAL", confidence=85, details=f"LD_PRELOAD is set: {val}", detector_name=self.name, )) return results def _check_ld_so_preload(self) -> List[DetectionResult]: """Check ``/etc/ld.so.preload`` for suspicious entries.""" results: List[DetectionResult] = [] ld_preload_file = Path("/etc/ld.so.preload") if not ld_preload_file.exists(): return results try: content = ld_preload_file.read_text().strip() except PermissionError: self._warn("Cannot read /etc/ld.so.preload") return results if content: lines = [l.strip() for l in content.splitlines() if l.strip() and not l.startswith("#")] if lines: results.append(DetectionResult( threat_name="Rootkit.LDPreload.File", threat_type="ROOTKIT", severity="CRITICAL", confidence=85, details=f"/etc/ld.so.preload contains entries: {', '.join(lines[:5])}", detector_name=self.name, )) return results # ------------------------------------------------------------------ # Hidden processes # ------------------------------------------------------------------ def _check_hidden_processes(self) -> List[DetectionResult]: """Compare /proc PIDs with psutil to find hidden processes.""" results: List[DetectionResult] = [] proc_dir = Path("/proc") if not proc_dir.is_dir(): return results # non-Linux proc_pids: Set[int] = set() try: for entry in proc_dir.iterdir(): if entry.name.isdigit(): proc_pids.add(int(entry.name)) except PermissionError: return results psutil_pids = set(psutil.pids()) hidden = proc_pids - psutil_pids for pid in hidden: name = "" try: comm = proc_dir / str(pid) / "comm" if comm.exists(): name = comm.read_text().strip() except OSError: pass results.append(DetectionResult( threat_name="Rootkit.HiddenProcess", threat_type="ROOTKIT", severity="CRITICAL", confidence=85, details=f"PID {pid} ({name or 'unknown'}) visible in /proc but hidden from psutil", detector_name=self.name, )) return results # ------------------------------------------------------------------ # Hidden kernel modules # ------------------------------------------------------------------ def _check_hidden_kernel_modules(self) -> List[DetectionResult]: """Compare ``lsmod`` output with ``/proc/modules`` to find discrepancies.""" results: List[DetectionResult] = [] proc_modules_path = Path("/proc/modules") if not proc_modules_path.exists(): return results # non-Linux # Modules from /proc/modules. try: proc_content = proc_modules_path.read_text() except PermissionError: return results proc_mods: Set[str] = set() for line in proc_content.splitlines(): parts = line.split() if parts: proc_mods.add(parts[0]) # Modules from lsmod. lsmod_mods: Set[str] = set() try: output = subprocess.check_output(["lsmod"], stderr=subprocess.DEVNULL, timeout=10) for line in output.decode(errors="replace").splitlines()[1:]: parts = line.split() if parts: lsmod_mods.add(parts[0]) except (FileNotFoundError, subprocess.SubprocessError, OSError): return results # lsmod not available # Modules in /proc but NOT in lsmod → hidden from userspace. hidden = proc_mods - lsmod_mods for mod in hidden: results.append(DetectionResult( threat_name="Rootkit.HiddenKernelModule", threat_type="ROOTKIT", severity="CRITICAL", confidence=80, details=f"Kernel module '{mod}' in /proc/modules but hidden from lsmod", detector_name=self.name, )) return results # ------------------------------------------------------------------ # Hidden network ports # ------------------------------------------------------------------ def _check_hidden_network_ports(self) -> List[DetectionResult]: """Compare ``ss``/``netstat`` listening ports with psutil.""" results: List[DetectionResult] = [] # Ports from psutil. psutil_ports: Set[int] = set() try: for conn in psutil.net_connections(kind="inet"): if conn.status == "LISTEN" and conn.laddr: psutil_ports.add(conn.laddr.port) except psutil.AccessDenied: return results # Ports from ss. ss_ports: Set[int] = set() try: output = subprocess.check_output( ["ss", "-tlnH"], stderr=subprocess.DEVNULL, timeout=10 ) for line in output.decode(errors="replace").splitlines(): # Typical ss output: LISTEN 0 128 0.0.0.0:22 ... parts = line.split() for part in parts: if ":" in part: try: port = int(part.rsplit(":", 1)[1]) ss_ports.add(port) except (ValueError, IndexError): continue except (FileNotFoundError, subprocess.SubprocessError, OSError): return results # ss not available # Ports in ss but not in psutil → potentially hidden by a rootkit. hidden = ss_ports - psutil_ports for port in hidden: results.append(DetectionResult( threat_name="Rootkit.HiddenPort", threat_type="ROOTKIT", severity="HIGH", confidence=70, details=f"Listening port {port} visible to ss but hidden from psutil", detector_name=self.name, )) return results # ------------------------------------------------------------------ # Malicious environment variables # ------------------------------------------------------------------ def _check_malicious_env_vars(self) -> List[DetectionResult]: """Check the current environment for known-risky variables.""" results: List[DetectionResult] = [] for entry in MALICIOUS_ENV_VARS: if "=" in entry: # Exact key=value match (e.g. "HISTFILE=/dev/null"). key, val = entry.split("=", 1) if os.environ.get(key) == val: results.append(DetectionResult( threat_name="Rootkit.EnvVar.Suspicious", threat_type="ROOTKIT", severity="HIGH", confidence=75, details=f"Suspicious environment variable: {key}={val}", detector_name=self.name, )) else: # Key presence check (e.g. "LD_PRELOAD"). if entry in os.environ: results.append(DetectionResult( threat_name="Rootkit.EnvVar.Suspicious", threat_type="ROOTKIT", severity="HIGH", confidence=65, details=f"Suspicious environment variable set: {entry}={os.environ[entry][:100]}", detector_name=self.name, )) return results # ------------------------------------------------------------------ # Tampered log files # ------------------------------------------------------------------ _LOG_PATHS = [ "/var/log/auth.log", "/var/log/syslog", "/var/log/messages", "/var/log/secure", "/var/log/wtmp", "/var/log/btmp", "/var/log/lastlog", ] def _check_tampered_logs(self) -> List[DetectionResult]: """Look for signs of log tampering: zero-byte logs, missing logs, or logs whose mtime is suspiciously older than expected. """ results: List[DetectionResult] = [] for log_path_str in self._LOG_PATHS: log_path = Path(log_path_str) if not log_path.exists(): # Missing critical log. if log_path_str in ("/var/log/auth.log", "/var/log/syslog", "/var/log/wtmp"): results.append(DetectionResult( threat_name="Rootkit.Log.Missing", threat_type="ROOTKIT", severity="HIGH", confidence=60, details=f"Critical log file missing: {log_path_str}", detector_name=self.name, )) continue try: st = log_path.stat() except OSError: continue # Zero-byte log file (may have been truncated). if st.st_size == 0: results.append(DetectionResult( threat_name="Rootkit.Log.Truncated", threat_type="ROOTKIT", severity="HIGH", confidence=70, details=f"Log file is empty (possibly truncated): {log_path_str}", detector_name=self.name, )) return results