318 lines
12 KiB
Python
318 lines
12 KiB
Python
"""Crypto-miner detector for AYN Antivirus.
|
|
|
|
Combines file-content analysis, process inspection, and network connection
|
|
checks to detect cryptocurrency mining activity on the host.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
import psutil
|
|
|
|
from ayn_antivirus.constants import (
|
|
CRYPTO_MINER_PROCESS_NAMES,
|
|
CRYPTO_POOL_DOMAINS,
|
|
HIGH_CPU_THRESHOLD,
|
|
SUSPICIOUS_PORTS,
|
|
)
|
|
from ayn_antivirus.detectors.base import BaseDetector, DetectionResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
# File-content patterns
# ---------------------------------------------------------------------------
# Every pattern is a bytes pattern: the detector matches them against raw
# file content without decoding it first.

# Stratum mining-protocol URLs; the match runs up to the next whitespace or
# quote character.
_RE_STRATUM = re.compile(rb"stratum\+(?:tcp|ssl|tls)://[^\s\"']+", re.IGNORECASE)

# Escaped pool-domain alternatives.  Empty entries are dropped because an
# empty alternative matches at every position of every file.  Longest entries
# come first so that, when one configured domain is a prefix of another, the
# longer (more specific) one is the one reported.
_POOL_DOMAIN_ALTERNATIVES = sorted(
    (re.escape(d.encode()) for d in CRYPTO_POOL_DOMAINS if d),
    key=len,
    reverse=True,
)

# With no usable domains, fall back to ``(?!)`` (an empty negative lookahead,
# which can never match) instead of ``(?:)``, which would match everywhere
# and flag every scanned file.
_RE_POOL_DOMAIN = re.compile(
    rb"(?:" + b"|".join(_POOL_DOMAIN_ALTERNATIVES) + rb")"
    if _POOL_DOMAIN_ALTERNATIVES
    else rb"(?!)",
    re.IGNORECASE,
)

# Names of proof-of-work algorithms, matched as whole words.
_RE_ALGO_REF = re.compile(
    rb"\b(?:cryptonight|randomx|ethash|kawpow|equihash|scrypt|sha256d|x11|x13|lyra2rev2|blake2s)\b",
    re.IGNORECASE,
)

# Quoted key followed by a colon and an opening quote, i.e. the start of a
# JSON-style ``"key": "value"`` pair for one of the listed keys.
_RE_MINING_CONFIG = re.compile(
    rb"""["'](?:algo|pool|wallet|worker|pass|coin|url|user)["']\s*:\s*["']""",
    re.IGNORECASE,
)

# Wallet address patterns (broad but useful).
_RE_BTC_ADDR = re.compile(rb"\b(?:1|3|bc1)[A-HJ-NP-Za-km-z1-9]{25,62}\b")
_RE_ETH_ADDR = re.compile(rb"\b0x[0-9a-fA-F]{40}\b")
_RE_XMR_ADDR = re.compile(rb"\b4[0-9AB][1-9A-HJ-NP-Za-km-z]{93}\b")
|
|
|
|
|
|
class CryptominerDetector(BaseDetector):
    """Detect cryptocurrency mining activity via files, processes, and network.

    ``detect()`` runs the per-file content checks only.  The ``find_*`` static
    methods are host-wide checks (running processes, CPU usage, network
    connections); they are not tied to a file, are not invoked by
    ``detect()``, and are meant to be called independently.
    """

    # Shared by the ``name`` property and the static host-wide checks, which
    # have no instance to read ``self.name`` from.
    _DETECTOR_NAME = "cryptominer_detector"

    # ------------------------------------------------------------------
    # BaseDetector interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Identifier stored as ``detector_name`` on every result."""
        return self._DETECTOR_NAME

    @property
    def description(self) -> str:
        """One-line human-readable summary of this detector."""
        return "Detects crypto-mining binaries, configs, processes, and network traffic"

    def detect(
        self,
        file_path: str | Path,
        file_content: Optional[bytes] = None,
        file_hash: Optional[str] = None,
    ) -> List[DetectionResult]:
        """Analyse a single file for mining indicators.

        Only file-content checks run here (stratum URLs, pool domains,
        algorithm names, config keys, wallet addresses).  Host-wide process
        and network checks are NOT performed; call ``find_miner_processes()``,
        ``find_high_cpu_processes()`` and ``find_mining_connections()``
        separately for those.

        Args:
            file_path: Path of the file being scanned.
            file_content: Optional already-loaded content; it is handed to
                ``self._read_content`` together with the path.
            file_hash: Accepted but not used by this detector.

        Returns:
            A list with at most one result per content check; empty when
            nothing matched or the file could not be read.
        """
        file_path = Path(file_path)

        try:
            # ``_read_content`` is not defined in this class (presumably
            # provided by BaseDetector).
            content = self._read_content(file_path, file_content)
        except OSError as exc:
            # Unreadable file: report nothing rather than fail the scan.
            self._warn("Cannot read %s: %s", file_path, exc)
            return []

        results: List[DetectionResult] = []
        for check in (
            self._check_stratum_urls,
            self._check_pool_domains,
            self._check_algo_references,
            self._check_mining_config,
            self._check_wallet_addresses,
        ):
            results.extend(check(file_path, content))
        return results

    # ------------------------------------------------------------------
    # File-content checks
    # ------------------------------------------------------------------

    def _check_stratum_urls(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report stratum protocol URLs found in *content* (first five shown)."""
        matches = _RE_STRATUM.findall(content)
        if not matches:
            return []

        urls = [m.decode(errors="replace") for m in matches[:5]]
        return [DetectionResult(
            threat_name="Miner.Stratum.URL",
            threat_type="MINER",
            severity="CRITICAL",
            confidence=95,
            details=f"Stratum mining URL(s) found: {', '.join(urls)}",
            detector_name=self.name,
        )]

    def _check_pool_domains(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report known mining-pool domains referenced in *content*."""
        matches = _RE_POOL_DOMAIN.findall(content)
        if not matches:
            return []

        # Matching is case-insensitive, so normalise case before
        # de-duplicating; otherwise "Pool.X" and "pool.x" would take two of
        # the five reported slots.
        domains = sorted({m.decode(errors="replace").lower() for m in matches})
        return [DetectionResult(
            threat_name="Miner.PoolDomain",
            threat_type="MINER",
            severity="HIGH",
            confidence=90,
            details=f"Mining pool domain(s) referenced: {', '.join(domains[:5])}",
            detector_name=self.name,
        )]

    def _check_algo_references(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report mining algorithm names mentioned in *content*."""
        matches = _RE_ALGO_REF.findall(content)
        if not matches:
            return []

        algos = sorted({m.decode(errors="replace").lower() for m in matches})
        return [DetectionResult(
            threat_name="Miner.AlgorithmReference",
            threat_type="MINER",
            severity="MEDIUM",
            confidence=60,
            details=f"Mining algorithm reference(s): {', '.join(algos)}",
            detector_name=self.name,
        )]

    def _check_mining_config(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report files containing two or more miner-style config keys."""
        matches = _RE_MINING_CONFIG.findall(content)
        # A single key such as "url" or "user" is not enough on its own;
        # at least two hits are required before reporting.
        if len(matches) < 2:
            return []

        return [DetectionResult(
            threat_name="Miner.ConfigFile",
            threat_type="MINER",
            severity="HIGH",
            confidence=85,
            details=(
                f"File resembles a mining configuration "
                f"({len(matches)} config key(s) detected)"
            ),
            detector_name=self.name,
        )]

    def _check_wallet_addresses(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report strings in *content* that look like BTC/ETH/XMR addresses.

        Up to three matches per coin are collected and at most five are shown
        in total, each truncated to its first 20 characters.
        """
        wallets: List[str] = []
        for label, regex in (
            ("BTC", _RE_BTC_ADDR),
            ("ETH", _RE_ETH_ADDR),
            ("XMR", _RE_XMR_ADDR),
        ):
            for m in regex.findall(content)[:3]:
                preview = m.decode(errors="replace")[:20]
                wallets.append(f"{label}:{preview}…")

        if not wallets:
            return []

        return [DetectionResult(
            threat_name="Miner.WalletAddress",
            threat_type="MINER",
            severity="HIGH",
            confidence=70,
            details=f"Cryptocurrency wallet address(es): {', '.join(wallets[:5])}",
            detector_name=self.name,
        )]

    # ------------------------------------------------------------------
    # Process-based detection (host-wide, not file-specific)
    # ------------------------------------------------------------------

    @staticmethod
    def find_miner_processes() -> List[DetectionResult]:
        """Scan running processes for known miner names.

        A process is reported once if any entry of
        ``CRYPTO_MINER_PROCESS_NAMES`` is a substring of its name or command
        line (compared case-insensitively).

        This is a host-wide check and should be called independently from
        the per-file ``detect()`` method.
        """
        results: List[DetectionResult] = []

        # Lower-case each configured name once, outside the process loop.
        # Empty names are skipped: an empty string is a substring of
        # everything and would flag every process.
        miner_names = [(m, m.lower()) for m in CRYPTO_MINER_PROCESS_NAMES if m]

        for proc in psutil.process_iter(["pid", "name", "cmdline", "cpu_percent"]):
            try:
                info = proc.info
                pname = (info.get("name") or "").lower()
                cmdline = " ".join(info.get("cmdline") or []).lower()
                # process_iter stores None for attributes it was denied
                # access to; formatting None with ":.1f" raises TypeError,
                # so fall back to 0.0.
                cpu = info.get("cpu_percent") or 0.0

                for miner, needle in miner_names:
                    if needle in pname or needle in cmdline:
                        results.append(DetectionResult(
                            threat_name=f"Miner.Process.{miner}",
                            threat_type="MINER",
                            severity="CRITICAL",
                            confidence=95,
                            details=(
                                f"Known miner process running: {info.get('name')} "
                                f"(PID {info['pid']}, CPU {cpu:.1f}%)"
                            ),
                            detector_name=CryptominerDetector._DETECTOR_NAME,
                        ))
                        # One result per process is enough.
                        break
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return results

    # ------------------------------------------------------------------
    # CPU analysis (host-wide)
    # ------------------------------------------------------------------

    @staticmethod
    def find_high_cpu_processes(
        threshold: float = HIGH_CPU_THRESHOLD,
    ) -> List[DetectionResult]:
        """Flag processes consuming CPU above *threshold* percent.

        NOTE(review): psutil documents the first ``cpu_percent`` sample of a
        process as a meaningless 0.0, so the first call in a fresh
        interpreter may report nothing -- confirm callers sample twice.
        """
        results: List[DetectionResult] = []
        for proc in psutil.process_iter(["pid", "name", "cpu_percent"]):
            try:
                info = proc.info
                # None (access denied) is treated as 0.0.
                cpu = info.get("cpu_percent") or 0.0
                if cpu <= threshold:
                    continue
                results.append(DetectionResult(
                    threat_name="Miner.HighCPU",
                    threat_type="MINER",
                    severity="HIGH",
                    confidence=55,
                    details=(
                        f"Process {info.get('name')} (PID {info['pid']}) "
                        f"using {cpu:.1f}% CPU"
                    ),
                    detector_name=CryptominerDetector._DETECTOR_NAME,
                ))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return results

    # ------------------------------------------------------------------
    # Network detection (host-wide)
    # ------------------------------------------------------------------

    @staticmethod
    def _process_name(pid: Optional[int]) -> str:
        """Return the name of process *pid*: "" without a PID, "?" if unavailable."""
        if not pid:
            return ""
        try:
            return psutil.Process(pid).name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return "?"

    @staticmethod
    def find_mining_connections() -> List[DetectionResult]:
        """Check active network connections for mining pool traffic.

        Reports connections whose remote port is in ``SUSPICIOUS_PORTS`` and
        connections whose remote address contains a ``CRYPTO_POOL_DOMAINS``
        entry.  Returns an empty list when psutil is denied access to the
        connection table.
        """
        results: List[DetectionResult] = []
        try:
            connections = psutil.net_connections(kind="inet")
        except psutil.AccessDenied:
            logger.warning("Insufficient permissions to read network connections")
            return results

        for conn in connections:
            raddr = conn.raddr
            if not raddr:
                # No remote endpoint to inspect.
                continue

            remote_ip = raddr.ip
            remote_port = raddr.port

            port_hit = remote_port in SUSPICIOUS_PORTS
            # NOTE(review): ``remote_ip`` is a numeric address string, so a
            # domain name is very unlikely ever to be a substring of it and
            # this check probably never fires.  Making it effective needs
            # DNS resolution (blocking network I/O), so the comparison is
            # left unchanged here -- decide on that separately.
            pool_hit = next(
                (domain for domain in CRYPTO_POOL_DOMAINS if domain in remote_ip),
                None,
            )
            if not port_hit and pool_hit is None:
                continue

            # Look the process up only for connections that are reported.
            proc_name = CryptominerDetector._process_name(conn.pid)

            if port_hit:
                results.append(DetectionResult(
                    threat_name="Miner.Network.SuspiciousPort",
                    threat_type="MINER",
                    severity="HIGH",
                    confidence=75,
                    details=(
                        f"Connection to port {remote_port} "
                        f"({remote_ip}, process={proc_name}, PID={conn.pid})"
                    ),
                    detector_name=CryptominerDetector._DETECTOR_NAME,
                ))

            if pool_hit is not None:
                results.append(DetectionResult(
                    threat_name="Miner.Network.PoolConnection",
                    threat_type="MINER",
                    severity="CRITICAL",
                    confidence=95,
                    details=(
                        f"Active connection to mining pool {pool_hit} "
                        f"({remote_ip}:{remote_port}, process={proc_name})"
                    ),
                    detector_name=CryptominerDetector._DETECTOR_NAME,
                ))

        return results
|