remove infra.md.example, infra.md is the source of truth

This commit is contained in:
Azreen Jamal
2026-03-03 03:06:13 +08:00
parent 1ad3033cc1
commit a3c6d09350
86 changed files with 17093 additions and 39 deletions

View File

@@ -0,0 +1,436 @@
"""Heuristic detector for AYN Antivirus.
Uses statistical and pattern-based analysis to flag files that *look*
malicious even when no signature or YARA rule matches. Checks include
Shannon entropy (packed/encrypted binaries), suspicious string patterns,
obfuscation indicators, ELF anomalies, and permission/location red flags.
"""
from __future__ import annotations

import logging
import math
import re
import stat
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional

from ayn_antivirus.constants import SUSPICIOUS_EXTENSIONS
from ayn_antivirus.detectors.base import BaseDetector, DetectionResult
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_HIGH_ENTROPY_THRESHOLD = 7.5 # bits per byte — likely packed / encrypted
# Minimum number of chained chr() calls; interpolated into _RE_CHR_CHAIN below.
# The \xNN pattern (_RE_HEX_STRING) does not use it and has its own fixed
# minimum of 8.
_CHR_CHAIN_MIN = 6
# Minimum base64 blob length (in characters) considered suspicious.
# NOTE(review): _RE_BASE64_BLOB already requires ten 4-character groups
# (40 characters), so lowering this value alone has no effect.
_B64_MIN_LENGTH = 40
# ---------------------------------------------------------------------------
# Compiled regexes (built once at import time)
# ---------------------------------------------------------------------------
# All patterns are bytes patterns because they are applied to raw file content.
# Ten or more 4-character base64 groups, optionally followed by one padded
# group ("xx==" or "xxx=").
_RE_BASE64_BLOB = re.compile(
rb"(?:(?:[A-Za-z0-9+/]{4}){10,})(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?"
)
# eval( / exec( / compile( as whole words, case-insensitive.
_RE_EVAL_EXEC = re.compile(rb"\b(?:eval|exec|compile)\s*\(", re.IGNORECASE)
# os.system(, subprocess.call/run/Popen( or commands.getoutput(.
_RE_SYSTEM_CALL = re.compile(
rb"\b(?:os\.system|subprocess\.(?:call|run|Popen)|commands\.getoutput)\s*\(",
re.IGNORECASE,
)
# Reverse-shell fragments: /dev/tcp/, "bash -i >&", nc with an option starting
# -e/-l/-p, ncat with any option, socat, or python -c '<quote>import socket'.
_RE_REVERSE_SHELL = re.compile(
rb"(?:/dev/tcp/|bash\s+-i\s+>&|nc\s+-[elp]|ncat\s+-|socat\s+|python[23]?\s+-c\s+['\"]import\s+socket)",
re.IGNORECASE,
)
# wget/curl whose output is piped, on the same line, into sh/bash/python/perl.
_RE_WGET_CURL_PIPE = re.compile(
rb"(?:wget|curl)\s+[^\n]*\|\s*(?:sh|bash|python|perl)", re.IGNORECASE
)
# -enc / -encodedcommand / -e / -ec followed by at least 20 base64 characters.
_RE_ENCODED_PS = re.compile(
rb"-(?:enc(?:odedcommand)?|e|ec)\s+[A-Za-z0-9+/=]{20,}", re.IGNORECASE
)
# At least _CHR_CHAIN_MIN consecutive chr(<digits>) calls, each followed by
# a "." or "+" concatenation operator.
_RE_CHR_CHAIN = re.compile(
rb"(?:chr\s*\(\s*\d+\s*\)\s*[\.\+]\s*){" + str(_CHR_CHAIN_MIN).encode() + rb",}",
re.IGNORECASE,
)
# Eight or more consecutive textual \xNN escape sequences.
_RE_HEX_STRING = re.compile(
rb"(?:\\x[0-9a-fA-F]{2}){8,}"
)
# Six or more quoted fragments of 1-4 characters, each followed by "+" or ".".
_RE_STRING_CONCAT = re.compile(
rb"""(?:["'][^"']{1,4}["']\s*[\+\.]\s*){6,}""",
)
# UPX magic at the beginning of packed sections.
_UPX_MAGIC = b"UPX!"
# System directories where world-writable or SUID files are suspicious.
_SYSTEM_DIRS = {"/usr/bin", "/usr/sbin", "/bin", "/sbin", "/usr/local/bin", "/usr/local/sbin"}
# Locations where hidden files are suspicious.
_SUSPICIOUS_HIDDEN_DIRS = {"/tmp", "/var/tmp", "/dev/shm", "/var/www", "/srv"}
class HeuristicDetector(BaseDetector):
"""Flag files that exhibit suspicious characteristics without a known signature."""
# ------------------------------------------------------------------
# BaseDetector interface
# ------------------------------------------------------------------
@property
def name(self) -> str:
    """Return the identifier this detector stamps on its results."""
    return "heuristic_detector"
@property
def description(self) -> str:
    """Return a one-line, human-readable summary of this detector."""
    return "Statistical and pattern-based heuristic analysis"
def detect(
    self,
    file_path: str | Path,
    file_content: Optional[bytes] = None,
    file_hash: Optional[str] = None,
) -> List[DetectionResult]:
    """Run every heuristic check against one file and collect the findings.

    Args:
        file_path: Location of the file to analyse.
        file_content: Passed to ``_read_content`` together with the path.
        file_hash: Part of the signature but not referenced by this method.

    Returns:
        The detections of all checks, in check order. The list is empty
        when obtaining the content raises ``OSError`` (a warning is
        emitted through ``_warn`` in that case).
    """
    target = Path(file_path)
    try:
        data = self._read_content(target, file_content)
    except OSError as err:
        self._warn("Cannot read %s: %s", target, err)
        return []

    findings: List[DetectionResult] = []

    # Checks that inspect the file's bytes: entropy, suspicious strings,
    # obfuscation indicators, ELF anomalies.
    for content_check in (
        self._check_entropy,
        self._check_suspicious_strings,
        self._check_obfuscation,
        self._check_elf_anomalies,
    ):
        findings.extend(content_check(target, data))

    # Checks that only look at the path / filesystem metadata: permissions,
    # hidden files in suspicious locations, recently modified system files.
    for path_check in (
        self._check_permission_anomalies,
        self._check_hidden_files,
        self._check_recent_system_modification,
    ):
        findings.extend(path_check(target))

    return findings
# ------------------------------------------------------------------
# Entropy
# ------------------------------------------------------------------
@staticmethod
def calculate_entropy(data: bytes) -> float:
    """Calculate the Shannon entropy of *data* in bits per byte.

    Args:
        data: Raw bytes to measure.

    Returns:
        A value from 0.0 (empty input, or every byte identical) up to
        8.0 (all 256 byte values occur equally often).
    """
    if not data:
        return 0.0
    total = len(data)
    entropy = 0.0
    # Counter only stores byte values that actually occur, so every
    # probability is strictly positive and log2() is always defined.
    for occurrences in Counter(data).values():
        probability = occurrences / total
        entropy -= probability * math.log2(probability)
    return entropy
def _check_entropy(
    self, file_path: Path, content: bytes
) -> List[DetectionResult]:
    """Report content whose overall entropy exceeds the high-entropy threshold.

    Content shorter than 256 bytes is never reported, because the entropy
    of such a small sample is not meaningful.

    Returns:
        A single-element list when the threshold is exceeded, otherwise
        an empty list.
    """
    if len(content) < 256:
        return []

    measured = self.calculate_entropy(content)
    if measured <= _HIGH_ENTROPY_THRESHOLD:
        return []

    return [
        DetectionResult(
            threat_name="Heuristic.Packed.HighEntropy",
            threat_type="MALWARE",
            severity="MEDIUM",
            confidence=65,
            details=(
                f"File entropy {measured:.2f} bits/byte exceeds threshold "
                f"({_HIGH_ENTROPY_THRESHOLD}) — likely packed or encrypted"
            ),
            detector_name=self.name,
        )
    ]
# ------------------------------------------------------------------
# Suspicious strings
# ------------------------------------------------------------------
def _check_suspicious_strings(
    self, file_path: Path, content: bytes
) -> List[DetectionResult]:
    """Scan *content* for byte patterns commonly seen in malicious scripts.

    Checks, in reporting order: large base64 blobs, eval/exec/compile
    calls, system-command invocations, reverse-shell fragments,
    download-piped-to-shell commands, and encoded PowerShell commands.

    Returns:
        One detection per pattern family that matched (possibly empty).
    """
    findings: List[DetectionResult] = []

    def report(threat_name: str, severity: str, confidence: int, details: str) -> None:
        # Every finding from this check shares the same type and detector.
        findings.append(DetectionResult(
            threat_name=threat_name,
            threat_type="MALWARE",
            severity=severity,
            confidence=confidence,
            details=details,
            detector_name=self.name,
        ))

    # Base64-encoded payloads: only blobs reaching the minimum length count.
    big_blobs = [
        blob
        for blob in _RE_BASE64_BLOB.findall(content)
        if len(blob) >= _B64_MIN_LENGTH
    ]
    if big_blobs:
        report(
            "Heuristic.Obfuscation.Base64Payload",
            "MEDIUM",
            55,
            f"Found {len(big_blobs)} large base64-encoded blob(s)",
        )

    # Dynamic code execution.
    if _RE_EVAL_EXEC.search(content):
        report(
            "Heuristic.Suspicious.DynamicExecution",
            "MEDIUM",
            50,
            "File uses eval()/exec()/compile() — possible code injection",
        )

    # Shelling out to the operating system.
    if _RE_SYSTEM_CALL.search(content):
        report(
            "Heuristic.Suspicious.SystemCall",
            "MEDIUM",
            45,
            "File invokes system commands via os.system/subprocess",
        )

    # Reverse shells: include (at most 80 bytes of) the matched text.
    shell_hit = _RE_REVERSE_SHELL.search(content)
    if shell_hit:
        report(
            "Heuristic.ReverseShell",
            "CRITICAL",
            85,
            f"Reverse shell pattern detected: {shell_hit.group()[:80]!r}",
        )

    # Download piped straight into an interpreter.
    if _RE_WGET_CURL_PIPE.search(content):
        report(
            "Heuristic.Dropper.PipeToShell",
            "HIGH",
            80,
            "File downloads and pipes directly to a shell interpreter",
        )

    # Encoded PowerShell command line.
    if _RE_ENCODED_PS.search(content):
        report(
            "Heuristic.PowerShell.EncodedCommand",
            "HIGH",
            75,
            "Encoded PowerShell command detected",
        )

    return findings
# ------------------------------------------------------------------
# Obfuscation
# ------------------------------------------------------------------
def _check_obfuscation(
    self, file_path: Path, content: bytes
) -> List[DetectionResult]:
    """Look for source-level obfuscation tricks in *content*.

    Checks, in reporting order: long chr() concatenation chains, more
    than three runs of ``\\xNN`` escapes, and long chains of very short
    quoted strings joined together.

    Returns:
        One detection per indicator that fired (possibly empty).
    """
    findings: List[DetectionResult] = []

    def report(threat_name: str, severity: str, confidence: int, details: str) -> None:
        # Every finding from this check shares the same type and detector.
        findings.append(DetectionResult(
            threat_name=threat_name,
            threat_type="MALWARE",
            severity=severity,
            confidence=confidence,
            details=details,
            detector_name=self.name,
        ))

    # chr() chains.
    if _RE_CHR_CHAIN.search(content):
        report(
            "Heuristic.Obfuscation.ChrChain",
            "MEDIUM",
            60,
            "Obfuscation via long chr() concatenation chain",
        )

    # Hex-encoded byte strings: a few are normal, more than three is flagged.
    hex_run_count = len(_RE_HEX_STRING.findall(content))
    if hex_run_count > 3:
        report(
            "Heuristic.Obfuscation.HexStrings",
            "MEDIUM",
            55,
            f"Multiple hex-encoded strings detected ({hex_run_count} occurrences)",
        )

    # Excessive string concatenation.
    if _RE_STRING_CONCAT.search(content):
        report(
            "Heuristic.Obfuscation.StringConcat",
            "LOW",
            40,
            "Excessive short-string concatenation — possible obfuscation",
        )

    return findings
# ------------------------------------------------------------------
# ELF anomalies
# ------------------------------------------------------------------
def _check_elf_anomalies(
    self, file_path: Path, content: bytes
) -> List[DetectionResult]:
    """Inspect ELF binaries for packing and stripping indicators.

    Content that does not start with the ELF magic is ignored. For ELF
    content two things are reported:

    * the UPX magic appearing within the first 4096 bytes;
    * neither ``.symtab`` nor ``.debug`` appearing anywhere in the
      content while the file lives outside every ``_SYSTEM_DIRS`` entry.

    Returns:
        Zero, one or two detections.
    """
    results: List[DetectionResult] = []
    if not content.startswith(b"\x7fELF"):
        return results

    # UPX packed.
    if _UPX_MAGIC in content[:4096]:
        results.append(DetectionResult(
            threat_name="Heuristic.Packed.UPX",
            threat_type="MALWARE",
            severity="MEDIUM",
            confidence=60,
            details="ELF binary is UPX-packed",
            detector_name=self.name,
        ))

    # Match whole path components: a plain string-prefix test would treat
    # "/binary/tool" or "/usr/bin2/tool" as living inside "/bin" / "/usr/bin".
    is_in_system = any(
        parent.as_posix() in _SYSTEM_DIRS for parent in file_path.parents
    )

    # Non-system ELF — more suspicious if stripped (no .symtab / .debug).
    if (
        not is_in_system
        and b".symtab" not in content
        and b".debug" not in content
    ):
        results.append(DetectionResult(
            threat_name="Heuristic.ELF.StrippedNonSystem",
            threat_type="MALWARE",
            severity="LOW",
            confidence=35,
            details="Stripped ELF binary found outside standard system directories",
            detector_name=self.name,
        ))
    return results
# ------------------------------------------------------------------
# Permission anomalies
# ------------------------------------------------------------------
def _check_permission_anomalies(
    self, file_path: Path
) -> List[DetectionResult]:
    """Flag risky permission bits based on where the file lives.

    Two conditions are reported:

    * a world-writable file located under one of ``_SYSTEM_DIRS``;
    * a SUID or SGID file located outside all of ``_SYSTEM_DIRS``
      (when both bits are set the finding is labelled ``SUID``).

    Returns:
        The detections found, or an empty list when the file cannot be
        stat()-ed.
    """
    results: List[DetectionResult] = []
    try:
        mode = file_path.stat().st_mode
    except OSError:
        return results

    # Match whole path components: with a plain string-prefix test a SUID
    # binary dropped in e.g. "/bin_backup/" would count as "in /bin" and
    # escape the SUID/SGID check below.
    is_in_system = any(
        parent.as_posix() in _SYSTEM_DIRS for parent in file_path.parents
    )

    # World-writable file in a system directory.
    if is_in_system and (mode & stat.S_IWOTH):
        results.append(DetectionResult(
            threat_name="Heuristic.Permissions.WorldWritableSystem",
            threat_type="MALWARE",
            severity="HIGH",
            confidence=70,
            details=f"World-writable file in system directory: {file_path}",
            detector_name=self.name,
        ))

    # SUID/SGID on unusual files.
    is_suid = bool(mode & stat.S_ISUID)
    is_sgid = bool(mode & stat.S_ISGID)
    if (is_suid or is_sgid) and not is_in_system:
        flag = "SUID" if is_suid else "SGID"
        results.append(DetectionResult(
            threat_name=f"Heuristic.Permissions.{flag}NonSystem",
            threat_type="MALWARE",
            severity="HIGH",
            confidence=75,
            details=f"{flag} bit set on file outside system directories: {file_path}",
            detector_name=self.name,
        ))
    return results
# ------------------------------------------------------------------
# Hidden files in suspicious locations
# ------------------------------------------------------------------
def _check_hidden_files(
    self, file_path: Path
) -> List[DetectionResult]:
    """Flag dot-files that live under one of ``_SUSPICIOUS_HIDDEN_DIRS``.

    The file may sit at any depth below the suspicious directory.

    Returns:
        A single-element list for a hidden file in such a location,
        otherwise an empty list.
    """
    results: List[DetectionResult] = []
    if not file_path.name.startswith("."):
        return results

    # Match whole path components: a plain string-prefix test would treat
    # "/tmpfiles/.cfg" or "/srv2/.cfg" as living inside "/tmp" / "/srv".
    in_suspicious_dir = any(
        parent.as_posix() in _SUSPICIOUS_HIDDEN_DIRS
        for parent in file_path.parents
    )
    if in_suspicious_dir:
        results.append(DetectionResult(
            threat_name="Heuristic.HiddenFile.SuspiciousLocation",
            threat_type="MALWARE",
            severity="MEDIUM",
            confidence=50,
            details=f"Hidden file in suspicious directory: {file_path}",
            detector_name=self.name,
        ))
    return results
# ------------------------------------------------------------------
# Recently modified system files
# ------------------------------------------------------------------
def _check_recent_system_modification(
    self, file_path: Path
) -> List[DetectionResult]:
    """Flag files under ``_SYSTEM_DIRS`` modified less than 24 hours ago.

    Returns:
        A single-element list when the file is in a system directory and
        its mtime is less than 24 hours before the current time;
        otherwise an empty list. Files that cannot be stat()-ed, or whose
        mtime cannot be converted to a datetime, yield an empty list.
    """
    results: List[DetectionResult] = []

    # Match whole path components: a plain string-prefix test would treat
    # "/binary/tool" or "/usr/bin2/tool" as living inside "/bin" / "/usr/bin".
    is_in_system = any(
        parent.as_posix() in _SYSTEM_DIRS for parent in file_path.parents
    )
    if not is_in_system:
        return results

    try:
        # Timezone-aware replacement for the deprecated utcfromtimestamp().
        mtime = datetime.fromtimestamp(
            file_path.stat().st_mtime, tz=timezone.utc
        )
    except (OSError, OverflowError, ValueError):
        # Unreadable file, or a timestamp outside the representable range.
        return results

    if datetime.now(timezone.utc) - mtime < timedelta(hours=24):
        # Drop tzinfo only for display so the reported timestamp keeps the
        # offset-free ISO format this message has always used (UTC).
        mtime_text = mtime.replace(tzinfo=None).isoformat()
        results.append(DetectionResult(
            threat_name="Heuristic.SystemFile.RecentlyModified",
            threat_type="MALWARE",
            severity="MEDIUM",
            confidence=45,
            details=(
                f"System file modified within the last 24 hours: "
                f"{file_path} (mtime: {mtime_text})"
            ),
            detector_name=self.name,
        ))
    return results