"""Spyware detector for AYN Antivirus. Scans files and system state for indicators of spyware: keyloggers, screen capture utilities, data exfiltration patterns, reverse shells, unauthorized SSH keys, and suspicious shell-profile modifications. """ from __future__ import annotations import re from pathlib import Path from typing import List, Optional from ayn_antivirus.constants import SUSPICIOUS_CRON_PATTERNS from ayn_antivirus.detectors.base import BaseDetector, DetectionResult # --------------------------------------------------------------------------- # File-content patterns # --------------------------------------------------------------------------- # Keylogger indicators. _RE_KEYLOGGER = re.compile( rb"(?:" rb"/dev/input/event\d+" rb"|xinput\s+(?:test|list)" rb"|xdotool\b" rb"|showkey\b" rb"|logkeys\b" rb"|pynput\.keyboard" rb"|keyboard\.on_press" rb"|evdev\.InputDevice" rb"|GetAsyncKeyState" rb"|SetWindowsHookEx" rb")", re.IGNORECASE, ) # Screen / audio capture. _RE_SCREEN_CAPTURE = re.compile( rb"(?:" rb"scrot\b" rb"|import\s+-window\s+root" rb"|xwd\b" rb"|ffmpeg\s+.*-f\s+x11grab" rb"|xdpyinfo" rb"|ImageGrab\.grab" rb"|screenshot" rb"|pyautogui\.screenshot" rb"|screencapture\b" rb")", re.IGNORECASE, ) _RE_AUDIO_CAPTURE = re.compile( rb"(?:" rb"arecord\b" rb"|parecord\b" rb"|ffmpeg\s+.*-f\s+(?:alsa|pulse|avfoundation)" rb"|pyaudio" rb"|sounddevice" rb")", re.IGNORECASE, ) # Data exfiltration. _RE_EXFIL = re.compile( rb"(?:" rb"curl\s+.*-[FdT]\s" rb"|curl\s+.*--upload-file" rb"|wget\s+.*--post-file" rb"|scp\s+.*@" rb"|rsync\s+.*@" rb"|nc\s+-[^\s]*\s+\d+\s*<" rb"|python[23]?\s+-m\s+http\.server" rb")", re.IGNORECASE, ) # Reverse shell. 
_RE_REVERSE_SHELL = re.compile(
    rb"(?:"
    rb"bash\s+-i\s+>&\s*/dev/tcp/"
    rb"|nc\s+-e\s+/bin/"
    rb"|ncat\s+.*-e\s+/bin/"
    rb"|socat\s+exec:"
    rb"|python[23]?\s+-c\s+['\"]import\s+socket"
    rb"|perl\s+-e\s+['\"]use\s+Socket"
    rb"|ruby\s+-rsocket\s+-e"
    rb"|php\s+-r\s+['\"].*fsockopen"
    rb"|mkfifo\s+/tmp/.*;\s*nc"
    rb"|/dev/tcp/\d+\.\d+\.\d+\.\d+"
    rb")",
    re.IGNORECASE,
)

# Suspicious cron patterns (compiled from constants).
# The patterns are encoded to bytes so they can be run against raw content.
_RE_CRON_PATTERNS = [
    re.compile(pat.encode(), re.IGNORECASE)
    for pat in SUSPICIOUS_CRON_PATTERNS
]


class SpywareDetector(BaseDetector):
    """Detect spyware indicators in files and on the host.

    Every ``_check_*`` helper takes the file path and its raw bytes and
    returns a (possibly empty) list of :class:`DetectionResult`; ``detect``
    concatenates their output in a fixed order.
    """

    # ------------------------------------------------------------------
    # BaseDetector interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Return the identifier stamped on every result as ``detector_name``."""
        return "spyware_detector"

    @property
    def description(self) -> str:
        """Return a one-line, human-readable summary of this detector."""
        return "Detects keyloggers, screen capture, data exfiltration, and reverse shells"

    def detect(
        self,
        file_path: str | Path,
        file_content: Optional[bytes] = None,
        file_hash: Optional[str] = None,
    ) -> List[DetectionResult]:
        """Run every spyware check against one file.

        Args:
            file_path: Location of the file; converted to ``Path``.
            file_content: Optional pre-read bytes, forwarded to
                ``self._read_content``.
            file_hash: Accepted for interface compatibility; not used by
                this detector.

        Returns:
            All findings from all checks, or an empty list when the content
            cannot be read (``OSError``), in which case a warning is emitted.
        """
        file_path = Path(file_path)
        results: List[DetectionResult] = []

        # ``_read_content`` and ``_warn`` are not defined in this class —
        # presumably inherited from BaseDetector; verify against the base.
        try:
            content = self._read_content(file_path, file_content)
        except OSError as exc:
            self._warn("Cannot read %s: %s", file_path, exc)
            return results

        # --- File-content checks ---
        results.extend(self._check_keylogger(file_path, content))
        results.extend(self._check_screen_capture(file_path, content))
        results.extend(self._check_audio_capture(file_path, content))
        results.extend(self._check_exfiltration(file_path, content))
        results.extend(self._check_reverse_shell(file_path, content))
        results.extend(self._check_hidden_cron(file_path, content))

        # --- Host-state checks (only for relevant paths) ---
        results.extend(self._check_authorized_keys(file_path, content))
        results.extend(self._check_shell_profile(file_path, content))

        return results

    # ------------------------------------------------------------------
    # Keylogger patterns
    # ------------------------------------------------------------------

    def _check_keylogger(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report keylogger indicators found in *content*.

        Returns at most one result whose details list up to five distinct
        matched indicators, sorted alphabetically.
        """
        results: List[DetectionResult] = []
        matches = _RE_KEYLOGGER.findall(content)
        if matches:
            # De-duplicate *before* truncating so that many repeats of a
            # single indicator cannot crowd other distinct indicators out
            # of the report.
            distinct = {m.decode(errors="replace") for m in matches}
            samples = sorted(distinct)[:5]
            results.append(DetectionResult(
                threat_name="Spyware.Keylogger",
                threat_type="SPYWARE",
                severity="CRITICAL",
                confidence=80,
                details=f"Keylogger indicators: {', '.join(samples)}",
                detector_name=self.name,
            ))
        return results

    # ------------------------------------------------------------------
    # Screen capture
    # ------------------------------------------------------------------

    def _check_screen_capture(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report a single finding if *content* matches ``_RE_SCREEN_CAPTURE``."""
        results: List[DetectionResult] = []
        if _RE_SCREEN_CAPTURE.search(content):
            results.append(DetectionResult(
                threat_name="Spyware.ScreenCapture",
                threat_type="SPYWARE",
                severity="HIGH",
                confidence=70,
                details="Screen-capture tools or API calls detected",
                detector_name=self.name,
            ))
        return results

    # ------------------------------------------------------------------
    # Audio capture
    # ------------------------------------------------------------------

    def _check_audio_capture(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report a single finding if *content* matches ``_RE_AUDIO_CAPTURE``."""
        results: List[DetectionResult] = []
        if _RE_AUDIO_CAPTURE.search(content):
            results.append(DetectionResult(
                threat_name="Spyware.AudioCapture",
                threat_type="SPYWARE",
                severity="HIGH",
                confidence=65,
                details="Audio recording tools or API calls detected",
                detector_name=self.name,
            ))
        return results

    # ------------------------------------------------------------------
    # Data exfiltration
    # ------------------------------------------------------------------

    def _check_exfiltration(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report data-exfiltration command patterns found in *content*.

        Returns at most one result; its details quote the first three
        matches, each cut to 80 characters.
        """
        results: List[DetectionResult] = []
        matches = _RE_EXFIL.findall(content)
        if matches:
            samples = [m.decode(errors="replace")[:80] for m in matches[:3]]
            results.append(DetectionResult(
                threat_name="Spyware.DataExfiltration",
                threat_type="SPYWARE",
                severity="HIGH",
                confidence=70,
                details=f"Data exfiltration pattern(s): {'; '.join(samples)}",
                detector_name=self.name,
            ))
        return results

    # ------------------------------------------------------------------
    # Reverse shell
    # ------------------------------------------------------------------

    def _check_reverse_shell(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report the first reverse-shell pattern found in *content*.

        The details embed the ``repr`` of the first 100 matched bytes.
        """
        results: List[DetectionResult] = []
        match = _RE_REVERSE_SHELL.search(content)
        if match:
            results.append(DetectionResult(
                threat_name="Spyware.ReverseShell",
                threat_type="SPYWARE",
                severity="CRITICAL",
                confidence=90,
                details=f"Reverse shell pattern: {match.group()[:100]!r}",
                detector_name=self.name,
            ))
        return results

    # ------------------------------------------------------------------
    # Hidden cron jobs
    # ------------------------------------------------------------------

    def _check_hidden_cron(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report suspicious entries in cron-related files.

        The check only runs when the path contains ``cron``, ``crontab`` or
        ``/var/spool/``.  One result is emitted per pattern in
        ``_RE_CRON_PATTERNS`` that matches (first match of each pattern).
        """
        results: List[DetectionResult] = []

        # Only check cron-related files.
        path_str = str(file_path)
        is_cron = any(tok in path_str for tok in ("cron", "crontab", "/var/spool/"))
        if not is_cron:
            return results

        for pat in _RE_CRON_PATTERNS:
            match = pat.search(content)
            if match:
                results.append(DetectionResult(
                    threat_name="Spyware.Cron.SuspiciousEntry",
                    threat_type="SPYWARE",
                    severity="HIGH",
                    confidence=80,
                    details=f"Suspicious cron pattern in {file_path}: {match.group()[:80]!r}",
                    detector_name=self.name,
                ))
        return results

    # ------------------------------------------------------------------
    # Unauthorized SSH keys
    # ------------------------------------------------------------------

    def _check_authorized_keys(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Inspect files named ``authorized_keys`` for signs of abuse.

        Up to three independent findings may be returned: unexpected
        location, more than ten key-type markers, and a forced
        ``command=`` option.  Files with any other name are skipped.
        """
        results: List[DetectionResult] = []
        if file_path.name != "authorized_keys":
            return results

        # Flag if the file exists in an unexpected location.
        # NOTE(review): any path starting with /root/ is accepted here, even
        # outside a .ssh directory — confirm this is intended.
        path_str = str(file_path)
        if not path_str.startswith("/root/") and "/.ssh/" not in path_str:
            results.append(DetectionResult(
                threat_name="Spyware.SSH.UnauthorizedKeysFile",
                threat_type="SPYWARE",
                severity="HIGH",
                confidence=75,
                details=f"authorized_keys found in unexpected location: {file_path}",
                detector_name=self.name,
            ))

        # Check for suspiciously many keys.  This counts raw substring
        # occurrences of the key-type markers, not parsed key lines.
        key_count = (
            content.count(b"ssh-rsa")
            + content.count(b"ssh-ed25519")
            + content.count(b"ecdsa-sha2")
        )
        if key_count > 10:
            results.append(DetectionResult(
                threat_name="Spyware.SSH.ExcessiveKeys",
                threat_type="SPYWARE",
                severity="MEDIUM",
                confidence=55,
                details=f"{key_count} SSH keys in {file_path} — possible unauthorized access",
                detector_name=self.name,
            ))

        # command= prefix can force a shell command on login — often abused.
        if b'command="' in content or b"command='" in content:
            results.append(DetectionResult(
                threat_name="Spyware.SSH.ForcedCommand",
                threat_type="SPYWARE",
                severity="MEDIUM",
                confidence=60,
                details=f"Forced command found in authorized_keys: {file_path}",
                detector_name=self.name,
            ))

        return results

    # ------------------------------------------------------------------
    # Shell profile modifications
    # ------------------------------------------------------------------

    # File names (not full paths) that are treated as shell profiles.
    _PROFILE_FILES = {
        ".bashrc",
        ".bash_profile",
        ".profile",
        ".zshrc",
        ".bash_login",
        ".bash_logout",
    }

    # Commands that are suspicious when they appear in a shell profile.
    _RE_PROFILE_SUSPICIOUS = re.compile(
        rb"(?:"
        rb"curl\s+[^\n]*\|\s*(?:sh|bash)"
        rb"|wget\s+[^\n]*\|\s*(?:sh|bash)"
        rb"|/dev/tcp/"
        rb"|base64\s+--decode"
        rb"|nohup\s+.*&"
        rb"|eval\s+\$\("
        rb"|python[23]?\s+-c\s+['\"]import\s+(?:socket|os|pty)"
        rb")",
        re.IGNORECASE,
    )

    def _check_shell_profile(
        self, file_path: Path, content: bytes
    ) -> List[DetectionResult]:
        """Report the first suspicious command found in a shell profile.

        Only files whose name is in ``_PROFILE_FILES`` are inspected.  The
        details embed the ``repr`` of the first 100 matched bytes.
        """
        results: List[DetectionResult] = []
        if file_path.name not in self._PROFILE_FILES:
            return results

        match = self._RE_PROFILE_SUSPICIOUS.search(content)
        if match:
            results.append(DetectionResult(
                threat_name="Spyware.ShellProfile.SuspiciousEntry",
                threat_type="SPYWARE",
                severity="CRITICAL",
                confidence=85,
                details=(
                    f"Suspicious command in shell profile {file_path}: "
                    f"{match.group()[:100]!r}"
                ),
                detector_name=self.name,
            ))
        return results