198 lines
7.0 KiB
Python
198 lines
7.0 KiB
Python
"""Security tests — validate fixes for audit findings."""
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 2: SQL injection in ioc_db._count()
|
|
# -----------------------------------------------------------------------
|
|
|
|
class TestIOCTableWhitelist:
    """Exercise the table-name check performed by ``IOCDatabase._count()``."""

    @pytest.fixture(autouse=True)
    def setup_db(self, tmp_path):
        """Create and initialize an IOC database in ``tmp_path``; close it afterwards."""
        from ayn_antivirus.signatures.db.ioc_db import IOCDatabase

        db_file = tmp_path / "test_ioc.db"
        self.db = IOCDatabase(db_file)
        self.db.initialize()
        yield
        # Teardown: runs once the test body has finished.
        self.db.close()

    def test_valid_tables(self):
        """Counting rows in each of the three IOC tables returns a value >= 0."""
        known_tables = ("ioc_ips", "ioc_domains", "ioc_urls")
        for table_name in known_tables:
            row_count = self.db._count(table_name)
            assert row_count >= 0

    def test_injection_blocked(self):
        """A table name carrying a stacked SQL statement raises ``ValueError``."""
        payload = "ioc_ips; DROP TABLE ioc_ips; --"
        with pytest.raises(ValueError, match="Invalid table"):
            self.db._count(payload)

    def test_arbitrary_table_blocked(self):
        """A syntactically harmless but unlisted table name raises ``ValueError``."""
        unknown_table = "evil_table"
        with pytest.raises(ValueError, match="Invalid table"):
            self.db._count(unknown_table)

    def test_valid_tables_frozenset(self):
        """``_VALID_TABLES`` is a frozenset holding exactly the three IOC table names."""
        from ayn_antivirus.signatures.db.ioc_db import IOCDatabase

        allowed = IOCDatabase._VALID_TABLES
        assert isinstance(allowed, frozenset)
        assert allowed == {"ioc_ips", "ioc_domains", "ioc_urls"}
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 4: Quarantine ID path traversal
|
|
# -----------------------------------------------------------------------
|
|
|
|
class TestQuarantineIDValidation:
    """Exercise ``QuarantineVault._validate_qid()`` with bad and good IDs."""

    @pytest.fixture(autouse=True)
    def setup_vault(self, tmp_path):
        """Build a vault whose directory and key file both live under ``tmp_path``."""
        from ayn_antivirus.quarantine.vault import QuarantineVault

        vault_dir = tmp_path / "vault"
        key_file = tmp_path / "vault" / ".key"
        self.vault = QuarantineVault(vault_dir, key_file)

    def test_traversal_blocked(self):
        """A relative path-traversal string is rejected."""
        candidate = "../../etc/passwd"
        with pytest.raises(ValueError, match="Invalid quarantine ID"):
            self.vault._validate_qid(candidate)

    def test_too_short(self):
        """A three-character ID is rejected."""
        candidate = "abc"
        with pytest.raises(ValueError, match="Invalid quarantine ID"):
            self.vault._validate_qid(candidate)

    def test_too_long(self):
        """A 33-character ID is rejected."""
        candidate = "a" * 33
        with pytest.raises(ValueError, match="Invalid quarantine ID"):
            self.vault._validate_qid(candidate)

    def test_non_hex(self):
        """A 32-character ID made of non-hex letters is rejected."""
        candidate = "GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG"
        with pytest.raises(ValueError, match="Invalid quarantine ID"):
            self.vault._validate_qid(candidate)

    def test_uppercase_hex_rejected(self):
        """Thirty-two uppercase ``A`` characters are rejected."""
        candidate = "A" * 32
        with pytest.raises(ValueError, match="Invalid quarantine ID"):
            self.vault._validate_qid(candidate)

    def test_valid_id(self):
        """Well-formed 32-character lowercase hex IDs are returned unchanged."""
        repeated_a = "a" * 32
        assert self.vault._validate_qid(repeated_a) == "a" * 32

        all_digits = "0123456789abcdef" * 2
        assert self.vault._validate_qid(all_digits) == "0123456789abcdef" * 2

    def test_whitespace_stripped(self):
        """One leading and one trailing space are removed from the returned ID."""
        core = "a" * 32
        padded = f" {core} "
        assert self.vault._validate_qid(padded) == "a" * 32
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 3: Quarantine restore path traversal
|
|
# -----------------------------------------------------------------------
|
|
|
|
class TestRestorePathValidation:
    """Exercise ``QuarantineVault._validate_restore_path()`` on restore targets."""

    @pytest.fixture(autouse=True)
    def setup_vault(self, tmp_path):
        """Build a vault whose directory and key file both live under ``tmp_path``."""
        from ayn_antivirus.quarantine.vault import QuarantineVault

        vault_dir = tmp_path / "vault"
        key_file = tmp_path / "vault" / ".key"
        self.vault = QuarantineVault(vault_dir, key_file)

    def test_etc_blocked(self):
        """Restoring to ``/etc/shadow`` raises a "protected path" error."""
        target = "/etc/shadow"
        with pytest.raises(ValueError, match="protected path"):
            self.vault._validate_restore_path(target)

    def test_usr_bin_blocked(self):
        """Restoring into ``/usr/bin`` raises a "protected path" error."""
        target = "/usr/bin/evil"
        with pytest.raises(ValueError, match="protected path"):
            self.vault._validate_restore_path(target)

    def test_cron_blocked(self):
        """Restoring into ``/etc/cron.d`` raises a "Refusing to restore" error."""
        target = "/etc/cron.d/backdoor"
        with pytest.raises(ValueError, match="Refusing to restore"):
            self.vault._validate_restore_path(target)

    def test_systemd_blocked(self):
        """Restoring a systemd unit file raises a "Refusing to restore" error."""
        target = "/etc/systemd/system/evil.service"
        with pytest.raises(ValueError, match="Refusing to restore"):
            self.vault._validate_restore_path(target)

    def test_safe_path_allowed(self):
        """A target under ``/tmp`` is accepted; the result keeps the file name."""
        validated = self.vault._validate_restore_path("/tmp/restored.txt")
        assert validated.name == "restored.txt"
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 5: Container scanner command injection
|
|
# -----------------------------------------------------------------------
|
|
|
|
class TestContainerIDSanitization:
    """Exercise ``ContainerScanner._sanitize_id()`` with shell metacharacters."""

    @pytest.fixture(autouse=True)
    def setup_scanner(self):
        """Give every test a default-constructed ``ContainerScanner``."""
        from ayn_antivirus.scanners.container_scanner import ContainerScanner

        self.scanner = ContainerScanner()

    def test_semicolon_injection(self):
        """An ID containing ``;`` followed by a command is rejected."""
        payload = "abc; rm -rf /"
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_dollar_injection(self):
        """An ID using ``$(...)`` command substitution is rejected."""
        payload = "$(cat /etc/shadow)"
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_backtick_injection(self):
        """An ID wrapped in backticks is rejected."""
        payload = "`whoami`"
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_pipe_injection(self):
        """An ID containing ``|`` is rejected."""
        payload = "abc|cat /etc/passwd"
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_ampersand_injection(self):
        """An ID containing ``&&`` is rejected."""
        payload = "abc && echo pwned"
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_empty_rejected(self):
        """The empty string is rejected."""
        payload = ""
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_too_long_rejected(self):
        """A 200-character ID is rejected."""
        payload = "a" * 200
        with pytest.raises(ValueError):
            self.scanner._sanitize_id(payload)

    def test_valid_ids(self):
        """IDs made of letters, digits, ``-``, ``_`` and ``.`` come back unchanged."""
        accepted = ("abc123", "my-container", "web_app.v2", "a1b2c3d4e5f6")
        for container_id in accepted:
            assert self.scanner._sanitize_id(container_id) == container_id
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 6: Config key validation
|
|
# -----------------------------------------------------------------------
|
|
|
|
def test_config_key_whitelist_in_cli():
    """Check that the CLI source contains the config-key whitelist and its guard.

    The ``VALID_CONFIG_KEYS`` set is defined inside a Click command body, so it
    cannot be imported directly. Instead this test reads the source text of
    ``ayn_antivirus.cli`` and looks for the set's name, two of its expected
    members, and the ``if key not in VALID_CONFIG_KEYS`` guard clause.
    """
    import inspect

    import ayn_antivirus.cli as cli_mod

    src = inspect.getsource(cli_mod)

    expected_fragments = (
        "VALID_CONFIG_KEYS",
        '"scan_paths"',
        '"dashboard_port"',
        # The guard clause that rejects keys outside the set.
        "if key not in VALID_CONFIG_KEYS",
    )
    for fragment in expected_fragments:
        assert fragment in src
|
|
|
|
|
|
# -----------------------------------------------------------------------
|
|
# Fix 9: API query param validation
|
|
# -----------------------------------------------------------------------
|
|
|
|
def test_safe_int_helper():
    """Check ``_safe_int`` parsing, fallback to the default, and min/max clamping."""
    from ayn_antivirus.dashboard.api import _safe_int

    # Each row: positional args, keyword args, expected return value.
    cases = (
        (("50", 10), {}, 50),
        (("abc", 10), {}, 10),
        (("", 10), {}, 10),
        ((None, 10), {}, 10),
        (("-5", 10), {"min_val": 1}, 1),
        (("9999", 10), {"max_val": 500}, 500),
        (("0", 10), {"min_val": 1}, 1),
    )
    for args, kwargs, expected in cases:
        assert _safe_int(*args, **kwargs) == expected
|