"""Network scanner for AYN Antivirus. Inspects active TCP/UDP connections for traffic to known mining pools, suspicious ports, and unexpected listening services. Also audits ``/etc/resolv.conf`` for DNS hijacking indicators. """ from __future__ import annotations import logging import re from pathlib import Path from typing import Any, Dict, List, Optional import psutil from ayn_antivirus.constants import ( CRYPTO_POOL_DOMAINS, SUSPICIOUS_PORTS, ) from ayn_antivirus.scanners.base import BaseScanner logger = logging.getLogger(__name__) # Well-known system services that are *expected* to listen — extend as needed. _EXPECTED_LISTENERS = { 22: "sshd", 53: "systemd-resolved", 80: "nginx", 443: "nginx", 3306: "mysqld", 5432: "postgres", 6379: "redis-server", 8080: "java", } # Known-malicious / suspicious public DNS servers sometimes injected by # malware into resolv.conf to redirect DNS queries. _SUSPICIOUS_DNS_SERVERS = [ "8.8.4.4", # not inherently bad, but worth noting if unexpected "1.0.0.1", "208.67.222.123", "198.54.117.10", "77.88.8.7", "94.140.14.14", ] class NetworkScanner(BaseScanner): """Scan active network connections for suspicious activity. Wraps :func:`psutil.net_connections` and enriches each connection with process ownership and threat classification. """ # ------------------------------------------------------------------ # BaseScanner interface # ------------------------------------------------------------------ @property def name(self) -> str: return "network_scanner" @property def description(self) -> str: return "Inspects network connections for mining pools and suspicious ports" def scan(self, target: Any = None) -> Dict[str, Any]: """Run a full network scan. *target* is ignored — all connections are inspected. Returns ------- dict ``total``, ``suspicious``, ``unexpected_listeners``, ``dns_issues``. """ all_conns = self.get_all_connections() suspicious = self.find_suspicious_connections() listeners = self.check_listening_ports() dns = self.check_dns_queries() return { "total": len(all_conns), "suspicious": suspicious, "unexpected_listeners": listeners, "dns_issues": dns, } # ------------------------------------------------------------------ # Connection enumeration # ------------------------------------------------------------------ @staticmethod def get_all_connections() -> List[Dict[str, Any]]: """Return a snapshot of every inet connection. Each dict contains: ``fd``, ``family``, ``type``, ``local_addr``, ``remote_addr``, ``status``, ``pid``, ``process_name``. """ result: List[Dict[str, Any]] = [] try: connections = psutil.net_connections(kind="inet") except psutil.AccessDenied: logger.warning("Insufficient permissions to read network connections") return result for conn in connections: local = f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "" remote = f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else "" proc_name = "" if conn.pid: try: proc_name = psutil.Process(conn.pid).name() except (psutil.NoSuchProcess, psutil.AccessDenied): proc_name = "?" result.append({ "fd": conn.fd, "family": str(conn.family), "type": str(conn.type), "local_addr": local, "remote_addr": remote, "status": conn.status, "pid": conn.pid, "process_name": proc_name, }) return result # ------------------------------------------------------------------ # Suspicious-connection detection # ------------------------------------------------------------------ def find_suspicious_connections(self) -> List[Dict[str, Any]]: """Identify connections to known mining pools or suspicious ports. Checks remote addresses against :pydata:`constants.CRYPTO_POOL_DOMAINS` and :pydata:`constants.SUSPICIOUS_PORTS`. """ suspicious: List[Dict[str, Any]] = [] try: connections = psutil.net_connections(kind="inet") except psutil.AccessDenied: logger.warning("Insufficient permissions to read network connections") return suspicious for conn in connections: raddr = conn.raddr if not raddr: continue remote_ip = raddr.ip remote_port = raddr.port local_str = f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "?" remote_str = f"{remote_ip}:{remote_port}" proc_info = self.resolve_process_for_connection(conn) # Suspicious port. if remote_port in SUSPICIOUS_PORTS: suspicious.append({ "local_addr": local_str, "remote_addr": remote_str, "pid": conn.pid, "process": proc_info, "status": conn.status, "reason": f"Connection on known mining port {remote_port}", "severity": "HIGH", }) # Mining-pool domain (substring match on IP / hostname). for domain in CRYPTO_POOL_DOMAINS: if domain in remote_ip: suspicious.append({ "local_addr": local_str, "remote_addr": remote_str, "pid": conn.pid, "process": proc_info, "status": conn.status, "reason": f"Connection to known mining pool: {domain}", "severity": "CRITICAL", }) break return suspicious # ------------------------------------------------------------------ # Listening-port audit # ------------------------------------------------------------------ @staticmethod def check_listening_ports() -> List[Dict[str, Any]]: """Return listening sockets that are *not* in the expected-services list. Unexpected listeners may indicate a backdoor or reverse shell. """ unexpected: List[Dict[str, Any]] = [] try: connections = psutil.net_connections(kind="inet") except psutil.AccessDenied: logger.warning("Insufficient permissions to read network connections") return unexpected for conn in connections: if conn.status != "LISTEN": continue port = conn.laddr.port if conn.laddr else None if port is None: continue proc_name = "" if conn.pid: try: proc_name = psutil.Process(conn.pid).name() except (psutil.NoSuchProcess, psutil.AccessDenied): proc_name = "?" expected_name = _EXPECTED_LISTENERS.get(port) if expected_name and expected_name in proc_name: continue # known good # Skip very common ephemeral / system ports when we can't resolve. if port > 49152: continue if port not in _EXPECTED_LISTENERS: unexpected.append({ "port": port, "local_addr": f"{conn.laddr.ip}:{port}" if conn.laddr else f"?:{port}", "pid": conn.pid, "process_name": proc_name, "reason": f"Unexpected listening service on port {port}", "severity": "MEDIUM", }) return unexpected # ------------------------------------------------------------------ # Process resolution # ------------------------------------------------------------------ @staticmethod def resolve_process_for_connection(conn: Any) -> Dict[str, Any]: """Return basic process info for a ``psutil`` connection object. Returns ------- dict ``pid``, ``name``, ``cmdline``, ``username``. """ info: Dict[str, Any] = { "pid": conn.pid, "name": "", "cmdline": [], "username": "", } if not conn.pid: return info try: proc = psutil.Process(conn.pid) info["name"] = proc.name() info["cmdline"] = proc.cmdline() info["username"] = proc.username() except (psutil.NoSuchProcess, psutil.AccessDenied): pass return info # ------------------------------------------------------------------ # DNS audit # ------------------------------------------------------------------ @staticmethod def check_dns_queries() -> List[Dict[str, Any]]: """Audit ``/etc/resolv.conf`` for suspicious DNS server entries. Malware sometimes rewrites ``resolv.conf`` to redirect DNS through an attacker-controlled resolver, enabling man-in-the-middle attacks or DNS-based C2 communication. """ issues: List[Dict[str, Any]] = [] resolv_path = Path("/etc/resolv.conf") if not resolv_path.exists(): return issues try: content = resolv_path.read_text() except PermissionError: logger.warning("Cannot read /etc/resolv.conf") return issues nameserver_re = re.compile(r"^\s*nameserver\s+(\S+)", re.MULTILINE) for match in nameserver_re.finditer(content): server = match.group(1) if server in _SUSPICIOUS_DNS_SERVERS: issues.append({ "server": server, "file": str(resolv_path), "reason": f"Potentially suspicious DNS server: {server}", "severity": "MEDIUM", }) # Flag non-RFC1918 / non-loopback servers that look unusual. if not ( server.startswith("127.") or server.startswith("10.") or server.startswith("192.168.") or server.startswith("172.") or server == "::1" ): # External DNS — not inherently bad but worth logging if the # admin didn't set it intentionally. issues.append({ "server": server, "file": str(resolv_path), "reason": f"External DNS server configured: {server}", "severity": "LOW", }) return issues