Files

333 lines
11 KiB
Python

"""Process memory scanner for AYN Antivirus.
Reads ``/proc/<pid>/maps`` and ``/proc/<pid>/mem`` on Linux to search for
injected code, suspicious byte patterns (mining pool URLs, known malware
strings), and anomalous RWX memory regions.
Most operations require **root** privileges. On non-Linux systems the
scanner gracefully returns empty results.
"""
from __future__ import annotations

import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple

from ayn_antivirus.constants import CRYPTO_POOL_DOMAINS
from ayn_antivirus.scanners.base import BaseScanner
logger = logging.getLogger(__name__)
# Default byte-level patterns to search for in process memory.
#
# The scanner looks for each entry as a plain byte substring of a region's
# raw contents (``pat in data`` / ``data.find(pat)``), so an entry that also
# occurs in ordinary processes produces a finding for those processes too.
_DEFAULT_PATTERNS: List[bytes] = [
# Mining pool URLs
# (built from CRYPTO_POOL_DOMAINS; presumably an iterable of ``str`` domain
# names -- verify against ayn_antivirus.constants).
*(domain.encode() for domain in CRYPTO_POOL_DOMAINS),
# Common miner stratum strings
b"stratum+tcp://",
b"stratum+ssl://",
b"stratum2+tcp://",
# Suspicious shell commands sometimes found in injected memory
b"/bin/sh -c",
b"/bin/bash -i",
b"/dev/tcp/",
# Known malware markers
b"PAYLOAD_START",
# NOTE(review): "x86_64-linux-gnu" is the multiarch directory name used for
# system libraries on Debian/Ubuntu (e.g. /usr/lib/x86_64-linux-gnu/), so it
# is likely present in the memory of most processes on those systems.
# Confirm this entry is intentional; it looks like a false-positive source.
b"x86_64-linux-gnu",
# NOTE(review): these are the bytes that follow the 0x7f magic byte in the
# header of a 64-bit, little-endian, version-1 ELF image, so any readable
# mapping that begins with such an image would contain them. Consider
# restricting this check to anonymous regions -- confirm intent.
b"ELF\x02\x01\x01",
]
# Size of chunks when reading /proc/<pid>/mem.
_MEM_READ_CHUNK = 65536

# Regex to parse a single line from /proc/<pid>/maps.
#
#   address                   perms offset   dev   inode  pathname
#   7f1c2a000000-7f1c2a021000 rw-p  00000000 00:00 0      [heap]
#
# Groups: 1 = start address (hex), 2 = end address (hex), 3 = permissions,
# 4 = pathname (may be empty). Only mappings whose permissions start with
# "r" match; non-readable mappings are skipped.
#
# Fields are separated by *horizontal* whitespace only. ``\s`` also matches
# "\n", so with ``re.MULTILINE`` a line that has no pathname (an anonymous
# mapping) would let the pattern run on into the following line, capture
# that entire line as the "pathname", and make ``finditer`` skip it as a
# region of its own.
_MAPS_RE = re.compile(
    r"^([0-9a-f]+)-([0-9a-f]+)[ \t]+(r[w-][x-][ps-])[ \t]+\S+[ \t]+\S+[ \t]+\d+[ \t]*(.*)",
    re.MULTILINE,
)
class MemoryScanner(BaseScanner):
    """Scan process memory for injected code and suspicious patterns.

    .. note::
        This scanner only works on Linux where ``/proc`` is available.
        Operations on ``/proc/<pid>/mem`` typically require root or
        ``CAP_SYS_PTRACE``.
    """

    # Regions larger than this are skipped by the pattern scans to avoid
    # reading an enormous mapping into memory in one piece.
    _MAX_SCAN_REGION_BYTES = 50 * 1024 * 1024
    # String extraction uses a tighter per-region limit.
    _MAX_STRINGS_REGION_BYTES = 10 * 1024 * 1024
    # Each extracted string is truncated to this many characters.
    _MAX_STRING_CHARS = 500

    # ------------------------------------------------------------------
    # BaseScanner interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Short identifier of this scanner."""
        return "memory_scanner"

    @property
    def description(self) -> str:
        """One-line human-readable description of this scanner."""
        return "Scans process memory for injected code and malicious patterns"

    def scan(self, target: Any) -> Dict[str, Any]:
        """Scan a single process by PID.

        Parameters
        ----------
        target:
            The PID of the process to inspect (anything ``int()`` accepts).

        Returns
        -------
        dict
            ``pid``, ``rwx_regions``, ``pattern_matches``, ``strings_sample``
            (at most 200 strings of 8+ characters) and ``error`` (``None``
            on success, otherwise a message). Results gathered before a
            failure are kept alongside the error.

        Raises
        ------
        ValueError, TypeError
            If *target* cannot be converted to ``int``.
        """
        pid = int(target)
        result: Dict[str, Any] = {
            "pid": pid,
            "rwx_regions": [],
            "pattern_matches": [],
            "strings_sample": [],
            "error": None,
        }

        if not Path("/proc").is_dir():
            result["error"] = "Not a Linux system — /proc not available"
            return result

        try:
            result["rwx_regions"] = self.find_injected_code(pid)
            result["pattern_matches"] = self.scan_for_patterns(pid, _DEFAULT_PATTERNS)
            # Ask for only as many strings as are reported, so extraction
            # stops early instead of collecting thousands and discarding them.
            result["strings_sample"] = self.get_memory_strings(
                pid, min_length=8, max_strings=200
            )
        except PermissionError:
            result["error"] = f"Permission denied reading /proc/{pid}/mem (need root)"
        except (FileNotFoundError, ProcessLookupError):
            # ProcessLookupError (ESRCH) is what /proc reads report when the
            # process exits after its file was opened.
            result["error"] = f"Process {pid} no longer exists"
        except Exception as exc:
            result["error"] = str(exc)
            logger.exception("Error scanning memory for PID %d", pid)
        return result

    # ------------------------------------------------------------------
    # /proc/<pid>/maps parsing
    # ------------------------------------------------------------------

    @staticmethod
    def _read_maps(pid: int) -> List[Dict[str, Any]]:
        """Parse ``/proc/<pid>/maps`` and return a list of memory regions.

        Each dict contains ``start`` (int), ``end`` (int), ``perms`` (str),
        ``pathname`` (str, empty for mappings without a name). Lines that
        ``_MAPS_RE`` does not match are skipped.

        Raises
        ------
        FileNotFoundError
            If the process does not exist.
        PermissionError
            If the caller cannot read the maps file.
        """
        maps_path = Path(f"/proc/{pid}/maps")
        # File names in the listing are arbitrary bytes; replace anything
        # undecodable rather than letting one odd name abort the whole scan.
        content = maps_path.read_text(encoding="utf-8", errors="replace")

        regions: List[Dict[str, Any]] = []
        # Match one line at a time. Running the pattern over the whole text
        # lets its whitespace matching cross a newline after a line with no
        # pathname, which swallows the following region.
        for line in content.split("\n"):
            match = _MAPS_RE.match(line)
            if match is None:
                continue
            regions.append({
                "start": int(match.group(1), 16),
                "end": int(match.group(2), 16),
                "perms": match.group(3),
                "pathname": match.group(4).strip(),
            })
        return regions

    # ------------------------------------------------------------------
    # Memory reading helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _read_region(pid: int, start: int, end: int) -> bytes:
        """Read bytes from ``/proc/<pid>/mem`` between *start* and *end*.

        Returns as many bytes as could be read; silently returns partial
        (possibly empty) data if parts of the region are not readable or
        the process has gone away.

        Raises
        ------
        PermissionError
            If ``/proc/<pid>/mem`` cannot be opened for lack of privileges.
            Swallowing this would make an unprivileged scan look clean.
        """
        mem_path = f"/proc/{pid}/mem"
        try:
            fd = os.open(mem_path, os.O_RDONLY)
        except PermissionError:
            raise
        except OSError:
            return b""  # process may have exited since maps was read

        data = bytearray()
        try:
            os.lseek(fd, start, os.SEEK_SET)
            remaining = end - start
            while remaining > 0:
                try:
                    chunk = os.read(fd, min(_MEM_READ_CHUNK, remaining))
                except OSError:
                    break  # region may be unmapped by the time we read
                if not chunk:
                    break
                data.extend(chunk)
                remaining -= len(chunk)
        except (OSError, OverflowError):
            # lseek raises OverflowError for addresses that do not fit the
            # signed file offset (e.g. [vsyscall] at 0xffffffffff600000);
            # treat such a region as unreadable instead of aborting.
            pass
        finally:
            os.close(fd)
        return bytes(data)

    def _iter_readable_regions(
        self,
        pid: int,
        max_size: int,
    ) -> Iterator[Tuple[Dict[str, Any], int, bytes]]:
        """Yield ``(region, size, data)`` for each readable region of *pid*.

        Regions whose permissions do not start with ``r`` or whose size
        exceeds *max_size* bytes are skipped. ``data`` is whatever
        :meth:`_read_region` could read and may be shorter than ``size``.
        """
        for region in self._read_maps(pid):
            if not region["perms"].startswith("r"):
                continue
            size = region["end"] - region["start"]
            if size > max_size:
                continue  # skip very large regions to avoid OOM
            yield region, size, self._read_region(pid, region["start"], region["end"])

    # ------------------------------------------------------------------
    # Public scanning methods
    # ------------------------------------------------------------------

    def scan_process_memory(self, pid: int) -> List[Dict[str, Any]]:
        """Scan all readable regions of a process's address space.

        Returns a list of dicts, one per region, containing ``start`` and
        ``end`` (hex strings), ``perms``, ``pathname``, ``size`` and a
        boolean ``has_suspicious`` flag set when any default pattern is
        found. Regions larger than 50 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        return [
            {
                "start": hex(region["start"]),
                "end": hex(region["end"]),
                "perms": region["perms"],
                "pathname": region["pathname"],
                "size": size,
                "has_suspicious": any(pat in data for pat in _DEFAULT_PATTERNS),
            }
            for region, size, data in self._iter_readable_regions(
                pid, self._MAX_SCAN_REGION_BYTES
            )
        ]

    def find_injected_code(self, pid: int) -> List[Dict[str, Any]]:
        """Find memory regions with **RWX** (read-write-execute) permissions.

        Legitimate applications rarely need RWX regions. Their presence may
        indicate code injection, shellcode, or a packed/encrypted payload
        that has been unpacked at runtime. JIT-compiling runtimes can also
        create such regions, so a hit is a lead, not proof.

        Only ``/proc/<pid>/maps`` is consulted; no memory is read.

        Returns a list of dicts with ``start``, ``end`` (hex strings),
        ``perms``, ``pathname``, ``size``, ``severity`` and ``reason``.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        rwx: List[Dict[str, Any]] = []
        for region in self._read_maps(pid):
            perms = region["perms"]
            if not perms.startswith("rwx"):
                continue
            size = region["end"] - region["start"]
            rwx.append({
                "start": hex(region["start"]),
                "end": hex(region["end"]),
                "perms": perms,
                "pathname": region["pathname"],
                "size": size,
                "severity": "HIGH",
                "reason": f"RWX region ({size} bytes) — possible code injection",
            })
        return rwx

    def get_memory_strings(
        self,
        pid: int,
        min_length: int = 6,
        max_strings: int = 10_000,
    ) -> List[str]:
        """Extract printable ASCII strings from readable memory regions.

        Parameters
        ----------
        min_length:
            Minimum string length to keep. Values below 1 are treated as 1,
            because a zero-length minimum would match the empty string at
            every position.
        max_strings:
            Stop once this many strings have been collected, which bounds
            both memory use and the amount of process memory read.

        Returns a list of decoded strings (capped at 500 chars each).
        Regions larger than 10 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        if max_strings <= 0:
            return []
        min_length = max(1, int(min_length))
        printable_re = re.compile(rb"[\x20-\x7e]{%d,}" % min_length)

        strings: List[str] = []
        for _region, _size, data in self._iter_readable_regions(
            pid, self._MAX_STRINGS_REGION_BYTES
        ):
            for match in printable_re.finditer(data):
                text = match.group().decode("ascii", errors="replace")
                strings.append(text[: self._MAX_STRING_CHARS])
                # Cap total to avoid unbounded memory usage.
                if len(strings) >= max_strings:
                    return strings
        return strings

    def scan_for_patterns(
        self,
        pid: int,
        patterns: Optional[Sequence[bytes]] = None,
    ) -> List[Dict[str, Any]]:
        """Search process memory for specific byte patterns.

        Parameters
        ----------
        patterns:
            Byte strings to search for. Defaults to
            :pydata:`_DEFAULT_PATTERNS` (mining pool URLs, stratum prefixes,
            shell commands). Empty patterns are ignored, since they would
            "match" at offset 0 of every region.

        Returns a list of dicts with ``pattern``, ``region_start``,
        ``region_perms``, ``region_pathname``, ``offset`` (position of the
        first occurrence within the region), ``severity`` and ``reason``.
        At most one entry is produced per pattern per region. Regions
        larger than 50 MiB are skipped.

        Raises
        ------
        PermissionError, FileNotFoundError
        """
        if patterns is None:
            patterns = _DEFAULT_PATTERNS
        wanted = [pat for pat in patterns if pat]

        matches: List[Dict[str, Any]] = []
        for region, _size, data in self._iter_readable_regions(
            pid, self._MAX_SCAN_REGION_BYTES
        ):
            for pat in wanted:
                idx = data.find(pat)
                if idx == -1:
                    continue
                matches.append({
                    "pattern": pat.decode("utf-8", errors="replace"),
                    "region_start": hex(region["start"]),
                    "region_perms": region["perms"],
                    "region_pathname": region["pathname"],
                    "offset": idx,
                    "severity": "HIGH",
                    "reason": f"Suspicious pattern found in memory: {pat[:60]!r}",
                })
        return matches