329 lines
11 KiB
Python
329 lines
11 KiB
Python
"""Network scanner for AYN Antivirus.
|
|
|
|
Inspects active TCP/UDP connections for traffic to known mining pools,
|
|
suspicious ports, and unexpected listening services. Also audits
|
|
``/etc/resolv.conf`` for DNS hijacking indicators.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import psutil
|
|
|
|
from ayn_antivirus.constants import (
|
|
CRYPTO_POOL_DOMAINS,
|
|
SUSPICIOUS_PORTS,
|
|
)
|
|
from ayn_antivirus.scanners.base import BaseScanner
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Well-known system services that are *expected* to listen — extend as needed.
|
|
_EXPECTED_LISTENERS = {
|
|
22: "sshd",
|
|
53: "systemd-resolved",
|
|
80: "nginx",
|
|
443: "nginx",
|
|
3306: "mysqld",
|
|
5432: "postgres",
|
|
6379: "redis-server",
|
|
8080: "java",
|
|
}
|
|
|
|
# Known-malicious / suspicious public DNS servers sometimes injected by
|
|
# malware into resolv.conf to redirect DNS queries.
|
|
_SUSPICIOUS_DNS_SERVERS = [
|
|
"8.8.4.4", # not inherently bad, but worth noting if unexpected
|
|
"1.0.0.1",
|
|
"208.67.222.123",
|
|
"198.54.117.10",
|
|
"77.88.8.7",
|
|
"94.140.14.14",
|
|
]
|
|
|
|
|
|
class NetworkScanner(BaseScanner):
|
|
"""Scan active network connections for suspicious activity.
|
|
|
|
Wraps :func:`psutil.net_connections` and enriches each connection with
|
|
process ownership and threat classification.
|
|
"""
|
|
|
|
# ------------------------------------------------------------------
|
|
# BaseScanner interface
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "network_scanner"
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return "Inspects network connections for mining pools and suspicious ports"
|
|
|
|
def scan(self, target: Any = None) -> Dict[str, Any]:
|
|
"""Run a full network scan.
|
|
|
|
*target* is ignored — all connections are inspected.
|
|
|
|
Returns
|
|
-------
|
|
dict
|
|
``total``, ``suspicious``, ``unexpected_listeners``, ``dns_issues``.
|
|
"""
|
|
all_conns = self.get_all_connections()
|
|
suspicious = self.find_suspicious_connections()
|
|
listeners = self.check_listening_ports()
|
|
dns = self.check_dns_queries()
|
|
|
|
return {
|
|
"total": len(all_conns),
|
|
"suspicious": suspicious,
|
|
"unexpected_listeners": listeners,
|
|
"dns_issues": dns,
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Connection enumeration
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def get_all_connections() -> List[Dict[str, Any]]:
|
|
"""Return a snapshot of every inet connection.
|
|
|
|
Each dict contains: ``fd``, ``family``, ``type``, ``local_addr``,
|
|
``remote_addr``, ``status``, ``pid``, ``process_name``.
|
|
"""
|
|
result: List[Dict[str, Any]] = []
|
|
try:
|
|
connections = psutil.net_connections(kind="inet")
|
|
except psutil.AccessDenied:
|
|
logger.warning("Insufficient permissions to read network connections")
|
|
return result
|
|
|
|
for conn in connections:
|
|
local = f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else ""
|
|
remote = f"{conn.raddr.ip}:{conn.raddr.port}" if conn.raddr else ""
|
|
|
|
proc_name = ""
|
|
if conn.pid:
|
|
try:
|
|
proc_name = psutil.Process(conn.pid).name()
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
proc_name = "?"
|
|
|
|
result.append({
|
|
"fd": conn.fd,
|
|
"family": str(conn.family),
|
|
"type": str(conn.type),
|
|
"local_addr": local,
|
|
"remote_addr": remote,
|
|
"status": conn.status,
|
|
"pid": conn.pid,
|
|
"process_name": proc_name,
|
|
})
|
|
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Suspicious-connection detection
|
|
# ------------------------------------------------------------------
|
|
|
|
def find_suspicious_connections(self) -> List[Dict[str, Any]]:
|
|
"""Identify connections to known mining pools or suspicious ports.
|
|
|
|
Checks remote addresses against :pydata:`constants.CRYPTO_POOL_DOMAINS`
|
|
and :pydata:`constants.SUSPICIOUS_PORTS`.
|
|
"""
|
|
suspicious: List[Dict[str, Any]] = []
|
|
|
|
try:
|
|
connections = psutil.net_connections(kind="inet")
|
|
except psutil.AccessDenied:
|
|
logger.warning("Insufficient permissions to read network connections")
|
|
return suspicious
|
|
|
|
for conn in connections:
|
|
raddr = conn.raddr
|
|
if not raddr:
|
|
continue
|
|
|
|
remote_ip = raddr.ip
|
|
remote_port = raddr.port
|
|
local_str = f"{conn.laddr.ip}:{conn.laddr.port}" if conn.laddr else "?"
|
|
remote_str = f"{remote_ip}:{remote_port}"
|
|
|
|
proc_info = self.resolve_process_for_connection(conn)
|
|
|
|
# Suspicious port.
|
|
if remote_port in SUSPICIOUS_PORTS:
|
|
suspicious.append({
|
|
"local_addr": local_str,
|
|
"remote_addr": remote_str,
|
|
"pid": conn.pid,
|
|
"process": proc_info,
|
|
"status": conn.status,
|
|
"reason": f"Connection on known mining port {remote_port}",
|
|
"severity": "HIGH",
|
|
})
|
|
|
|
# Mining-pool domain (substring match on IP / hostname).
|
|
for domain in CRYPTO_POOL_DOMAINS:
|
|
if domain in remote_ip:
|
|
suspicious.append({
|
|
"local_addr": local_str,
|
|
"remote_addr": remote_str,
|
|
"pid": conn.pid,
|
|
"process": proc_info,
|
|
"status": conn.status,
|
|
"reason": f"Connection to known mining pool: {domain}",
|
|
"severity": "CRITICAL",
|
|
})
|
|
break
|
|
|
|
return suspicious
|
|
|
|
# ------------------------------------------------------------------
|
|
# Listening-port audit
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def check_listening_ports() -> List[Dict[str, Any]]:
|
|
"""Return listening sockets that are *not* in the expected-services list.
|
|
|
|
Unexpected listeners may indicate a backdoor or reverse shell.
|
|
"""
|
|
unexpected: List[Dict[str, Any]] = []
|
|
|
|
try:
|
|
connections = psutil.net_connections(kind="inet")
|
|
except psutil.AccessDenied:
|
|
logger.warning("Insufficient permissions to read network connections")
|
|
return unexpected
|
|
|
|
for conn in connections:
|
|
if conn.status != "LISTEN":
|
|
continue
|
|
|
|
port = conn.laddr.port if conn.laddr else None
|
|
if port is None:
|
|
continue
|
|
|
|
proc_name = ""
|
|
if conn.pid:
|
|
try:
|
|
proc_name = psutil.Process(conn.pid).name()
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
proc_name = "?"
|
|
|
|
expected_name = _EXPECTED_LISTENERS.get(port)
|
|
if expected_name and expected_name in proc_name:
|
|
continue # known good
|
|
|
|
# Skip very common ephemeral / system ports when we can't resolve.
|
|
if port > 49152:
|
|
continue
|
|
|
|
if port not in _EXPECTED_LISTENERS:
|
|
unexpected.append({
|
|
"port": port,
|
|
"local_addr": f"{conn.laddr.ip}:{port}" if conn.laddr else f"?:{port}",
|
|
"pid": conn.pid,
|
|
"process_name": proc_name,
|
|
"reason": f"Unexpected listening service on port {port}",
|
|
"severity": "MEDIUM",
|
|
})
|
|
|
|
return unexpected
|
|
|
|
# ------------------------------------------------------------------
|
|
# Process resolution
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def resolve_process_for_connection(conn: Any) -> Dict[str, Any]:
|
|
"""Return basic process info for a ``psutil`` connection object.
|
|
|
|
Returns
|
|
-------
|
|
dict
|
|
``pid``, ``name``, ``cmdline``, ``username``.
|
|
"""
|
|
info: Dict[str, Any] = {
|
|
"pid": conn.pid,
|
|
"name": "",
|
|
"cmdline": [],
|
|
"username": "",
|
|
}
|
|
if not conn.pid:
|
|
return info
|
|
|
|
try:
|
|
proc = psutil.Process(conn.pid)
|
|
info["name"] = proc.name()
|
|
info["cmdline"] = proc.cmdline()
|
|
info["username"] = proc.username()
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
pass
|
|
|
|
return info
|
|
|
|
# ------------------------------------------------------------------
|
|
# DNS audit
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def check_dns_queries() -> List[Dict[str, Any]]:
|
|
"""Audit ``/etc/resolv.conf`` for suspicious DNS server entries.
|
|
|
|
Malware sometimes rewrites ``resolv.conf`` to redirect DNS through an
|
|
attacker-controlled resolver, enabling man-in-the-middle attacks or
|
|
DNS-based C2 communication.
|
|
"""
|
|
issues: List[Dict[str, Any]] = []
|
|
resolv_path = Path("/etc/resolv.conf")
|
|
|
|
if not resolv_path.exists():
|
|
return issues
|
|
|
|
try:
|
|
content = resolv_path.read_text()
|
|
except PermissionError:
|
|
logger.warning("Cannot read /etc/resolv.conf")
|
|
return issues
|
|
|
|
nameserver_re = re.compile(r"^\s*nameserver\s+(\S+)", re.MULTILINE)
|
|
for match in nameserver_re.finditer(content):
|
|
server = match.group(1)
|
|
|
|
if server in _SUSPICIOUS_DNS_SERVERS:
|
|
issues.append({
|
|
"server": server,
|
|
"file": str(resolv_path),
|
|
"reason": f"Potentially suspicious DNS server: {server}",
|
|
"severity": "MEDIUM",
|
|
})
|
|
|
|
# Flag non-RFC1918 / non-loopback servers that look unusual.
|
|
if not (
|
|
server.startswith("127.")
|
|
or server.startswith("10.")
|
|
or server.startswith("192.168.")
|
|
or server.startswith("172.")
|
|
or server == "::1"
|
|
):
|
|
# External DNS — not inherently bad but worth logging if the
|
|
# admin didn't set it intentionally.
|
|
issues.append({
|
|
"server": server,
|
|
"file": str(resolv_path),
|
|
"reason": f"External DNS server configured: {server}",
|
|
"severity": "LOW",
|
|
})
|
|
|
|
return issues
|