"""Automated remediation engine for AYN Antivirus. Provides targeted fix actions for different threat types: permission hardening, process killing, cron cleanup, SSH key auditing, startup script removal, LD_PRELOAD cleaning, IP/domain blocking, and system binary restoration via the system package manager. All actions support a **dry-run** mode that logs intended changes without modifying the system. """ from __future__ import annotations import logging import os import re import shutil import stat import subprocess from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional import psutil from ayn_antivirus.constants import SUSPICIOUS_CRON_PATTERNS from ayn_antivirus.core.event_bus import EventType, event_bus logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Action record # --------------------------------------------------------------------------- @dataclass class RemediationAction: """Describes a single remediation step.""" action: str target: str details: str = "" success: bool = False dry_run: bool = False # --------------------------------------------------------------------------- # AutoPatcher # --------------------------------------------------------------------------- class AutoPatcher: """Apply targeted remediations against discovered threats. Parameters ---------- dry_run: If ``True``, no changes are made — only the intended actions are logged and returned. """ def __init__(self, dry_run: bool = False) -> None: self.dry_run = dry_run self.actions: List[RemediationAction] = [] # ------------------------------------------------------------------ # High-level dispatcher # ------------------------------------------------------------------ def remediate_threat(self, threat_info: Dict[str, Any]) -> List[RemediationAction]: """Choose and execute the correct fix(es) for *threat_info*. Routes on ``threat_type`` (MINER, ROOTKIT, SPYWARE, MALWARE, etc.) and the available metadata. """ ttype = (threat_info.get("threat_type") or "").upper() path = threat_info.get("path", "") pid = threat_info.get("pid") actions: List[RemediationAction] = [] # Kill associated process. if pid: actions.append(self.kill_malicious_process(int(pid))) # Quarantine / permission fix for file-based threats. if path and Path(path).exists(): actions.append(self.fix_permissions(path)) # Type-specific extras. if ttype == "ROOTKIT": actions.append(self.fix_ld_preload()) elif ttype == "MINER": # Block known pool domains if we have one. domain = threat_info.get("domain") if domain: actions.append(self.block_domain(domain)) ip = threat_info.get("ip") if ip: actions.append(self.block_ip(ip)) elif ttype == "SPYWARE": if path and "cron" in path: actions.append(self.remove_malicious_cron()) for a in actions: self._publish(a) self.actions.extend(actions) return actions # ------------------------------------------------------------------ # Permission fixes # ------------------------------------------------------------------ def fix_permissions(self, path: str | Path) -> RemediationAction: """Remove SUID, SGID, and world-writable bits from *path*.""" p = Path(path) action = RemediationAction( action="fix_permissions", target=str(p), dry_run=self.dry_run, ) try: st = p.stat() old_mode = st.st_mode new_mode = old_mode # Strip SUID / SGID. new_mode &= ~stat.S_ISUID new_mode &= ~stat.S_ISGID # Strip world-writable. new_mode &= ~stat.S_IWOTH if new_mode == old_mode: action.details = "Permissions already safe" action.success = True return action action.details = ( f"Changing permissions: {oct(old_mode & 0o7777)} → {oct(new_mode & 0o7777)}" ) if not self.dry_run: p.chmod(new_mode) action.success = True logger.info("fix_permissions: %s %s", action.details, "(dry-run)" if self.dry_run else "") except OSError as exc: action.details = f"Failed: {exc}" logger.error("fix_permissions failed on %s: %s", p, exc) return action # ------------------------------------------------------------------ # Process killing # ------------------------------------------------------------------ def kill_malicious_process(self, pid: int) -> RemediationAction: """Send SIGKILL to *pid*.""" action = RemediationAction( action="kill_process", target=str(pid), dry_run=self.dry_run, ) try: proc = psutil.Process(pid) action.details = f"Process: {proc.name()} (PID {pid})" except psutil.NoSuchProcess: action.details = f"PID {pid} no longer exists" action.success = True return action if self.dry_run: action.success = True return action try: proc.kill() proc.wait(timeout=5) action.success = True logger.info("Killed process %d (%s)", pid, proc.name()) except psutil.NoSuchProcess: action.success = True action.details += " (already exited)" except (psutil.AccessDenied, psutil.TimeoutExpired) as exc: action.details += f" — {exc}" logger.error("Failed to kill PID %d: %s", pid, exc) return action # ------------------------------------------------------------------ # Cron cleanup # ------------------------------------------------------------------ def remove_malicious_cron(self, pattern: Optional[str] = None) -> RemediationAction: """Remove cron entries matching suspicious patterns. If *pattern* is ``None``, uses all :pydata:`SUSPICIOUS_CRON_PATTERNS`. """ action = RemediationAction( action="remove_malicious_cron", target="/var/spool/cron + /etc/cron.d", dry_run=self.dry_run, ) patterns = [re.compile(pattern)] if pattern else [ re.compile(p) for p in SUSPICIOUS_CRON_PATTERNS ] removed_lines: List[str] = [] cron_dirs = [ Path("/var/spool/cron/crontabs"), Path("/var/spool/cron"), Path("/etc/cron.d"), ] for cron_dir in cron_dirs: if not cron_dir.is_dir(): continue for cron_file in cron_dir.iterdir(): if not cron_file.is_file(): continue try: lines = cron_file.read_text().splitlines() clean_lines = [] for line in lines: if any(pat.search(line) for pat in patterns): removed_lines.append(f"{cron_file}: {line.strip()}") else: clean_lines.append(line) if len(clean_lines) < len(lines) and not self.dry_run: cron_file.write_text("\n".join(clean_lines) + "\n") except OSError: continue action.details = f"Removed {len(removed_lines)} cron line(s)" if removed_lines: action.details += ": " + "; ".join(removed_lines[:5]) action.success = True logger.info("remove_malicious_cron: %s", action.details) return action # ------------------------------------------------------------------ # SSH key cleanup # ------------------------------------------------------------------ def clean_authorized_keys(self, path: Optional[str | Path] = None) -> RemediationAction: """Remove unauthorized keys from ``authorized_keys``. Without *path*, scans all users' ``~/.ssh/authorized_keys`` plus ``/root/.ssh/authorized_keys``. In non-dry-run mode, backs up the file before modifying. """ action = RemediationAction( action="clean_authorized_keys", target=str(path) if path else "all users", dry_run=self.dry_run, ) targets: List[Path] = [] if path: targets.append(Path(path)) else: # Root root_ak = Path("/root/.ssh/authorized_keys") if root_ak.exists(): targets.append(root_ak) # System users from /home home = Path("/home") if home.is_dir(): for user_dir in home.iterdir(): ak = user_dir / ".ssh" / "authorized_keys" if ak.exists(): targets.append(ak) total_removed = 0 for ak_path in targets: try: lines = ak_path.read_text().splitlines() clean: List[str] = [] for line in lines: stripped = line.strip() if not stripped or stripped.startswith("#"): clean.append(line) continue # Flag lines with forced commands as suspicious. if stripped.startswith("command="): total_removed += 1 continue clean.append(line) if len(clean) < len(lines) and not self.dry_run: backup = ak_path.with_suffix(".bak") shutil.copy2(str(ak_path), str(backup)) ak_path.write_text("\n".join(clean) + "\n") except OSError: continue action.details = f"Removed {total_removed} suspicious key(s) from {len(targets)} file(s)" action.success = True logger.info("clean_authorized_keys: %s", action.details) return action # ------------------------------------------------------------------ # Startup script cleanup # ------------------------------------------------------------------ def remove_suspicious_startup(self, path: Optional[str | Path] = None) -> RemediationAction: """Remove suspicious entries from init scripts, systemd units, or rc.local.""" action = RemediationAction( action="remove_suspicious_startup", target=str(path) if path else "/etc/init.d, systemd, rc.local", dry_run=self.dry_run, ) suspicious_re = re.compile( r"(?:curl|wget)\s+.*\|\s*(?:sh|bash)|xmrig|minerd|/dev/tcp/|nohup\s+.*&", re.IGNORECASE, ) targets: List[Path] = [] if path: targets.append(Path(path)) else: rc_local = Path("/etc/rc.local") if rc_local.exists(): targets.append(rc_local) for d in ("/etc/init.d", "/etc/systemd/system"): dp = Path(d) if dp.is_dir(): targets.extend(f for f in dp.iterdir() if f.is_file()) cleaned_count = 0 for target in targets: try: content = target.read_text() lines = content.splitlines() clean = [l for l in lines if not suspicious_re.search(l)] if len(clean) < len(lines): cleaned_count += len(lines) - len(clean) if not self.dry_run: backup = target.with_suffix(target.suffix + ".bak") shutil.copy2(str(target), str(backup)) target.write_text("\n".join(clean) + "\n") except OSError: continue action.details = f"Removed {cleaned_count} suspicious line(s) from {len(targets)} file(s)" action.success = True logger.info("remove_suspicious_startup: %s", action.details) return action # ------------------------------------------------------------------ # LD_PRELOAD cleanup # ------------------------------------------------------------------ def fix_ld_preload(self) -> RemediationAction: """Remove all entries from ``/etc/ld.so.preload``.""" action = RemediationAction( action="fix_ld_preload", target="/etc/ld.so.preload", dry_run=self.dry_run, ) ld_path = Path("/etc/ld.so.preload") if not ld_path.exists(): action.details = "File does not exist — nothing to fix" action.success = True return action try: content = ld_path.read_text().strip() if not content: action.details = "File is already empty" action.success = True return action action.details = f"Clearing ld.so.preload (was: {content[:120]})" if not self.dry_run: backup = ld_path.with_suffix(".bak") shutil.copy2(str(ld_path), str(backup)) ld_path.write_text("") action.success = True logger.info("fix_ld_preload: %s", action.details) except OSError as exc: action.details = f"Failed: {exc}" logger.error("fix_ld_preload: %s", exc) return action # ------------------------------------------------------------------ # Network blocking # ------------------------------------------------------------------ def block_ip(self, ip_address: str) -> RemediationAction: """Add an iptables DROP rule for *ip_address*.""" action = RemediationAction( action="block_ip", target=ip_address, dry_run=self.dry_run, ) cmd = ["iptables", "-A", "OUTPUT", "-d", ip_address, "-j", "DROP"] action.details = f"Rule: {' '.join(cmd)}" if self.dry_run: action.success = True return action try: subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, timeout=10) action.success = True logger.info("Blocked IP via iptables: %s", ip_address) except (subprocess.CalledProcessError, FileNotFoundError, OSError) as exc: action.details += f" — failed: {exc}" logger.error("Failed to block IP %s: %s", ip_address, exc) return action def block_domain(self, domain: str) -> RemediationAction: """Redirect *domain* to 127.0.0.1 via ``/etc/hosts``.""" action = RemediationAction( action="block_domain", target=domain, dry_run=self.dry_run, ) hosts_path = Path("/etc/hosts") entry = f"127.0.0.1 {domain} # blocked by ayn-antivirus" action.details = f"Adding to /etc/hosts: {entry}" if self.dry_run: action.success = True return action try: current = hosts_path.read_text() if domain in current: action.details = f"Domain {domain} already in /etc/hosts" action.success = True return action with open(hosts_path, "a") as fh: fh.write(f"\n{entry}\n") action.success = True logger.info("Blocked domain via /etc/hosts: %s", domain) except OSError as exc: action.details += f" — failed: {exc}" logger.error("Failed to block domain %s: %s", domain, exc) return action # ------------------------------------------------------------------ # System binary restoration # ------------------------------------------------------------------ def restore_system_binary(self, binary_path: str | Path) -> RemediationAction: """Reinstall the package owning *binary_path* using the system package manager.""" binary_path = Path(binary_path) action = RemediationAction( action="restore_system_binary", target=str(binary_path), dry_run=self.dry_run, ) # Determine package manager and owning package. pkg_name, pm_cmd = _find_owning_package(binary_path) if not pkg_name: action.details = f"Cannot determine owning package for {binary_path}" return action reinstall_cmd = pm_cmd + [pkg_name] action.details = f"Reinstalling package '{pkg_name}': {' '.join(reinstall_cmd)}" if self.dry_run: action.success = True return action try: subprocess.check_call( reinstall_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, timeout=120 ) action.success = True logger.info("Restored %s via %s", binary_path, " ".join(reinstall_cmd)) except (subprocess.CalledProcessError, FileNotFoundError, OSError) as exc: action.details += f" — failed: {exc}" logger.error("Failed to restore %s: %s", binary_path, exc) return action # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _publish(self, action: RemediationAction) -> None: event_bus.publish(EventType.REMEDIATION_ACTION, { "action": action.action, "target": action.target, "details": action.details, "success": action.success, "dry_run": action.dry_run, }) # --------------------------------------------------------------------------- # Package-manager helpers # --------------------------------------------------------------------------- def _find_owning_package(binary_path: Path) -> tuple: """Return ``(package_name, reinstall_command_prefix)`` or ``("", [])``.""" path_str = str(binary_path) # dpkg (Debian/Ubuntu) try: out = subprocess.check_output( ["dpkg", "-S", path_str], stderr=subprocess.DEVNULL, timeout=10 ).decode().strip() pkg = out.split(":")[0] return pkg, ["apt-get", "install", "--reinstall", "-y"] except (subprocess.CalledProcessError, FileNotFoundError, OSError): pass # rpm (RHEL/CentOS/Fedora) try: out = subprocess.check_output( ["rpm", "-qf", path_str], stderr=subprocess.DEVNULL, timeout=10 ).decode().strip() if "not owned" not in out: # Try dnf first, fall back to yum. pm = "dnf" if shutil.which("dnf") else "yum" return out, [pm, "reinstall", "-y"] except (subprocess.CalledProcessError, FileNotFoundError, OSError): pass return "", []