"""Process memory scanner for AYN Antivirus.

Reads ``/proc/<pid>/maps`` and ``/proc/<pid>/mem`` on Linux to search for
injected code, suspicious byte patterns (mining pool URLs, known malware
strings), and anomalous RWX memory regions.

Most operations require **root** privileges.  On non-Linux systems the
scanner gracefully returns empty results.
"""

from __future__ import annotations

import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple

from ayn_antivirus.constants import CRYPTO_POOL_DOMAINS
from ayn_antivirus.scanners.base import BaseScanner

logger = logging.getLogger(__name__)

# Default byte-level patterns to search for in process memory.
_DEFAULT_PATTERNS: List[bytes] = [
    # Mining pool URLs
    *(domain.encode() for domain in CRYPTO_POOL_DOMAINS),
    # Common miner stratum strings
    b"stratum+tcp://",
    b"stratum+ssl://",
    b"stratum2+tcp://",
    # Suspicious shell commands sometimes found in injected memory
    b"/bin/sh -c",
    b"/bin/bash -i",
    b"/dev/tcp/",
    # Known malware markers
    b"PAYLOAD_START",
    # NOTE(review): the two markers below also occur in ordinary ELF images
    # and library paths, so they likely match most benign processes --
    # confirm whether they should stay in the default set.
    b"x86_64-linux-gnu",
    b"ELF\x02\x01\x01",
]

# Size of chunks when reading /proc/<pid>/mem.
_MEM_READ_CHUNK = 65536

# Regions larger than this are skipped by the pattern scans to avoid OOM.
_MAX_PATTERN_REGION_SIZE = 50 * 1024 * 1024

# Regions larger than this are skipped when extracting strings.
_MAX_STRINGS_REGION_SIZE = 10 * 1024 * 1024

# Upper bound on the number of strings returned by ``get_memory_strings``.
_MAX_STRINGS = 10_000

# Upper bound on the length of each extracted string.
_MAX_STRING_LENGTH = 500

# Regex to parse a single line from /proc/<pid>/maps.
#   address                   perms offset   dev   inode   pathname
#   7f1c2a000000-7f1c2a021000 rw-p 00000000 00:00 0       [heap]
#
# Field separators are restricted to spaces/tabs on purpose: ``\s`` also
# matches a newline, which made the pattern run on into the following line
# whenever the pathname field was empty (anonymous mappings).  Only readable
# regions (perms starting with ``r``) are matched.
_MAPS_RE = re.compile(
    r"^([0-9a-f]+)-([0-9a-f]+)[ \t]+(r[w-][x-][ps-])"
    r"[ \t]+\S+[ \t]+\S+[ \t]+\d+[ \t]*(.*)$",
    re.MULTILINE,
)


class MemoryScanner(BaseScanner):
    """Scan process memory for injected code and suspicious patterns.

    .. note::
        This scanner only works on Linux where ``/proc`` is available.
        Operations on ``/proc/<pid>/mem`` typically require root or
        ``CAP_SYS_PTRACE``.
    """

    # ------------------------------------------------------------------
    # BaseScanner interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Short identifier of this scanner."""
        return "memory_scanner"

    @property
    def description(self) -> str:
        """Human-readable summary of what this scanner does."""
        return "Scans process memory for injected code and malicious patterns"

    def scan(self, target: Any) -> Dict[str, Any]:
        """Scan a single process by PID.

        Parameters
        ----------
        target:
            The PID (``int``, or anything ``int()`` accepts) of the process
            to inspect.

        Returns
        -------
        dict
            ``pid``, ``rwx_regions``, ``pattern_matches``,
            ``strings_sample`` (at most 200 entries), ``error`` (``None``
            on success, otherwise a message).
        """
        pid = int(target)
        result: Dict[str, Any] = {
            "pid": pid,
            "rwx_regions": [],
            "pattern_matches": [],
            "strings_sample": [],
            "error": None,
        }

        if not Path("/proc").is_dir():
            result["error"] = "Not a Linux system — /proc not available"
            return result

        try:
            result["rwx_regions"] = self.find_injected_code(pid)
            result["pattern_matches"] = self.scan_for_patterns(pid, _DEFAULT_PATTERNS)
            result["strings_sample"] = self.get_memory_strings(pid, min_length=8)[:200]
        except PermissionError:
            result["error"] = f"Permission denied reading /proc/{pid}/mem (need root)"
        except FileNotFoundError:
            result["error"] = f"Process {pid} no longer exists"
        except Exception as exc:
            # Top-level boundary: report the failure in the result instead of
            # propagating, but keep the traceback in the log.
            result["error"] = str(exc)
            logger.exception("Error scanning memory for PID %d", pid)

        return result

    # ------------------------------------------------------------------
    # /proc/<pid>/maps parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_maps(content: str) -> List[Dict[str, Any]]:
        """Parse the text of a ``/proc/<pid>/maps`` file into region dicts.

        Lines that do not match ``_MAPS_RE`` (including all non-readable
        regions) are ignored.
        """
        return [
            {
                "start": int(match.group(1), 16),
                "end": int(match.group(2), 16),
                "perms": match.group(3),
                "pathname": match.group(4).strip(),
            }
            for match in _MAPS_RE.finditer(content)
        ]

    @staticmethod
    def _read_maps(pid: int) -> List[Dict[str, Any]]:
        """Parse ``/proc/<pid>/maps`` and return a list of memory regions.

        Each dict contains ``start`` (int), ``end`` (int), ``perms`` (str),
        ``pathname`` (str; empty for anonymous mappings).

        Raises
        ------
        FileNotFoundError
            If the process does not exist.
        PermissionError
            If the caller cannot read the maps file.
        """
        maps_path = Path(f"/proc/{pid}/maps")
        # Mapped file names are arbitrary bytes; never fail on decoding.
        content = maps_path.read_text(encoding="utf-8", errors="replace")
        return MemoryScanner._parse_maps(content)

    # ------------------------------------------------------------------
    # Memory reading helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_region(pid: int, start: int, end: int) -> bytes:
        """Read bytes from ``/proc/<pid>/mem`` between *start* and *end*.

        Returns as many bytes as could be read; silently returns partial
        data if parts of the region are not readable, and ``b""`` if the
        memory file cannot be opened or the start address cannot be seeked
        to.
        """
        mem_path = f"/proc/{pid}/mem"
        data = bytearray()

        try:
            fd = os.open(mem_path, os.O_RDONLY)
            try:
                os.lseek(fd, start, os.SEEK_SET)
                remaining = end - start
                while remaining > 0:
                    chunk_size = min(_MEM_READ_CHUNK, remaining)
                    try:
                        chunk = os.read(fd, chunk_size)
                    except OSError:
                        break  # keep whatever was read so far
                    if not chunk:
                        break
                    data.extend(chunk)
                    remaining -= len(chunk)
            finally:
                os.close(fd)
        except (OSError, OverflowError) as exc:
            # OSError: region may be unmapped by the time we read, or the
            # memory file is not accessible.  OverflowError: os.lseek rejects
            # offsets that do not fit a signed 64-bit integer (addresses in
            # the top half of the address space).
            logger.debug(
                "Could not read %s at %#x-%#x: %s", mem_path, start, end, exc
            )

        return bytes(data)

    def _iter_region_data(
        self,
        pid: int,
        max_size: int,
    ) -> Iterator[Tuple[Dict[str, Any], int, bytes]]:
        """Yield ``(region, size, data)`` for readable regions up to *max_size* bytes."""
        for region in self._read_maps(pid):
            # Only read regions that are at least readable.
            if not region["perms"].startswith("r"):
                continue
            size = region["end"] - region["start"]
            if size > max_size:
                continue  # skip very large regions to avoid OOM
            yield region, size, self._read_region(pid, region["start"], region["end"])

    # ------------------------------------------------------------------
    # Public scanning methods
    # ------------------------------------------------------------------

    def scan_process_memory(self, pid: int) -> List[Dict[str, Any]]:
        """Scan all readable regions of a process's address space.

        Returns a list of dicts, one per region, containing ``start``,
        ``end`` (hex strings), ``perms``, ``pathname``, ``size`` and a
        boolean ``has_suspicious`` flag set when default patterns are found.
        Regions larger than 50 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        results: List[Dict[str, Any]] = []

        for region, size, data in self._iter_region_data(pid, _MAX_PATTERN_REGION_SIZE):
            has_suspicious = any(pat in data for pat in _DEFAULT_PATTERNS)
            results.append({
                "start": hex(region["start"]),
                "end": hex(region["end"]),
                "perms": region["perms"],
                "pathname": region["pathname"],
                "size": size,
                "has_suspicious": has_suspicious,
            })

        return results

    def find_injected_code(self, pid: int) -> List[Dict[str, Any]]:
        """Find memory regions with **RWX** (read-write-execute) permissions.

        Legitimate applications rarely need RWX regions.  Their presence may
        indicate code injection, JIT shellcode, or a packed/encrypted payload
        that has been unpacked at runtime.

        Returns a list of dicts with ``start``, ``end``, ``perms``,
        ``pathname``, ``size``, ``severity``, ``reason``.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        rwx: List[Dict[str, Any]] = []

        for region in self._read_maps(pid):
            perms = region["perms"]
            # The first three permission characters are r, w, x in that order.
            if not perms.startswith("rwx"):
                continue
            size = region["end"] - region["start"]
            rwx.append({
                "start": hex(region["start"]),
                "end": hex(region["end"]),
                "perms": perms,
                "pathname": region["pathname"],
                "size": size,
                "severity": "HIGH",
                "reason": f"RWX region ({size} bytes) — possible code injection",
            })

        return rwx

    def get_memory_strings(
        self,
        pid: int,
        min_length: int = 6,
    ) -> List[str]:
        """Extract printable ASCII strings from readable memory regions.

        Parameters
        ----------
        min_length:
            Minimum string length to keep.  Values below 1 are treated as 1.

        Returns a list of decoded strings (capped at 500 chars each, at most
        10,000 strings in total).  Regions larger than 10 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        # A non-positive length would make the pattern match empty strings.
        min_length = max(1, int(min_length))
        printable_re = re.compile(rb"[\x20-\x7e]{%d,}" % min_length)
        strings: List[str] = []

        for _region, _size, data in self._iter_region_data(pid, _MAX_STRINGS_REGION_SIZE):
            for match in printable_re.finditer(data):
                text = match.group().decode("ascii", errors="replace")
                strings.append(text[:_MAX_STRING_LENGTH])
                # Cap total to avoid unbounded memory usage.
                if len(strings) >= _MAX_STRINGS:
                    return strings

        return strings

    def scan_for_patterns(
        self,
        pid: int,
        patterns: Optional[Sequence[bytes]] = None,
    ) -> List[Dict[str, Any]]:
        """Search process memory for specific byte patterns.

        Parameters
        ----------
        patterns:
            Byte strings to search for.  Defaults to
            :data:`_DEFAULT_PATTERNS` (mining pool URLs, stratum prefixes,
            shell commands).  Empty patterns are ignored.

        Returns a list of dicts with ``pattern``, ``region_start``,
        ``region_perms``, ``region_pathname``, ``offset``, ``severity``,
        ``reason``.  Only the first occurrence of each pattern per region is
        reported.  Regions larger than 50 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        if patterns is None:
            patterns = _DEFAULT_PATTERNS
        # An empty pattern is found at offset 0 of every buffer; skip it.
        patterns = [pat for pat in patterns if pat]

        matches: List[Dict[str, Any]] = []

        for region, _size, data in self._iter_region_data(pid, _MAX_PATTERN_REGION_SIZE):
            for pat in patterns:
                idx = data.find(pat)
                if idx == -1:
                    continue
                matches.append({
                    "pattern": pat.decode("utf-8", errors="replace"),
                    "region_start": hex(region["start"]),
                    "region_perms": region["perms"],
                    "region_pathname": region["pathname"],
                    "offset": idx,
                    "severity": "HIGH",
                    "reason": f"Suspicious pattern found in memory: {pat[:60]!r}",
                })

        return matches