"""Crypto-miner detector for AYN Antivirus. Combines file-content analysis, process inspection, and network connection checks to detect cryptocurrency mining activity on the host. """ from __future__ import annotations import logging import re from pathlib import Path from typing import List, Optional import psutil from ayn_antivirus.constants import ( CRYPTO_MINER_PROCESS_NAMES, CRYPTO_POOL_DOMAINS, HIGH_CPU_THRESHOLD, SUSPICIOUS_PORTS, ) from ayn_antivirus.detectors.base import BaseDetector, DetectionResult logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # File-content patterns # --------------------------------------------------------------------------- _RE_STRATUM = re.compile(rb"stratum\+(?:tcp|ssl|tls)://[^\s\"']+", re.IGNORECASE) _RE_POOL_DOMAIN = re.compile( rb"(?:" + b"|".join(re.escape(d.encode()) for d in CRYPTO_POOL_DOMAINS) + rb")", re.IGNORECASE, ) _RE_ALGO_REF = re.compile( rb"\b(?:cryptonight|randomx|ethash|kawpow|equihash|scrypt|sha256d|x11|x13|lyra2rev2|blake2s)\b", re.IGNORECASE, ) _RE_MINING_CONFIG = re.compile( rb"""["'](?:algo|pool|wallet|worker|pass|coin|url|user)["']\s*:\s*["']""", re.IGNORECASE, ) # Wallet address patterns (broad but useful). 
# Bitcoin: legacy Base58Check addresses start with "1" or "3"; native SegWit
# addresses start with "bc1" and use the Bech32 alphabet. Bech32 contains "0"
# and "l", which Base58 deliberately excludes, so each form needs its own
# character class (and its own length range).
_RE_BTC_ADDR = re.compile(
    rb"\b(?:[13][A-HJ-NP-Za-km-z1-9]{25,34}|bc1[02-9ac-hj-np-z]{39,59})\b"
)
# Ethereum: "0x" followed by 40 hex digits.
_RE_ETH_ADDR = re.compile(rb"\b0x[0-9a-fA-F]{40}\b")
# Monero: "4", then one of 0-9/A/B, then 93 Base58 characters.
_RE_XMR_ADDR = re.compile(rb"\b4[0-9AB][1-9A-HJ-NP-Za-km-z]{93}\b")

# Single source of truth for the name reported in every DetectionResult; the
# static host-wide checks have no instance to read the ``name`` property from.
_DETECTOR_NAME = "cryptominer_detector"


class CryptominerDetector(BaseDetector):
    """Detect cryptocurrency mining activity via files, processes, and network.

    ``detect()`` inspects the content of a single file.  The ``find_*`` static
    methods are host-wide checks (processes, CPU usage, network connections)
    and are meant to be called separately from the per-file scan.
    """

    # ------------------------------------------------------------------
    # BaseDetector interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Identifier recorded as ``detector_name`` on every result."""
        return _DETECTOR_NAME

    @property
    def description(self) -> str:
        """One-line human-readable summary of what this detector looks for."""
        return "Detects crypto-mining binaries, configs, processes, and network traffic"

    def detect(
        self,
        file_path: str | Path,
        file_content: Optional[bytes] = None,
        file_hash: Optional[str] = None,
    ) -> List[DetectionResult]:
        """Analyse a single file for mining indicators.

        Only the file's content is inspected.  Running processes and network
        connections are *not* checked here; call ``find_miner_processes()``,
        ``find_high_cpu_processes()`` and ``find_mining_connections()`` for
        the host-wide picture.

        Args:
            file_path: Path of the file being scanned.
            file_content: Passed through to ``self._read_content`` together
                with the path (presumably pre-read bytes that avoid a second
                read -- confirm against ``BaseDetector``).
            file_hash: Accepted for interface compatibility; not used by this
                detector.

        Returns:
            One ``DetectionResult`` per indicator category that matched; an
            empty list when nothing matched or the file could not be read.
        """
        file_path = Path(file_path)
        try:
            content = self._read_content(file_path, file_content)
        except OSError as exc:
            # An unreadable file is reported as a warning, not as a failure.
            self._warn("Cannot read %s: %s", file_path, exc)
            return []

        results: List[DetectionResult] = []
        for check in (
            self._check_stratum_urls,
            self._check_pool_domains,
            self._check_algo_references,
            self._check_mining_config,
            self._check_wallet_addresses,
        ):
            results.extend(check(file_path, content))
        return results

    # ------------------------------------------------------------------
    # File-content checks
    # ------------------------------------------------------------------

    def _check_stratum_urls(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Flag ``stratum+tcp/ssl/tls://`` URLs found in *content*.

        Up to five distinct URLs, in order of first appearance, are listed in
        the result details.
        """
        matches = _RE_STRATUM.findall(content)
        if not matches:
            return []
        # De-duplicate before truncating so one repeated URL cannot crowd out
        # the others; dict.fromkeys keeps first-seen order.
        urls = [m.decode(errors="replace") for m in list(dict.fromkeys(matches))[:5]]
        return [DetectionResult(
            threat_name="Miner.Stratum.URL",
            threat_type="MINER",
            severity="CRITICAL",
            confidence=95,
            details=f"Stratum mining URL(s) found: {', '.join(urls)}",
            detector_name=self.name,
        )]

    def _check_pool_domains(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Flag references to known mining-pool domains in *content*.

        Up to five distinct domains (sorted) are listed in the result details.
        """
        matches = _RE_POOL_DOMAIN.findall(content)
        if not matches:
            return []
        # The pattern is case-insensitive, so normalise case before
        # de-duplicating (same treatment as algorithm names below).
        domains = sorted({m.decode(errors="replace").lower() for m in matches})
        return [DetectionResult(
            threat_name="Miner.PoolDomain",
            threat_type="MINER",
            severity="HIGH",
            confidence=90,
            details=f"Mining pool domain(s) referenced: {', '.join(domains[:5])}",
            detector_name=self.name,
        )]

    def _check_algo_references(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Flag mentions of proof-of-work algorithm names in *content*.

        All distinct names (lower-cased, sorted) are listed in the details.
        """
        matches = _RE_ALGO_REF.findall(content)
        if not matches:
            return []
        algos = sorted({m.decode(errors="replace").lower() for m in matches})
        return [DetectionResult(
            threat_name="Miner.AlgorithmReference",
            threat_type="MINER",
            severity="MEDIUM",
            confidence=60,
            details=f"Mining algorithm reference(s): {', '.join(algos)}",
            detector_name=self.name,
        )]

    def _check_mining_config(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Flag content that looks like a miner configuration file.

        A single config-style key is too weak a signal, so at least two key
        occurrences are required before anything is reported.
        """
        matches = _RE_MINING_CONFIG.findall(content)
        if len(matches) < 2:
            return []
        return [DetectionResult(
            threat_name="Miner.ConfigFile",
            threat_type="MINER",
            severity="HIGH",
            confidence=85,
            details=(
                f"File resembles a mining configuration "
                f"({len(matches)} config key(s) detected)"
            ),
            detector_name=self.name,
        )]

    def _check_wallet_addresses(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Flag BTC, ETH and XMR wallet addresses found in *content*.

        At most three distinct addresses per currency are collected, and at
        most five in total are listed.  Each is truncated to its first 20
        characters in the details.
        """
        wallets: List[str] = []
        for label, regex in (
            ("BTC", _RE_BTC_ADDR),
            ("ETH", _RE_ETH_ADDR),
            ("XMR", _RE_XMR_ADDR),
        ):
            # De-duplicate (keeping first-seen order) before taking three.
            distinct = list(dict.fromkeys(regex.findall(content)))
            wallets.extend(
                f"{label}:{m.decode(errors='replace')[:20]}…" for m in distinct[:3]
            )
        if not wallets:
            return []
        return [DetectionResult(
            threat_name="Miner.WalletAddress",
            threat_type="MINER",
            severity="HIGH",
            confidence=70,
            details=f"Cryptocurrency wallet address(es): {', '.join(wallets[:5])}",
            detector_name=self.name,
        )]

    # ------------------------------------------------------------------
    # Process-based detection (host-wide, not file-specific)
    # ------------------------------------------------------------------

    @staticmethod
    def find_miner_processes() -> List[DetectionResult]:
        """Scan running processes for known miner names.

        This is a host-wide check and should be called independently from the
        per-file ``detect()`` method.  A process is reported (once) when any
        entry of ``CRYPTO_MINER_PROCESS_NAMES`` is a case-insensitive
        substring of its name or command line.

        Returns:
            One ``DetectionResult`` per matching process.
        """
        # Process name and command line are lower-cased below, so the needles
        # must be too.  Empty entries are skipped: "" is a substring of
        # everything and would flag every process.
        needles = [(m, m.lower()) for m in CRYPTO_MINER_PROCESS_NAMES if m]

        results: List[DetectionResult] = []
        for proc in psutil.process_iter(["pid", "name", "cmdline", "cpu_percent"]):
            try:
                info = proc.info
                pname = (info.get("name") or "").lower()
                cmdline = " ".join(info.get("cmdline") or []).lower()
                miner = next(
                    (
                        original
                        for original, lowered in needles
                        if lowered in pname or lowered in cmdline
                    ),
                    None,
                )
                if miner is None:
                    continue
                # The key can be present with a None value (attribute not
                # retrievable); formatting None with ".1f" raises TypeError.
                cpu = info.get("cpu_percent") or 0.0
                results.append(DetectionResult(
                    threat_name=f"Miner.Process.{miner}",
                    threat_type="MINER",
                    severity="CRITICAL",
                    confidence=95,
                    details=(
                        f"Known miner process running: {info.get('name')} "
                        f"(PID {info['pid']}, CPU {cpu:.1f}%)"
                    ),
                    detector_name=_DETECTOR_NAME,
                ))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # The process vanished or is off-limits; skip it.
                continue
        return results

    # ------------------------------------------------------------------
    # CPU analysis (host-wide)
    # ------------------------------------------------------------------

    @staticmethod
    def find_high_cpu_processes(
        threshold: float = HIGH_CPU_THRESHOLD,
    ) -> List[DetectionResult]:
        """Flag processes consuming CPU above *threshold* percent.

        Args:
            threshold: CPU percentage a process must exceed to be reported.

        Returns:
            One ``DetectionResult`` per process above the threshold.
        """
        # NOTE(review): psutil documents that the first non-blocking
        # cpu_percent sample for a process is a meaningless 0.0, so the first
        # call in a fresh interpreter may report nothing -- confirm whether
        # callers prime the counters or invoke this twice.
        results: List[DetectionResult] = []
        for proc in psutil.process_iter(["pid", "name", "cpu_percent"]):
            try:
                info = proc.info
                cpu = info.get("cpu_percent") or 0.0
                if cpu <= threshold:
                    continue
                results.append(DetectionResult(
                    threat_name="Miner.HighCPU",
                    threat_type="MINER",
                    severity="HIGH",
                    confidence=55,
                    details=(
                        f"Process {info.get('name')} (PID {info['pid']}) "
                        f"using {cpu:.1f}% CPU"
                    ),
                    detector_name=_DETECTOR_NAME,
                ))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return results

    # ------------------------------------------------------------------
    # Network detection (host-wide)
    # ------------------------------------------------------------------

    @staticmethod
    def _process_name(pid: Optional[int]) -> str:
        """Return the name of process *pid*.

        Returns ``""`` when there is no PID and ``"?"`` when the process is
        gone or access to it is denied.
        """
        if not pid:
            return ""
        try:
            return psutil.Process(pid).name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return "?"

    @staticmethod
    def find_mining_connections() -> List[DetectionResult]:
        """Check active network connections for mining pool traffic.

        A connection is reported when its remote port is in
        ``SUSPICIOUS_PORTS`` and/or when an entry of ``CRYPTO_POOL_DOMAINS``
        is a substring of its remote address; one connection can therefore
        yield two results.

        Returns:
            The findings, or an empty list when connections cannot be read
            for lack of permissions.
        """
        results: List[DetectionResult] = []
        try:
            connections = psutil.net_connections(kind="inet")
        except psutil.AccessDenied:
            logger.warning("Insufficient permissions to read network connections")
            return results

        for conn in connections:
            raddr = conn.raddr
            if not raddr:
                # No remote endpoint, nothing to check.
                continue
            remote_ip = raddr.ip
            remote_port = raddr.port

            port_hit = remote_port in SUSPICIOUS_PORTS
            # NOTE(review): remote_ip is a numeric address, so a domain *name*
            # can never be a substring of it; this only fires if the list
            # holds IP literals.  Matching real pool hosts would need DNS
            # resolution -- confirm the intended behaviour.
            pool_domain = next(
                (d for d in CRYPTO_POOL_DOMAINS if d in remote_ip), None
            )
            if not port_hit and pool_domain is None:
                continue

            # Resolve the owning process only for connections being reported.
            proc_name = CryptominerDetector._process_name(conn.pid)

            if port_hit:
                results.append(DetectionResult(
                    threat_name="Miner.Network.SuspiciousPort",
                    threat_type="MINER",
                    severity="HIGH",
                    confidence=75,
                    details=(
                        f"Connection to port {remote_port} "
                        f"({remote_ip}, process={proc_name}, PID={conn.pid})"
                    ),
                    detector_name=_DETECTOR_NAME,
                ))
            if pool_domain is not None:
                results.append(DetectionResult(
                    threat_name="Miner.Network.PoolConnection",
                    threat_type="MINER",
                    severity="CRITICAL",
                    confidence=95,
                    details=(
                        f"Active connection to mining pool {pool_domain} "
                        f"({remote_ip}:{remote_port}, process={proc_name})"
                    ),
                    detector_name=_DETECTOR_NAME,
                ))
        return results