388 lines
14 KiB
Python
388 lines
14 KiB
Python
"""Rootkit detector for AYN Antivirus.

Performs system-wide checks for indicators of rootkit compromise: known
rootkit files, hidden processes, hidden kernel modules, LD_PRELOAD
hijacking (environment variable and ``/etc/ld.so.preload``), hidden
network ports, suspicious environment variables, and tampered logs.

Many checks require **root** privileges.  On non-Linux systems, kernel-
module and /proc-based checks are gracefully skipped.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import List, Optional, Set
|
|
|
|
import psutil
|
|
|
|
from ayn_antivirus.constants import (
|
|
KNOWN_ROOTKIT_FILES,
|
|
MALICIOUS_ENV_VARS,
|
|
)
|
|
from ayn_antivirus.detectors.base import BaseDetector, DetectionResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RootkitDetector(BaseDetector):
    """System-wide rootkit detection.

    Unlike other detectors, the *file_path* argument is optional.  When
    called without a path (or with ``file_path=None``) the detector runs
    every host-level check.  When given a file it limits itself to checks
    relevant to that file.

    Every check is best-effort: when a data source cannot be read (missing
    tool, insufficient privileges, non-Linux host) the check returns no
    findings instead of raising.
    """

    # ------------------------------------------------------------------
    # BaseDetector interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Identifier stamped on every result produced by this detector."""
        return "rootkit_detector"

    @property
    def description(self) -> str:
        """One-line human-readable summary of the detector."""
        return "Detects rootkits via file, process, module, and environment analysis"

    def detect(
        self,
        file_path: str | Path | None = None,
        file_content: Optional[bytes] = None,
        file_hash: Optional[str] = None,
    ) -> List[DetectionResult]:
        """Run rootkit checks.

        If *file_path* is ``None``, all system-wide checks are executed.
        Otherwise only the file-specific check runs.  *file_content* and
        *file_hash* are accepted for interface compatibility and are not
        used by this detector.

        Returns:
            A list of findings; empty when nothing suspicious was found.
        """
        if file_path is not None:
            # File-specific: is this a known rootkit artefact?
            return list(self._check_known_rootkit_file(Path(file_path)))

        # --- Full system-wide scan ---
        system_checks = (
            self._check_known_rootkit_files,
            self._check_ld_preload,
            self._check_ld_so_preload,
            self._check_hidden_processes,
            self._check_hidden_kernel_modules,
            self._check_hidden_network_ports,
            self._check_malicious_env_vars,
            self._check_tampered_logs,
        )
        results: List[DetectionResult] = []
        for check in system_checks:
            results.extend(check())
        return results

    # ------------------------------------------------------------------
    # Shared helper
    # ------------------------------------------------------------------

    def _rootkit_finding(
        self,
        threat_name: str,
        severity: str,
        confidence: int,
        details: str,
    ) -> DetectionResult:
        """Build a ``ROOTKIT``-typed result attributed to this detector."""
        return DetectionResult(
            threat_name=threat_name,
            threat_type="ROOTKIT",
            severity=severity,
            confidence=confidence,
            details=details,
            detector_name=self.name,
        )

    # ------------------------------------------------------------------
    # Known rootkit files
    # ------------------------------------------------------------------

    def _check_known_rootkit_files(self) -> List[DetectionResult]:
        """Check every path in :pydata:`KNOWN_ROOTKIT_FILES`.

        Paths that cannot be inspected (e.g. permission denied on a parent
        directory) are treated as absent rather than aborting the scan.
        """
        # os.path.exists() returns False on any OSError, whereas
        # Path.exists() re-raises PermissionError on Python < 3.12 and
        # would crash an unprivileged scan.
        return [
            self._rootkit_finding(
                "Rootkit.KnownFile",
                "CRITICAL",
                90,
                f"Known rootkit artefact present: {path_str}",
            )
            for path_str in KNOWN_ROOTKIT_FILES
            if os.path.exists(path_str)
        ]

    def _check_known_rootkit_file(self, file_path: Path) -> List[DetectionResult]:
        """Check whether *file_path* is a known rootkit file.

        The comparison is a literal string match of ``str(file_path)``
        against :pydata:`KNOWN_ROOTKIT_FILES`; the path is not resolved.
        """
        path_str = str(file_path)
        if path_str not in KNOWN_ROOTKIT_FILES:
            return []
        return [
            self._rootkit_finding(
                "Rootkit.KnownFile",
                "CRITICAL",
                90,
                f"Known rootkit artefact: {path_str}",
            )
        ]

    # ------------------------------------------------------------------
    # LD_PRELOAD / ld.so.preload
    # ------------------------------------------------------------------

    def _check_ld_preload(self) -> List[DetectionResult]:
        """Flag the ``LD_PRELOAD`` environment variable if set and non-empty."""
        value = os.environ.get("LD_PRELOAD", "")
        if not value:
            return []
        return [
            self._rootkit_finding(
                "Rootkit.LDPreload.EnvVar",
                "CRITICAL",
                85,
                f"LD_PRELOAD is set: {value}",
            )
        ]

    def _check_ld_so_preload(self) -> List[DetectionResult]:
        """Check ``/etc/ld.so.preload`` for active (non-comment) entries.

        A missing file yields no findings.  An unreadable file triggers a
        warning via ``self._warn`` and yields no findings.
        """
        ld_preload_file = Path("/etc/ld.so.preload")
        try:
            # errors="replace": undecodable bytes must not abort the scan.
            content = ld_preload_file.read_text(errors="replace")
        except FileNotFoundError:
            return []
        except OSError:
            self._warn("Cannot read /etc/ld.so.preload")
            return []

        # Drop everything from '#' onward *before* testing for emptiness so
        # that indented comments and trailing inline comments are ignored.
        entries = []
        for raw_line in content.splitlines():
            entry = raw_line.split("#", 1)[0].strip()
            if entry:
                entries.append(entry)

        if not entries:
            return []
        return [
            self._rootkit_finding(
                "Rootkit.LDPreload.File",
                "CRITICAL",
                85,
                f"/etc/ld.so.preload contains entries: {', '.join(entries[:5])}",
            )
        ]

    # ------------------------------------------------------------------
    # Hidden processes
    # ------------------------------------------------------------------

    def _check_hidden_processes(self) -> List[DetectionResult]:
        """Compare /proc PIDs with psutil to find hidden processes.

        A PID is reported only if it was listed in ``/proc``, was absent
        from ``psutil.pids()``, and its ``/proc/<pid>`` entry still exists
        afterwards.  The last condition filters out processes that simply
        exited between the two snapshots.
        """
        results: List[DetectionResult] = []
        proc_dir = Path("/proc")
        if not proc_dir.is_dir():
            return results  # non-Linux

        try:
            proc_pids: Set[int] = {
                int(entry.name)
                for entry in proc_dir.iterdir()
                if entry.name.isdigit()
            }
        except OSError:
            return results

        psutil_pids = set(psutil.pids())

        # Sorted for a deterministic report order.
        for pid in sorted(proc_pids - psutil_pids):
            pid_dir = proc_dir / str(pid)
            try:
                pid_dir.stat()
            except FileNotFoundError:
                # Process exited after /proc was listed: a race, not a rootkit.
                continue
            except OSError:
                # Entry exists but cannot be inspected; still report it.
                pass

            proc_name = ""
            try:
                proc_name = (pid_dir / "comm").read_text(errors="replace").strip()
            except OSError:
                pass  # name is optional context only

            results.append(
                self._rootkit_finding(
                    "Rootkit.HiddenProcess",
                    "CRITICAL",
                    85,
                    f"PID {pid} ({proc_name or 'unknown'}) visible in /proc but hidden from psutil",
                )
            )

        return results

    # ------------------------------------------------------------------
    # Hidden kernel modules
    # ------------------------------------------------------------------

    def _check_hidden_kernel_modules(self) -> List[DetectionResult]:
        """Compare ``lsmod`` output with ``/proc/modules`` to find discrepancies.

        Returns no findings when ``/proc/modules`` is absent or unreadable,
        or when ``lsmod`` is unavailable, fails, or times out.
        """
        proc_modules_path = Path("/proc/modules")
        try:
            proc_content = proc_modules_path.read_text(errors="replace")
        except OSError:
            # Missing (non-Linux) or unreadable.
            return []

        # First whitespace-separated field of each line is the module name.
        proc_mods: Set[str] = {
            line.split()[0] for line in proc_content.splitlines() if line.strip()
        }

        try:
            output = subprocess.check_output(
                ["lsmod"], stderr=subprocess.DEVNULL, timeout=10
            )
        except (FileNotFoundError, subprocess.SubprocessError, OSError):
            return []  # lsmod not available

        # [1:] skips the lsmod header row.
        lsmod_mods: Set[str] = {
            line.split()[0]
            for line in output.decode(errors="replace").splitlines()[1:]
            if line.strip()
        }

        # Modules in /proc but NOT in lsmod → hidden from userspace.
        return [
            self._rootkit_finding(
                "Rootkit.HiddenKernelModule",
                "CRITICAL",
                80,
                f"Kernel module '{mod}' in /proc/modules but hidden from lsmod",
            )
            for mod in sorted(proc_mods - lsmod_mods)
        ]

    # ------------------------------------------------------------------
    # Hidden network ports
    # ------------------------------------------------------------------

    def _check_hidden_network_ports(self) -> List[DetectionResult]:
        """Compare ``ss`` listening TCP ports with those psutil reports.

        Returns no findings when psutil is denied access or when ``ss`` is
        unavailable, fails, or times out.
        """
        # Ports from psutil.
        try:
            psutil_ports: Set[int] = {
                conn.laddr.port
                for conn in psutil.net_connections(kind="inet")
                if conn.status == "LISTEN" and conn.laddr
            }
        except psutil.AccessDenied:
            return []

        # Ports from ss.
        try:
            output = subprocess.check_output(
                ["ss", "-tlnH"], stderr=subprocess.DEVNULL, timeout=10
            )
        except (FileNotFoundError, subprocess.SubprocessError, OSError):
            return []  # ss not available

        ss_ports: Set[int] = set()
        for line in output.decode(errors="replace").splitlines():
            # Typical ss output: LISTEN 0 128 0.0.0.0:22 ...
            for token in line.split():
                _, sep, tail = token.rpartition(":")
                if not sep:
                    continue
                try:
                    ss_ports.add(int(tail))
                except ValueError:
                    # e.g. the wildcard peer column "0.0.0.0:*"
                    continue

        # Ports in ss but not in psutil → potentially hidden by a rootkit.
        return [
            self._rootkit_finding(
                "Rootkit.HiddenPort",
                "HIGH",
                70,
                f"Listening port {port} visible to ss but hidden from psutil",
            )
            for port in sorted(ss_ports - psutil_ports)
        ]

    # ------------------------------------------------------------------
    # Malicious environment variables
    # ------------------------------------------------------------------

    def _check_malicious_env_vars(self) -> List[DetectionResult]:
        """Check the current environment for known-risky variables.

        Entries of :pydata:`MALICIOUS_ENV_VARS` containing ``=`` must match
        both key and value exactly; entries without ``=`` match on the mere
        presence of the variable.
        """
        results: List[DetectionResult] = []
        for entry in MALICIOUS_ENV_VARS:
            key, sep, expected = entry.partition("=")
            if sep:
                # Exact key=value match (e.g. "HISTFILE=/dev/null").
                if os.environ.get(key) == expected:
                    results.append(
                        self._rootkit_finding(
                            "Rootkit.EnvVar.Suspicious",
                            "HIGH",
                            75,
                            f"Suspicious environment variable: {key}={expected}",
                        )
                    )
            elif entry in os.environ:
                # Key presence check (e.g. "LD_PRELOAD"); value capped at
                # 100 characters to keep the report readable.
                results.append(
                    self._rootkit_finding(
                        "Rootkit.EnvVar.Suspicious",
                        "HIGH",
                        65,
                        f"Suspicious environment variable set: {entry}={os.environ[entry][:100]}",
                    )
                )

        return results

    # ------------------------------------------------------------------
    # Tampered log files
    # ------------------------------------------------------------------

    # Log files inspected for truncation.
    _LOG_PATHS = [
        "/var/log/auth.log",
        "/var/log/syslog",
        "/var/log/messages",
        "/var/log/secure",
        "/var/log/wtmp",
        "/var/log/btmp",
        "/var/log/lastlog",
    ]

    # Subset of _LOG_PATHS whose absence is itself reported.
    # NOTE(review): auth.log/syslog are Debian-style names; hosts that log
    # to /var/log/secure and /var/log/messages instead may trigger
    # "missing" findings here — confirm the intended platforms.
    _REQUIRED_LOG_PATHS = frozenset(
        ("/var/log/auth.log", "/var/log/syslog", "/var/log/wtmp")
    )

    def _check_tampered_logs(self) -> List[DetectionResult]:
        """Look for signs of log tampering: missing critical logs and
        zero-byte (possibly truncated) logs.

        Log files that exist but cannot be stat'ed are skipped.
        """
        results: List[DetectionResult] = []

        for log_path_str in self._LOG_PATHS:
            # A single stat() answers both "does it exist?" and "how big?"
            # without a window between two separate filesystem calls.
            try:
                st = Path(log_path_str).stat()
            except (FileNotFoundError, NotADirectoryError):
                if log_path_str in self._REQUIRED_LOG_PATHS:
                    results.append(
                        self._rootkit_finding(
                            "Rootkit.Log.Missing",
                            "HIGH",
                            60,
                            f"Critical log file missing: {log_path_str}",
                        )
                    )
                continue
            except OSError:
                continue

            # Zero-byte log file (may have been truncated).
            if st.st_size == 0:
                results.append(
                    self._rootkit_finding(
                        "Rootkit.Log.Truncated",
                        "HIGH",
                        70,
                        f"Log file is empty (possibly truncated): {log_path_str}",
                    )
                )

        return results
|